Module pipettin-piper.piper.plugins.colorimeter_serial

Functions

def load_plugin(controller: Controller, **kwargs)
Expand source code
def load_plugin(controller: Controller, **kwargs):
    """Entry point used to load this plugin.

    Every plugin is expected to expose a function named 'load_plugin' that
    instantiates the plugin's class and returns the instance to the caller.

    Args:
        controller: Object forwarded as the first argument to ColorController.
        **kwargs: Extra keyword arguments forwarded unchanged to ColorController.

    Returns:
        The newly created ColorController instance.

    Raises:
        PluginError: If instantiating ColorController raises any Exception.
    """
    try:
        plugin = ColorController(controller, **kwargs)
    except Exception as e:
        # Log first, then re-raise as the exception type the loader expects.
        error_text = f"Colorimeter failed to load with error: {e}"
        logging.error(error_text)
        raise PluginError(error_text) from e
    else:
        return plugin

Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and return it to the main Commander class. If they fail to load, they must raise a PluginError exception.

Classes

class ColorController (controller: Controller, launch_type=None, *args, **kwargs)
Expand source code
class ColorController(Plugin):
    """A barebones sub-commander class, meant to connect to and operate new hardware modules.

    The commander will handle a particular 'action' from the pipetting mid-level protocol by
    calling this class, finding it by a unique 'cmd' property (i.e. 'self.color_action_id').

    All actions with 'cmd' equal to the 'cmd' property will end up being processed here,
    both for GCODE generation and device control over serial.

    Configuration values (port, baudrate, launch_type, verbose) are read from the
    controller's config under the PLUGIN_NAME key, falling back to the class attributes below.

    TODO:
        - Review and compare code with pocketpcr_serial. It seems that this code is outdated.
        - Consider writing a "DevicePlugin" class to avoid code duplication with pocketpcr_serial.
    """
    # NOTE: the action name must match the module's name (case insensitive).
    # TODO: see if its worth/simple using "__file__" for less hard-coding.
    port='/dev/ttyACM0'
    baudrate=115200
    launch_type="thread"
    # launch_type="to_thread"
    # launch_type="process"
    verbose=True
    def __init__(self, controller: Controller, launch_type=None, *args, **kwargs) -> None:
        """Configure the serial handler and register this plugin with the controller.

        Args:
            controller: The main controller. Its 'verbose', 'config', 'coroutine_methods',
                'add_action_runner' and 'builder.add_action_handler' members are used here.
            launch_type: Optional override for the class-level 'launch_type'. A value
                found in the plugin's configuration takes precedence over it.
            *args: Unused.
            **kwargs: Unused.
        """
        # Init args.
        # NOTE(review): a falsy controller skips these assignments, but the code below
        #               still needs 'self.config' and 'controller.coroutine_methods'.
        if controller:
            self.verbose = controller.verbose
            self.config = controller.config
        if launch_type:
            self.launch_type=launch_type
        # Save the controller for later.
        self.controller = controller

        # Get config values.
        plugin_config = self.config.get(PLUGIN_NAME, {})
        self.port = plugin_config.get("port", self.port)
        self.baudrate = plugin_config.get("baudrate", self.baudrate)
        self.launch_type = plugin_config.get("launch_type", self.launch_type)
        self.verbose = plugin_config.get("verbose", self.verbose)

        # Logs.
        logging.info(f"Configuring serial port '{self.port}' at {self.baudrate} bps.")
        logging.debug(f"Configuration for '{PLUGIN_NAME}' plugin:\n{pformat(plugin_config)}")

        # Instantiate the serial-comms handler.
        # Controller for a Colorimeter by reGOSH: <https://gitlab.com/pipettin-bot/equipment/esp8266-iorodeo-colormeter>
        self.arduino = SerialCommunicationSync(port=self.port,
                                               baudrate=self.baudrate,
                                               launch_type=self.launch_type,
                                               logger=logging,
                                               verbose=False)

        # Save the methods used to queue messages and check for responses.
        self.put = self.arduino.put
        self.get = self.arduino.get

        # Here you can add asyncio coroutines that the main "Controller" class can start and manage.
        # Add any coroutines that you want the Controller to gather.
        controller.coroutine_methods.extend([

            # NOTE: launch using the builtin method of the SerialCommunicationSync class.
            self.start()

            # NOTE: use this for slow and blocking coroutines.
            # self.run_as_thread()  # NOTE: placeholder method, not implemented.

            # If your program can run thread-safely in a separate process/thread,
            # you are very welcome to do that instead.
            # NOTE: This requires sharing event and queue objects to interact.
            # self.run_as_process()
        ])

        # Add your handlers for new protocol actions.
        # The controller handler is used by 'controller.run_actions_protocol'.
        self.controller.add_action_runner(name=self.color_action_id,
                                          function=self.run_action)
        # The builder handler is used by 'builder.parseAction' to generate GCODE.
        self.controller.builder.add_action_handler(name=self.color_action_id,
                                                   function=self.parse_action)

        # Set status.
        self._status = True

    @property
    def status(self):
        """Truthy only when the plugin is initialized, its 'start' loop is running,
        the last ping succeeded, and the serial handler reports a connection."""
        state = self._status and self.run and self.ping_status and self.arduino.is_connected
        return state

    async def wait_for_ready(self):
        """Poll 'status' every 0.2 seconds and return once it is truthy.

        There is no internal timeout; wrap the call in 'asyncio.wait_for' to bound it.
        """
        while not self.status:
            await asyncio.sleep(0.2)

    #### Handler methods section ####
    color_action_id = "COLOR"
    """String identifying the actions that will be passed to this class by their 'cmd' property."""

    def parse_action(self, action: dict, i: int):
        """The 'parse_action' method (with this exact name) will be called by the GcodeBuilder from 'parseAction'.
        A regular python function is expected (i.e. non-async).

        Currently this only logs the index of the action; no GCODE is generated.
        """

        logging.info(f"Parsing action with index {i}")

    # ACTION CONTROLLER method
    async def run_action(self, action: dict, i: int, wait: bool, check: bool, timeout: float):
        """The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'.
        An async function is expected.

        Sends a 'read' command to the device and waits up to 10 seconds for its response,
        which is then forwarded to the controller as an alert.

        Args:
            action: The protocol action (not inspected here).
            i: Index of the action, used for logging only.
            wait: Unused.
            check: Unused.
            timeout: Unused; the waiting times are hard-coded below.

        Raises:
            ProtocolError: If the device is not ready within 2 seconds, or if no
                response to the read command arrives within 10 seconds.
        """

        logging.info(f"Executing action with index {i}")

        # Wait for ready.
        if not self.status:
            logging.info("Device not yet ready, waiting for 2 seconds before processing the action.")
            try:
                await asyncio.wait_for(self.wait_for_ready(), timeout=2)
            except TimeoutError as e:
                raise ProtocolError("Colorimeter connection not ready for measurements.", cause=e) from e

        # Generate a unique ID for the command.
        cmd_id = self.make_cmd_id(self.color_action_id)

        # Prepare the command.
        rpc_command = {"method": "read", "id": cmd_id}

        # Submit the command.
        self.put(rpc_command)

        try:
            response = await asyncio.wait_for(self.get_response(cmd_id, read_interval=2), timeout=10)
            self.controller.send_alert(f"Got response: {response}", alert_type="message")
        except TimeoutError as e:
            msg = f"Timed-out while waiting for a response the the read command: {rpc_command}"
            logging.info(msg)
            raise ProtocolError(msg, cause=e) from e

        logging.info(f"Done executing {self.color_action_id} action with index {i}.")
        logging.debug(f"Response data: {response}")

    # Wait for a response.
    async def get_response(self, cmd_id, read_interval=0.3):
        """Wait for a response indefinitely by command ID.

        Calls 'self.get(cmd_id)' every 'read_interval' seconds until it returns a
        truthy value, and returns that value.
        """
        response = self.get(cmd_id)
        while not response:
            await asyncio.sleep(read_interval)
            response = self.get(cmd_id)
        return response

    # GCODE BUILDER method
    def make_gcode(self, *args, **kwargs):
        """Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above."""

    # START methods for serial communication ####
    run = False
    """Flag indicating whether the device driver is already running."""

    def check_run(self):
        """Raise an exception if the device controller is already running."""
        if self.run:
            raise PluginError("Another instance is already running or has not shutdown.")

    async def start(self):
        """Start the device controller.

        Runs the serial handler, the ping loop and the monitor loop concurrently
        until they all finish or the task is cancelled.

        Raises:
            PluginError: If the 'run' flag is already set (see 'check_run').
            CommanderError: If any of the gathered coroutines raises an Exception.
        """
        # Here you can write your asynchronous program.
        logging.info("Start coroutine initialized.")
        # Check that it has not already been started.
        self.check_run()
        self.run = True
        # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
        try:
            # await self.arduino.start()
            await asyncio.gather(
                self.arduino.start(),
                self.ping(),
                self.run_monitor()
            )
        except asyncio.CancelledError:
            logging.error("Task cancelled, stoppping the Colorimeter.")
            try:
                async with asyncio.timeout(5):
                    await self.arduino.stop()
            except TimeoutError:
                logging.error("Failed to stop the device. Timed out while waiting.")
        except Exception as e:
            logging.error(f"Ended with an error: {e}")
            raise CommanderError(f"Plugin stopped unexpectedly with cause: {e}", cause=e) from e
        finally:
            # Always clear the flag, also on the error path; otherwise 'check_run'
            # would reject every later attempt to start the plugin.
            self.run = False

        logging.info("Start coroutine ended.")


    async def run_monitor(self):
        """Log a warning once per second while something is wrong with the device.

        Waits for the controller's 'run' flag to become truthy, then checks the state
        of this plugin every second until the flag is cleared, the task is cancelled,
        or an error occurs. The serial handler is asked to stop before returning.
        """
        logging.info("Coroutine started.")

        try:
            while not self.controller.run:
                # The sleep must be awaited: otherwise this loop never yields to the
                # event loop and the rest of the program cannot make progress.
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")

            while self.controller.run:
                # Send warnings if something is wrong.
                if not self.run:
                    logging.warning("Device loop not running.")
                elif not self.arduino.is_connected:
                    logging.warning("Serial disconnected.")
                elif not self.ping_status:
                    logging.warning("Pings have failed.")
                # Wait for a bit before re-checking.
                await asyncio.sleep(1)

        # Handle errors.
        except asyncio.CancelledError:
            logging.error("Task cancelled.")
        except Exception as e:
            logging.error(f"Coroutine ended with an error: + {e}\n" + traceback.format_exc())

        # Try to stop the device.
        logging.info("Coroutine loop ended: stopping device.")
        try:
            await self.arduino.stop()
        except Exception as e:
            logging.error(f"Failed to stop the serial device. Error: + {e}\n" + traceback.format_exc())

        logging.warning("Coroutine ended.")

    ping_status = None
    ping_method = "example"
    async def ping(self, timeout=2.0, wait_time=2.0):
        """Periodically send a ping command to the device and record whether it answered.

        Waits for the controller's 'run' flag to become truthy, then loops while it
        stays truthy. The result of each ping is stored in 'ping_status', which is
        forced to False when the coroutine ends.

        Args:
            timeout: Seconds to wait for the response to each ping.
            wait_time: Seconds to sleep between pings.
        """
        logging.info("Ping coroutine started.")

        # Ping count.
        i = 0

        try:
            while not self.controller.run:
                # The sleep must be awaited: otherwise this loop never yields to the
                # event loop and the rest of the program cannot make progress.
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")

            # Ping indefinitely.
            while self.controller.run:
                # Prepare command.
                cmd = {"method": self.ping_method}
                cmd_id = self.ping_method + "-" + self.controller.machine.hash_cmd(str(cmd))
                cmd["id"] = cmd_id

                # Send command.
                self.put(cmd)
                logging.debug(f"Sent ping #{i} with command={cmd}")

                # Expect response.
                response = None
                try:
                    response = await asyncio.wait_for(self.get_response(cmd_id, read_interval=timeout/3), timeout=timeout)
                except asyncio.TimeoutError:
                    # A missing response is reported below, through the empty 'response'.
                    pass

                # Parse result.
                if response:
                    logging.debug(f"Done with ping #{i}, got response: '{response}'")
                    self.ping_status = True
                else:
                    logging.error(f"The device did not respond to ping #{i}.")
                    self.ping_status = False

                # Increment the ping count.
                i += 1

                # Wait for a bit before retrying.
                await asyncio.sleep(wait_time)

            logging.info("Coroutine loop ended: controller not running.")

        # Handle errors.
        except asyncio.CancelledError:
            logging.error("Task cancelled.")
        except Exception as e:
            logging.error(f"Ended with an error: {e}\n" + traceback.format_exc())

        # Update status flag.
        self.ping_status = False

        logging.info("Ping coroutine ended.")

    # ALTERNATIVE START methods for serial communication ####
    async def run_as_process(self):
        """Alternative to 'start': run the serial handler in a separate daemon process.

        The parent polls once per second and leaves the loop when the controller stops
        running or when 'self.arduino.run' is no longer set. The serial handler is then
        asked to stop and the 'run' flag is cleared.

        Raises:
            PluginError: If the 'run' flag is already set (see 'check_run').
        """
        logging.info("Launching arduino controller as a process.")

        # Check that it has not already been started.
        self.check_run()
        self.run = True

        def ardu_start(arduino):
            # NOTE: I could not get this to run on its own thread by using "asyncio.to_thread" only.
            #       With threading it was possible.
            asyncio.run(arduino.start())

        try:
            # Create the process.
            # in_queue = queue.Queue()      # BaseTracker
            # out_queue = queue.Queue()     # BaseTracker
            # run = Event() # BaseExecutor
            proc = Process(
                target=ardu_start,
                args = (self.arduino,),
                # kwargs={
                #     "arduino": self.arduino,
                #     "run_event": self.controller.run
                #     },
                name = "arduino_pyserial",
                daemon=True
            )

            # Start the process.
            proc.start()
            # Wait for a bit.
            await asyncio.sleep(0.2)

            # Loop until the parent controller's tasks end or are cancelled.
            while self.controller.run:
                await asyncio.sleep(1)

                # Sharing data with an mp.process is hard...
                # NOTE: https://stackoverflow.com/a/58869967
                if not self.arduino.run.is_set():
                    logging.info("Arduino class not running.")
                    break
        except asyncio.CancelledError:
            logging.info("Cancellation exception received.")
        except Exception as e:
            logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc())

        try:
            await self.arduino.stop()
        except Exception as e:
            logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc())

        # Flag stopped state.
        self.run = False

    async def run_as_thread(self):
        """THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE

        To have it do something, override or replace the "blocking_task" method.

        The idea here is that blocking tasks can be sent to threads by asyncio.

        Threads are, however, not simple to stop from outside.
        """

        # Check that it has not already been started.
        self.check_run()
        self.run = True

        try:
            # If your code can block the main thread for any significant time, then send the programs to a new one instead.
            # First, create a coroutine for the blocking task.
            blocking_coroutine = asyncio.to_thread(self.blocking_task)

            # Then schedule the task.
            task = asyncio.create_task(blocking_coroutine)

            # Allow the scheduled task to start by awaiting something.
            # For example, you can run this task and other tasks concurrently with gather().
            await asyncio.gather(
                # Add other tasks/coroutines here to run them concurrently.
                # self.another_async_method(),

                # Put the threading task here.
                task
            )

            # Allow the scheduled task to start by awaiting something.
            while not task.done() and not task.cancelled():
                # Wait for the task here (hoping to prevent "never awaited" errors).
                await asyncio.sleep(1)
        except Exception as e:
            logging.error(f"Process failed with an error: {e}\n" + traceback.format_exc())

        # Flag stopped state.
        self.run = False

    def blocking_task(self):
        """An example 'blocking' task (i.e. non async)."""
        # NOTE: Take care when accessing data from a task running in a new thread.
        #       Consider using "thread-safe" operations and objects (e.g. queues, events, semaphores, etc.).
        time.sleep(2)

