Module pipettin-piper.piper.plugins.pipettes

Functions

def load_plugin(controller: "'Controller'", **kwargs)
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Instantiate the plugin and register the pipettes as tools.
    Plugins are expected to have a function named 'load_plugin' which will instantiate
    the plugin's class and returning it to the main Commander class.
    If they fail to load, they must raise a PluginError exception.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")
    try:
        class_instance = PipetteHandler(controller)
    except Exception as e:
        msg = f"Failed to load with error: {e}"
        logging.error(msg)
        raise PluginError(msg) from e

    return class_instance

Instantiate the plugin and register the pipettes as tools. Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception.

Classes

class PipetteHandler (controller: Controller = None, init_from_db=False)
Expand source code
class PipetteHandler(Plugin):
    """ WARNING EXPERIMENTAL

    THIS CLASS IS MEANT TO TAKE OVER ALL PIPETTING RELATED TASKS INCLUDING:

    - LOADING PIPETTE TOOLS
    - GENERATING GCODE
    - EXECUTING GCODE FOR THOSE ACTIONS (MAYBE)
    """

    # Current micro-pipette models and characteristics.
    pipettes: dict = None
    # Builder class.
    builder: GcodeBuilder = None
    controller: Controller = None
    # Current pipette as computed property.
    @property
    def pipette(self) -> PipetteTool:
        """Return a pipette, by the name of the current tool in the builder."""
        return self.pipettes[self.builder.current_tool]

    verbose = False

    def __init__(self, controller: Controller = None, init_from_db=False):

        self.controller: Controller = controller
        self.builder: GcodeBuilder = self.controller.builder
        self.gcode: GcodePrimitives = self.builder.gcode
        self.verbose: bool = self.builder.verbose

        # Dictionary of JSON data.
        self.pipettes_data = {}
        # Dictionary with python 'tool' objects in its values.
        self.pipettes = {}

        # Initialize pipettes.
        if init_from_db or self.controller.config.get("pipettes", {}).get("init_from_db", False):
            # Load and register tools directly from the DB.
            # TODO: Consider removing this option.
            #       It is here in case it helps configuring things after the controller's init.
            self.pipettes_data = self.get_pipette_tools_from_db()
            self.load_pipettes(self.pipettes_data)
            # Register it directly.
            logging.info("Registering pipettes directly.")
            for p in self.pipettes.values():
                self.builder.register_tool(p)
        else:
            # Use the new tool-loader callback system.
            logging.info("Registering pipette loader function.")
            self.builder.register_tool_loader(
                tool_type=TOOL_TYPE,
                loader_function=self.load_pipette)

        # List of available macros for the requested actions.
        # TODO: Implement repeat dispensing.
        # TODO: Implement reverse pipetting.
        # TODO: Implement available/maximum volume checks.
        # TODO: Add "extend" to all macros.
        # TODO: Make all macros return the generated GCODE.
        self.action_switch_case = {
            "PIPETTE": self.macroPipette,
            "PICK_TIP": self.macroPickNextTip,
            self.load_liquid_cmd: self.macroGoToAndPipette,
            self.pour_liquid_cmd: self.macroGoToAndPour,
            "DISCARD_TIP": self.macroDiscardTip,
            self.mix_cmd: self.macroGoToAndMix
        }

        # Register each handler in the GcodeBuilder.
        logging.info("Registering action handlers.")
        for name, function in self.action_switch_case.items():
            self.builder.add_action_handler(name, function)

        # Set status.
        self._status = True

    def get_pipette_tools_from_db(self):
        """Read tools from the DB and keep only the ones with a matching type."""
        tools: list = self.controller.database_tools.tools
        pipettes_data = {t["name"]: t for t in tools if t["type"] == TOOL_TYPE}
        logging.info(f"Found these pipettes in the DB: {list(pipettes_data)}")
        return pipettes_data

    def load_pipettes(self, pipettes_data):
        """Instantiate and save one PipetteTool for each parameter set in 'pipettes_data'."""
        # NOTE: commenting out the defaults for now.
        # if not self.pipettes_data:
        #     self.pipettes_data = deepcopy(PIPETTES)
        for pipette_name, pipette_data in pipettes_data.items():
            # Instantiate all pipette tools.
            logging.info(f"Initializing pipette '{pipette_name}'.")
            self.load_pipette(pipette_data)

    def load_pipette(self, pipette_data):
        """Instantiate and save one PipetteTool from a parameter set.
        This is a tool loader function for GcodeBuilder.
        """
        pipette_name = pipette_data["name"]
        logging.info(f"Loading pipette '{pipette_name}'.")

        # Check if the tool is already registered locally.
        if pipette_name in self.pipettes:
            msg = f"Pipette '{pipette_name}' is already in the pipette handler's list."
            logging.error(msg)
            raise DataError(msg)

        # Load the pipette.
        try:
            # The PipetteTool instances  will register themselves a tool in the builder.
            tool = PipetteTool(pipette_data, self.builder)
            self.pipettes[pipette_name] = tool
        except Exception as e:
            msg = f"Failed to initialize tools: {e}"
            logging.error(msg)
            raise CommanderError(msg) from e

        return tool

    def guess_pipette(self, volume:float, tip_container:dict=None, repeats:int=1, prioritize_accuracy=True):
        """Guess which tool is better for a given volume and number of serial repeats.
        By default, accuracy will be prioritized (i.e. less dispenses per load), unless
        'prioritize_accuracy' is set to False.
        """
        # Get the maximum volume for the given tip.
        tool_limits = [(p.get_max_volume(tip_container), p.name) for p in self.pipettes.values()]
        # Calculate how many repeats can be done with each pipette.
        # Only keep the ones with enough volume.
        tool_repeats = [(limit // volume, tool) for limit, tool in tool_limits if limit // volume >= repeats]
        if prioritize_accuracy is True:
            # Calculate the smaller number of repeats between tools.
            min_repeats = min(*[reps for reps, tool in tool_repeats])
            # Choose the tool that allows the smaller number of repeats.
            # This will prioritize precision over speed.
            min_repeats_tool = next(tool for reps, tool in tool_repeats if reps == min_repeats)
            return min_repeats_tool
        else:
            # Calculate the greater number of repeats between tools.
            max_repeats = max(*[reps for reps, tool in tool_repeats])
            # Choose the tool that allows the larger number of repeats.
            # This will prioritize speed over precision.
            max_repeats_tool = next(tool for reps, tool in tool_repeats if reps == max_repeats)
            return max_repeats_tool

    def macroDiscardTip(self, action, i, operation_time=20.0):
        """Move over the trash box or eject-post and discard the tip (if placed, else Except).

        Action moves:
            - Move to clearance level on Z (avoiding platforms) and Y (avoiding parked tools).
            - Move to the ejection site and eject the tip.
            - Home the tool.

        Args:
            action (dict): The action definition. Contents not used.
                            The coordinates are loaded from the tool/pipette definition.
            i (int): The index of the action. Not used.

        Raises:
            Exception: An error is raised when there are no tips to discard.

        Returns:
            str: Generic message.

        Sample action:
            {
                "cmd": "DISCARD_TIP",
                "args": {
                    "item": "descarte 1"
                }
            }
        """

        # TODO: Add alternate method of ejecting tips against an ejector pad (custom pipette).
        #       - Add "can eject" property and decide based on that.
        #       - Add "ejector pad" properties to tools.

        if self.pipette.tip_loaded:
            # Move to clearance level on Z.
            # Should extend task.commmand list automatically.
            self.builder.macroMoveSafeHeight()

            if self.pipette.can_eject:
                item_name = action["args"]["item"]
                # Get the item's XYZ.
                logging.info("Ejecting tip using integrated ejector.")
                x, y, z = self.builder.getItemXYZ(item_name)

                # TODO: Deduplicate this. The eject-post passes an absolute Z but the ejector does not.
                # Move over the ejection site initial XY coordinates.
                self.builder.macroGoToXYZ(x=x, y=y, add_comment="Initial XY approach to eject zone.")
                # Move over the ejection site initial Z coordinate.
                self.builder.macroGoToXYZ(z=z, add_comment="Initial Z approach to eject zone.")
            else:
                # Get the ejector post's initial coordinates.
                logging.info("Ejecting tip using post.")
                x, y, z = self.pipette.eject_post_init_coords

                # TODO: Deduplicate this. The eject-post passes an absolute Z but the ejector does not.
                # Move over the ejection site initial XY coordinates.
                self.builder.macroGoToXYZ(x=x, y=y, add_comment="Initial XY approach to eject zone.")
                # Move over the ejection site initial Z coordinate.
                # NOTE: "add_offset=False" is needed in this case.
                self.builder.macroGoToXYZ(z=z, add_offset=False, add_comment="Initial Z approach to eject zone.")

            # Generate GCODE for the actual ejection sequence.
            eject_commands = self.pipette.eject_tip()
            self.builder.extend_commands(eject_commands)

            # Raise to safety.
            self.builder.macroMoveSafeHeight()

            # Ensure zeroed pipette axis
            # self.macroPipetteZero()
            # NOTE: replace macroPipetteZero with extruder homing command.
            self.macroPipetteHome(which_tool=self.builder.current_tool)
            # TODO: reimplement non-homing zeroing once the underlying Klipper issue is resolved:
            #       Sending "M82" and moving the extruder to 0 does not set the position to 0.
            #       Check if this is due to absolute extruder positioning not taking effect.

        else:
            # TODO: discutir si es mejor tirar este error o no hacer nada
            raise ProtocolError(f"Cannot discard tip if one is not already placed! Action index: {i}")

        # Extend the operation time.
        self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        return f"Processed tip discard with index {i}"

    def volume_to_displacement(self, volume: float, mode="incremental"):
        """Convert volume (uL) to shaft displacement (mm)."""

        # Get scaler properties.
        scaler, scaler_adjust = self.pipette.scaler, self.pipette.scaler_adjust

        # Convert volume to shaft displacement in millimeters in incremental mode.
        if mode == "incremental":
            # Units are in microliters, they must be converted.
            displacement = volume / scaler
            # Appliy linear adjustment.
            displacement = displacement * scaler_adjust
        else:
            # TODO: implement "absolute" pipetting volumes.
            raise ProtocolError(f"Invalid mode: '{mode}'")

        # Comment GCODE
        message = f"Converted {volume} uL to {displacement} mm"
        message += f" with scaler {scaler} and adjust factor {scaler_adjust}"
        logging.debug(message)
        self.builder.extend_commands(commands=[self.gcode.comment(message)])

        return displacement

    def macroPipette(
        self,
        action:dict, i:int,
        backlash_correction:float=0.0,
        tip_priming_correction:bool=False,
        back_pour_correction:bool=0.0,
        under_draw_correction:float=0.0,
        extra_draw_volume:float=0.0,
        capillary_correction:bool=False):
        """
        Pipetting works incrementally for now (i.e. relative volumes, not absolute volumes).
        Negative values mean 'dispense' or 'lower' the axis (see functions in driverSaxis.py).
        """
        # Get the target volume.
        volume = action["args"]["volume"]

        # Flag indicating if this is a draw or dispense move.
        is_drawing = self.builder.sign(volume) > 0

        # Comment GCODE
        message = f"Calculating pipetting move for {volume} uL volume for action {i}"
        message += f", using tool '{self.builder.current_tool}'."
        logging.info(message)

        # Check if the tip is new/empty.
        first_draw = self.pipette.state["tip_vol"] is None
        logging.debug(f"Computed move properties: is_drawing={is_drawing} first_draw={first_draw} (tip_vol={self.pipette.state['tip_vol']})")
        self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action)

        # Read options from action arguments.
        # Prioritize the arguments, then the config, and defaulting to arguments if not there.
        backlash_correction = backlash_correction or float(self.pipette.get("backlash_correction", backlash_correction))
        tip_priming_correction = tip_priming_correction or bool(self.pipette.get("tip_priming_correction", tip_priming_correction))
        back_pour_correction = back_pour_correction or float(self.pipette.get("back_pour_correction", back_pour_correction))
        under_draw_correction = under_draw_correction or float(self.pipette.get("under_draw_correction", under_draw_correction))
        extra_draw_volume = extra_draw_volume or float(self.pipette.get("extra_draw_volume", extra_draw_volume))
        capillary_correction = capillary_correction or bool(self.pipette.get("capillary_correction", capillary_correction))
        # Log the corrections.
        logging.debug(f"Corrections: backlash_correction={backlash_correction}, tip_priming_correction={tip_priming_correction}, back_pour_correction={back_pour_correction}, under_draw_correction={under_draw_correction}, capillary_correction={capillary_correction}")

        # Compensate mechanical backlash.
        if backlash_correction:
            # Backlash compensation.
            self.compensate_backlash(volume, backlash_correction, action, i)

        # Compensate for tip-wetting issues.
        if is_drawing and first_draw:
            if tip_priming_correction:
                # Compensation for under-drawing on first load, by pre-wetting.
                self.compensate_first_draw(volume, action, i)
            if extra_draw_volume:
                # Compensation for under-drawing on first load, by over-drawing a bit.
                self.compensate_extra_draw(extra_draw_volume, action, i)

        # Compensation for capillary effects for low final tip volume during dispensing.
        # Apply only for regular dispensing moves.
        if capillary_correction and self.builder.sign(volume) == -1 and not first_draw:
            self.compensate_low_volume(volume, action, i)

        # Comment GCODE
        message = f"Pipetting {volume} uL for action {i}."
        logging.info(message)
        self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action)

        # Compute axis displacement from the target pipetting volume.
        # Default pipetting mode should be incremental (i.e. relative) volume.
        self.displace_volume(volume, action)

        # Under-draw correction
        if is_drawing and under_draw_correction:
            self.compensate_underdraw(action, i)

        # Back-pour correction
        if is_drawing and back_pour_correction:
            self.compensate_backpour(action, i)

        return f"Processed pipetting action with index {i}."

    def displace_volume(self, volume: float, action: dict, check_tolerance:float=100.0):
        """Compute axis displacement from the target pipetting volume.
        Default pipetting mode should be incremental (i.e. relative) volume.

        Args:
            volume (float): Pipetting volume in microliters.
            action (dict): Action object, optionally containing a pipetting "mode" string in its "args".
        """
        # TODO: De-hardcode the tolerance set above. The issue here is that corrections move the
        #       shaft to volumes outside the limit of the tip, which may be a bad idea (e.g. because 
        #       there may be a filter) or of the tool's limit for the tip.

        # Volume limit check for the current tip.
        self.check_volume_limit(volume, tolerance=check_tolerance)

        # Convert volume to displacement.
        mode = action["args"].get("mode", "incremental")
        displacement = self.volume_to_displacement(volume, mode=mode)

        # Generate GCODE command for the move.
        # TODO: Add pipetting speed control.
        # TODO: Volume sign INVERTED here, be smarter about the direction.
        command = self.gcode.G1(
            e=-displacement,
            absolute=False, absolute_e=False,
            f=self.pipette.feedrate)

        # Add the GCODE command to the list.
        self.builder.extend_commands(commands=command, action=action)

        # Update state.
        self.update_pipette_state(volume, displacement)

        return displacement

    def check_volume_limit(self, volume, tolerance:float=0.0):
        """Volume limit check for the current tip and tool."""
        # self.check_tip_volume(volume)
        tip_container = self.controller.database_tools.getContainerByName(self.pipette.tip_loaded["container"])
        max_volume = self.pipette.get_max_volume(tip_container)
        current_volume = self.pipette.tip_loaded.get("volume", 0.0)
        final_volume = current_volume + volume
        if final_volume <= current_volume:
            # TODO: Reconsider passing this check if the volume is decreasing (i.e. dispensing).
            pass
        if final_volume > max_volume + tolerance:
            msg = f"The requested final volume ({final_volume}) exceeds the tool's capacity of {max_volume} uL for tip {tip_container['name']}."
            raise ProtocolError(msg)
        return max_volume, final_volume

    def update_pipette_state(self, volume, displacement):
        """Update the pipette state information."""
        # Setup valid tip volume before updating
        if self.pipette.state["tip_vol"] is None:
            self.pipette.state["tip_vol"] = 0

        # Update pipette state
        self.pipette.state["s"] += displacement
        self.pipette.state["vol"] += volume
        self.pipette.state["tip_vol"] += volume
        self.pipette.tip_loaded["volume"] = volume + self.pipette.tip_loaded.get("volume", 0.0)

        # Update direction with the current move
        self.pipette.state["lastdir"] = self.builder.sign(volume)
        # Print state
        msg = f"Current state of tool {self.builder.current_tool}: " + pformat(self.pipette.state)
        logging.debug(msg)

    def compensate_backlash(self, volume: float, backlash_correction: float, action:dict=None, i:int=None):
        """Backlash compensation"""
        # If previous and current directions are different
        last_dir_sign = self.builder.sign(self.pipette.state["lastdir"])
        new_dir_sign = self.builder.sign(volume)
        # TODO: be smarter about backlash (?).
        backlash_correction_vol = 0.0
        if (last_dir_sign * new_dir_sign) == -1:
            # Then over-pour (negative volume, -0.5 uL for p200) or over-draw (positive volume, 0.5 uL for p200)
            backlash_correction_vol = new_dir_sign * backlash_correction
            # Comment GCODE.
            message = f"Action {i}: pipetting extra {backlash_correction_vol} uL on backlash correction."
            logging.info(message)
            self.builder.extend_commands([self.gcode.comment(message)])
            # Move without corrections.
            self.displace_volume(volume = backlash_correction_vol,
                                 action = action)

        return backlash_correction_vol

    def compensate_first_draw(self, volume: float, action:dict=None, i:int=None):
        """First draw compensation.
        Pre-wet the tip by loading and dispensing the target volume (capped by the pipette's max volume).
        """

        # Limit pre-wetting to 120% of move volume or max volume.
        correction_volume = min(abs(self.pipette.max_volume), volume*1.2)

        # Comment GCODE.
        message = f"Action {i}: pre-wetting the tip by pipetting up-and-down {correction_volume} uL on first draw."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Apply the correction by calling this very method,
        # by pipetting up and down (i.e. pre-wetting the tip).
        for direction in [1.0, -1.0]:
            # Move without corrections.
            self.displace_volume(volume = direction * correction_volume,
                                 action = action)

        return correction_volume

    def compensate_extra_draw(self, extra_draw_volume, action:dict=None, i:int=None):
        """First draw compensation.
        If the tip is new/empty, the liquid may have a harder time "entering" the dry tip.
        Correct by over-drawing by a specified amount (e.g. 5 uL for the p200).
        """

        # Comment GCODE.
        message = f"Action {i}: over-drawing by {extra_draw_volume} uL on first draw."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Move without corrections.
        self.displace_volume(volume = extra_draw_volume, action = action)

        return extra_draw_volume

    def compensate_low_volume(self, volume: float, action:dict=None, i:int=None):
        """Compensation for capillary effects for low final tip volume during dispensing."""
        # Apply only for regular dispensing moves.
        # Apply correction if necessary: only at low final volumes.
        correction_start_vol = self.pipette.tension_correction["start_vol"]

        # Get the current tip volume (i.e. from the previous action).
        final_volume = self.pipette.state["vol"]

        # Add the new [corrected] volume to the local pipette volume tracker (axis position in uL).
        final_volume += volume

        # Subtract the "back-pour correction" volume if this is a first draw,
        # forcing negative volume (for dispensing). Though this "correction" is
        # applied later on, it must be considered here, because it will change
        # the final volume.
        # TODO: disabled this bit for now, since this correction is meant for
        #       simple dispense moves only.
        # if first_draw and tip_priming_correction and back_pour_correction:
        #     final_volume += -abs(self.pipette["back_pour_correction"])

        # Default correction should be null.
        capillary_extra_vol = 0.0

        # Apply the correction.
        if final_volume < correction_start_vol:
            # Compute the "extra" volume that must be dispensed.
            capillary_extra_vol = self.capillary_corr(final_volume)

            # Comment GCODE
            message = f"Action {i}: final volume {final_volume} uL requires low-volume compensation. Dispensing extra {capillary_extra_vol} uL."
            logging.info(message)
            self.builder.extend_commands([self.gcode.comment(message)])

            # Apply correction.
            # Move without corrections.
            self.displace_volume(volume = capillary_extra_vol,
                                 action = action)
        else:
            # Comment GCODE
            message = f"Action {i}: final volume {final_volume} uL does not require low-volume compensation."
            logging.debug(message)
            self.builder.extend_commands([self.gcode.comment(message)])

        return capillary_extra_vol

    def compensate_underdraw(self, action:dict=None, i:int=None):
        """Under-draw correction
        Correction for aspiration moves that draw less volume than expected,
        increasing the loaded volume by a fixed amount of microliters.
        This applies to all moves.
        """

        # Get parameters from the tool
        under_draw_correction = self.pipette["under_draw_correction"]

        # Over-draw
        vol_correction = abs(under_draw_correction) # positive, force loading

        # Logs.
        message = f"Action {i}: applying {vol_correction} uL under-draw volume correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Apply the correction.
        self.displace_volume(vol_correction, action)

    def compensate_backpour(self, action:dict=None, i:int=None):
        """Back-pour correction
        Try correcting "liquid backlash" by over-drawing and pouring a bit back into the source tube.
        This corrects inaccuracies in the first dispense of a series.
        """

        # Get parameters from the tool
        back_pour_correction = self.pipette["back_pour_correction"]

        # Over-draw
        over_draw_volume = abs(back_pour_correction) # positive, force loading

        # Logs.
        message = f"Action {i}: applying {over_draw_volume} uL over-draw volume correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Apply the correction.
        self.displace_volume(over_draw_volume, action)

        # Back-pour
        back_pour_volume = -abs(back_pour_correction)   # negative, force dispensing

        # Logs.
        message = f"Action {i}: applying {back_pour_volume} uL back-pour volume correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Apply the correction.
        self.displace_volume(back_pour_volume, action)

        return over_draw_volume, back_pour_volume

    def macroPipetteZero(self, action=None, i=None):
        """
        Move the pipette to the zero position (without re-homing).

        Note: this now requires that the pipette homes upwards to the axis origin (i.e. at 0)
        and the retraction distance is considered the zero position for pietting.
        """
        # Reverse net displacement
        volume = -self.pipette.state["vol"]

        # Early return if the volume is zero.
        if not volume:
            message = f"Pipette is already zeroed at volume={volume} (action index {i})."
            logging.debug(message)
            self.builder.extend_commands([self.gcode.comment(message)])
            return

        # Logs.
        message = f"Zeroing pipette with volume={volume} (action index {i})."
        logging.debug(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Execute the displacement.
        self.displace_volume(volume, action)

        # Check for final volume near zero
        state_vol = abs(self.pipette.state["vol"])
        state_s = abs(self.pipette.state["s"])
        threshold = 1e-10
        if (state_vol > threshold) or (state_s > threshold):
            raise ProtocolError("Pipette was zeroed, but reference states did not match zero. " +
                                "Expected vol=0 and s=0 but got:" +
                                f" vol={self.pipette['state']['vol']}" +
                                f" s={self.pipette['state']['s']}")

    # TODO: This is now only used by "macroDiscardTip". Considering removing it.
    def macroPipetteHome(self, which_tool="all"):
        """Macro to home the requested tools

        Args:
            which_tool (str, optional): Either a valid name for a tool, or "all". Defaults to "all".
        """
        if which_tool == "all":
            which_tools = list(self.pipettes)
        else:
            which_tools = [which_tool]

        tool_home_cmds = []
        for tool_name in which_tools:
            tool = self.pipettes[tool_name]
            tool_home_cmds += tool.home()

        message = f"Homing pipettes {which_tools}"
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        self.builder.extend_commands(tool_home_cmds)

        return tool_home_cmds

    def calcTipFitZ(self, content_top_z: float, next_tip: dict):
        """Calculate absolute Z coordinates for initial approach and final fitting depth."""
        # Get the tip's properties, and the corresponding tip-stage info from the tool.
        tip_container = self.pipette.get_tip_container(next_tip)

        # Get the tip-stage offset that matches the new tip.
        tip_stage_offset = self.pipette.get_tip_stage_offset(tip_container)
        # Get the tip fit distance for the current tip-tool pair.
        tip_fit_distance = self.pipette.get_tip_fit_distance(tip_container)
        # Extra distance to probe below and beyond the tip's active height.
        probe_extra_dist = self.pipette.get_probe_extra_dist(tip_container)

        # Subtract the tip-stage offset.
        z_pre_fit = content_top_z - tip_stage_offset
        # NOTE: This "z_pre_fit" is now the initial coordinate before fitting, with or without probing.

        # Final calculation.
        z_final = z_pre_fit - tip_fit_distance - probe_extra_dist

        # Logs.
        logging.info(f"Tip fitting with: z={content_top_z}, tip_stage_offset={tip_stage_offset}, z_pre_fit={z_pre_fit}")
        logging.info(f"Tip fitting with: tip_fit_distance={tip_fit_distance}, probe_extra_dist={probe_extra_dist}")

        # Return both.
        return z_pre_fit, z_final

    def calcTipProbeDistZ(self, next_tip: dict):
        """Calculate max Z displacement for fitting a tip with probing.
        NOTE: This is a relative move.
        """
        # Get the tip's properties, and the corresponding tip-stage info from the tool.
        tip_container = self.pipette.get_tip_container(next_tip)

        # Get the tip fit distance for the current tip-tool pair.
        tip_fit_distance = self.pipette.get_tip_fit_distance(tip_container)
        # Extra distance to probe below and beyond the tip's active height.
        probe_extra_dist = self.pipette.get_probe_extra_dist(tip_container)

        # Total distance of the probing move.
        z_scan_dist = tip_fit_distance + probe_extra_dist

        return z_scan_dist

    def pickNextTip(self, next_tip, tip_x, tip_y, tip_z):
        """Generate GCODE to place a tip.

        Args:
            next_tip (dict): Data for the target tip.
            tip_x (float): X coordinate of the tip, without tool offsets.
            tip_y (float): Y coordinate of the tip, without tool offsets.
            tip_z (float): Z coordinate of the tip (active height, i.e. absolute fitting height), without tool offsets.

        TODO: Test adding a "vibration" if possible (i have not found this necessary).
        """

        if self.pipette.tip_loaded:
            # TODO: discutir si es mejor descartar el tip
            #  si está automáticamente o tirar este error
            raise ProtocolError("Cannot pick tip if one is already placed!")

        # Probe command list.
        commands = [self.gcode.comment("Start tip pickup moves.")]

        # Add XYZ tool offsets to the tip's active center. This adds:
        #   - XYZ tool offsets.
        #   - And nothing else, because no tip is loaded.
        x, y, z = self.builder.addToolOffsets(x=tip_x, y=tip_y, z=tip_z)

        # Move over the tip on XY.
        commands += self.gcode.G1(x=x, y=y, absolute=True)

        # Calculate initial and final fit coordinates.
        z_pre_fit, z_final = self.calcTipFitZ(content_top_z=z, next_tip=next_tip)

        # Move over the tip on Z, partially inserting the tip holder if using a non-terminal tip stage.
        commands += self.gcode.G1(z=z_pre_fit, absolute=True)

        if not self.pipette.can_probe:
            # Fit the tip (blindly, without probing).
            logging.info(f"Picking tip with: z_final={z_final}")
            # TODO: add a feedrate config parameter for tip fitting.
            commands += self.gcode.G1(z=z_final, f=10, absolute=True)
        else:
            # Probe for the tip if supported.
            # Total distance of the probing move.
            z_scan_dist = self.calcTipProbeDistZ(next_tip)
            # NOTE: This is a relative move.
            logging.info(f"Probing for tip with: z_scan_dist={z_scan_dist}")
            commands += self.gcode.gcodeProbeDown(z_scan=-z_scan_dist)

        # TODO: Seal the tip by pressing a little bit very slowly two times.

        # Extend commands list.
        self.builder.extend_commands(commands)

        # Flag loaded tip.
        self.pipette.tip_loaded = next_tip

        # Zero the tip volume tracking (set to None instead for new tip check).
        self.pipette.state["tip_vol"] = None

        return commands

    def macroPickNextTip(self, action, i, operation_time=20.0):
        """Move to clearance height and fit a tip.

        Will throw an error if a tip is already loaded.

        Sample platform-based action:
            {
                "cmd": "PICK_TIP",
                "args": {
                    "item": "200ul_tip_rack_MULTITOOL 1",
                    "tool": "P200"
                }
            }

        Sample coordinate-based (platformless) action:
            {
                "cmd": "PICK_TIP",
                "args": {
                    "position": {"x": 10.5,"y": 20.5,"z": 0.5},
                    "tool": "P200",
                    "tip": {"length": 50.0, "maxVolume": 200, "volume": 0}
                }
            }
        """

        if not self.pipette.tip_loaded:
            # Move to clearance level on Z, extends task.command list automatically
            self.builder.macroMoveSafeHeight()

            # Get tip and coords (supports "position" key specification).
            next_tip, tip_x, tip_y, tip_z = self.getTipAndCoords(action, i)

            # Generate GCODE to pick up the tip.
            self.pickNextTip(next_tip, tip_x, tip_y, tip_z)

            # Raise to safety.
            self.builder.macroMoveSafeHeight()
        else:
            # TODO: discutir si es mejor descartar el tip
            #  si está automáticamente o tirar este error
            raise ProtocolError(f"Cannot pick tip if one is already placed! Action index: {i}")

        # Extend the operation time.
        self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        return action

    pour_liquid_cmd = "DROP_LIQUID"
    def macroGoToAndPour(self, action, i, operation_time=20.0):
        """Move to a tube and dispense liquid.
        Hadnles "DROP_LIQUID" actions. Makes volume "negative" and runs "macroGoToAndPipette".
        """
        action["args"]["volume"] = -abs(float(action["args"]["volume"]))  # Force negative volume

        ret_msg = self.macroGoToAndPipette(action, i, operation_time=operation_time)

        return ret_msg

    def fast_safe_height(self, current_action:dict, previous_action:dict=None):
        """Try optimising clearance for a pipetting move within the same platform item.
        The objective is to speed-up repetition pipetting within the same platform.
        """
        # Previous action.
        if previous_action is None:
            previous_action = self.builder.previous_action
        check_last, last_item_name = self.fast_safe_height_action_check(previous_action)
        # Current action.
        check_next, next_item_name = self.fast_safe_height_action_check(current_action)

        # Items to optimize by.
        item_names = []

        # Decide wether to optimize or not.
        if check_last and check_next and (last_item_name == next_item_name):
            logging.info(f"Optmizing clearance move to safe height within item: '{next_item_name}'")
            item_names.append(next_item_name)

        return self.builder.macroMoveSafeHeight(item_names=item_names)

    def fast_safe_height_action_check(self, action:dict):
        """Check that an action is compatible with a fast safe height check."""

        # List of eligible commands.
        fast_sh_eligible_cmds = [self.pour_liquid_cmd, self.load_liquid_cmd, self.mix_cmd]
        # Check that the commands are reasonable to optmize.
        cmd_check = action and action.get("cmd", None) in fast_sh_eligible_cmds

        # Check that the actions are platform-based, and point to an item.
        args_check = "item" in action["args"]

        # Try getting item_name from the action.
        item_name = action.get("args", {}).get("item", None)
        # Check that the origin and destination item is the same one.
        item_check = item_name is not None

        # Return the digested check, and the item name.
        return cmd_check and args_check and item_check, item_name

    load_liquid_cmd = "LOAD_LIQUID"
    def macroGoToAndPipette(self, action, i, operation_time=20.0):
        """Move to a tube and load liquid.
        LOAD_LIQUID action interpreter: moves to XYZ position and moves S axis to pipette.
        Will throw an error if a tip is not already loaded.

        Sample action:
            {
                "cmd": "LOAD_LIQUID",
                "args": {
                    "item": "5x16_1.5_rack 1",
                    "selector": {"by": "name", "value": "tube1"},
                    "volume": 100,  # in microliters
                    "tool": "P200"
                }
            }

        Likely platformless action:
            {
                "cmd": "LOAD_LIQUID",
                "args": {
                    "content": {"volume": 200.0},
                    "volume": 100,  # in microliters
                    "tool": "P200"
                }
            }

        Primary move sequence:
        1. move to clearance level on Z
        2. move to next tube location on XY
        3. move to Z at loading height
        4. load liquid
        5. move to clearance level on Z
        """

        if self.pipette.tip_loaded:
            # Move to clearance level on Z.
            self.fast_safe_height(action)

            # Move over the target content and downward into the tube.
            self.builder.macroGoTo(action, i)

            # Actuate the pipette tool.
            self.macroPipette(action, i)

            # Move to clearance level on Z.
            self.fast_safe_height(action, action)

        else:
            # TODO: discutir si es mejor descartar el tip
            #  si está automáticamente o tirar este error
            raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

        # Extend the operation time.
        self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        return f"Processed macroGoToAndPipette action with index {i}"

    mix_cmd = "MIX"
    def macroGoToAndMix(self, action, i, operation_time=20.0):
        """Parse a MIX action and generate GCODE.
        Homogenize a solution. Possibly useful for rinsing tips before a transfer.

        Sample MIX action: {
            'args': {
                'count': 3,
                'item': '5x16_1.5_rack 1',
                'of': 'content',  # Can also be 'of': "tip"
                'percentage': 50,
                'coords': {"x": 123, "y": 123, "z": 123}, # Will override 'selector'
                'selector': {'by': 'name', 'value': 'tube1'}},
            'cmd': 'MIX'
        }

        Sample "tip":
            {
                "index": 1,
                "maxVolume": 160,
                "name": "tip1",
                "position": {
                "col": 1,  # Or: "x": 10?
                "row": 1   # Or: "y": 30?
                },
                "tags": [],
                "type": "tip",
                "volume": 0
            }

        Note that the tool must have been set by a previous tip pickup action.
        """

        logging.info("Parsing MIX action.")

        if not self.pipette.tip_loaded:
            # TODO: discutir si es mejor descartar el tip
            #  si está automáticamente o tirar este error
            raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

        # Move to clearance level on Z.
        self.fast_safe_height(action)

        # Zero the pipette axis.
        self.macroPipetteZero(action, i)

        # Move over the target content and downward into the tube.
        self.builder.macroGoTo(action, i)

        # Execute the mix.
        self.macroMix(action, operation_time=operation_time)

        # Back-up.
        self.fast_safe_height(action, action)

        logging.info("Done parsing MIX action.")

        return f"Processed MIX action with index {i}."

    def macroMix(self, action: dict, operation_time=0.0):
        """Execute pipetting moves for a mix action.

        Args:
            action (dict): _description_
            operation_time (float, optional): _description_. Defaults to 0.0.
        """

        # Comments for GCODE.
        comments = []

        # Calculate the mixing volume.
        mix_vol = self.calculate_mix_vol(action, comments)

        # Extend commands with comments.
        self.builder.extend_commands(comments)

        # Prepare minimal pipetting action dict
        mix_count = int(action["args"]["count"])

        # Calculate volume and increments.
        factor = 0.2 # fraction of volume that will be used for increasing mixing
        vol = mix_vol * (1 - factor)  # initial volume for mixing
        vol_increment = mix_vol * (factor / mix_count)

        # Do the mixing
        for i in range(mix_count):
            # Add time to the total operatin time.
            # TODO: Link this to the flow speed and total volume.
            operation_time += operation_time

            # Logs.
            logging.debug(f"Generating mix move {i} of {mix_count}.")

            # Load
            mix_action_up = {"args": {"volume": vol}}  # in microliters
            self.macroPipette(mix_action_up, i)
            self.builder.extend_commands(mix_action_up.get("GCODE", []))

            # NOTE: The dropped volume will be greater than the loaded, to
            # avoid having unmixed solution remainder in the tip. In the final
            # iteration, the dropped volume will be the one in the action.
            vol += vol_increment

            # Drop
            mix_action_dw = {"args": {"volume": -vol}}  # in microliters
            self.macroPipette(mix_action_dw, i)
            self.builder.extend_commands(mix_action_dw.get("GCODE", []))

        # Extend the operation time.
        self.controller.machine.update_exec_opts(
            action=action, timeout=operation_time, add_timeout=True
        )


    def calculate_mix_vol(self, action:dict, comments:list=None):
        """Calculate the mixing volume for a give mix action.

        Args:
            action (dict): Mix action.
            comments (list, optional): List of gcode comments to extend. Defaults to None.

        Raises:
            ProtocolError: _description_

        Returns:
            _type_: _description_
        """
        if comments is None:
            comments = []

        # Get the tube content and its coordinates.
        tube, _, _, _ = self.builder.getTubeAndCoords(action)

        # Get tube volume.
        # TODO: If the tube received volume during this protocol the updated volume is not here.
        #       See: https://gitlab.com/pipettin-bot/pipettin-grbl/-/issues/109
        tube_volume = tube["volume"]
        # Get pipette volume.
        pipette_volume = self.pipette['vol_max']
        # Get tip volume.
        tip_container = self.controller.database_tools.getContainerByName(self.pipette.tip_loaded["container"])
        max_tip_volume = tip_container["maxVolume"]
        # TODO: Check if the tip already has volume.
        # max_tip_volume -= self.pipette.tip_loaded.get("volume", 0)

        # Calculate maximum volume limit.
        logging.info(f"Current volume limits: tube_volume={'<TODO: limit disabled until tracked>'}, pipette_volume={pipette_volume}, max_tip_volume={max_tip_volume}")
        # TODO: Add "tube_volume" here once volume tracking is implemented.
        max_volume_limit = min([max_tip_volume, pipette_volume])

        # Calculate volume limited to max pipette tool volume
        of = action["args"]["of"]  # Either "tip" or tube "content".
        mix_percent = action["args"]["percentage"]
        if of == 'content':
            # Logs.
            logging.info(f"Mixing {mix_percent}% of the tube volume.")
            comments = [self.gcode.comment("MIX action by tube volume fraction.")]
            # Calculate mixing volume
            mix_vol = tube_volume * (mix_percent / 100)
        elif of == 'tip':
            # Logs.
            logging.info(f"Mixing {mix_percent}% of the tip volume.")
            comments += [self.gcode.comment("MIX action by tip volume fraction.")]

            # Calculate mixing volume
            mix_vol = max_tip_volume * (mix_percent / 100)
        else:
            raise ProtocolError(f"Can't parse mixing action with 'of'={of} at action: {action}")

        # Check for limits.
        if mix_vol > max_volume_limit:
            # Log.
            msg = f"Limiting mix volume to {max_volume_limit}. The initial volume {mix_vol} exceeded the limits."
            logging.warning(msg)
            comments += [self.gcode.comment(msg)]
            # Override.
            mix_vol = max_volume_limit

        # Logs.
        logging.info(f"Mixing volume {mix_vol} uL.")
        comments += [self.gcode.comment(f"MIX action with {mix_vol} uL.")]

        return mix_vol

    # Get tip stuff #
    def getNextTip(self,
                   platform_item:dict,
                   selector: dict,
                   pop_from_item=True,
                   workspace_name:str=None,
                   pop_from_db=False):
        """
        Get the next tip and delete it from the local "platform_item" object.
        The object in the database is not affected.
        """
        return self.controller.database_tools.getNextContent(
            workspace_name=workspace_name,
            platform_item=platform_item,
            selector=selector,
            pop_from_item=pop_from_item,
            pop_from_db=pop_from_db
        )

    def getTipAndCoords(self, action, action_index):
        """Get a tip object and its coordinates.

        Uses information from the platform, the item, the content, and the container to get the XYZ of a tip.

        The Z coordinate is calculated by "getContentZ", and corresponds to the fitting depth of the tip.

        This function does not apply tool offsets.

        Args:
            action (dict): Protocol action definition.
            action_index (int): Action index.

        Returns:
            tuple: Tuple with the tip object, and the xyz coordinates.

        Example "tip":  'index': 96,
                        'maxVolume': 160,
                        'name': 'tip96',
                        'position': {'col': 12, 'row': 8},
                        'tags': [],
                        'type': 'tip',
                        'volume': 0

        The "tip" above will be fetched from the database when the action's "args" are of "item" type:
            'args': {'item': '200ul_tip_rack_MULTITOOL 1', 'tool': 'P200'},
            'cmd': 'PICK_TIP'

        The action can also provide the tip's coordinates directly, but must provide "tip" data explicitly:
            'args': {'position': {"x": 20, "y": 200, "z": 30},
                     'tool': 'P200',
                     'tip': tip_definition},  # Note the tip data passed here (described below).
            'cmd': 'PICK_TIP'

        Example "tip_definition" for "position" type args:
            'maxVolume': 160,
            'length': 50.0,
            'volume': 0

        """
        # Use the provided coordinates if found.
        if "position" in list(action["args"]):
            # Get coords.
            coords = action["args"]["position"]
            x, y, z = coords["x"], coords["y"], coords["z"]
            # Get the tip's info. It must contain "length".
            next_tip = action["args"]["tip"]
            # Ensure required information about the loaded tip.
            try:
                assert next_tip.get("length", None) is not None
                # TODO: The "maxVolume" property is in the container definition.
                #       Redesing this code using jsondb or nodb to regain compatibility with this "platformless" mode.
                assert next_tip.get("maxVolume", None) is not None
                assert next_tip.get("volume", None) is not None
            except AssertionError as e:
                msg = f"The tip from an action is missing required properties: {pformat(action)}"
                raise ProtocolError(msg) from e
            # Save contextual information to the tip.
            next_tip["fromItem"] = None

        # Else lookup the coordinates by tip-rack platform.
        else:
            # Use the builder's method.
            next_tip, x, y, z = self.controller.builder.getContentCoords(
                # Get any next tip from the item in the action.
                action, allow_next=True,
                # Remove the item from the local tracker, but not from the DB.
                pop_from_item=True, pop_from_db=False
                )
            # TODO: Deprecate "getNextTip".
            # TODO: Check and tune Z positioning according to tip seal pressure.
            #       This will need calibration.

            # Get and set the length of the tip if missing.
            if next_tip.get("length", None) is None:
                # Get information about the loaded tip from the platform.
                container_name = next_tip["container"]
                container = self.controller.database_tools.getContainerByName(container_name)
                next_tip["length"] = container["length"]

            # Save contextual information to the tip.
            next_tip["fromItem"] = action["args"]["item"]

        # Save contextual information to the tip
        next_tip["fromAction"] = action_index
        next_tip["inTool"] = self.builder.current_tool

        # Return the tip and its spatial location.
        return next_tip, x, y, z

    #### Liquid handling correction functions ####

    def capillary_corr(self, final_vol):
        """
        Define reciprocal function.
        Linear function. For the p200 pipette, returns -2 at final volume 0, and 0 at final volume 20 (and above).
        """
        params = self.pipette.tension_correction

        start_vol = params["start_vol"]
        max_correction = params["max_correction"]

        # Must be negative to dispense volume
        capillary_extra_vol = -abs(-final_vol * max_correction / start_vol + max_correction)

        # Cap maximum correction volume
        correction = max([-max_correction, capillary_extra_vol])

        return correction
        # return -abs((-1/(21-x) + 0.04761905) * (2/0.952381))  # reciprocal attempt, KISS!

WARNING EXPERIMENTAL

THIS CLASS IS MEANT TO TAKE OVER ALL PIPETTING RELATED TASKS INCLUDING:

  • LOADING PIPETTE TOOLS
  • GENERATING GCODE
  • EXECUTING GCODE FOR THOSE ACTIONS (MAYBE)

Ancestors

Class variables

var builder : GcodeBuilder
var controller : Controller
var load_liquid_cmd
var mix_cmd
var pipettes : dict
var pour_liquid_cmd
var verbose

Instance variables

prop pipette : PipetteTool
Expand source code
@property
def pipette(self) -> PipetteTool:
    """Return a pipette, by the name of the current tool in the builder."""
    return self.pipettes[self.builder.current_tool]

Return a pipette, by the name of the current tool in the builder.

Methods

def calcTipFitZ(self, content_top_z: float, next_tip: dict)
Expand source code
def calcTipFitZ(self, content_top_z: float, next_tip: dict):
    """Calculate absolute Z coordinates for initial approach and final fitting depth."""
    # Get the tip's properties, and the corresponding tip-stage info from the tool.
    tip_container = self.pipette.get_tip_container(next_tip)

    # Get the tip-stage offset that matches the new tip.
    tip_stage_offset = self.pipette.get_tip_stage_offset(tip_container)
    # Get the tip fit distance for the current tip-tool pair.
    tip_fit_distance = self.pipette.get_tip_fit_distance(tip_container)
    # Extra distance to probe below and beyond the tip's active height.
    probe_extra_dist = self.pipette.get_probe_extra_dist(tip_container)

    # Subtract the tip-stage offset.
    z_pre_fit = content_top_z - tip_stage_offset
    # NOTE: This "z_pre_fit" is now the initial coordinate before fitting, with or without probing.

    # Final calculation.
    z_final = z_pre_fit - tip_fit_distance - probe_extra_dist

    # Logs.
    logging.info(f"Tip fitting with: z={content_top_z}, tip_stage_offset={tip_stage_offset}, z_pre_fit={z_pre_fit}")
    logging.info(f"Tip fitting with: tip_fit_distance={tip_fit_distance}, probe_extra_dist={probe_extra_dist}")

    # Return both.
    return z_pre_fit, z_final

Calculate absolute Z coordinates for initial approach and final fitting depth.

def calcTipProbeDistZ(self, next_tip: dict)
Expand source code
def calcTipProbeDistZ(self, next_tip: dict):
    """Calculate max Z displacement for fitting a tip with probing.
    NOTE: This is a relative move.
    """
    # Get the tip's properties, and the corresponding tip-stage info from the tool.
    tip_container = self.pipette.get_tip_container(next_tip)

    # Get the tip fit distance for the current tip-tool pair.
    tip_fit_distance = self.pipette.get_tip_fit_distance(tip_container)
    # Extra distance to probe below and beyond the tip's active height.
    probe_extra_dist = self.pipette.get_probe_extra_dist(tip_container)

    # Total distance of the probing move.
    z_scan_dist = tip_fit_distance + probe_extra_dist

    return z_scan_dist

Calculate max Z displacement for fitting a tip with probing. NOTE: This is a relative move.

def calculate_mix_vol(self, action: dict, comments: list = None)
Expand source code
def calculate_mix_vol(self, action:dict, comments:list=None):
    """Calculate the mixing volume for a give mix action.

    Args:
        action (dict): Mix action.
        comments (list, optional): List of gcode comments to extend. Defaults to None.

    Raises:
        ProtocolError: _description_

    Returns:
        _type_: _description_
    """
    if comments is None:
        comments = []

    # Get the tube content and its coordinates.
    tube, _, _, _ = self.builder.getTubeAndCoords(action)

    # Get tube volume.
    # TODO: If the tube received volume during this protocol the updated volume is not here.
    #       See: https://gitlab.com/pipettin-bot/pipettin-grbl/-/issues/109
    tube_volume = tube["volume"]
    # Get pipette volume.
    pipette_volume = self.pipette['vol_max']
    # Get tip volume.
    tip_container = self.controller.database_tools.getContainerByName(self.pipette.tip_loaded["container"])
    max_tip_volume = tip_container["maxVolume"]
    # TODO: Check if the tip already has volume.
    # max_tip_volume -= self.pipette.tip_loaded.get("volume", 0)

    # Calculate maximum volume limit.
    logging.info(f"Current volume limits: tube_volume={'<TODO: limit disabled until tracked>'}, pipette_volume={pipette_volume}, max_tip_volume={max_tip_volume}")
    # TODO: Add "tube_volume" here once volume tracking is implemented.
    max_volume_limit = min([max_tip_volume, pipette_volume])

    # Calculate volume limited to max pipette tool volume
    of = action["args"]["of"]  # Either "tip" or tube "content".
    mix_percent = action["args"]["percentage"]
    if of == 'content':
        # Logs.
        logging.info(f"Mixing {mix_percent}% of the tube volume.")
        comments = [self.gcode.comment("MIX action by tube volume fraction.")]
        # Calculate mixing volume
        mix_vol = tube_volume * (mix_percent / 100)
    elif of == 'tip':
        # Logs.
        logging.info(f"Mixing {mix_percent}% of the tip volume.")
        comments += [self.gcode.comment("MIX action by tip volume fraction.")]

        # Calculate mixing volume
        mix_vol = max_tip_volume * (mix_percent / 100)
    else:
        raise ProtocolError(f"Can't parse mixing action with 'of'={of} at action: {action}")

    # Check for limits.
    if mix_vol > max_volume_limit:
        # Log.
        msg = f"Limiting mix volume to {max_volume_limit}. The initial volume {mix_vol} exceeded the limits."
        logging.warning(msg)
        comments += [self.gcode.comment(msg)]
        # Override.
        mix_vol = max_volume_limit

    # Logs.
    logging.info(f"Mixing volume {mix_vol} uL.")
    comments += [self.gcode.comment(f"MIX action with {mix_vol} uL.")]

    return mix_vol

Calculate the mixing volume for a give mix action.

Args

action : dict
Mix action.
comments : list, optional
List of gcode comments to extend. Defaults to None.

Raises

ProtocolError
description

Returns

_type_
description
def capillary_corr(self, final_vol)
Expand source code
def capillary_corr(self, final_vol):
    """
    Define reciprocal function.
    Linear function. For the p200 pipette, returns -2 at final volume 0, and 0 at final volume 20 (and above).
    """
    params = self.pipette.tension_correction

    start_vol = params["start_vol"]
    max_correction = params["max_correction"]

    # Must be negative to dispense volume
    capillary_extra_vol = -abs(-final_vol * max_correction / start_vol + max_correction)

    # Cap maximum correction volume
    correction = max([-max_correction, capillary_extra_vol])

    return correction
    # return -abs((-1/(21-x) + 0.04761905) * (2/0.952381))  # reciprocal attempt, KISS!

Define reciprocal function. Linear function. For the p200 pipette, returns -2 at final volume 0, and 0 at final volume 20 (and above).

def check_volume_limit(self, volume, tolerance: float = 0.0)
Expand source code
def check_volume_limit(self, volume, tolerance:float=0.0):
    """Volume limit check for the current tip and tool."""
    # self.check_tip_volume(volume)
    tip_container = self.controller.database_tools.getContainerByName(self.pipette.tip_loaded["container"])
    max_volume = self.pipette.get_max_volume(tip_container)
    current_volume = self.pipette.tip_loaded.get("volume", 0.0)
    final_volume = current_volume + volume
    if final_volume <= current_volume:
        # TODO: Reconsider passing this check if the volume is decreasing (i.e. dispensing).
        pass
    if final_volume > max_volume + tolerance:
        msg = f"The requested final volume ({final_volume}) exceeds the tool's capacity of {max_volume} uL for tip {tip_container['name']}."
        raise ProtocolError(msg)
    return max_volume, final_volume

Volume limit check for the current tip and tool.

def compensate_backlash(self, volume: float, backlash_correction: float, action: dict = None, i: int = None)
Expand source code
def compensate_backlash(self, volume: float, backlash_correction: float, action:dict=None, i:int=None):
    """Backlash compensation"""
    # If previous and current directions are different
    last_dir_sign = self.builder.sign(self.pipette.state["lastdir"])
    new_dir_sign = self.builder.sign(volume)
    # TODO: be smarter about backlash (?).
    backlash_correction_vol = 0.0
    if (last_dir_sign * new_dir_sign) == -1:
        # Then over-pour (negative volume, -0.5 uL for p200) or over-draw (positive volume, 0.5 uL for p200)
        backlash_correction_vol = new_dir_sign * backlash_correction
        # Comment GCODE.
        message = f"Action {i}: pipetting extra {backlash_correction_vol} uL on backlash correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        # Move without corrections.
        self.displace_volume(volume = backlash_correction_vol,
                             action = action)

    return backlash_correction_vol

Backlash compensation

def compensate_backpour(self, action: dict = None, i: int = None)
Expand source code
def compensate_backpour(self, action:dict=None, i:int=None):
    """Back-pour correction
    Try correcting "liquid backlash" by over-drawing and pouring a bit back into the source tube.
    This corrects inaccuracies in the first dispense of a series.
    """

    # Get parameters from the tool
    back_pour_correction = self.pipette["back_pour_correction"]

    # Over-draw
    over_draw_volume = abs(back_pour_correction) # positive, force loading

    # Logs.
    message = f"Action {i}: applying {over_draw_volume} uL over-draw volume correction."
    logging.info(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Apply the correction.
    self.displace_volume(over_draw_volume, action)

    # Back-pour
    back_pour_volume = -abs(back_pour_correction)   # negative, force dispensing

    # Logs.
    message = f"Action {i}: applying {back_pour_volume} uL back-pour volume correction."
    logging.info(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Apply the correction.
    self.displace_volume(back_pour_volume, action)

    return over_draw_volume, back_pour_volume

Back-pour correction Try correcting "liquid backlash" by over-drawing and pouring a bit back into the source tube. This corrects inaccuracies in the first dispense of a series.

def compensate_extra_draw(self, extra_draw_volume, action: dict = None, i: int = None)
Expand source code
def compensate_extra_draw(self, extra_draw_volume, action:dict=None, i:int=None):
    """First draw compensation.
    If the tip is new/empty, the liquid may have a harder time "entering" the dry tip.
    Correct by over-drawing by a specified amount (e.g. 5 uL for the p200).
    """

    # Comment GCODE.
    message = f"Action {i}: over-drawing by {extra_draw_volume} uL on first draw."
    logging.info(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Move without corrections.
    self.displace_volume(volume = extra_draw_volume, action = action)

    return extra_draw_volume

First draw compensation. If the tip is new/empty, the liquid may have a harder time "entering" the dry tip. Correct by over-drawing by a specified amount (e.g. 5 uL for the p200).

def compensate_first_draw(self, volume: float, action: dict = None, i: int = None)
Expand source code
def compensate_first_draw(self, volume: float, action:dict=None, i:int=None):
    """First draw compensation.
    Pre-wet the tip by loading and dispensing the target volume (capped by the pipette's max volume).
    """

    # Limit pre-wetting to 120% of move volume or max volume.
    correction_volume = min(abs(self.pipette.max_volume), volume*1.2)

    # Comment GCODE.
    message = f"Action {i}: pre-wetting the tip by pipetting up-and-down {correction_volume} uL on first draw."
    logging.info(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Apply the correction by calling this very method,
    # by pipetting up and down (i.e. pre-wetting the tip).
    for direction in [1.0, -1.0]:
        # Move without corrections.
        self.displace_volume(volume = direction * correction_volume,
                             action = action)

    return correction_volume

First draw compensation. Pre-wet the tip by loading and dispensing the target volume (capped by the pipette's max volume).

def compensate_low_volume(self, volume: float, action: dict = None, i: int = None)
Expand source code
def compensate_low_volume(self, volume: float, action:dict=None, i:int=None):
    """Compensation for capillary effects for low final tip volume during dispensing."""
    # Apply only for regular dispensing moves.
    # Apply correction if necessary: only at low final volumes.
    correction_start_vol = self.pipette.tension_correction["start_vol"]

    # Get the current tip volume (i.e. from the previous action).
    final_volume = self.pipette.state["vol"]

    # Add the new [corrected] volume to the local pipette volume tracker (axis position in uL).
    final_volume += volume

    # Subtract the "back-pour correction" volume if this is a first draw,
    # forcing negative volume (for dispensing). Though this "correction" is
    # applied later on, it must be considered here, because it will change
    # the final volume.
    # TODO: disabled this bit for now, since this correction is meant for
    #       simple dispense moves only.
    # if first_draw and tip_priming_correction and back_pour_correction:
    #     final_volume += -abs(self.pipette["back_pour_correction"])

    # Default correction should be null.
    capillary_extra_vol = 0.0

    # Apply the correction.
    if final_volume < correction_start_vol:
        # Compute the "extra" volume that must be dispensed.
        capillary_extra_vol = self.capillary_corr(final_volume)

        # Comment GCODE
        message = f"Action {i}: final volume {final_volume} uL requires low-volume compensation. Dispensing extra {capillary_extra_vol} uL."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Apply correction.
        # Move without corrections.
        self.displace_volume(volume = capillary_extra_vol,
                             action = action)
    else:
        # Comment GCODE
        message = f"Action {i}: final volume {final_volume} uL does not require low-volume compensation."
        logging.debug(message)
        self.builder.extend_commands([self.gcode.comment(message)])

    return capillary_extra_vol

Compensation for capillary effects for low final tip volume during dispensing.

def compensate_underdraw(self, action: dict = None, i: int = None)
Expand source code
def compensate_underdraw(self, action:dict=None, i:int=None):
    """Under-draw correction
    Correction for aspiration moves that draw less volume than expected,
    increasing the loaded volume by a fixed amount of microliters.
    This applies to all moves.
    """

    # Get parameters from the tool
    under_draw_correction = self.pipette["under_draw_correction"]

    # Over-draw
    vol_correction = abs(under_draw_correction) # positive, force loading

    # Logs.
    message = f"Action {i}: applying {vol_correction} uL under-draw volume correction."
    logging.info(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Apply the correction.
    self.displace_volume(vol_correction, action)

Under-draw correction Correction for aspiration moves that draw less volume than expected, increasing the loaded volume by a fixed amount of microliters. This applies to all moves.

def displace_volume(self, volume: float, action: dict, check_tolerance: float = 100.0)
Expand source code
def displace_volume(self, volume: float, action: dict, check_tolerance:float=100.0):
    """Compute axis displacement from the target pipetting volume.
    Default pipetting mode should be incremental (i.e. relative) volume.

    Args:
        volume (float): Pipetting volume in microliters.
        action (dict): Action object, optionally containing a pipetting "mode" string in its "args".
    """
    # TODO: De-hardcode the tolerance set above. The issue here is that corrections move the
    #       shaft to volumes outside the limit of the tip, which may be a bad idea (e.g. because 
    #       there may be a filter) or of the tool's limit for the tip.

    # Volume limit check for the current tip.
    self.check_volume_limit(volume, tolerance=check_tolerance)

    # Convert volume to displacement.
    mode = action["args"].get("mode", "incremental")
    displacement = self.volume_to_displacement(volume, mode=mode)

    # Generate GCODE command for the move.
    # TODO: Add pipetting speed control.
    # TODO: Volume sign INVERTED here, be smarter about the direction.
    command = self.gcode.G1(
        e=-displacement,
        absolute=False, absolute_e=False,
        f=self.pipette.feedrate)

    # Add the GCODE command to the list.
    self.builder.extend_commands(commands=command, action=action)

    # Update state.
    self.update_pipette_state(volume, displacement)

    return displacement

Compute axis displacement from the target pipetting volume. Default pipetting mode should be incremental (i.e. relative) volume.

Args

volume : float
Pipetting volume in microliters.
action : dict
Action object, optionally containing a pipetting "mode" string in its "args".
def fast_safe_height(self, current_action: dict, previous_action: dict = None)
Expand source code
def fast_safe_height(self, current_action:dict, previous_action:dict=None):
    """Try optimising clearance for a pipetting move within the same platform item.
    The objective is to speed-up repetition pipetting within the same platform.
    """
    # Previous action.
    if previous_action is None:
        previous_action = self.builder.previous_action
    check_last, last_item_name = self.fast_safe_height_action_check(previous_action)
    # Current action.
    check_next, next_item_name = self.fast_safe_height_action_check(current_action)

    # Items to optimize by.
    item_names = []

    # Decide wether to optimize or not.
    if check_last and check_next and (last_item_name == next_item_name):
        logging.info(f"Optmizing clearance move to safe height within item: '{next_item_name}'")
        item_names.append(next_item_name)

    return self.builder.macroMoveSafeHeight(item_names=item_names)

Try optimising clearance for a pipetting move within the same platform item. The objective is to speed-up repetition pipetting within the same platform.

def fast_safe_height_action_check(self, action: dict)
Expand source code
def fast_safe_height_action_check(self, action:dict):
    """Check that an action is compatible with a fast safe height check."""

    # List of eligible commands.
    fast_sh_eligible_cmds = [self.pour_liquid_cmd, self.load_liquid_cmd, self.mix_cmd]
    # Check that the commands are reasonable to optmize.
    cmd_check = action and action.get("cmd", None) in fast_sh_eligible_cmds

    # Check that the actions are platform-based, and point to an item.
    args_check = "item" in action["args"]

    # Try getting item_name from the action.
    item_name = action.get("args", {}).get("item", None)
    # Check that the origin and destination item is the same one.
    item_check = item_name is not None

    # Return the digested check, and the item name.
    return cmd_check and args_check and item_check, item_name

Check that an action is compatible with a fast safe height check.

def getNextTip(self,
platform_item: dict,
selector: dict,
pop_from_item=True,
workspace_name: str = None,
pop_from_db=False)
Expand source code
def getNextTip(self,
               platform_item:dict,
               selector: dict,
               pop_from_item=True,
               workspace_name:str=None,
               pop_from_db=False):
    """
    Get the next tip and delete it from the local "platform_item" object.
    The object in the database is not affected.
    """
    return self.controller.database_tools.getNextContent(
        workspace_name=workspace_name,
        platform_item=platform_item,
        selector=selector,
        pop_from_item=pop_from_item,
        pop_from_db=pop_from_db
    )

Get the next tip and delete it from the local "platform_item" object. The object in the database is not affected.

def getTipAndCoords(self, action, action_index)
Expand source code
def getTipAndCoords(self, action, action_index):
    """Get a tip object and its coordinates.

    Uses information from the platform, the item, the content, and the container to get the XYZ of a tip.

    The Z coordinate is calculated by "getContentZ", and corresponds to the fitting depth of the tip.

    This function does not apply tool offsets.

    Args:
        action (dict): Protocol action definition.
        action_index (int): Action index.

    Returns:
        tuple: Tuple with the tip object, and the xyz coordinates.

    Example "tip":  'index': 96,
                    'maxVolume': 160,
                    'name': 'tip96',
                    'position': {'col': 12, 'row': 8},
                    'tags': [],
                    'type': 'tip',
                    'volume': 0

    The "tip" above will be fetched from the database when the action's "args" are of "item" type:
        'args': {'item': '200ul_tip_rack_MULTITOOL 1', 'tool': 'P200'},
        'cmd': 'PICK_TIP'

    The action can also provide the tip's coordinates directly, but must provide "tip" data explicitly:
        'args': {'position': {"x": 20, "y": 200, "z": 30},
                 'tool': 'P200',
                 'tip': tip_definition},  # Note the tip data passed here (described below).
        'cmd': 'PICK_TIP'

    Example "tip_definition" for "position" type args:
        'maxVolume': 160,
        'length': 50.0,
        'volume': 0

    """
    # Use the provided coordinates if found.
    if "position" in list(action["args"]):
        # Get coords.
        coords = action["args"]["position"]
        x, y, z = coords["x"], coords["y"], coords["z"]
        # Get the tip's info. It must contain "length".
        next_tip = action["args"]["tip"]
        # Ensure required information about the loaded tip.
        try:
            assert next_tip.get("length", None) is not None
            # TODO: The "maxVolume" property is in the container definition.
            #       Redesing this code using jsondb or nodb to regain compatibility with this "platformless" mode.
            assert next_tip.get("maxVolume", None) is not None
            assert next_tip.get("volume", None) is not None
        except AssertionError as e:
            msg = f"The tip from an action is missing required properties: {pformat(action)}"
            raise ProtocolError(msg) from e
        # Save contextual information to the tip.
        next_tip["fromItem"] = None

    # Else lookup the coordinates by tip-rack platform.
    else:
        # Use the builder's method.
        next_tip, x, y, z = self.controller.builder.getContentCoords(
            # Get any next tip from the item in the action.
            action, allow_next=True,
            # Remove the item from the local tracker, but not from the DB.
            pop_from_item=True, pop_from_db=False
            )
        # TODO: Deprecate "getNextTip".
        # TODO: Check and tune Z positioning according to tip seal pressure.
        #       This will need calibration.

        # Get and set the length of the tip if missing.
        if next_tip.get("length", None) is None:
            # Get information about the loaded tip from the platform.
            container_name = next_tip["container"]
            container = self.controller.database_tools.getContainerByName(container_name)
            next_tip["length"] = container["length"]

        # Save contextual information to the tip.
        next_tip["fromItem"] = action["args"]["item"]

    # Save contextual information to the tip
    next_tip["fromAction"] = action_index
    next_tip["inTool"] = self.builder.current_tool

    # Return the tip and its spatial location.
    return next_tip, x, y, z

Get a tip object and its coordinates.

Uses information from the platform, the item, the content, and the container to get the XYZ of a tip.

The Z coordinate is calculated by "getContentZ", and corresponds to the fitting depth of the tip.

This function does not apply tool offsets.

Args

action : dict
Protocol action definition.
action_index : int
Action index.

Returns

tuple
Tuple with the tip object, and the xyz coordinates.

Example "tip": 'index': 96, 'maxVolume': 160, 'name': 'tip96', 'position': {'col': 12, 'row': 8}, 'tags': [], 'type': 'tip', 'volume': 0

The "tip" above will be fetched from the database when the action's "args" are of "item" type: 'args': {'item': '200ul_tip_rack_MULTITOOL 1', 'tool': 'P200'}, 'cmd': 'PICK_TIP'

The action can also provide the tip's coordinates directly, but must provide "tip" data explicitly: 'args': {'position': {"x": 20, "y": 200, "z": 30}, 'tool': 'P200', 'tip': tip_definition}, # Note the tip data passed here (described below). 'cmd': 'PICK_TIP'

Example "tip_definition" for "position" type args: 'maxVolume': 160, 'length': 50.0, 'volume': 0

def get_pipette_tools_from_db(self)
Expand source code
def get_pipette_tools_from_db(self):
    """Read tools from the DB and keep only the ones with a matching type."""
    tools: list = self.controller.database_tools.tools
    pipettes_data = {t["name"]: t for t in tools if t["type"] == TOOL_TYPE}
    logging.info(f"Found these pipettes in the DB: {list(pipettes_data)}")
    return pipettes_data

Read tools from the DB and keep only the ones with a matching type.

def guess_pipette(self,
volume: float,
tip_container: dict = None,
repeats: int = 1,
prioritize_accuracy=True)
Expand source code
def guess_pipette(self, volume:float, tip_container:dict=None, repeats:int=1, prioritize_accuracy=True):
    """Guess which tool is better for a given volume and number of serial repeats.
    By default, accuracy will be prioritized (i.e. less dispenses per load), unless
    'prioritize_accuracy' is set to False.
    """
    # Get the maximum volume for the given tip.
    tool_limits = [(p.get_max_volume(tip_container), p.name) for p in self.pipettes.values()]
    # Calculate how many repeats can be done with each pipette.
    # Only keep the ones with enough volume.
    tool_repeats = [(limit // volume, tool) for limit, tool in tool_limits if limit // volume >= repeats]
    if prioritize_accuracy is True:
        # Calculate the smaller number of repeats between tools.
        min_repeats = min(*[reps for reps, tool in tool_repeats])
        # Choose the tool that allows the smaller number of repeats.
        # This will prioritize precision over speed.
        min_repeats_tool = next(tool for reps, tool in tool_repeats if reps == min_repeats)
        return min_repeats_tool
    else:
        # Calculate the greater number of repeats between tools.
        max_repeats = max(*[reps for reps, tool in tool_repeats])
        # Choose the tool that allows the larger number of repeats.
        # This will prioritize speed over precision.
        max_repeats_tool = next(tool for reps, tool in tool_repeats if reps == max_repeats)
        return max_repeats_tool

Guess which tool is better for a given volume and number of serial repeats. By default, accuracy will be prioritized (i.e. less dispenses per load), unless 'prioritize_accuracy' is set to False.

def load_pipette(self, pipette_data)
Expand source code
def load_pipette(self, pipette_data):
    """Instantiate and save one PipetteTool from a parameter set.
    This is a tool loader function for GcodeBuilder.
    """
    pipette_name = pipette_data["name"]
    logging.info(f"Loading pipette '{pipette_name}'.")

    # Check if the tool is already registered locally.
    if pipette_name in self.pipettes:
        msg = f"Pipette '{pipette_name}' is already in the pipette handler's list."
        logging.error(msg)
        raise DataError(msg)

    # Load the pipette.
    try:
        # The PipetteTool instances  will register themselves a tool in the builder.
        tool = PipetteTool(pipette_data, self.builder)
        self.pipettes[pipette_name] = tool
    except Exception as e:
        msg = f"Failed to initialize tools: {e}"
        logging.error(msg)
        raise CommanderError(msg) from e

    return tool

Instantiate and save one PipetteTool from a parameter set. This is a tool loader function for GcodeBuilder.

def load_pipettes(self, pipettes_data)
Expand source code
def load_pipettes(self, pipettes_data):
    """Instantiate and save one PipetteTool for each parameter set in 'pipettes_data'."""
    # NOTE: commenting out the defaults for now.
    # if not self.pipettes_data:
    #     self.pipettes_data = deepcopy(PIPETTES)
    for pipette_name, pipette_data in pipettes_data.items():
        # Instantiate all pipette tools.
        logging.info(f"Initializing pipette '{pipette_name}'.")
        self.load_pipette(pipette_data)

Instantiate and save one PipetteTool for each parameter set in 'pipettes_data'.

def macroDiscardTip(self, action, i, operation_time=20.0)
Expand source code
def macroDiscardTip(self, action, i, operation_time=20.0):
    """Move over the trash box or eject-post and discard the tip (if placed, else Except).

    Action moves:
        - Move to clearance level on Z (avoiding platforms) and Y (avoiding parked tools).
        - Move to the ejection site and eject the tip.
        - Home the tool.

    Args:
        action (dict): The action definition. Contents not used.
                        The coordinates are loaded from the tool/pipette definition.
        i (int): The index of the action. Not used.

    Raises:
        Exception: An error is raised when there are no tips to discard.

    Returns:
        str: Generic message.

    Sample action:
        {
            "cmd": "DISCARD_TIP",
            "args": {
                "item": "descarte 1"
            }
        }
    """

    # TODO: Add alternate method of ejecting tips against an ejector pad (custom pipette).
    #       - Add "can eject" property and decide based on that.
    #       - Add "ejector pad" properties to tools.

    if self.pipette.tip_loaded:
        # Move to clearance level on Z.
        # Should extend task.commmand list automatically.
        self.builder.macroMoveSafeHeight()

        if self.pipette.can_eject:
            item_name = action["args"]["item"]
            # Get the item's XYZ.
            logging.info("Ejecting tip using integrated ejector.")
            x, y, z = self.builder.getItemXYZ(item_name)

            # TODO: Deduplicate this. The eject-post passes an absolute Z but the ejector does not.
            # Move over the ejection site initial XY coordinates.
            self.builder.macroGoToXYZ(x=x, y=y, add_comment="Initial XY approach to eject zone.")
            # Move over the ejection site initial Z coordinate.
            self.builder.macroGoToXYZ(z=z, add_comment="Initial Z approach to eject zone.")
        else:
            # Get the ejector post's initial coordinates.
            logging.info("Ejecting tip using post.")
            x, y, z = self.pipette.eject_post_init_coords

            # TODO: Deduplicate this. The eject-post passes an absolute Z but the ejector does not.
            # Move over the ejection site initial XY coordinates.
            self.builder.macroGoToXYZ(x=x, y=y, add_comment="Initial XY approach to eject zone.")
            # Move over the ejection site initial Z coordinate.
            # NOTE: "add_offset=False" is needed in this case.
            self.builder.macroGoToXYZ(z=z, add_offset=False, add_comment="Initial Z approach to eject zone.")

        # Generate GCODE for the actual ejection sequence.
        eject_commands = self.pipette.eject_tip()
        self.builder.extend_commands(eject_commands)

        # Raise to safety.
        self.builder.macroMoveSafeHeight()

        # Ensure zeroed pipette axis
        # self.macroPipetteZero()
        # NOTE: replace macroPipetteZero with extruder homing command.
        self.macroPipetteHome(which_tool=self.builder.current_tool)
        # TODO: reimplement non-homing zeroing once the underlying Klipper issue is resolved:
        #       Sending "M82" and moving the extruder to 0 does not set the position to 0.
        #       Check if this is due to absolute extruder positioning not taking effect.

    else:
        # TODO: discutir si es mejor tirar este error o no hacer nada
        raise ProtocolError(f"Cannot discard tip if one is not already placed! Action index: {i}")

    # Extend the operation time.
    self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

    return f"Processed tip discard with index {i}"

Move over the trash box or eject-post and discard the tip (if placed, else Except).

Action moves: - Move to clearance level on Z (avoiding platforms) and Y (avoiding parked tools). - Move to the ejection site and eject the tip. - Home the tool.

Args

action : dict
The action definition. Contents not used. The coordinates are loaded from the tool/pipette definition.
i : int
The index of the action. Not used.

Raises

Exception
An error is raised when there are no tips to discard.

Returns

str
Generic message.

Sample action: { "cmd": "DISCARD_TIP", "args": { "item": "descarte 1" } }

def macroGoToAndMix(self, action, i, operation_time=20.0)
Expand source code
def macroGoToAndMix(self, action, i, operation_time=20.0):
    """Parse a MIX action and generate GCODE.
    Homogenize a solution. Possibly useful for rinsing tips before a transfer.

    Sample MIX action: {
        'args': {
            'count': 3,
            'item': '5x16_1.5_rack 1',
            'of': 'content',  # Can also be 'of': "tip"
            'percentage': 50,
            'coords': {"x": 123, "y": 123, "z": 123}, # Will override 'selector'
            'selector': {'by': 'name', 'value': 'tube1'}},
        'cmd': 'MIX'
    }

    Sample "tip":
        {
            "index": 1,
            "maxVolume": 160,
            "name": "tip1",
            "position": {
            "col": 1,  # Or: "x": 10?
            "row": 1   # Or: "y": 30?
            },
            "tags": [],
            "type": "tip",
            "volume": 0
        }

    Note that the tool must have been set by a previous tip pickup action.
    """

    logging.info("Parsing MIX action.")

    if not self.pipette.tip_loaded:
        # TODO: discutir si es mejor descartar el tip
        #  si está automáticamente o tirar este error
        raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

    # Move to clearance level on Z.
    self.fast_safe_height(action)

    # Zero the pipette axis.
    self.macroPipetteZero(action, i)

    # Move over the target content and downward into the tube.
    self.builder.macroGoTo(action, i)

    # Execute the mix.
    self.macroMix(action, operation_time=operation_time)

    # Back-up.
    self.fast_safe_height(action, action)

    logging.info("Done parsing MIX action.")

    return f"Processed MIX action with index {i}."

Parse a MIX action and generate GCODE. Homogenize a solution. Possibly useful for rinsing tips before a transfer.

Sample MIX action: { 'args': { 'count': 3, 'item': '5x16_1.5_rack 1', 'of': 'content', # Can also be 'of': "tip" 'percentage': 50, 'coords': {"x": 123, "y": 123, "z": 123}, # Will override 'selector' 'selector': {'by': 'name', 'value': 'tube1'}}, 'cmd': 'MIX' }

Sample "tip": { "index": 1, "maxVolume": 160, "name": "tip1", "position": { "col": 1, # Or: "x": 10? "row": 1 # Or: "y": 30? }, "tags": [], "type": "tip", "volume": 0 }

Note that the tool must have been set by a previous tip pickup action.

def macroGoToAndPipette(self, action, i, operation_time=20.0)
Expand source code
def macroGoToAndPipette(self, action, i, operation_time=20.0):
    """Move to a tube and load liquid.
    LOAD_LIQUID action interpreter: moves to XYZ position and moves S axis to pipette.
    Will throw an error if a tip is not already loaded.

    Sample action:
        {
            "cmd": "LOAD_LIQUID",
            "args": {
                "item": "5x16_1.5_rack 1",
                "selector": {"by": "name", "value": "tube1"},
                "volume": 100,  # in microliters
                "tool": "P200"
            }
        }

    Likely platformless action:
        {
            "cmd": "LOAD_LIQUID",
            "args": {
                "content": {"volume": 200.0},
                "volume": 100,  # in microliters
                "tool": "P200"
            }
        }

    Primary move sequence:
    1. move to clearance level on Z
    2. move to next tube location on XY
    3. move to Z at loading height
    4. load liquid
    5. move to clearance level on Z
    """

    if self.pipette.tip_loaded:
        # Move to clearance level on Z.
        self.fast_safe_height(action)

        # Move over the target content and downward into the tube.
        self.builder.macroGoTo(action, i)

        # Actuate the pipette tool.
        self.macroPipette(action, i)

        # Move to clearance level on Z.
        self.fast_safe_height(action, action)

    else:
        # TODO: discutir si es mejor descartar el tip
        #  si está automáticamente o tirar este error
        raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

    # Extend the operation time.
    self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

    return f"Processed macroGoToAndPipette action with index {i}"

Move to a tube and load liquid. LOAD_LIQUID action interpreter: moves to XYZ position and moves S axis to pipette. Will throw an error if a tip is not already loaded.

Sample action: { "cmd": "LOAD_LIQUID", "args": { "item": "5x16_1.5_rack 1", "selector": {"by": "name", "value": "tube1"}, "volume": 100, # in microliters "tool": "P200" } }

Likely platformless action: { "cmd": "LOAD_LIQUID", "args": { "content": {"volume": 200.0}, "volume": 100, # in microliters "tool": "P200" } }

Primary move sequence: 1. move to clearance level on Z 2. move to next tube location on XY 3. move to Z at loading height 4. load liquid 5. move to clearance level on Z

def macroGoToAndPour(self, action, i, operation_time=20.0)
Expand source code
def macroGoToAndPour(self, action, i, operation_time=20.0):
    """Move to a tube and dispense liquid.
    Hadnles "DROP_LIQUID" actions. Makes volume "negative" and runs "macroGoToAndPipette".
    """
    action["args"]["volume"] = -abs(float(action["args"]["volume"]))  # Force negative volume

    ret_msg = self.macroGoToAndPipette(action, i, operation_time=operation_time)

    return ret_msg

Move to a tube and dispense liquid. Hadnles "DROP_LIQUID" actions. Makes volume "negative" and runs "macroGoToAndPipette".

def macroMix(self, action: dict, operation_time=0.0)
Expand source code
def macroMix(self, action: dict, operation_time=0.0):
    """Execute pipetting moves for a mix action.

    Args:
        action (dict): _description_
        operation_time (float, optional): _description_. Defaults to 0.0.
    """

    # Comments for GCODE.
    comments = []

    # Calculate the mixing volume.
    mix_vol = self.calculate_mix_vol(action, comments)

    # Extend commands with comments.
    self.builder.extend_commands(comments)

    # Prepare minimal pipetting action dict
    mix_count = int(action["args"]["count"])

    # Calculate volume and increments.
    factor = 0.2 # fraction of volume that will be used for increasing mixing
    vol = mix_vol * (1 - factor)  # initial volume for mixing
    vol_increment = mix_vol * (factor / mix_count)

    # Do the mixing
    for i in range(mix_count):
        # Add time to the total operatin time.
        # TODO: Link this to the flow speed and total volume.
        operation_time += operation_time

        # Logs.
        logging.debug(f"Generating mix move {i} of {mix_count}.")

        # Load
        mix_action_up = {"args": {"volume": vol}}  # in microliters
        self.macroPipette(mix_action_up, i)
        self.builder.extend_commands(mix_action_up.get("GCODE", []))

        # NOTE: The dropped volume will be greater than the loaded, to
        # avoid having unmixed solution remainder in the tip. In the final
        # iteration, the dropped volume will be the one in the action.
        vol += vol_increment

        # Drop
        mix_action_dw = {"args": {"volume": -vol}}  # in microliters
        self.macroPipette(mix_action_dw, i)
        self.builder.extend_commands(mix_action_dw.get("GCODE", []))

    # Extend the operation time.
    self.controller.machine.update_exec_opts(
        action=action, timeout=operation_time, add_timeout=True
    )

Execute pipetting moves for a mix action.

Args

action : dict
description
operation_time : float, optional
description. Defaults to 0.0.
def macroPickNextTip(self, action, i, operation_time=20.0)
Expand source code
def macroPickNextTip(self, action, i, operation_time=20.0):
    """Move to clearance height and fit a tip.

    Will throw an error if a tip is already loaded.

    Sample platform-based action:
        {
            "cmd": "PICK_TIP",
            "args": {
                "item": "200ul_tip_rack_MULTITOOL 1",
                "tool": "P200"
            }
        }

    Sample coordinate-based (platformless) action:
        {
            "cmd": "PICK_TIP",
            "args": {
                "position": {"x": 10.5,"y": 20.5,"z": 0.5},
                "tool": "P200",
                "tip": {"length": 50.0, "maxVolume": 200, "volume": 0}
            }
        }
    """

    if not self.pipette.tip_loaded:
        # Move to clearance level on Z, extends task.command list automatically
        self.builder.macroMoveSafeHeight()

        # Get tip and coords (supports "position" key specification).
        next_tip, tip_x, tip_y, tip_z = self.getTipAndCoords(action, i)

        # Generate GCODE to pick up the tip.
        self.pickNextTip(next_tip, tip_x, tip_y, tip_z)

        # Raise to safety.
        self.builder.macroMoveSafeHeight()
    else:
        # TODO: discutir si es mejor descartar el tip
        #  si está automáticamente o tirar este error
        raise ProtocolError(f"Cannot pick tip if one is already placed! Action index: {i}")

    # Extend the operation time.
    self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

    return action

Move to clearance height and fit a tip.

Will throw an error if a tip is already loaded.

Sample platform-based action: { "cmd": "PICK_TIP", "args": { "item": "200ul_tip_rack_MULTITOOL 1", "tool": "P200" } }

Sample coordinate-based (platformless) action: { "cmd": "PICK_TIP", "args": { "position": {"x": 10.5,"y": 20.5,"z": 0.5}, "tool": "P200", "tip": {"length": 50.0, "maxVolume": 200, "volume": 0} } }

def macroPipette(self,
action: dict,
i: int,
backlash_correction: float = 0.0,
tip_priming_correction: bool = False,
back_pour_correction: bool = 0.0,
under_draw_correction: float = 0.0,
extra_draw_volume: float = 0.0,
capillary_correction: bool = False)
Expand source code
def macroPipette(
    self,
    action:dict, i:int,
    backlash_correction:float=0.0,
    tip_priming_correction:bool=False,
    back_pour_correction:bool=0.0,
    under_draw_correction:float=0.0,
    extra_draw_volume:float=0.0,
    capillary_correction:bool=False):
    """
    Pipetting works incrementally for now (i.e. relative volumes, not absolute volumes).
    Negative values mean 'dispense' or 'lower' the axis (see functions in driverSaxis.py).
    """
    # Get the target volume.
    volume = action["args"]["volume"]

    # Flag indicating if this is a draw or dispense move.
    is_drawing = self.builder.sign(volume) > 0

    # Comment GCODE
    message = f"Calculating pipetting move for {volume} uL volume for action {i}"
    message += f", using tool '{self.builder.current_tool}'."
    logging.info(message)

    # Check if the tip is new/empty.
    first_draw = self.pipette.state["tip_vol"] is None
    logging.debug(f"Computed move properties: is_drawing={is_drawing} first_draw={first_draw} (tip_vol={self.pipette.state['tip_vol']})")
    self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action)

    # Read options from action arguments.
    # Prioritize the arguments, then the config, and defaulting to arguments if not there.
    backlash_correction = backlash_correction or float(self.pipette.get("backlash_correction", backlash_correction))
    tip_priming_correction = tip_priming_correction or bool(self.pipette.get("tip_priming_correction", tip_priming_correction))
    back_pour_correction = back_pour_correction or float(self.pipette.get("back_pour_correction", back_pour_correction))
    under_draw_correction = under_draw_correction or float(self.pipette.get("under_draw_correction", under_draw_correction))
    extra_draw_volume = extra_draw_volume or float(self.pipette.get("extra_draw_volume", extra_draw_volume))
    capillary_correction = capillary_correction or bool(self.pipette.get("capillary_correction", capillary_correction))
    # Log the corrections.
    logging.debug(f"Corrections: backlash_correction={backlash_correction}, tip_priming_correction={tip_priming_correction}, back_pour_correction={back_pour_correction}, under_draw_correction={under_draw_correction}, capillary_correction={capillary_correction}")

    # Compensate mechanical backlash.
    if backlash_correction:
        # Backlash compensation.
        self.compensate_backlash(volume, backlash_correction, action, i)

    # Compensate for tip-wetting issues.
    if is_drawing and first_draw:
        if tip_priming_correction:
            # Compensation for under-drawing on first load, by pre-wetting.
            self.compensate_first_draw(volume, action, i)
        if extra_draw_volume:
            # Compensation for under-drawing on first load, by over-drawing a bit.
            self.compensate_extra_draw(extra_draw_volume, action, i)

    # Compensation for capillary effects for low final tip volume during dispensing.
    # Apply only for regular dispensing moves.
    if capillary_correction and self.builder.sign(volume) == -1 and not first_draw:
        self.compensate_low_volume(volume, action, i)

    # Comment GCODE
    message = f"Pipetting {volume} uL for action {i}."
    logging.info(message)
    self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action)

    # Compute axis displacement from the target pipetting volume.
    # Default pipetting mode should be incremental (i.e. relative) volume.
    self.displace_volume(volume, action)

    # Under-draw correction
    if is_drawing and under_draw_correction:
        self.compensate_underdraw(action, i)

    # Back-pour correction
    if is_drawing and back_pour_correction:
        self.compensate_backpour(action, i)

    return f"Processed pipetting action with index {i}."

Pipetting works incrementally for now (i.e. relative volumes, not absolute volumes). Negative values mean 'dispense' or 'lower' the axis (see functions in driverSaxis.py).

def macroPipetteHome(self, which_tool='all')
Expand source code
def macroPipetteHome(self, which_tool="all"):
    """Macro to home the requested tools

    Args:
        which_tool (str, optional): Either a valid name for a tool, or "all". Defaults to "all".
    """
    if which_tool == "all":
        which_tools = list(self.pipettes)
    else:
        which_tools = [which_tool]

    tool_home_cmds = []
    for tool_name in which_tools:
        tool = self.pipettes[tool_name]
        tool_home_cmds += tool.home()

    message = f"Homing pipettes {which_tools}"
    logging.info(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    self.builder.extend_commands(tool_home_cmds)

    return tool_home_cmds

Macro to home the requested tools

Args

which_tool : str, optional
Either a valid name for a tool, or "all". Defaults to "all".
def macroPipetteZero(self, action=None, i=None)
Expand source code
def macroPipetteZero(self, action=None, i=None):
    """
    Move the pipette to the zero position (without re-homing).

    Note: this now requires that the pipette homes upwards to the axis origin (i.e. at 0)
    and the retraction distance is considered the zero position for pietting.
    """
    # Reverse net displacement
    volume = -self.pipette.state["vol"]

    # Early return if the volume is zero.
    if not volume:
        message = f"Pipette is already zeroed at volume={volume} (action index {i})."
        logging.debug(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        return

    # Logs.
    message = f"Zeroing pipette with volume={volume} (action index {i})."
    logging.debug(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Execute the displacement.
    self.displace_volume(volume, action)

    # Check for final volume near zero
    state_vol = abs(self.pipette.state["vol"])
    state_s = abs(self.pipette.state["s"])
    threshold = 1e-10
    if (state_vol > threshold) or (state_s > threshold):
        raise ProtocolError("Pipette was zeroed, but reference states did not match zero. " +
                            "Expected vol=0 and s=0 but got:" +
                            f" vol={self.pipette['state']['vol']}" +
                            f" s={self.pipette['state']['s']}")

Move the pipette to the zero position (without re-homing).

Note: this now requires that the pipette homes upwards to the axis origin (i.e. at 0) and the retraction distance is considered the zero position for pietting.

def pickNextTip(self, next_tip, tip_x, tip_y, tip_z)
Expand source code
def pickNextTip(self, next_tip, tip_x, tip_y, tip_z):
    """Generate GCODE to place a tip.

    Args:
        next_tip (dict): Data for the target tip.
        tip_x (float): X coordinate of the tip, without tool offsets.
        tip_y (float): Y coordinate of the tip, without tool offsets.
        tip_z (float): Z coordinate of the tip (active height, i.e. absolute fitting height), without tool offsets.

    TODO: Test adding a "vibration" if possible (i have not found this necessary).
    """

    if self.pipette.tip_loaded:
        # TODO: discutir si es mejor descartar el tip
        #  si está automáticamente o tirar este error
        raise ProtocolError("Cannot pick tip if one is already placed!")

    # Probe command list.
    commands = [self.gcode.comment("Start tip pickup moves.")]

    # Add XYZ tool offsets to the tip's active center. This adds:
    #   - XYZ tool offsets.
    #   - And nothing else, because no tip is loaded.
    x, y, z = self.builder.addToolOffsets(x=tip_x, y=tip_y, z=tip_z)

    # Move over the tip on XY.
    commands += self.gcode.G1(x=x, y=y, absolute=True)

    # Calculate initial and final fit coordinates.
    z_pre_fit, z_final = self.calcTipFitZ(content_top_z=z, next_tip=next_tip)

    # Move over the tip on Z, partially inserting the tip holder if using a non-terminal tip stage.
    commands += self.gcode.G1(z=z_pre_fit, absolute=True)

    if not self.pipette.can_probe:
        # Fit the tip (blindly, without probing).
        logging.info(f"Picking tip with: z_final={z_final}")
        # TODO: add a feedrate config parameter for tip fitting.
        commands += self.gcode.G1(z=z_final, f=10, absolute=True)
    else:
        # Probe for the tip if supported.
        # Total distance of the probing move.
        z_scan_dist = self.calcTipProbeDistZ(next_tip)
        # NOTE: This is a relative move.
        logging.info(f"Probing for tip with: z_scan_dist={z_scan_dist}")
        commands += self.gcode.gcodeProbeDown(z_scan=-z_scan_dist)

    # TODO: Seal the tip by pressing a little bit very slowly two times.

    # Extend commands list.
    self.builder.extend_commands(commands)

    # Flag loaded tip.
    self.pipette.tip_loaded = next_tip

    # Zero the tip volume tracking (set to None instead for new tip check).
    self.pipette.state["tip_vol"] = None

    return commands

Generate GCODE to place a tip.

Args

next_tip : dict
Data for the target tip.
tip_x : float
X coordinate of the tip, without tool offsets.
tip_y : float
Y coordinate of the tip, without tool offsets.
tip_z : float
Z coordinate of the tip (active height, i.e. absolute fitting height), without tool offsets.

TODO: Test adding a "vibration" if possible (i have not found this necessary).

def update_pipette_state(self, volume, displacement)
Expand source code
def update_pipette_state(self, volume, displacement):
    """Update the pipette state information."""
    # Setup valid tip volume before updating
    if self.pipette.state["tip_vol"] is None:
        self.pipette.state["tip_vol"] = 0

    # Update pipette state
    self.pipette.state["s"] += displacement
    self.pipette.state["vol"] += volume
    self.pipette.state["tip_vol"] += volume
    self.pipette.tip_loaded["volume"] = volume + self.pipette.tip_loaded.get("volume", 0.0)

    # Update direction with the current move
    self.pipette.state["lastdir"] = self.builder.sign(volume)
    # Print state
    msg = f"Current state of tool {self.builder.current_tool}: " + pformat(self.pipette.state)
    logging.debug(msg)

Update the pipette state information.

def volume_to_displacement(self, volume: float, mode='incremental')
Expand source code
def volume_to_displacement(self, volume: float, mode="incremental"):
    """Convert volume (uL) to shaft displacement (mm)."""

    # Get scaler properties.
    scaler, scaler_adjust = self.pipette.scaler, self.pipette.scaler_adjust

    # Convert volume to shaft displacement in millimeters in incremental mode.
    if mode == "incremental":
        # Units are in microliters, they must be converted.
        displacement = volume / scaler
        # Appliy linear adjustment.
        displacement = displacement * scaler_adjust
    else:
        # TODO: implement "absolute" pipetting volumes.
        raise ProtocolError(f"Invalid mode: '{mode}'")

    # Comment GCODE
    message = f"Converted {volume} uL to {displacement} mm"
    message += f" with scaler {scaler} and adjust factor {scaler_adjust}"
    logging.debug(message)
    self.builder.extend_commands(commands=[self.gcode.comment(message)])

    return displacement

Convert volume (uL) to shaft displacement (mm).

Inherited members