Module pipettin-piper.piper.controller

Controller module that drives OLA's Pipettin robot.

Functions

def launch(piper_config=None)
Expand source code
def launch(piper_config=None):
    """This function is launched by '__main__.py' when the piper module is loaded.
    Run: 'python3 -m piper'
    See: '__main__.py'.
    """
    if piper_config is None:
        piper_config={}

    # Startup.
    logging.debug("Starting up with options:\n" + pformat(piper_config))

    # Instantiate the Controller.
    moon_kontrol = Controller(config=piper_config)

    # Run!
    try:
        if piper_config.get("verbose", False):
            asyncio.run(moon_kontrol.test())
        moon_kontrol.start()

    except Exception as e:
        # Handle aberrant interruption.
        msg = f"Caught exception with message: {str(e)}\n" + traceback.format_exc()
        logging.error(msg)
        print(msg)

        # Try to stop the controller gracefully.
        try:
            logging.info("Stopping the controller and closing connections gracefully.")
            asyncio.run(moon_kontrol.stop())
        except Exception as e2:
            # Handle failure to stop the controller.
            msg = f"Could not stop controller gracefully. Error message: {str(e2)}\n" + traceback.format_exc()
            logging.error(msg)
            print(msg)
            # Error handled unsuccessfully.
            result = False
        else:
            # Error handled successfully.
            result = True

    else:
        # Everything looks ok.
        logging.info("The coroutines have ended properly.")
        result = True

    return result

This function is launched by 'main.py' when the piper module is loaded. Run: 'python3 -m piper' See: 'main.py'.

Classes