A barebones sub-commander class, meant to connect to and operate new hardware modules.

The commander will handle a particular 'action' from the pipetting mid-level protocol by calling this class, finding it by a unique 'cmd' property (i.e. 'self.color_action_id').

All actions with 'cmd' equal to the 'cmd' property will end up being processed here, both for GCODE generation and device control over serial.

Todo

  • Review and compare code with pocketpcr_serial. It seems that this code is outdated.
  • Consider writing a "DevicePlugin" class to avoid code duplication with pocketpcr_serial.

Ancestors

Class variables

var baudrate
var color_action_id

String identifying the actions that will be passed to this class by their 'cmd' property.

var launch_type
var ping_method
var ping_status
var port
var run

Flag indicating whether the device driver is already running.

var verbose

Methods

def blocking_task(self)
Expand source code
def blocking_task(self):
    """Placeholder for synchronous (non-async) work; it simply sleeps for two seconds."""
    # NOTE: Be careful when touching shared data from a task running in another thread.
    #       Prefer "thread-safe" objects and operations (e.g. queues, events, semaphores, etc.).
    pause_seconds = 2
    time.sleep(pause_seconds)

An example 'blocking' task (i.e. non async).

def check_run(self)
Expand source code
def check_run(self):
    """Guard against double starts: raise PluginError when the 'run' flag is set."""
    if not self.run:
        # Nothing is running, so the caller may proceed.
        return
    raise PluginError("Another instance is already running or has not shutdown.")

