Module pipettin-piper.piper.plugins.pocketpcr_serial
Functions
def load_plugin(controller: "'Controller'", **kwargs)-
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Instantiate the PocketPCR plugin and return it to the main Commander class.

    Plugins are expected to have a function named 'load_plugin' which
    instantiates the plugin's class and returns it.

    Args:
        controller: Object handed over to the PocketController constructor.
        **kwargs: Forwarded unchanged to the PocketController constructor.

    Returns:
        The new PocketController instance.

    Raises:
        PluginError: If instantiation fails for any reason. The original
            exception is chained as the cause.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")

    try:
        plugin = PocketController(controller, **kwargs)
        logging.info("pocketpcr_serial: plugin instance loaded successfully.")
    except Exception as e:
        # Log first, then re-raise wrapped in the plugin-specific error type.
        msg = f"Failed to load with error: {e}"
        logging.error(msg)
        raise PluginError(msg) from e

    return plugin
Classes
class PocketController (controller: Controller, launch_type=None, *args, **kwargs)-
Expand source code
class PocketController(Plugin):
    """A barebones sub-commander class, meant to connect to and operate new hardware modules.

    The commander will handle a particular 'action' from the pipetting mid-level protocol by
    calling this class, finding it by a unique 'cmd' property (i.e. 'self.pcr_action_id').

    All actions with 'cmd' equal to the 'cmd' property will end up being processed here,
    both for GCODE generation and device control over serial.
    """
    # NOTE: the action name must match the module's name (case insensitive).
    # TODO: see if its worth/simple using "__file__" for less hard-coding.

    port = '/dev/ttyACM0'
    baudrate = 115200
    # NOTE: "to_thread" and "process" were also tried here as launch types.
    launch_type = "thread"
    verbose = True

    def __init__(self, controller: Controller, launch_type=None, *args, **kwargs) -> None:
        """Create the serial handler and register this plugin with the controller.

        Args:
            controller: Main controller object. Its 'verbose' attribute is copied,
                and the plugin's coroutine, action runner, GCODE handler and
                status callback are registered on it.
            launch_type: Optional override for the class-level 'launch_type'.
                Falsy values keep the class default.
            *args: Accepted but not used.
            **kwargs: Accepted but not used.
        """
        # Save the controller for later, just in case.
        self.controller = controller

        # Init args.
        self.verbose = self.controller.verbose
        if launch_type:
            self.launch_type = launch_type

        # Controller for the arduino zero / pocket pcr fork.
        # NOTE: there is an older class called SerialCommunication too.
        self.arduino = SerialCommunicationSync(port=self.port,
                                               baudrate=self.baudrate,
                                               launch_type=self.launch_type,
                                               logger=logging,
                                               loglevel=logging.DEBUG,
                                               verbose=self.verbose)

        # Save the methods used to queue messages and check for responses.
        self.put = self.arduino.put
        self.get = self.arduino.get

        # Add any coroutines that you want the Controller to gather.
        self.controller.coroutine_methods.extend([
            # NOTE: launch using the builtin method of the SerialCommunicationSync class.
            self.start()
            # NOTE: 'self.run_as_thread()' can be used for slow and blocking code
            #       (placeholder method, not implemented).
            # NOTE: 'self.run_as_process()' is an alternative if the program can run
            #       thread-safely in a separate process. It requires sharing event
            #       and queue objects to interact.
        ])

        # Add your handlers for new protocol actions.
        # The controller handler is used by 'controller.run_actions_protocol'.
        self.controller.add_action_runner(name=self.pcr_action_id,
                                          function=self.run_action)
        # The builder handler is used by 'builder.parseAction' to generate GCODE.
        self.controller.builder.add_action_handler(name=self.pcr_action_id,
                                                   function=self.make_gcode)

        # Register event handler for action status updates.
        self.controller.register_event_callback(
            event_name=self.controller.status_event,
            callback_name="PocketPCR.status",
            callback_function=self.status_callback)

        # Set status.
        self._status = True

    @property
    def status(self):
        """True only when the plugin is initialized, running, responsive and connected."""
        # Coerce to bool: a bare 'and' chain would return the last operand
        # (e.g. None before the first ping) instead of a clean flag.
        return bool(self._status
                    and self.run
                    and self.ping_status
                    and self.arduino.is_connected)

    async def status_callback(self, status=None):
        """This function generates status data for the commander's "status" event.

        Args:
            status: Optional dict to extend in place. A new dict is created when None.

        Returns:
            The status dict, with this plugin's data under status["plugins"]["pocketpcr"].
        """
        # Ensure basic structure.
        if status is None:
            status = {}
        status.setdefault("plugins", {})

        # Add data to the incoming dict.
        status["plugins"]["pocketpcr"] = {
            "run": self.run,
            "port": self.port,
            "baudrate": self.baudrate,
            "launch_type": self.launch_type,
            "responsive": self.ping_status,
            "connected": self.arduino.is_connected,
            "status": self.status
        }

        return status

    #### Handler methods section ####

    pcr_action_id = "PCR"
    """String identifying the actions that will be passed to this class by their 'cmd' property."""

    # ACTION CONTROLLER method
    async def run_action(self, action: dict, i: int, poll_interval=1.0, timeout=None):
        """The 'run_action' method (with this exact name) will be called by the Controller
        from 'run_actions_protocol'. This must be an async function.

        Args:
            action: The protocol action being executed (only logged for now).
            i: Index of the action in the protocol.
            poll_interval: Seconds to sleep between checks for a response.
            timeout: Optional limit, in seconds, for the whole wait. The default
                (None) waits indefinitely.

        Raises:
            TimeoutError: If 'timeout' is set and no response arrived in time.
        """
        logging.info(f"PocketPCR: executing action with index {i}")

        # TODO: communicate with the PocketPCR to run a protocol.
        # Placeholder message identifier (named 'cmd_id' to avoid shadowing 'id').
        cmd_id = 873254
        self.put({"method": "example", "id": cmd_id})

        loop = asyncio.get_running_loop()
        deadline = None if timeout is None else loop.time() + timeout

        response = None
        while not response:
            if deadline is not None and loop.time() >= deadline:
                raise TimeoutError(
                    f"PocketPCR: no response to action with index {i} after {timeout} seconds.")
            await asyncio.sleep(poll_interval)
            response = self.get(cmd_id)

        logging.info(f"PocketPCR: done executing action with index {i} and response: {response}")
        logging.debug(f"PocketPCR: action with index {i} was: {action}")

    # GCODE BUILDER method
    def make_gcode(self, action, action_index, *args, **kwargs):
        """Any relevant GCODE can be added to the action here, and then handled in the
        'run_action' method above. The output of this function is not used elsewhere.
        """
        # The PocketPCR has no moving parts, thus the action requires no GCODE in particular.
        # The action object is left unmodified.
        return

    # START methods for serial communication ####
    def check_run(self):
        """Raise a PluginError if the 'run' flag is already set."""
        if self.run:
            raise PluginError("PocketController.start: another instance is already running or has not shutdown.")

    run = False
    """Flag indicating whether the device driver is already running."""

    async def start(self):
        """Run the serial handler and the ping loop until they end or are cancelled.

        On cancellation the serial handler is asked to stop, waiting up to five
        seconds. The 'run' flag is cleared on every exit path.

        Raises:
            PluginError: From 'check_run', if the 'run' flag is already set.
        """
        logging.info("PocketPCR: start coroutine initialized.")

        # Check that it has not already been started.
        self.check_run()
        self.run = True

        # Note that nothing can take too long to run; the entirety of the program
        # is waiting for this code to "await" something.
        try:
            await asyncio.gather(
                self.arduino.start(),
                self.ping()
            )
        except asyncio.CancelledError:
            logging.error("PocketController.start: task cancelled, stoppping the PocketPCR")
            try:
                async with asyncio.timeout(5):
                    await self.arduino.stop()
            except TimeoutError:
                logging.error("PocketController: failed to stop the PocketPCR. Timed out while waiting.")
        except Exception as e:
            logging.error(f"PocketController.start: ended with an error: {e}\n" + traceback.format_exc())
        finally:
            # Flag stopped state, even if stopping the device raised above;
            # otherwise 'check_run' would refuse every later start.
            self.run = False

        logging.info("PocketPCR: start coroutine ended.")

    ping_status = None
    ping_method = "ping"

    async def ping(self, timeout=2.0, wait_time=0.2):
        """Example coroutine that sends messages.

        While 'self.controller.run' is truthy: sleep one second, queue a ping
        command, then poll for its response every 'wait_time' seconds until one
        arrives or roughly 'timeout' seconds have accumulated. The outcome is
        stored in 'self.ping_status'.
        """
        logging.info("PocketPCR: ping coroutine started.")

        # Note that nothing can take too long to run; the entirety of the program
        # is waiting for this code to "await" something.
        try:
            i = 0
            while self.controller.run:
                i += 1
                await asyncio.sleep(1)

                # Send command:
                cmd = {"method": self.ping_method}
                cmd_id = self.controller.machine.hash_cmd(str(cmd))
                cmd["id"] = cmd_id
                self.put(cmd)
                logging.debug(f"PocketController.ping: sent ping #{i} with command={cmd}")

                # Expect response:
                response = None
                elapsed = 0
                while not response and elapsed < timeout:
                    elapsed += wait_time
                    await asyncio.sleep(wait_time)
                    response = self.get(cmd_id)

                # Parse result.
                if response:
                    logging.debug(f"PocketController.ping: done with ping #{i}, got response: '{response}'")
                    self.ping_status = True
                else:
                    logging.error(f"PocketController.ping: the device did not respond to ping #{i}.")
                    self.ping_status = False

        # Handle errors.
        except asyncio.CancelledError:
            logging.error("PocketController.ping: task cancelled.")
        except Exception as e:
            logging.error(f"PocketController.ping: ended with an error: {e}\n" + traceback.format_exc())

        logging.info("PocketPCR: ping coroutine ended.")

    # ALTERNATIVE START methods for serial communication ####
    @staticmethod
    def ardu_start(arduino: SerialCommunicationSync):
        """Run 'arduino.start()' to completion with 'asyncio.run' (process target)."""
        # NOTE: I could not get this to run on its own thread by using "asyncio.to_thread" only.
        #       With threading it was possible.
        asyncio.run(arduino.start())

    async def run_as_process(self):
        """Launch the serial handler in a daemon process and supervise it.

        Loops while 'self.controller.run' is truthy, leaving early when
        'self.arduino.run' is no longer set. The handler is then asked to stop
        and the 'run' flag is cleared.

        Raises:
            PluginError: From 'check_run', if the 'run' flag is already set.
        """
        logging.info("Launching arduino controller as a process.")

        # Check that it has not already been started.
        self.check_run()
        self.run = True

        try:
            # Create the process.
            proc = Process(
                target=self.ardu_start,
                args=(self.arduino,),
                name="arduino_pyserial",
                daemon=True
            )
            # Start the process.
            proc.start()

            # Wait for a bit.
            await asyncio.sleep(0.2)

            # Loop until the parent controller's tasks end or are cancelled.
            while self.controller.run:
                await asyncio.sleep(1)
                # Sharing data with an mp.process is hard...
                # NOTE: https://stackoverflow.com/a/58869967
                if not self.arduino.run.is_set():
                    logging.info("Arduino class not running.")
                    break

        except asyncio.CancelledError:
            logging.info("Cancellation exception received.")
        except Exception as e:
            logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc())

        try:
            await self.arduino.stop()
        except Exception as e:
            logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc())

        # Flag stopped state.
        self.run = False

    async def run_as_thread(self):
        """THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE

        To have it do something, override or replace the "blocking_task" method.

        The idea here is that blocking tasks can be sent to threads by asyncio.

        Threads are, however, not simple to stop from outside.

        Raises:
            PluginError: From 'check_run', if the 'run' flag is already set.
        """
        # Check that it has not already been started.
        self.check_run()
        self.run = True

        try:
            # If your code can block the main thread for any significant time,
            # then send the programs to a new one instead.
            # First, create a coroutine for the blocking task, then schedule it.
            task = asyncio.create_task(asyncio.to_thread(self.blocking_task))

            # Awaiting gather() lets the scheduled task start and waits for it.
            # Add other tasks/coroutines here to run them concurrently.
            await asyncio.gather(task)
        except Exception as e:
            logging.error(f"PocketController.run_as_thread: process failed with an error: {e}\n" + traceback.format_exc())
        finally:
            # Flag stopped state on every exit path, cancellation included.
            # NOTE: cancelling this coroutine does not stop the worker thread itself.
            self.run = False

    def blocking_task(self):
        """Placeholder blocking job: sleeps for two seconds."""
        # NOTE: Take care when accessing data from a task running in a new thread.
        #       Consider using "thread-safe" operations and objects
        #       (e.g. queues, events, semaphores, etc.).
        time.sleep(2)