class Controller (datatools: str = None,
plugins: tuple = None,
config: pipettin-piper.piper.config.config_helper.TrackedDict = None,
default_wait_interval=0.05,
database_tools: pipettin-piper.piper.datatools.mongo.MongoObjects = None,
**kwargs)
Expand source code
class Controller():
    """A class to drive the Pipettin robot.
    Most of the code is in the "commander" classes, instantiated below,
    which add core functionality to the robot controller.
    This class instantiates "database tools" and the GCODE "builder" classses,
    and loads "plugins" that further extend its functions.
    """

    def __init__(self,
                 datatools: str=None,
                 plugins: tuple=None,
                 config: TrackedDict=None,
                 default_wait_interval=0.05,
                 database_tools: MongoObjects = None,
                 **kwargs):


        # Note startup.
        logging.info("Initiating the controller.")

        # List for the coroutines started by "launch".
        self.coroutine_methods = []
        # List of asyncio tasks, setup by "gather" in launch, and stopped elsewhere too.
        self.tasks = []
        # Dictionary to register handlers for protocol actions.
        self.action_runners = {}
        # Post-init callback functions.
        self.post_init_callbacks = []
        # Callback functions for events.
        self.events_callbacks: dict = {}
        # ID for the "status" internal event (non-SIO).
        self.status_event = "status"
        # Life status.
        self.killed = False
        # Verbose and logging.
        self.verbose: bool = False
        # Helper variables for asyncio coroutines.
        self.run = True
        self.background_task = None
        # Create a property for the database object (i.e. only MongoObjects for now).
        # This should be overwitten by plugins.
        self.database_tools = None
        # Protocol pausing and stopping events.
        self.pause_protocol: Event = Event()
        self.stop_protocol: Event = Event()
        self.current_action: dict = {}

        # Save configuration if specified. Else it is left as an empty dict.
        if config is None:
            config = {}

        # Update datatools from config, or from arguments.
        if datatools:
            config["datatools"] = datatools
        # Update plugin_names from argument.
        if plugins is not None:
            config["plugins"] = self.unique(
                l1=list(plugins),
                l2=config.get("plugins", [])
            )

        # Update defaults with a copy of the config.
        self.config: TrackedDict = default_config()
        self.config.update(
            deepcopy(config)
        )

        # Read the settings file.
        # TODO: Consider deleting this method, the functionality is not implemented UI-side yet.
        # self.update_config_from_settings()

        # Set up logging.
        setup_logging(directory=self.config.get('logdir', None),
                      level=self.config.get('loglevel', 3),
                      loglimit_mb=config.get('loglimit_mb', None),
                      verbose=self.config.get('verbose', True))

        # Set verbose level.
        self.verbose = self.config.get("verbose", kwargs.get("verbose", False))

        # Set paranoid mode.
        self.paranoid = self.config.get("paranoid", True)

        # Set pause time out.
        self.pause_timeout = self.config.get("pause_timeout", 600)

        if self.verbose:
            logging.info(f"Starting up with config:\n{pformat(self.config)}")
        else:
            logging.debug(f"Starting up with config:\n{pformat(self.config)}")

        # Database tools.
        if database_tools is not None:
            self.database_tools = database_tools
            logging.warning("Using the provided 'database_tools' object. Ignoring DB settings from config.")
        else:
            # Instantiate the database tools.
            if not self.config["datatools"]:
                raise CommanderError("Missing entry 'datatools' is configuration.")
            logging.info(f"Loading datatools module '{self.config['datatools']}'.")
            module = importlib.import_module("piper.datatools." + self.config["datatools"])
            self.database_tools: MongoObjects = module.load_datatools(self)

        # Instantiate main sub-controller classes.
        self.comms: SioCommander = SioCommander(config=self.config, controller=self)
        self.machine: KlipperCommander = KlipperCommander(config=self.config, controller=self)

        # Instantiate a GcodeBuilder class.
        # This will track the state of the machine and generate GCODE.
        self.builder = GcodeBuilder(controller=self,
                                    config=self.config,
                                    verbose=self.verbose)

        # General time for "asyncio.sleep" across the program.
        self.wait_default = self.config.get("default_wait_interval", default_wait_interval)

        # Loop over the requrested plugins, which must be the names of
        # python files (without ".py" extension), and "load" them into a dictonay property.
        logging.info(f"Loading plugins: {self.config['plugins']}")
        self.plugin_names = self.config["plugins"]
        self.plugins = {}
        self.load_plugins(plugins=self.plugin_names, plugins_tracker=self.plugins)

        # Run the-post init callbacks.
        self.post_init()

    def update_config_from_settings(self, settings_file:str=None):
        """Read configuration values from a settings JSON file on disk.
        This is meant as an additional (and optional) way to override configuration values,
        from a "settings.json" file exported by the UI.

        TODO: The DB connection details are parsed from a ".env" in the data tools object at startup.
              Consider deleting this method.
        """
        if settings_file is None:
            settings_file = self.config.get("settings_file", None)
        if settings_file:
            try:
                logging.info(f"Reading settings from file: {settings_file}")
                with open(settings_file, "r", encoding="utf-8") as file:
                    settings = json.load(file)
                    self.config.setdefault("settings", {})
                    self.config["settings"].update(settings)
            except Exception as e:
                logging.warning(f"Failed to update configuration from settings file '{settings_file}': {e}")
            else:
                logging.debug(f"Parsed settings data: {settings}")
        else:
            logging.warning("No settings file to read from.")

    @staticmethod
    def unique(l1:list, l2:list=None):
        if l2 is None:
            l2 = []
        return list(set( l1 + l2 ))

    post_init_callbacks: list
    """List of functions to be run after all core modules are loaded.
    These are: GcodeBuilder, Machine, and Comms.
    """
    def post_init(self):
        """Run all post-init methods, once all python modules are loaded and initialized."""
        # Run all callbacks.
        for cb in self.post_init_callbacks:
            # Pass no arguments.
            cb()
        # Print available tools.
        logging.info(f"Available tools after setup: {list(self.builder.tools)}")
        logging.debug("Tool data:\n" + pformat(self.builder.tools))
        # Print available handlers and runners.
        logging.info(f"Available action handlers at startup: {list(self.builder.action_switch_case)}")
        logging.info(f"Available action runners at startup: {list(self.action_runners)}")

    def load_plugins(self, plugins: list, plugins_tracker: dict):
        """Load python modules from the 'plugins' directory through their 'load_plugin' function.

        Plugins are expected to return an Exception (or false-like) object if they could not load,
        or true-like object when they load correctly. In that case the the object is saved to an
        internal dictionary.
        """
        for module_name in plugins:
            logging.debug(f"Loading plugin '{module_name}'.")

            # Load the requested module from the "plugins" sundirectory.
            module = importlib.import_module("piper.plugins." + module_name)

            # Instantiate the plugin.
            try:
                plugin_instance = module.load_plugin(self)
            except Exception as e:
                logging.error(f"Plugin '{module_name}' failed to load: {e}")
                plugins_tracker[module_name] = FailedPlugin(module_name)
                continue

            # Parse the result.
            if not plugin_instance:
                logging.warning(f"Plugin '{module_name}' seems empty.")
            elif plugin_instance:
                # Save the module to the plugins list.
                plugins_tracker[module_name] = plugin_instance
                logging.info(f"Plugin '{module_name}' loaded successfully.")
            else:
                logging.warning(f"Unhandled result when loading '{module_name}' plugin.")

    action_runners: dict
    """Dictionary to store controller methods from plugins."""
    def add_action_runner(self, name, function):
        """Register a callback function to handle incoming actions by their type (i.e. the name)."""
        if self.action_runners.get(name, None):
            msg = f"Error: attempted to overwrite action handler '{name}'."
            logging.error(msg)
            raise CommanderError(msg)
        else:
            logging.info(f"Registered executor for '{name}' actions.")
            self.action_runners[name] = function

    def register_event_callback(self, event_name: str, callback_name: str, callback_function):
        """Register a function to be called when an event happens.

        Args:
            event_name (str): The name of the event.
            callback_name (str): A unique ID for the callback function.
            callback_function (_type_): The asyncio function to execute when an event happens. It must accept any number of arguments and keyword arguments.
        """
        # Check that the callback function is awaitable.
        if not inspect.iscoroutinefunction(callback_function):
            msg = f"Error: the callback function with name '{callback_name}' is not awaitable."
            msg += " Did you forget to use 'async' in the function definition? (i.e async def foo(bar): ...)"
            logging.error(msg)
            raise ValueError(msg)

        # Create the event if it does not exist.
        if self.events_callbacks.get(event_name, None) is None:
            self.events_callbacks[event_name] = {}

        # Check that the name is unique in existing callbacks.
        if self.events_callbacks[event_name].get(callback_name, None) is not None:
            raise ValueError(
                f"Error: a callback named '{callback_name}'" + \
                f" is already registered for the event '{event_name}'."
            )

        # Register the callback.
        self.events_callbacks[event_name][callback_name] = callback_function

    async def trigger_event_callback(self, event_name: str, callback_name: str = None, **kwargs):
        """Trigger all callbacks associated to an event, or a particular callback.

        Args:
            event_name (str): The name of the event as registered using `register_event_callback`.
            callback_name (str, optional): Run only a single registered callback for the event, by
                its registered name. Defaults to None.
        """
        results = {}
        logging.debug(f"Processing event '{event_name}'.")

        # Get the callbacks for the event.
        event_callbacks = self.events_callbacks.get(event_name, None)
        # Check if there are any callbacks for the wvent.
        if event_callbacks is not None:
            # Get and trigger callbacks.
            if callback_name is not None:
                # Trigger a single callback.
                callback_function = event_callbacks.get(callback_name, None)
                if callback_function is None:
                    logging.warning(f"Warning: there is no callback named '{callback_name}' for event '{event_name}'. Skipped running callback.")
                else:
                    try:
                        results[callback_name] = await callback_function(**kwargs)
                    except Exception:
                        error_message = traceback.format_exc()
                        logging.error(f"Caught an error when running callback function '{callback_name}':\n" + error_message)
                        results[callback_name] = {"error": error_message}
            else:
                # Trigger all callbacks of the event.
                for callback_name, callback_function in event_callbacks.items():
                    try:
                        results[callback_name] = await callback_function(**kwargs)
                    except Exception:
                        error_message = traceback.format_exc()
                        logging.error(f"Caught an error when running callback function '{callback_name}':\n" + error_message)
                        results[callback_name] = {"error": error_message}
                if not results:
                    logging.warning(f"Warning: no callbacks found for event '{event_name}'.")
        else:
            logging.warning(f"There are no callbacks registered for the '{event_name}' event. Doing nothing.")

        return results

    # PROTOCOL methods section ####
    async def run_callback(self, cb, *args, **kwargs):
        """Run a function, detecting if it is regular or async.
        See: https://stackoverflow.com/a/36076663

        Regular functions are run in threads and should consider thread safety.
        These functions must accept a "stop" event.
        """
        if inspect.iscoroutinefunction(cb):
            await cb(*args, **kwargs)
        else:
            try:
                stop = Event()
                await asyncio.to_thread(cb, *args, **kwargs, stop_event=stop)
            except asyncio.CancelledError:
                stop.set()
                logging.error("Stop flagged for threaded callback. Returning now.")

    def check_greenlight(self, action=None):
        """Check if the commander is free.
        Raises an error if a current action is already being processed.
        """
        if self.current_action:
            msg = f"Failed to greenlight; another one is being processed by the controller: {self.current_action}"
            logging.error(msg)
            raise ProtocolError(msg, action=action)

    def check_for_stop_event(self):
        """Raise an exception if the stop protocol flag is set, and clear it."""
        if self.stop_protocol.is_set():
            self.stop_protocol.clear()
            msg = "Action aborted: stop_protocol event found set."
            logging.error(msg)
            raise ProtocolError(msg)

    async def handle_pause(self, i=None):
        """Handle pauses."""
        if self.pause_protocol.is_set():
            logging.warning("Pausing action execution.")
            try:
                await asyncio.wait_for(self.wait_for_event_clear(self.pause_protocol), timeout=self.pause_timeout)
            except asyncio.TimeoutError as e:
                msg = f"Action {i} aborted. Still paused set after timing-out at {self.pause_timeout} seconds."
                self.pause_protocol.clear()
                logging.error(msg)
                raise ProtocolError(msg) from e

    async def wait_for_event_clear(self, event: Event):
        """Waits for the event to clear its flag."""
        while event.is_set():
            self.check_for_stop_event()
            await asyncio.sleep(self.wait_default)

    def get_action_runner(self, action):
        # Get the action's command.
        action_command_id = action['cmd']
        # Check if it is a JSON action, and update the command id with the one in its arguments.
        if action_command_id == "JSON":
            action_command_id = action['args']['cmd']
            logging.debug(f"Updated action command from JSON action to: {action_command_id}")

        # Try to find a function (inserted by a plugin) that can handle this protocol action.
        return action_command_id, self.action_runners.get(action_command_id, None)

    async def run_actions_protocol(self, actions: list, wait=None, check=None, timeout=10.0):
        """Executes a list of actions from a pipetting protocol, which already contain the corresponding GCODE.

        This method wraps run_actions to handle exceptions.

        Args:
            actions (List): A list of dictionaries, each one an action from a pipetting protocol.
            wait (bool, optional): Wait for a response to all commands, aborting if it times out. Defaults to None.
            check (bool, optional): Check each response for an "ok", aborting if it is not found. Defaults to None.
            timeout (float, optional): Timeout for the action to complete its execution. Defaults to 10.0.
        """

        # Default to the machine's paranoia status if unspecified.
        if wait is None:
            wait = self.paranoid
        if check is None:
            check = self.paranoid

        logging.info(f"Running {len(actions)} actions protocol.")
        logging.debug(f"Run options: wait={wait}, check={check}, timeout={timeout}")

        # Run actions in the list.
        try:
            await self.run_actions(actions, wait, check, timeout)

        # Handle exceptions.
        except asyncio.exceptions.CancelledError:
            # Log cancellations.
            logging.error("Protocol cancelled while running action.")
        except Exception as e:
            # Raise other exceptions as protocol errors.
            raise ProtocolError("Protocol aborted due to an unhandled exception while running.") from e
            # NOTE: The logic here is that action handlers are not expected to handle asyncio cancellations, and
            #       may raise ProtocolErrors on their own. Those should be passed on to this calling environment.
            #       Other exceptions are re-raised here as ProtocolErrors.
        else:
            logging.info("Actions protocol executed successfully.")

    async def run_actions(self, actions: list, wait=None, check=None, timeout=10.0):
        """Executes a list of actions from a pipetting protocol, which already contain the corresponding GCODE.

        If the actions contain a 'cmd' identifier registered by a plugin, the "run_action" method of that plugin will be called.
        Otherwise, this method will call `run_actions_protocol` in the machine class, that expects a "GCODE" key in each action.

        Exceptions are caught to clear the current_action attribute, and immediately re-raised.
        
        Args:
            actions (List): A list of dictionaries, each one an action from a pipetting protocol.
            wait (bool, optional): Wait for a response to all commands, aborting if it times out. Defaults to None.
            check (bool, optional): Check each response for an "ok", aborting if it is not found. Defaults to None.
            timeout (float, optional): Timeout for the action to complete its execution. Defaults to 10.0.
        """
        logging.debug(f"Incoming actions protocol with {len(actions)} actions.")

        # Check if the commander is free to run new actions.
        self.check_greenlight()
        # NOTE: This must be here instead of run_actions_protocol because it would introduce
        #       an intermediate await, between the check and setting the current action.

        # Counter for the current action.
        i = 0
        # Iterate until actions run out.
        while i < len(actions):
            logging.debug(f"Next action index: {i}")
            try:
                # A clean start.
                error = None

                # Set the current action.
                action = actions[i]
                self.current_action = action

                # Handle pauses.
                await self.handle_pause(i)
                # Handle stop.
                self.check_for_stop_event()

                # Try getting a runner function.
                action_command_id, runner_function = self.get_action_runner(action)
                logging.debug(f"Processing action {i}: {action_command_id}")

                # Execute the action using a registered runner, if any.
                if runner_function is not None:
                    logging.debug(f"Running {action_command_id} action using a handler.")
                    await self.run_callback(
                        runner_function,
                        action=action, i=i,
                        wait=wait, check=check,
                        timeout=timeout
                    )
                    logging.debug(f"Action {action_command_id} completed by the handler.")
                    # Increment the action index by one.
                    i += 1

                # Otherwise, batch-send the actions directly to the Machine.
                else:
                    logging.debug("Running actions in the machine.")
                    # Prepare variables to make a batch.
                    actions_batch, next_actions = [action], actions[i+1:]
                    # Grow the batch.
                    for j, next_action in enumerate(next_actions):
                        # Try getting a future runner function.
                        cmd, fun = self.get_action_runner(next_action)
                        if fun is not None:
                            # Stop growing the batch if a runner is found.
                            logging.debug(f"Batch grown to {len(actions_batch)} actions. Next action {actions.index(next_action)} is not batch-able: {cmd}")
                            break
                        else:
                            # Add to the batch if no handler function is found.
                            logging.debug(f"Appending action {actions.index(next_action)} and command {cmd} to batch with {len(actions_batch)} (+1) actions.")
                            actions_batch.append(next_action)
                    # Let the Machine execute the action (only parsing its GCODE).
                    logging.debug(f"Executing batch of {len(actions_batch)} actions directly in the machine.")
                    await self.machine.run_actions_protocol(
                        actions=actions_batch, i=i,
                        wait=wait, check=check,
                        default_action_timeout=timeout
                    )
                    # Increment the action index by the batch's length (at least 1).
                    i += len(actions_batch)

                # Ensure idle printer before continuing to a different action type.
                while not await self.machine.wait_for_idle_printer():
                    # TODO: This loop may get stuck if the controller is not running or something else fails.
                    if not self.run:
                        raise CommanderError("The commander is no longer running.")
                    # TODO: Consider reimplementing this with a "G4 P0" command instead.
                    logging.info(f"Machine not yet idle. Posponing the end of action {i}: {action['cmd']}")
                    # Check for a stop event before retrying.
                    self.check_for_stop_event()

                # Logs.
                logging.debug("Machine idle. " + "No actions left." if i == len(actions) else f"Moving on to action {i}.")

            # Handle exception for the current action.
            except Exception as e:
                logging.error("An error was raised while running the protocol: " + str(e))
                # Re-raise the error later.
                error = e

            # Reset the current action.
            self.current_action = {}

            # Raise any errors.
            if error is not None:
                raise ProtocolError(f"Failed to run action {i}: {action['cmd']}") from error

        logging.debug(f"Executed {len(actions)} actions.")

    # START methods section ####
    def start(self):
        """
        Starts the main controller asynchronously by running the launch method in the event loop.

        This method is the entry point for the Controller's main execution flow. It sets up the
        asynchronous event loop and calls the `launch` method to initialize and manage tasks.
        The method handles a KeyboardInterrupt exception to allow for graceful shutdown when
        interrupted by the user.

        Raises:
            KeyboardInterrupt: Gracefully exits on user interrupt (Ctrl+C).
        """
        try:
            asyncio.run(self.launch())
        except KeyboardInterrupt:
            logging.warning("Caught keyboard interrupt.")
            print("Caught keyboard interrupt.")
        except Exception as e:
            logging.error(f"An error occurred while launching the controller: {e}")
            raise

    async def start_as_task(self, hold=True):
        """
        Starts the main controller's asynchronous tasks as a background task.

        This method creates a background task to run the `launch` method asynchronously, allowing
        it to operate independently within an event loop. The method also handles KeyboardInterrupt
        to gracefully cancel the background task if the user interrupts the program.

        Args:
            hold (bool): If True, the call will block. Else the background task will be started and returned immediately.

        Raises:
            KeyboardInterrupt: Cancels the background task upon user interrupt (Ctrl+C).
        """
        try:
            self.background_task = asyncio.create_task(self.launch())
            if hold:
                await self.background_task
        except KeyboardInterrupt:
            logging.warning("Caught keyboard interrupt. Canceling tasks...")
            self.background_task.cancel()

        return self.background_task

    async def start_and_wait(self):
        """Start the commander, wait it to start, and keep it running as an asyncio background task.
        Returns the result of setup and readyness checks."""
        self.background_task = asyncio.create_task(self.launch())
        setup_ok = await self.machine.wait_for_setup()
        ready_ok = await self.machine.wait_for_ready()
        return setup_ok and ready_ok

    async def launch(self):
        """Main co-routine to start piper.
        Launches the other co-routines and catching any uncaught errors.
        Remember to clean-up by running "stop" after this method returns or raises exceptions.
        """
        logging.info("Starting controller coroutines.")

        # Create a list of asyncio tasks.
        self.tasks = [asyncio.create_task(method) for method in self.coroutine_methods]

        # Schedule calls *concurrently*:
        try:
            await asyncio.gather(*self.tasks)

        except asyncio.exceptions.CancelledError:
            exit_status = False
            msg = "Tasks were cancelled before gathering. Cleaning up."
            logging.warning(msg)
            self.cancel_tasks()

        except Exception as e:
            exit_status = False
            msg = f"Exceptions occured in the controller's tasks: {e}\n" + traceback.format_exc()
            logging.critical(msg)
            self.cancel_tasks()

        else:
            exit_status = True
            msg = "Coroutines gathered, stopping coroutines and closing sockets."
            logging.info(msg)

        # Log message.
        print(msg)

        return exit_status

    def cancel_tasks(self):
        """Call 'cancel' on all of the controller's tasks and set the run flag to false."""
        logging.warning("Cancelling all tasks.")
        # Signal coroutines to end
        self.run = False
        # End all tasks if at least one failed.
        for task in self.tasks:
            # https://stackoverflow.com/a/59655833
            task.cancel()

    # STOP/RESTART methods section ####
    async def stop(self, timeout=3.0):
        """Stop the coroutines cleanly, or 'cancel' them if that failed.
        This methods also tries to close the socketio and websocket connections.
        NOTE: This method is used by the pylabrobot backend to 'close' it at the end of a protocol.

        Returns a tuple of two boolean values. The first indicating if tasks have "ended",
        and the second indicating if tasks were killed.
        """

        logging.warning("Stopping the commander.")
        self.run = False

        exit_status = None, None

        # Stop
        if self.background_task is not None:
            # If started by "start_as_task" or "start_and_wait".
            exit_status = await self.stop_background_task(timeout=timeout)
        elif self.tasks:
            # If started by "start()" or from "launch()" directly.
            exit_status = await self.stop_tasks_list(timeout=timeout)
        else:
            logging.warning("Warning, stopping had no effect!. Coroutines were" + \
                            " not started by run nor awaited as background_tasks.")

        # Close websocket and socketio connections.
        logging.warning("Closing connections.")
        await self.close()

        return exit_status

    async def stop_tasks_list(self, timeout=3.0):
        logging.info("Stopping task list.")
        elapsed = 0.0
        while not self.task_list_done():
            await asyncio.sleep(timeout/5)
            elapsed += timeout/5
            if elapsed >= timeout:
                logging.warning("Timed out.")
                break
            logging.debug(f"Waiting. {timeout-elapsed} seconds remaining.")

        if not self.task_list_done():
            logging.warning("Tasks not yet done. Killing coroutines by cancel.")
            await self.kill()
        
        logging.info("Tasks done.")
        return self.task_list_done(), self.task_list_cancelled(self.tasks)

    async def stop_background_task(self, timeout=3.0):
        logging.info("Waiting for the background task to finish.")
        elapsed = 0.0
        while not self.background_task.done():
            await asyncio.sleep(timeout/5)
            elapsed += timeout/5
            if elapsed >= timeout:
                logging.warning("Timed out.")
                break
            logging.debug(f"Waiting. {timeout-elapsed} seconds remaining.")

        if not self.background_task.done():
            logging.warning("Background_task not yet done. Cancelling coroutines.")
            await self.kill()
        else:
            logging.info("Tasks done.")

        return self.background_task.done(), self.background_task.cancelled()

    # KILL methods section ####
    # Stuff to terminate eveything violently.
    killed: bool = False
    """Indicates if the kill method has been called to terminate all coroutines."""
    async def kill(self, timeout=None):
        """Kill the coroutines using the 'cancel' method."""
        if timeout is None:
            timeout = self.wait_default*2.1

        # Flag killed status.
        self.killed = True

        # Output variables.
        tasks_done = None
        tasks_cancelled = None

        # If started by "start_as_task" or "start_and_wait".
        if self.background_task is not None:
            if not self.background_task.done():
                logging.info("Cancelling task.")
                self.background_task.cancel()

                elapsed = 0.0
                while not self.background_task.cancelled():
                    await asyncio.sleep(self.wait_default)
                    elapsed += self.wait_default
                    if elapsed >= timeout:
                        logging.warning("Timed out.")
                        break

            if self.background_task.done() or self.background_task.cancelled():
                logging.info("Done.")

            tasks_done, tasks_cancelled = self.background_task.done(), self.background_task.cancelled()

        # If started by "start()" or from "launch()" directly.
        elif self.tasks:
            logging.info("Cancelling the tasks list.")
            for task in self.tasks:
                task.cancel()
            elapsed = 0.0
            while not self.task_list_cancelled(self.tasks):
                await asyncio.sleep(self.wait_default/4)
                elapsed += self.wait_default/4
                if elapsed >= timeout:
                    logging.warning("Timed out.")
                    break

            if self.task_list_cancelled(self.tasks):
                logging.info("Done.")

            tasks_done, tasks_cancelled = self.task_list_done(), self.task_list_cancelled(self.tasks)

        else:
            logging.info("Nothing to kill.")
            tasks_done, tasks_cancelled = True, True

        return tasks_done, tasks_cancelled

    async def close(self, timeout=3):
        """Call the "close" method from the main 'commander' classes."""
        logging.info("Closing all controller objects.")
        await self.machine.close(timeout)
        await self.comms.close(timeout)

    # HELPER methods section ####
    def task_list_done(self):
        """Check if all tasks in a list of tasks are done."""
        result = all([task.done() for task in self.tasks])
        logging.debug(f"Parsed status of {len(self.tasks)} coroutine tasks: {result}")
        return result

    @staticmethod
    def task_list_cancelled(task_list):
        """Check if all tasks in a list of tasks are cancelled."""
        return all([task.cancelled() for task in task_list])

    def killed_task(self):
        """Check on the status of the background async task."""
        if self.background_task:
            return self.background_task.done() or self.background_task.cancelled()
        elif self.tasks:
            return self.task_list_done() or self.task_list_cancelled(self.tasks)
        else:
            logging.warning("Neither self.background_task nor self.tasks are defined.")
        return None

    async def status(self):
        """Gather information and compute the overall status of this class."""
        logging.debug("Controller.status: gathering status data.")

        # TODO: Add printer status.
        status = {"controller": {
            "run": self.run,
            "killed": self.killed,
            "plugins": {name: "OK" if plugin.status else "WARN" for name, plugin in self.plugins.items()}
        }}

        # Add background task info if it had been configured.
        if self.background_task:
            status["controller"].update({
                "background_task": "OK",
                "background_task_done": "WARN" if self.background_task.done() else "OK",
                "background_task_cancelled": "WARN" if self.background_task.cancelled() else "OFF",
                "background_task_live": "WARN" if self.killed_task() else "OK"
            })
        else:
            status["controller"].update({
                "background_task": "OFF",
                "task_done": "WARN" if self.task_list_done() else "STANDBY",
                "task_cancelled": "WARN" if self.task_list_cancelled(self.tasks) else "OFF",
                "task_live": "WARN" if self.killed_task() else "OK"
            })

        # NOTE: Update the dictionary with status reports from commander modules,
        #       letting them report status as well.
        # Sio commander.
        comms_status = await self.comms.status()
        status.update(comms_status)
        # Klipper commander.
        machine_status = await self.machine.status()
        status.update(machine_status)

        # Let eveyone who cares know that a status update is in progress,
        # passing them the "status" dictionary refernce for modification.
        await self.trigger_event_callback(event_name=self.status_event, status=status)

        return status

    async def test(self):
        """Run initialization tests.
        TODO: These are placeholders for now.
        """
        await self.comms.test()
        await self.machine.test()

    def send_alert(self, text: str, alert_type: str="message"):
        """Send an alert message to the UI through a SocketIO connection.
        This function is not async, but will run the async function for
        the alert using "asyncio.run" in the current event loop (or creating one).
        NOTE: I'm not sure why I did this.
        """
        if self.comms.connected:
            # https://stackoverflow.com/a/61331974
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:  # 'RuntimeError: There is no current event loop...'
                loop = None

            if loop and loop.is_running():
                logging.debug('send_alert: Async event loop already running. Adding coroutine to the event loop.')
                loop.create_task(self.comms.sio_emit_alert(text=text, alert_type=alert_type))
            else:
                logging.debug('send_alert: Starting new event loop')
                asyncio.run(self.comms.sio_emit_alert(text=text, alert_type=alert_type))
        else:
            logging.warning(f"send_alert: SIO not connected. Failed to send message with text: {text}")

