Module pipettin-piper.piper.plugins.example_task
Functions
def load_plugin(controller: Controller, **kwargs)-
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Instantiate this plugin's controller class and return it.

    Plugins are expected to have a function named 'load_plugin' which
    instantiates the plugin's class and returns it to the main Commander
    class. If they fail to load, they must raise a PluginError exception.

    Args:
        controller: The main controller object, passed through to
            ``ExampleController``.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``ExampleController``.

    Returns:
        The newly created ``ExampleController`` instance.

    Raises:
        PluginError: If constructing the plugin raises any exception. The
            original exception is chained as the cause, and the message
            carries the formatted traceback.
    """
    try:
        class_instance = ExampleController(controller, **kwargs)
    except Exception as e:
        # Prefix the message with this module's name so the log identifies
        # which plugin actually failed to load.
        msg = f"{__name__}: failed to load with error: {e}\n" + traceback.format_exc()
        logging.error(msg)
        raise PluginError(msg) from e
    return class_instance
Classes
class ExampleController (controller, **kwargs)-
Expand source code
class ExampleController:
    """A barebones commander class, meant to connect to and operate new hardware modules.

    The commander will be called to handle a particular 'action' from the
    pipetting mid-level protocol.

    On construction it registers ``action_handler`` with the controller's
    GCODE builder, registers ``action_runner`` with the controller, and
    appends the ``run`` and ``run_as_thread`` coroutines to the controller's
    ``coroutine_methods`` list.
    """

    # NOTE: the action name must match the module's name (case insensitive).
    # TODO: see if its worth/simple using "__file__" for less hard-coding.
    # NOTE(review): this value reads "EXAMPLE_SERIAL" while the module is
    # documented as "example_task" -- confirm which name is intended.
    action_name = "EXAMPLE_SERIAL"

    def __init__(self, controller, **kwargs) -> None:
        """Store the controller and register this plugin's handlers and tasks.

        Args:
            controller: The main controller object. It must provide
                ``builder.add_action_handler``, ``add_action_runner`` and a
                ``coroutine_methods`` list.
            **kwargs: Accepted for interface compatibility; not used here.
        """
        self.controller = controller

        # Add your handlers for new protocol actions.
        # The builder handler is used by 'builder.parseAction' to generate GCODE.
        self.controller.builder.add_action_handler(
            name=self.action_name, function=self.action_handler
        )
        # The controller handler is used by 'controller.run_actions_protocol',
        # to execute the action.
        self.controller.add_action_runner(
            name=self.action_name, function=self.action_runner
        )

        # Here you can add asyncio coroutines that the main "Controller" class
        # can start and manage. Add any coroutines that you want the
        # Controller to gather.
        controller.coroutine_methods.extend([
            # Example async task that runs fast.
            self.run,
            # Example slow or blocking task.
            self.run_as_thread,
        ])

        # Set status.
        self._status = True

    #### Task runners ####

    async def run(self):
        """Example asynchronous task; it only awaits a two second sleep."""
        # Here you can write your asynchronous program.
        # Note that nothing can take too long to run; the entirety of the
        # program is waiting for this code to "await" something.
        await asyncio.sleep(2)

    async def run_as_thread(self):
        """Run ``blocking_task`` in a worker thread and wait for it to finish.

        Exceptions raised by the blocking task propagate to the caller
        through ``asyncio.gather``.
        """
        # If your code can block the main thread for any significant time,
        # then send it to a new thread instead.
        # First, create a coroutine for the blocking task.
        blocking_coroutine = asyncio.to_thread(self.blocking_task)
        # Then schedule the task.
        task = asyncio.create_task(blocking_coroutine)
        # Awaiting gather() lets the scheduled task start and returns only
        # once it has completed, so no extra polling loop is needed.
        await asyncio.gather(
            # Add other tasks/coroutines here to run them concurrently.
            # self.another_async_method(),
            # Put the threading task here.
            task,
        )

    def blocking_task(self):
        """Example blocking job; it only sleeps for two seconds."""
        # NOTE: Take care when accessing data from a task running in a new thread.
        # Consider using "thread-safe" operations and objects
        # (e.g. queues, events, semaphores, etc.).
        time.sleep(2)

    #### Action parsers and executors ####

    def action_handler(self, action: dict, i: int):
        """This method must generate GCODE for the provided action.

        Args:
            action (dict): Raw action data.
            i (int): Action index.
        """
        # GCODE must be saved in a list.
        commands = []
        # Append a comment to the GCODE command list.
        commands.append(
            self.controller.builder.gcode.comment(
                "Use the GcodePrimitives class (or whatever) to generate GCODE for this action."
            )
        )
        # Commit the command to the GcodeBuilder class.
        self.controller.builder.extend_commands(commands, action)
        # The return values are not used. You're free.

    def action_runner(self, action: dict, i: int, wait: bool, check: bool, timeout: float):
        """This method must execute provided action.

        Args:
            action (dict): Processed action data.
            i (int): Action index.
            wait (bool): Check that the action "finishes" before returning.
            check (bool): Check that the action completed "successfully" before returning.
            timeout (float): Timeout for the "wait" and "check" checks.
        """
        # Do your thing here. The return values are not used.
The commander will be called to handle a particular 'action' from the pipetting mid-level protocol.
Class variables
var action_name
Methods
def action_handler(self, action: dict, i: int)-
Expand source code
def action_handler(self, action: dict, i: int):
    """Build the GCODE for one action and hand it to the GCODE builder.

    Args:
        action (dict): Raw action data.
        i (int): Action index.
    """
    gcode_builder = self.controller.builder
    # GCODE is passed to the builder as a list of commands; this example
    # contributes a single comment line.
    note = gcode_builder.gcode.comment(
        "Use the GcodePrimitives class (or whatever) to generate GCODE for this action."
    )
    # Commit the command list, together with its action, to the builder.
    gcode_builder.extend_commands([note], action)
    # Nothing is returned: the return value is not used.
Args
action:dict- Raw action data.
i:int- Action index.
def action_runner(self, action: dict, i: int, wait: bool, check: bool, timeout: float)-
Expand source code
def action_runner(self, action: dict, i: int, wait: bool, check: bool, timeout: float):
    """Execute the provided action (placeholder: this example does nothing).

    Args:
        action (dict): Processed action data.
        i (int): Action index.
        wait (bool): Check that the action "finishes" before returning.
        check (bool): Check that the action completed "successfully" before returning.
        timeout (float): Timeout for the "wait" and "check" checks.
    """
    # Put the execution logic for the action here; the return value is not used.
    return None
Args
action:dict- Processed action data.
i:int- Action index.
wait:bool- Check that the action "finishes" before returning.
check:bool- Check that the action completed "successfully" before returning.
timeout:float- Timeout for the "wait" and "check" checks.
def blocking_task(self)-
Expand source code
def blocking_task(self):
    """Example blocking job: sleep for two seconds, then return."""
    # NOTE: Take care when accessing data from a task running in a new thread.
    # Consider using "thread-safe" operations and objects
    # (e.g. queues, events, semaphores, etc.).
    pause_seconds = 2
    time.sleep(pause_seconds)
Expand source code
async def run(self):
    """Example asynchronous task: await a two second sleep, then return."""
    # Write your asynchronous program here.
    # Nothing in it may take long to run: the rest of the program waits
    # until this code reaches an "await".
    nap_seconds = 2
    await asyncio.sleep(nap_seconds)
Expand source code
async def run_as_thread(self):
    """Run ``blocking_task`` in a worker thread and wait for it to finish.

    Exceptions raised by the blocking task propagate to the caller through
    ``asyncio.gather``.
    """
    # If your code can block the main thread for any significant time, then
    # send it to a new thread instead.
    # First, create a coroutine for the blocking task.
    blocking_coroutine = asyncio.to_thread(self.blocking_task)
    # Then schedule the task.
    task = asyncio.create_task(blocking_coroutine)
    # Awaiting gather() lets the scheduled task start and returns only once
    # it has completed, so no extra polling loop is needed afterwards.
    await asyncio.gather(
        # Add other tasks/coroutines here to run them concurrently.
        # self.another_async_method(),
        # Put the threading task here.
        task,
    )