The commander will handle a particular 'action' from the pipetting mid-level protocol by calling this class, finding it by a unique 'cmd' property (i.e. 'self.pcr_action_id').
All actions with 'cmd' equal to the 'cmd' property will end up being processed here, both for GCODE generation and device control over serial.
Ancestors
Class variables
var baudrate
var launch_type
var pcr_action_id-
String identifying the actions that will be passed to this class by their 'cmd' property.
var ping_method
var ping_status
var port
var run-
Flag indicating whether the device driver is already running.
var verbose
Static methods
def ardu_start(arduino: SerialCommunicationSync)-
Expand source code
@staticmethod
def ardu_start(arduino: SerialCommunicationSync):
    """Drive the serial handler's 'start' coroutine to completion via 'asyncio.run'.

    Args:
        arduino: Serial handler whose 'start()' coroutine will be run.
    """
    # NOTE: running this on its own thread with "asyncio.to_thread" alone did not work;
    #       it was possible with threading.
    start_coroutine = arduino.start()
    asyncio.run(start_coroutine)
Methods
def blocking_task(self)-
Expand source code
def blocking_task(self):
    """Placeholder for blocking work: sleeps for two seconds and returns nothing."""
    # NOTE: be careful when touching shared data from a task that runs in another thread.
    #       Prefer "thread-safe" objects and operations (queues, events, semaphores, ...).
    placeholder_seconds = 2
    time.sleep(placeholder_seconds)