Raise an exception if the device controller is already running.

async def get_response(self, cmd_id, read_interval=0.3)
Expand source code
async def get_response(self, cmd_id, read_interval=0.3):
    """Poll for the response to a command until one is available.

    'self.get(cmd_id)' is called repeatedly, sleeping 'read_interval' seconds
    between attempts, and the first truthy value it yields is returned.
    There is no timeout here; callers bound the wait themselves.
    """
    while True:
        reply = self.get(cmd_id)
        if reply:
            return reply
        await asyncio.sleep(read_interval)

Wait for a response indefinitely by command ID.

def make_gcode(self, *args, **kwargs)
Expand source code
def make_gcode(self, *args, **kwargs):
    """Hook for adding GCODE to an action, to be handled later by 'run_action'.

    It is intentionally empty: all arguments are ignored and None is returned.
    """
    return None

Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above.

def parse_action(self, action: dict, i: int)
Expand source code
def parse_action(self, action: dict, i: int):
    """Builder-side handler, called by the GcodeBuilder from 'parseAction' under this exact name.

    It must be a plain (non-async) function. For now it only logs the index of
    the action; the action itself is not inspected.
    """
    log_line = f"Parsing action with index {i}"
    logging.info(log_line)

The 'parse_action' method (with this exact name) will be called by the GcodeBuilder from 'parseAction'. A regular python function is expected (i.e. non-async).