A class to drive the Pipettin robot. Most of the code is in the "commander" classes, instantiated below, which add core functionality to the robot controller. This class instantiates "database tools" and the GCODE "builder" classses, and loads "plugins" that further extend its functions.

Class variables

var action_runners : dict

Dictionary to store controller methods from plugins.

var killed : bool

Indicates if the kill method has been called to terminate all coroutines.

var post_init_callbacks : list

List of functions to be run after all core modules are loaded. These are: GcodeBuilder, Machine, and Comms.

Static methods

def task_list_cancelled(task_list)
Expand source code
@staticmethod
def task_list_cancelled(task_list):
    """Check if all tasks in a list of tasks are cancelled."""
    return all([task.cancelled() for task in task_list])

Check if all tasks in a list of tasks are cancelled.

def unique(l1: list, l2: list = None)
Expand source code
@staticmethod
def unique(l1:list, l2:list=None):
    if l2 is None:
        l2 = []
    return list(set( l1 + l2 ))

Methods

def add_action_runner(self, name, function)
Expand source code
def add_action_runner(self, name, function):
    """Register a callback function to handle incoming actions by their type (i.e. the name)."""
    if self.action_runners.get(name, None):
        msg = f"Error: attempted to overwrite action handler '{name}'."
        logging.error(msg)
        raise CommanderError(msg)
    else:
        logging.info(f"Registered executor for '{name}' actions.")
        self.action_runners[name] = function