Expand source code
def check_run(self):
    """Refuse to continue when the 'run' flag is already set.

    Raises:
        PluginError: If 'self.run' is truthy.
    """
    if not self.run:
        return
    raise PluginError("PocketController.start: another instance is already running or has not shutdown.")
Expand source code
def make_gcode(self, action, action_index, *args, **kwargs):
    """GCODE hook for this plugin's actions; intentionally a no-op.

    Any relevant GCODE can be added to the action here, and then handled in the
    'run_action' method. The output of this function is not used elsewhere.

    Returns:
        None. The action object is left unmodified.
    """
    # The PocketPCR has no moving parts, so there is no GCODE to generate.
    return None
async def ping(self, timeout=2.0, wait_time=0.2)-
Expand source code
async def ping(self, timeout=2.0, wait_time=0.2):
    """Example coroutine that sends messages.

    For as long as 'self.controller.run' is truthy, this sleeps one second,
    queues a ping command and polls for its response. The result of each
    round is stored in 'self.ping_status' (True or False).

    Args:
        timeout: Approximate number of seconds to keep polling for each response.
        wait_time: Seconds to sleep between two polls.
    """
    logging.info("PocketPCR: ping coroutine started.")

    # Nothing here may take long to run: the rest of the program is waiting
    # for this code to "await" something.
    try:
        ping_number = 0
        while self.controller.run:
            ping_number += 1
            await asyncio.sleep(1)

            # Build and queue the command.
            message = {"method": self.ping_method}
            message_id = self.controller.machine.hash_cmd(str(message))
            message["id"] = message_id
            self.put(message)
            logging.debug(f"PocketController.ping: sent ping #{ping_number} with command={message}")

            # Poll until a response shows up or the time budget is used.
            reply = None
            waited = 0
            while waited < timeout and not reply:
                waited += wait_time
                await asyncio.sleep(wait_time)
                reply = self.get(message_id)

            # Record the outcome.
            if not reply:
                logging.error(f"PocketController.ping: the device did not respond to ping #{ping_number}.")
                self.ping_status = False
            else:
                logging.debug(f"PocketController.ping: done with ping #{ping_number}, got response: '{reply}'")
                self.ping_status = True

    except asyncio.CancelledError:
        logging.error("PocketController.ping: task cancelled.")
    except Exception as e:
        logging.error(f"PocketController.ping: ended with an error: {e}\n" + traceback.format_exc())

    logging.info("PocketPCR: ping coroutine ended.")