async def ping(self, timeout=2.0, wait_time=2.0)
Expand source code
async def ping(self, timeout=2.0, wait_time=2.0):
    """Example coroutine that sends messages"""
    logging.info("Ping coroutine started.")

    # Ping count.
    i = 0

    try:
        while not self.controller.run:
            asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        # Ping indefinitely.
        while self.controller.run:
            # Prepare command.
            cmd = {"method": self.ping_method}
            cmd_id = self.ping_method + "-" + self.controller.machine.hash_cmd(str(cmd))
            cmd["id"] = cmd_id

            # Send command.
            self.put(cmd)
            logging.debug(f"Sent ping #{i} with command={cmd}")

            # Expect response.
            response = None
            try:
                response = await asyncio.wait_for(self.get_response(cmd_id, read_interval=timeout/3), timeout=timeout)
            except asyncio.TimeoutError:
                pass

            # Parse result.
            if response:
                logging.debug(f"Done with ping #{i}, got response: '{response}'")
                self.ping_status = True
            else:
                logging.error(f"The device did not respond to ping #{i}.")
                self.ping_status = False

            # Increment the ping count.
            i += 1

            # Wait for a bit before retrying.
            await asyncio.sleep(wait_time)

        logging.info("Coroutine loop ended: controller not running.")

    # Handle errors.
    except asyncio.CancelledError:
        logging.error("Task cancelled.")
    except Exception as e:
        logging.error(f"Ended with an error: {e}\n" + traceback.format_exc())

    # Update status flag.
    self.ping_status = False

    logging.info("Ping coroutine ended.")