Register a callback function to handle incoming actions by their type (i.e. the name).

def cancel_tasks(self)
Expand source code
def cancel_tasks(self):
    """Call 'cancel' on all of the controller's tasks and set the run flag to false."""
    logging.warning("Cancelling all tasks.")
    # Signal coroutines to end
    self.run = False
    # End all tasks if at least one failed.
    for task in self.tasks:
        # https://stackoverflow.com/a/59655833
        task.cancel()

Call 'cancel' on all of the controller's tasks and set the run flag to false.

def check_for_stop_event(self)
Expand source code
def check_for_stop_event(self):
    """Raise an exception if the stop protocol flag is set, and clear it."""
    if self.stop_protocol.is_set():
        self.stop_protocol.clear()
        msg = "Action aborted: stop_protocol event found set."
        logging.error(msg)
        raise ProtocolError(msg)

Raise an exception if the stop protocol flag is set, and clear it.

def check_greenlight(self, action=None)
Expand source code
def check_greenlight(self, action=None):
    """Check if the commander is free.
    Raises an error if a current action is already being processed.
    """
    if self.current_action:
        msg = f"Failed to greenlight; another one is being processed by the controller: {self.current_action}"
        logging.error(msg)
        raise ProtocolError(msg, action=action)

Check if the commander is free. Raises an error if a current action is already being processed.

