Module pipettin-piper.piper.plugins.pocketpcr_serial

Functions

def load_plugin(controller: "'Controller'", **kwargs)
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """
    Plugins are expected to have a function named 'load_plugin' which will instantiate
    the plugin's class and returning it to the main Commander class.
    If they fail to load, they must raise a PluginError exception.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")
    try:
        class_instance = PocketController(controller, **kwargs)
        logging.info("pocketpcr_serial: plugin instance loaded successfully.")
    except Exception as e:
        msg = f"Failed to load with error: {e}"
        logging.error(msg)
        raise PluginError(msg) from e

    return class_instance

Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception.

Classes

class PocketController (controller: Controller, launch_type=None, *args, **kwargs)
Expand source code
class PocketController(Plugin):
    """A barebones sub-commander class, meant to connect to and operate new hardware modules.

    The commander will handle a particular 'action' from the pipetting mid-level protocol by
    calling this class, finding it by a unique 'cmd' property (i.e. 'self.pcr_action_id').

    All actions with 'cmd' equal to the 'cmd' property will end up being processed here,
    both for GCODE generation and device control over serial.
    """
    # NOTE: the action name must match the module's name (case insensitive).
    # TODO: see if its worth/simple using "__file__" for less hard-coding.
    port='/dev/ttyACM0'
    baudrate=115200
    launch_type="thread"
    # launch_type="to_thread"
    # launch_type="process"
    verbose=True
    def __init__(self, controller: Controller, launch_type=None, *args, **kwargs) -> None:

        # Save the controller for later, just in case.
        self.controller = controller

        # Init args.
        self.verbose = self.controller.verbose
        if launch_type:
            self.launch_type=launch_type

        # Instantiate any device-specific stuff over here.

        # Instantiate the arduino handler (NOTE: there is an older class called SerialCommunication too).
        # self.arduino = SerialCommunication(port='/dev/ttyACM0', baudrate=115200)
        # Controller for the arduino zero / pocket pcr fork.
        self.arduino = SerialCommunicationSync(port=self.port,
                                               baudrate=self.baudrate,
                                               launch_type=self.launch_type,
                                               logger=logging,
                                               loglevel=logging.DEBUG,
                                               verbose=self.verbose)

        # Save the methods used to queue messages and check for responses.
        self.put = self.arduino.put
        self.get = self.arduino.get

        # Here you can add asyncio coroutines that the main "Controller" class can start and manage.
        # Add any coroutines that you want the Controller to gather.
        self.controller.coroutine_methods.extend([

            # NOTE: launch using the builtin method of the SerialCommunicationSync class.
            self.start()

            # NOTE: use this for slow and blocking coroutines.
            # self.run_as_thread()  # NOTE: placeholder method, not implemented.

            # If your program can run thread-safely in a separate process/thread,
            # you are very welcome to do that instead.
            # NOTE: This requires sharing event and queue objects to interact.
            # self.run_as_process()
        ])

        # Add your handlers for new protocol actions.
        # The controller handler is used by 'controller.run_actions_protocol'.
        self.controller.add_action_runner(name=self.pcr_action_id,
                                          function=self.run_action)
        # The builder handler is used by 'builder.parseAction' to generate GCODE.
        self.controller.builder.add_action_handler(name=self.pcr_action_id,
                                                   function=self.make_gcode)

        # Register event handler for action status updates.
        self.controller.register_event_callback(
            event_name=self.controller.status_event,
            callback_name="PocketPCR.status",
            callback_function=self.status_callback)

        # Set status.
        self._status = True

    @property
    def status(self):
        state = self._status and self.run and self.ping_status and self.arduino.is_connected
        return state

    async def status_callback(self, status=None):
        """This function generates status data for the commander's "status" event."""
        # Ensure basic structure.
        if status is None:
            status = {}
        status.setdefault("plugins", {})
        # Add data to the incoming dict.
        status["plugins"]["pocketpcr"] = {
            "run": self.run,
            "port": self.port,
            "baudrate": self.baudrate,
            "launch_type": self.launch_type,
            "responsive": self.ping_status,
            "connected": self.arduino.is_connected,
            "status": self.status
        }
        return status

    #### Handler methods section ####
    pcr_action_id = "PCR"
    """String identifying the actions that will be passed to this class bu their 'cmd' property."""

    # ACTION CONTROLLER method
    async def run_action(self, action: dict, i: int):
        """The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'.
        This must be an async function.
        """
        logging.info(f"PocketPCR: executing action with index {i}")

        # TODO: communicate with the PocketPCR to run a protocol.
        id = 873254
        self.put({"method": "example", "id": id})
        response = None
        while not response:
            await asyncio.sleep(1.0)
            response = self.get(id)

        logging.info(f"PocketPCR: done executing action with index {i} and response: {response}")
        print(i, action)

    # GCODE BUILDER method
    def make_gcode(self, action, action_index, *args, **kwargs):
        """Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above.
        The output of this function is not used elsewhere.
        """
        # The PocketPCR has no moving parts, thus the action requires no GCODE in particular.
        # The action object is left unmodified.
        return

    # START methods for serial communication ####
    def check_run(self):
        if self.run:
            raise PluginError("PocketController.start: another instance is already running or has not shutdown.")
    run = False
    """Flag indicating wether the device driver is already running."""
    async def start(self):
        # Here you can write your asynchronous program.
        logging.info("PocketPCR: start coroutine initialized.")
        # Check that it has not already been started.
        self.check_run()
        self.run = True
        # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
        try:
            # await self.arduino.start()
            await asyncio.gather(
                self.arduino.start(),
                self.ping()
            )
        except asyncio.CancelledError:
            logging.error("PocketController.start: task cancelled, stoppping the PocketPCR")
            try:
                async with asyncio.timeout(5):
                    await self.arduino.stop()
            except TimeoutError:
                print("PocketController: failed to stop the PocketPCR. Timed out while waiting.")
        except Exception as e:
            logging.error(f"PocketController.start: ended with an error: {e}\n" + traceback.format_exc())

        # Flag stopped state.
        self.run = False

        logging.info("PocketPCR: start coroutine ended.")

    ping_status = None
    ping_method = "ping"
    async def ping(self, timeout=2.0, wait_time=0.2):
        """Example coroutine that sends messages"""
        logging.info("PocketPCR: ping coroutine started.")

        # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
        try:
            i = 0
            while self.controller.run:
                i += 1
                await asyncio.sleep(1)

                # Send command:
                cmd = {"method": self.ping_method}
                cmd_id = self.controller.machine.hash_cmd(str(cmd))
                cmd["id"] = cmd_id
                self.put(cmd)
                logging.debug(f"PocketController.ping: sent ping #{i} with command={cmd}")

                # Expect response:
                response = None
                elapsed = 0
                while not response and elapsed < timeout:
                    elapsed += wait_time
                    await asyncio.sleep(wait_time)
                    response = self.get(cmd_id)

                # Parse result.
                if response:
                    logging.debug(f"PocketController.ping: done with ping #{i}, got response: '{response}'")
                    self.ping_status = True
                else:
                    logging.error(f"PocketController.ping: the device did not respond to ping #{i}.")
                    self.ping_status = False

        # Handle errors.
        except asyncio.CancelledError:
            logging.error("PocketController.ping: task cancelled.")
        except Exception as e:
            logging.error(f"PocketController.ping: ended with an error: {e}\n" + traceback.format_exc())

        logging.info("PocketPCR: ping coroutine ended.")


    # ALTERNATIVE START methods for serial communication ####
    @staticmethod
    def ardu_start(arduino: SerialCommunicationSync):
        # NOTE: I could not get this to run on its own thread by using "asyncio.to_thread" only.
        #       With threading it was possible.
        asyncio.run(arduino.start())

    async def run_as_process(self):
        logging.info("Launching arduino controller as a process.")

        # Check that it has not already been started.
        self.check_run()
        self.run = True

        try:
            # Create the process.
            # in_queue = queue.Queue()      # BaseTracker
            # out_queue = queue.Queue()     # BaseTracker
            # run = Event() # BaseExecutor
            proc = Process(
                target=self.ardu_start,
                args = (self.arduino,),
                # kwargs={
                #     "arduino": self.arduino,
                #     "run_event": self.controller.run
                #     },
                name = "arduino_pyserial",
                daemon=True
                )

            # Start the process.
            proc.start()
            # Wait for a bit.
            await asyncio.sleep(0.2)

            # Loop until the parent controller's tasks end or are cancelled.
            while self.controller.run:
                await asyncio.sleep(1)

                # Sharing data with an mp.process is hard...
                # NOTE: https://stackoverflow.com/a/58869967
                if not self.arduino.run.is_set():
                    logging.info("Arduino class not running.")
                    break
        except asyncio.CancelledError:
            logging.info("Cancellation exception received.")
        except Exception as e:
            logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc())

        try:
            await self.arduino.stop()
        except Exception as e:
            logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc())

        # Flag stopped state.
        self.run = False

    async def run_as_thread(self):
        """THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE

        To have it do something, override or replace the "blocking_task" method.

        The idea here is that blocking tasks can be sent to threads by asyncio.

        Threads are, however, not simple to stop from outside.
        """

        # Check that it has not already been started.
        self.check_run()
        self.run = True

        try:
            # If your code can block the main thread for any significant time, then send the programs to a new one instead.
            # First, create a coroutine for the blocking task.
            blocking_coroutine = asyncio.to_thread(self.blocking_task)

            # Then schedule the task.
            task = asyncio.create_task(blocking_coroutine)

            # Allow the scheduled task to start by awaiting something.
            # For example, you can run this task and other tasks concurrently with gather().
            await asyncio.gather(
                # Add other tasks/coroutines here to run them concurrently.
                # self.another_async_method(),

                # Put the threading task here.
                task
            )

            # Allow the scheduled task to start by awaiting something.
            while not task.done() and not task.cancelled():
                # Wait for the task here (hoping to prevent "never awaited" errors).
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"PocketController.run_as_thread: process failed with an error: {e}\n" + traceback.format_exc())

        # Flag stopped state.
        self.run = False

    def blocking_task(self):
        # NOTE: Take care when accessing data from a task running in a new thread.
        #       Consider using "thread-safe" operations and objects (e.g. queues, events, semaphores, etc.).
        time.sleep(2)

