Module pipettin-piper.piper.plugins.example_task

Functions

def load_plugin(controller: Controller, **kwargs)
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """
    Plugins are expected to have a function named 'load_plugin' which will instantiate
    the plugin's class and returning it to the main Commander class.
    If they fail to load, they must raise a PluginError exception.
    """
    try:
        class_instance = ExampleController(controller, **kwargs)
    except Exception as e:
        msg = f"Serial: failed to load with error: {e}\n" + traceback.format_exc()
        logging.error(msg)
        raise PluginError(msg) from e

    return class_instance

Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception.

Classes

class ExampleController (controller, **kwargs)
Expand source code
class ExampleController:
    """A barebones commander class, meant to connect to and operate new hardware modules.

    The commander will be called to handle a particular 'action' from the pipetting mid-level protocol.
    """
    # NOTE: the action name must match the module's name (case insensitive).
    # TODO: see if its worth/simple using "__file__" for less hard-coding.
    action_name = "EXAMPLE_SERIAL"

    def __init__(self, controller, **kwargs) -> None:

        self.controller = controller

        # Add your handlers for new protocol actions.
        # The builder handler is used by 'builder.parseAction' to generate GCODE.
        self.controller.builder.add_action_handler(name=self.action_name,
                                                   function=self.action_handler)
        # The controller handler is used by 'controller.run_actions_protocol',
        # to execute the action.
        self.controller.add_action_runner(name=self.action_name,
                                          function=self.action_runner)
        
        # Here you can add asyncio coroutines that the main "Controller" class can start and manage.
        # Add any coroutines that you want the Controller to gather.
        controller.coroutine_methods.extend([
            # Example async task that run fast.
            self.run,
            # Example slow or blocking task.
            self.run_as_thread
        ])

        # Set status.
        self._status = True

    #### Task runners ####
    async def run(self):
        # Here you can write your asynchronous program.
        # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
        await asyncio.sleep(2)

    async def run_as_thread(self):
        # If your code can block the main thread for any significant time, then send the programs to a new one instead.
        # First, create a coroutine for the blocking task.
        blocking_coroutine = asyncio.to_thread(self.blocking_task)

        # Then schedule the task.
        task = asyncio.create_task(blocking_coroutine)

        # Allow the scheduled task to start by awaiting something.
        # For example, you can run this task and other tasks concurrently with gather().
        await asyncio.gather(
            # Add other tasks/coroutines here to run them concurrently.
            # self.another_async_method(),

            # Put the threading task here.
            task
        )

        # Allow the scheduled task to start by awaiting something.
        while not task.done() and not task.cancelled():
            # Wait for the task here (hoping to prevent "never awaited" errors).
            await asyncio.sleep(1)

    def blocking_task(self):
        # NOTE: Take care when accessing data from a task running in a new thread.
        #       Consider using "thread-safe" operations and objects (e.g. queues, events, semaphores, etc.).
        time.sleep(2)

    #### Action parsers and executors ####
    def action_handler(self,  action: dict, i: int):
        """This method must generate GCODE for the provided action.

        Args:
            action (dict): Raw action data.
            i (int): Action index.
        """
        # GCODE must be saved in a list.
        commands = []

        # Append a comment to the GCODE command list.
        commands.append(
            self.controller.builder.gcode.comment(
                "Use the GcodePrimitives class (or whatever) to generate GCODE for this action."
            )
        )

        # Commit the command to the GcodeBuilder class.
        self.controller.builder.extend_commands(commands, action)

        # The return values are not used. You're free.

    def action_runner(self,  action: dict, i: int, wait:bool, check:bool, timeout:float):
        """This method must execute provided action.

        Args:
            action (dict): Processed action data.
            i (int): Action index.
            wait (bool): Check that the action "finishes" before returning.
            check (bool):  Check that the action completed "successfully" before returning.
            timeout (float): Timeout for the "wait" and "check" checks.
        """
        # Do your thing here. The return values are not used.

A barebones commander class, meant to connect to and operate new hardware modules.

The commander will be called to handle a particular 'action' from the pipetting mid-level protocol.

Class variables

var action_name

Methods

def action_handler(self, action: dict, i: int)
Expand source code
def action_handler(self,  action: dict, i: int):
    """This method must generate GCODE for the provided action.

    Args:
        action (dict): Raw action data.
        i (int): Action index.
    """
    # GCODE must be saved in a list.
    commands = []

    # Append a comment to the GCODE command list.
    commands.append(
        self.controller.builder.gcode.comment(
            "Use the GcodePrimitives class (or whatever) to generate GCODE for this action."
        )
    )

    # Commit the command to the GcodeBuilder class.
    self.controller.builder.extend_commands(commands, action)

    # The return values are not used. You're free.

This method must generate GCODE for the provided action.

Args

action : dict
Raw action data.
i : int
Action index.
def action_runner(self, action: dict, i: int, wait: bool, check: bool, timeout: float)
Expand source code
def action_runner(self,  action: dict, i: int, wait:bool, check:bool, timeout:float):
    """This method must execute provided action.

    Args:
        action (dict): Processed action data.
        i (int): Action index.
        wait (bool): Check that the action "finishes" before returning.
        check (bool):  Check that the action completed "successfully" before returning.
        timeout (float): Timeout for the "wait" and "check" checks.
    """
    # Do your thing here. The return values are not used.

This method must execute provided action.

Args

action : dict
Processed action data.
i : int
Action index.
wait : bool
Check that the action "finishes" before returning.
check : bool
Check that the action completed "successfully" before returning.
timeout : float
Timeout for the "wait" and "check" checks.
def blocking_task(self)
Expand source code
def blocking_task(self):
    # NOTE: Take care when accessing data from a task running in a new thread.
    #       Consider using "thread-safe" operations and objects (e.g. queues, events, semaphores, etc.).
    time.sleep(2)
async def run(self)
Expand source code
async def run(self):
    # Here you can write your asynchronous program.
    # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
    await asyncio.sleep(2)
async def run_as_thread(self)
Expand source code
async def run_as_thread(self):
    # If your code can block the main thread for any significant time, then send the programs to a new one instead.
    # First, create a coroutine for the blocking task.
    blocking_coroutine = asyncio.to_thread(self.blocking_task)

    # Then schedule the task.
    task = asyncio.create_task(blocking_coroutine)

    # Allow the scheduled task to start by awaiting something.
    # For example, you can run this task and other tasks concurrently with gather().
    await asyncio.gather(
        # Add other tasks/coroutines here to run them concurrently.
        # self.another_async_method(),

        # Put the threading task here.
        task
    )

    # Allow the scheduled task to start by awaiting something.
    while not task.done() and not task.cancelled():
        # Wait for the task here (hoping to prevent "never awaited" errors).
        await asyncio.sleep(1)