async def close(self, timeout=3)
Expand source code
async def close(self, timeout=3):
    """Call the "close" method from the main 'commander' classes."""
    logging.info("Closing all controller objects.")
    await self.machine.close(timeout)
    await self.comms.close(timeout)

Call the "close" method from the main 'commander' classes.

def get_action_runner(self, action)
Expand source code
def get_action_runner(self, action):
    # Get the action's command.
    action_command_id = action['cmd']
    # Check if it is a JSON action, and update the command id with the one in its arguments.
    if action_command_id == "JSON":
        action_command_id = action['args']['cmd']
        logging.debug(f"Updated action command from JSON action to: {action_command_id}")

    # Try to find a function (inserted by a plugin) that can handle this protocol action.
    return action_command_id, self.action_runners.get(action_command_id, None)
async def handle_pause(self, i=None)
Expand source code
async def handle_pause(self, i=None):
    """Handle pauses."""
    if self.pause_protocol.is_set():
        logging.warning("Pausing action execution.")
        try:
            await asyncio.wait_for(self.wait_for_event_clear(self.pause_protocol), timeout=self.pause_timeout)
        except asyncio.TimeoutError as e:
            msg = f"Action {i} aborted. Still paused set after timing-out at {self.pause_timeout} seconds."
            self.pause_protocol.clear()
            logging.error(msg)
            raise ProtocolError(msg) from e

Handle pauses.

async def kill(self, timeout=None)
Expand source code
async def kill(self, timeout=None):
    """Kill the coroutines using the 'cancel' method."""
    if timeout is None:
        timeout = self.wait_default*2.1

    # Flag killed status.
    self.killed = True

    # Output variables.
    tasks_done = None
    tasks_cancelled = None

    # If started by "start_as_task" or "start_and_wait".
    if self.background_task is not None:
        if not self.background_task.done():
            logging.info("Cancelling task.")
            self.background_task.cancel()

            elapsed = 0.0
            while not self.background_task.cancelled():
                await asyncio.sleep(self.wait_default)
                elapsed += self.wait_default
                if elapsed >= timeout:
                    logging.warning("Timed out.")
                    break

        if self.background_task.done() or self.background_task.cancelled():
            logging.info("Done.")

        tasks_done, tasks_cancelled = self.background_task.done(), self.background_task.cancelled()

    # If started by "start()" or from "launch()" directly.
    elif self.tasks:
        logging.info("Cancelling the tasks list.")
        for task in self.tasks:
            task.cancel()
        elapsed = 0.0
        while not self.task_list_cancelled(self.tasks):
            await asyncio.sleep(self.wait_default/4)
            elapsed += self.wait_default/4
            if elapsed >= timeout:
                logging.warning("Timed out.")
                break

        if self.task_list_cancelled(self.tasks):
            logging.info("Done.")

        tasks_done, tasks_cancelled = self.task_list_done(), self.task_list_cancelled(self.tasks)

    else:
        logging.info("Nothing to kill.")
        tasks_done, tasks_cancelled = True, True

    return tasks_done, tasks_cancelled

Kill the coroutines using the 'cancel' method.

def killed_task(self)
Expand source code
def killed_task(self):
    """Check on the status of the background async task."""
    if self.background_task:
        return self.background_task.done() or self.background_task.cancelled()
    elif self.tasks:
        return self.task_list_done() or self.task_list_cancelled(self.tasks)
    else:
        logging.warning("Neither self.background_task nor self.tasks are defined.")
    return None

Check on the status of the background async task.

async def launch(self)
Expand source code
async def launch(self):
    """Main co-routine to start piper.
    Launches the other co-routines and catching any uncaught errors.
    Remember to clean-up by running "stop" after this method returns or raises exceptions.
    """
    logging.info("Starting controller coroutines.")

    # Create a list of asyncio tasks.
    self.tasks = [asyncio.create_task(method) for method in self.coroutine_methods]

    # Schedule calls *concurrently*:
    try:
        await asyncio.gather(*self.tasks)

    except asyncio.exceptions.CancelledError:
        exit_status = False
        msg = "Tasks were cancelled before gathering. Cleaning up."
        logging.warning(msg)
        self.cancel_tasks()

    except Exception as e:
        exit_status = False
        msg = f"Exceptions occured in the controller's tasks: {e}\n" + traceback.format_exc()
        logging.critical(msg)
        self.cancel_tasks()

    else:
        exit_status = True
        msg = "Coroutines gathered, stopping coroutines and closing sockets."
        logging.info(msg)

    # Log message.
    print(msg)

    return exit_status

Main co-routine to start piper. Launches the other co-routines and catching any uncaught errors. Remember to clean-up by running "stop" after this method returns or raises exceptions.