Example coroutine that sends messages

async def run_action(self, action: dict, i: int, wait: bool, check: bool, timeout: float)
Expand source code
async def run_action(self, action: dict, i: int, wait: bool, check: bool, timeout: float):
    """Controller-side handler, called from 'run_actions_protocol' under this exact name.

    An async function is expected. It sends a 'read' command to the device, waits
    up to 10 seconds for the response, and forwards it to the controller as an alert.

    Args:
        action: The protocol action (not inspected here).
        i: Index of the action, used for logging only.
        wait: Unused.
        check: Unused.
        timeout: Unused; the waiting times are hard-coded below.

    Raises:
        ProtocolError: If the device is not ready within 2 seconds, or if no
            response to the read command arrives within 10 seconds.
    """
    logging.info(f"Executing action with index {i}")

    # Give the device a short grace period when it is not ready yet.
    device_ready = self.status
    if not device_ready:
        logging.info("Device not yet ready, waiting for 2 seconds before processing the action.")
        try:
            await asyncio.wait_for(self.wait_for_ready(), timeout=2)
        except TimeoutError as e:
            raise ProtocolError("Colorimeter connection not ready for measurements.", cause=e) from e

    # Build the command with a unique ID, then queue it.
    cmd_id = self.make_cmd_id(self.color_action_id)
    rpc_command = dict(method="read", id=cmd_id)
    self.put(rpc_command)

    try:
        pending_response = self.get_response(cmd_id, read_interval=2)
        response = await asyncio.wait_for(pending_response, timeout=10)
        self.controller.send_alert(f"Got response: {response}", alert_type="message")
    except TimeoutError as e:
        msg = f"Timed-out while waiting for a response the the read command: {rpc_command}"
        logging.info(msg)
        raise ProtocolError(msg, cause=e) from e

    logging.info(f"Done executing {self.color_action_id} action with index {i}.")
    logging.debug(f"Response data: {response}")