A barebones sub-commander class, meant to connect to and operate new hardware modules.

The commander will handle a particular 'action' from the pipetting mid-level protocol by calling this class, finding it by a unique 'cmd' property (i.e. 'self.pcr_action_id').

All actions with 'cmd' equal to the 'cmd' property will end up being processed here, both for GCODE generation and device control over serial.

Ancestors

Class variables

var baudrate
var launch_type
var pcr_action_id

String identifying the actions that will be passed to this class bu their 'cmd' property.

var ping_method
var ping_status
var port
var run

Flag indicating wether the device driver is already running.

var verbose

Static methods

def ardu_start(arduino: SerialCommunicationSync)
Expand source code
@staticmethod
def ardu_start(arduino: SerialCommunicationSync):
    # NOTE: I could not get this to run on its own thread by using "asyncio.to_thread" only.
    #       With threading it was possible.
    asyncio.run(arduino.start())

Methods

def blocking_task(self)
Expand source code
def blocking_task(self):
    # NOTE: Take care when accessing data from a task running in a new thread.
    #       Consider using "thread-safe" operations and objects (e.g. queues, events, semaphores, etc.).
    time.sleep(2)
def check_run(self)
Expand source code
def check_run(self):
    if self.run:
        raise PluginError("PocketController.start: another instance is already running or has not shutdown.")