def load_plugins(self, plugins: list, plugins_tracker: dict)
Expand source code
def load_plugins(self, plugins: list, plugins_tracker: dict):
    """Load python modules from the 'plugins' directory through their 'load_plugin' function.

    Plugins are expected to return an Exception (or false-like) object if they could not load,
    or true-like object when they load correctly. In that case the the object is saved to an
    internal dictionary.
    """
    for module_name in plugins:
        logging.debug(f"Loading plugin '{module_name}'.")

        # Load the requested module from the "plugins" sundirectory.
        module = importlib.import_module("piper.plugins." + module_name)

        # Instantiate the plugin.
        try:
            plugin_instance = module.load_plugin(self)
        except Exception as e:
            logging.error(f"Plugin '{module_name}' failed to load: {e}")
            plugins_tracker[module_name] = FailedPlugin(module_name)
            continue

        # Parse the result.
        if not plugin_instance:
            logging.warning(f"Plugin '{module_name}' seems empty.")
        elif plugin_instance:
            # Save the module to the plugins list.
            plugins_tracker[module_name] = plugin_instance
            logging.info(f"Plugin '{module_name}' loaded successfully.")
        else:
            logging.warning(f"Unhandled result when loading '{module_name}' plugin.")

Load python modules from the 'plugins' directory through their 'load_plugin' function.

Plugins are expected to return an Exception (or false-like) object if they could not load, or true-like object when they load correctly. In that case the the object is saved to an internal dictionary.

def post_init(self)
Expand source code
def post_init(self):
    """Run all post-init methods, once all python modules are loaded and initialized."""
    # Run all callbacks.
    for cb in self.post_init_callbacks:
        # Pass no arguments.
        cb()
    # Print available tools.
    logging.info(f"Available tools after setup: {list(self.builder.tools)}")
    logging.debug("Tool data:\n" + pformat(self.builder.tools))
    # Print available handlers and runners.
    logging.info(f"Available action handlers at startup: {list(self.builder.action_switch_case)}")
    logging.info(f"Available action runners at startup: {list(self.action_runners)}")

Run all post-init methods, once all python modules are loaded and initialized.

def register_event_callback(self, event_name: str, callback_name: str, callback_function)
Expand source code
def register_event_callback(self, event_name: str, callback_name: str, callback_function):
    """Register a function to be called when an event happens.

    Args:
        event_name (str): The name of the event.
        callback_name (str): A unique ID for the callback function.
        callback_function (_type_): The asyncio function to execute when an event happens. It must accept any number of arguments and keyword arguments.
    """
    # Check that the callback function is awaitable.
    if not inspect.iscoroutinefunction(callback_function):
        msg = f"Error: the callback function with name '{callback_name}' is not awaitable."
        msg += " Did you forget to use 'async' in the function definition? (i.e async def foo(bar): ...)"
        logging.error(msg)
        raise ValueError(msg)

    # Create the event if it does not exist.
    if self.events_callbacks.get(event_name, None) is None:
        self.events_callbacks[event_name] = {}

    # Check that the name is unique in existing callbacks.
    if self.events_callbacks[event_name].get(callback_name, None) is not None:
        raise ValueError(
            f"Error: a callback named '{callback_name}'" + \
            f" is already registered for the event '{event_name}'."
        )

    # Register the callback.
    self.events_callbacks[event_name][callback_name] = callback_function

Register a function to be called when an event happens.

Args

event_name : str
The name of the event.
callback_name : str
A unique ID for the callback function.
callback_function : _type_
The asyncio function to execute when an event happens. It must accept any number of arguments and keyword arguments.
async def run_actions(self, actions: list, wait=None, check=None, timeout=10.0)
Expand source code
async def run_actions(self, actions: list, wait=None, check=None, timeout=10.0):
    """Executes a list of actions from a pipetting protocol, which already contain the corresponding GCODE.

    If the actions contain a 'cmd' identifier registered by a plugin, the "run_action" method of that plugin will be called.
    Otherwise, this method will call `run_actions_protocol` in the machine class, that expects a "GCODE" key in each action.

    Exceptions are caught to clear the current_action attribute, and immediately re-raised.
    
    Args:
        actions (List): A list of dictionaries, each one an action from a pipetting protocol.
        wait (bool, optional): Wait for a response to all commands, aborting if it times out. Defaults to None.
        check (bool, optional): Check each response for an "ok", aborting if it is not found. Defaults to None.
        timeout (float, optional): Timeout for the action to complete its execution. Defaults to 10.0.
    """
    logging.debug(f"Incoming actions protocol with {len(actions)} actions.")

    # Check if the commander is free to run new actions.
    self.check_greenlight()
    # NOTE: This must be here instead of run_actions_protocol because it would introduce
    #       an intermediate await, between the check and setting the current action.

    # Counter for the current action.
    i = 0
    # Iterate until actions run out.
    while i < len(actions):
        logging.debug(f"Next action index: {i}")
        try:
            # A clean start.
            error = None

            # Set the current action.
            action = actions[i]
            self.current_action = action

            # Handle pauses.
            await self.handle_pause(i)
            # Handle stop.
            self.check_for_stop_event()

            # Try getting a runner function.
            action_command_id, runner_function = self.get_action_runner(action)
            logging.debug(f"Processing action {i}: {action_command_id}")

            # Execute the action using a registered runner, if any.
            if runner_function is not None:
                logging.debug(f"Running {action_command_id} action using a handler.")
                await self.run_callback(
                    runner_function,
                    action=action, i=i,
                    wait=wait, check=check,
                    timeout=timeout
                )
                logging.debug(f"Action {action_command_id} completed by the handler.")
                # Increment the action index by one.
                i += 1

            # Otherwise, batch-send the actions directly to the Machine.
            else:
                logging.debug("Running actions in the machine.")
                # Prepare variables to make a batch.
                actions_batch, next_actions = [action], actions[i+1:]
                # Grow the batch.
                for j, next_action in enumerate(next_actions):
                    # Try getting a future runner function.
                    cmd, fun = self.get_action_runner(next_action)
                    if fun is not None:
                        # Stop growing the batch if a runner is found.
                        logging.debug(f"Batch grown to {len(actions_batch)} actions. Next action {actions.index(next_action)} is not batch-able: {cmd}")
                        break
                    else:
                        # Add to the batch if no handler function is found.
                        logging.debug(f"Appending action {actions.index(next_action)} and command {cmd} to batch with {len(actions_batch)} (+1) actions.")
                        actions_batch.append(next_action)
                # Let the Machine execute the action (only parsing its GCODE).
                logging.debug(f"Executing batch of {len(actions_batch)} actions directly in the machine.")
                await self.machine.run_actions_protocol(
                    actions=actions_batch, i=i,
                    wait=wait, check=check,
                    default_action_timeout=timeout
                )
                # Increment the action index by the batch's length (at least 1).
                i += len(actions_batch)

            # Ensure idle printer before continuing to a different action type.
            while not await self.machine.wait_for_idle_printer():
                # TODO: This loop may get stuck if the controller is not running or something else fails.
                if not self.run:
                    raise CommanderError("The commander is no longer running.")
                # TODO: Consider reimplementing this with a "G4 P0" command instead.
                logging.info(f"Machine not yet idle. Posponing the end of action {i}: {action['cmd']}")
                # Check for a stop event before retrying.
                self.check_for_stop_event()

            # Logs.
            logging.debug("Machine idle. " + "No actions left." if i == len(actions) else f"Moving on to action {i}.")

        # Handle exception for the current action.
        except Exception as e:
            logging.error("An error was raised while running the protocol: " + str(e))
            # Re-raise the error later.
            error = e

        # Reset the current action.
        self.current_action = {}

        # Raise any errors.
        if error is not None:
            raise ProtocolError(f"Failed to run action {i}: {action['cmd']}") from error

    logging.debug(f"Executed {len(actions)} actions.")

Executes a list of actions from a pipetting protocol, which already contain the corresponding GCODE.

If the actions contain a 'cmd' identifier registered by a plugin, the "run_action" method of that plugin will be called. Otherwise, this method will call run_actions_protocol in the machine class, that expects a "GCODE" key in each action.

Exceptions are caught to clear the current_action attribute, and immediately re-raised.

Args