The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'. An async function is expected.

async def run_as_process(self)
Expand source code
async def run_as_process(self):
    """Alternative to 'start': run the serial handler in a separate daemon process.

    The parent polls once per second and leaves the loop when the controller stops
    running or when 'self.arduino.run' is no longer set. The serial handler is then
    asked to stop and the 'run' flag is cleared.

    Raises:
        PluginError: If the 'run' flag is already set (see 'check_run').
    """
    logging.info("Launching arduino controller as a process.")

    # Check that it has not already been started.
    self.check_run()
    self.run = True

    # Entry point of the child process: it runs the handler in its own event loop.
    def ardu_start(arduino):
        # NOTE: I could not get this to run on its own thread by using "asyncio.to_thread" only.
        #       With threading it was possible.
        asyncio.run(arduino.start())

    try:
        # Create the process.
        # NOTE(review): 'Process' is presumably multiprocessing.Process; the import is not
        #               visible here. A locally defined target may not be picklable under
        #               the "spawn" start method -- confirm on the target platform.
        # in_queue = queue.Queue()      # BaseTracker
        # out_queue = queue.Queue()     # BaseTracker
        # run = Event() # BaseExecutor
        proc = Process(
            target=ardu_start,
            args = (self.arduino,),
            # kwargs={
            #     "arduino": self.arduino,
            #     "run_event": self.controller.run
            #     },
            name = "arduino_pyserial",
            daemon=True
        )

        # Start the process.
        # NOTE: 'proc' is never joined or terminated in this method.
        proc.start()
        # Wait for a bit.
        await asyncio.sleep(0.2)

        # Loop until the parent controller's tasks end or are cancelled.
        while self.controller.run:
            await asyncio.sleep(1)

            # Sharing data with an mp.process is hard...
            # NOTE: https://stackoverflow.com/a/58869967
            # NOTE(review): this reads the parent's copy of the handler; whether it
            #               reflects the state of the child process should be verified.
            if not self.arduino.run.is_set():
                logging.info("Arduino class not running.")
                break
    except asyncio.CancelledError:
        # Cancellation is logged and swallowed, so that the cleanup below still runs.
        logging.info("Cancellation exception received.")
    except Exception as e:
        logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc())

    # Best-effort shutdown of the serial handler; failures are only logged.
    try:
        await self.arduino.stop()
    except Exception as e:
        logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc())

    # Flag stopped state.
    self.run = False