async def run_action(self, action: dict, i: int)-
Expand source code
async def run_action(self, action: dict, i: int, poll_interval=1.0, timeout=None):
    """The 'run_action' method (with this exact name) will be called by the Controller
    from 'run_actions_protocol'. This must be an async function.

    Queues an example message and polls for its response.

    Args:
        action: The protocol action being executed (only logged for now).
        i: Index of the action in the protocol.
        poll_interval: Seconds to sleep between checks for a response.
        timeout: Optional limit, in seconds, for the whole wait. The default
            (None) waits indefinitely.

    Raises:
        TimeoutError: If 'timeout' is set and no response arrived in time.
    """
    logging.info(f"PocketPCR: executing action with index {i}")

    # TODO: communicate with the PocketPCR to run a protocol.
    # Placeholder message identifier (named 'cmd_id' to avoid shadowing the builtin 'id').
    cmd_id = 873254
    self.put({"method": "example", "id": cmd_id})

    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout

    response = None
    while not response:
        if deadline is not None and loop.time() >= deadline:
            raise TimeoutError(
                f"PocketPCR: no response to action with index {i} after {timeout} seconds.")
        await asyncio.sleep(poll_interval)
        response = self.get(cmd_id)

    logging.info(f"PocketPCR: done executing action with index {i} and response: {response}")
    logging.debug(f"PocketPCR: action with index {i} was: {action}")