actions : List
A list of dictionaries, each one an action from a pipetting protocol.
wait : bool, optional
Wait for a response to all commands, aborting if it times out. Defaults to None.
check : bool, optional
Check each response for an "ok", aborting if it is not found. Defaults to None.
timeout : float, optional
Timeout for the action to complete its execution. Defaults to 10.0.
async def run_actions_protocol(self, actions: list, wait=None, check=None, timeout=10.0)
Expand source code
async def run_actions_protocol(self, actions: list, wait=None, check=None, timeout=10.0):
    """Executes a list of actions from a pipetting protocol, which already contain the corresponding GCODE.

    This method wraps run_actions to handle exceptions.

    Args:
        actions (List): A list of dictionaries, each one an action from a pipetting protocol.
        wait (bool, optional): Wait for a response to all commands, aborting if it times out. Defaults to None.
        check (bool, optional): Check each response for an "ok", aborting if it is not found. Defaults to None.
        timeout (float, optional): Timeout for the action to complete its execution. Defaults to 10.0.
    """

    # Default to the machine's paranoia status if unspecified.
    if wait is None:
        wait = self.paranoid
    if check is None:
        check = self.paranoid

    logging.info(f"Running {len(actions)} actions protocol.")
    logging.debug(f"Run options: wait={wait}, check={check}, timeout={timeout}")

    # Run actions in the list.
    try:
        await self.run_actions(actions, wait, check, timeout)

    # Handle exceptions.
    except asyncio.exceptions.CancelledError:
        # Log cancellations.
        logging.error("Protocol cancelled while running action.")
    except Exception as e:
        # Raise other exceptions as protocol errors.
        raise ProtocolError("Protocol aborted due to an unhandled exception while running.") from e
        # NOTE: The logic here is that action handlers are not expected to handle asyncio cancellations, and
        #       may raise ProtocolErrors on their own. Those should be passed on to this calling environment.
        #       Other exceptions are re-raised here as ProtocolErrors.
    else:
        logging.info("Actions protocol executed successfully.")

Executes a list of actions from a pipetting protocol, which already contain the corresponding GCODE.

This method wraps run_actions to handle exceptions.

Args

actions : List
A list of dictionaries, each one an action from a pipetting protocol.
wait : bool, optional
Wait for a response to all commands, aborting if it times out. Defaults to None.
check : bool, optional
Check each response for an "ok", aborting if it is not found. Defaults to None.
timeout : float, optional
Timeout for the action to complete its execution. Defaults to 10.0.
async def run_callback(self, cb, *args, **kwargs)
Expand source code
async def run_callback(self, cb, *args, **kwargs):
    """Run a function, detecting if it is regular or async.
    See: https://stackoverflow.com/a/36076663

    Regular functions are run in threads and should consider thread safety.
    These functions must accept a "stop" event.
    """
    if inspect.iscoroutinefunction(cb):
        await cb(*args, **kwargs)
    else:
        try:
            stop = Event()
            await asyncio.to_thread(cb, *args, **kwargs, stop_event=stop)
        except asyncio.CancelledError:
            stop.set()
            logging.error("Stop flagged for threaded callback. Returning now.")

Run a function, detecting if it is regular or async. See: https://stackoverflow.com/a/36076663

Regular functions are run in threads and should consider thread safety. These functions must accept a "stop" event.

def send_alert(self, text: str, alert_type: str = 'message')
Expand source code
def send_alert(self, text: str, alert_type: str="message"):
    """Send an alert message to the UI through a SocketIO connection.
    This function is not async, but will run the async function for
    the alert using "asyncio.run" in the current event loop (or creating one).
    NOTE: I'm not sure why I did this.
    """
    if self.comms.connected:
        # https://stackoverflow.com/a/61331974
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:  # 'RuntimeError: There is no current event loop...'
            loop = None

        if loop and loop.is_running():
            logging.debug('send_alert: Async event loop already running. Adding coroutine to the event loop.')
            loop.create_task(self.comms.sio_emit_alert(text=text, alert_type=alert_type))
        else:
            logging.debug('send_alert: Starting new event loop')
            asyncio.run(self.comms.sio_emit_alert(text=text, alert_type=alert_type))
    else:
        logging.warning(f"send_alert: SIO not connected. Failed to send message with text: {text}")

Send an alert message to the UI through a SocketIO connection. This function is not async, but will run the async function for the alert using "asyncio.run" in the current event loop (or creating one). NOTE: I'm not sure why I did this.

def start(self)
Expand source code
def start(self):
    """
    Starts the main controller asynchronously by running the launch method in the event loop.

    This method is the entry point for the Controller's main execution flow. It sets up the
    asynchronous event loop and calls the `launch` method to initialize and manage tasks.
    The method handles a KeyboardInterrupt exception to allow for graceful shutdown when
    interrupted by the user.

    Raises:
        KeyboardInterrupt: Gracefully exits on user interrupt (Ctrl+C).
    """
    try:
        asyncio.run(self.launch())
    except KeyboardInterrupt:
        logging.warning("Caught keyboard interrupt.")
        print("Caught keyboard interrupt.")
    except Exception as e:
        logging.error(f"An error occurred while launching the controller: {e}")
        raise

Starts the main controller asynchronously by running the launch method in the event loop.

This method is the entry point for the Controller's main execution flow. It sets up the asynchronous event loop and calls the launch() method to initialize and manage tasks. The method handles a KeyboardInterrupt exception to allow for graceful shutdown when interrupted by the user.

Raises

KeyboardInterrupt
Gracefully exits on user interrupt (Ctrl+C).
async def start_and_wait(self)
Expand source code
async def start_and_wait(self):
    """Start the commander, wait it to start, and keep it running as an asyncio background task.
    Returns the result of setup and readyness checks."""
    self.background_task = asyncio.create_task(self.launch())
    setup_ok = await self.machine.wait_for_setup()
    ready_ok = await self.machine.wait_for_ready()
    return setup_ok and ready_ok

Start the commander, wait it to start, and keep it running as an asyncio background task. Returns the result of setup and readyness checks.

async def start_as_task(self, hold=True)
Expand source code
async def start_as_task(self, hold=True):
    """
    Starts the main controller's asynchronous tasks as a background task.

    This method creates a background task to run the `launch` method asynchronously, allowing
    it to operate independently within an event loop. The method also handles KeyboardInterrupt
    to gracefully cancel the background task if the user interrupts the program.

    Args:
        hold (bool): If True, the call will block. Else the background task will be started and returned immediately.

    Raises:
        KeyboardInterrupt: Cancels the background task upon user interrupt (Ctrl+C).
    """
    try:
        self.background_task = asyncio.create_task(self.launch())
        if hold:
            await self.background_task
    except KeyboardInterrupt:
        logging.warning("Caught keyboard interrupt. Canceling tasks...")
        self.background_task.cancel()

    return self.background_task

Starts the main controller's asynchronous tasks as a background task.

This method creates a background task to run the launch() method asynchronously, allowing it to operate independently within an event loop. The method also handles KeyboardInterrupt to gracefully cancel the background task if the user interrupts the program.

Args

hold : bool
If True, the call will block. Else the background task will be started and returned immediately.

Raises

KeyboardInterrupt
Cancels the background task upon user interrupt (Ctrl+C).
async def status(self)
Expand source code
async def status(self):
    """Gather information and compute the overall status of this class."""
    logging.debug("Controller.status: gathering status data.")

    # TODO: Add printer status.
    status = {"controller": {
        "run": self.run,
        "killed": self.killed,
        "plugins": {name: "OK" if plugin.status else "WARN" for name, plugin in self.plugins.items()}
    }}

    # Add background task info if it had been configured.
    if self.background_task:
        status["controller"].update({
            "background_task": "OK",
            "background_task_done": "WARN" if self.background_task.done() else "OK",
            "background_task_cancelled": "WARN" if self.background_task.cancelled() else "OFF",
            "background_task_live": "WARN" if self.killed_task() else "OK"
        })
    else:
        status["controller"].update({
            "background_task": "OFF",
            "task_done": "WARN" if self.task_list_done() else "STANDBY",
            "task_cancelled": "WARN" if self.task_list_cancelled(self.tasks) else "OFF",
            "task_live": "WARN" if self.killed_task() else "OK"
        })

    # NOTE: Update the dictionary with status reports from commander modules,
    #       letting them report status as well.
    # Sio commander.
    comms_status = await self.comms.status()
    status.update(comms_status)
    # Klipper commander.
    machine_status = await self.machine.status()
    status.update(machine_status)

    # Let eveyone who cares know that a status update is in progress,
    # passing them the "status" dictionary refernce for modification.
    await self.trigger_event_callback(event_name=self.status_event, status=status)

    return status