async def run_as_thread(self)
Expand source code
async def run_as_thread(self):
    """IMPLEMENTATION EXAMPLE ONLY: as shipped, this method does nothing useful.

    Override or replace the "blocking_task" method to give it real work.

    It shows how asyncio can hand a blocking (non-async) task over to a thread.

    Keep in mind that threads are not simple to stop from outside.
    """

    # Refuse to start twice, then flag the running state.
    self.check_run()
    self.run = True

    try:
        # Code that may block the main thread for a significant time goes to a
        # new thread: wrap the blocking task in a coroutine and schedule it.
        task = asyncio.create_task(asyncio.to_thread(self.blocking_task))

        # Awaiting something lets the scheduled task start. With gather(), other
        # tasks/coroutines can be listed next to it to run concurrently
        # (e.g. self.another_async_method()).
        await asyncio.gather(task)

        # Keep waiting for the task here (hoping to prevent "never awaited" errors).
        while not (task.done() or task.cancelled()):
            await asyncio.sleep(1)
    except Exception as e:
        logging.error(f"Process failed with an error: {e}\n" + traceback.format_exc())

    # Flag stopped state.
    self.run = False

THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE

To have it do something, override or replace the "blocking_task" method.

The idea here is that blocking tasks can be sent to threads by asyncio.

Threads are, however, not simple to stop from outside.

async def run_monitor(self)
Expand source code
async def run_monitor(self):
    logging.info("Coroutine started.")

    try:
        while not self.controller.run:
            asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        while self.controller.run:
            # Send warnings if something is wrong.
            if not self.run:
                logging.warning("Device loop not running.")
            elif not self.arduino.is_connected:
                logging.warning("Serial disconnected.")
            elif not self.ping_status:
                logging.warning("Pings have failed.")
            # Wait for a bit before re-checking.
            await asyncio.sleep(1)

    # Handle errors.
    except asyncio.CancelledError:
        logging.error("Task cancelled.")
    except Exception as e:
        logging.error(f"Coroutine ended with an error: + {e}\n" + traceback.format_exc())

    # Try to stop the device.
    logging.info("Coroutine loop ended: stopping device.")
    try:
        await self.arduino.stop()
    except Exception as e:
        logging.error(f"Failed to stop the serial device. Error: + {e}\n" + traceback.format_exc())

    logging.warning("Coroutine ended.")
async def start(self)
Expand source code
async def start(self):
    """Start the device controller.

    Runs the serial handler, the ping loop and the monitor loop concurrently
    until they all finish or the task is cancelled.

    Raises:
        PluginError: If the 'run' flag is already set (see 'check_run').
        CommanderError: If any of the gathered coroutines raises an Exception.
    """
    # Here you can write your asynchronous program.
    logging.info("Start coroutine initialized.")
    # Check that it has not already been started.
    self.check_run()
    self.run = True
    # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something.
    try:
        # await self.arduino.start()
        await asyncio.gather(
            self.arduino.start(),
            self.ping(),
            self.run_monitor()
        )
    except asyncio.CancelledError:
        logging.error("Task cancelled, stoppping the Colorimeter.")
        try:
            async with asyncio.timeout(5):
                await self.arduino.stop()
        except TimeoutError:
            logging.error("Failed to stop the device. Timed out while waiting.")
    except Exception as e:
        logging.error(f"Ended with an error: {e}")
        raise CommanderError(f"Plugin stopped unexpectedly with cause: {e}", cause=e) from e
    finally:
        # Always clear the flag, also on the error path; otherwise 'check_run'
        # would reject every later attempt to start the plugin.
        self.run = False

    logging.info("Start coroutine ended.")

Start the device controller.

async def wait_for_ready(self)
Expand source code
async def wait_for_ready(self):
    """Return once 'self.status' is truthy, re-checking it every 0.2 seconds.

    There is no internal timeout; wrap the call in 'asyncio.wait_for' to bound it.
    """
    while True:
        if self.status:
            return
        await asyncio.sleep(0.2)

Inherited members