async def run_as_process(self)-
Expand source code
async def run_as_process(self):
    """Launch the serial handler in a daemon process and supervise it.

    Loops while 'self.controller.run' is truthy, leaving early when
    'self.arduino.run' is no longer set. The handler is then asked to stop
    and the 'run' flag is cleared.

    Raises:
        PluginError: From 'check_run', if the 'run' flag is already set.
    """
    logging.info("Launching arduino controller as a process.")

    # Refuse to launch twice.
    self.check_run()
    self.run = True

    try:
        # Describe and create the child process.
        process_options = {
            "target": self.ardu_start,
            "args": (self.arduino,),
            "name": "arduino_pyserial",
            "daemon": True,
        }
        worker = Process(**process_options)
        worker.start()

        # Give it a moment before supervising.
        await asyncio.sleep(0.2)

        # Keep going until the parent controller's tasks end or are cancelled.
        while self.controller.run:
            await asyncio.sleep(1)
            # Sharing data with an mp.process is hard...
            # NOTE: https://stackoverflow.com/a/58869967
            if self.arduino.run.is_set():
                continue
            logging.info("Arduino class not running.")
            break

    except asyncio.CancelledError:
        logging.info("Cancellation exception received.")
    except Exception as e:
        logging.info(f"Unexpected exception: {e}\n" + traceback.format_exc())

    # Ask the handler to stop; a failure here is logged, not raised.
    try:
        await self.arduino.stop()
    except Exception as e:
        logging.error(f"Failed to stop the arduino: {e}\n" + traceback.format_exc())

    # Flag stopped state.
    self.run = False