Gather information and compute the overall status of this class.

async def stop(self, timeout=3.0)
Expand source code
async def stop(self, timeout=3.0):
    """Stop the coroutines cleanly, or 'cancel' them if that failed.
    This methods also tries to close the socketio and websocket connections.
    NOTE: This method is used by the pylabrobot backend to 'close' it at the end of a protocol.

    Returns a tuple of two boolean values. The first indicating if tasks have "ended",
    and the second indicating if tasks were killed.
    """

    logging.warning("Stopping the commander.")
    self.run = False

    exit_status = None, None

    # Stop
    if self.background_task is not None:
        # If started by "start_as_task" or "start_and_wait".
        exit_status = await self.stop_background_task(timeout=timeout)
    elif self.tasks:
        # If started by "start()" or from "launch()" directly.
        exit_status = await self.stop_tasks_list(timeout=timeout)
    else:
        logging.warning("Warning, stopping had no effect!. Coroutines were" + \
                        " not started by run nor awaited as background_tasks.")

    # Close websocket and socketio connections.
    logging.warning("Closing connections.")
    await self.close()

    return exit_status

Stop the coroutines cleanly, or 'cancel' them if that failed. This methods also tries to close the socketio and websocket connections. NOTE: This method is used by the pylabrobot backend to 'close' it at the end of a protocol.

Returns a tuple of two boolean values. The first indicating if tasks have "ended", and the second indicating if tasks were killed.

async def stop_background_task(self, timeout=3.0)
Expand source code
async def stop_background_task(self, timeout=3.0):
    logging.info("Waiting for the background task to finish.")
    elapsed = 0.0
    while not self.background_task.done():
        await asyncio.sleep(timeout/5)
        elapsed += timeout/5
        if elapsed >= timeout:
            logging.warning("Timed out.")
            break
        logging.debug(f"Waiting. {timeout-elapsed} seconds remaining.")

    if not self.background_task.done():
        logging.warning("Background_task not yet done. Cancelling coroutines.")
        await self.kill()
    else:
        logging.info("Tasks done.")

    return self.background_task.done(), self.background_task.cancelled()
async def stop_tasks_list(self, timeout=3.0)
Expand source code
async def stop_tasks_list(self, timeout=3.0):
    logging.info("Stopping task list.")
    elapsed = 0.0
    while not self.task_list_done():
        await asyncio.sleep(timeout/5)
        elapsed += timeout/5
        if elapsed >= timeout:
            logging.warning("Timed out.")
            break
        logging.debug(f"Waiting. {timeout-elapsed} seconds remaining.")

    if not self.task_list_done():
        logging.warning("Tasks not yet done. Killing coroutines by cancel.")
        await self.kill()
    
    logging.info("Tasks done.")
    return self.task_list_done(), self.task_list_cancelled(self.tasks)
def task_list_done(self)
Expand source code
def task_list_done(self):
    """Check if all tasks in a list of tasks are done."""
    result = all([task.done() for task in self.tasks])
    logging.debug(f"Parsed status of {len(self.tasks)} coroutine tasks: {result}")
    return result

Check if all tasks in a list of tasks are done.

async def test(self)
Expand source code
async def test(self):
    """Run initialization tests.
    TODO: These are placeholders for now.
    """
    await self.comms.test()
    await self.machine.test()

Run initialization tests. TODO: These are placeholders for now.

async def trigger_event_callback(self, event_name: str, callback_name: str = None, **kwargs)
Expand source code
async def trigger_event_callback(self, event_name: str, callback_name: str = None, **kwargs):
    """Trigger all callbacks associated to an event, or a particular callback.

    Args:
        event_name (str): The name of the event as registered using `register_event_callback`.
        callback_name (str, optional): Run only a single registered callback for the event, by
            its registered name. Defaults to None.
    """
    results = {}
    logging.debug(f"Processing event '{event_name}'.")

    # Get the callbacks for the event.
    event_callbacks = self.events_callbacks.get(event_name, None)
    # Check if there are any callbacks for the wvent.
    if event_callbacks is not None:
        # Get and trigger callbacks.
        if callback_name is not None:
            # Trigger a single callback.
            callback_function = event_callbacks.get(callback_name, None)
            if callback_function is None:
                logging.warning(f"Warning: there is no callback named '{callback_name}' for event '{event_name}'. Skipped running callback.")
            else:
                try:
                    results[callback_name] = await callback_function(**kwargs)
                except Exception:
                    error_message = traceback.format_exc()
                    logging.error(f"Caught an error when running callback function '{callback_name}':\n" + error_message)
                    results[callback_name] = {"error": error_message}
        else:
            # Trigger all callbacks of the event.
            for callback_name, callback_function in event_callbacks.items():
                try:
                    results[callback_name] = await callback_function(**kwargs)
                except Exception:
                    error_message = traceback.format_exc()
                    logging.error(f"Caught an error when running callback function '{callback_name}':\n" + error_message)
                    results[callback_name] = {"error": error_message}
            if not results:
                logging.warning(f"Warning: no callbacks found for event '{event_name}'.")
    else:
        logging.warning(f"There are no callbacks registered for the '{event_name}' event. Doing nothing.")

    return results

Trigger all callbacks associated to an event, or a particular callback.

Args

event_name : str
The name of the event as registered using register_event_callback.
callback_name : str, optional
Run only a single registered callback for the event, by its registered name. Defaults to None.
def update_config_from_settings(self, settings_file: str = None)
Expand source code
def update_config_from_settings(self, settings_file:str=None):
    """Read configuration values from a settings JSON file on disk.
    This is meant as an additional (and optional) way to override configuration values,
    from a "settings.json" file exported by the UI.

    TODO: The DB connection details are parsed from a ".env" in the data tools object at startup.
          Consider deleting this method.
    """
    if settings_file is None:
        settings_file = self.config.get("settings_file", None)
    if settings_file:
        try:
            logging.info(f"Reading settings from file: {settings_file}")
            with open(settings_file, "r", encoding="utf-8") as file:
                settings = json.load(file)
                self.config.setdefault("settings", {})
                self.config["settings"].update(settings)
        except Exception as e:
            logging.warning(f"Failed to update configuration from settings file '{settings_file}': {e}")
        else:
            logging.debug(f"Parsed settings data: {settings}")
    else:
        logging.warning("No settings file to read from.")

Read configuration values from a settings JSON file on disk. This is meant as an additional (and optional) way to override configuration values, from a "settings.json" file exported by the UI.

TODO: The DB connection details are parsed from a ".env" in the data tools object at startup. Consider deleting this method.

async def wait_for_event_clear(self, event: threading.Event)
Expand source code
async def wait_for_event_clear(self, event: Event):
    """Waits for the event to clear its flag."""
    while event.is_set():
        self.check_for_stop_event()
        await asyncio.sleep(self.wait_default)

Waits for the event to clear its flag.

class FailedPlugin (name)
Expand source code
class FailedPlugin:
    def __init__(self, name):
        self.name = name
    @property
    def status(self):
        return False

Instance variables

prop status
Expand source code
@property
def status(self):
    return False