def make_gcode(self, action, action_index, *args, **kwargs)
Expand source code
def make_gcode(self, action, action_index, *args, **kwargs):
    """Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above.
    The output of this function is not used elsewhere.
    """
    # The PocketPCR has no moving parts, thus the action requires no GCODE in particular.
    # The action object is left unmodified.
    return

Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above. The output of this function is not used elsewhere.

async def ping(self, timeout=2.0, wait_time=0.2)
Expand source code
async def ping(self, timeout=2.0, wait_time=0.2):
    """Example coroutine that sends messages"""
    logging.info("PocketPCR: ping coroutine started.")

    # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
    try:
        i = 0
        while self.controller.run:
            i += 1
            await asyncio.sleep(1)

            # Send command:
            cmd = {"method": self.ping_method}
            cmd_id = self.controller.machine.hash_cmd(str(cmd))
            cmd["id"] = cmd_id
            self.put(cmd)
            logging.debug(f"PocketController.ping: sent ping #{i} with command={cmd}")

            # Expect response:
            response = None
            elapsed = 0
            while not response and elapsed < timeout:
                elapsed += wait_time
                await asyncio.sleep(wait_time)
                response = self.get(cmd_id)

            # Parse result.
            if response:
                logging.debug(f"PocketController.ping: done with ping #{i}, got response: '{response}'")
                self.ping_status = True
            else:
                logging.error(f"PocketController.ping: the device did not respond to ping #{i}.")
                self.ping_status = False

    # Handle errors.
    except asyncio.CancelledError:
        logging.error("PocketController.ping: task cancelled.")
    except Exception as e:
        logging.error(f"PocketController.ping: ended with an error: {e}\n" + traceback.format_exc())

    logging.info("PocketPCR: ping coroutine ended.")

Example coroutine that sends messages