Expand source code
async def run_as_thread(self):
    """THIS METHOD DOES NOTHING, IT IS ONLY AN IMPLEMENTATION EXAMPLE

    To have it do something, override or replace the "blocking_task" method.

    The idea here is that blocking tasks can be sent to threads by asyncio.

    Threads are, however, not simple to stop from outside.

    Raises:
        PluginError: From 'check_run', if the 'run' flag is already set.
    """
    # Check that it has not already been started.
    self.check_run()
    self.run = True

    try:
        # If your code can block the main thread for any significant time,
        # then send the programs to a new one instead.
        # First, create a coroutine for the blocking task, then schedule it.
        task = asyncio.create_task(asyncio.to_thread(self.blocking_task))

        # Awaiting gather() lets the scheduled task start and waits until it is
        # done, so no further polling of the task is needed afterwards.
        # Add other tasks/coroutines here to run them concurrently.
        await asyncio.gather(task)
    except Exception as e:
        logging.error(f"PocketController.run_as_thread: process failed with an error: {e}\n" + traceback.format_exc())
    finally:
        # Flag stopped state on every exit path. Cancellation is not an
        # 'Exception', so without 'finally' the flag would stay set forever.
        # NOTE: cancelling this coroutine does not stop the worker thread itself.
        self.run = False
To have it do something, override or replace the "blocking_task" method.
The idea here is that blocking tasks can be sent to threads by asyncio.
Threads are, however, not simple to stop from outside.
async def start(self)-
Expand source code
async def start(self):
    """Run the serial handler and the ping loop until they end or are cancelled.

    On cancellation the serial handler is asked to stop, waiting up to five
    seconds. The 'run' flag is cleared on every exit path.

    Raises:
        PluginError: From 'check_run', if the 'run' flag is already set.
    """
    logging.info("PocketPCR: start coroutine initialized.")

    # Check that it has not already been started.
    self.check_run()
    self.run = True

    # Note that nothing can take too long to run; the entirety of the program
    # is waiting for this code to "await" something.
    try:
        await asyncio.gather(
            self.arduino.start(),
            self.ping()
        )
    except asyncio.CancelledError:
        logging.error("PocketController.start: task cancelled, stoppping the PocketPCR")
        try:
            async with asyncio.timeout(5):
                await self.arduino.stop()
        except TimeoutError:
            # Reported through logging, like every other message in this method.
            logging.error("PocketController: failed to stop the PocketPCR. Timed out while waiting.")
    except Exception as e:
        logging.error(f"PocketController.start: ended with an error: {e}\n" + traceback.format_exc())
    finally:
        # Flag stopped state, even if stopping the device raised above;
        # otherwise 'check_run' would refuse every later start.
        self.run = False

    logging.info("PocketPCR: start coroutine ended.")
Expand source code
async def status_callback(self, status=None):
    """This function generates status data for the commander's "status" event.

    Args:
        status: Optional dict to extend in place. A new dict is created when None.

    Returns:
        The status dict, with this plugin's data under status["plugins"]["pocketpcr"].
    """
    # Start from the incoming dict, or from an empty one.
    report = {} if status is None else status
    report.setdefault("plugins", {})

    # Collect this plugin's state.
    plugin_state = {
        "run": self.run,
        "port": self.port,
        "baudrate": self.baudrate,
        "launch_type": self.launch_type,
        "responsive": self.ping_status,
        "connected": self.arduino.is_connected,
        "status": self.status
    }
    report["plugins"]["pocketpcr"] = plugin_state

    return report
Inherited members