Module pipettin-piper.piper.gcode
Classes
class GcodeBuilder (protocol=None,
workspace=None,
platformsInWorkspace=None,
controller: Controller = None,
config: TrackedDict = None,
verbose=False)-
Expand source code
class GcodeBuilder: """ This class generates GCODE from Pipettin protocol "actions". It containse and consumes all information for the pipetting task: protocol, workspace, platforms and tools. It holds and tracks machine status or configuration during prococol 'slicing' to GCODE. Also stores all GCODE commands after parsing protocols. Some actions might depend on previous actions or action history. So these objects are for tracking the state of the machine during the translation of actions to GCODE, or during gcode streaming to the machine controllers. """ protocol: dict = None workspace: dict = None platformsInWorkspace: list = None tools_data: list current_action: dict def __init__(self, protocol=None, workspace=None, platformsInWorkspace=None, controller:Controller=None, config:TrackedDict=None, verbose=False): """ A class to contain all information for the pipetting task: protocol, workspace, platforms and tools. :param protocol: A protocol dictionary (JSON object) from the GUI/MongoDB, containing protocol 'actions'. :param workspace: A workspace dictionary (JSON object) from the GUI/MongoDB, containing the platform instances in the workspace. :param platformsInWorkspace: A platforms dictionary (JSON object) from the GUI/MongoDB, containing the definitions of platforms in the workspace. :param pipettes: A dictionary with pipette tool definitions. :param config: Configuration options loaded from the 'config.yml' YAML file. """ logging.info("Initializing pipettin's gcode generator class.") # Save configuration. if config is None: config = {} self.config = config self.verbose = verbose or config.get("verbose", False) # Instantiate gcode generator. self.gcode = GcodePrimitives(config=config) # GCODE commands list. self.commands = [] # Current Action property. self.previous_action = {} self.current_action = {} # Initialize snapshot variable. self._checkpoint: dict = None # Save controller and DB tools. self.controller: Controller = controller # Set workspace, platforms, and workspace properties. self.initialize_objects(protocol, workspace, platformsInWorkspace) # List of default parsers for the requested actions. self.action_switch_case = {} self.add_action_handler(name=self.comment_action_name, function=self.comment_action_parser) # Actions that can skip a tool-change check. self.tc_check_skip_actions = [self.comment_action_name] # TODO: base these values on a config file. self.extra_clearance = 1.0 # Get tool data. # TODO: make this attribute update itself on DB tools replacement. #self.tools_data: list = deepcopy(self.controller.database_tools.tools) self.tools_data = [] self.action_callbacks = [] # Setup tool tracking. self.tools = {} self.tool_loaders = {} self.current_tool: str = None # Register post-init methods. if controller: self.controller.post_init_callbacks.append(self.load_tools) # __init__ END. @property def feedrate(self): """Main feedrate for XYZ moves Default feedrate in mm/min (F parameter for G1) used for XYZ moves. This is a property that retuns the value set in the GCODE primitives class (i.e. "self.gcode.feedrate"). Equivalences: 60000 mm/min = 1000 * 60 mm/min = 1000 mm/s = 1 m/s """ return self.gcode.feedrate #### Action handlers to convert actions into GCODE #### comment_action_name = 'COMMENT' def comment_action_parser(self, action, i): comment_command = [self.gcode.comment(message=action["args"]["text"])] self.extend_commands(comment_command) logging.info(f"Processed {self.comment_action_name} action with index {i} and text: '{action['args']['text']}'") return comment_command #### Utility functions for action handling #### def add_action_handler(self, name: str, function: Callable): """Add a callback function responsible for generating precalculated GCODE for a certain protocol action. This is part of an extension system useful to write plugins. See example plugins in the "plugins" directory. The "base_actions" plugin can be used as a reference. Args: name (str): Name for the action handler, this must match the value of the "CMD" key in the action. function (Callable): Function called to handle the action. It must accept an 'action' and its 'index' as first and second arguments, respectively. Raises: Exception: Raises an exception when a handler has already been registered for a certain CMD. """ if self.action_switch_case.get(name, None): msg = f"Attempted to overwrite action handler '{name}'." logging.error(msg) raise CommanderError(msg) else: logging.info(f"Registered new action handler for '{name}' actions.") self.action_switch_case[name] = function action_callbacks: list def register_action_callback(self, commands: list, callback_function: Callable): """Callback functions to be run by 'parseAction' when the an an action is parsed. This is meant to be a "notification system" for other parts of the code. See 'run_action_callbacks' below. Args: commands (list): a list of strings, each an action command. callback (Callable): a function accepting the GcodeBuilder class instance as a first argument, and the "final action" dictionary as a second argument. """ self.action_callbacks.append({ "commands": commands, "callback": callback_function }) #### Utility functions for loading or updating pipettin's data objects #### tool_loaders: dict """Dictionary of tool-loading functions: with keys matching names of tool-type, and callbacks as values.""" def register_tool_loader(self, tool_type: str, loader_function): """Register a 'loader' function for a particular tool 'type', which will be called to load all of its kind. There can be only one loader for each tool type, and each should be registered by a particular plugin. For example, see 'load_pipette' defined in the 'pipettes.py' plugin. """ # Raise an error if the tool_type already has a registered loader function. if tool_type in self.tool_loaders: msg = f"Tool type '{tool_type}' already has a handler function: {self.tool_loaders[tool_type]}" logging.error(msg) raise DataError(msg) # Register the function. self.tool_loaders[tool_type] = loader_function def load_tools(self, tools_list: list = None): """Calls 'load_tool' on each tool in the list. If empty, it defaults to all tools in the database. """ # Default to all tools from the DB. if tools_list is None: tools_list: list = deepcopy(self.controller.database_tools.tools) # Load all tools in the list. for tool_data in tools_list: if not tool_data.get("enabled", True): # TODO: Remove the default above once all tools have the property. logging.warning(f"Tool '{tool_data['name']}' is disabled, and will be skipped.") else: self.load_tool(tool_data) def load_tool(self, tool_data): """Looks up a tool 'loader' function in 'tool_loaders', and calls it to lad the tool from its data. The initialized tool object is finally registered in the tools dict by calling 'register_tool'. If no loader is found, then an error is logged. """ # Parse some basic info. tool_type = tool_data["type"] tool_name = tool_data["name"] # Check if enabled. if not tool_data.get("enabled", True): raise DataError(f"Won't load disabled tool '{tool_name}'. Set the 'enable' property in its data first.") # Get the loader function for this tool type. loader_function = self.tool_loaders.get(tool_type, None) # Load and register the tool. if loader_function is None: # Log a warning if no tool loader was found for this tool type. logging.error(f"There is no loader function for tool '{tool_name}' of type '{tool_type}'.") else: # Load the tool. logging.info(f"Loading tool '{tool_name}' of type '{tool_type}'.") tool = loader_function(tool_data) # Register the tool. logging.info(f"Registering tool '{tool_name}' of type '{tool_type}'.") self.register_tool(tool) # NOTE: TOOL STUFF UNDER CONSTRUCTION. tools: dict def register_tool(self, tool: Tool): """Add a tool object to the tools dictionary. A tool object is expected to be dictionary-like and/or a class with attibutes useful for generating GCODE (e.g. XYZ offsets). It also must have a few important methods called form this class (e.g. home). """ tool_name = tool.name logging.info(f"Registering tool with name '{tool_name}'.") if tool_name in list(self.tools): msg = f"The tool '{tool_name}' has already been registered." logging.error(msg) raise DataError(msg) self.tools[tool_name] = tool # TODO: Add "tool" stuff and figure out how to handle re-initializations. def initialize_objects(self, protocol:dict=None, workspace:dict=None, platformsInWorkspace:dict=None, clear_unspecified=True): """Set or reset protocol, workspace, platforms, and clearance properties. This can be useful when switching protocols or streaming actions, and still preserve state tracking. This clears the unspecified objects when `clear_unspecified=True`. See 'update_objects' for a non-clearing update. """ # Define protocol property. if protocol is not None: self.protocol = protocol logging.info(f"New protocol set to: {protocol['name']}") logging.debug(f"New protocol definition:\n{pformat(protocol)}") elif clear_unspecified: self.protocol = dict() logging.warning("Warning, protocol property now empty.") # Set workspace. if workspace is not None: self.workspace = workspace logging.info(f"New workspace set to: '{workspace['name']}'") elif clear_unspecified: self.workspace = {} logging.warning("Warning, workspace property now empty.") # Get platform definitions of platform items in wokspace. if platformsInWorkspace is not None: self.platformsInWorkspace = platformsInWorkspace logging.info(f"New platformsInWorkspace set to {[i['name'] for i in platformsInWorkspace]}") elif clear_unspecified: self.platformsInWorkspace = list() logging.warning("Warning, platformsInWorkspace property now empty.") elif workspace is not None: # Get the platforms for the provided workspace, if no platforms were provided. logging.info(f"platformsInWorkspace not provided. Getting it from the provided workspace '{workspace['name']}'.") self.platformsInWorkspace = self.controller.database_tools.getPlatformsInWorkspace(workspace) logging.info(f"Available items: {[i['name'] for i in self.platformsInWorkspace]}") # Clearance and probing parameters. # NOTE: Using @propery decorator below to defer getClearance() calculation until a workspace is available. # This must be run because otherwise the getter method can fail with "AttributeError". if self.platformsInWorkspace: # NOTE: getClearance requires "self.platformsInWorkspace". self.clearance = self.getClearance() elif clear_unspecified: self.clearance = None # Consistency check. if self.protocol and self.workspace: if self.workspace["name"] != self.protocol["workspace"]: msg = f"The protocol's workspace ({self.workspace['name']}) " msg += f"does not match the specified workspace ({self.protocol['workspace']})." logging.warning(msg) def update_objects(self, workspace=None, platforms_in_workspace=None, protocol=None): """Update the specified objects, not clearing the rest. If you want to clear (set to an empty dict) the unspecified objects, use "initialize_objects" instead. Specifying "platforms_in_workspace" will update the "clearance" (see "initialize_objects"). """ logging.info("Updating objects.") self.initialize_objects( protocol=protocol, workspace=workspace, platformsInWorkspace=platforms_in_workspace, # Do not clear the unspecified objects. clear_unspecified=False) def update_objects_by_workspace(self, workspace_name:str, force: bool = True): """Override the workspace and current platforms objects of the builder If the DB needs to be updated, consider using "apply_settings" before. New objects are fetched from the DB by the workspace's name. Args: workspace_name (str): Name of the workspace. force (bool): Wether to force an update if the name of the current and new workspaces are equal. Defaults to True. """ logging.info(f"Updating builder objects to match a new workspace: '{workspace_name}'") if self.workspace.get("name", None) == workspace_name: if force: logging.warning("Forced update. The previous workspace has the same name, but may hold a different state.") else: logging.warning("The previous workspace has the same name, update skipped.") return # Get workspace data. ws = self.controller.database_tools.getWorkspaceByName(workspace_name=workspace_name) # Get platforms. pl = self.controller.database_tools.getPlatformsInWorkspace(workspace=ws) # Update builder objects and clearance (not clearing the protocol). self.update_objects(workspace=ws, platforms_in_workspace=pl) # STATE ROLLBACK SYSTEM #### state_attributes = [ "workspace", "protocol", "commands", "current_action", "previous_action", "current_tool" ] """List of attributes to consider as part of the builder's state.""" def checkpoint(self): """Creates a snapshot (checkpoint) of the current state of relevant attributes. Saves the state of workspaces, protocol, current_action, and previous_action. """ try: self._checkpoint = {n: deepcopy(getattr(self, n)) for n in self.state_attributes} logging.info("Checkpoint created for the current state.") except AttributeError as e: logging.error(f"Failed to create checkpoint: {e}") raise RuntimeError("Could not create checkpoint due to missing attributes.") from e def commit(self): """Commits the current state by clearing the checkpoint. This method should be called after successfully processing an action. """ self._checkpoint = None # Clear the checkpoint since the changes are confirmed logging.info("Changes committed, checkpoint cleared.") def rollback(self): """Rolls back to the previous state using the saved checkpoint. Restores workspaces, protocol, current_action, and previous_action from the checkpoint. Raises an error if no checkpoint is available. """ if not hasattr(self, "_checkpoint") or not self._checkpoint: logging.error("No checkpoint available to roll back to.") raise RuntimeError("No checkpoint available for rollback.") try: for attr_name in self.state_attributes: setattr(self, attr_name, deepcopy(self._checkpoint[attr_name])) logging.warning("Rolled back to the previous checkpoint state.") except Exception as e: logging.error(f"Error during rollback: {e}") raise RuntimeError(f"Failed to rollback due to an error: {e}") from e #### Properties #### # See: https://www.programiz.com/python-programming/property # https://stackoverflow.com/a/51342825/11524079 _clearance: float = None @property def clearance(self): """Current clearance height.""" if self._clearance: logging.debug("Getting clearance value...") val = self._clearance else: logging.debug("Calculating clearance value...") val = self.getClearance() return val # Setter method for "self.clearance". @clearance.setter def clearance(self, value): if self.verbose: logging.debug(f"Setting clearance value='{value}'...") self._clearance = value #### Convenienve math functions #### @staticmethod def sign(x): """Return the sign of a number (as 1 or -1).""" return copysign(1, x) #### Utility methods for GCODE generation #### @staticmethod def itemCenter(item, platform): """Calculate the XY center and the active-height Z of an item. TODO: Move this method to data-tools. """ x = item["position"]["x"] + platform["width"]/2 y = item["position"]["y"] + platform["length"]/2 z = platform["activeHeight"] return x, y, z def getItemXYZ(self, item_name: str): """Get the XYZ coordinates of an item. TODO: Move this method to data-tools. """ # Get platform and platform item. item = self.controller.database_tools.getWorkspaceItemByName( workspace=self.workspace, item_name=item_name) # Get platform definition from the name in the platform item. platform = self.controller.database_tools.getPlatformByName( platformsInWorkspace=self.platformsInWorkspace, platform_item=item) # Get coordinates x, y, z = self.itemCenter(item, platform) return x, y, z @staticmethod def getContentXY(content: dict, platform_item: dict, platform: dict): """ Calculate XY of content center. Defaults to using the col/row pair or, if unavailable, pre-existing x/y coordinates in the content's position. """ # Get the content's position. position: dict = content.get("position", None) # Get the platform's slots. slots: list = platform.get("slots", None) # If no position was received, try to recreate it by the content's index. if position is None: position = {} index = content["index"] # Guess by a platforms grid if present. ncols = platform.get("wellsColumns", None) nrows = platform.get("wellsRows", None) if (ncols is not None) and (nrows is not None): # Calculate the row and column. position["row"], position["col"] = datautils.get_colrow(index, ncols, nrows) # Check for consistency. if nrows < index // ncols: msg = f"Error: the platform '{platform.get('name', None)}'" msg += f" does not have enough rows to get a content with index {index}." msg += f" platform={platform} content={content}" raise DataError(msg) # If not, try to get it from the slot. elif slots is not None: slot = slots[index-1] position = slot["position"] else: msg = "Error: the content did not specify an position, and it could not be inferred " raise DataError(msg) # Calculate X of content center if "col" in position: _x = platform_item["position"]["x"] _x += platform['firstWellCenterX'] _x += (position["col"] - 1) * platform['wellSeparationX'] else: _x = position["x"] # Calculate Y of content center if "row" in position: _y = platform_item["position"]["y"] _y += platform['firstWellCenterY'] _y += (position["row"] - 1) * platform['wellSeparationY'] else: _x = position["y"] return _x, _y def getContentZ(self, content=None, platform_item=None, platform=None): """ Calculate the "active" Z height of a content (e.g. the bottom of a "tube", or fitting height of a tip). Defaults to using the Z from the content or, if unavailable, information from the platform. Formula: z = `deck_z + plat_z + plat_activeHeight - container_z_offset + tip_active_height = tip_spot_z` """ # Default to using the Z defined in the content. if content is not None: # Skip everything if the content provides a Z coordinate. if "position" in content: if "z" in content["position"]: return content["position"]["z"] # Use the platform item to get the platform if undefined. if (platform is None) and (platform_item is not None): platform = self.controller.database_tools.getPlatformByName(self.platformsInWorkspace, platform_item) # Get the default Z from the platform. if platform is not None: # Get container link info. container_name = content["container"] container_link = next(c for c in platform["containers"] if c["container"] == container_name) container = self.controller.database_tools.getContainerByName(container_name) # Workspace Z, the starting point. workspace_z = self.workspace.get("z", 0) # TODO: This is not in the schema, perhaps it should be. # Item Z, position relative to workspace. item_z = platform_item["position"].get("z", 0) # Z-coordinate of the bottom of the platform (i.e. relative to the reference tool, with offsets [0,0,0]). active_z_platform = platform["activeHeight"] # NOTE: Prevously named "defaultBottomPosition" or "defaultLoadBottom". # Subtract the container's offset to the platform. offset_z = -container_link["containerOffsetZ"] # Add the container's active height. active_z_container = container["activeHeight"] # NOTE: In the case of tips, this is equal to their total length. # Add it up. z = workspace_z + item_z + active_z_platform + offset_z + active_z_container logging.debug(f"Parsed content height {z} mm from: workspace_z={workspace_z} item_z={item_z} active_z_platform={active_z_platform} offset_z={offset_z} active_z_container={active_z_container}") # Done, phew! return z # Raise an error if the inut did not match any condition above. raise DataError("getContentZ: Failed to calculate Z. Information is missing.") def getContentCoords(self, action, allow_next=False, pop_from_item=False, pop_from_db=False): """ Get XYZ coordinates for the action's platform item (e.g. a "tube"). Also apply tool offsets, if any. """ # Get platform and platform item. item_name = action["args"]["item"] platform_item = self.controller.database_tools.getWorkspaceItemByName( workspace=self.workspace, item_name=item_name) # Get platform definition from the name in the platform item. platform = self.controller.database_tools.getPlatformByName(self.platformsInWorkspace, platform_item) # Get the content. content = action["args"].get("content", None) selector = action["args"].get("selector", None) if content: # Use the provided content. pass elif selector: # Get content from the platform item using the selector. # TODO: Consider getting and using "content_type" as a filter here. content = self.controller.database_tools.getNextContent( platform_item=platform_item, selector=selector, # Pass optional "pop" arguments. pop_from_item=pop_from_item, pop_from_db=pop_from_db ) elif allow_next: # Get any next content from the platform item. # TODO: Review this and consider deduplicating with the clause above. content = self.controller.database_tools.getNextContent( platform_item=platform_item, # Pass optional "pop" arguments. pop_from_item=pop_from_item, pop_from_db=pop_from_db ) else: msg = f"Can't obtain content from action. Insufficient selector information:\n{pprint.pformat(action)}" logging.error(msg) raise DataError(msg) # Calculate XY of tube center. x, y = self.getContentXY(content, platform_item, platform) # Calculate Z for pipetting. z = self.getContentZ(content, platform_item, platform) return content, x, y, z def getTubeAndCoords(self, action): """Get XYZ coordinates of a tube and it's parameters. TODO: Rename this to "getContentAndCoords", and "getContentCoords" to something else. It can either get the info from a reference to a platform item and a content within, (i.e. platform-type) or directly from an XYZ coordinate (i.e. coord-type, meant for PLR). Args: action (_type_): _description_ Returns: _type_: _description_ Example action 'args' for coords-based position: { "position": {'x': ,'y': , 'z':}, "content": {"volume": 10} } Example tube content: { 'index': 2, 'maxVolume': 1500, 'name': 'buffer', 'position': {'col': 2, 'row': 1}, 'tags': ['mixcomp', 'buffer'], 'type': 'tube', 'volume': 7.2 } """ if "position" in action["args"]: # Get coords from a coords-type tube object. coords = action["args"]["position"] x, y, z = coords.get("x", None), coords.get("y", None), coords.get("z", None) # Get information about the tube. content = action["args"]["content"] # A tube must contain "volume". Ensure that the required data is there. # NOTE: Removed because an exception would be raised in 'macroGoToAndMix' anyways, # which is the only function actually using volume information. # content["volume"] = float(action["args"]["content"]["volume"]) else: # Get XYZ coordinates for the action's "platform"-type tube, # applying current tool offsets if any. content, x, y, z = self.getContentCoords(action) return content, x, y, z #### Tool-related GCODE methods #### def addToolOffsets(self, x=None, y=None, z=None, tool_name:str=None): """Add offsets of the current tool. Args: x (float, optional): Value of the x coordinate to offset. Defaults to None. y (float, optional): Value of the y coordinate to offset. Defaults to None. z (float, optional): Value of the z coordinate to offset. Defaults to None. tool_name (str, optional): Name of a tool. Defaults to None. Returns: tuple: An updated coordiante set, with one element per each non-None coordinate argument. """ if tool_name is None: # Use the current tool if none was provided. if self.current_tool: logging.debug(f"Adding offsets with self.current_tool={self.current_tool}") tool_name = self.current_tool # If there is not current tool, then return the input. else: logging.debug("No offsets added, self.current_tool was 'None'.") return x, y, z # Defaults. coords = [x, y, z] if x is not None: coords[0] += self.tools[tool_name].x_offset if y is not None: coords[1] += self.tools[tool_name].y_offset if z is not None: coords[2] += self.tools[tool_name].z_offset return tuple(coords) #### Convenience functions for clearance coordinates #### def calc_clearance(self, platforms: list): """Calculate a conservative clearance for a set of platforms, considering all their possible containers.""" if not platforms: raise ValueError("No platforms provided for calculation.") max_height = 0.0 for platform in platforms: # Get platform height and active height. height = platform["height"] active_h = platform["activeHeight"] # Take containers into account. max_container_height = 0 for link in platform.get("containers", []): offset = link["containerOffsetZ"] container = self.controller.database_tools.getContainerByName(link["container"]) length = container["length"] max_container_height = max(max_container_height, active_h + length - offset ) # Update the max height. max_height = max(max_height, height, max_container_height) return max_height def getClearance(self, item_names:list=None): """Get the minimum clearance level for the Z axis from the platformsInWorkspace definition""" if not self.platformsInWorkspace: logging.error("There are no platforms in the workspace, or no workspace has been set up. Returning None.") clearance = None else: if item_names: logging.warning(f"Calculating clearance only between items: {item_names}") platforms_to_clear = [i for i in self.platformsInWorkspace if i["name"] in item_names] clearance = self.calc_clearance(platforms_to_clear) else: clearance = self.calc_clearance(self.platformsInWorkspace) logging.info(f"Calculated platform clearance value: {clearance}") return clearance def getSafeHeight(self, item_names:list=None): """ Calculate the Z coordinate above all contents, considering if a tip is placed on the current tool. Returns "None" if the clearance has not been set. """ # Calculate safe hieght. z = 0 # Add minimum clearance to Z coordinate. z += self.extra_clearance # Get clearance height from platforms. platform_clearance = self.getClearance(item_names=item_names) if platform_clearance: z += platform_clearance # Add clearance from tools. if self.current_tool: logging.info(f"Adding tool offset from tool '{self.current_tool}' to clearance.") # Unpacking is needed. z = self.addToolOffsets(z=z)[2] # Done! logging.info(f"Calculated safe height: {z}") return z def getSafeY(self): """ There is a limit on the Y axis movement. The tool-holder can crash into the parked tools. Here we use the parking post coordinates to establish a safe limit for the Y direction. NOTE: The return value can be "None" when there are no tools at all. """ # Default. y_clearance = None # Get Y position of the empty tool post of the currently attached tool, if any. # In theory this one will be the farthest from the tool-head, but it needs to be # accounted for even so (e.g. maybe it has really "long" pins on the tool-post). if self.current_tool: y_clearance = self.tools[self.current_tool].safe_y_post # Get safe Y position of the parked tools. safe_y_parked = [tool.safe_y_parked for tool in self.tools.values() if tool.name != self.current_tool] # Use the closest safe distance. if safe_y_parked and y_clearance: # NOTE: Corrected for axis inversion. y_clearance = max(y_clearance, *safe_y_parked) elif safe_y_parked: # NOTE: Corrected for axis inversion. y_clearance = max(safe_y_parked) # Adjust tool distance protruding from the toolhead, if any. if self.current_tool: # NOTE: Corrected for axis inversion. y_clearance += self.tools[self.current_tool].safe_y return y_clearance #### Useful GCODE generating macros for action handlers #### def macroMoveSafeHeight(self, item_names=None, extend=True): """Fast move to safe height in Z axis, considering loaded tip's length""" # Get the clearance height considering all platforms. z = self.getSafeHeight(item_names=item_names) # Build the move command. move_commands = self.gcode.gcodeClearance( z=z, f=self.feedrate, add_comment="Moving to the clearance height." ) if extend: # Extend the commands list. self.extend_commands(move_commands) return move_commands def macroGoTo(self, action, i, extend=True): # Move over the target content's XY. commands = self.macroGoToTubeXY(action, i, extend=extend) # Move to the bottom position of the conntent. commands += self.macroGoToTubeZ(action, i, extend=extend) return commands def macroGoToTubeXY(self, action, i, extend=True) -> list: """Get XY coordinates for the action's tube, applying current tool offsets if any.""" logging.info(f"Getting XY coordinates for a tube in action {i}.") _, x, y, _ = self.getTubeAndCoords(action) # Move over the content. go_over_tip_commands = self.macroGoToXYZ(x=x, y=y, extend=extend) return go_over_tip_commands def macroGoToTubeZ(self, action, i, extend=True) -> list: """Get Z coordinate for the action's tube, applying current tool offsets if any. TODO: be smarter about pipetting height, using current tube volume to increase this. """ logging.info(f"Getting Z coordinates for a tube in action {i}.") _, _, _, z = self.getTubeAndCoords(action) # Go to the bottom of the tube. go_into_tip_commands = self.macroGoToXYZ(z=z, extend=extend) return go_into_tip_commands xyz_move_event: str = "xyz_move" """Name of the event raised by 'macroGoToXYZ' when called.""" def macroGoToXYZ(self, x=None, y=None, z=None, f=None, add_offset=True, extend=True, add_comment="") -> list: """Move to an absolute XYZ position, adding tool-offsets by default. TODO: Send an 'xyz_move' event to any relevant event listeners. This is meant to implement obstacle avoidance or advanced movepath planning. """ # Default feedrate. if f is None: f = self.feedrate # Add the current tool's offsets. if add_offset: # Unpacking is needed. x, y, z = self.addToolOffsets(x=x, y=y, z=z) # Go to the requested position. commands = self.gcode.G1(x=x, y=y, z=z, f=f, absolute=True, add_comment=add_comment) # Let everyone know about the move. # NOTE: Other parts of the program may alter the commands list. # This is meant to implement obstacle avoidance or advanced # movepath planning with plugins. # TODO: Rethink the event system for non-async stuff. # self.controller.trigger_event_callback( # event_name=self.xyz_move_event, # parameters={"x": x, "y": y, "z": z, "f": f, "add_offset": add_offset}, # commands=commands # ) # Add the commands to the current action and internal list. if extend: self.extend_commands(commands) return commands def toolChangeMacro(self, new_tool:str, extend=True, operation_time=30.0, action:dict=None): logging.info(f"Changing tools from '{self.current_tool}' to '{new_tool}'") # Set the default action to the current action. if action is None: logging.info("No action provided, defaulting to the current action.") action = self.current_action # Park current tool, if any. park_commands = [] if self.current_tool is None: # Nothing to park. msg = "Nothing to park, the current tool is 'None'." logging.info(msg) park_commands += [self.gcode.comment(msg)] elif self.current_tool == new_tool: # Nothing to do. msg = "Parking skipped. Incoming and outgoing tool are equal." logging.info(msg) park_commands += [self.gcode.comment(msg)] else: # Generate GCODE for the parking sequence. tool = self.tools[self.current_tool] park_commands += tool.park(self.current_tool, new_tool) # Extend the timeout. if action is not None: logging.debug("Adding parking operation time to action.") self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True) # # Update the current tool (intermediary state). # self.current_tool = None # Pick-up the next tool pickup_commands = [] if new_tool is None: # There is nothing to do if the toolchange was just for parking. msg = "Nothing to pickup, the new tool is 'None'." logging.info(msg) park_commands += [self.gcode.comment(msg)] elif self.current_tool == new_tool: # Nothing to do. msg = "Pickup skipped. Incoming and outgoing tool are equal." logging.info(msg) park_commands += [self.gcode.comment(msg)] else: # Generate GCODE for the change. tool = self.tools[new_tool] pickup_commands += tool.pickup(self.current_tool, new_tool) # Extend the timeout. if action is not None: logging.debug("Adding pickup operation time to action.") self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True) # Update the current tool. self.current_tool = new_tool # Add pickup commands to the list. tc_commands = park_commands + pickup_commands if extend and tc_commands: logging.debug("Extending GCODE commands in action.") self.extend_commands(commands=tc_commands, action=action) # Return return tc_commands def check_toolchange(self, action: dict, action_command_id: str): """Check if the action requires a toolchange and apply it.""" # Check for a toolchange. toolchange = False # Check for arguments in action. if "args" in list(action): # Either "str" (tool name), None (do nothing) or False/"" (drop tool). new_tool = action["args"].get("tool", None) logging.info(f"Checking for toolchange on new_tool={new_tool} and action: {action}") # Handle cases. if action["cmd"] in self.tc_check_skip_actions: # Do nothing if this action does not demand a tool-change check. pass if new_tool is False or new_tool == "": # NOTE: Enter this block withother false-like values. # Drop current tool. new_tool = None if self.current_tool: # Park the current tool if any. toolchange = True elif new_tool: # If the tool ID is different from the current one, # a tool-change is needed before the action takes place. if new_tool != self.current_tool: # Flag the tool-change, so its GCODE is added to the action. toolchange = True else: # Do nothing if None. assert new_tool is None # Append the tool-change command to the action commands if a toolchange is needed if toolchange: self.toolChangeMacro(new_tool=new_tool) # If no toolchange was triggered, check if the current action is a HOME action. elif action_command_id == "HOME": # Park the loaded tool if any, before homing. # TODO: Replace hardcoded action ID with the one in base_actions.py. self.toolChangeMacro(new_tool=None) return toolchange #### HELPERS for mid-level action parsers #### def validate_current_action_workspace(self): """ Validates that the workspace specified in the current action matches the active workspace. This method checks the `workspace` name from the current action and compares it with the active workspace name. If the two workspaces do not match, a `ProtocolError` is raised. If the action does not specify a workspace, it will default to the current workspace and log a warning. Raises: ProtocolError: If the workspace specified in the current action does not match the active workspace. Notes: - If both the action and the current workspace specify a name, they are compared. - If the action does not have a workspace name, the current workspace name is assigned to the action and a warning is logged. """ current_workspace_name = self.workspace.get("name", None) action_workspace_name = self.current_action.get("workspace", None) if action_workspace_name and current_workspace_name: # If set, validate the workspace name from the action with the current workspace. if action_workspace_name != current_workspace_name: msg = f"Action with index {self.current_action.get('index', None)} and workspace '{action_workspace_name}'" msg += f" does not match the currently active workspace '{current_workspace_name}'." logging.error(msg) raise ProtocolError(msg) elif current_workspace_name: # If the action did not specify a workspace name, then add the name of the current workspace to it. msg = f"Action with index {self.current_action.get('index', None)} has no workspace info." msg += f" Defaulting to the current workspace: '{current_workspace_name}'" logging.warning(msg) self.current_action["workspace"] = current_workspace_name def get_current_action_cmd(self): """ Retrieves the command ID from the current action. This method extracts the command identifier (e.g., "HOME", "PICK_TIP") from the `cmd` field of the current action. If the action is a JSON-based action (i.e., the `cmd` field is "JSON"), the method updates the command ID to the one found within the action's arguments and logs the update. Returns: str: The command ID of the current action. Notes: - If the command is a JSON action (where `cmd` is "JSON"), the method looks in the `args` dictionary of the current action to retrieve the actual command ID. - Logs an informational message if the command ID is updated from a JSON action. """ # Get the action command ID. action_command_id = self.current_action['cmd'] # Check if it is a JSON action, and update the command id with the one in its arguments. if action_command_id == "JSON": action_command_id = self.current_action['args']['cmd'] logging.info(f"Updated action command from JSON action to: {action_command_id}") return action_command_id #### Mid-level action parsers #### def parseAction(self, action, action_index=None): """This function interprets a protocol action and produces the corresponding GCODE Note that GCODE generation can depend on the current state. Args: action (dict): Python dictionary defining the action. action_index (integer): Action index number. Raises: Exception: Any exception is re-raised. Context info and traceback are printed just before. Returns: list: List of GCODE commands associated to the action. """ # Python switch case implementations https://data-flair.training/blogs/python-switch-case/ # All action interpreting functions will take two arguments # action: the action object # i: the action index for the current action # task: the object of class "TaskEnvironment" logging.info(f"Parsing action with index {action_index} and command '{action['cmd']}'.") # Save checkpoint. self.checkpoint() # Default value for action index. if action_index is None: action_index = len(self.getActions()) logging.warning(f"Replaced null index with index {action_index}.") # Save previous action. self.previous_action = self.current_action # Assign current action to self slot. self.current_action = action # Create a list for the actions GCODE commands. self.current_action["GCODE"] = [] # Add defaults to required properties. self.current_action.setdefault("args", {}) # Validate workspace name. self.validate_current_action_workspace() # Parse the current action into GCODE, checking for tool-changes first. try: # Get the action command ID. action_command_id = self.get_current_action_cmd() # Get the action's GCODE function. action_function = self.action_switch_case[action_command_id] # Note new action start as comment in the GCODE. comments = [self.gcode.comment(f"Building action {action_index}, with command: {action_command_id}")] self.extend_commands(comments) # Check and produce GCODE for a toolchange, if needed. self.check_toolchange(self.current_action, action_command_id) # Produce the GCODE for the action. if self.verbose: logging.debug(f"Processing command '{action_command_id}' with index '{action_index}'.") # Use the action function to generate its GCODE. # NOTE: The "action_output"s do not contain any gcode, only mildly informative messages. # GCODE is saved to the current action and to the "commands" property. action_function(self.current_action, action_index) # Make a copy of the GCODE. final_action = deepcopy(self.current_action) # Run action callbacks. # This allows other parts of the program to "do stuff" # when a certain kind of command has been parsed. # TODO: Replace it with the Controller's events system. self.run_action_callbacks(action_command_id, final_action) # Log success. logging.info(f"Successfully parsed action with index {action_index} and command '{action_command_id}'.") except Exception as err: msg = f"Parser error at action number {action_index} with content '{self.current_action}' message: {err}" logging.error(msg) print(msg) # Rollback changes. self.rollback() raise ProtocolError(msg) from err else: # Note action parsed. msg = f"Done processing action with index={action_index} and command={action_command_id}" logging.debug(msg) # Commit changes. self.commit() return final_action # Returns GCODE and current action. def run_action_callbacks(self, action_command_id: str, action: dict): """Run any callback functions registered to run after an action is processed. TODO: consider rewriting this in reverse, by "emitting" events like Klipper does. TODO: Replace it with the Controller's events system. """ for action_callback in self.action_callbacks: # Get the action commands that the callback is meant to handle and the callback. callback_cmds = action_callback["commands"] callback_function = action_callback["callback"] # Execute the callback if there is a match. if action_command_id in callback_cmds: callback_function(self, action) # Check that the callback has a chance of doing something, just in case. for callback_cmd in callback_cmds: if not self.action_switch_case.get(callback_cmd, None): msg = f"The callback command '{callback_cmd}' was not found in the " msg += f"registered action handlers: {list(self.action_switch_case)}" logging.warning(msg) def addAction(self, action, parse=True, append=True): """ Adds a new action to the current protocol, optionally parsing and appending it. This method inserts an action into the protocol's action list and, if specified, parses it to generate GCODE. Args: action (dict): The action to add to the protocol. It should contain necessary details such as the command (`cmd`) and any associated arguments (`args`). parse (bool, optional): Whether to parse the action after adding it. Defaults to True. append (bool, optional): Whether to append the action to the protocol's action list. If False, the action is not appended. Defaults to True. Returns: tuple: A tuple containing: - action_commands (None or parsed action commands): If `parse` is True, returns the parsed action commands. - action (dict): The original or parsed action, depending on whether parsing was requested. Notes: - The action is appended to the protocol's "actions" list if `append` is True. - The `parseAction` method is invoked if `parse` is set to True to process the action. - If the protocol doesn't contain an "actions" key, this method initializes it. """ logging.debug("Adding action.") # Default action index. action_index = None # Add the new action to the protocol. if append: logging.debug("Appending action to protocol list.") # Check that the protocol object has an actions key or create it. actions = self.protocol.setdefault("actions", []) # Append the action (note: this propagates to the action in the self.protocol dictionary). actions.append(action) action_index=len(actions)-1 action_commands, action = None, None if parse: logging.debug("Parsing action.") action = self.parseAction(action=action, action_index=action_index) # TODO: Remove "action_commands", it does nothing. return action_commands, action def parseProtocol(self, actions:list=None): """This function takes parses a list of actions into gcode, either provided or from the class' property.""" if actions is None: actions = self.protocol["actions"] logging.warning(f"No actions passed. Using {len(actions)} actions from the active protocol.") logging.info(f"Parsing {len(actions)} protocol actions.") # NOTE: "parseAction" returns a tuple with the commands list and the action, hence the "[0]" below. # NOTE: Cada action parser guarda el gcode implicitamente en "self.protocol" y en el GCODE del action. final_actions = [] for i, action in enumerate(actions): final_actions.append(self.parseAction(action, i)) logging.info(f"Parsed {len(actions)} actions.") return deepcopy(self.protocol) #### GCODE command helpers #### def extend_commands(self, commands: list, action=None): """Append GCODE to the commands list, and to the active protocol action (if any).""" # Extend GCODE commands list. self.commands.extend(commands) # Extend the GCODE list in the action. if action is None: # Override the action argument with the current action. action = self.current_action # Extend the GCODE in the action. action.setdefault("GCODE", []) action["GCODE"].extend(commands) #### Output #### def save_gcode(self, path="examples/test.gcode"): """Save the GCODE to a file""" logging.debug(f"commanderTest.py message: Saving GCODE to file: {path}") with open(path, "w", encoding="utf-8") as gcode_file: gcode_file.writelines(self.getGcode()) def getGcode(self): return "\n".join(self.commands) def getActions(self): return self.protocol.get("actions", [])This class generates GCODE from Pipettin protocol "actions".
It containse and consumes all information for the pipetting task: protocol, workspace, platforms and tools.
It holds and tracks machine status or configuration during prococol 'slicing' to GCODE.
Also stores all GCODE commands after parsing protocols.
Some actions might depend on previous actions or action history. So these objects are for tracking the state of the machine during the translation of actions to GCODE, or during gcode streaming to the machine controllers.
A class to contain all information for the pipetting task: protocol, workspace, platforms and tools. :param protocol: A protocol dictionary (JSON object) from the GUI/MongoDB, containing protocol 'actions'. :param workspace: A workspace dictionary (JSON object) from the GUI/MongoDB, containing the platform instances in the workspace. :param platformsInWorkspace: A platforms dictionary (JSON object) from the GUI/MongoDB, containing the definitions of platforms in the workspace. :param pipettes: A dictionary with pipette tool definitions. :param config: Configuration options loaded from the 'config.yml' YAML file.
Class variables
var action_callbacks : listvar comment_action_namevar current_action : dictvar platformsInWorkspace : listvar protocol : dictvar state_attributes-
List of attributes to consider as part of the builder's state.
var tool_loaders : dict-
Dictionary of tool-loading functions: with keys matching names of tool-type, and callbacks as values.
var tools : dictvar tools_data : listvar workspace : dictvar xyz_move_event : str-
Name of the event raised by 'macroGoToXYZ' when called.
Static methods
def getContentXY(content: dict, platform_item: dict, platform: dict)-
Expand source code
@staticmethod def getContentXY(content: dict, platform_item: dict, platform: dict): """ Calculate XY of content center. Defaults to using the col/row pair or, if unavailable, pre-existing x/y coordinates in the content's position. """ # Get the content's position. position: dict = content.get("position", None) # Get the platform's slots. slots: list = platform.get("slots", None) # If no position was received, try to recreate it by the content's index. if position is None: position = {} index = content["index"] # Guess by a platforms grid if present. ncols = platform.get("wellsColumns", None) nrows = platform.get("wellsRows", None) if (ncols is not None) and (nrows is not None): # Calculate the row and column. position["row"], position["col"] = datautils.get_colrow(index, ncols, nrows) # Check for consistency. if nrows < index // ncols: msg = f"Error: the platform '{platform.get('name', None)}'" msg += f" does not have enough rows to get a content with index {index}." msg += f" platform={platform} content={content}" raise DataError(msg) # If not, try to get it from the slot. elif slots is not None: slot = slots[index-1] position = slot["position"] else: msg = "Error: the content did not specify an position, and it could not be inferred " raise DataError(msg) # Calculate X of content center if "col" in position: _x = platform_item["position"]["x"] _x += platform['firstWellCenterX'] _x += (position["col"] - 1) * platform['wellSeparationX'] else: _x = position["x"] # Calculate Y of content center if "row" in position: _y = platform_item["position"]["y"] _y += platform['firstWellCenterY'] _y += (position["row"] - 1) * platform['wellSeparationY'] else: _x = position["y"] return _x, _yCalculate XY of content center. Defaults to using the col/row pair or, if unavailable, pre-existing x/y coordinates in the content's position.
def itemCenter(item, platform)-
Expand source code
@staticmethod def itemCenter(item, platform): """Calculate the XY center and the active-height Z of an item. TODO: Move this method to data-tools. """ x = item["position"]["x"] + platform["width"]/2 y = item["position"]["y"] + platform["length"]/2 z = platform["activeHeight"] return x, y, zCalculate the XY center and the active-height Z of an item. TODO: Move this method to data-tools.
def sign(x)-
Expand source code
@staticmethod def sign(x): """Return the sign of a number (as 1 or -1).""" return copysign(1, x)Return the sign of a number (as 1 or -1).
Instance variables
prop clearance-
Expand source code
@property def clearance(self): """Current clearance height.""" if self._clearance: logging.debug("Getting clearance value...") val = self._clearance else: logging.debug("Calculating clearance value...") val = self.getClearance() return valCurrent clearance height.
prop feedrate-
Expand source code
@property def feedrate(self): """Main feedrate for XYZ moves Default feedrate in mm/min (F parameter for G1) used for XYZ moves. This is a property that retuns the value set in the GCODE primitives class (i.e. "self.gcode.feedrate"). Equivalences: 60000 mm/min = 1000 * 60 mm/min = 1000 mm/s = 1 m/s """ return self.gcode.feedrateMain feedrate for XYZ moves Default feedrate in mm/min (F parameter for G1) used for XYZ moves. This is a property that retuns the value set in the GCODE primitives class (i.e. "self.gcode.feedrate").
Equivalences
60000 mm/min = 1000 * 60 mm/min = 1000 mm/s = 1 m/s
Methods
def addAction(self, action, parse=True, append=True)-
Expand source code
def addAction(self, action, parse=True, append=True): """ Adds a new action to the current protocol, optionally parsing and appending it. This method inserts an action into the protocol's action list and, if specified, parses it to generate GCODE. Args: action (dict): The action to add to the protocol. It should contain necessary details such as the command (`cmd`) and any associated arguments (`args`). parse (bool, optional): Whether to parse the action after adding it. Defaults to True. append (bool, optional): Whether to append the action to the protocol's action list. If False, the action is not appended. Defaults to True. Returns: tuple: A tuple containing: - action_commands (None or parsed action commands): If `parse` is True, returns the parsed action commands. - action (dict): The original or parsed action, depending on whether parsing was requested. Notes: - The action is appended to the protocol's "actions" list if `append` is True. - The `parseAction` method is invoked if `parse` is set to True to process the action. - If the protocol doesn't contain an "actions" key, this method initializes it. """ logging.debug("Adding action.") # Default action index. action_index = None # Add the new action to the protocol. if append: logging.debug("Appending action to protocol list.") # Check that the protocol object has an actions key or create it. actions = self.protocol.setdefault("actions", []) # Append the action (note: this propagates to the action in the self.protocol dictionary). actions.append(action) action_index=len(actions)-1 action_commands, action = None, None if parse: logging.debug("Parsing action.") action = self.parseAction(action=action, action_index=action_index) # TODO: Remove "action_commands", it does nothing. return action_commands, actionAdds a new action to the current protocol, optionally parsing and appending it.
This method inserts an action into the protocol's action list and, if specified, parses it to generate GCODE.
Args
action:dict- The action to add to the protocol. It should contain necessary details
such as the command (
cmd) and any associated arguments (args). parse:bool, optional- Whether to parse the action after adding it. Defaults to True.
append:bool, optional- Whether to append the action to the protocol's action list. If False, the action is not appended. Defaults to True.
Returns
tuple- A tuple containing:
- action_commands (None or parsed action commands): If
parseis True, returns the parsed action commands. - action (dict): The original or parsed action, depending on whether parsing was requested.
Notes
- The action is appended to the protocol's "actions" list if
appendis True. - The
parseActionmethod is invoked ifparseis set to True to process the action. - If the protocol doesn't contain an "actions" key, this method initializes it.
def addToolOffsets(self, x=None, y=None, z=None, tool_name: str = None)-
Expand source code
def addToolOffsets(self, x=None, y=None, z=None, tool_name:str=None): """Add offsets of the current tool. Args: x (float, optional): Value of the x coordinate to offset. Defaults to None. y (float, optional): Value of the y coordinate to offset. Defaults to None. z (float, optional): Value of the z coordinate to offset. Defaults to None. tool_name (str, optional): Name of a tool. Defaults to None. Returns: tuple: An updated coordiante set, with one element per each non-None coordinate argument. """ if tool_name is None: # Use the current tool if none was provided. if self.current_tool: logging.debug(f"Adding offsets with self.current_tool={self.current_tool}") tool_name = self.current_tool # If there is not current tool, then return the input. else: logging.debug("No offsets added, self.current_tool was 'None'.") return x, y, z # Defaults. coords = [x, y, z] if x is not None: coords[0] += self.tools[tool_name].x_offset if y is not None: coords[1] += self.tools[tool_name].y_offset if z is not None: coords[2] += self.tools[tool_name].z_offset return tuple(coords)Add offsets of the current tool.
Args
x:float, optional- Value of the x coordinate to offset. Defaults to None.
y:float, optional- Value of the y coordinate to offset. Defaults to None.
z:float, optional- Value of the z coordinate to offset. Defaults to None.
tool_name:str, optional- Name of a tool. Defaults to None.
Returns
tuple- An updated coordiante set, with one element per each non-None coordinate argument.
def add_action_handler(self, name: str, function: Callable)-
Expand source code
def add_action_handler(self, name: str, function: Callable): """Add a callback function responsible for generating precalculated GCODE for a certain protocol action. This is part of an extension system useful to write plugins. See example plugins in the "plugins" directory. The "base_actions" plugin can be used as a reference. Args: name (str): Name for the action handler, this must match the value of the "CMD" key in the action. function (Callable): Function called to handle the action. It must accept an 'action' and its 'index' as first and second arguments, respectively. Raises: Exception: Raises an exception when a handler has already been registered for a certain CMD. """ if self.action_switch_case.get(name, None): msg = f"Attempted to overwrite action handler '{name}'." logging.error(msg) raise CommanderError(msg) else: logging.info(f"Registered new action handler for '{name}' actions.") self.action_switch_case[name] = functionAdd a callback function responsible for generating precalculated GCODE for a certain protocol action.
This is part of an extension system useful to write plugins. See example plugins in the "plugins" directory. The "base_actions" plugin can be used as a reference.
Args
name:str- Name for the action handler, this must match the value of the "CMD" key in the action.
function:Callable- Function called to handle the action. It must accept an 'action' and its 'index' as first and second arguments, respectively.
Raises
Exception- Raises an exception when a handler has already been registered for a certain CMD.
def calc_clearance(self, platforms: list)-
Expand source code
def calc_clearance(self, platforms: list): """Calculate a conservative clearance for a set of platforms, considering all their possible containers.""" if not platforms: raise ValueError("No platforms provided for calculation.") max_height = 0.0 for platform in platforms: # Get platform height and active height. height = platform["height"] active_h = platform["activeHeight"] # Take containers into account. max_container_height = 0 for link in platform.get("containers", []): offset = link["containerOffsetZ"] container = self.controller.database_tools.getContainerByName(link["container"]) length = container["length"] max_container_height = max(max_container_height, active_h + length - offset ) # Update the max height. max_height = max(max_height, height, max_container_height) return max_heightCalculate a conservative clearance for a set of platforms, considering all their possible containers.
def check_toolchange(self, action: dict, action_command_id: str)-
Expand source code
def check_toolchange(self, action: dict, action_command_id: str): """Check if the action requires a toolchange and apply it.""" # Check for a toolchange. toolchange = False # Check for arguments in action. if "args" in list(action): # Either "str" (tool name), None (do nothing) or False/"" (drop tool). new_tool = action["args"].get("tool", None) logging.info(f"Checking for toolchange on new_tool={new_tool} and action: {action}") # Handle cases. if action["cmd"] in self.tc_check_skip_actions: # Do nothing if this action does not demand a tool-change check. pass if new_tool is False or new_tool == "": # NOTE: Enter this block withother false-like values. # Drop current tool. new_tool = None if self.current_tool: # Park the current tool if any. toolchange = True elif new_tool: # If the tool ID is different from the current one, # a tool-change is needed before the action takes place. if new_tool != self.current_tool: # Flag the tool-change, so its GCODE is added to the action. toolchange = True else: # Do nothing if None. assert new_tool is None # Append the tool-change command to the action commands if a toolchange is needed if toolchange: self.toolChangeMacro(new_tool=new_tool) # If no toolchange was triggered, check if the current action is a HOME action. elif action_command_id == "HOME": # Park the loaded tool if any, before homing. # TODO: Replace hardcoded action ID with the one in base_actions.py. self.toolChangeMacro(new_tool=None) return toolchangeCheck if the action requires a toolchange and apply it.
def checkpoint(self)-
Expand source code
def checkpoint(self): """Creates a snapshot (checkpoint) of the current state of relevant attributes. Saves the state of workspaces, protocol, current_action, and previous_action. """ try: self._checkpoint = {n: deepcopy(getattr(self, n)) for n in self.state_attributes} logging.info("Checkpoint created for the current state.") except AttributeError as e: logging.error(f"Failed to create checkpoint: {e}") raise RuntimeError("Could not create checkpoint due to missing attributes.") from eCreates a snapshot (checkpoint) of the current state of relevant attributes.
Saves the state of workspaces, protocol, current_action, and previous_action.
def comment_action_parser(self, action, i)-
Expand source code
def comment_action_parser(self, action, i): comment_command = [self.gcode.comment(message=action["args"]["text"])] self.extend_commands(comment_command) logging.info(f"Processed {self.comment_action_name} action with index {i} and text: '{action['args']['text']}'") return comment_command def commit(self)-
Expand source code
def commit(self): """Commits the current state by clearing the checkpoint. This method should be called after successfully processing an action. """ self._checkpoint = None # Clear the checkpoint since the changes are confirmed logging.info("Changes committed, checkpoint cleared.")Commits the current state by clearing the checkpoint.
This method should be called after successfully processing an action.
def extend_commands(self, commands: list, action=None)-
Expand source code
def extend_commands(self, commands: list, action=None): """Append GCODE to the commands list, and to the active protocol action (if any).""" # Extend GCODE commands list. self.commands.extend(commands) # Extend the GCODE list in the action. if action is None: # Override the action argument with the current action. action = self.current_action # Extend the GCODE in the action. action.setdefault("GCODE", []) action["GCODE"].extend(commands)Append GCODE to the commands list, and to the active protocol action (if any).
def getActions(self)-
Expand source code
def getActions(self): return self.protocol.get("actions", []) def getClearance(self, item_names: list = None)-
Expand source code
def getClearance(self, item_names:list=None): """Get the minimum clearance level for the Z axis from the platformsInWorkspace definition""" if not self.platformsInWorkspace: logging.error("There are no platforms in the workspace, or no workspace has been set up. Returning None.") clearance = None else: if item_names: logging.warning(f"Calculating clearance only between items: {item_names}") platforms_to_clear = [i for i in self.platformsInWorkspace if i["name"] in item_names] clearance = self.calc_clearance(platforms_to_clear) else: clearance = self.calc_clearance(self.platformsInWorkspace) logging.info(f"Calculated platform clearance value: {clearance}") return clearanceGet the minimum clearance level for the Z axis from the platformsInWorkspace definition
def getContentCoords(self, action, allow_next=False, pop_from_item=False, pop_from_db=False)-
Expand source code
def getContentCoords(self, action, allow_next=False, pop_from_item=False, pop_from_db=False): """ Get XYZ coordinates for the action's platform item (e.g. a "tube"). Also apply tool offsets, if any. """ # Get platform and platform item. item_name = action["args"]["item"] platform_item = self.controller.database_tools.getWorkspaceItemByName( workspace=self.workspace, item_name=item_name) # Get platform definition from the name in the platform item. platform = self.controller.database_tools.getPlatformByName(self.platformsInWorkspace, platform_item) # Get the content. content = action["args"].get("content", None) selector = action["args"].get("selector", None) if content: # Use the provided content. pass elif selector: # Get content from the platform item using the selector. # TODO: Consider getting and using "content_type" as a filter here. content = self.controller.database_tools.getNextContent( platform_item=platform_item, selector=selector, # Pass optional "pop" arguments. pop_from_item=pop_from_item, pop_from_db=pop_from_db ) elif allow_next: # Get any next content from the platform item. # TODO: Review this and consider deduplicating with the clause above. content = self.controller.database_tools.getNextContent( platform_item=platform_item, # Pass optional "pop" arguments. pop_from_item=pop_from_item, pop_from_db=pop_from_db ) else: msg = f"Can't obtain content from action. Insufficient selector information:\n{pprint.pformat(action)}" logging.error(msg) raise DataError(msg) # Calculate XY of tube center. x, y = self.getContentXY(content, platform_item, platform) # Calculate Z for pipetting. z = self.getContentZ(content, platform_item, platform) return content, x, y, zGet XYZ coordinates for the action's platform item (e.g. a "tube").
Also apply tool offsets, if any.
def getContentZ(self, content=None, platform_item=None, platform=None)-
Expand source code
def getContentZ(self, content=None, platform_item=None, platform=None): """ Calculate the "active" Z height of a content (e.g. the bottom of a "tube", or fitting height of a tip). Defaults to using the Z from the content or, if unavailable, information from the platform. Formula: z = `deck_z + plat_z + plat_activeHeight - container_z_offset + tip_active_height = tip_spot_z` """ # Default to using the Z defined in the content. if content is not None: # Skip everything if the content provides a Z coordinate. if "position" in content: if "z" in content["position"]: return content["position"]["z"] # Use the platform item to get the platform if undefined. if (platform is None) and (platform_item is not None): platform = self.controller.database_tools.getPlatformByName(self.platformsInWorkspace, platform_item) # Get the default Z from the platform. if platform is not None: # Get container link info. container_name = content["container"] container_link = next(c for c in platform["containers"] if c["container"] == container_name) container = self.controller.database_tools.getContainerByName(container_name) # Workspace Z, the starting point. workspace_z = self.workspace.get("z", 0) # TODO: This is not in the schema, perhaps it should be. # Item Z, position relative to workspace. item_z = platform_item["position"].get("z", 0) # Z-coordinate of the bottom of the platform (i.e. relative to the reference tool, with offsets [0,0,0]). active_z_platform = platform["activeHeight"] # NOTE: Prevously named "defaultBottomPosition" or "defaultLoadBottom". # Subtract the container's offset to the platform. offset_z = -container_link["containerOffsetZ"] # Add the container's active height. active_z_container = container["activeHeight"] # NOTE: In the case of tips, this is equal to their total length. # Add it up. z = workspace_z + item_z + active_z_platform + offset_z + active_z_container logging.debug(f"Parsed content height {z} mm from: workspace_z={workspace_z} item_z={item_z} active_z_platform={active_z_platform} offset_z={offset_z} active_z_container={active_z_container}") # Done, phew! return z # Raise an error if the inut did not match any condition above. raise DataError("getContentZ: Failed to calculate Z. Information is missing.")Calculate the "active" Z height of a content (e.g. the bottom of a "tube", or fitting height of a tip).
Defaults to using the Z from the content or, if unavailable, information from the platform.
Formula: z =
deck_z + plat_z + plat_activeHeight - container_z_offset + tip_active_height = tip_spot_z def getGcode(self)-
Expand source code
def getGcode(self): return "\n".join(self.commands) def getItemXYZ(self, item_name: str)-
Expand source code
def getItemXYZ(self, item_name: str): """Get the XYZ coordinates of an item. TODO: Move this method to data-tools. """ # Get platform and platform item. item = self.controller.database_tools.getWorkspaceItemByName( workspace=self.workspace, item_name=item_name) # Get platform definition from the name in the platform item. platform = self.controller.database_tools.getPlatformByName( platformsInWorkspace=self.platformsInWorkspace, platform_item=item) # Get coordinates x, y, z = self.itemCenter(item, platform) return x, y, zGet the XYZ coordinates of an item. TODO: Move this method to data-tools.
def getSafeHeight(self, item_names: list = None)-
Expand source code
def getSafeHeight(self, item_names:list=None): """ Calculate the Z coordinate above all contents, considering if a tip is placed on the current tool. Returns "None" if the clearance has not been set. """ # Calculate safe hieght. z = 0 # Add minimum clearance to Z coordinate. z += self.extra_clearance # Get clearance height from platforms. platform_clearance = self.getClearance(item_names=item_names) if platform_clearance: z += platform_clearance # Add clearance from tools. if self.current_tool: logging.info(f"Adding tool offset from tool '{self.current_tool}' to clearance.") # Unpacking is needed. z = self.addToolOffsets(z=z)[2] # Done! logging.info(f"Calculated safe height: {z}") return zCalculate the Z coordinate above all contents, considering if a tip is placed on the current tool. Returns "None" if the clearance has not been set.
def getSafeY(self)-
Expand source code
def getSafeY(self): """ There is a limit on the Y axis movement. The tool-holder can crash into the parked tools. Here we use the parking post coordinates to establish a safe limit for the Y direction. NOTE: The return value can be "None" when there are no tools at all. """ # Default. y_clearance = None # Get Y position of the empty tool post of the currently attached tool, if any. # In theory this one will be the farthest from the tool-head, but it needs to be # accounted for even so (e.g. maybe it has really "long" pins on the tool-post). if self.current_tool: y_clearance = self.tools[self.current_tool].safe_y_post # Get safe Y position of the parked tools. safe_y_parked = [tool.safe_y_parked for tool in self.tools.values() if tool.name != self.current_tool] # Use the closest safe distance. if safe_y_parked and y_clearance: # NOTE: Corrected for axis inversion. y_clearance = max(y_clearance, *safe_y_parked) elif safe_y_parked: # NOTE: Corrected for axis inversion. y_clearance = max(safe_y_parked) # Adjust tool distance protruding from the toolhead, if any. if self.current_tool: # NOTE: Corrected for axis inversion. y_clearance += self.tools[self.current_tool].safe_y return y_clearanceThere is a limit on the Y axis movement. The tool-holder can crash into the parked tools. Here we use the parking post coordinates to establish a safe limit for the Y direction. NOTE: The return value can be "None" when there are no tools at all.
def getTubeAndCoords(self, action)-
Expand source code
def getTubeAndCoords(self, action): """Get XYZ coordinates of a tube and it's parameters. TODO: Rename this to "getContentAndCoords", and "getContentCoords" to something else. It can either get the info from a reference to a platform item and a content within, (i.e. platform-type) or directly from an XYZ coordinate (i.e. coord-type, meant for PLR). Args: action (_type_): _description_ Returns: _type_: _description_ Example action 'args' for coords-based position: { "position": {'x': ,'y': , 'z':}, "content": {"volume": 10} } Example tube content: { 'index': 2, 'maxVolume': 1500, 'name': 'buffer', 'position': {'col': 2, 'row': 1}, 'tags': ['mixcomp', 'buffer'], 'type': 'tube', 'volume': 7.2 } """ if "position" in action["args"]: # Get coords from a coords-type tube object. coords = action["args"]["position"] x, y, z = coords.get("x", None), coords.get("y", None), coords.get("z", None) # Get information about the tube. content = action["args"]["content"] # A tube must contain "volume". Ensure that the required data is there. # NOTE: Removed because an exception would be raised in 'macroGoToAndMix' anyways, # which is the only function actually using volume information. # content["volume"] = float(action["args"]["content"]["volume"]) else: # Get XYZ coordinates for the action's "platform"-type tube, # applying current tool offsets if any. content, x, y, z = self.getContentCoords(action) return content, x, y, zGet XYZ coordinates of a tube and it's parameters.
TODO: Rename this to "getContentAndCoords", and "getContentCoords" to something else.
It can either get the info from a reference to a platform item and a content within, (i.e. platform-type) or directly from an XYZ coordinate (i.e. coord-type, meant for PLR).
Args
action:_type_- description
Returns
_type_- description
Example action 'args' for coords-based position: { "position": {'x': ,'y': , 'z':}, "content": {"volume": 10} }
Example tube content: { 'index': 2, 'maxVolume': 1500, 'name': 'buffer', 'position': {'col': 2, 'row': 1}, 'tags': ['mixcomp', 'buffer'], 'type': 'tube', 'volume': 7.2 }
def get_current_action_cmd(self)-
Expand source code
def get_current_action_cmd(self): """ Retrieves the command ID from the current action. This method extracts the command identifier (e.g., "HOME", "PICK_TIP") from the `cmd` field of the current action. If the action is a JSON-based action (i.e., the `cmd` field is "JSON"), the method updates the command ID to the one found within the action's arguments and logs the update. Returns: str: The command ID of the current action. Notes: - If the command is a JSON action (where `cmd` is "JSON"), the method looks in the `args` dictionary of the current action to retrieve the actual command ID. - Logs an informational message if the command ID is updated from a JSON action. """ # Get the action command ID. action_command_id = self.current_action['cmd'] # Check if it is a JSON action, and update the command id with the one in its arguments. if action_command_id == "JSON": action_command_id = self.current_action['args']['cmd'] logging.info(f"Updated action command from JSON action to: {action_command_id}") return action_command_idRetrieves the command ID from the current action.
This method extracts the command identifier (e.g., "HOME", "PICK_TIP") from the
cmdfield of the current action. If the action is a JSON-based action (i.e., thecmdfield is "JSON"), the method updates the command ID to the one found within the action's arguments and logs the update.Returns
str- The command ID of the current action.
Notes
- If the command is a JSON action (where
cmdis "JSON"), the method looks in theargsdictionary of the current action to retrieve the actual command ID. - Logs an informational message if the command ID is updated from a JSON action.
def initialize_objects(self,
protocol: dict = None,
workspace: dict = None,
platformsInWorkspace: dict = None,
clear_unspecified=True)-
Expand source code
def initialize_objects(self, protocol:dict=None, workspace:dict=None, platformsInWorkspace:dict=None, clear_unspecified=True): """Set or reset protocol, workspace, platforms, and clearance properties. This can be useful when switching protocols or streaming actions, and still preserve state tracking. This clears the unspecified objects when `clear_unspecified=True`. See 'update_objects' for a non-clearing update. """ # Define protocol property. if protocol is not None: self.protocol = protocol logging.info(f"New protocol set to: {protocol['name']}") logging.debug(f"New protocol definition:\n{pformat(protocol)}") elif clear_unspecified: self.protocol = dict() logging.warning("Warning, protocol property now empty.") # Set workspace. if workspace is not None: self.workspace = workspace logging.info(f"New workspace set to: '{workspace['name']}'") elif clear_unspecified: self.workspace = {} logging.warning("Warning, workspace property now empty.") # Get platform definitions of platform items in wokspace. if platformsInWorkspace is not None: self.platformsInWorkspace = platformsInWorkspace logging.info(f"New platformsInWorkspace set to {[i['name'] for i in platformsInWorkspace]}") elif clear_unspecified: self.platformsInWorkspace = list() logging.warning("Warning, platformsInWorkspace property now empty.") elif workspace is not None: # Get the platforms for the provided workspace, if no platforms were provided. logging.info(f"platformsInWorkspace not provided. Getting it from the provided workspace '{workspace['name']}'.") self.platformsInWorkspace = self.controller.database_tools.getPlatformsInWorkspace(workspace) logging.info(f"Available items: {[i['name'] for i in self.platformsInWorkspace]}") # Clearance and probing parameters. # NOTE: Using @propery decorator below to defer getClearance() calculation until a workspace is available. # This must be run because otherwise the getter method can fail with "AttributeError". if self.platformsInWorkspace: # NOTE: getClearance requires "self.platformsInWorkspace". self.clearance = self.getClearance() elif clear_unspecified: self.clearance = None # Consistency check. if self.protocol and self.workspace: if self.workspace["name"] != self.protocol["workspace"]: msg = f"The protocol's workspace ({self.workspace['name']}) " msg += f"does not match the specified workspace ({self.protocol['workspace']})." logging.warning(msg)Set or reset protocol, workspace, platforms, and clearance properties. This can be useful when switching protocols or streaming actions, and still preserve state tracking.
This clears the unspecified objects when
clear_unspecified=True. See 'update_objects' for a non-clearing update. def load_tool(self, tool_data)-
Expand source code
def load_tool(self, tool_data): """Looks up a tool 'loader' function in 'tool_loaders', and calls it to lad the tool from its data. The initialized tool object is finally registered in the tools dict by calling 'register_tool'. If no loader is found, then an error is logged. """ # Parse some basic info. tool_type = tool_data["type"] tool_name = tool_data["name"] # Check if enabled. if not tool_data.get("enabled", True): raise DataError(f"Won't load disabled tool '{tool_name}'. Set the 'enable' property in its data first.") # Get the loader function for this tool type. loader_function = self.tool_loaders.get(tool_type, None) # Load and register the tool. if loader_function is None: # Log a warning if no tool loader was found for this tool type. logging.error(f"There is no loader function for tool '{tool_name}' of type '{tool_type}'.") else: # Load the tool. logging.info(f"Loading tool '{tool_name}' of type '{tool_type}'.") tool = loader_function(tool_data) # Register the tool. logging.info(f"Registering tool '{tool_name}' of type '{tool_type}'.") self.register_tool(tool)Looks up a tool 'loader' function in 'tool_loaders', and calls it to lad the tool from its data. The initialized tool object is finally registered in the tools dict by calling 'register_tool'. If no loader is found, then an error is logged.
def load_tools(self, tools_list: list = None)-
Expand source code
def load_tools(self, tools_list: list = None): """Calls 'load_tool' on each tool in the list. If empty, it defaults to all tools in the database. """ # Default to all tools from the DB. if tools_list is None: tools_list: list = deepcopy(self.controller.database_tools.tools) # Load all tools in the list. for tool_data in tools_list: if not tool_data.get("enabled", True): # TODO: Remove the default above once all tools have the property. logging.warning(f"Tool '{tool_data['name']}' is disabled, and will be skipped.") else: self.load_tool(tool_data)Calls 'load_tool' on each tool in the list. If empty, it defaults to all tools in the database.
def macroGoTo(self, action, i, extend=True)-
Expand source code
def macroGoTo(self, action, i, extend=True): # Move over the target content's XY. commands = self.macroGoToTubeXY(action, i, extend=extend) # Move to the bottom position of the conntent. commands += self.macroGoToTubeZ(action, i, extend=extend) return commands def macroGoToTubeXY(self, action, i, extend=True) ‑> list-
Expand source code
def macroGoToTubeXY(self, action, i, extend=True) -> list: """Get XY coordinates for the action's tube, applying current tool offsets if any.""" logging.info(f"Getting XY coordinates for a tube in action {i}.") _, x, y, _ = self.getTubeAndCoords(action) # Move over the content. go_over_tip_commands = self.macroGoToXYZ(x=x, y=y, extend=extend) return go_over_tip_commandsGet XY coordinates for the action's tube, applying current tool offsets if any.
def macroGoToTubeZ(self, action, i, extend=True) ‑> list-
Expand source code
def macroGoToTubeZ(self, action, i, extend=True) -> list: """Get Z coordinate for the action's tube, applying current tool offsets if any. TODO: be smarter about pipetting height, using current tube volume to increase this. """ logging.info(f"Getting Z coordinates for a tube in action {i}.") _, _, _, z = self.getTubeAndCoords(action) # Go to the bottom of the tube. go_into_tip_commands = self.macroGoToXYZ(z=z, extend=extend) return go_into_tip_commandsGet Z coordinate for the action's tube, applying current tool offsets if any. TODO: be smarter about pipetting height, using current tube volume to increase this.
def macroGoToXYZ(self, x=None, y=None, z=None, f=None, add_offset=True, extend=True, add_comment='') ‑> list-
Expand source code
def macroGoToXYZ(self, x=None, y=None, z=None, f=None, add_offset=True, extend=True, add_comment="") -> list: """Move to an absolute XYZ position, adding tool-offsets by default. TODO: Send an 'xyz_move' event to any relevant event listeners. This is meant to implement obstacle avoidance or advanced movepath planning. """ # Default feedrate. if f is None: f = self.feedrate # Add the current tool's offsets. if add_offset: # Unpacking is needed. x, y, z = self.addToolOffsets(x=x, y=y, z=z) # Go to the requested position. commands = self.gcode.G1(x=x, y=y, z=z, f=f, absolute=True, add_comment=add_comment) # Let everyone know about the move. # NOTE: Other parts of the program may alter the commands list. # This is meant to implement obstacle avoidance or advanced # movepath planning with plugins. # TODO: Rethink the event system for non-async stuff. # self.controller.trigger_event_callback( # event_name=self.xyz_move_event, # parameters={"x": x, "y": y, "z": z, "f": f, "add_offset": add_offset}, # commands=commands # ) # Add the commands to the current action and internal list. if extend: self.extend_commands(commands) return commandsMove to an absolute XYZ position, adding tool-offsets by default. TODO: Send an 'xyz_move' event to any relevant event listeners. This is meant to implement obstacle avoidance or advanced movepath planning.
def macroMoveSafeHeight(self, item_names=None, extend=True)-
Expand source code
def macroMoveSafeHeight(self, item_names=None, extend=True): """Fast move to safe height in Z axis, considering loaded tip's length""" # Get the clearance height considering all platforms. z = self.getSafeHeight(item_names=item_names) # Build the move command. move_commands = self.gcode.gcodeClearance( z=z, f=self.feedrate, add_comment="Moving to the clearance height." ) if extend: # Extend the commands list. self.extend_commands(move_commands) return move_commandsFast move to safe height in Z axis, considering loaded tip's length
def parseAction(self, action, action_index=None)-
Expand source code
def parseAction(self, action, action_index=None): """This function interprets a protocol action and produces the corresponding GCODE Note that GCODE generation can depend on the current state. Args: action (dict): Python dictionary defining the action. action_index (integer): Action index number. Raises: Exception: Any exception is re-raised. Context info and traceback are printed just before. Returns: list: List of GCODE commands associated to the action. """ # Python switch case implementations https://data-flair.training/blogs/python-switch-case/ # All action interpreting functions will take two arguments # action: the action object # i: the action index for the current action # task: the object of class "TaskEnvironment" logging.info(f"Parsing action with index {action_index} and command '{action['cmd']}'.") # Save checkpoint. self.checkpoint() # Default value for action index. if action_index is None: action_index = len(self.getActions()) logging.warning(f"Replaced null index with index {action_index}.") # Save previous action. self.previous_action = self.current_action # Assign current action to self slot. self.current_action = action # Create a list for the actions GCODE commands. self.current_action["GCODE"] = [] # Add defaults to required properties. self.current_action.setdefault("args", {}) # Validate workspace name. self.validate_current_action_workspace() # Parse the current action into GCODE, checking for tool-changes first. try: # Get the action command ID. action_command_id = self.get_current_action_cmd() # Get the action's GCODE function. action_function = self.action_switch_case[action_command_id] # Note new action start as comment in the GCODE. comments = [self.gcode.comment(f"Building action {action_index}, with command: {action_command_id}")] self.extend_commands(comments) # Check and produce GCODE for a toolchange, if needed. self.check_toolchange(self.current_action, action_command_id) # Produce the GCODE for the action. if self.verbose: logging.debug(f"Processing command '{action_command_id}' with index '{action_index}'.") # Use the action function to generate its GCODE. # NOTE: The "action_output"s do not contain any gcode, only mildly informative messages. # GCODE is saved to the current action and to the "commands" property. action_function(self.current_action, action_index) # Make a copy of the GCODE. final_action = deepcopy(self.current_action) # Run action callbacks. # This allows other parts of the program to "do stuff" # when a certain kind of command has been parsed. # TODO: Replace it with the Controller's events system. self.run_action_callbacks(action_command_id, final_action) # Log success. logging.info(f"Successfully parsed action with index {action_index} and command '{action_command_id}'.") except Exception as err: msg = f"Parser error at action number {action_index} with content '{self.current_action}' message: {err}" logging.error(msg) print(msg) # Rollback changes. self.rollback() raise ProtocolError(msg) from err else: # Note action parsed. msg = f"Done processing action with index={action_index} and command={action_command_id}" logging.debug(msg) # Commit changes. self.commit() return final_action # Returns GCODE and current action.This function interprets a protocol action and produces the corresponding GCODE
Note that GCODE generation can depend on the current state.
Args
action:dict- Python dictionary defining the action.
action_index:integer- Action index number.
Raises
Exception- Any exception is re-raised. Context info and traceback are printed just before.
Returns
list- List of GCODE commands associated to the action.
def parseProtocol(self, actions: list = None)-
Expand source code
def parseProtocol(self, actions:list=None): """This function takes parses a list of actions into gcode, either provided or from the class' property.""" if actions is None: actions = self.protocol["actions"] logging.warning(f"No actions passed. Using {len(actions)} actions from the active protocol.") logging.info(f"Parsing {len(actions)} protocol actions.") # NOTE: "parseAction" returns a tuple with the commands list and the action, hence the "[0]" below. # NOTE: Cada action parser guarda el gcode implicitamente en "self.protocol" y en el GCODE del action. final_actions = [] for i, action in enumerate(actions): final_actions.append(self.parseAction(action, i)) logging.info(f"Parsed {len(actions)} actions.") return deepcopy(self.protocol)This function takes parses a list of actions into gcode, either provided or from the class' property.
def register_action_callback(self, commands: list, callback_function: Callable)-
Expand source code
def register_action_callback(self, commands: list, callback_function: Callable): """Callback functions to be run by 'parseAction' when the an an action is parsed. This is meant to be a "notification system" for other parts of the code. See 'run_action_callbacks' below. Args: commands (list): a list of strings, each an action command. callback (Callable): a function accepting the GcodeBuilder class instance as a first argument, and the "final action" dictionary as a second argument. """ self.action_callbacks.append({ "commands": commands, "callback": callback_function })Callback functions to be run by 'parseAction' when the an an action is parsed.
This is meant to be a "notification system" for other parts of the code. See 'run_action_callbacks' below.
Args
commands:list- a list of strings, each an action command.
callback:Callable- a function accepting the GcodeBuilder class instance as a first argument, and the "final action" dictionary as a second argument.
def register_tool(self, tool: Tool)-
Expand source code
def register_tool(self, tool: Tool): """Add a tool object to the tools dictionary. A tool object is expected to be dictionary-like and/or a class with attibutes useful for generating GCODE (e.g. XYZ offsets). It also must have a few important methods called form this class (e.g. home). """ tool_name = tool.name logging.info(f"Registering tool with name '{tool_name}'.") if tool_name in list(self.tools): msg = f"The tool '{tool_name}' has already been registered." logging.error(msg) raise DataError(msg) self.tools[tool_name] = toolAdd a tool object to the tools dictionary.
A tool object is expected to be dictionary-like and/or a class with attibutes useful for generating GCODE (e.g. XYZ offsets).
It also must have a few important methods called form this class (e.g. home).
def register_tool_loader(self, tool_type: str, loader_function)-
Expand source code
def register_tool_loader(self, tool_type: str, loader_function): """Register a 'loader' function for a particular tool 'type', which will be called to load all of its kind. There can be only one loader for each tool type, and each should be registered by a particular plugin. For example, see 'load_pipette' defined in the 'pipettes.py' plugin. """ # Raise an error if the tool_type already has a registered loader function. if tool_type in self.tool_loaders: msg = f"Tool type '{tool_type}' already has a handler function: {self.tool_loaders[tool_type]}" logging.error(msg) raise DataError(msg) # Register the function. self.tool_loaders[tool_type] = loader_functionRegister a 'loader' function for a particular tool 'type', which will be called to load all of its kind. There can be only one loader for each tool type, and each should be registered by a particular plugin. For example, see 'load_pipette' defined in the 'pipettes.py' plugin.
def rollback(self)-
Expand source code
def rollback(self): """Rolls back to the previous state using the saved checkpoint. Restores workspaces, protocol, current_action, and previous_action from the checkpoint. Raises an error if no checkpoint is available. """ if not hasattr(self, "_checkpoint") or not self._checkpoint: logging.error("No checkpoint available to roll back to.") raise RuntimeError("No checkpoint available for rollback.") try: for attr_name in self.state_attributes: setattr(self, attr_name, deepcopy(self._checkpoint[attr_name])) logging.warning("Rolled back to the previous checkpoint state.") except Exception as e: logging.error(f"Error during rollback: {e}") raise RuntimeError(f"Failed to rollback due to an error: {e}") from eRolls back to the previous state using the saved checkpoint.
Restores workspaces, protocol, current_action, and previous_action from the checkpoint. Raises an error if no checkpoint is available.
def run_action_callbacks(self, action_command_id: str, action: dict)-
Expand source code
def run_action_callbacks(self, action_command_id: str, action: dict): """Run any callback functions registered to run after an action is processed. TODO: consider rewriting this in reverse, by "emitting" events like Klipper does. TODO: Replace it with the Controller's events system. """ for action_callback in self.action_callbacks: # Get the action commands that the callback is meant to handle and the callback. callback_cmds = action_callback["commands"] callback_function = action_callback["callback"] # Execute the callback if there is a match. if action_command_id in callback_cmds: callback_function(self, action) # Check that the callback has a chance of doing something, just in case. for callback_cmd in callback_cmds: if not self.action_switch_case.get(callback_cmd, None): msg = f"The callback command '{callback_cmd}' was not found in the " msg += f"registered action handlers: {list(self.action_switch_case)}" logging.warning(msg)Run any callback functions registered to run after an action is processed. TODO: consider rewriting this in reverse, by "emitting" events like Klipper does. TODO: Replace it with the Controller's events system.
def save_gcode(self, path='examples/test.gcode')-
Expand source code
def save_gcode(self, path="examples/test.gcode"): """Save the GCODE to a file""" logging.debug(f"commanderTest.py message: Saving GCODE to file: {path}") with open(path, "w", encoding="utf-8") as gcode_file: gcode_file.writelines(self.getGcode())Save the GCODE to a file
def toolChangeMacro(self, new_tool: str, extend=True, operation_time=30.0, action: dict = None)-
Expand source code
def toolChangeMacro(self, new_tool:str, extend=True, operation_time=30.0, action:dict=None): logging.info(f"Changing tools from '{self.current_tool}' to '{new_tool}'") # Set the default action to the current action. if action is None: logging.info("No action provided, defaulting to the current action.") action = self.current_action # Park current tool, if any. park_commands = [] if self.current_tool is None: # Nothing to park. msg = "Nothing to park, the current tool is 'None'." logging.info(msg) park_commands += [self.gcode.comment(msg)] elif self.current_tool == new_tool: # Nothing to do. msg = "Parking skipped. Incoming and outgoing tool are equal." logging.info(msg) park_commands += [self.gcode.comment(msg)] else: # Generate GCODE for the parking sequence. tool = self.tools[self.current_tool] park_commands += tool.park(self.current_tool, new_tool) # Extend the timeout. if action is not None: logging.debug("Adding parking operation time to action.") self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True) # # Update the current tool (intermediary state). # self.current_tool = None # Pick-up the next tool pickup_commands = [] if new_tool is None: # There is nothing to do if the toolchange was just for parking. msg = "Nothing to pickup, the new tool is 'None'." logging.info(msg) park_commands += [self.gcode.comment(msg)] elif self.current_tool == new_tool: # Nothing to do. msg = "Pickup skipped. Incoming and outgoing tool are equal." logging.info(msg) park_commands += [self.gcode.comment(msg)] else: # Generate GCODE for the change. tool = self.tools[new_tool] pickup_commands += tool.pickup(self.current_tool, new_tool) # Extend the timeout. if action is not None: logging.debug("Adding pickup operation time to action.") self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True) # Update the current tool. self.current_tool = new_tool # Add pickup commands to the list. tc_commands = park_commands + pickup_commands if extend and tc_commands: logging.debug("Extending GCODE commands in action.") self.extend_commands(commands=tc_commands, action=action) # Return return tc_commands def update_objects(self, workspace=None, platforms_in_workspace=None, protocol=None)-
Expand source code
def update_objects(self, workspace=None, platforms_in_workspace=None, protocol=None): """Update the specified objects, not clearing the rest. If you want to clear (set to an empty dict) the unspecified objects, use "initialize_objects" instead. Specifying "platforms_in_workspace" will update the "clearance" (see "initialize_objects"). """ logging.info("Updating objects.") self.initialize_objects( protocol=protocol, workspace=workspace, platformsInWorkspace=platforms_in_workspace, # Do not clear the unspecified objects. clear_unspecified=False)Update the specified objects, not clearing the rest. If you want to clear (set to an empty dict) the unspecified objects, use "initialize_objects" instead. Specifying "platforms_in_workspace" will update the "clearance" (see "initialize_objects").
def update_objects_by_workspace(self, workspace_name: str, force: bool = True)-
Expand source code
def update_objects_by_workspace(self, workspace_name:str, force: bool = True): """Override the workspace and current platforms objects of the builder If the DB needs to be updated, consider using "apply_settings" before. New objects are fetched from the DB by the workspace's name. Args: workspace_name (str): Name of the workspace. force (bool): Wether to force an update if the name of the current and new workspaces are equal. Defaults to True. """ logging.info(f"Updating builder objects to match a new workspace: '{workspace_name}'") if self.workspace.get("name", None) == workspace_name: if force: logging.warning("Forced update. The previous workspace has the same name, but may hold a different state.") else: logging.warning("The previous workspace has the same name, update skipped.") return # Get workspace data. ws = self.controller.database_tools.getWorkspaceByName(workspace_name=workspace_name) # Get platforms. pl = self.controller.database_tools.getPlatformsInWorkspace(workspace=ws) # Update builder objects and clearance (not clearing the protocol). self.update_objects(workspace=ws, platforms_in_workspace=pl)Override the workspace and current platforms objects of the builder
If the DB needs to be updated, consider using "apply_settings" before.
New objects are fetched from the DB by the workspace's name.
Args
workspace_name:str- Name of the workspace.
force:bool- Wether to force an update if the name of the current and new workspaces are equal. Defaults to True.
def validate_current_action_workspace(self)-
Expand source code
def validate_current_action_workspace(self): """ Validates that the workspace specified in the current action matches the active workspace. This method checks the `workspace` name from the current action and compares it with the active workspace name. If the two workspaces do not match, a `ProtocolError` is raised. If the action does not specify a workspace, it will default to the current workspace and log a warning. Raises: ProtocolError: If the workspace specified in the current action does not match the active workspace. Notes: - If both the action and the current workspace specify a name, they are compared. - If the action does not have a workspace name, the current workspace name is assigned to the action and a warning is logged. """ current_workspace_name = self.workspace.get("name", None) action_workspace_name = self.current_action.get("workspace", None) if action_workspace_name and current_workspace_name: # If set, validate the workspace name from the action with the current workspace. if action_workspace_name != current_workspace_name: msg = f"Action with index {self.current_action.get('index', None)} and workspace '{action_workspace_name}'" msg += f" does not match the currently active workspace '{current_workspace_name}'." logging.error(msg) raise ProtocolError(msg) elif current_workspace_name: # If the action did not specify a workspace name, then add the name of the current workspace to it. msg = f"Action with index {self.current_action.get('index', None)} has no workspace info." msg += f" Defaulting to the current workspace: '{current_workspace_name}'" logging.warning(msg) self.current_action["workspace"] = current_workspace_nameValidates that the workspace specified in the current action matches the active workspace.
This method checks the
workspacename from the current action and compares it with the active workspace name. If the two workspaces do not match, aProtocolErroris raised. If the action does not specify a workspace, it will default to the current workspace and log a warning.Raises
ProtocolError- If the workspace specified in the current action does not match the active workspace.
Notes
- If both the action and the current workspace specify a name, they are compared.
- If the action does not have a workspace name, the current workspace name is assigned to the action and a warning is logged.