async def run_action(self, action: dict, i: int)
Expand source code
async def run_action(self, action: dict, i: int):
    """The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'.
    This must be an async function.
    """
    logging.info(f"PocketPCR: executing action with index {i}")

    # TODO: communicate with the PocketPCR to run a protocol.
    id = 873254
    self.put({"method": "example", "id": id})
    response = None
    while not response:
        await asyncio.sleep(1.0)
        response = self.get(id)

    logging.info(f"PocketPCR: done executing action with index {i} and response: {response}")
    print(i, action)

The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'. This must be an async function.

async def run_as_process(self)
Expand source code
async def run_as_process(self):
    logging.info("Launching arduino controller as a process.")

    # Check that it has not already been started.
    self.check_run()
    self.run = True

    try:
        # Create the process.
        # in_queue = queue.Queue()      # BaseTracker
        # out_queue = queue.Queue()     # BaseTracker
        # run = Event() # BaseExecutor
        proc = Process(
            target=self.ardu_start,
            args = (self.arduino,),
            # kwargs={
            #     "arduino": self.arduino,
            #     "run_event": self.controller.run
            #     },
            name = "arduino_pyserial",
            daemon=True
            )

        # Start the process.
        proc.start()
        # Wait for a bit.
        await asyncio.sleep(0.2)

        # Loop until the parent controller's tasks end or are cancelled.
        while self.controller.run:
            await asyncio.sleep(1)

            # Sharing data with an mp.process is hard...
            # NOTE: https://stackoverflow.com/a/58869967
            if not self.arduino.run.is_set():
                logging.info("Arduino class not running.")
                break
    except asyncio.CancelledError:
        logging.info("Cancellation exception received.")
    except Exception as e:
        logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc())

    try:
        await self.arduino.stop()
    except Exception as e:
        logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc())

    # Flag stopped state.
    self.run = False
async def run_as_thread(self)
Expand source code
async def run_as_thread(self):
    """THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE

    To have it do something, override or replace the "blocking_task" method.

    The idea here is that blocking tasks can be sent to threads by asyncio.

    Threads are, however, not simple to stop from outside.
    """

    # Check that it has not already been started.
    self.check_run()
    self.run = True

    try:
        # If your code can block the main thread for any significant time, then send the programs to a new one instead.
        # First, create a coroutine for the blocking task.
        blocking_coroutine = asyncio.to_thread(self.blocking_task)

        # Then schedule the task.
        task = asyncio.create_task(blocking_coroutine)

        # Allow the scheduled task to start by awaiting something.
        # For example, you can run this task and other tasks concurrently with gather().
        await asyncio.gather(
            # Add other tasks/coroutines here to run them concurrently.
            # self.another_async_method(),

            # Put the threading task here.
            task
        )

        # Allow the scheduled task to start by awaiting something.
        while not task.done() and not task.cancelled():
            # Wait for the task here (hoping to prevent "never awaited" errors).
            await asyncio.sleep(1)
    except Exception as e:
        logging.error(f"PocketController.run_as_thread: process failed with an error: {e}\n" + traceback.format_exc())

    # Flag stopped state.
    self.run = False

THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE

To have it do something, override or replace the "blocking_task" method.

The idea here is that blocking tasks can be sent to threads by asyncio.

Threads are, however, not simple to stop from outside.

async def start(self)
Expand source code
async def start(self):
    # Here you can write your asynchronous program.
    logging.info("PocketPCR: start coroutine initialized.")
    # Check that it has not already been started.
    self.check_run()
    self.run = True
    # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
    try:
        # await self.arduino.start()
        await asyncio.gather(
            self.arduino.start(),
            self.ping()
        )
    except asyncio.CancelledError:
        logging.error("PocketController.start: task cancelled, stoppping the PocketPCR")
        try:
            async with asyncio.timeout(5):
                await self.arduino.stop()
        except TimeoutError:
            print("PocketController: failed to stop the PocketPCR. Timed out while waiting.")
    except Exception as e:
        logging.error(f"PocketController.start: ended with an error: {e}\n" + traceback.format_exc())

    # Flag stopped state.
    self.run = False

    logging.info("PocketPCR: start coroutine ended.")
async def status_callback(self, status=None)
Expand source code
async def status_callback(self, status=None):
    """This function generates status data for the commander's "status" event."""
    # Ensure basic structure.
    if status is None:
        status = {}
    status.setdefault("plugins", {})
    # Add data to the incoming dict.
    status["plugins"]["pocketpcr"] = {
        "run": self.run,
        "port": self.port,
        "baudrate": self.baudrate,
        "launch_type": self.launch_type,
        "responsive": self.ping_status,
        "connected": self.arduino.is_connected,
        "status": self.status
    }
    return status

This function generates status data for the commander's "status" event.

Inherited members