Module pipettin-piper.piper.gcode

Classes

class GcodeBuilder (protocol=None,
workspace=None,
platformsInWorkspace=None,
controller: Controller = None,
config: TrackedDict = None,
verbose=False)
Expand source code
class GcodeBuilder:
    """
    This class generates GCODE from Pipettin protocol "actions".

    It contains and consumes all information for the pipetting task: protocol, workspace, platforms and tools.

    It holds and tracks machine status or configuration during protocol 'slicing' to GCODE.

    Also stores all GCODE commands after parsing protocols.

    Some actions might depend on previous actions or action history.
    So these objects are for tracking the state of the machine during the translation of actions to GCODE,
    or during gcode streaming to the machine controllers.
    """
    protocol: dict = None
    workspace: dict = None
    platformsInWorkspace: list = None
    tools_data: list
    current_action: dict
    def __init__(self, protocol=None, workspace=None, platformsInWorkspace=None,
                 controller:Controller=None,
                 config:TrackedDict=None,
                 verbose=False):
        """
        Set up a builder holding all information for the pipetting task: protocol, workspace, platforms and tools.

        :param protocol: A protocol dictionary (JSON object) from the GUI/MongoDB, containing protocol 'actions'.
        :param workspace: A workspace dictionary (JSON object) from the GUI/MongoDB, containing the platform instances in the workspace.
        :param platformsInWorkspace: Platform definitions (JSON objects) from the GUI/MongoDB, for the platforms in the workspace.
        :param controller: Controller object, stored in 'self.controller'. When provided, 'load_tools' is appended to its 'post_init_callbacks'.
        :param config: Configuration options loaded from the 'config.yml' YAML file. Defaults to an empty dict.
        :param verbose: Verbosity flag. It is also enabled by a truthy "verbose" entry in 'config'.
        """

        logging.info("Initializing pipettin's gcode generator class.")

        # Save configuration.
        if config is None:
            config = {}
        self.config = config
        self.verbose = verbose or config.get("verbose", False)

        # Instantiate gcode generator.
        self.gcode = GcodePrimitives(config=config)

        # GCODE commands list.
        self.commands = []
        # Current Action property.
        self.previous_action = {}
        self.current_action = {}

        # Initialize snapshot variable (see 'checkpoint', 'commit' and 'rollback').
        self._checkpoint: dict = None

        # Save controller and DB tools.
        self.controller: Controller = controller

        # Set workspace, platforms, and workspace properties.
        # NOTE: This runs with the default 'clear_unspecified=True', so any object left as None is reset to an empty value.
        self.initialize_objects(protocol, workspace, platformsInWorkspace)

        # List of default parsers for the requested actions.
        # NOTE: The dict must exist before 'add_action_handler' is called below.
        self.action_switch_case = {}
        self.add_action_handler(name=self.comment_action_name,
                                function=self.comment_action_parser)

        # Actions that can skip a tool-change check.
        self.tc_check_skip_actions = [self.comment_action_name]

        # TODO: base these values on a config file.
        # Extra margin added to the Z coordinate by 'getSafeHeight'.
        self.extra_clearance = 1.0

        # Get tool data.
        # TODO: make this attribute update itself on DB tools replacement.
        #self.tools_data: list = deepcopy(self.controller.database_tools.tools)

        self.tools_data = []
        # Entries are appended by 'register_action_callback'.
        self.action_callbacks = []

        # Setup tool tracking.
        # 'tools' is filled by 'register_tool', and 'tool_loaders' by 'register_tool_loader'.
        self.tools = {}
        self.tool_loaders = {}
        self.current_tool: str = None

        # Register post-init methods.
        if controller:
            self.controller.post_init_callbacks.append(self.load_tools)

        # __init__ END.

    @property
    def feedrate(self):
        """Main feedrate for XYZ moves
        Default feedrate in mm/min (F parameter for G1) used for XYZ moves.
        This is a property that retuns the value set in the GCODE primitives class (i.e. "self.gcode.feedrate").
        Equivalences:
            60000 mm/min = 1000 * 60 mm/min  = 1000 mm/s = 1 m/s
        """
        return self.gcode.feedrate

    #### Action handlers to convert actions into GCODE ####
    comment_action_name = 'COMMENT'
    def comment_action_parser(self, action, i):
        comment_command = [self.gcode.comment(message=action["args"]["text"])]
        self.extend_commands(comment_command)
        logging.info(f"Processed {self.comment_action_name} action with index {i} and text: '{action['args']['text']}'")
        return comment_command

    #### Utility functions for action handling ####
    def add_action_handler(self, name: str, function: Callable):
        """Add a callback function responsible for generating precalculated GCODE for a certain protocol action.

        This is part of an extension system useful to write plugins. See example plugins in the "plugins" directory.
        The "base_actions" plugin can be used as a reference.

        Args:
            name (str): Name for the action handler, this must match the value of the "CMD" key in the action.
            function (Callable): Function called to handle the action. It must accept an 'action' and its 'index' as first and second arguments, respectively.

        Raises:
            Exception: Raises an exception when a handler has already been registered for a certain CMD.
        """
        if self.action_switch_case.get(name, None):
            msg = f"Attempted to overwrite action handler '{name}'."
            logging.error(msg)
            raise CommanderError(msg)
        else:
            logging.info(f"Registered new action handler for '{name}' actions.")
            self.action_switch_case[name] = function

    action_callbacks: list
    def register_action_callback(self, commands: list, callback_function: Callable):
        """Callback functions to be run by 'parseAction' when the an an action is parsed.

        This is meant to be a "notification system" for other parts of the code. See 'run_action_callbacks' below.

        Args:
            commands (list): a list of strings, each an action command.
            callback (Callable): a function accepting the GcodeBuilder class instance as a first argument, and the "final action" dictionary as a second argument.
        """
        self.action_callbacks.append({
            "commands": commands,
            "callback": callback_function
            })

    #### Utility functions for loading or updating pipettin's data objects ####
    tool_loaders: dict
    """Dictionary of tool-loading functions: with keys matching names of tool-type, and callbacks as values."""
    def register_tool_loader(self, tool_type: str, loader_function):
        """Register a 'loader' function for a particular tool 'type', which will be called to load all of its kind.
        There can be only one loader for each tool type, and each should be registered by a particular plugin.
        For example, see 'load_pipette' defined in the 'pipettes.py' plugin.
        """
        # Raise an error if the tool_type already has a registered loader function.
        if tool_type in self.tool_loaders:
            msg = f"Tool type '{tool_type}' already has a handler function: {self.tool_loaders[tool_type]}"
            logging.error(msg)
            raise DataError(msg)
        # Register the function.
        self.tool_loaders[tool_type] = loader_function

    def load_tools(self, tools_list: list = None):
        """Calls 'load_tool' on each tool in the list.
        If empty, it defaults to all tools in the database.
        """
        # Default to all tools from the DB.
        if tools_list is None:
            tools_list: list = deepcopy(self.controller.database_tools.tools)
        # Load all tools in the list.
        for tool_data in tools_list:
            if not tool_data.get("enabled", True):
                # TODO: Remove the default above once all tools have the property.
                logging.warning(f"Tool '{tool_data['name']}' is disabled, and will be skipped.")
            else:
                self.load_tool(tool_data)

    def load_tool(self, tool_data):
        """Looks up a tool 'loader' function in 'tool_loaders', and calls it to lad the tool from its data.
        The initialized tool object is finally registered in the tools dict by calling 'register_tool'.
        If no loader is found, then an error is logged.
        """
        # Parse some basic info.
        tool_type = tool_data["type"]
        tool_name = tool_data["name"]
        # Check if enabled.
        if not tool_data.get("enabled", True):
            raise DataError(f"Won't load disabled tool '{tool_name}'. Set the 'enable' property in its data first.")
        # Get the loader function for this tool type.
        loader_function = self.tool_loaders.get(tool_type, None)
        # Load and register the tool.
        if loader_function is None:
            # Log a warning if no tool loader was found for this tool type.
            logging.error(f"There is no loader function for tool '{tool_name}' of type '{tool_type}'.")
        else:
            # Load the tool.
            logging.info(f"Loading tool '{tool_name}' of type '{tool_type}'.")
            tool = loader_function(tool_data)
            # Register the tool.
            logging.info(f"Registering tool '{tool_name}' of type '{tool_type}'.")
            self.register_tool(tool)

    # NOTE: TOOL STUFF UNDER CONSTRUCTION.
    tools: dict
    def register_tool(self, tool: Tool):
        """Add a tool object to the tools dictionary.

        A tool object is expected to be dictionary-like and/or a class with attibutes useful for generating GCODE (e.g. XYZ offsets).

        It also must have a few important methods called form this class (e.g. home).
        """
        tool_name = tool.name
        logging.info(f"Registering tool with name '{tool_name}'.")
        if tool_name in list(self.tools):
            msg = f"The tool '{tool_name}' has already been registered."
            logging.error(msg)
            raise DataError(msg)
        self.tools[tool_name] = tool

    # TODO: Add "tool" stuff and figure out how to handle re-initializations.
    def initialize_objects(self,
                           protocol:dict=None,
                           workspace:dict=None,
                           platformsInWorkspace:dict=None,
                           clear_unspecified=True):
        """Set or reset protocol, workspace, platforms, and clearance properties.
        This can be useful when switching protocols or streaming actions, and still preserve state tracking.

        This clears the unspecified objects when `clear_unspecified=True`.
        See 'update_objects' for a non-clearing update.
        """
        # Define protocol property.
        if protocol is not None:
            self.protocol = protocol
            logging.info(f"New protocol set to: {protocol['name']}")
            logging.debug(f"New protocol definition:\n{pformat(protocol)}")
        elif clear_unspecified:
            self.protocol = dict()
            logging.warning("Warning, protocol property now empty.")

        # Set workspace.
        if workspace is not None:
            self.workspace = workspace
            logging.info(f"New workspace set to: '{workspace['name']}'")
        elif clear_unspecified:
            self.workspace = {}
            logging.warning("Warning, workspace property now empty.")

        # Get platform definitions of platform items in wokspace.
        if platformsInWorkspace is not None:
            self.platformsInWorkspace = platformsInWorkspace
            logging.info(f"New platformsInWorkspace set to {[i['name'] for i in platformsInWorkspace]}")
        elif clear_unspecified:
            self.platformsInWorkspace = list()
            logging.warning("Warning, platformsInWorkspace property now empty.")
        elif workspace is not None:
            # Get the platforms for the provided workspace, if no platforms were provided.
            logging.info(f"platformsInWorkspace not provided. Getting it from the provided workspace '{workspace['name']}'.")
            self.platformsInWorkspace = self.controller.database_tools.getPlatformsInWorkspace(workspace)
            logging.info(f"Available items: {[i['name'] for i in self.platformsInWorkspace]}")

        # Clearance and probing parameters.
        # NOTE: Using @propery decorator below to defer getClearance() calculation until a workspace is available.
        #       This must be run because otherwise the getter method can fail with "AttributeError".
        if self.platformsInWorkspace:
            # NOTE: getClearance requires "self.platformsInWorkspace".
            self.clearance = self.getClearance()
        elif clear_unspecified:
            self.clearance = None

        # Consistency check.
        if self.protocol and self.workspace:
            if self.workspace["name"] != self.protocol["workspace"]:
                msg = f"The protocol's workspace ({self.workspace['name']}) "
                msg += f"does not match the specified workspace ({self.protocol['workspace']})."
                logging.warning(msg)

    def update_objects(self, workspace=None, platforms_in_workspace=None, protocol=None):
        """Update the specified objects, not clearing the rest.
        If you want to clear (set to an empty dict) the unspecified objects, use "initialize_objects" instead.
        Specifying "platforms_in_workspace" will update the "clearance" (see "initialize_objects").
        """
        logging.info("Updating objects.")
        self.initialize_objects(
            protocol=protocol,
            workspace=workspace,
            platformsInWorkspace=platforms_in_workspace,
            # Do not clear the unspecified objects.
            clear_unspecified=False)

    def update_objects_by_workspace(self, workspace_name:str, force: bool = True):
        """Override the workspace and current platforms objects of the builder

        If the DB needs to be updated, consider using "apply_settings" before.

        New objects are fetched from the DB by the workspace's name.

        Args:
            workspace_name (str): Name of the workspace.
            force (bool): Wether to force an update if the name of the current and new workspaces are equal. Defaults to True.
        """
        logging.info(f"Updating builder objects to match a new workspace: '{workspace_name}'")
        if self.workspace.get("name", None) == workspace_name:
            if force:
                logging.warning("Forced update. The previous workspace has the same name, but may hold a different state.")
            else:
                logging.warning("The previous workspace has the same name, update skipped.")
                return

        # Get workspace data.
        ws = self.controller.database_tools.getWorkspaceByName(workspace_name=workspace_name)
        # Get platforms.
        pl = self.controller.database_tools.getPlatformsInWorkspace(workspace=ws)
        # Update builder objects and clearance (not clearing the protocol).
        self.update_objects(workspace=ws, platforms_in_workspace=pl)


    # STATE ROLLBACK SYSTEM ####
    state_attributes = [
        "workspace", "protocol", "commands",
        "current_action", "previous_action", "current_tool"
    ]
    """List of attributes to consider as part of the builder's state."""
    def checkpoint(self):
        """Creates a snapshot (checkpoint) of the current state of relevant attributes.

        Saves the state of workspaces, protocol, current_action, and previous_action.
        """
        try:
            self._checkpoint = {n: deepcopy(getattr(self, n)) for n in self.state_attributes}
            logging.info("Checkpoint created for the current state.")
        except AttributeError as e:
            logging.error(f"Failed to create checkpoint: {e}")
            raise RuntimeError("Could not create checkpoint due to missing attributes.") from e

    def commit(self):
        """Commits the current state by clearing the checkpoint.

        This method should be called after successfully processing an action.
        """
        self._checkpoint = None  # Clear the checkpoint since the changes are confirmed
        logging.info("Changes committed, checkpoint cleared.")

    def rollback(self):
        """Rolls back to the previous state using the saved checkpoint.

        Restores workspaces, protocol, current_action, and previous_action from the checkpoint.
        Raises an error if no checkpoint is available.
        """
        if not hasattr(self, "_checkpoint") or not self._checkpoint:
            logging.error("No checkpoint available to roll back to.")
            raise RuntimeError("No checkpoint available for rollback.")
        try:
            for attr_name in self.state_attributes:
                setattr(self, attr_name, deepcopy(self._checkpoint[attr_name]))
            logging.warning("Rolled back to the previous checkpoint state.")
        except Exception as e:
            logging.error(f"Error during rollback: {e}")
            raise RuntimeError(f"Failed to rollback due to an error: {e}") from e


    #### Properties ####
    # See:  https://www.programiz.com/python-programming/property
    #       https://stackoverflow.com/a/51342825/11524079
    _clearance: float = None
    @property
    def clearance(self):
        """Current clearance height."""
        if self._clearance:
            logging.debug("Getting clearance value...")
            val = self._clearance
        else:
            logging.debug("Calculating clearance value...")
            val = self.getClearance()
        return val
    # Setter method for "self.clearance".
    @clearance.setter
    def clearance(self, value):
        if self.verbose:
            logging.debug(f"Setting clearance value='{value}'...")
        self._clearance = value

    #### Convenienve math functions ####
    @staticmethod
    def sign(x):
        """Return the sign of a number (as 1 or -1)."""
        return copysign(1, x)

    #### Utility methods for GCODE generation ####

    @staticmethod
    def itemCenter(item, platform):
        """Calculate the XY center and the active-height Z of an item.
        TODO: Move this method to data-tools.
        """
        x = item["position"]["x"] + platform["width"]/2
        y = item["position"]["y"] + platform["length"]/2
        z = platform["activeHeight"]
        return x, y, z

    def getItemXYZ(self, item_name: str):
        """Get the XYZ coordinates of an item.
        TODO: Move this method to data-tools.
        """
        # Get platform and platform item.
        item = self.controller.database_tools.getWorkspaceItemByName(
            workspace=self.workspace,
            item_name=item_name)
        # Get platform definition from the name in the platform item.
        platform = self.controller.database_tools.getPlatformByName(
            platformsInWorkspace=self.platformsInWorkspace,
            platform_item=item)
        # Get coordinates
        x, y, z = self.itemCenter(item, platform)

        return x, y, z

    @staticmethod
    def getContentXY(content: dict, platform_item: dict, platform: dict):
        """
        Calculate XY of content center.
        Defaults to using the col/row pair or, if unavailable,
        pre-existing x/y coordinates in the content's position.
        """

        # Get the content's position.
        position: dict = content.get("position", None)

        # Get the platform's slots.
        slots: list = platform.get("slots", None)

        # If no position was received, try to recreate it by the content's index.
        if position is None:
            position = {}
            index = content["index"]

            # Guess by a platforms grid if present.
            ncols = platform.get("wellsColumns", None)
            nrows = platform.get("wellsRows", None)
            if (ncols is not None) and (nrows is not None):
                # Calculate the row and column.
                position["row"], position["col"] = datautils.get_colrow(index, ncols, nrows)
                # Check for consistency.
                if nrows < index // ncols:
                    msg = f"Error: the platform '{platform.get('name', None)}'"
                    msg += f" does not have enough rows to get a content with index {index}."
                    msg += f" platform={platform} content={content}"
                    raise DataError(msg)

            # If not, try to get it from the slot.
            elif slots is not None:
                slot = slots[index-1]
                position = slot["position"]
            else:
                msg = "Error: the content did not specify an position, and it could not be inferred "
                raise DataError(msg)

        # Calculate X of content center
        if "col" in position:
            _x = platform_item["position"]["x"]
            _x += platform['firstWellCenterX']
            _x += (position["col"] - 1) * platform['wellSeparationX']
        else:
            _x = position["x"]

        # Calculate Y of content center
        if "row" in position:
            _y = platform_item["position"]["y"]
            _y += platform['firstWellCenterY']
            _y += (position["row"] - 1) * platform['wellSeparationY']
        else:
            _x = position["y"]

        return _x, _y

    def getContentZ(self, content=None, platform_item=None, platform=None):
        """
        Calculate the "active" Z height of a content (e.g. the bottom of a "tube", or fitting height of a tip).

        Defaults to using the Z from the content or, if unavailable, information from the platform.

        Formula: z = `deck_z + plat_z + plat_activeHeight - container_z_offset + tip_active_height = tip_spot_z`
        """

        # Default to using the Z defined in the content.
        if content is not None:
            # Skip everything if the content provides a Z coordinate.
            if "position" in content:
                if "z" in content["position"]:
                    return content["position"]["z"]

        # Use the platform item to get the platform if undefined.
        if (platform is None) and (platform_item is not None):
            platform = self.controller.database_tools.getPlatformByName(self.platformsInWorkspace, platform_item)

        # Get the default Z from the platform.
        if platform is not None:
            # Get container link info.
            container_name = content["container"]
            container_link = next(c for c in platform["containers"] if c["container"] == container_name)
            container = self.controller.database_tools.getContainerByName(container_name)

            # Workspace Z, the starting point.
            workspace_z = self.workspace.get("z", 0)
            # TODO: This is not in the schema, perhaps it should be.

            # Item Z, position relative to workspace.
            item_z = platform_item["position"].get("z", 0)

            # Z-coordinate of the bottom of the platform (i.e. relative to the reference tool, with offsets [0,0,0]).
            active_z_platform = platform["activeHeight"]
            # NOTE: Prevously named "defaultBottomPosition" or "defaultLoadBottom".

            # Subtract the container's offset to the platform.
            offset_z = -container_link["containerOffsetZ"]

            # Add the container's active height.
            active_z_container = container["activeHeight"]
            # NOTE: In the case of tips, this is equal to their total length.

            # Add it up.
            z = workspace_z + item_z + active_z_platform + offset_z + active_z_container
            logging.debug(f"Parsed content height {z} mm from: workspace_z={workspace_z} item_z={item_z} active_z_platform={active_z_platform} offset_z={offset_z} active_z_container={active_z_container}")

            # Done, phew!
            return z

        # Raise an error if the inut did not match any condition above.
        raise DataError("getContentZ: Failed to calculate Z. Information is missing.")

    def getContentCoords(self, action, allow_next=False,
                         pop_from_item=False, pop_from_db=False):
        """
        Get XYZ coordinates for the action's platform item (e.g. a "tube").

        Also apply tool offsets, if any.
        """

        # Get platform and platform item.
        item_name = action["args"]["item"]
        platform_item = self.controller.database_tools.getWorkspaceItemByName(
            workspace=self.workspace,
            item_name=item_name)

        # Get platform definition from the name in the platform item.
        platform = self.controller.database_tools.getPlatformByName(self.platformsInWorkspace, platform_item)

        # Get the content.
        content = action["args"].get("content", None)
        selector = action["args"].get("selector", None)
        if content:
            # Use the provided content.
            pass
        elif selector:
            # Get content from the platform item using the selector.
            # TODO: Consider getting and using "content_type" as a filter here.
            content = self.controller.database_tools.getNextContent(
                platform_item=platform_item, selector=selector,
                # Pass optional "pop" arguments.
                pop_from_item=pop_from_item, pop_from_db=pop_from_db
            )
        elif allow_next:
            # Get any next content from the platform item.
            # TODO: Review this and consider deduplicating with the clause above.
            content = self.controller.database_tools.getNextContent(
                platform_item=platform_item,
                # Pass optional "pop" arguments.
                pop_from_item=pop_from_item, pop_from_db=pop_from_db
            )
        else:
            msg = f"Can't obtain content from action. Insufficient selector information:\n{pprint.pformat(action)}"
            logging.error(msg)
            raise DataError(msg)

        # Calculate XY of tube center.
        x, y = self.getContentXY(content, platform_item, platform)

        # Calculate Z for pipetting.
        z = self.getContentZ(content, platform_item, platform)

        return content, x, y, z

    def getTubeAndCoords(self, action):
        """Get XYZ coordinates of a tube and it's parameters.

        TODO: Rename this to "getContentAndCoords", and "getContentCoords" to something else.

        It can either get the info from a reference to a platform item and a content within,
        (i.e. platform-type) or directly from an XYZ coordinate (i.e. coord-type, meant for PLR).

        Args:
            action (_type_): _description_

        Returns:
            _type_: _description_

        Example action 'args' for coords-based position: {
            "position": {'x': ,'y': , 'z':},
            "content": {"volume": 10}
        }

        Example tube content: {
            'index': 2,
            'maxVolume': 1500,
            'name': 'buffer',
            'position': {'col': 2, 'row': 1},
            'tags': ['mixcomp', 'buffer'],
            'type': 'tube',
            'volume': 7.2
        }
        """
        if "position" in action["args"]:
            # Get coords from a coords-type tube object.
            coords = action["args"]["position"]
            x, y, z = coords.get("x", None), coords.get("y", None), coords.get("z", None)
            # Get information about the tube.
            content = action["args"]["content"]
            # A tube must contain "volume". Ensure that the required data is there.
            # NOTE: Removed because an exception would be raised in 'macroGoToAndMix' anyways,
            #       which is the only function actually using volume information.
            # content["volume"] = float(action["args"]["content"]["volume"])
        else:
            # Get XYZ coordinates for the action's "platform"-type tube,
            # applying current tool offsets if any.
            content, x, y, z = self.getContentCoords(action)

        return content, x, y, z

    #### Tool-related GCODE methods ####
    def addToolOffsets(self, x=None, y=None, z=None, tool_name:str=None):
        """Add offsets of the current tool.

        Args:
            x (float, optional): Value of the x coordinate to offset. Defaults to None.
            y (float, optional): Value of the y coordinate to offset. Defaults to None.
            z (float, optional): Value of the z coordinate to offset. Defaults to None.
            tool_name (str, optional): Name of a tool. Defaults to None.

        Returns:
            tuple: An updated coordiante set, with one element per each non-None coordinate argument.
        """
        if tool_name is None:
            # Use the current tool if none was provided.
            if self.current_tool:
                logging.debug(f"Adding offsets with self.current_tool={self.current_tool}")
                tool_name = self.current_tool
            # If there is not current tool, then return the input.
            else:
                logging.debug("No offsets added, self.current_tool was 'None'.")
                return x, y, z

        # Defaults.
        coords = [x, y, z]

        if x is not None:
            coords[0] += self.tools[tool_name].x_offset

        if y is not None:
            coords[1] += self.tools[tool_name].y_offset

        if z is not None:
            coords[2] += self.tools[tool_name].z_offset

        return tuple(coords)

    #### Convenience functions for clearance coordinates ####
    def calc_clearance(self, platforms: list):
        """Calculate a conservative clearance for a set of platforms, considering all their possible containers."""
        if not platforms:
            raise ValueError("No platforms provided for calculation.")

        max_height = 0.0
        for platform in platforms:
            # Get platform height and active height.
            height = platform["height"]
            active_h = platform["activeHeight"]
            # Take containers into account.
            max_container_height = 0
            for link in platform.get("containers", []):
                offset = link["containerOffsetZ"]
                container = self.controller.database_tools.getContainerByName(link["container"])
                length = container["length"]
                max_container_height = max(max_container_height, active_h + length - offset )
            # Update the max height.
            max_height = max(max_height, height, max_container_height)

        return max_height

    def getClearance(self, item_names:list=None):
        """Get the minimum clearance level for the Z axis from the platformsInWorkspace definition"""
        if not self.platformsInWorkspace:
            logging.error("There are no platforms in the workspace, or no workspace has been set up. Returning None.")
            clearance = None
        else:
            if item_names:
                logging.warning(f"Calculating clearance only between items: {item_names}")
                platforms_to_clear = [i for i in self.platformsInWorkspace if i["name"] in item_names]
                clearance = self.calc_clearance(platforms_to_clear)
            else:
                clearance = self.calc_clearance(self.platformsInWorkspace)

        logging.info(f"Calculated platform clearance value: {clearance}")
        return clearance

    def getSafeHeight(self, item_names:list=None):
        """
        Calculate a Z coordinate above the workspace contents.

        The result is the sum of 'self.extra_clearance' and the platform clearance
        from 'getClearance' (skipped when it is None or otherwise falsy). If a tool
        is currently loaded, the sum is then passed through 'addToolOffsets'.

        Args:
            item_names (list, optional): Forwarded to 'getClearance' to restrict
                which platforms are considered.

        Returns:
            The calculated safe Z value.
        """
        # The configured extra clearance always contributes.
        height_terms = [self.extra_clearance]

        # The platform clearance only contributes when one could be calculated.
        platform_clearance = self.getClearance(item_names=item_names)
        if platform_clearance:
            height_terms.append(platform_clearance)

        z = sum(height_terms)

        # Account for the loaded tool, if any.
        if self.current_tool:
            logging.info(f"Adding tool offset from tool '{self.current_tool}' to clearance.")
            # 'addToolOffsets' returns an (x, y, z) triplet; only Z is relevant here.
            offset_coords = self.addToolOffsets(z=z)
            z = offset_coords[2]

        logging.info(f"Calculated safe height: {z}")
        return z

    def getSafeY(self):
        """
        There is a limit on the Y axis movement. The tool-holder can crash into the parked tools.
        Here we use the parking post coordinates to establish a safe limit for the Y direction.

        The limit is the maximum of the current tool's 'safe_y_post' (if a tool is loaded)
        and the 'safe_y_parked' of every other tool, plus the current tool's 'safe_y'.

        Returns:
            The safe Y limit, or None when there is no current tool and no parked tools.
        """
        # Y position of the empty tool post of the currently attached tool, if any.
        # In theory this one will be the farthest from the tool-head, but it needs to be
        # accounted for even so (e.g. maybe it has really "long" pins on the tool-post).
        current = self.tools[self.current_tool] if self.current_tool else None
        y_clearance = current.safe_y_post if current is not None else None

        # Safe Y positions of the parked tools (every tool except the current one).
        candidates = [tool.safe_y_parked for tool in self.tools.values() if tool.name != self.current_tool]

        # A post coordinate of 0 is a valid value and must take part in the comparison,
        # so test against None explicitly instead of relying on truthiness.
        if y_clearance is not None:
            candidates.insert(0, y_clearance)

        # Use the closest safe distance.
        # NOTE: Corrected for axis inversion.
        if candidates:
            y_clearance = max(candidates)

        # Adjust tool distance protruding from the toolhead, if any.
        if current is not None:
            # NOTE: Corrected for axis inversion.
            y_clearance += current.safe_y

        return y_clearance

    #### Useful GCODE generating macros for action handlers ####
    def macroMoveSafeHeight(self, item_names=None, extend=True):
        """Build a move to the safe Z height returned by 'getSafeHeight'.

        Args:
            item_names (list, optional): Forwarded to 'getSafeHeight'.
            extend (bool, optional): When True, the commands are also passed to
                'extend_commands'. Defaults to True.

        Returns:
            The commands returned by 'self.gcode.gcodeClearance'.
        """
        # Safe height considering the platforms (and the loaded tool, if any).
        safe_z = self.getSafeHeight(item_names=item_names)

        clearance_move = self.gcode.gcodeClearance(
            z=safe_z,
            f=self.feedrate,
            add_comment="Moving to the clearance height.",
        )

        if not extend:
            return clearance_move

        # Register the commands in the builder and in the current action.
        self.extend_commands(clearance_move)
        return clearance_move

    def macroGoTo(self, action, i, extend=True):
        """Move over the action's target content (XY) and then to its Z position.

        Args:
            action (dict): The action, forwarded to 'macroGoToTubeXY' and 'macroGoToTubeZ'.
            i: Action index, forwarded to both macros.
            extend (bool, optional): Forwarded to both macros. Defaults to True.

        Returns:
            The XY commands followed by the Z commands.
        """
        # First the XY move over the content, then the Z move into it.
        xy_commands = self.macroGoToTubeXY(action, i, extend=extend)
        z_commands = self.macroGoToTubeZ(action, i, extend=extend)

        xy_commands += z_commands
        return xy_commands

    def macroGoToTubeXY(self, action, i, extend=True) -> list:
        """Move to the XY coordinates of the action's tube.

        The coordinates come from 'getTubeAndCoords' and the move is built by
        'macroGoToXYZ' with its default arguments.

        Args:
            action (dict): The action, forwarded to 'getTubeAndCoords'.
            i: Action index, only used for logging.
            extend (bool, optional): Forwarded to 'macroGoToXYZ'. Defaults to True.

        Returns:
            list: The commands returned by 'macroGoToXYZ'.
        """
        logging.info(f"Getting XY coordinates for a tube in action {i}.")

        # Only the X and Y coordinates of the (tube, x, y, z) result are used here.
        _tube, tube_x, tube_y, _tube_z = self.getTubeAndCoords(action)

        return self.macroGoToXYZ(x=tube_x, y=tube_y, extend=extend)

    def macroGoToTubeZ(self, action, i, extend=True) -> list:
        """Move to the Z coordinate of the action's tube.

        The coordinate comes from 'getTubeAndCoords' and the move is built by
        'macroGoToXYZ' with its default arguments.
        TODO: be smarter about pipetting height, using current tube volume to increase this.

        Args:
            action (dict): The action, forwarded to 'getTubeAndCoords'.
            i: Action index, only used for logging.
            extend (bool, optional): Forwarded to 'macroGoToXYZ'. Defaults to True.

        Returns:
            list: The commands returned by 'macroGoToXYZ'.
        """
        logging.info(f"Getting Z coordinates for a tube in action {i}.")

        # Only the Z coordinate of the (tube, x, y, z) result is used here.
        _tube, _tube_x, _tube_y, tube_z = self.getTubeAndCoords(action)

        return self.macroGoToXYZ(z=tube_z, extend=extend)

    # Event name associated with 'macroGoToXYZ'.
    # NOTE(review): the call that would trigger this event inside 'macroGoToXYZ'
    # is commented out there, so that method does not currently emit it.
    xyz_move_event: str = "xyz_move"
    """Name of the event raised by 'macroGoToXYZ' when called."""
    def macroGoToXYZ(self, x=None, y=None, z=None, f=None, add_offset=True, extend=True, add_comment="") -> list:
        """Build an absolute G1 move to an XYZ position, adding tool-offsets by default.

        TODO: Send an 'xyz_move' event to any relevant event listeners.
              This is meant to implement obstacle avoidance or advanced movepath planning.

        Args:
            x, y, z: Target coordinates, passed on to 'self.gcode.G1'.
            f: Feedrate for the move. Defaults to 'self.feedrate' when None.
            add_offset (bool, optional): When True, the coordinates are first passed
                through 'addToolOffsets'. Defaults to True.
            extend (bool, optional): When True, the commands are also passed to
                'extend_commands'. Defaults to True.
            add_comment (str, optional): Forwarded to 'self.gcode.G1'.

        Returns:
            list: The commands returned by 'self.gcode.G1'.
        """
        # Fall back to the main feedrate.
        feed = self.feedrate if f is None else f

        # Apply the current tool's offsets when requested.
        target = (x, y, z)
        if add_offset:
            target = self.addToolOffsets(x=x, y=y, z=z)
        target_x, target_y, target_z = target

        move_commands = self.gcode.G1(
            x=target_x, y=target_y, z=target_z,
            f=feed, absolute=True, add_comment=add_comment
        )

        # Let everyone know about the move.
        # NOTE: Other parts of the program may alter the commands list.
        #       This is meant to implement obstacle avoidance or advanced
        #       movepath planning with plugins.
        # TODO: Rethink the event system for non-async stuff.
        # self.controller.trigger_event_callback(
        #     event_name=self.xyz_move_event,
        #     parameters={"x": x, "y": y, "z": z, "f": f, "add_offset": add_offset},
        #     commands=commands
        # )

        # Register the commands in the builder and in the current action.
        if extend:
            self.extend_commands(move_commands)

        return move_commands

    def toolChangeMacro(self, new_tool:str, extend=True, operation_time=30.0, action:dict=None):
        """Generate the GCODE to park the current tool and pick up 'new_tool'.

        Either step is replaced by a GCODE comment when there is nothing to do: no tool
        is loaded, 'new_tool' is None, or the tool is already loaded.
        'self.current_tool' is set to 'new_tool' before returning.

        Args:
            new_tool (str): Name of the tool to load, or None to only park the current one.
            extend (bool, optional): When True, non-empty results are passed to
                'extend_commands' for 'action'. Defaults to True.
            operation_time (float, optional): Timeout passed to 'update_exec_opts' once
                for each parking and each pickup sequence generated. Defaults to 30.0.
            action (dict, optional): Action receiving the commands and timeouts.
                Defaults to 'self.current_action'.

        Returns:
            list: Parking commands followed by pickup commands.
        """
        logging.info(f"Changing tools from '{self.current_tool}' to '{new_tool}'")

        # Fall back to the action being parsed.
        if action is None:
            logging.info("No action provided, defaulting to the current action.")
            action = self.current_action

        # Step 1: park the loaded tool, if any.
        parking = []
        if self.current_tool is None:
            note = "Nothing to park, the current tool is 'None'."
            logging.info(note)
            parking.append(self.gcode.comment(note))
        elif self.current_tool == new_tool:
            note = "Parking skipped. Incoming and outgoing tool are equal."
            logging.info(note)
            parking.append(self.gcode.comment(note))
        else:
            outgoing = self.tools[self.current_tool]
            parking.extend(outgoing.park(self.current_tool, new_tool))
            # Give the action more time to complete.
            if action is not None:
                logging.debug("Adding parking operation time to action.")
                self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        # Step 2: pick up the new tool, if any.
        # NOTE: 'self.current_tool' still holds the outgoing tool at this point.
        picking = []
        if new_tool is None:
            note = "Nothing to pickup, the new tool is 'None'."
            logging.info(note)
            picking.append(self.gcode.comment(note))
        elif self.current_tool == new_tool:
            note = "Pickup skipped. Incoming and outgoing tool are equal."
            logging.info(note)
            picking.append(self.gcode.comment(note))
        else:
            incoming = self.tools[new_tool]
            picking.extend(incoming.pickup(self.current_tool, new_tool))
            # Give the action more time to complete.
            if action is not None:
                logging.debug("Adding pickup operation time to action.")
                self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        # The new tool is now the loaded one.
        self.current_tool = new_tool

        # Full sequence: parking first, pickup second.
        tc_commands = parking + picking
        if extend and tc_commands:
            logging.debug("Extending GCODE commands in action.")
            self.extend_commands(commands=tc_commands, action=action)

        return tc_commands

    def check_toolchange(self, action: dict, action_command_id: str):
        """Check if the action requires a toolchange and apply it.

        The requested tool is read from action["args"]["tool"]:
            - A non-empty value different from 'self.current_tool' triggers a tool-change.
            - False or "" drops the current tool, if one is loaded.
            - None (or a missing "tool" / "args" key) requests nothing.

        Actions whose "cmd" is listed in 'self.tc_check_skip_actions' never trigger a
        tool-change from their arguments. When no tool-change was triggered and
        'action_command_id' is "HOME", the loaded tool (if any) is parked.

        Args:
            action (dict): The action being parsed.
            action_command_id (str): The command ID resolved for the action.

        Returns:
            bool: True if a tool-change was triggered by the action's arguments.
        """
        toolchange = False
        new_tool = None

        # Check for arguments in action.
        if "args" in action:
            # Either "str" (tool name), None (do nothing) or False/"" (drop tool).
            new_tool = action["args"].get("tool", None)
            logging.info(f"Checking for toolchange on new_tool={new_tool} and action: {action}")

            # Handle cases.
            # NOTE: This must be a single if/elif chain; otherwise the skip check
            #       has no effect and skipped actions still trigger tool-changes.
            if action["cmd"] in self.tc_check_skip_actions:
                # Do nothing if this action does not demand a tool-change check.
                pass
            elif new_tool is False or new_tool == "":
                # Drop the current tool, if any.
                new_tool = None
                toolchange = bool(self.current_tool)
            elif new_tool:
                # A tool-change is needed before the action takes place
                # if the requested tool is not the one currently loaded.
                toolchange = new_tool != self.current_tool
            else:
                # Do nothing if None.
                assert new_tool is None

        if toolchange:
            # Add the tool-change GCODE to the action.
            self.toolChangeMacro(new_tool=new_tool)
        elif action_command_id == "HOME":
            # Park the loaded tool if any, before homing.
            # TODO: Replace hardcoded action ID with the one in base_actions.py.
            self.toolChangeMacro(new_tool=None)

        return toolchange

    #### HELPERS for mid-level action parsers ####
    def validate_current_action_workspace(self):
        """
        Check the current action's workspace name against the active workspace.

        The "workspace" entry of 'self.current_action' is compared with the "name" entry
        of 'self.workspace':

            - Both set and equal: nothing happens.
            - Both set and different: the error is logged and a `ProtocolError` is raised.
            - Only the active workspace has a name: that name is written to the action's
              "workspace" entry and a warning is logged.
            - The active workspace has no name: nothing is checked.

        Raises:
            ProtocolError: If the action's workspace does not match the active workspace.
        """
        active_name = self.workspace.get("name", None)
        action_name = self.current_action.get("workspace", None)

        # Without an active workspace name there is nothing to validate against.
        if not active_name:
            return

        if not action_name:
            # The action did not specify a workspace: inherit the active one.
            warning = (
                f"Action with index {self.current_action.get('index', None)} has no workspace info."
                f" Defaulting to the current workspace: '{active_name}'"
            )
            logging.warning(warning)
            self.current_action["workspace"] = active_name
            return

        if action_name == active_name:
            return

        # Both names are set and they differ.
        error = (
            f"Action with index {self.current_action.get('index', None)} and workspace '{action_name}'"
            f" does not match the currently active workspace '{active_name}'."
        )
        logging.error(error)
        raise ProtocolError(error)

    def get_current_action_cmd(self):
        """
        Return the command ID of the current action.

        The ID is read from the `cmd` field of the current action (e.g. "HOME", "PICK_TIP").
        For JSON actions (where `cmd` is "JSON") the ID is read from the `cmd` entry of the
        action's `args` dictionary instead, and the replacement is logged.

        Returns:
            str: The command ID of the current action.
        """
        command_id = self.current_action['cmd']

        # Regular actions carry their command ID directly.
        if command_id != "JSON":
            return command_id

        # JSON actions carry the actual command ID in their arguments.
        command_id = self.current_action['args']['cmd']
        logging.info(f"Updated action command from JSON action to: {command_id}")
        return command_id

    #### Mid-level action parsers ####
    def parseAction(self, action, action_index=None):
        """Interpret one protocol action and produce the corresponding GCODE.

        Note that GCODE generation can depend on the current state.

        The action becomes 'self.current_action', its "GCODE" list is reset, and an empty
        "args" dictionary is added if missing. The GCODE is not returned directly: it is
        stored in the action's "GCODE" list and in the builder's commands list by
        'extend_commands'.

        Args:
            action (dict): Python dictionary defining the action. It is modified in place.
            action_index (integer): Action index number. Defaults to the number of actions
                returned by 'getActions'.

        Raises:
            ProtocolError: Raised by the workspace validation on a mismatch, and raised from
                any exception that occurs while building the GCODE (after 'rollback' is called).

        Returns:
            dict: A deep copy of the action, taken after its handler ran and before the
                action callbacks were executed.
        """
        # Handlers are looked up in "self.action_switch_case" and called with two arguments:
        # action: the action object
        # i: the action index for the current action
        logging.info(f"Parsing action with index {action_index} and command '{action['cmd']}'.")

        # Save checkpoint.
        self.checkpoint()

        # Default value for action index.
        if action_index is None:
            action_index = len(self.getActions())
            logging.warning(f"Replaced null index with index {action_index}.")

        # Save previous action.
        self.previous_action = self.current_action
        # Assign current action to self slot.
        self.current_action = action
        # Create a list for the actions GCODE commands.
        self.current_action["GCODE"] = []

        # Add defaults to required properties.
        self.current_action.setdefault("args", {})

        # Validate workspace name.
        # NOTE: This runs outside the "try" block below, so an error raised here
        #       is not followed by a call to "rollback" in this method.
        self.validate_current_action_workspace()

        # Parse the current action into GCODE, checking for tool-changes first.
        try:
            # Get the action command ID.
            action_command_id = self.get_current_action_cmd()

            # Get the action's GCODE function.
            action_function = self.action_switch_case[action_command_id]

            # Note new action start as comment in the GCODE.
            comments = [self.gcode.comment(f"Building action {action_index}, with command: {action_command_id}")]
            self.extend_commands(comments)

            # Check and produce GCODE for a toolchange, if needed.
            self.check_toolchange(self.current_action, action_command_id)

            # Produce the GCODE for the action.
            if self.verbose:
                logging.debug(f"Processing command '{action_command_id}' with index '{action_index}'.")
            # Use the action function to generate its GCODE.
            # NOTE: The return value of the action function is discarded.
            #       GCODE is saved to the current action and to the "commands" property.
            action_function(self.current_action, action_index)

            # Make a copy of the action, including its GCODE.
            final_action = deepcopy(self.current_action)

            # Run action callbacks.
            # This allows other parts of the program to "do stuff"
            # when a certain kind of command has been parsed.
            # NOTE: Callbacks receive the copy, not "self.current_action".
            # TODO: Replace it with the Controller's events system.
            self.run_action_callbacks(action_command_id, final_action)

            # Log success.
            logging.info(f"Successfully parsed action with index {action_index} and command '{action_command_id}'.")

        except Exception as err:
            msg = f"Parser error at action number {action_index} with content '{self.current_action}' message: {err}"
            logging.error(msg)
            print(msg)
            # Rollback changes.
            self.rollback()
            # Wrap the original error, keeping it as the cause.
            raise ProtocolError(msg) from err

        else:
            # Note action parsed.
            msg = f"Done processing action with index={action_index} and command={action_command_id}"
            logging.debug(msg)
            # Commit changes.
            self.commit()

        return final_action # Deep copy of the parsed action (includes its GCODE).

    def run_action_callbacks(self, action_command_id: str, action: dict):
        """Run the callbacks registered for the command of an action that was just processed.

        Each entry in 'self.action_callbacks' provides the "commands" it handles and a
        "callback", which is called with the builder and the action on a match.
        Independently of the match, a warning is logged for every command of an entry
        that has no registered handler in 'self.action_switch_case'.

        TODO: consider rewriting this in reverse, by "emitting" events like Klipper does.
        TODO: Replace it with the Controller's events system.

        Args:
            action_command_id (str): Command ID of the processed action.
            action (dict): The action passed to the matching callbacks.
        """
        for registration in self.action_callbacks:
            handled_commands = registration["commands"]
            callback = registration["callback"]

            # Run the callback when it handles this command.
            if action_command_id in handled_commands:
                callback(self, action)

            # Warn about commands that no action handler can ever produce, just in case.
            unknown_commands = [
                command for command in handled_commands
                if not self.action_switch_case.get(command, None)
            ]
            for command in unknown_commands:
                warning = (
                    f"The callback command '{command}' was not found in the "
                    f"registered action handlers: {list(self.action_switch_case)}"
                )
                logging.warning(warning)

    def addAction(self, action, parse=True, append=True):
        """
        Adds a new action to the current protocol, optionally parsing and appending it.

        Args:
            action (dict): The action to add to the protocol. It should contain necessary details
                        such as the command (`cmd`) and any associated arguments (`args`).
            parse (bool, optional): Whether to parse the action with `parseAction`. Defaults to True.
            append (bool, optional): Whether to append the action to the protocol's action list.
                                    If False, the action is not appended. Defaults to True.

        Returns:
            tuple: A tuple containing:
                - action_commands (None): Unused placeholder, always None. Kept for backward compatibility.
                - action (dict): The value returned by `parseAction` if `parse` is True,
                  or the original action otherwise.

        Notes:
            - If the protocol doesn't contain an "actions" key, this method initializes it when appending.
            - When appended, the action's index in the list is passed to `parseAction`;
              otherwise the index is None and `parseAction` picks a default.
        """
        logging.debug("Adding action.")
        # Default action index.
        action_index = None
        # Add the new action to the protocol.
        if append:
            logging.debug("Appending action to protocol list.")
            # Check that the protocol object has an actions key or create it.
            actions = self.protocol.setdefault("actions", [])
            # Append the action (note: this propagates to the action in the self.protocol dictionary).
            actions.append(action)
            action_index = len(actions) - 1

        # NOTE: The incoming action must not be cleared before parsing:
        #       it is the input of "parseAction".
        if parse:
            logging.debug("Parsing action.")
            action = self.parseAction(action=action, action_index=action_index)

        # TODO: Remove "action_commands", it does nothing.
        action_commands = None
        return action_commands, action

    def parseProtocol(self, actions:list=None):
        """Parse a list of actions into GCODE, either provided or from the active protocol.

        Args:
            actions (list, optional): Actions to parse, in order. Defaults to the
                "actions" list of 'self.protocol'.

        Returns:
            dict: A deep copy of 'self.protocol', taken after all actions were parsed.
        """
        if actions is None:
            actions = self.protocol["actions"]
            logging.warning(f"No actions passed. Using {len(actions)} actions from the active protocol.")

        logging.info(f"Parsing {len(actions)} protocol actions.")

        # NOTE: The value returned by "parseAction" is not needed here: each action parser
        #       saves the GCODE to the action's "GCODE" list and to the builder's commands.
        for i, action in enumerate(actions):
            self.parseAction(action, i)

        logging.info(f"Parsed {len(actions)} actions.")

        return deepcopy(self.protocol)

    #### GCODE command helpers ####
    def extend_commands(self, commands: list, action=None):
        """Append GCODE to the builder's commands list and to an action's "GCODE" list.

        Args:
            commands (list): GCODE commands to append.
            action (dict, optional): Action receiving the commands.
                Defaults to 'self.current_action'.
        """
        # Builder-wide list of commands.
        self.commands.extend(commands)

        # Use the action being parsed unless another one was given.
        target_action = self.current_action if action is None else action

        # Make sure the action has a GCODE list before extending it.
        target_action.setdefault("GCODE", [])
        target_action["GCODE"].extend(commands)

    #### Output ####
    def save_gcode(self, path="examples/test.gcode"):
        """Save the GCODE to a file.

        Args:
            path (str, optional): Destination file. It is overwritten if it exists.
        """
        logging.debug(f"commanderTest.py message: Saving GCODE to file: {path}")
        with open(path, "w", encoding="utf-8") as gcode_file:
            # "getGcode" returns one string, so it is written in a single call
            # ("writelines" would iterate over it character by character).
            gcode_file.write(self.getGcode())

    def getGcode(self):
        """Return all the stored GCODE commands as a single newline-separated string."""
        line_separator = "\n"
        return line_separator.join(self.commands)

    def getActions(self):
        """Return the "actions" list of the protocol, or a new empty list if it has none."""
        fallback = []
        return self.protocol.get("actions", fallback)

This class generates GCODE from Pipettin protocol "actions".

It contains and consumes all information for the pipetting task: protocol, workspace, platforms and tools.

It holds and tracks machine status or configuration during protocol 'slicing' to GCODE.

Also stores all GCODE commands after parsing protocols.

Some actions might depend on previous actions or action history. So these objects are for tracking the state of the machine during the translation of actions to GCODE, or during gcode streaming to the machine controllers.

A class to contain all information for the pipetting task: protocol, workspace, platforms and tools. :param protocol: A protocol dictionary (JSON object) from the GUI/MongoDB, containing protocol 'actions'. :param workspace: A workspace dictionary (JSON object) from the GUI/MongoDB, containing the platform instances in the workspace. :param platformsInWorkspace: A platforms dictionary (JSON object) from the GUI/MongoDB, containing the definitions of platforms in the workspace. :param pipettes: A dictionary with pipette tool definitions. :param config: Configuration options loaded from the 'config.yml' YAML file.

Class variables

var action_callbacks : list
var comment_action_name
var current_action : dict
var platformsInWorkspace : list
var protocol : dict
var state_attributes

List of attributes to consider as part of the builder's state.

var tool_loaders : dict

Dictionary of tool-loading functions: with keys matching names of tool-type, and callbacks as values.

var tools : dict
var tools_data : list
var workspace : dict
var xyz_move_event : str

Name of the event raised by 'macroGoToXYZ' when called.

Static methods

def getContentXY(content: dict, platform_item: dict, platform: dict)
Expand source code
@staticmethod
def getContentXY(content: dict, platform_item: dict, platform: dict):
    """
    Calculate XY of content center.

    Each coordinate is calculated from the col/row of the content's position and the
    platform's grid definition or, if "col"/"row" is unavailable, taken from the
    pre-existing x/y coordinate in the content's position.

    If the content has no position, it is rebuilt from the content's "index": using the
    platform's grid size ("wellsColumns" and "wellsRows") if present, or else the position
    of the matching entry in the platform's "slots".

    Args:
        content (dict): The content, with a "position" or an "index".
        platform_item (dict): The platform instance; its "position" provides the XY origin.
        platform (dict): The platform definition (grid or slots).

    Raises:
        DataError: If the index is inconsistent with the platform's rows, or if there is
            no position and it cannot be inferred.

    Returns:
        tuple: The (x, y) coordinates of the content's center.
    """

    # Get the content's position.
    position: dict = content.get("position", None)

    # Get the platform's slots.
    slots: list = platform.get("slots", None)

    # If no position was received, try to recreate it by the content's index.
    if position is None:
        position = {}
        index = content["index"]

        # Guess by a platforms grid if present.
        ncols = platform.get("wellsColumns", None)
        nrows = platform.get("wellsRows", None)
        if (ncols is not None) and (nrows is not None):
            # Calculate the row and column.
            position["row"], position["col"] = datautils.get_colrow(index, ncols, nrows)
            # Check for consistency.
            if nrows < index // ncols:
                msg = f"Error: the platform '{platform.get('name', None)}'"
                msg += f" does not have enough rows to get a content with index {index}."
                msg += f" platform={platform} content={content}"
                raise DataError(msg)

        # If not, try to get it from the slot.
        elif slots is not None:
            # NOTE: The index is used as 1-based here.
            slot = slots[index-1]
            position = slot["position"]
        else:
            msg = "Error: the content did not specify an position, and it could not be inferred "
            raise DataError(msg)

    # Calculate X of content center
    if "col" in position:
        _x = platform_item["position"]["x"]
        _x += platform['firstWellCenterX']
        _x += (position["col"] - 1) * platform['wellSeparationX']
    else:
        _x = position["x"]

    # Calculate Y of content center
    if "row" in position:
        _y = platform_item["position"]["y"]
        _y += platform['firstWellCenterY']
        _y += (position["row"] - 1) * platform['wellSeparationY']
    else:
        # The fallback must set Y (not X) from the pre-existing coordinate.
        _y = position["y"]

    return _x, _y

Calculate XY of content center. Defaults to using the col/row pair or, if unavailable, pre-existing x/y coordinates in the content's position.

def itemCenter(item, platform)
Expand source code
@staticmethod
def itemCenter(item, platform):
    """Calculate the XY center and the active-height Z of an item.

    The center is the item's position shifted by half of the platform's "width" (X)
    and half of its "length" (Y). Z is the platform's "activeHeight".
    TODO: Move this method to data-tools.

    Returns:
        tuple: The (x, y, z) coordinates.
    """
    origin = item["position"]
    center_x = origin["x"] + platform["width"]/2
    center_y = origin["y"] + platform["length"]/2
    return center_x, center_y, platform["activeHeight"]

Calculate the XY center and the active-height Z of an item. TODO: Move this method to data-tools.

def sign(x)
Expand source code
@staticmethod
def sign(x):
    """Return the sign of a number, as the float 1.0 or -1.0 (from 'copysign').

    Zero yields 1.0, and negative zero yields -1.0.
    """
    unit_magnitude = 1
    return copysign(unit_magnitude, x)

Return the sign of a number (as 1 or -1).

Instance variables

prop clearance
Expand source code
@property
def clearance(self):
    """Current clearance height.

    Returns the stored 'self._clearance' value when it is truthy; otherwise
    the value is calculated with 'getClearance'.
    """
    stored_value = self._clearance
    if not stored_value:
        # Nothing usable stored: calculate it from the workspace.
        logging.debug("Calculating clearance value...")
        return self.getClearance()

    logging.debug("Getting clearance value...")
    return stored_value

Current clearance height.

prop feedrate
Expand source code
@property
def feedrate(self):
    """Main feedrate for XYZ moves.

    Default feedrate in mm/min (F parameter for G1) used for XYZ moves.
    This is a read-only view of the value set in the GCODE primitives class (i.e. "self.gcode.feedrate").

    Equivalences:
        60000 mm/min = 1000 * 60 mm/min  = 1000 mm/s = 1 m/s
    """
    gcode_primitives = self.gcode
    return gcode_primitives.feedrate

Main feedrate for XYZ moves. Default feedrate in mm/min (F parameter for G1) used for XYZ moves. This is a property that returns the value set in the GCODE primitives class (i.e. "self.gcode.feedrate").

Equivalences

60000 mm/min = 1000 * 60 mm/min = 1000 mm/s = 1 m/s

Methods

def addAction(self, action, parse=True, append=True)
Expand source code
def addAction(self, action, parse=True, append=True):
    """
    Adds a new action to the current protocol, optionally parsing and appending it.

    Args:
        action (dict): The action to add to the protocol. It should contain necessary details
                    such as the command (`cmd`) and any associated arguments (`args`).
        parse (bool, optional): Whether to parse the action with `parseAction`. Defaults to True.
        append (bool, optional): Whether to append the action to the protocol's action list.
                                If False, the action is not appended. Defaults to True.

    Returns:
        tuple: A tuple containing:
            - action_commands (None): Unused placeholder, always None. Kept for backward compatibility.
            - action (dict): The value returned by `parseAction` if `parse` is True,
              or the original action otherwise.

    Notes:
        - If the protocol doesn't contain an "actions" key, this method initializes it when appending.
        - When appended, the action's index in the list is passed to `parseAction`;
          otherwise the index is None and `parseAction` picks a default.
    """
    logging.debug("Adding action.")
    # Default action index.
    action_index = None
    # Add the new action to the protocol.
    if append:
        logging.debug("Appending action to protocol list.")
        # Check that the protocol object has an actions key or create it.
        actions = self.protocol.setdefault("actions", [])
        # Append the action (note: this propagates to the action in the self.protocol dictionary).
        actions.append(action)
        action_index = len(actions) - 1

    # NOTE: The incoming action must not be cleared before parsing:
    #       it is the input of "parseAction".
    if parse:
        logging.debug("Parsing action.")
        action = self.parseAction(action=action, action_index=action_index)

    # TODO: Remove "action_commands", it does nothing.
    action_commands = None
    return action_commands, action

Adds a new action to the current protocol, optionally parsing and appending it.

This method inserts an action into the protocol's action list and, if specified, parses it to generate GCODE.

Args

action : dict
The action to add to the protocol. It should contain necessary details such as the command (cmd) and any associated arguments (args).
parse : bool, optional
Whether to parse the action after adding it. Defaults to True.
append : bool, optional
Whether to append the action to the protocol's action list. If False, the action is not appended. Defaults to True.

Returns

tuple
A tuple containing: - action_commands (None or parsed action commands): If parse is True, returns the parsed action commands. - action (dict): The original or parsed action, depending on whether parsing was requested.

Notes

  • The action is appended to the protocol's "actions" list if append is True.
  • The parseAction method is invoked if parse is set to True to process the action.
  • If the protocol doesn't contain an "actions" key, this method initializes it.
def addToolOffsets(self, x=None, y=None, z=None, tool_name: str = None)
Expand source code
def addToolOffsets(self, x=None, y=None, z=None, tool_name:str=None):
    """Shift the given coordinates by the XYZ offsets of a tool.

    Args:
        x (float, optional): X coordinate to shift. Defaults to None.
        y (float, optional): Y coordinate to shift. Defaults to None.
        z (float, optional): Z coordinate to shift. Defaults to None.
        tool_name (str, optional): Key of the tool in 'self.tools'. When None,
            'self.current_tool' is used instead.

    Returns:
        tuple: Always a three-element (x, y, z) tuple. Coordinates passed as None
        stay None. If no tool name was given and there is no current tool, the
        inputs are returned unchanged.
    """
    if tool_name is None:
        if not self.current_tool:
            # Nothing to offset against: hand the input back untouched.
            logging.debug("No offsets added, self.current_tool was 'None'.")
            return x, y, z
        # Fall back to the tool that is currently loaded.
        logging.debug(f"Adding offsets with self.current_tool={self.current_tool}")
        tool_name = self.current_tool

    # Pair each coordinate with the tool attribute holding its offset.
    # The tool is only looked up for coordinates that are actually provided.
    offset_attributes = ("x_offset", "y_offset", "z_offset")
    return tuple(
        value if value is None else value + getattr(self.tools[tool_name], attribute)
        for value, attribute in zip((x, y, z), offset_attributes)
    )

Add offsets of the current tool.

Args

x : float, optional
Value of the x coordinate to offset. Defaults to None.
y : float, optional
Value of the y coordinate to offset. Defaults to None.
z : float, optional
Value of the z coordinate to offset. Defaults to None.
tool_name : str, optional
Name of a tool. Defaults to None.

Returns

tuple
An updated coordinate set, with one element per each non-None coordinate argument.
def add_action_handler(self, name: str, function: Callable)
Expand source code
def add_action_handler(self, name: str, function: Callable):
    """Register the callback that generates GCODE for one kind of protocol action.

    Handlers are stored in 'self.action_switch_case', keyed by name. This is the
    extension point used by plugins (see the "plugins" directory; the
    "base_actions" plugin can be used as a reference).

    Args:
        name (str): Name for the action handler, this must match the value of the "CMD" key in the action.
        function (Callable): Function called to handle the action. It must accept an 'action' and its 'index' as first and second arguments, respectively.

    Raises:
        CommanderError: If a (truthy) handler is already registered under this name.
    """
    already_registered = self.action_switch_case.get(name, None)

    if not already_registered:
        # Free slot: store the handler and finish.
        logging.info(f"Registered new action handler for '{name}' actions.")
        self.action_switch_case[name] = function
        return

    # Refuse to silently replace an existing handler.
    msg = f"Attempted to overwrite action handler '{name}'."
    logging.error(msg)
    raise CommanderError(msg)

Add a callback function responsible for generating precalculated GCODE for a certain protocol action.

This is part of an extension system useful to write plugins. See example plugins in the "plugins" directory. The "base_actions" plugin can be used as a reference.

Args

name : str
Name for the action handler, this must match the value of the "CMD" key in the action.
function : Callable
Function called to handle the action. It must accept an 'action' and its 'index' as first and second arguments, respectively.

Raises

Exception
Raises an exception when a handler has already been registered for a certain CMD.
def calc_clearance(self, platforms: list)
Expand source code
def calc_clearance(self, platforms: list):
    """Return the tallest height found among the platforms and their containers.

    For each platform the candidates are its "height" and, for every entry in its
    optional "containers" list, `activeHeight + container length - containerOffsetZ`.
    Container definitions are fetched by name through the controller's database tools.

    Args:
        platforms (list): Platform definitions (dicts) to consider.

    Raises:
        ValueError: If the list of platforms is empty.

    Returns:
        The maximum height found, never lower than zero.
    """
    if not platforms:
        raise ValueError("No platforms provided for calculation.")

    def container_tops(platform, active_h):
        # Yield the top height of every container that can sit on the platform.
        for link in platform.get("containers", []):
            offset = link["containerOffsetZ"]
            container = self.controller.database_tools.getContainerByName(link["container"])
            yield active_h + container["length"] - offset

    tallest = 0.0
    for platform in platforms:
        platform_height = platform["height"]
        active_h = platform["activeHeight"]
        # The leading zero keeps the result non-negative when there are no containers.
        tallest_container = max([0, *container_tops(platform, active_h)])
        tallest = max(tallest, platform_height, tallest_container)

    return tallest

Calculate a conservative clearance for a set of platforms, considering all their possible containers.

def check_toolchange(self, action: dict, action_command_id: str)
Expand source code
def check_toolchange(self, action: dict, action_command_id: str):
    """Check if the action requires a toolchange and apply it.

    The requested tool is read from the optional "tool" key of the action's "args":
    a non-empty value names the tool to load, None means "leave things as they are",
    and False or "" means "drop the current tool".

    Actions whose "cmd" is listed in 'self.tc_check_skip_actions' are exempt from
    this check: their "tool" argument never triggers a tool-change.

    Args:
        action (dict): The protocol action being parsed.
        action_command_id (str): Command ID of the action (e.g. "HOME").

    Returns:
        bool: True if a tool-change was triggered by the action's "tool" argument.
        Parking the tool before a "HOME" action does not count, and returns False.
    """
    # Check for a toolchange.
    toolchange = False
    new_tool = None

    # Check for arguments in action.
    if "args" in action:
        # Either "str" (tool name), None (do nothing) or False/"" (drop tool).
        new_tool = action["args"].get("tool", None)
        logging.info(f"Checking for toolchange on new_tool={new_tool} and action: {action}")

        # Handle cases.
        if action["cmd"] in self.tc_check_skip_actions:
            # Do nothing if this action does not demand a tool-change check.
            # NOTE: The cases below must be chained with "elif"; otherwise this
            #       branch has no effect and the action is checked anyway.
            pass
        elif new_tool is False or new_tool == "":
            # Drop current tool.
            new_tool = None
            if self.current_tool:
                # Park the current tool if any.
                toolchange = True
        elif new_tool:
            # If the tool ID is different from the current one,
            # a tool-change is needed before the action takes place.
            if new_tool != self.current_tool:
                # Flag the tool-change, so its GCODE is added to the action.
                toolchange = True
        else:
            # Do nothing if None. Other false-like values are not supported.
            assert new_tool is None

    # Append the tool-change command to the action commands if a toolchange is needed
    if toolchange:
        self.toolChangeMacro(new_tool=new_tool)
    # If no toolchange was triggered, check if the current action is a HOME action.
    elif action_command_id == "HOME":
        # Park the loaded tool if any, before homing.
        # TODO: Replace hardcoded action ID with the one in base_actions.py.
        self.toolChangeMacro(new_tool=None)

    return toolchange

Check if the action requires a toolchange and apply it.

def checkpoint(self)
Expand source code
def checkpoint(self):
    """Store a deep copy of the tracked state in 'self._checkpoint'.

    The attributes to snapshot are the ones named in 'self.state_attributes'.

    Raises:
        RuntimeError: If an AttributeError occurs while building the snapshot
            (e.g. one of the named attributes does not exist).
    """
    try:
        snapshot = {}
        for attribute_name in self.state_attributes:
            # Deep copies keep the snapshot isolated from later mutations.
            snapshot[attribute_name] = deepcopy(getattr(self, attribute_name))
        self._checkpoint = snapshot
        logging.info("Checkpoint created for the current state.")
    except AttributeError as e:
        logging.error(f"Failed to create checkpoint: {e}")
        raise RuntimeError("Could not create checkpoint due to missing attributes.") from e

Creates a snapshot (checkpoint) of the current state of relevant attributes.

Saves the state of workspaces, protocol, current_action, and previous_action.

def comment_action_parser(self, action, i)
Expand source code
def comment_action_parser(self, action, i):
    """Turn a comment action into a GCODE comment.

    The text is read from `action["args"]["text"]`, wrapped by 'self.gcode.comment',
    and added through 'self.extend_commands'.

    Args:
        action (dict): The comment action.
        i: Index of the action, only used for logging.

    Returns:
        list: A one-element list with the generated comment command.
    """
    text = action["args"]["text"]
    gcode_lines = [self.gcode.comment(message=text)]
    self.extend_commands(gcode_lines)
    logging.info(f"Processed {self.comment_action_name} action with index {i} and text: '{text}'")
    return gcode_lines
def commit(self)
Expand source code
def commit(self):
    """Confirm the current state by discarding the stored checkpoint.

    Meant to be called once an action has been processed successfully.
    """
    # With the changes confirmed, the snapshot is no longer needed.
    self._checkpoint = None
    logging.info("Changes committed, checkpoint cleared.")

Commits the current state by clearing the checkpoint.

This method should be called after successfully processing an action.

def extend_commands(self, commands: list, action=None)
Expand source code
def extend_commands(self, commands: list, action=None):
    """Append GCODE to the commands list, and to the active protocol action (if any).

    Args:
        commands (list): GCODE commands to append.
        action (dict, optional): Action that receives the commands under its "GCODE"
            key. Defaults to 'self.current_action'.

    The commands are always added to 'self.commands'. When no action was given and
    there is no current action either, the per-action step is skipped.
    """
    # Extend GCODE commands list.
    self.commands.extend(commands)

    # Default to the current action.
    if action is None:
        action = self.current_action

    # There may be no active action at all (e.g. GCODE generated outside of an
    # action); in that case there is nothing else to update.
    if action is None:
        return

    # Extend the GCODE in the action, creating its list if needed.
    action.setdefault("GCODE", []).extend(commands)

Append GCODE to the commands list, and to the active protocol action (if any).

def getActions(self)
Expand source code
def getActions(self):
    """Return the "actions" list of the current protocol, or a new empty list if it has none."""
    protocol = self.protocol
    return protocol.get("actions", [])
def getClearance(self, item_names: list = None)
Expand source code
def getClearance(self, item_names:list=None):
    """Get the clearance level for the Z axis from the platformsInWorkspace definition.

    Args:
        item_names (list, optional): When given (and not empty), only platforms whose
            "name" is in this list are passed to 'calc_clearance'.

    Returns:
        The value computed by 'calc_clearance', or None when 'self.platformsInWorkspace'
        is empty or unset.
    """
    clearance = None

    if not self.platformsInWorkspace:
        logging.error("There are no platforms in the workspace, or no workspace has been set up. Returning None.")
    elif item_names:
        logging.warning(f"Calculating clearance only between items: {item_names}")
        selected_platforms = [
            platform for platform in self.platformsInWorkspace
            if platform["name"] in item_names
        ]
        clearance = self.calc_clearance(selected_platforms)
    else:
        clearance = self.calc_clearance(self.platformsInWorkspace)

    logging.info(f"Calculated platform clearance value: {clearance}")
    return clearance

Get the minimum clearance level for the Z axis from the platformsInWorkspace definition

def getContentCoords(self, action, allow_next=False, pop_from_item=False, pop_from_db=False)
Expand source code
def getContentCoords(self, action, allow_next=False,
                     pop_from_item=False, pop_from_db=False):
    """
    Resolve the content targeted by an action, and its XYZ coordinates.

    The platform item is looked up by the name in `action["args"]["item"]`. The content
    is then chosen in this order of preference:

    1. The "content" given in the action's arguments.
    2. The next content matching the "selector" in the action's arguments.
    3. Any next content of the platform item, only if `allow_next` is set.

    The `pop_from_item` and `pop_from_db` options are forwarded to 'getNextContent'
    in cases 2 and 3.

    NOTE: Tool offsets are not added by this method.

    Raises:
        DataError: If none of the three options above is available.

    Returns:
        tuple: The content, followed by its x, y, and z coordinates.
    """
    database_tools = self.controller.database_tools
    action_args = action["args"]

    # Platform item (by name), and the platform definition it refers to.
    platform_item = database_tools.getWorkspaceItemByName(
        workspace=self.workspace,
        item_name=action_args["item"])
    platform = database_tools.getPlatformByName(self.platformsInWorkspace, platform_item)

    content = action_args.get("content", None)
    selector = action_args.get("selector", None)

    if not content:
        # No explicit content: work out how to ask the platform item for one.
        # TODO: Consider getting and using "content_type" as a filter here.
        if selector:
            lookup_options = {"selector": selector}
        elif allow_next:
            lookup_options = {}
        else:
            msg = f"Can't obtain content from action. Insufficient selector information:\n{pprint.pformat(action)}"
            logging.error(msg)
            raise DataError(msg)

        content = database_tools.getNextContent(
            platform_item=platform_item,
            pop_from_item=pop_from_item, pop_from_db=pop_from_db,
            **lookup_options
        )

    # XY first, then Z, both delegated to the dedicated helpers.
    x, y = self.getContentXY(content, platform_item, platform)
    z = self.getContentZ(content, platform_item, platform)

    return content, x, y, z

Get XYZ coordinates for the action's platform item (e.g. a "tube").

Also apply tool offsets, if any.

def getContentZ(self, content=None, platform_item=None, platform=None)
Expand source code
def getContentZ(self, content=None, platform_item=None, platform=None):
    """
    Calculate the "active" Z height of a content (e.g. the bottom of a "tube", or fitting height of a tip).

    Defaults to using the Z from the content or, if unavailable, information from the platform.

    Formula: `z = workspace_z + item_z + platform_activeHeight - containerOffsetZ + container_activeHeight`

    Args:
        content (dict, optional): The content. If it has a "z" in its "position", that
            value is returned directly. Otherwise its "container" name is used.
        platform_item (dict, optional): The workspace item holding the content. Its
            "position" provides the item's Z (0 if missing).
        platform (dict, optional): Platform definition. Looked up from the platform
            item when not provided.

    Raises:
        DataError: If the Z coordinate cannot be calculated from the given information,
            including when the platform has no link for the content's container.

    Returns:
        The Z coordinate of the content.
    """

    # Default to using the Z defined in the content.
    if content is not None:
        # Skip everything if the content provides a Z coordinate.
        if "position" in content:
            if "z" in content["position"]:
                return content["position"]["z"]

    # Use the platform item to get the platform if undefined.
    if (platform is None) and (platform_item is not None):
        platform = self.controller.database_tools.getPlatformByName(self.platformsInWorkspace, platform_item)

    # Get the default Z from the platform.
    if platform is not None:
        # Both objects are read below; fail with a clear error instead of a TypeError.
        if content is None or platform_item is None:
            raise DataError("getContentZ: Failed to calculate Z. A content and a platform item are both required to derive it from a platform.")

        # Get container link info.
        container_name = content["container"]
        # NOTE: A default is passed to "next" to prevent a bare StopIteration from escaping.
        container_link = next((c for c in platform["containers"] if c["container"] == container_name), None)
        if container_link is None:
            raise DataError(f"getContentZ: Failed to calculate Z. Platform '{platform.get('name')}' has no link for container '{container_name}'.")
        container = self.controller.database_tools.getContainerByName(container_name)

        # Workspace Z, the starting point.
        workspace_z = self.workspace.get("z", 0)
        # TODO: This is not in the schema, perhaps it should be.

        # Item Z, position relative to workspace.
        item_z = platform_item["position"].get("z", 0)

        # The platform's active height.
        active_z_platform = platform["activeHeight"]
        # NOTE: Previously named "defaultBottomPosition" or "defaultLoadBottom".

        # Subtract the container's offset to the platform.
        offset_z = -container_link["containerOffsetZ"]

        # Add the container's active height.
        active_z_container = container["activeHeight"]
        # NOTE: In the case of tips, this is equal to their total length.

        # Add it up.
        z = workspace_z + item_z + active_z_platform + offset_z + active_z_container
        logging.debug(f"Parsed content height {z} mm from: workspace_z={workspace_z} item_z={item_z} active_z_platform={active_z_platform} offset_z={offset_z} active_z_container={active_z_container}")

        return z

    # Raise an error if the input did not match any condition above.
    raise DataError("getContentZ: Failed to calculate Z. Information is missing.")

Calculate the "active" Z height of a content (e.g. the bottom of a "tube", or fitting height of a tip).

Defaults to using the Z from the content or, if unavailable, information from the platform.

Formula: z = deck_z + plat_z + plat_activeHeight - container_z_offset + tip_active_height = tip_spot_z

def getGcode(self)
Expand source code
def getGcode(self):
    """Return all commands in 'self.commands' as a single string, one command per line."""
    line_break = "\n"
    return line_break.join(self.commands)
def getItemXYZ(self, item_name: str)
Expand source code
def getItemXYZ(self, item_name: str):
    """Get the XYZ coordinates of a workspace item, as computed by 'self.itemCenter'.

    TODO: Move this method to data-tools.

    Args:
        item_name (str): Name of the item in the current workspace.

    Returns:
        tuple: The x, y, and z coordinates.
    """
    database_tools = self.controller.database_tools

    # Find the item in the workspace, then the platform definition it refers to.
    workspace_item = database_tools.getWorkspaceItemByName(
        workspace=self.workspace,
        item_name=item_name)
    item_platform = database_tools.getPlatformByName(
        platformsInWorkspace=self.platformsInWorkspace,
        platform_item=workspace_item)

    # Unpack to make sure exactly three coordinates come back.
    x, y, z = self.itemCenter(workspace_item, item_platform)
    return x, y, z

Get the XYZ coordinates of an item. TODO: Move this method to data-tools.

def getSafeHeight(self, item_names: list = None)
Expand source code
def getSafeHeight(self, item_names:list=None):
    """
    Calculate a Z coordinate for safe travel above the platforms.

    The result is the sum of 'self.extra_clearance' and the platform clearance from
    'getClearance' (skipped when it is None or zero). If a tool is currently loaded,
    its Z offset is then added through 'addToolOffsets'.

    Args:
        item_names (list, optional): Forwarded to 'getClearance', to restrict the
            calculation to certain items.

    Returns:
        The safe Z coordinate. Note that a missing platform clearance does not
        produce None here; it is just left out of the sum.
    """
    # Collect the terms: the minimum extra clearance always counts.
    height_terms = [self.extra_clearance]

    # The clearance from the platforms only counts when available.
    platform_clearance = self.getClearance(item_names=item_names)
    if platform_clearance:
        height_terms.append(platform_clearance)

    safe_z = sum(height_terms)

    # Account for the tool, if one is loaded.
    if self.current_tool:
        logging.info(f"Adding tool offset from tool '{self.current_tool}' to clearance.")
        # The offsets come back as an (x, y, z) tuple; only Z is of interest.
        _, _, safe_z = self.addToolOffsets(z=safe_z)

    logging.info(f"Calculated safe height: {safe_z}")
    return safe_z

Calculate the Z coordinate above all contents, considering if a tip is placed on the current tool. Returns "None" if the clearance has not been set.

def getSafeY(self)
Expand source code
def getSafeY(self):
    """
    Work out a safe limit for movement in the Y direction.

    There is a limit on the Y axis movement, because the tool-holder can crash into
    the parked tools. The limit is the largest value among the 'safe_y_post' of the
    current tool (if any) and the 'safe_y_parked' of every other tool. If a tool is
    loaded, its 'safe_y' is finally added to account for how much it protrudes.

    NOTE: The return value can be "None" when there are no tools at all.
    NOTE: The original comments state that these values are corrected for axis inversion.
    """
    loaded_tool = self.current_tool

    # Limit from the empty post of the loaded tool, if any. In theory it is the
    # farthest one from the tool-head, but it is considered anyway (e.g. it may
    # have really "long" pins).
    post_limit = self.tools[loaded_tool].safe_y_post if loaded_tool else None

    # Limits from every tool that is not the loaded one.
    parked_limits = [
        tool.safe_y_parked
        for tool in self.tools.values()
        if tool.name != self.current_tool
    ]

    # Pick the largest of the available limits.
    if not parked_limits:
        y_limit = post_limit
    elif post_limit:
        y_limit = max(post_limit, *parked_limits)
    else:
        y_limit = max(parked_limits)

    # Adjust for the tool protruding from the toolhead, if any.
    if loaded_tool:
        y_limit += self.tools[loaded_tool].safe_y

    return y_limit

There is a limit on the Y axis movement. The tool-holder can crash into the parked tools. Here we use the parking post coordinates to establish a safe limit for the Y direction. NOTE: The return value can be "None" when there are no tools at all.

def getTubeAndCoords(self, action)
Expand source code
def getTubeAndCoords(self, action):
    """Get a tube (content) and its XYZ coordinates from an action.

    TODO: Rename this to "getContentAndCoords", and "getContentCoords" to something else.

    Two kinds of actions are supported:

    - Coords-type (meant for PLR): the action's "args" have a "position" with the
      coordinates, and a "content" with information about the tube.
    - Platform-type: the action refers to a platform item and a content within,
      which are resolved by 'getContentCoords'.

    Args:
        action (dict): The action. Its "args" are inspected as described above.

    Returns:
        tuple: The content, followed by its x, y, and z coordinates. In coords-type
        actions, coordinates missing from "position" are returned as None.

    Example action 'args' for coords-based position: {
        "position": {'x': ,'y': , 'z':},
        "content": {"volume": 10}
    }

    Example tube content: {
        'index': 2,
        'maxVolume': 1500,
        'name': 'buffer',
        'position': {'col': 2, 'row': 1},
        'tags': ['mixcomp', 'buffer'],
        'type': 'tube',
        'volume': 7.2
    }
    """
    if "position" not in action["args"]:
        # Platform-type tube: resolve content and coordinates from the workspace.
        content, x, y, z = self.getContentCoords(action)
        return content, x, y, z

    # Coords-type tube: the coordinates come straight from the action.
    position = action["args"]["position"]
    x, y, z = (position.get(axis, None) for axis in ("x", "y", "z"))

    # Information about the tube.
    # NOTE: The "volume" is not validated here. The original comment says that
    #       'macroGoToAndMix' is the only user of it, and that it raises if missing.
    content = action["args"]["content"]

    return content, x, y, z

Get XYZ coordinates of a tube and its parameters.

TODO: Rename this to "getContentAndCoords", and "getContentCoords" to something else.

It can either get the info from a reference to a platform item and a content within, (i.e. platform-type) or directly from an XYZ coordinate (i.e. coord-type, meant for PLR).

Args

action : _type_
description

Returns

_type_
description

Example action 'args' for coords-based position: { "position": {'x': ,'y': , 'z':}, "content": {"volume": 10} }

Example tube content: { 'index': 2, 'maxVolume': 1500, 'name': 'buffer', 'position': {'col': 2, 'row': 1}, 'tags': ['mixcomp', 'buffer'], 'type': 'tube', 'volume': 7.2 }

def get_current_action_cmd(self)
Expand source code
def get_current_action_cmd(self):
    """
    Get the command ID (e.g., "HOME", "PICK_TIP") of the current action.

    The ID is normally the value of the action's `cmd` field. JSON actions are the
    exception: when `cmd` is "JSON", the actual command ID is read from the `cmd`
    key in the action's `args`, and the replacement is logged.

    Returns:
        str: The command ID of the current action.
    """
    current = self.current_action
    command_id = current['cmd']

    # Regular actions carry their ID directly.
    if command_id != "JSON":
        return command_id

    # JSON actions wrap the real command in their arguments.
    command_id = current['args']['cmd']
    logging.info(f"Updated action command from JSON action to: {command_id}")
    return command_id

Retrieves the command ID from the current action.

This method extracts the command identifier (e.g., "HOME", "PICK_TIP") from the cmd field of the current action. If the action is a JSON-based action (i.e., the cmd field is "JSON"), the method updates the command ID to the one found within the action's arguments and logs the update.

Returns

str
The command ID of the current action.

Notes

  • If the command is a JSON action (where cmd is "JSON"), the method looks in the args dictionary of the current action to retrieve the actual command ID.
  • Logs an informational message if the command ID is updated from a JSON action.
def initialize_objects(self,
protocol: dict = None,
workspace: dict = None,
platformsInWorkspace: dict = None,
clear_unspecified=True)
Expand source code
def initialize_objects(self,
                       protocol:dict=None,
                       workspace:dict=None,
                       platformsInWorkspace:dict=None,
                       clear_unspecified=True):
    """Set or reset protocol, workspace, platforms, and clearance properties.
    This can be useful when switching protocols or streaming actions, and still preserve state tracking.

    This clears the unspecified objects when `clear_unspecified=True`.
    See 'update_objects' for a non-clearing update.

    Args:
        protocol (dict, optional): New protocol. It must have a "name".
        workspace (dict, optional): New workspace. It must have a "name".
        platformsInWorkspace (optional): Definitions of the platforms in the workspace,
            each with a "name". If not provided, and `clear_unspecified` is False, they
            are fetched from the database for the provided workspace (if any).
        clear_unspecified (bool, optional): Replace the objects that were not provided
            with empty ones. Defaults to True.

    A warning is logged if the protocol's "workspace" differs from the workspace's name.
    """
    # Define protocol property.
    if protocol is not None:
        self.protocol = protocol
        logging.info(f"New protocol set to: {protocol['name']}")
        logging.debug(f"New protocol definition:\n{pformat(protocol)}")
    elif clear_unspecified:
        self.protocol = dict()
        logging.warning("Warning, protocol property now empty.")

    # Set workspace.
    if workspace is not None:
        self.workspace = workspace
        logging.info(f"New workspace set to: '{workspace['name']}'")
    elif clear_unspecified:
        self.workspace = {}
        logging.warning("Warning, workspace property now empty.")

    # Get platform definitions of platform items in workspace.
    if platformsInWorkspace is not None:
        self.platformsInWorkspace = platformsInWorkspace
        logging.info(f"New platformsInWorkspace set to {[i['name'] for i in platformsInWorkspace]}")
    elif clear_unspecified:
        self.platformsInWorkspace = list()
        logging.warning("Warning, platformsInWorkspace property now empty.")
    elif workspace is not None:
        # Get the platforms for the provided workspace, if no platforms were provided.
        # NOTE: This is only reached when "clear_unspecified" is False.
        logging.info(f"platformsInWorkspace not provided. Getting it from the provided workspace '{workspace['name']}'.")
        self.platformsInWorkspace = self.controller.database_tools.getPlatformsInWorkspace(workspace)
        logging.info(f"Available items: {[i['name'] for i in self.platformsInWorkspace]}")

    # Clearance and probing parameters.
    if self.platformsInWorkspace:
        # NOTE: getClearance requires "self.platformsInWorkspace".
        self.clearance = self.getClearance()
    elif clear_unspecified:
        self.clearance = None

    # Consistency check.
    if self.protocol and self.workspace:
        if self.workspace["name"] != self.protocol["workspace"]:
            # Report each name next to the object it actually belongs to.
            msg = f"The protocol's workspace ({self.protocol['workspace']}) "
            msg += f"does not match the specified workspace ({self.workspace['name']})."
            logging.warning(msg)

Set or reset protocol, workspace, platforms, and clearance properties. This can be useful when switching protocols or streaming actions, and still preserve state tracking.

This clears the unspecified objects when clear_unspecified=True. See 'update_objects' for a non-clearing update.

def load_tool(self, tool_data)
Expand source code
def load_tool(self, tool_data):
    """Build a tool object from its data, and register it.

    The 'loader' function is looked up in 'self.tool_loaders' by the tool's "type",
    and is called with the tool's data. The resulting tool object is then passed
    to 'register_tool'. If there is no loader for the type, an error is logged and
    nothing is registered.

    Args:
        tool_data (dict): Tool data, with "type" and "name" keys. An optional
            "enabled" key (defaulting to True) can be used to disable the tool.

    Raises:
        DataError: If the tool is disabled.
    """
    # Parse some basic info.
    tool_type, tool_name = tool_data["type"], tool_data["name"]

    # Disabled tools must not be loaded.
    is_enabled = tool_data.get("enabled", True)
    if not is_enabled:
        raise DataError(f"Won't load disabled tool '{tool_name}'. Set the 'enable' property in its data first.")

    # Get the loader function for this tool type.
    loader_function = self.tool_loaders.get(tool_type, None)
    if loader_function is None:
        # Without a loader there is nothing else to do.
        logging.error(f"There is no loader function for tool '{tool_name}' of type '{tool_type}'.")
        return

    # Load the tool.
    logging.info(f"Loading tool '{tool_name}' of type '{tool_type}'.")
    tool = loader_function(tool_data)

    # Register the tool.
    logging.info(f"Registering tool '{tool_name}' of type '{tool_type}'.")
    self.register_tool(tool)

Looks up a tool 'loader' function in 'tool_loaders', and calls it to load the tool from its data. The initialized tool object is finally registered in the tools dict by calling 'register_tool'. If no loader is found, then an error is logged.

def load_tools(self, tools_list: list = None)
Expand source code
def load_tools(self, tools_list: list = None):
    """Call 'load_tool' on each enabled tool in the list.

    Args:
        tools_list (list, optional): Data of the tools to load. When None, a deep
            copy of all the tools in the database tools is used instead.

    Tools with a false "enabled" property are skipped with a warning.
    """
    if tools_list is None:
        # Default to all tools from the DB, copied to leave the originals untouched.
        tools_list = deepcopy(self.controller.database_tools.tools)

    for tool_data in tools_list:
        # TODO: Remove the default below once all tools have the property.
        if tool_data.get("enabled", True):
            self.load_tool(tool_data)
            continue
        logging.warning(f"Tool '{tool_data['name']}' is disabled, and will be skipped.")

Calls 'load_tool' on each tool in the list. If empty, it defaults to all tools in the database.

def macroGoTo(self, action, i, extend=True)
Expand source code
def macroGoTo(self, action, i, extend=True):
    """Move to the action's content: first over its XY position, then to its Z.

    Args:
        action (dict): The action with the target content.
        i: Index of the action.
        extend (bool, optional): Forwarded to both moves. Defaults to True.

    Returns:
        list: The commands of the XY move, followed by the ones of the Z move.
    """
    # Move over the target content's XY.
    xy_moves = self.macroGoToTubeXY(action, i, extend=extend)
    # Move to the Z position of the content.
    z_moves = self.macroGoToTubeZ(action, i, extend=extend)

    xy_moves += z_moves
    return xy_moves
def macroGoToTubeXY(self, action, i, extend=True) ‑> list
Expand source code
def macroGoToTubeXY(self, action, i, extend=True) -> list:
    """Move to the XY coordinates of the action's tube.

    The coordinates come from 'getTubeAndCoords', and the move is made with
    'macroGoToXYZ' using its default options (other than `extend`).

    Returns:
        list: The commands returned by 'macroGoToXYZ'.
    """
    logging.info(f"Getting XY coordinates for a tube in action {i}.")
    tube_x, tube_y = self.getTubeAndCoords(action)[1:3]

    # Move over the content.
    return self.macroGoToXYZ(x=tube_x, y=tube_y, extend=extend)

Get XY coordinates for the action's tube, applying current tool offsets if any.

def macroGoToTubeZ(self, action, i, extend=True) ‑> list
Expand source code
def macroGoToTubeZ(self, action, i, extend=True) -> list:
    """Move to the Z coordinate of the action's tube.

    The coordinate comes from 'getTubeAndCoords', and the move is made with
    'macroGoToXYZ' using its default options (other than `extend`).

    TODO: be smarter about pipetting height, using current tube volume to increase this.

    Returns:
        list: The commands returned by 'macroGoToXYZ'.
    """
    logging.info(f"Getting Z coordinates for a tube in action {i}.")
    tube_z = self.getTubeAndCoords(action)[3]

    # Go to the tube's Z position.
    return self.macroGoToXYZ(z=tube_z, extend=extend)

Get Z coordinate for the action's tube, applying current tool offsets if any. TODO: be smarter about pipetting height, using current tube volume to increase this.

def macroGoToXYZ(self, x=None, y=None, z=None, f=None, add_offset=True, extend=True, add_comment='') ‑> list
Expand source code
def macroGoToXYZ(self, x=None, y=None, z=None, f=None, add_offset=True, extend=True, add_comment="") -> list:
    """Move to an absolute XYZ position, adding tool-offsets by default.

    Args:
        x, y, z (optional): Target coordinates. Default to None.
        f (optional): Feedrate of the move. Defaults to 'self.feedrate'.
        add_offset (bool, optional): Pass the coordinates through 'addToolOffsets'
            before building the move. Defaults to True.
        extend (bool, optional): Add the commands with 'extend_commands'. Defaults to True.
        add_comment (str, optional): Forwarded to the GCODE generator. Defaults to "".

    Returns:
        list: The commands generated by 'self.gcode.G1' for the move.

    TODO: Send an 'xyz_move' event to any relevant event listeners.
          This is meant to implement obstacle avoidance or advanced movepath planning
          with plugins, which may alter the commands list. It is disabled for now:
          the event system needs to be rethought for non-async stuff.
    """
    # Fall back to the default feedrate.
    feedrate = self.feedrate if f is None else f

    # Add the current tool's offsets.
    if add_offset:
        x, y, z = self.addToolOffsets(x=x, y=y, z=z)

    # Build the move to the requested position.
    move_commands = self.gcode.G1(x=x, y=y, z=z, f=feedrate, absolute=True, add_comment=add_comment)

    # Add the commands to the current action and internal list.
    if extend:
        self.extend_commands(move_commands)

    return move_commands

Move to an absolute XYZ position, adding tool-offsets by default. TODO: Send an 'xyz_move' event to any relevant event listeners. This is meant to implement obstacle avoidance or advanced movepath planning.

def macroMoveSafeHeight(self, item_names=None, extend=True)
Expand source code
def macroMoveSafeHeight(self, item_names=None, extend=True):
    """Move to the safe height in the Z axis.

    Args:
        item_names (list, optional): Forwarded to 'getSafeHeight', to restrict the
            calculation to certain items.
        extend (bool, optional): Add the commands with 'extend_commands'. Defaults to True.

    Returns:
        The commands generated by 'self.gcode.gcodeClearance' for the move.
    """
    # The target height is computed by "getSafeHeight".
    safe_z = self.getSafeHeight(item_names=item_names)

    # Build the move command.
    clearance_moves = self.gcode.gcodeClearance(
        z=safe_z,
        f=self.feedrate,
        add_comment="Moving to the clearance height.",
    )

    # Extend the commands list, if requested.
    if extend:
        self.extend_commands(clearance_moves)

    return clearance_moves

Fast move to safe height in Z axis, considering loaded tip's length

def parseAction(self, action, action_index=None)
Expand source code
def parseAction(self, action, action_index=None):
    """Interpret one protocol action and produce the corresponding GCODE.

    Note that GCODE generation can depend on the current state. The state is
    checkpointed before parsing, committed on success, and rolled back on failure.

    Args:
        action (dict): Python dictionary defining the action. It is updated in place:
            its "GCODE" list is reset and "args" defaults to an empty dictionary.
        action_index (int): Action index number. Defaults to the number of actions
            returned by 'getActions'.

    Raises:
        ProtocolError: If anything fails while parsing the action. The error is logged
            and printed with context info, the state is rolled back, and the original
            exception is chained as the cause. The workspace validation runs before
            the protected section, so its own ProtocolError propagates as-is.

    Returns:
        dict: A deep copy of the parsed action (taken before the action callbacks run).
    """
    # Resolve the default index first, so that every message below reports the real one.
    if action_index is None:
        action_index = len(self.getActions())
        logging.warning(f"Replaced null index with index {action_index}.")

    # NOTE: "get" is used because a missing "cmd" must not fail here with a bare KeyError.
    #       The command is looked up again inside the protected section below.
    logging.info(f"Parsing action with index {action_index} and command '{action.get('cmd')}'.")

    # Save checkpoint.
    self.checkpoint()

    # Save previous action.
    self.previous_action = self.current_action
    # Assign current action to self slot.
    self.current_action = action
    # Create a list for the actions GCODE commands.
    self.current_action["GCODE"] = []

    # Add defaults to required properties.
    self.current_action.setdefault("args", {})

    # Validate workspace name.
    self.validate_current_action_workspace()

    # Parse the current action into GCODE, checking for tool-changes first.
    try:
        # Get the action command ID.
        action_command_id = self.get_current_action_cmd()

        # Get the action's GCODE function.
        # Handlers are called with two arguments: the action and its index.
        action_function = self.action_switch_case[action_command_id]

        # Note new action start as comment in the GCODE.
        comments = [self.gcode.comment(f"Building action {action_index}, with command: {action_command_id}")]
        self.extend_commands(comments)

        # Check and produce GCODE for a toolchange, if needed.
        self.check_toolchange(self.current_action, action_command_id)

        # Produce the GCODE for the action.
        if self.verbose:
            logging.debug(f"Processing command '{action_command_id}' with index '{action_index}'.")
        # Use the action function to generate its GCODE.
        # NOTE: The handler's return value is not used.
        #       GCODE is saved to the current action and to the "commands" property.
        action_function(self.current_action, action_index)

        # Make a copy of the action, including its GCODE.
        final_action = deepcopy(self.current_action)

        # Run action callbacks.
        # This allows other parts of the program to "do stuff"
        # when a certain kind of command has been parsed.
        # TODO: Replace it with the Controller's events system.
        self.run_action_callbacks(action_command_id, final_action)

        # Log success.
        logging.info(f"Successfully parsed action with index {action_index} and command '{action_command_id}'.")

    except Exception as err:
        msg = f"Parser error at action number {action_index} with content '{self.current_action}' message: {err}"
        logging.error(msg)
        print(msg)
        # Rollback changes.
        self.rollback()
        raise ProtocolError(msg) from err

    else:
        # Note action parsed.
        msg = f"Done processing action with index={action_index} and command={action_command_id}"
        logging.debug(msg)
        # Commit changes.
        self.commit()

    return final_action

This function interprets a protocol action and produces the corresponding GCODE

Note that GCODE generation can depend on the current state.

Args

action : dict
Python dictionary defining the action.
action_index : integer
Action index number.

Raises

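ProtocolError
Any exception raised while parsing is logged and printed with context info, the builder state is rolled back, and the error is re-raised as a ProtocolError chained to the original exception.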

Returns

dict
Deep copy of the parsed action; the GCODE commands associated to the action are stored under its "GCODE" key.
def parseProtocol(self, actions: list = None)
Expand source code
def parseProtocol(self, actions:list=None):
    """Parse a list of actions into GCODE, either provided or from the class' property.

    Args:
        actions (list): Actions to parse, in order. Defaults to the "actions" list
            of the active protocol ('self.protocol').

    Returns:
        dict: A deep copy of 'self.protocol', made after all actions were parsed.
    """
    if actions is None:
        actions = self.protocol["actions"]
        logging.warning(f"No actions passed. Using {len(actions)} actions from the active protocol.")

    logging.info(f"Parsing {len(actions)} protocol actions.")

    # NOTE: Each action parser implicitly saves the GCODE to "self.protocol" and to the
    #       action's own GCODE, so the value returned by "parseAction" is not needed here.
    for i, action in enumerate(actions):
        self.parseAction(action, i)

    logging.info(f"Parsed {len(actions)} actions.")

    return deepcopy(self.protocol)

This function parses a list of actions into GCODE, either provided or taken from the class' property.

def register_action_callback(self, commands: list, callback_function: Callable)
Expand source code
def register_action_callback(self, commands: list, callback_function: Callable):
    """Register a callback to be run by 'parseAction' after matching actions are parsed.

    This works as a "notification system" for other parts of the code.
    Registered callbacks are consumed by 'run_action_callbacks'.

    Args:
        commands (list): A list of strings, each an action command.
        callback_function (Callable): A function accepting the GcodeBuilder class instance as
            a first argument, and the "final action" dictionary as a second argument.
    """
    # Each registration pairs the commands of interest with their callback.
    registration = {"commands": commands, "callback": callback_function}
    self.action_callbacks.append(registration)

Callback functions to be run by 'parseAction' when an action is parsed.

This is meant to be a "notification system" for other parts of the code. See 'run_action_callbacks' below.

Args

commands : list
a list of strings, each an action command.
callback : Callable
a function accepting the GcodeBuilder class instance as a first argument, and the "final action" dictionary as a second argument.
def register_tool(self, tool: Tool)
Expand source code
def register_tool(self, tool: Tool):
    """Add a tool object to the tools dictionary, keyed by its name.

    A tool object is expected to be dictionary-like and/or a class with attributes
    useful for generating GCODE (e.g. XYZ offsets).

    It also must have a few important methods called from this class (e.g. home).

    Args:
        tool (Tool): The tool to register. Its 'name' attribute is used as the key.

    Raises:
        DataError: If a tool with the same name has already been registered.
    """
    name = tool.name
    logging.info(f"Registering tool with name '{name}'.")

    # Register right away when the name is free.
    registered_names = list(self.tools)
    if name not in registered_names:
        self.tools[name] = tool
        return

    # Duplicate names are rejected.
    msg = f"The tool '{name}' has already been registered."
    logging.error(msg)
    raise DataError(msg)

Add a tool object to the tools dictionary.

A tool object is expected to be dictionary-like and/or a class with attributes useful for generating GCODE (e.g. XYZ offsets).

It also must have a few important methods called from this class (e.g. home).

def register_tool_loader(self, tool_type: str, loader_function)
Expand source code
def register_tool_loader(self, tool_type: str, loader_function):
    """Register the 'loader' function for a particular tool 'type'.

    The loader will be called to load all tools of its kind. There can be only one
    loader for each tool type, and each should be registered by a particular plugin.
    For example, see 'load_pipette' defined in the 'pipettes.py' plugin.

    Args:
        tool_type (str): The tool type handled by the loader.
        loader_function: The function to store for that tool type.

    Raises:
        DataError: If the tool type already has a registered loader function.
    """
    # Store the function when the type has no loader yet.
    if tool_type not in self.tool_loaders:
        self.tool_loaders[tool_type] = loader_function
        return

    # Otherwise refuse to replace the existing loader.
    msg = f"Tool type '{tool_type}' already has a handler function: {self.tool_loaders[tool_type]}"
    logging.error(msg)
    raise DataError(msg)

Register a 'loader' function for a particular tool 'type', which will be called to load all of its kind. There can be only one loader for each tool type, and each should be registered by a particular plugin. For example, see 'load_pipette' defined in the 'pipettes.py' plugin.

def rollback(self)
Expand source code
def rollback(self):
    """Restore the builder's state from the saved checkpoint.

    Every attribute named in 'self.state_attributes' is replaced by a deep copy
    of the value stored under the same name in 'self._checkpoint'.

    Raises:
        RuntimeError: If there is no checkpoint (missing or empty), or if restoring
            any of the attributes fails (the original error is chained as the cause).
    """
    # A missing attribute and an empty checkpoint are handled alike.
    if not getattr(self, "_checkpoint", None):
        logging.error("No checkpoint available to roll back to.")
        raise RuntimeError("No checkpoint available for rollback.")

    try:
        for name in self.state_attributes:
            # Deep copies keep the checkpoint itself untouched by later changes.
            restored_value = deepcopy(self._checkpoint[name])
            setattr(self, name, restored_value)
        logging.warning("Rolled back to the previous checkpoint state.")
    except Exception as e:
        logging.error(f"Error during rollback: {e}")
        raise RuntimeError(f"Failed to rollback due to an error: {e}") from e

Rolls back to the previous state using the saved checkpoint.

Restores workspaces, protocol, current_action, and previous_action from the checkpoint. Raises an error if no checkpoint is available.

def run_action_callbacks(self, action_command_id: str, action: dict)
Expand source code
def run_action_callbacks(self, action_command_id: str, action: dict):
    """Run the callback functions registered for the command of a processed action.

    Each registered callback whose commands include 'action_command_id' is called with
    the builder instance and the action. Additionally, a warning is logged for every
    registered command that has no handler in 'self.action_switch_case'.

    TODO: consider rewriting this in reverse, by "emitting" events like Klipper does.
    TODO: Replace it with the Controller's events system.

    Args:
        action_command_id (str): Command ID of the action that was parsed.
        action (dict): The action passed on to the matching callbacks.
    """
    for registration in self.action_callbacks:
        # Commands the callback is meant to handle, and the callback itself.
        handled_cmds = registration["commands"]
        callback = registration["callback"]

        # Notify the callback on a match.
        if action_command_id in handled_cmds:
            callback(self, action)

        # Warn about commands that could never match, just in case.
        for cmd in handled_cmds:
            if self.action_switch_case.get(cmd, None):
                continue
            logging.warning(
                f"The callback command '{cmd}' was not found in the "
                f"registered action handlers: {list(self.action_switch_case)}"
            )

Run any callback functions registered to run after an action is processed. TODO: consider rewriting this in reverse, by "emitting" events like Klipper does. TODO: Replace it with the Controller's events system.

def save_gcode(self, path='examples/test.gcode')
Expand source code
def save_gcode(self, path="examples/test.gcode"):
    """Write the GCODE returned by 'getGcode' to a UTF-8 text file.

    Args:
        path (str): Destination file. It is opened in write mode, replacing its contents.
    """
    logging.debug(f"commanderTest.py message: Saving GCODE to file: {path}")
    with open(path, mode="w", encoding="utf-8") as output:
        # NOTE: "writelines" adds no line separators of its own.
        gcode_lines = self.getGcode()
        output.writelines(gcode_lines)

Save the GCODE to a file

def toolChangeMacro(self, new_tool: str, extend=True, operation_time=30.0, action: dict = None)
Expand source code
def toolChangeMacro(self, new_tool:str, extend=True, operation_time=30.0, action:dict=None):
    """Generate the GCODE for parking the current tool and picking up a new one.

    The current tool (if any, and if different from the new one) is parked using its
    'park' method, and the new tool (if any, and if different from the current one) is
    picked up using its 'pickup' method. Skipped stages are noted as GCODE comments.
    The builder's 'current_tool' is set to 'new_tool' at the end.

    Args:
        new_tool (str): Name of the incoming tool, or None to only park the current one.
        extend (bool): When true, non-empty commands are passed to 'extend_commands'.
        operation_time (float): Timeout added to the action for each park or pickup
            operation, through the machine's 'update_exec_opts'.
        action (dict): Action receiving the timeouts and commands. Defaults to the
            builder's current action.

    Returns:
        list: The parking commands followed by the pickup commands.
    """
    logging.info(f"Changing tools from '{self.current_tool}' to '{new_tool}'")

    # Fall back to the action being parsed.
    if action is None:
        logging.info("No action provided, defaulting to the current action.")
        action = self.current_action

    def extend_timeout(debug_message):
        # The action can still be None here, if there was no current action either.
        if action is not None:
            logging.debug(debug_message)
            self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

    # Parking stage: put away the current tool, if any.
    park_commands = []
    if self.current_tool is None:
        note = "Nothing to park, the current tool is 'None'."
        logging.info(note)
        park_commands.append(self.gcode.comment(note))
    elif self.current_tool == new_tool:
        note = "Parking skipped. Incoming and outgoing tool are equal."
        logging.info(note)
        park_commands.append(self.gcode.comment(note))
    else:
        outgoing = self.tools[self.current_tool]
        park_commands.extend(outgoing.park(self.current_tool, new_tool))
        extend_timeout("Adding parking operation time to action.")

    # Pickup stage: fetch the next tool, if any.
    # NOTE: Notes for a skipped pickup still end up right after the parking commands.
    pickup_commands = []
    if new_tool is None:
        # The toolchange was just for parking.
        note = "Nothing to pickup, the new tool is 'None'."
        logging.info(note)
        pickup_commands.append(self.gcode.comment(note))
    elif self.current_tool == new_tool:
        note = "Pickup skipped. Incoming and outgoing tool are equal."
        logging.info(note)
        pickup_commands.append(self.gcode.comment(note))
    else:
        incoming = self.tools[new_tool]
        pickup_commands.extend(incoming.pickup(self.current_tool, new_tool))
        extend_timeout("Adding pickup operation time to action.")

    # The new tool is the current one from now on.
    self.current_tool = new_tool

    # Join both stages, and save them if requested.
    tc_commands = park_commands + pickup_commands
    if extend and tc_commands:
        logging.debug("Extending GCODE commands in action.")
        self.extend_commands(commands=tc_commands, action=action)

    return tc_commands
def update_objects(self, workspace=None, platforms_in_workspace=None, protocol=None)
Expand source code
def update_objects(self, workspace=None, platforms_in_workspace=None, protocol=None):
    """Update the specified objects, leaving the unspecified ones as they are.

    This delegates to "initialize_objects" with "clear_unspecified=False". Use
    "initialize_objects" directly to clear (set to an empty dict) the unspecified objects.
    Per the original notes, specifying "platforms_in_workspace" also updates
    the "clearance" (see "initialize_objects").

    Args:
        workspace: New workspace object, if any.
        platforms_in_workspace: New platforms object, if any.
        protocol: New protocol object, if any.
    """
    logging.info("Updating objects.")
    new_objects = {
        "protocol": protocol,
        "workspace": workspace,
        "platformsInWorkspace": platforms_in_workspace,
    }
    # Do not clear the unspecified objects.
    self.initialize_objects(clear_unspecified=False, **new_objects)

Update the specified objects, not clearing the rest. If you want to clear (set to an empty dict) the unspecified objects, use "initialize_objects" instead. Specifying "platforms_in_workspace" will update the "clearance" (see "initialize_objects").

def update_objects_by_workspace(self, workspace_name: str, force: bool = True)
Expand source code
def update_objects_by_workspace(self, workspace_name:str, force: bool = True):
    """Replace the builder's workspace and platforms objects with those of a named workspace.

    The new objects are fetched through the controller's database tools, by the
    workspace's name, and then applied with "update_objects" (the protocol is not cleared).

    If the DB needs to be updated, consider using "apply_settings" before.

    Args:
        workspace_name (str): Name of the workspace.
        force (bool): Whether to force an update if the names of the current and
            new workspaces are equal. Defaults to True.
    """
    logging.info(f"Updating builder objects to match a new workspace: '{workspace_name}'")

    same_name = self.workspace.get("name", None) == workspace_name
    if same_name and not force:
        logging.warning("The previous workspace has the same name, update skipped.")
        return
    if same_name:
        logging.warning("Forced update. The previous workspace has the same name, but may hold a different state.")

    # Fetch the workspace, and then its platforms.
    new_workspace = self.controller.database_tools.getWorkspaceByName(workspace_name=workspace_name)
    new_platforms = self.controller.database_tools.getPlatformsInWorkspace(workspace=new_workspace)

    # Update builder objects and clearance (not clearing the protocol).
    self.update_objects(workspace=new_workspace, platforms_in_workspace=new_platforms)

Override the workspace and current platforms objects of the builder

If the DB needs to be updated, consider using "apply_settings" before.

New objects are fetched from the DB by the workspace's name.

Args

workspace_name : str
Name of the workspace.
force : bool
Whether to force an update if the names of the current and new workspaces are equal. Defaults to True.
def validate_current_action_workspace(self)
Expand source code
def validate_current_action_workspace(self):
    """
    Check the workspace name of the current action against the active workspace.

    The `workspace` name of the current action is compared with the `name` of the
    active workspace. A mismatch raises a `ProtocolError`. An action without a
    workspace name gets the name of the active workspace, and a warning is logged.

    Raises:
        ProtocolError: If the workspace specified in the current action does not match the active workspace.

    Notes:
        - Nothing is checked nor changed when the active workspace has no name.
        - Names are only compared when both the action and the active workspace have one.
    """
    current_workspace_name = self.workspace.get("name", None)
    action_workspace_name = self.current_action.get("workspace", None)

    # There is nothing to compare against, nor to default to.
    if not current_workspace_name:
        return

    # The action did not specify a workspace name: use the name of the current workspace.
    if not action_workspace_name:
        msg = f"Action with index {self.current_action.get('index', None)} has no workspace info."
        msg += f" Defaulting to the current workspace: '{current_workspace_name}'"
        logging.warning(msg)
        self.current_action["workspace"] = current_workspace_name
        return

    # Both names are set: they must be equal.
    if action_workspace_name != current_workspace_name:
        msg = f"Action with index {self.current_action.get('index', None)} and workspace '{action_workspace_name}'"
        msg += f" does not match the currently active workspace '{current_workspace_name}'."
        logging.error(msg)
        raise ProtocolError(msg)

Validates that the workspace specified in the current action matches the active workspace.

This method checks the workspace name from the current action and compares it with the active workspace name. If the two workspaces do not match, a ProtocolError is raised. If the action does not specify a workspace, it will default to the current workspace and log a warning.

Raises

ProtocolError
If the workspace specified in the current action does not match the active workspace.

Notes

  • If both the action and the current workspace specify a name, they are compared.
  • If the action does not have a workspace name, the current workspace name is assigned to the action and a warning is logged.