Module pipettin-piper.piper.plugins.colorimeter_serial
Functions
def load_plugin(controller: Controller, **kwargs)-
Expand source code
def load_plugin(controller: Controller, **kwargs): """ Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception. """ try: class_instance = ColorController(controller, **kwargs) except Exception as e: msg = f"Colorimeter failed to load with error: {e}" logging.error(msg) raise PluginError(msg) from e return class_instancePlugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception.
Classes
class ColorController (controller: Controller, launch_type=None, *args, **kwargs)-
Expand source code
class ColorController(Plugin): """A barebones sub-commander class, meant to connect to and operate new hardware modules. The commander will handle a particular 'action' from the pipetting mid-level protocol by calling this class, finding it by a unique 'cmd' property (i.e. 'self.color_action_id'). All actions with 'cmd' equal to the 'cmd' property will end up being processed here, both for GCODE generation and device control over serial. TODO: - Review and compare code with pocketpcr_serial. It seems that this code is outdated. - Consider writing a "DevicePlugin" class to avoid code duplication with pocketpcr_serial. """ # NOTE: the action name must match the module's name (case insensitive). # TODO: see if its worth/simple using "__file__" for less hard-coding. port='/dev/ttyACM0' baudrate=115200 launch_type="thread" # launch_type="to_thread" # launch_type="process" verbose=True def __init__(self, controller: Controller, launch_type=None, *args, **kwargs) -> None: # Init args. if controller: self.verbose = controller.verbose self.config = controller.config if launch_type: self.launch_type=launch_type # Save the controller for later. self.controller = controller # Get config values. plugin_config = self.config.get(PLUGIN_NAME, {}) self.port = plugin_config.get("port", self.port) self.baudrate = plugin_config.get("baudrate", self.baudrate) self.launch_type = plugin_config.get("launch_type", self.launch_type) self.verbose = plugin_config.get("verbose", self.verbose) # Logs. logging.info(f"Configuring serial port '{self.port}' at {self.baudrate} bps.") logging.debug(f"Configuration for '{PLUGIN_NAME}' plugin:\n{pformat(plugin_config)}") # Instantiate the serial-comms handler. # Controller for a Colorimeter by reGOSH: <https://gitlab.com/pipettin-bot/equipment/esp8266-iorodeo-colormeter> self.arduino = SerialCommunicationSync(port=self.port, baudrate=self.baudrate, launch_type=self.launch_type, logger=logging, verbose=False) # Save the methods used to queue messages and check for responses. self.put = self.arduino.put self.get = self.arduino.get # Here you can add asyncio coroutines that the main "Controller" class can start and manage. # Add any coroutines that you want the Controller to gather. controller.coroutine_methods.extend([ # NOTE: launch using the builtin method of the SerialCommunicationSync class. self.start() # NOTE: use this for slow and blocking coroutines. # self.run_as_thread() # NOTE: placeholder method, not implemented. # If your program can run thread-safely in a separate process/thread, # you are very welcome to do that instead. # NOTE: This requires sharing event and queue objects to interact. # self.run_as_process() ]) # Add your handlers for new protocol actions. # The controller handler is used by 'controller.run_actions_protocol'. self.controller.add_action_runner(name=self.color_action_id, function=self.run_action) # The builder handler is used by 'builder.parseAction' to generate GCODE. self.controller.builder.add_action_handler(name=self.color_action_id, function=self.parse_action) # Set status. self._status = True @property def status(self): state = self._status and self.run and self.ping_status and self.arduino.is_connected return state async def wait_for_ready(self): while not self.status: await asyncio.sleep(0.2) #### Handler methods section #### color_action_id = "COLOR" """String identifying the actions that will be passed to this class bu their 'cmd' property.""" def parse_action(self, action: dict, i: int): """The 'parse_action' method (with this exact name) will be called by the GcodeBuilder from 'parseAction'. A regular python function is expected (i.e. non-async).""" logging.info(f"Parsing action with index {i}") # ACTION CONTROLLER method async def run_action(self, action: dict, i: int, wait: bool, check: bool, timeout: float): """The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'. An async function is expected.""" logging.info(f"Executing action with index {i}") # Wait for ready. if not self.status: logging.info("Device not yet ready, waiting for 2 seconds before processing the action.") try: await asyncio.wait_for(self.wait_for_ready(), timeout=2) except TimeoutError as e: raise ProtocolError("Colorimeter connection not ready for measurements.", cause=e) from e # Generate a unique ID for the command. cmd_id = self.make_cmd_id(self.color_action_id) # Prepare the command. rpc_command = {"method": "read", "id": cmd_id} # Submit the command. self.put(rpc_command) try: response = await asyncio.wait_for(self.get_response(cmd_id, read_interval=2), timeout=10) self.controller.send_alert(f"Got response: {response}", alert_type="message") except TimeoutError as e: msg = f"Timed-out while waiting for a response the the read command: {rpc_command}" logging.info(msg) raise ProtocolError(msg, cause=e) from e logging.info(f"Done executing {self.color_action_id} action with index {i}.") logging.debug(f"Response data: {response}") # Wait for a response. async def get_response(self, cmd_id, read_interval=0.3): """Wait for a response indefinitely by command ID.""" response = self.get(cmd_id) while not response: await asyncio.sleep(read_interval) response = self.get(cmd_id) return response # GCODE BUILDER method def make_gcode(self, *args, **kwargs): """Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above.""" # START methods for serial communication #### run = False """Flag indicating wether the device driver is already running.""" def check_run(self): """Raise an exception if the device controller is already running.""" if self.run: raise PluginError("Another instance is already running or has not shutdown.") async def start(self): """Start the device controller.""" # Here you can write your asynchronous program. logging.info("Start coroutine initialized.") # Check that it has not already been started. self.check_run() self.run = True # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something. try: # await self.arduino.start() await asyncio.gather( self.arduino.start(), self.ping(), self.run_monitor() ) except asyncio.CancelledError: logging.error("Task cancelled, stoppping the Colorimeter.") try: async with asyncio.timeout(5): await self.arduino.stop() except TimeoutError: logging.error("Failed to stop the device. Timed out while waiting.") except Exception as e: logging.error(f"Ended with an error: {e}") raise CommanderError(f"Plugin stopped unexpectedly with cause: {e}", cause=e) from e self.run = False logging.info("Start coroutine ended.") async def run_monitor(self): logging.info("Coroutine started.") try: while not self.controller.run: asyncio.sleep(self.controller.wait_default) logging.info("Coroutine ready.") while self.controller.run: # Send warnings if something is wrong. if not self.run: logging.warning("Device loop not running.") elif not self.arduino.is_connected: logging.warning("Serial disconnected.") elif not self.ping_status: logging.warning("Pings have failed.") # Wait for a bit before re-checking. await asyncio.sleep(1) # Handle errors. except asyncio.CancelledError: logging.error("Task cancelled.") except Exception as e: logging.error(f"Coroutine ended with an error: + {e}\n" + traceback.format_exc()) # Try to stop the device. logging.info("Coroutine loop ended: stopping device.") try: await self.arduino.stop() except Exception as e: logging.error(f"Failed to stop the serial device. Error: + {e}\n" + traceback.format_exc()) logging.warning("Coroutine ended.") ping_status = None ping_method = "example" async def ping(self, timeout=2.0, wait_time=2.0): """Example coroutine that sends messages""" logging.info("Ping coroutine started.") # Ping count. i = 0 try: while not self.controller.run: asyncio.sleep(self.controller.wait_default) logging.info("Coroutine ready.") # Ping indefinitely. while self.controller.run: # Prepare command. cmd = {"method": self.ping_method} cmd_id = self.ping_method + "-" + self.controller.machine.hash_cmd(str(cmd)) cmd["id"] = cmd_id # Send command. self.put(cmd) logging.debug(f"Sent ping #{i} with command={cmd}") # Expect response. response = None try: response = await asyncio.wait_for(self.get_response(cmd_id, read_interval=timeout/3), timeout=timeout) except asyncio.TimeoutError: pass # Parse result. if response: logging.debug(f"Done with ping #{i}, got response: '{response}'") self.ping_status = True else: logging.error(f"The device did not respond to ping #{i}.") self.ping_status = False # Increment the ping count. i += 1 # Wait for a bit before retrying. await asyncio.sleep(wait_time) logging.info("Coroutine loop ended: controller not running.") # Handle errors. except asyncio.CancelledError: logging.error("Task cancelled.") except Exception as e: logging.error(f"Ended with an error: {e}\n" + traceback.format_exc()) # Update status flag. self.ping_status = False logging.info("Ping coroutine ended.") # ALTERNATIVE START methods for serial communication #### async def run_as_process(self): logging.info("Launching arduino controller as a process.") # Check that it has not already been started. self.check_run() self.run = True def ardu_start(arduino): # NOTE: I could not get this to run on its own thread by using "asyncio.to_thread" only. # With threading it was possible. asyncio.run(arduino.start()) try: # Create the process. # in_queue = queue.Queue() # BaseTracker # out_queue = queue.Queue() # BaseTracker # run = Event() # BaseExecutor proc = Process( target=ardu_start, args = (self.arduino,), # kwargs={ # "arduino": self.arduino, # "run_event": self.controller.run # }, name = "arduino_pyserial", daemon=True ) # Start the process. proc.start() # Wait for a bit. await asyncio.sleep(0.2) # Loop until the parent controller's tasks end or are cancelled. while self.controller.run: await asyncio.sleep(1) # Sharing data with an mp.process is hard... # NOTE: https://stackoverflow.com/a/58869967 if not self.arduino.run.is_set(): logging.info("Arduino class not running.") break except asyncio.CancelledError: logging.info("Cancellation exception received.") except Exception as e: logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc()) try: await self.arduino.stop() except Exception as e: logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc()) # Flag stopped state. self.run = False async def run_as_thread(self): """THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE To have it do something, override or replace the "blocking_task" method. The idea here is that blocking tasks can be sent to threads by asyncio. Threads are, however, not simple to stop from outside. """ # Check that it has not already been started. self.check_run() self.run = True try: # If your code can block the main thread for any significant time, then send the programs to a new one instead. # First, create a coroutine for the blocking task. blocking_coroutine = asyncio.to_thread(self.blocking_task) # Then schedule the task. task = asyncio.create_task(blocking_coroutine) # Allow the scheduled task to start by awaiting something. # For example, you can run this task and other tasks concurrently with gather(). await asyncio.gather( # Add other tasks/coroutines here to run them concurrently. # self.another_async_method(), # Put the threading task here. task ) # Allow the scheduled task to start by awaiting something. while not task.done() and not task.cancelled(): # Wait for the task here (hoping to prevent "never awaited" errors). await asyncio.sleep(1) except Exception as e: logging.error(f"Process failed with an error: {e}\n" + traceback.format_exc()) # Flag stopped state. self.run = False def blocking_task(self): """An example 'blocking' task (i.e. non async).""" # NOTE: Take care when accessing data from a task running in a new thread. # Consider using "thread-safe" operations and objects (e.g. queues, events, semaphores, etc.). time.sleep(2)A barebones sub-commander class, meant to connect to and operate new hardware modules.
The commander will handle a particular 'action' from the pipetting mid-level protocol by calling this class, finding it by a unique 'cmd' property (i.e. 'self.color_action_id').
All actions with 'cmd' equal to the 'cmd' property will end up being processed here, both for GCODE generation and device control over serial.
Todo
- Review and compare code with pocketpcr_serial. It seems that this code is outdated.
- Consider writing a "DevicePlugin" class to avoid code duplication with pocketpcr_serial.
Ancestors
Class variables
var baudratevar color_action_id-
String identifying the actions that will be passed to this class bu their 'cmd' property.
var launch_typevar ping_methodvar ping_statusvar portvar run-
Flag indicating wether the device driver is already running.
var verbose
Methods
def blocking_task(self)-
Expand source code
def blocking_task(self): """An example 'blocking' task (i.e. non async).""" # NOTE: Take care when accessing data from a task running in a new thread. # Consider using "thread-safe" operations and objects (e.g. queues, events, semaphores, etc.). time.sleep(2)An example 'blocking' task (i.e. non async).
def check_run(self)-
Expand source code
def check_run(self): """Raise an exception if the device controller is already running.""" if self.run: raise PluginError("Another instance is already running or has not shutdown.")Raise an exception if the device controller is already running.
async def get_response(self, cmd_id, read_interval=0.3)-
Expand source code
async def get_response(self, cmd_id, read_interval=0.3): """Wait for a response indefinitely by command ID.""" response = self.get(cmd_id) while not response: await asyncio.sleep(read_interval) response = self.get(cmd_id) return responseWait for a response indefinitely by command ID.
def make_gcode(self, *args, **kwargs)-
Expand source code
def make_gcode(self, *args, **kwargs): """Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above."""Any relevant GCODE can be added to the action here, and then handled in the 'run_action' method above.
def parse_action(self, action: dict, i: int)-
Expand source code
def parse_action(self, action: dict, i: int): """The 'parse_action' method (with this exact name) will be called by the GcodeBuilder from 'parseAction'. A regular python function is expected (i.e. non-async).""" logging.info(f"Parsing action with index {i}")The 'parse_action' method (with this exact name) will be called by the GcodeBuilder from 'parseAction'. A regular python function is expected (i.e. non-async).
async def ping(self, timeout=2.0, wait_time=2.0)-
Expand source code
async def ping(self, timeout=2.0, wait_time=2.0): """Example coroutine that sends messages""" logging.info("Ping coroutine started.") # Ping count. i = 0 try: while not self.controller.run: asyncio.sleep(self.controller.wait_default) logging.info("Coroutine ready.") # Ping indefinitely. while self.controller.run: # Prepare command. cmd = {"method": self.ping_method} cmd_id = self.ping_method + "-" + self.controller.machine.hash_cmd(str(cmd)) cmd["id"] = cmd_id # Send command. self.put(cmd) logging.debug(f"Sent ping #{i} with command={cmd}") # Expect response. response = None try: response = await asyncio.wait_for(self.get_response(cmd_id, read_interval=timeout/3), timeout=timeout) except asyncio.TimeoutError: pass # Parse result. if response: logging.debug(f"Done with ping #{i}, got response: '{response}'") self.ping_status = True else: logging.error(f"The device did not respond to ping #{i}.") self.ping_status = False # Increment the ping count. i += 1 # Wait for a bit before retrying. await asyncio.sleep(wait_time) logging.info("Coroutine loop ended: controller not running.") # Handle errors. except asyncio.CancelledError: logging.error("Task cancelled.") except Exception as e: logging.error(f"Ended with an error: {e}\n" + traceback.format_exc()) # Update status flag. self.ping_status = False logging.info("Ping coroutine ended.")Example coroutine that sends messages
async def run_action(self, action: dict, i: int, wait: bool, check: bool, timeout: float)-
Expand source code
async def run_action(self, action: dict, i: int, wait: bool, check: bool, timeout: float): """The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'. An async function is expected.""" logging.info(f"Executing action with index {i}") # Wait for ready. if not self.status: logging.info("Device not yet ready, waiting for 2 seconds before processing the action.") try: await asyncio.wait_for(self.wait_for_ready(), timeout=2) except TimeoutError as e: raise ProtocolError("Colorimeter connection not ready for measurements.", cause=e) from e # Generate a unique ID for the command. cmd_id = self.make_cmd_id(self.color_action_id) # Prepare the command. rpc_command = {"method": "read", "id": cmd_id} # Submit the command. self.put(rpc_command) try: response = await asyncio.wait_for(self.get_response(cmd_id, read_interval=2), timeout=10) self.controller.send_alert(f"Got response: {response}", alert_type="message") except TimeoutError as e: msg = f"Timed-out while waiting for a response the the read command: {rpc_command}" logging.info(msg) raise ProtocolError(msg, cause=e) from e logging.info(f"Done executing {self.color_action_id} action with index {i}.") logging.debug(f"Response data: {response}")The 'run_action' method (with this exact name) will be called by the Controller from 'run_actions_protocol'. An async function is expected.
async def run_as_process(self)-
Expand source code
async def run_as_process(self): logging.info("Launching arduino controller as a process.") # Check that it has not already been started. self.check_run() self.run = True def ardu_start(arduino): # NOTE: I could not get this to run on its own thread by using "asyncio.to_thread" only. # With threading it was possible. asyncio.run(arduino.start()) try: # Create the process. # in_queue = queue.Queue() # BaseTracker # out_queue = queue.Queue() # BaseTracker # run = Event() # BaseExecutor proc = Process( target=ardu_start, args = (self.arduino,), # kwargs={ # "arduino": self.arduino, # "run_event": self.controller.run # }, name = "arduino_pyserial", daemon=True ) # Start the process. proc.start() # Wait for a bit. await asyncio.sleep(0.2) # Loop until the parent controller's tasks end or are cancelled. while self.controller.run: await asyncio.sleep(1) # Sharing data with an mp.process is hard... # NOTE: https://stackoverflow.com/a/58869967 if not self.arduino.run.is_set(): logging.info("Arduino class not running.") break except asyncio.CancelledError: logging.info("Cancellation exception received.") except Exception as e: logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc()) try: await self.arduino.stop() except Exception as e: logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc()) # Flag stopped state. self.run = False async def run_as_thread(self)-
Expand source code
async def run_as_thread(self): """THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE To have it do something, override or replace the "blocking_task" method. The idea here is that blocking tasks can be sent to threads by asyncio. Threads are, however, not simple to stop from outside. """ # Check that it has not already been started. self.check_run() self.run = True try: # If your code can block the main thread for any significant time, then send the programs to a new one instead. # First, create a coroutine for the blocking task. blocking_coroutine = asyncio.to_thread(self.blocking_task) # Then schedule the task. task = asyncio.create_task(blocking_coroutine) # Allow the scheduled task to start by awaiting something. # For example, you can run this task and other tasks concurrently with gather(). await asyncio.gather( # Add other tasks/coroutines here to run them concurrently. # self.another_async_method(), # Put the threading task here. task ) # Allow the scheduled task to start by awaiting something. while not task.done() and not task.cancelled(): # Wait for the task here (hoping to prevent "never awaited" errors). await asyncio.sleep(1) except Exception as e: logging.error(f"Process failed with an error: {e}\n" + traceback.format_exc()) # Flag stopped state. self.run = FalseTHIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE
To have it do something, override or replace the "blocking_task" method.
The idea here is that blocking tasks can be sent to threads by asyncio.
Threads are, however, not simple to stop from outside.
async def run_monitor(self)-
Expand source code
async def run_monitor(self): logging.info("Coroutine started.") try: while not self.controller.run: asyncio.sleep(self.controller.wait_default) logging.info("Coroutine ready.") while self.controller.run: # Send warnings if something is wrong. if not self.run: logging.warning("Device loop not running.") elif not self.arduino.is_connected: logging.warning("Serial disconnected.") elif not self.ping_status: logging.warning("Pings have failed.") # Wait for a bit before re-checking. await asyncio.sleep(1) # Handle errors. except asyncio.CancelledError: logging.error("Task cancelled.") except Exception as e: logging.error(f"Coroutine ended with an error: + {e}\n" + traceback.format_exc()) # Try to stop the device. logging.info("Coroutine loop ended: stopping device.") try: await self.arduino.stop() except Exception as e: logging.error(f"Failed to stop the serial device. Error: + {e}\n" + traceback.format_exc()) logging.warning("Coroutine ended.") async def start(self)-
Expand source code
async def start(self): """Start the device controller.""" # Here you can write your asynchronous program. logging.info("Start coroutine initialized.") # Check that it has not already been started. self.check_run() self.run = True # Note that nothing can take too long to run; the entirety of the program is waiting for this code to "await" something. try: # await self.arduino.start() await asyncio.gather( self.arduino.start(), self.ping(), self.run_monitor() ) except asyncio.CancelledError: logging.error("Task cancelled, stoppping the Colorimeter.") try: async with asyncio.timeout(5): await self.arduino.stop() except TimeoutError: logging.error("Failed to stop the device. Timed out while waiting.") except Exception as e: logging.error(f"Ended with an error: {e}") raise CommanderError(f"Plugin stopped unexpectedly with cause: {e}", cause=e) from e self.run = False logging.info("Start coroutine ended.")Start the device controller.
async def wait_for_ready(self)-
Expand source code
async def wait_for_ready(self): while not self.status: await asyncio.sleep(0.2)
Inherited members