Module pipettin-piper.piper.plugins.pipettes
Functions
def load_plugin(controller: "'Controller'", **kwargs)-
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Build the pipette plugin for a controller and hand it back to the caller.

    Instantiating the plugin registers the pipettes as tools.

    Every plugin module is expected to expose a function named 'load_plugin', which
    instantiates the plugin's class and returns it to the main Commander class.
    A plugin that fails to load must raise a PluginError exception.

    Args:
        controller: Controller object passed on to the PipetteHandler constructor.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The new PipetteHandler instance.

    Raises:
        PluginError: If instantiating the PipetteHandler raises any exception.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")

    try:
        handler = PipetteHandler(controller)
    except Exception as e:
        # Any failure is logged and re-raised as the error type plugins must use.
        error_message = f"Failed to load with error: {e}"
        logging.error(error_message)
        raise PluginError(error_message) from e
    else:
        return handler
Classes
class PipetteHandler (controller: Controller = None, init_from_db=False)-
Expand source code
class PipetteHandler(Plugin):
    """Plugin handling pipette tools and pipetting actions.

    WARNING: EXPERIMENTAL.

    This class is meant to take over all pipetting related tasks, including:
    - Loading pipette tools.
    - Generating GCODE.
    - Executing GCODE for those actions (maybe).
    """

    # Current micro-pipette models and characteristics.
    pipettes: dict = None
    # Builder class.
    builder: GcodeBuilder = None
    controller: Controller = None

    # Current pipette as computed property.
    @property
    def pipette(self) -> PipetteTool:
        """Return a pipette, by the name of the current tool in the builder."""
        return self.pipettes[self.builder.current_tool]

    verbose = False

    def __init__(self, controller: Controller = None, init_from_db=False):
        """Set up the handler, register pipettes (or their loader) and the action handlers.

        Args:
            controller: Controller providing the GCODE builder, the configuration and the tools DB.
            init_from_db: When True (or when the "pipettes" section of the controller's config
                sets "init_from_db"), pipettes are read from the DB and registered immediately.
                Otherwise only a tool-loader callback is registered in the builder.
        """
        self.controller: Controller = controller
        self.builder: GcodeBuilder = self.controller.builder
        self.gcode: GcodePrimitives = self.builder.gcode
        self.verbose: bool = self.builder.verbose

        # Dictionary of JSON data.
        self.pipettes_data = {}
        # Dictionary with python 'tool' objects in its values.
        self.pipettes = {}

        # Initialize pipettes.
        if init_from_db or self.controller.config.get("pipettes", {}).get("init_from_db", False):
            # Load and register tools directly from the DB.
            # TODO: Consider removing this option.
            #       It is here in case it helps configuring things after the controller's init.
            self.pipettes_data = self.get_pipette_tools_from_db()
            self.load_pipettes(self.pipettes_data)
            # Register it directly.
            logging.info("Registering pipettes directly.")
            for p in self.pipettes.values():
                self.builder.register_tool(p)
        else:
            # Use the new tool-loader callback system.
            logging.info("Registering pipette loader function.")
            self.builder.register_tool_loader(
                tool_type=TOOL_TYPE,
                loader_function=self.load_pipette)

        # List of available macros for the requested actions.
        # TODO: Implement repeat dispensing.
        # TODO: Implement reverse pipetting.
        # TODO: Implement available/maximum volume checks.
        # TODO: Add "extend" to all macros.
        # TODO: Make all macros return the generated GCODE.
        self.action_switch_case = {
            "PIPETTE": self.macroPipette,
            "PICK_TIP": self.macroPickNextTip,
            self.load_liquid_cmd: self.macroGoToAndPipette,
            self.pour_liquid_cmd: self.macroGoToAndPour,
            "DISCARD_TIP": self.macroDiscardTip,
            self.mix_cmd: self.macroGoToAndMix
        }

        # Register each handler in the GcodeBuilder.
        logging.info("Registering action handlers.")
        for name, function in self.action_switch_case.items():
            self.builder.add_action_handler(name, function)

        # Set status.
        self._status = True

    def get_pipette_tools_from_db(self):
        """Read tools from the DB and keep only the ones with a matching type.

        Returns:
            dict: Tool definitions of type TOOL_TYPE, keyed by tool name.
        """
        tools: list = self.controller.database_tools.tools
        pipettes_data = {t["name"]: t for t in tools if t["type"] == TOOL_TYPE}
        logging.info(f"Found these pipettes in the DB: {list(pipettes_data)}")
        return pipettes_data

    def load_pipettes(self, pipettes_data):
        """Instantiate and save one PipetteTool for each parameter set in 'pipettes_data'.

        Args:
            pipettes_data (dict): Pipette definitions keyed by pipette name.
        """
        # NOTE: commenting out the defaults for now.
        # if not self.pipettes_data:
        #     self.pipettes_data = deepcopy(PIPETTES)
        for pipette_name, pipette_data in pipettes_data.items():
            # Instantiate all pipette tools.
            logging.info(f"Initializing pipette '{pipette_name}'.")
            self.load_pipette(pipette_data)

    def load_pipette(self, pipette_data):
        """Instantiate and save one PipetteTool from a parameter set.

        This is a tool loader function for GcodeBuilder.

        Args:
            pipette_data (dict): Pipette definition; its "name" is used as the key in 'self.pipettes'.

        Returns:
            PipetteTool: The newly created tool.

        Raises:
            DataError: If a pipette with the same name was already loaded by this handler.
            CommanderError: If the PipetteTool could not be instantiated.
        """
        pipette_name = pipette_data["name"]
        logging.info(f"Loading pipette '{pipette_name}'.")

        # Check if the tool is already registered locally.
        if pipette_name in self.pipettes:
            msg = f"Pipette '{pipette_name}' is already in the pipette handler's list."
            logging.error(msg)
            raise DataError(msg)

        # Load the pipette.
        try:
            # The PipetteTool instances will register themselves a tool in the builder.
            tool = PipetteTool(pipette_data, self.builder)
            self.pipettes[pipette_name] = tool
        except Exception as e:
            msg = f"Failed to initialize tools: {e}"
            logging.error(msg)
            raise CommanderError(msg) from e

        return tool

    def guess_pipette(self, volume: float, tip_container: dict = None, repeats: int = 1, prioritize_accuracy=True):
        """Guess which tool is better for a given volume and number of serial repeats.

        By default, accuracy will be prioritized (i.e. less dispenses per load),
        unless 'prioritize_accuracy' is set to False.

        Args:
            volume (float): Volume of each dispense.
            tip_container (dict, optional): Tip container passed to each tool's 'get_max_volume'.
            repeats (int, optional): Minimum number of dispenses a single load must cover.
            prioritize_accuracy (bool, optional): Only the exact value True selects the tool
                with the fewest dispenses per load; anything else selects the one with the most.

        Returns:
            The name of the chosen pipette. On ties, the first one in 'self.pipettes' wins.

        Raises:
            ProtocolError: If no pipette can hold enough volume for the requested repeats.
        """
        # Get the maximum volume for the given tip.
        tool_limits = [(p.get_max_volume(tip_container), p.name) for p in self.pipettes.values()]

        # Calculate how many repeats can be done with each pipette.
        # Only keep the ones with enough volume.
        tool_repeats = [(limit // volume, tool) for limit, tool in tool_limits if limit // volume >= repeats]

        # NOTE: min()/max() were previously called as "min(*list)", which fails with a
        #       TypeError when the list holds exactly one (non-iterable) number.
        if not tool_repeats:
            raise ProtocolError(
                f"No pipette can handle {repeats} repeat(s) of {volume} uL with the requested tip.")

        if prioritize_accuracy is True:
            # Choose the tool that allows the smaller number of repeats.
            # This will prioritize precision over speed.
            _, chosen_tool = min(tool_repeats, key=lambda reps_tool: reps_tool[0])
        else:
            # Choose the tool that allows the larger number of repeats.
            # This will prioritize speed over precision.
            _, chosen_tool = max(tool_repeats, key=lambda reps_tool: reps_tool[0])

        return chosen_tool

    def macroDiscardTip(self, action, i, operation_time=20.0):
        """Move over the trash box or eject-post and discard the tip (if placed, else Except).

        Action moves:
        - Move to clearance level on Z (avoiding platforms) and Y (avoiding parked tools).
        - Move to the ejection site and eject the tip.
        - Home the tool.

        Args:
            action (dict): The action definition. Its "item" argument is only read when the tool
                has an integrated ejector; otherwise the coordinates are loaded from the
                tool/pipette definition.
            i (int): The index of the action. Only used in messages.
            operation_time (float, optional): Timeout added to the action's execution options.

        Raises:
            ProtocolError: An error is raised when there are no tips to discard.

        Returns:
            str: Generic message.

        Sample action:
            {
                "cmd": "DISCARD_TIP",
                "args": {
                    "item": "descarte 1"
                }
            }
        """
        # TODO: Add alternate method of ejecting tips against an ejector pad (custom pipette).
        #   - Add "can eject" property and decide based on that.
        #   - Add "ejector pad" properties to tools.
        if not self.pipette.tip_loaded:
            # TODO: discuss whether it is better to raise this error or to do nothing.
            raise ProtocolError(f"Cannot discard tip if one is not already placed! Action index: {i}")

        # Move to clearance level on Z.
        # Should extend task.commmand list automatically.
        self.builder.macroMoveSafeHeight()

        if self.pipette.can_eject:
            item_name = action["args"]["item"]
            # Get the item's XYZ.
            logging.info("Ejecting tip using integrated ejector.")
            x, y, z = self.builder.getItemXYZ(item_name)
            z_move_options = {}
        else:
            # Get the ejector post's initial coordinates.
            logging.info("Ejecting tip using post.")
            x, y, z = self.pipette.eject_post_init_coords
            # NOTE: "add_offset=False" is needed in this case:
            #       the eject-post passes an absolute Z but the ejector does not.
            z_move_options = {"add_offset": False}

        # Move over the ejection site initial XY coordinates.
        self.builder.macroGoToXYZ(x=x, y=y, add_comment="Initial XY approach to eject zone.")
        # Move over the ejection site initial Z coordinate.
        self.builder.macroGoToXYZ(z=z, add_comment="Initial Z approach to eject zone.", **z_move_options)

        # Generate GCODE for the actual ejection sequence.
        eject_commands = self.pipette.eject_tip()
        self.builder.extend_commands(eject_commands)

        # Raise to safety.
        self.builder.macroMoveSafeHeight()

        # Ensure zeroed pipette axis
        # self.macroPipetteZero()
        # NOTE: replace macroPipetteZero with extruder homing command.
        self.macroPipetteHome(which_tool=self.builder.current_tool)
        # TODO: reimplement non-homing zeroing once the underlying Klipper issue is resolved:
        #       Sending "M82" and moving the extruder to 0 does not set the position to 0.
        #       Check if this is due to absolute extruder positioning not taking effect.

        # Extend the operation time.
        self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        return f"Processed tip discard with index {i}"

    def volume_to_displacement(self, volume: float, mode="incremental"):
        """Convert volume (uL) to shaft displacement (mm).

        Also appends a GCODE comment describing the conversion to the builder's commands.

        Args:
            volume (float): Volume in microliters.
            mode (str, optional): Only "incremental" is implemented.

        Returns:
            float: Displacement, computed as 'volume / scaler * scaler_adjust' for the current pipette.

        Raises:
            ProtocolError: If 'mode' is not "incremental".
        """
        # Get scaler properties.
        scaler, scaler_adjust = self.pipette.scaler, self.pipette.scaler_adjust

        # Convert volume to shaft displacement in millimeters in incremental mode.
        if mode != "incremental":
            # TODO: implement "absolute" pipetting volumes.
            raise ProtocolError(f"Invalid mode: '{mode}'")

        # Units are in microliters, they must be converted.
        displacement = volume / scaler
        # Apply linear adjustment.
        displacement = displacement * scaler_adjust

        # Comment GCODE
        message = f"Converted {volume} uL to {displacement} mm"
        message += f" with scaler {scaler} and adjust factor {scaler_adjust}"
        logging.debug(message)
        self.builder.extend_commands(commands=[self.gcode.comment(message)])

        return displacement

    def macroPipette(
            self, action: dict, i: int,
            backlash_correction: float = 0.0,
            tip_priming_correction: bool = False,
            back_pour_correction: float = 0.0,
            under_draw_correction: float = 0.0,
            extra_draw_volume: float = 0.0,
            capillary_correction: bool = False):
        """Generate the pipetting moves for an action, including the enabled corrections.

        Pipetting works incrementally for now (i.e. relative volumes, not absolute volumes).
        Negative values mean 'dispense' or 'lower' the axis (see functions in driverSaxis.py).

        Each correction argument takes priority when truthy; otherwise the value with
        the same name is read from the current pipette, defaulting to the argument.

        Args:
            action (dict): Action with the signed target volume (uL) in action["args"]["volume"].
            i (int): The index of the action, used in messages.
            backlash_correction (float): Volume for mechanical backlash compensation.
            tip_priming_correction (bool): Pre-wet the tip on the first draw.
            back_pour_correction (float): Enables the back-pour correction on draw moves.
            under_draw_correction (float): Enables the under-draw correction on draw moves.
            extra_draw_volume (float): Extra volume to draw on the first draw.
            capillary_correction (bool): Enables low-volume compensation on dispense moves.

        Returns:
            str: Generic message.
        """
        # Get the target volume.
        volume = action["args"]["volume"]
        # Flag indicating if this is a draw or dispense move.
        is_drawing = self.builder.sign(volume) > 0

        # Comment GCODE
        message = f"Calculating pipetting move for {volume} uL volume for action {i}"
        message += f", using tool '{self.builder.current_tool}'."
        logging.info(message)

        # Check if the tip is new/empty.
        first_draw = self.pipette.state["tip_vol"] is None
        logging.debug(f"Computed move properties: is_drawing={is_drawing} first_draw={first_draw} (tip_vol={self.pipette.state['tip_vol']})")
        self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action)

        # Read options from action arguments.
        # Prioritize the arguments, then the config, and defaulting to arguments if not there.
        backlash_correction = backlash_correction or float(self.pipette.get("backlash_correction", backlash_correction))
        tip_priming_correction = tip_priming_correction or bool(self.pipette.get("tip_priming_correction", tip_priming_correction))
        back_pour_correction = back_pour_correction or float(self.pipette.get("back_pour_correction", back_pour_correction))
        under_draw_correction = under_draw_correction or float(self.pipette.get("under_draw_correction", under_draw_correction))
        extra_draw_volume = extra_draw_volume or float(self.pipette.get("extra_draw_volume", extra_draw_volume))
        capillary_correction = capillary_correction or bool(self.pipette.get("capillary_correction", capillary_correction))

        # Log the corrections.
        logging.debug(
            f"Corrections: backlash_correction={backlash_correction}, tip_priming_correction={tip_priming_correction}, "
            f"back_pour_correction={back_pour_correction}, under_draw_correction={under_draw_correction}, "
            f"extra_draw_volume={extra_draw_volume}, capillary_correction={capillary_correction}")

        # Compensate mechanical backlash.
        if backlash_correction:
            # Backlash compensation.
            self.compensate_backlash(volume, backlash_correction, action, i)

        # Compensate for tip-wetting issues.
        if is_drawing and first_draw:
            if tip_priming_correction:
                # Compensation for under-drawing on first load, by pre-wetting.
                self.compensate_first_draw(volume, action, i)
            if extra_draw_volume:
                # Compensation for under-drawing on first load, by over-drawing a bit.
                self.compensate_extra_draw(extra_draw_volume, action, i)

        # Compensation for capillary effects for low final tip volume during dispensing.
        # Apply only for regular dispensing moves.
        if capillary_correction and self.builder.sign(volume) == -1 and not first_draw:
            self.compensate_low_volume(volume, action, i)

        # Comment GCODE
        message = f"Pipetting {volume} uL for action {i}."
        logging.info(message)
        self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action)

        # Compute axis displacement from the target pipetting volume.
        # Default pipetting mode should be incremental (i.e. relative) volume.
        self.displace_volume(volume, action)

        # Under-draw correction
        if is_drawing and under_draw_correction:
            self.compensate_underdraw(action, i)

        # Back-pour correction
        if is_drawing and back_pour_correction:
            self.compensate_backpour(action, i)

        return f"Processed pipetting action with index {i}."

    def displace_volume(self, volume: float, action: dict, check_tolerance: float = 100.0):
        """Compute axis displacement from the target pipetting volume.

        Default pipetting mode should be incremental (i.e. relative) volume.

        Args:
            volume (float): Pipetting volume in microliters.
            action (dict): Action object, optionally containing a pipetting "mode" string in its "args".
            check_tolerance (float, optional): Extra volume (uL) allowed over the maximum in the limit check.

        Returns:
            float: The displacement computed for the volume.

        Raises:
            ProtocolError: If the volume limit check fails, or the pipetting mode is invalid.
        """
        # TODO: De-hardcode the tolerance set above. The issue here is that corrections move the
        #       shaft to volumes outside the limit of the tip, which may be a bad idea (e.g. because
        #       there may be a filter) or of the tool's limit for the tip.
        # Volume limit check for the current tip.
        self.check_volume_limit(volume, tolerance=check_tolerance)

        # Convert volume to displacement.
        mode = action["args"].get("mode", "incremental")
        displacement = self.volume_to_displacement(volume, mode=mode)

        # Generate GCODE command for the move.
        # TODO: Add pipetting speed control.
        # TODO: Volume sign INVERTED here, be smarter about the direction.
        command = self.gcode.G1(
            e=-displacement,
            absolute=False,
            absolute_e=False,
            f=self.pipette.feedrate)

        # Add the GCODE command to the list.
        self.builder.extend_commands(commands=command, action=action)

        # Update state.
        self.update_pipette_state(volume, displacement)

        return displacement

    def check_volume_limit(self, volume, tolerance: float = 0.0):
        """Volume limit check for the current tip and tool.

        Args:
            volume (float): Signed volume (uL) about to be added to the loaded tip's volume.
            tolerance (float, optional): Extra volume (uL) allowed over the maximum.

        Returns:
            tuple: The maximum volume for the loaded tip, and the final volume after the move.

        Raises:
            ProtocolError: If the final volume exceeds the maximum volume plus the tolerance.
        """
        # self.check_tip_volume(volume)
        tip_container = self.controller.database_tools.getContainerByName(self.pipette.tip_loaded["container"])
        max_volume = self.pipette.get_max_volume(tip_container)
        current_volume = self.pipette.tip_loaded.get("volume", 0.0)
        final_volume = current_volume + volume

        # TODO: Reconsider passing this check if the volume is decreasing (i.e. dispensing).
        if final_volume > max_volume + tolerance:
            msg = f"The requested final volume ({final_volume}) exceeds the tool's capacity of {max_volume} uL for tip {tip_container['name']}."
            raise ProtocolError(msg)

        return max_volume, final_volume

    def update_pipette_state(self, volume, displacement):
        """Update the pipette state information.

        Args:
            volume (float): Signed volume (uL) that was just displaced.
            displacement (float): Displacement that was just applied.
        """
        # Setup valid tip volume before updating
        if self.pipette.state["tip_vol"] is None:
            self.pipette.state["tip_vol"] = 0

        # Update pipette state
        self.pipette.state["s"] += displacement
        self.pipette.state["vol"] += volume
        self.pipette.state["tip_vol"] += volume
        self.pipette.tip_loaded["volume"] = volume + self.pipette.tip_loaded.get("volume", 0.0)

        # Update direction with the current move
        self.pipette.state["lastdir"] = self.builder.sign(volume)

        # Print state
        msg = f"Current state of tool {self.builder.current_tool}: " + pformat(self.pipette.state)
        logging.debug(msg)

    def compensate_backlash(self, volume: float, backlash_correction: float, action: dict = None, i: int = None):
        """Backlash compensation.

        Args:
            volume (float): Signed volume (uL) of the upcoming move.
            backlash_correction (float): Correction volume (uL) applied on direction changes.
            action (dict, optional): Action passed on to 'displace_volume'.
            i (int, optional): The index of the action, used in messages.

        Returns:
            float: The signed correction volume that was displaced (0.0 if the direction did not change).
        """
        # If previous and current directions are different
        last_dir_sign = self.builder.sign(self.pipette.state["lastdir"])
        new_dir_sign = self.builder.sign(volume)

        # TODO: be smarter about backlash (?).
        backlash_correction_vol = 0.0
        if (last_dir_sign * new_dir_sign) == -1:
            # Then over-pour (negative volume, -0.5 uL for p200) or over-draw (positive volume, 0.5 uL for p200)
            backlash_correction_vol = new_dir_sign * backlash_correction

        # Comment GCODE.
        message = f"Action {i}: pipetting extra {backlash_correction_vol} uL on backlash correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Move without corrections.
        self.displace_volume(volume=backlash_correction_vol, action=action)

        return backlash_correction_vol

    def compensate_first_draw(self, volume: float, action: dict = None, i: int = None):
        """First draw compensation.

        Pre-wet the tip by loading and dispensing the target volume (capped by the pipette's max volume).

        Args:
            volume (float): Target volume (uL) of the draw move.
            action (dict, optional): Action passed on to 'displace_volume'.
            i (int, optional): The index of the action, used in messages.

        Returns:
            float: The volume that was drawn and then dispensed.
        """
        # Limit pre-wetting to 120% of move volume or max volume.
        correction_volume = min(abs(self.pipette.max_volume), volume*1.2)

        # Comment GCODE.
        message = f"Action {i}: pre-wetting the tip by pipetting up-and-down {correction_volume} uL on first draw."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Apply the correction by pipetting up and down (i.e. pre-wetting the tip).
        for direction in [1.0, -1.0]:
            # Move without corrections.
            self.displace_volume(volume=direction * correction_volume, action=action)

        return correction_volume

    def compensate_extra_draw(self, extra_draw_volume, action: dict = None, i: int = None):
        """First draw compensation.

        If the tip is new/empty, the liquid may have a harder time "entering" the dry tip.
        Correct by over-drawing by a specified amount (e.g. 5 uL for the p200).

        Args:
            extra_draw_volume (float): Extra volume (uL) to draw.
            action (dict, optional): Action passed on to 'displace_volume'.
            i (int, optional): The index of the action, used in messages.

        Returns:
            float: The extra volume that was drawn.
        """
        # Comment GCODE.
        message = f"Action {i}: over-drawing by {extra_draw_volume} uL on first draw."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Move without corrections.
        self.displace_volume(volume=extra_draw_volume, action=action)

        return extra_draw_volume

    def compensate_low_volume(self, volume: float, action: dict = None, i: int = None):
        """Compensation for capillary effects for low final tip volume during dispensing.

        Args:
            volume (float): Signed volume (uL) of the upcoming dispense move.
            action (dict, optional): Action passed on to 'displace_volume'.
            i (int, optional): The index of the action, used in messages.

        Returns:
            float: The extra volume that was displaced (0.0 if no compensation was needed).
        """
        # Apply only for regular dispensing moves.
        # Apply correction if necessary: only at low final volumes.
        correction_start_vol = self.pipette.tension_correction["start_vol"]

        # Get the current tip volume (i.e. from the previous action).
        final_volume = self.pipette.state["vol"]
        # Add the new [corrected] volume to the local pipette volume tracker (axis position in uL).
        final_volume += volume

        # Subtract the "back-pour correction" volume if this is a first draw,
        # forcing negative volume (for dispensing). Though this "correction" is
        # applied later on, it must be considered here, because it will change
        # the final volume.
        # TODO: disabled this bit for now, since this correction is meant for
        #       simple dispense moves only.
        # if first_draw and tip_priming_correction and back_pour_correction:
        #     final_volume += -abs(self.pipette["back_pour_correction"])

        # Default correction should be null.
        capillary_extra_vol = 0.0

        # Apply the correction.
        if final_volume < correction_start_vol:
            # Compute the "extra" volume that must be dispensed.
            capillary_extra_vol = self.capillary_corr(final_volume)
            # Comment GCODE
            message = f"Action {i}: final volume {final_volume} uL requires low-volume compensation. Dispensing extra {capillary_extra_vol} uL."
            logging.info(message)
            self.builder.extend_commands([self.gcode.comment(message)])
            # Apply correction.
            # Move without corrections.
            self.displace_volume(volume=capillary_extra_vol, action=action)
        else:
            # Comment GCODE
            message = f"Action {i}: final volume {final_volume} uL does not require low-volume compensation."
            logging.debug(message)
            self.builder.extend_commands([self.gcode.comment(message)])

        return capillary_extra_vol

    def compensate_underdraw(self, action: dict = None, i: int = None):
        """Under-draw correction.

        Correction for aspiration moves that draw less volume than expected, increasing
        the loaded volume by a fixed amount of microliters. This applies to all moves.

        Args:
            action (dict, optional): Action passed on to 'displace_volume'.
            i (int, optional): The index of the action, used in messages.
        """
        # Get parameters from the tool
        under_draw_correction = self.pipette["under_draw_correction"]

        # Over-draw
        vol_correction = abs(under_draw_correction)  # positive, force loading

        # Logs.
        message = f"Action {i}: applying {vol_correction} uL under-draw volume correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Apply the correction.
        self.displace_volume(vol_correction, action)

    def compensate_backpour(self, action: dict = None, i: int = None):
        """Back-pour correction.

        Try correcting "liquid backlash" by over-drawing and pouring a bit back into the source tube.
        This corrects inaccuracies in the first dispense of a series.

        Args:
            action (dict, optional): Action passed on to 'displace_volume'.
            i (int, optional): The index of the action, used in messages.

        Returns:
            tuple: The over-draw volume (positive) and the back-pour volume (negative).
        """
        # Get parameters from the tool
        back_pour_correction = self.pipette["back_pour_correction"]

        # Over-draw
        over_draw_volume = abs(back_pour_correction)  # positive, force loading
        # Logs.
        message = f"Action {i}: applying {over_draw_volume} uL over-draw volume correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        # Apply the correction.
        self.displace_volume(over_draw_volume, action)

        # Back-pour
        back_pour_volume = -abs(back_pour_correction)  # negative, force dispensing
        # Logs.
        message = f"Action {i}: applying {back_pour_volume} uL back-pour volume correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        # Apply the correction.
        self.displace_volume(back_pour_volume, action)

        return over_draw_volume, back_pour_volume

    def macroPipetteZero(self, action=None, i=None):
        """Move the pipette to the zero position (without re-homing).

        Note: this now requires that the pipette homes upwards to the axis origin (i.e. at 0)
        and the retraction distance is considered the zero position for pipetting.

        Args:
            action (dict, optional): Action passed on to 'displace_volume'.
            i (int, optional): The index of the action, used in messages.

        Raises:
            ProtocolError: If the tracked volume or displacement is not (nearly) zero after the move.
        """
        # Reverse net displacement
        volume = -self.pipette.state["vol"]

        # Early return if the volume is zero.
        if not volume:
            message = f"Pipette is already zeroed at volume={volume} (action index {i})."
            logging.debug(message)
            self.builder.extend_commands([self.gcode.comment(message)])
            return

        # Logs.
        message = f"Zeroing pipette with volume={volume} (action index {i})."
        logging.debug(message)
        self.builder.extend_commands([self.gcode.comment(message)])

        # Execute the displacement.
        self.displace_volume(volume, action)

        # Check for final volume near zero
        state_vol = abs(self.pipette.state["vol"])
        state_s = abs(self.pipette.state["s"])
        threshold = 1e-10
        if (state_vol > threshold) or (state_s > threshold):
            # NOTE: read the state through the attribute, as done everywhere else in this method.
            raise ProtocolError("Pipette was zeroed, but reference states did not match zero. " +
                                "Expected vol=0 and s=0 but got:" +
                                f" vol={self.pipette.state['vol']}" +
                                f" s={self.pipette.state['s']}")

    # TODO: This is now only used by "macroDiscardTip". Considering removing it.
    def macroPipetteHome(self, which_tool="all"):
        """Macro to home the requested tools.

        Args:
            which_tool (str, optional): Either a valid name for a tool, or "all". Defaults to "all".

        Returns:
            list: The homing commands that were added to the builder.
        """
        if which_tool == "all":
            which_tools = list(self.pipettes)
        else:
            which_tools = [which_tool]

        tool_home_cmds = []
        for tool_name in which_tools:
            tool = self.pipettes[tool_name]
            tool_home_cmds += tool.home()

        message = f"Homing pipettes {which_tools}"
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        self.builder.extend_commands(tool_home_cmds)

        return tool_home_cmds

    def calcTipFitZ(self, content_top_z: float, next_tip: dict):
        """Calculate absolute Z coordinates for initial approach and final fitting depth.

        Args:
            content_top_z (float): Z coordinate from which the tip-stage offset is subtracted.
            next_tip (dict): Data for the target tip.

        Returns:
            tuple: The Z coordinate before fitting, and the final Z coordinate.
        """
        # Get the tip's properties, and the corresponding tip-stage info from the tool.
        tip_container = self.pipette.get_tip_container(next_tip)
        # Get the tip-stage offset that matches the new tip.
        tip_stage_offset = self.pipette.get_tip_stage_offset(tip_container)
        # Get the tip fit distance for the current tip-tool pair.
        tip_fit_distance = self.pipette.get_tip_fit_distance(tip_container)
        # Extra distance to probe below and beyond the tip's active height.
        probe_extra_dist = self.pipette.get_probe_extra_dist(tip_container)

        # Subtract the tip-stage offset.
        z_pre_fit = content_top_z - tip_stage_offset
        # NOTE: This "z_pre_fit" is now the initial coordinate before fitting, with or without probing.

        # Final calculation.
        z_final = z_pre_fit - tip_fit_distance - probe_extra_dist

        # Logs.
        logging.info(f"Tip fitting with: z={content_top_z}, tip_stage_offset={tip_stage_offset}, z_pre_fit={z_pre_fit}")
        logging.info(f"Tip fitting with: tip_fit_distance={tip_fit_distance}, probe_extra_dist={probe_extra_dist}")

        # Return both.
        return z_pre_fit, z_final

    def calcTipProbeDistZ(self, next_tip: dict):
        """Calculate max Z displacement for fitting a tip with probing.

        NOTE: This is a relative move.

        Args:
            next_tip (dict): Data for the target tip.

        Returns:
            The tip fit distance plus the extra probing distance.
        """
        # Get the tip's properties, and the corresponding tip-stage info from the tool.
        tip_container = self.pipette.get_tip_container(next_tip)
        # Get the tip fit distance for the current tip-tool pair.
        tip_fit_distance = self.pipette.get_tip_fit_distance(tip_container)
        # Extra distance to probe below and beyond the tip's active height.
        probe_extra_dist = self.pipette.get_probe_extra_dist(tip_container)
        # Total distance of the probing move.
        z_scan_dist = tip_fit_distance + probe_extra_dist

        return z_scan_dist

    def pickNextTip(self, next_tip, tip_x, tip_y, tip_z):
        """Generate GCODE to place a tip.

        Args:
            next_tip (dict): Data for the target tip.
            tip_x (float): X coordinate of the tip, without tool offsets.
            tip_y (float): Y coordinate of the tip, without tool offsets.
            tip_z (float): Z coordinate of the tip (active height, i.e. absolute fitting height), without tool offsets.

        Returns:
            list: The commands that were added to the builder.

        Raises:
            ProtocolError: If a tip is already loaded.

        TODO: Test adding a "vibration" if possible (i have not found this necessary).
        """
        if self.pipette.tip_loaded:
            # TODO: discuss whether it is better to discard the tip automatically
            #       if one is placed, or to raise this error.
            raise ProtocolError("Cannot pick tip if one is already placed!")

        # Probe command list.
        commands = [self.gcode.comment("Start tip pickup moves.")]

        # Add XYZ tool offsets to the tip's active center. This adds:
        #   - XYZ tool offsets.
        #   - And nothing else, because no tip is loaded.
        x, y, z = self.builder.addToolOffsets(x=tip_x, y=tip_y, z=tip_z)

        # Move over the tip on XY.
        commands += self.gcode.G1(x=x, y=y, absolute=True)

        # Calculate initial and final fit coordinates.
        z_pre_fit, z_final = self.calcTipFitZ(content_top_z=z, next_tip=next_tip)

        # Move over the tip on Z, partially inserting the tip holder if using a non-terminal tip stage.
        commands += self.gcode.G1(z=z_pre_fit, absolute=True)

        if not self.pipette.can_probe:
            # Fit the tip (blindly, without probing).
            logging.info(f"Picking tip with: z_final={z_final}")
            # TODO: add a feedrate config parameter for tip fitting.
            commands += self.gcode.G1(z=z_final, f=10, absolute=True)
        else:
            # Probe for the tip if supported.
            # Total distance of the probing move.
            z_scan_dist = self.calcTipProbeDistZ(next_tip)
            # NOTE: This is a relative move.
            logging.info(f"Probing for tip with: z_scan_dist={z_scan_dist}")
            commands += self.gcode.gcodeProbeDown(z_scan=-z_scan_dist)

        # TODO: Seal the tip by pressing a little bit very slowly two times.

        # Extend commands list.
        self.builder.extend_commands(commands)

        # Flag loaded tip.
        self.pipette.tip_loaded = next_tip

        # Zero the tip volume tracking (set to None instead for new tip check).
        self.pipette.state["tip_vol"] = None

        return commands

    def macroPickNextTip(self, action, i, operation_time=20.0):
        """Move to clearance height and fit a tip.

        Will throw an error if a tip is already loaded.

        Args:
            action (dict): The action definition (see the samples below).
            i (int): The index of the action.
            operation_time (float, optional): Timeout added to the action's execution options.

        Returns:
            dict: The action that was passed in.

        Raises:
            ProtocolError: If a tip is already loaded.

        Sample platform-based action:
            {
                "cmd": "PICK_TIP",
                "args": {
                    "item": "200ul_tip_rack_MULTITOOL 1",
                    "tool": "P200"
                }
            }

        Sample coordinate-based (platformless) action:
            {
                "cmd": "PICK_TIP",
                "args": {
                    "position": {"x": 10.5, "y": 20.5, "z": 0.5},
                    "tool": "P200",
                    "tip": {"length": 50.0, "maxVolume": 200, "volume": 0}
                }
            }
        """
        if self.pipette.tip_loaded:
            # TODO: discuss whether it is better to discard the tip automatically
            #       if one is placed, or to raise this error.
            raise ProtocolError(f"Cannot pick tip if one is already placed! Action index: {i}")

        # Move to clearance level on Z, extends task.command list automatically
        self.builder.macroMoveSafeHeight()

        # Get tip and coords (supports "position" key specification).
        next_tip, tip_x, tip_y, tip_z = self.getTipAndCoords(action, i)

        # Generate GCODE to pick up the tip.
        self.pickNextTip(next_tip, tip_x, tip_y, tip_z)

        # Raise to safety.
        self.builder.macroMoveSafeHeight()

        # Extend the operation time.
        self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        return action

    pour_liquid_cmd = "DROP_LIQUID"

    def macroGoToAndPour(self, action, i, operation_time=20.0):
        """Move to a tube and dispense liquid.

        Handles "DROP_LIQUID" actions. Makes volume "negative" and runs "macroGoToAndPipette".

        NOTE: The volume in the action's arguments is overwritten in place.

        Args:
            action (dict): The action definition, with a "volume" in its "args".
            i (int): The index of the action.
            operation_time (float, optional): Passed on to "macroGoToAndPipette".

        Returns:
            str: The message returned by "macroGoToAndPipette".
        """
        action["args"]["volume"] = -abs(float(action["args"]["volume"]))  # Force negative volume
        ret_msg = self.macroGoToAndPipette(action, i, operation_time=operation_time)
        return ret_msg

    def fast_safe_height(self, current_action: dict, previous_action: dict = None):
        """Try optimising clearance for a pipetting move within the same platform item.

        The objective is to speed-up repetition pipetting within the same platform.

        Args:
            current_action (dict): The action being processed.
            previous_action (dict, optional): Action to compare against. Defaults to the
                builder's previous action.

        Returns:
            The result of the builder's "macroMoveSafeHeight".
        """
        # Previous action.
        if previous_action is None:
            previous_action = self.builder.previous_action
        check_last, last_item_name = self.fast_safe_height_action_check(previous_action)

        # Current action.
        check_next, next_item_name = self.fast_safe_height_action_check(current_action)

        # Items to optimize by.
        item_names = []

        # Decide whether to optimize or not.
        if check_last and check_next and (last_item_name == next_item_name):
            logging.info(f"Optmizing clearance move to safe height within item: '{next_item_name}'")
            item_names.append(next_item_name)

        return self.builder.macroMoveSafeHeight(item_names=item_names)

    def fast_safe_height_action_check(self, action: dict):
        """Check that an action is compatible with a fast safe height check.

        Args:
            action (dict): The action to check. May be None or empty (e.g. when there
                is no previous action), which is never eligible.

        Returns:
            tuple: A bool telling if the action is eligible, and the action's item name
                (None when the action has none).
        """
        # There is nothing to optimize by without an action.
        if not action:
            return False, None

        # List of eligible commands.
        fast_sh_eligible_cmds = [self.pour_liquid_cmd, self.load_liquid_cmd, self.mix_cmd]

        # Check that the commands are reasonable to optimize.
        cmd_check = action.get("cmd", None) in fast_sh_eligible_cmds

        # Check that the action is platform-based, and points to an item.
        # NOTE: actions without "args" are tolerated, and are simply not eligible.
        args = action.get("args") or {}
        item_name = args.get("item", None)
        item_check = item_name is not None

        # Return the digested check, and the item name.
        return cmd_check and item_check, item_name

    load_liquid_cmd = "LOAD_LIQUID"

    def macroGoToAndPipette(self, action, i, operation_time=20.0):
        """Move to a tube and load liquid.

        LOAD_LIQUID action interpreter: moves to XYZ position and moves S axis to pipette.
        Will throw an error if a tip is not already loaded.

        Args:
            action (dict): The action definition (see the samples below).
            i (int): The index of the action.
            operation_time (float, optional): Timeout added to the action's execution options.

        Returns:
            str: Generic message.

        Raises:
            ProtocolError: If no tip is loaded.

        Sample action:
            {
                "cmd": "LOAD_LIQUID",
                "args": {
                    "item": "5x16_1.5_rack 1",
                    "selector": {"by": "name", "value": "tube1"},
                    "volume": 100,  # in microliters
                    "tool": "P200"
                }
            }

        Likely platformless action:
            {
                "cmd": "LOAD_LIQUID",
                "args": {
                    "content": {"volume": 200.0},
                    "volume": 100,  # in microliters
                    "tool": "P200"
                }
            }

        Primary move sequence:
            1. move to clearance level on Z
            2. move to next tube location on XY
            3. move to Z at loading height
            4. load liquid
            5. move to clearance level on Z
        """
        if not self.pipette.tip_loaded:
            # TODO: discuss whether it is better to discard the tip automatically
            #       if one is placed, or to raise this error.
            raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

        # Move to clearance level on Z.
        self.fast_safe_height(action)

        # Move over the target content and downward into the tube.
        self.builder.macroGoTo(action, i)

        # Actuate the pipette tool.
        self.macroPipette(action, i)

        # Move to clearance level on Z.
        self.fast_safe_height(action, action)

        # Extend the operation time.
        self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

        return f"Processed macroGoToAndPipette action with index {i}"

    mix_cmd = "MIX"

    def macroGoToAndMix(self, action, i, operation_time=20.0):
        """Parse a MIX action and generate GCODE.

        Homogenize a solution. Possibly useful for rinsing tips before a transfer.

        Args:
            action (dict): The action definition (see the sample below).
            i (int): The index of the action.
            operation_time (float, optional): Passed on to "macroMix".

        Returns:
            str: Generic message.

        Raises:
            ProtocolError: If no tip is loaded.

        Sample MIX action:
            {
                'args': {
                    'count': 3,
                    'item': '5x16_1.5_rack 1',
                    'of': 'content',  # Can also be 'of': "tip"
                    'percentage': 50,
                    'coords': {"x": 123, "y": 123, "z": 123},  # Will override 'selector'
                    'selector': {'by': 'name', 'value': 'tube1'}},
                'cmd': 'MIX'
            }

        Sample "tip":
            {
                "index": 1,
                "maxVolume": 160,
                "name": "tip1",
                "position": {
                    "col": 1,  # Or: "x": 10?
                    "row": 1   # Or: "y": 30?
                },
                "tags": [],
                "type": "tip",
                "volume": 0
            }

        Note that the tool must have been set by a previous tip pickup action.
        """
        logging.info("Parsing MIX action.")

        if not self.pipette.tip_loaded:
            # TODO: discuss whether it is better to discard the tip automatically
            #       if one is placed, or to raise this error.
            raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

        # Move to clearance level on Z.
        self.fast_safe_height(action)

        # Zero the pipette axis.
        self.macroPipetteZero(action, i)

        # Move over the target content and downward into the tube.
        self.builder.macroGoTo(action, i)

        # Execute the mix.
        self.macroMix(action, operation_time=operation_time)

        # Back-up.
        self.fast_safe_height(action, action)

        logging.info("Done parsing MIX action.")

        return f"Processed MIX action with index {i}."

    def macroMix(self, action: dict, operation_time=0.0):
        """Execute pipetting moves for a mix action.

        Each iteration loads a volume and then drops a slightly larger one. The loaded volume
        starts at 80% of the mixing volume and grows up to the full mixing volume.

        Args:
            action (dict): Mix action, with the number of iterations in action["args"]["count"].
            operation_time (float, optional): Base timeout, and time added per mix iteration.
                The total timeout is 'operation_time * (count + 1)'. Defaults to 0.0.

        Raises:
            ProtocolError: If the mix count is lower than 1.
        """
        # Comments for GCODE.
        comments = []

        # Calculate the mixing volume.
        mix_vol = self.calculate_mix_vol(action, comments)

        # Extend commands with comments.
        self.builder.extend_commands(comments)

        # Prepare minimal pipetting action dict
        mix_count = int(action["args"]["count"])
        if mix_count < 1:
            # The volume increment below is undefined without iterations.
            raise ProtocolError(f"Invalid mix count: {mix_count}. At least one mix move is required.")

        # Calculate volume and increments.
        factor = 0.2  # fraction of volume that will be used for increasing mixing
        vol = mix_vol * (1 - factor)  # initial volume for mixing
        vol_increment = mix_vol * (factor / mix_count)

        # Total operation time, starting from the base time.
        # NOTE: this used to double on each iteration, growing exponentially with the count.
        total_time = operation_time

        # Do the mixing
        for i in range(mix_count):
            # Add time to the total operation time.
            # TODO: Link this to the flow speed and total volume.
            total_time += operation_time

            # Logs.
            logging.debug(f"Generating mix move {i} of {mix_count}.")

            # Load
            mix_action_up = {"args": {"volume": vol}}  # in microliters
            self.macroPipette(mix_action_up, i)
            self.builder.extend_commands(mix_action_up.get("GCODE", []))

            # NOTE: The dropped volume will be greater than the loaded, to
            #       avoid having unmixed solution remainder in the tip. In the final
            #       iteration, the dropped volume will be the one in the action.
            vol += vol_increment

            # Drop
            mix_action_dw = {"args": {"volume": -vol}}  # in microliters
            self.macroPipette(mix_action_dw, i)
            self.builder.extend_commands(mix_action_dw.get("GCODE", []))

        # Extend the operation time.
        self.controller.machine.update_exec_opts(
            action=action,
            timeout=total_time,
            add_timeout=True
        )

    def calculate_mix_vol(self, action:dict, comments:list=None):
        """Calculate the mixing volume for a give mix action.

        Args:
            action (dict): Mix action.
            comments (list, optional): List of gcode comments to extend. Defaults to None.

        Raises:
            ProtocolError: _description_

        Returns:
            _type_: _description_
        """
        if comments is None:
            comments = []

        # Get the tube content and its coordinates.
        tube, _, _, _ = self.builder.getTubeAndCoords(action)

        # Get tube volume.
        # TODO: If the tube received volume during this protocol the updated volume is not here.
        #       See: https://gitlab.com/pipettin-bot/pipettin-grbl/-/issues/109
        tube_volume = tube["volume"]

        # Get pipette volume.
        pipette_volume = self.pipette['vol_max']

        # Get tip volume.
        tip_container = self.controller.database_tools.getContainerByName(self.pipette.tip_loaded["container"])
        max_tip_volume = tip_container["maxVolume"]
        # TODO: Check if the tip already has volume.
        # max_tip_volume -= self.pipette.tip_loaded.get("volume", 0)

        # Calculate maximum volume limit.
logging.info(f"Current volume limits: tube_volume={'<TODO: limit disabled until tracked>'}, pipette_volume={pipette_volume}, max_tip_volume={max_tip_volume}") # TODO: Add "tube_volume" here once volume tracking is implemented. max_volume_limit = min([max_tip_volume, pipette_volume]) # Calculate volume limited to max pipette tool volume of = action["args"]["of"] # Either "tip" or tube "content". mix_percent = action["args"]["percentage"] if of == 'content': # Logs. logging.info(f"Mixing {mix_percent}% of the tube volume.") comments = [self.gcode.comment("MIX action by tube volume fraction.")] # Calculate mixing volume mix_vol = tube_volume * (mix_percent / 100) elif of == 'tip': # Logs. logging.info(f"Mixing {mix_percent}% of the tip volume.") comments += [self.gcode.comment("MIX action by tip volume fraction.")] # Calculate mixing volume mix_vol = max_tip_volume * (mix_percent / 100) else: raise ProtocolError(f"Can't parse mixing action with 'of'={of} at action: {action}") # Check for limits. if mix_vol > max_volume_limit: # Log. msg = f"Limiting mix volume to {max_volume_limit}. The initial volume {mix_vol} exceeded the limits." logging.warning(msg) comments += [self.gcode.comment(msg)] # Override. mix_vol = max_volume_limit # Logs. logging.info(f"Mixing volume {mix_vol} uL.") comments += [self.gcode.comment(f"MIX action with {mix_vol} uL.")] return mix_vol # Get tip stuff # def getNextTip(self, platform_item:dict, selector: dict, pop_from_item=True, workspace_name:str=None, pop_from_db=False): """ Get the next tip and delete it from the local "platform_item" object. The object in the database is not affected. """ return self.controller.database_tools.getNextContent( workspace_name=workspace_name, platform_item=platform_item, selector=selector, pop_from_item=pop_from_item, pop_from_db=pop_from_db ) def getTipAndCoords(self, action, action_index): """Get a tip object and its coordinates. 
Uses information from the platform, the item, the content, and the container to get the XYZ of a tip. The Z coordinate is calculated by "getContentZ", and corresponds to the fitting depth of the tip. This function does not apply tool offsets. Args: action (dict): Protocol action definition. action_index (int): Action index. Returns: tuple: Tuple with the tip object, and the xyz coordinates. Example "tip": 'index': 96, 'maxVolume': 160, 'name': 'tip96', 'position': {'col': 12, 'row': 8}, 'tags': [], 'type': 'tip', 'volume': 0 The "tip" above will be fetched from the database when the action's "args" are of "item" type: 'args': {'item': '200ul_tip_rack_MULTITOOL 1', 'tool': 'P200'}, 'cmd': 'PICK_TIP' The action can also provide the tip's coordinates directly, but must provide "tip" data explicitly: 'args': {'position': {"x": 20, "y": 200, "z": 30}, 'tool': 'P200', 'tip': tip_definition}, # Note the tip data passed here (described below). 'cmd': 'PICK_TIP' Example "tip_definition" for "position" type args: 'maxVolume': 160, 'length': 50.0, 'volume': 0 """ # Use the provided coordinates if found. if "position" in list(action["args"]): # Get coords. coords = action["args"]["position"] x, y, z = coords["x"], coords["y"], coords["z"] # Get the tip's info. It must contain "length". next_tip = action["args"]["tip"] # Ensure required information about the loaded tip. try: assert next_tip.get("length", None) is not None # TODO: The "maxVolume" property is in the container definition. # Redesing this code using jsondb or nodb to regain compatibility with this "platformless" mode. assert next_tip.get("maxVolume", None) is not None assert next_tip.get("volume", None) is not None except AssertionError as e: msg = f"The tip from an action is missing required properties: {pformat(action)}" raise ProtocolError(msg) from e # Save contextual information to the tip. next_tip["fromItem"] = None # Else lookup the coordinates by tip-rack platform. else: # Use the builder's method. 
next_tip, x, y, z = self.controller.builder.getContentCoords( # Get any next tip from the item in the action. action, allow_next=True, # Remove the item from the local tracker, but not from the DB. pop_from_item=True, pop_from_db=False ) # TODO: Deprecate "getNextTip". # TODO: Check and tune Z positioning according to tip seal pressure. # This will need calibration. # Get and set the length of the tip if missing. if next_tip.get("length", None) is None: # Get information about the loaded tip from the platform. container_name = next_tip["container"] container = self.controller.database_tools.getContainerByName(container_name) next_tip["length"] = container["length"] # Save contextual information to the tip. next_tip["fromItem"] = action["args"]["item"] # Save contextual information to the tip next_tip["fromAction"] = action_index next_tip["inTool"] = self.builder.current_tool # Return the tip and its spatial location. return next_tip, x, y, z #### Liquid handling correction functions #### def capillary_corr(self, final_vol): """ Define reciprocal function. Linear function. For the p200 pipette, returns -2 at final volume 0, and 0 at final volume 20 (and above). """ params = self.pipette.tension_correction start_vol = params["start_vol"] max_correction = params["max_correction"] # Must be negative to dispense volume capillary_extra_vol = -abs(-final_vol * max_correction / start_vol + max_correction) # Cap maximum correction volume correction = max([-max_correction, capillary_extra_vol]) return correction # return -abs((-1/(21-x) + 0.04761905) * (2/0.952381)) # reciprocal attempt, KISS!WARNING EXPERIMENTAL
THIS CLASS IS MEANT TO TAKE OVER ALL PIPETTING RELATED TASKS INCLUDING:
- LOADING PIPETTE TOOLS
- GENERATING GCODE
- EXECUTING GCODE FOR THOSE ACTIONS (MAYBE)
Ancestors
Class variables
var builder : GcodeBuilder
var controller : Controller
var load_liquid_cmd
var mix_cmd
var pipettes : dict
var pour_liquid_cmd
var verbose
Instance variables
prop pipette : PipetteTool-
Expand source code
@property
def pipette(self) -> PipetteTool:
    """Look up the active pipette tool.

    The name of the builder's current tool is used as the key into
    ``self.pipettes``.
    """
    registered_tools = self.pipettes
    return registered_tools[self.builder.current_tool]
Methods
def calcTipFitZ(self, content_top_z: float, next_tip: dict)-
Expand source code
def calcTipFitZ(self, content_top_z: float, next_tip: dict):
    """Compute the absolute Z targets for approaching and fitting a tip.

    Args:
        content_top_z (float): Reference Z coordinate from which the tool's
            offsets are subtracted.
        next_tip (dict): Tip data, handed to the current tool to resolve the
            tip's container.

    Returns:
        tuple: ``(z_pre_fit, z_final)``; the Z before fitting (with or without
        probing) and the Z after the fit and extra probing distances.
    """
    tool = self.pipette

    # Everything below is looked up on the tool for this tip's container.
    container = tool.get_tip_container(next_tip)
    stage_offset = tool.get_tip_stage_offset(container)
    fit_distance = tool.get_tip_fit_distance(container)
    extra_probe = tool.get_probe_extra_dist(container)

    # Starting coordinate for the fit: the reference Z minus the tip-stage offset.
    z_pre_fit = content_top_z - stage_offset
    # Final depth: go further down by the fit distance and the extra probing distance.
    z_final = z_pre_fit - fit_distance - extra_probe

    logging.info(f"Tip fitting with: z={content_top_z}, tip_stage_offset={stage_offset}, z_pre_fit={z_pre_fit}")
    logging.info(f"Tip fitting with: tip_fit_distance={fit_distance}, probe_extra_dist={extra_probe}")

    return z_pre_fit, z_final
def calcTipProbeDistZ(self, next_tip: dict)-
Expand source code
def calcTipProbeDistZ(self, next_tip: dict):
    """Return the maximum Z travel of a probing tip-fit move.

    NOTE: The result is a distance for a relative move, not a coordinate.

    Args:
        next_tip (dict): Tip data, handed to the current tool to resolve the
            tip's container.

    Returns:
        The tip fit distance plus the extra probing distance for this tip.
    """
    tool = self.pipette
    container = tool.get_tip_container(next_tip)
    # Scan length: the fit distance plus the margin probed beyond it.
    return tool.get_tip_fit_distance(container) + tool.get_probe_extra_dist(container)
def calculate_mix_vol(self, action: dict, comments: list = None)-
Expand source code
def calculate_mix_vol(self, action:dict, comments:list=None):
    """Calculate the mixing volume for a given mix action.

    The volume is a percentage of either the tube's volume (``of == 'content'``)
    or the loaded tip's maximum volume (``of == 'tip'``), capped at the smaller
    of the tip's and the pipette's maximum volume.

    Args:
        action (dict): Mix action. Reads ``args.of`` and ``args.percentage``.
        comments (list, optional): List of gcode comments, extended in place.
            Defaults to None, in which case the comments are discarded.

    Raises:
        ProtocolError: If ``args.of`` is neither 'content' nor 'tip'.

    Returns:
        The mixing volume in microliters.
    """
    if comments is None:
        comments = []

    # Get the tube content (its coordinates are not needed here).
    tube, _, _, _ = self.builder.getTubeAndCoords(action)

    # Get tube volume.
    # TODO: If the tube received volume during this protocol the updated volume is not here.
    #       See: https://gitlab.com/pipettin-bot/pipettin-grbl/-/issues/109
    tube_volume = tube["volume"]

    # Get pipette volume.
    pipette_volume = self.pipette['vol_max']

    # Get tip volume.
    tip_container = self.controller.database_tools.getContainerByName(self.pipette.tip_loaded["container"])
    max_tip_volume = tip_container["maxVolume"]
    # TODO: Check if the tip already has volume.
    # max_tip_volume -= self.pipette.tip_loaded.get("volume", 0)

    # Calculate maximum volume limit.
    logging.info(f"Current volume limits: tube_volume={'<TODO: limit disabled until tracked>'}, pipette_volume={pipette_volume}, max_tip_volume={max_tip_volume}")
    # TODO: Add "tube_volume" here once volume tracking is implemented.
    max_volume_limit = min([max_tip_volume, pipette_volume])

    of = action["args"]["of"]  # Either "tip" or tube "content".
    mix_percent = action["args"]["percentage"]

    if of == 'content':
        logging.info(f"Mixing {mix_percent}% of the tube volume.")
        # Append in place: rebinding "comments" here would hide every
        # comment of this branch from the caller's list.
        comments.append(self.gcode.comment("MIX action by tube volume fraction."))
        mix_vol = tube_volume * (mix_percent / 100)
    elif of == 'tip':
        logging.info(f"Mixing {mix_percent}% of the tip volume.")
        comments.append(self.gcode.comment("MIX action by tip volume fraction."))
        mix_vol = max_tip_volume * (mix_percent / 100)
    else:
        raise ProtocolError(f"Can't parse mixing action with 'of'={of} at action: {action}")

    # Check for limits.
    if mix_vol > max_volume_limit:
        msg = f"Limiting mix volume to {max_volume_limit}. The initial volume {mix_vol} exceeded the limits."
        logging.warning(msg)
        comments.append(self.gcode.comment(msg))
        # Override.
        mix_vol = max_volume_limit

    logging.info(f"Mixing volume {mix_vol} uL.")
    comments.append(self.gcode.comment(f"MIX action with {mix_vol} uL."))

    return mix_vol
Args
action:dict- Mix action.
comments:list, optional- List of gcode comments to extend. Defaults to None.
Raises
ProtocolError- If the action's 'of' argument is neither 'content' nor 'tip'.
Returns
number- The mixing volume in microliters, capped at the smaller of the tip's and the pipette's maximum volume.
def capillary_corr(self, final_vol)-
Expand source code
def capillary_corr(self, final_vol):
    """Return the extra (negative) volume to dispense at low final tip volumes.

    The correction is linear in the final volume: it equals ``-max_correction``
    at a final volume of 0 and reaches 0 at ``start_vol``. At or above
    ``start_vol`` no correction is applied. Below a final volume of 0 the
    correction is capped at ``-max_correction``.

    For the p200 pipette, this returns -2 at final volume 0, and 0 at final
    volume 20 (and above).

    Args:
        final_vol: Expected volume left in the tip after the move, in microliters.

    Returns:
        The correction volume in microliters (zero or negative).
    """
    params = self.pipette.tension_correction
    start_vol = params["start_vol"]
    max_correction = params["max_correction"]

    # No correction at or above the start volume. Without this guard the
    # absolute value below would turn the line's positive side negative again.
    if final_vol >= start_vol:
        return 0.0

    # Must be negative to dispense volume.
    capillary_extra_vol = -abs(-final_vol * max_correction / start_vol + max_correction)

    # Cap maximum correction volume.
    return max([-max_correction, capillary_extra_vol])
def check_volume_limit(self, volume, tolerance: float = 0.0)-
Expand source code
def check_volume_limit(self, volume, tolerance:float=0.0):
    """Check a volume change against the capacity of the current tip and tool.

    Args:
        volume: Volume change in microliters, added to the loaded tip's volume.
        tolerance (float): Amount by which the final volume may exceed the
            maximum before an error is raised.

    Raises:
        ProtocolError: If the final volume exceeds the maximum plus the tolerance.

    Returns:
        tuple: ``(max_volume, final_volume)``.
    """
    tool = self.pipette
    loaded_tip = tool.tip_loaded

    # The tool reports its capacity for the container of the loaded tip.
    tip_container = self.controller.database_tools.getContainerByName(loaded_tip["container"])
    max_volume = tool.get_max_volume(tip_container)

    final_volume = loaded_tip.get("volume", 0.0) + volume

    # TODO: Reconsider passing this check if the volume is decreasing (i.e. dispensing).
    if final_volume > max_volume + tolerance:
        msg = f"The requested final volume ({final_volume}) exceeds the tool's capacity of {max_volume} uL for tip {tip_container['name']}."
        raise ProtocolError(msg)

    return max_volume, final_volume
def compensate_backlash(self, volume: float, backlash_correction: float, action: dict = None, i: int = None)-
Expand source code
def compensate_backlash(self, volume: float, backlash_correction: float, action:dict=None, i:int=None): """Backlash compensation. Compares the sign of the last pipetting direction (self.pipette.state["lastdir"]) with the sign of the requested volume, both obtained from self.builder.sign. When the signs are opposite, the correction volume is the new direction's sign times 'backlash_correction'; otherwise it stays at 0.0. The correction volume is returned. NOTE(review): the original indentation is not visible in this listing; presumably the GCODE comment and the displace_volume call are issued only when the direction reverses - confirm against the source file.""" # If previous and current directions are different last_dir_sign = self.builder.sign(self.pipette.state["lastdir"]) new_dir_sign = self.builder.sign(volume) # TODO: be smarter about backlash (?). backlash_correction_vol = 0.0 if (last_dir_sign * new_dir_sign) == -1: # Then over-pour (negative volume, -0.5 uL for p200) or over-draw (positive volume, 0.5 uL for p200) backlash_correction_vol = new_dir_sign * backlash_correction # Comment GCODE. message = f"Action {i}: pipetting extra {backlash_correction_vol} uL on backlash correction." logging.info(message) self.builder.extend_commands([self.gcode.comment(message)]) # Move without corrections. self.displace_volume(volume = backlash_correction_vol, action = action) return backlash_correction_volBacklash compensation
def compensate_backpour(self, action: dict = None, i: int = None)-
Expand source code
def compensate_backpour(self, action:dict=None, i:int=None):
    """Apply the back-pour correction: over-draw, then pour the same amount back.

    Tries to correct "liquid backlash" by drawing a bit more than needed and
    returning it to the source tube, which corrects inaccuracies in the first
    dispense of a series.

    Args:
        action (dict): Action passed through to ``displace_volume``.
        i (int): Action index, used in the log and GCODE comments only.

    Returns:
        tuple: ``(over_draw_volume, back_pour_volume)``; the first is positive
        (loading) and the second negative (dispensing).
    """
    # Same magnitude for both moves, taken from the tool's parameters.
    magnitude = abs(self.pipette["back_pour_correction"])
    over_draw_volume, back_pour_volume = magnitude, -magnitude

    steps = (("over-draw", over_draw_volume), ("back-pour", back_pour_volume))
    for label, step_volume in steps:
        # Log and annotate the GCODE before each move.
        message = f"Action {i}: applying {step_volume} uL {label} volume correction."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        # Apply the correction.
        self.displace_volume(step_volume, action)

    return over_draw_volume, back_pour_volume
def compensate_extra_draw(self, extra_draw_volume, action: dict = None, i: int = None)-
Expand source code
def compensate_extra_draw(self, extra_draw_volume, action:dict=None, i:int=None):
    """Over-draw by a fixed amount on the first draw.

    Liquid may have a harder time entering a new, dry tip; this compensates by
    drawing the given extra volume (e.g. 5 uL for the p200).

    Args:
        extra_draw_volume: Extra volume to draw, in microliters.
        action (dict): Action passed through to ``displace_volume``.
        i (int): Action index, used in the log and GCODE comment only.

    Returns:
        The ``extra_draw_volume`` that was applied.
    """
    note = f"Action {i}: over-drawing by {extra_draw_volume} uL on first draw."
    logging.info(note)

    # Annotate the GCODE, then move without corrections.
    gcode_comment = self.gcode.comment(note)
    self.builder.extend_commands([gcode_comment])
    self.displace_volume(volume=extra_draw_volume, action=action)

    return extra_draw_volume
def compensate_first_draw(self, volume: float, action: dict = None, i: int = None)-
Expand source code
def compensate_first_draw(self, volume: float, action:dict=None, i:int=None):
    """First draw compensation.

    Pre-wet the tip by loading and then dispensing 120% of the target volume,
    capped by the pipette's max volume.

    Args:
        volume (float): Target volume of the move in microliters. Only its
            magnitude is used, so the tip is always loaded first and emptied
            second.
        action (dict): Action passed through to ``displace_volume``.
        i (int): Action index, used in the log and GCODE comment only.

    Returns:
        The (positive) pre-wetting volume in microliters.
    """
    # Limit pre-wetting to 120% of move volume or max volume. The magnitude
    # is used so that a negative volume cannot invert the load/dispense order.
    correction_volume = min(abs(self.pipette.max_volume), abs(volume) * 1.2)

    # Comment GCODE.
    message = f"Action {i}: pre-wetting the tip by pipetting up-and-down {correction_volume} uL on first draw."
    logging.info(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Pipette up and then down (i.e. pre-wet the tip), moving without corrections.
    for direction in (1.0, -1.0):
        self.displace_volume(volume=direction * correction_volume, action=action)

    return correction_volume
def compensate_low_volume(self, volume: float, action: dict = None, i: int = None)-
Expand source code
def compensate_low_volume(self, volume: float, action:dict=None, i:int=None):
    """Compensate capillary effects when a dispense leaves little volume in the tip.

    The expected final volume is the pipette's tracked volume plus ``volume``.
    When it falls below the tool's ``tension_correction["start_vol"]``, the
    extra volume given by ``capillary_corr`` is dispensed.

    Args:
        volume (float): Volume of the move in microliters.
        action (dict): Action passed through to ``displace_volume``.
        i (int): Action index, used in the log and GCODE comments only.

    Returns:
        The extra volume that was dispensed, or 0.0 when no compensation applied.
    """
    threshold = self.pipette.tension_correction["start_vol"]

    # Tracked tip volume (from the previous action) plus the new move.
    # NOTE: The back-pour correction is deliberately not accounted for here,
    #       since this compensation is meant for simple dispense moves only.
    final_volume = self.pipette.state["vol"] + volume

    if final_volume < threshold:
        # Compute the "extra" volume that must be dispensed.
        capillary_extra_vol = self.capillary_corr(final_volume)
        message = f"Action {i}: final volume {final_volume} uL requires low-volume compensation. Dispensing extra {capillary_extra_vol} uL."
        logging.info(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        # Move without corrections.
        self.displace_volume(volume=capillary_extra_vol, action=action)
        return capillary_extra_vol

    # Nothing to compensate: only leave a note in the GCODE.
    message = f"Action {i}: final volume {final_volume} uL does not require low-volume compensation."
    logging.debug(message)
    self.builder.extend_commands([self.gcode.comment(message)])
    return 0.0
def compensate_underdraw(self, action: dict = None, i: int = None)-
Expand source code
def compensate_underdraw(self, action:dict=None, i:int=None):
    """Apply the under-draw correction.

    Corrects aspiration moves that draw less volume than expected, by loading
    a fixed extra amount of microliters taken from the tool's
    ``under_draw_correction`` parameter.

    Args:
        action (dict): Action passed through to ``displace_volume``.
        i (int): Action index, used in the log and GCODE comment only.
    """
    # The magnitude is used so that the move always loads (positive volume).
    vol_correction = abs(self.pipette["under_draw_correction"])

    note = f"Action {i}: applying {vol_correction} uL under-draw volume correction."
    logging.info(note)
    gcode_comment = self.gcode.comment(note)
    self.builder.extend_commands([gcode_comment])

    # Apply the correction.
    self.displace_volume(vol_correction, action)
def displace_volume(self, volume: float, action: dict, check_tolerance: float = 100.0)-
Expand source code
def displace_volume(self, volume: float, action: dict, check_tolerance:float=100.0):
    """Turn a pipetting volume into an axis move and register it.

    Default pipetting mode should be incremental (i.e. relative) volume.

    Args:
        volume (float): Pipetting volume in microliters.
        action (dict): Action object, optionally containing a pipetting "mode"
            string in its "args".
        check_tolerance (float): Tolerance handed to ``check_volume_limit``.

    Returns:
        The displacement computed by ``volume_to_displacement``.
    """
    # TODO: De-hardcode the default tolerance. Corrections move the shaft to volumes
    #       outside the limit of the tip, which may be a bad idea (e.g. because there
    #       may be a filter) or of the tool's limit for the tip.
    self.check_volume_limit(volume, tolerance=check_tolerance)

    # Convert volume to displacement.
    pipetting_mode = action["args"].get("mode", "incremental")
    displacement = self.volume_to_displacement(volume, mode=pipetting_mode)

    # TODO: Add pipetting speed control.
    # TODO: Volume sign INVERTED here, be smarter about the direction.
    move_options = {
        "e": -displacement,
        "absolute": False,
        "absolute_e": False,
        "f": self.pipette.feedrate,
    }
    command = self.gcode.G1(**move_options)

    # Add the GCODE command to the list, then update the tool's state.
    self.builder.extend_commands(commands=command, action=action)
    self.update_pipette_state(volume, displacement)

    return displacement
Args
volume:float- Pipetting volume in microliters.
action:dict- Action object, optionally containing a pipetting "mode" string in its "args".
def fast_safe_height(self, current_action: dict, previous_action: dict = None)-
Expand source code
def fast_safe_height(self, current_action:dict, previous_action:dict=None):
    """Move to a safe height, optimising clearance within one platform item.

    When both the previous and the current action pass
    ``fast_safe_height_action_check`` and point to the same item, that item's
    name is passed to the builder's ``macroMoveSafeHeight``. The objective is
    to speed up repeated pipetting within the same platform item.

    Args:
        current_action (dict): The action being processed.
        previous_action (dict, optional): Action to compare against. Defaults
            to the builder's ``previous_action``.

    Returns:
        Whatever the builder's ``macroMoveSafeHeight`` returns.
    """
    reference_action = self.builder.previous_action if previous_action is None else previous_action

    prev_ok, prev_item = self.fast_safe_height_action_check(reference_action)
    curr_ok, curr_item = self.fast_safe_height_action_check(current_action)

    # Without a shared item there is nothing to optimise by.
    if not (prev_ok and curr_ok and (prev_item == curr_item)):
        return self.builder.macroMoveSafeHeight(item_names=[])

    logging.info(f"Optmizing clearance move to safe height within item: '{curr_item}'")
    return self.builder.macroMoveSafeHeight(item_names=[curr_item])
def fast_safe_height_action_check(self, action: dict)-
Expand source code
def fast_safe_height_action_check(self, action:dict):
    """Check that an action is compatible with a fast safe height move.

    An action qualifies when its command is one of the pour, load or mix
    commands and its "args" point to a platform item.

    Args:
        action (dict): Protocol action. May be None or empty (e.g. when there
            is no previous action), in which case it does not qualify.

    Returns:
        tuple: ``(eligible, item_name)``; ``item_name`` is None when the
        action has no item.
    """
    # A missing action, or one without "args", can never qualify. The "args"
    # lookup is guarded too, since not every action is required to carry one.
    if not action:
        return False, None

    # List of eligible commands.
    fast_sh_eligible_cmds = (self.pour_liquid_cmd, self.load_liquid_cmd, self.mix_cmd)

    # Try getting the item name from the action (platform-based actions have one).
    item_name = (action.get("args") or {}).get("item", None)

    # The command must be reasonable to optimize and must point to an item.
    eligible = action.get("cmd", None) in fast_sh_eligible_cmds and item_name is not None

    return eligible, item_name
def getNextTip(self,
platform_item: dict,
selector: dict,
pop_from_item=True,
workspace_name: str = None,
pop_from_db=False)-
Expand source code
def getNextTip(self, platform_item:dict, selector: dict, pop_from_item=True, workspace_name:str=None, pop_from_db=False):
    """Fetch the next tip through the database tools' ``getNextContent``.

    With the default arguments the tip is removed from the local
    "platform_item" object, and the object in the database is not affected.

    Returns:
        Whatever ``getNextContent`` returns for these arguments.
    """
    lookup = {
        "workspace_name": workspace_name,
        "platform_item": platform_item,
        "selector": selector,
        "pop_from_item": pop_from_item,
        "pop_from_db": pop_from_db,
    }
    db_tools = self.controller.database_tools
    return db_tools.getNextContent(**lookup)
def getTipAndCoords(self, action, action_index)-
Expand source code
def getTipAndCoords(self, action, action_index):
    """Get a tip object and its coordinates.

    Two kinds of actions are supported:

    - "item" type args: the next tip is looked up in the tip rack named by the
      action, through the builder's ``getContentCoords``. The tip is removed
      from the local tracker, but not from the DB.
          'args': {'item': '200ul_tip_rack_MULTITOOL 1', 'tool': 'P200'},
          'cmd': 'PICK_TIP'
    - "position" type args: the coordinates are taken from the action, which
      must then also provide the tip's data explicitly.
          'args': {'position': {"x": 20, "y": 200, "z": 30},
                   'tool': 'P200',
                   'tip': {'maxVolume': 160, 'length': 50.0, 'volume': 0}},
          'cmd': 'PICK_TIP'

    The tip is annotated with "fromItem", "fromAction" and "inTool" before it
    is returned. This function does not apply tool offsets.

    Args:
        action (dict): Protocol action definition.
        action_index (int): Action index.

    Raises:
        ProtocolError: If a tip provided by a "position" type action lacks
            "length", "maxVolume" or "volume".

    Returns:
        tuple: The tip object, followed by its x, y and z coordinates.
    """
    args = action["args"]

    # Use the provided coordinates if found.
    if "position" in args:
        coords = args["position"]
        x, y, z = coords["x"], coords["y"], coords["z"]

        # Get the tip's info.
        next_tip = args["tip"]

        # Ensure required information about the loaded tip. This is an explicit
        # check because "assert" statements are skipped when running with "-O".
        # TODO: The "maxVolume" property is in the container definition.
        #       Redesign this code using jsondb or nodb to regain compatibility
        #       with this "platformless" mode.
        required = ("length", "maxVolume", "volume")
        if any(next_tip.get(key, None) is None for key in required):
            msg = f"The tip from an action is missing required properties: {pformat(action)}"
            raise ProtocolError(msg)

        # Save contextual information to the tip (there is no source item).
        next_tip["fromItem"] = None

    # Else lookup the coordinates by tip-rack platform.
    else:
        next_tip, x, y, z = self.controller.builder.getContentCoords(
            # Get any next tip from the item in the action.
            action, allow_next=True,
            # Remove the item from the local tracker, but not from the DB.
            pop_from_item=True, pop_from_db=False
        )
        # TODO: Deprecate "getNextTip".
        # TODO: Check and tune Z positioning according to tip seal pressure.
        #       This will need calibration.

        # Get and set the length of the tip if missing, using its container.
        if next_tip.get("length", None) is None:
            container_name = next_tip["container"]
            container = self.controller.database_tools.getContainerByName(container_name)
            next_tip["length"] = container["length"]

        # Save contextual information to the tip.
        next_tip["fromItem"] = args["item"]

    # Save contextual information to the tip.
    next_tip["fromAction"] = action_index
    next_tip["inTool"] = self.builder.current_tool

    # Return the tip and its spatial location.
    return next_tip, x, y, z
Uses information from the platform, the item, the content, and the container to get the XYZ of a tip.
The Z coordinate is calculated by "getContentZ", and corresponds to the fitting depth of the tip.
This function does not apply tool offsets.
Args
action:dict- Protocol action definition.
action_index:int- Action index.
Returns
tuple- Tuple with the tip object, and the xyz coordinates.
Example "tip": 'index': 96, 'maxVolume': 160, 'name': 'tip96', 'position': {'col': 12, 'row': 8}, 'tags': [], 'type': 'tip', 'volume': 0
The "tip" above will be fetched from the database when the action's "args" are of "item" type: 'args': {'item': '200ul_tip_rack_MULTITOOL 1', 'tool': 'P200'}, 'cmd': 'PICK_TIP'
The action can also provide the tip's coordinates directly, but must provide "tip" data explicitly: 'args': {'position': {"x": 20, "y": 200, "z": 30}, 'tool': 'P200', 'tip': tip_definition}, # Note the tip data passed here (described below). 'cmd': 'PICK_TIP'
Example "tip_definition" for "position" type args: 'maxVolume': 160, 'length': 50.0, 'volume': 0
def get_pipette_tools_from_db(self)-
Expand source code
def get_pipette_tools_from_db(self):
    """Collect the pipette-type tool definitions available in the database.

    Returns:
        dict: Tool definitions keyed by their "name", restricted to the
        entries whose "type" equals TOOL_TYPE.
    """
    matching = {}
    for tool_data in self.controller.database_tools.tools:
        # Only tools of the pipette type are of interest here.
        if tool_data["type"] == TOOL_TYPE:
            matching[tool_data["name"]] = tool_data

    logging.info(f"Found these pipettes in the DB: {list(matching)}")
    return matching
def guess_pipette(self,
volume: float,
tip_container: dict = None,
repeats: int = 1,
prioritize_accuracy=True)-
Expand source code
def guess_pipette(self, volume:float, tip_container:dict=None, repeats:int=1, prioritize_accuracy=True):
    """Guess which tool is better for a given volume and number of serial repeats.

    By default, accuracy will be prioritized (i.e. less dispenses per load),
    unless 'prioritize_accuracy' is set to False.

    Args:
        volume (float): Volume of a single dispense.
        tip_container (dict, optional): Passed to each pipette's "get_max_volume".
        repeats (int, optional): Minimum number of dispenses that a single load
            must be able to cover. Defaults to 1.
        prioritize_accuracy (bool, optional): When True, pick the tool allowing
            the fewest dispenses per load; otherwise pick the one allowing the most.

    Returns:
        The "name" attribute of the chosen pipette. On ties, the first matching
        pipette in the iteration order of 'self.pipettes' is returned.

    Raises:
        ProtocolError: If no pipette can cover 'repeats' dispenses of 'volume'.
    """
    # Get the maximum volume for the given tip.
    tool_limits = [(p.get_max_volume(tip_container), p.name) for p in self.pipettes.values()]

    # Calculate how many repeats can be done with each pipette.
    # Only keep the ones with enough volume.
    candidates = [(limit // volume, tool) for limit, tool in tool_limits
                  if limit // volume >= repeats]

    if not candidates:
        raise ProtocolError(
            f"No pipette can hold {repeats} dispense(s) of {volume} uL in a single load."
        )

    # NOTE: min/max are applied to the list itself (not unpacked with '*'), so a
    # single candidate works too. Both return the first extreme element on ties.
    if prioritize_accuracy is True:
        # Fewer repeats per load: prioritizes precision over speed.
        _, chosen_tool = min(candidates, key=lambda candidate: candidate[0])
    else:
        # More repeats per load: prioritizes speed over precision.
        _, chosen_tool = max(candidates, key=lambda candidate: candidate[0])

    return chosen_tool
def load_pipette(self, pipette_data)-
Expand source code
def load_pipette(self, pipette_data):
    """Build a PipetteTool from one parameter set and keep it in this handler.

    This is the tool loader function registered with GcodeBuilder.

    Args:
        pipette_data (dict): Tool parameters; must contain a "name" key.

    Returns:
        PipetteTool: The newly created tool.

    Raises:
        DataError: If a pipette with the same name is already in 'self.pipettes'.
        CommanderError: If creating or storing the tool fails for any reason.
    """
    name = pipette_data["name"]
    logging.info(f"Loading pipette '{name}'.")

    # Refuse duplicates in the local registry.
    if name in self.pipettes:
        duplicate_msg = f"Pipette '{name}' is already in the pipette handler's list."
        logging.error(duplicate_msg)
        raise DataError(duplicate_msg)

    try:
        # Per the original note, PipetteTool instances register themselves
        # as a tool in the builder.
        new_tool = PipetteTool(pipette_data, self.builder)
        self.pipettes[name] = new_tool
    except Exception as error:
        failure_msg = f"Failed to initialize tools: {error}"
        logging.error(failure_msg)
        raise CommanderError(failure_msg) from error

    return new_tool
def load_pipettes(self, pipettes_data)-
Expand source code
def load_pipettes(self, pipettes_data):
    """Load every pipette described in 'pipettes_data' through 'load_pipette'.

    Args:
        pipettes_data (dict): Pipette parameter sets, keyed by pipette name.
    """
    # NOTE: falling back to default pipette definitions is disabled for now.
    for name in pipettes_data:
        logging.info(f"Initializing pipette '{name}'.")
        self.load_pipette(pipettes_data[name])
def macroDiscardTip(self, action, i, operation_time=20.0)-
Expand source code
def macroDiscardTip(self, action, i, operation_time=20.0):
    """Move over the trash box or eject-post and discard the loaded tip.

    Action moves:
        - Move to clearance level on Z (avoiding platforms) and Y (avoiding parked tools).
        - Move to the ejection site and eject the tip.
        - Home the tool.

    Args:
        action (dict): The action definition. Its "item" argument is only read
            when the current pipette reports "can_eject"; otherwise the
            coordinates come from the pipette's "eject_post_init_coords".
        i (int): The index of the action. Only used in messages.
        operation_time (float, optional): Timeout handed to "update_exec_opts".
            Defaults to 20.0.

    Raises:
        ProtocolError: If there is no tip to discard.

    Returns:
        str: Generic message.

    Sample action:
        {
            "cmd": "DISCARD_TIP",
            "args": {
                "item": "descarte 1"
            }
        }
    """
    # TODO: Add alternate method of ejecting tips against an ejector pad (custom pipette).
    #   - Add "can eject" property and decide based on that.
    #   - Add "ejector pad" properties to tools.

    # TODO: discuss whether it is better to raise this error or to do nothing.
    if not self.pipette.tip_loaded:
        raise ProtocolError(f"Cannot discard tip if one is not already placed! Action index: {i}")

    # Move to clearance level on Z.
    self.builder.macroMoveSafeHeight()

    # Work out where the ejection starts, and how the Z approach must be issued.
    if self.pipette.can_eject:
        item_name = action["args"]["item"]
        logging.info("Ejecting tip using integrated ejector.")
        x, y, z = self.builder.getItemXYZ(item_name)
        z_move_options = {}
    else:
        logging.info("Ejecting tip using post.")
        x, y, z = self.pipette.eject_post_init_coords
        # NOTE: "add_offset=False" is needed in this case
        # (the eject-post passes an absolute Z but the ejector does not).
        z_move_options = {"add_offset": False}

    # Approach the ejection site: first on XY, then on Z.
    self.builder.macroGoToXYZ(x=x, y=y, add_comment="Initial XY approach to eject zone.")
    self.builder.macroGoToXYZ(z=z, add_comment="Initial Z approach to eject zone.", **z_move_options)

    # Generate GCODE for the actual ejection sequence.
    self.builder.extend_commands(self.pipette.eject_tip())

    # Raise to safety.
    self.builder.macroMoveSafeHeight()

    # Home the tool's axis instead of zeroing it.
    # TODO: reimplement non-homing zeroing once the underlying Klipper issue is resolved:
    #       Sending "M82" and moving the extruder to 0 does not set the position to 0.
    #       Check if this is due to absolute extruder positioning not taking effect.
    self.macroPipetteHome(which_tool=self.builder.current_tool)

    # Extend the operation time.
    self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

    return f"Processed tip discard with index {i}"
Action moves: - Move to clearance level on Z (avoiding platforms) and Y (avoiding parked tools). - Move to the ejection site and eject the tip. - Home the tool.
Args
action:dict- The action definition. Its "item" argument is read only when the tool ejects with its integrated ejector; otherwise the coordinates are loaded from the tool/pipette definition.
i:int- The index of the action. Only used in the error and return messages.
Raises
ProtocolError- Raised when there is no tip to discard.
Returns
str- Generic message.
Sample action: { "cmd": "DISCARD_TIP", "args": { "item": "descarte 1" } }
def macroGoToAndMix(self, action, i, operation_time=20.0)-
Expand source code
def macroGoToAndMix(self, action, i, operation_time=20.0):
    """Interpret a MIX action and generate its GCODE.

    Homogenizes a solution. Possibly useful for rinsing tips before a transfer.
    Note that the tool must have been set by a previous tip pickup action.

    Sample MIX action:
        {
            'args': {
                'count': 3,
                'item': '5x16_1.5_rack 1',
                'of': 'content',  # Can also be 'of': "tip"
                'percentage': 50,
                'coords': {"x": 123, "y": 123, "z": 123},  # Will override 'selector'
                'selector': {'by': 'name', 'value': 'tube1'}},
            'cmd': 'MIX'
        }

    Sample "tip":
        {
            "index": 1,
            "maxVolume": 160,
            "name": "tip1",
            "position": {
                "col": 1,  # Or: "x": 10?
                "row": 1   # Or: "y": 30?
            },
            "tags": [],
            "type": "tip",
            "volume": 0
        }

    Args:
        action (dict): The MIX action definition.
        i (int): The index of the action.
        operation_time (float, optional): Forwarded to "macroMix". Defaults to 20.0.

    Raises:
        ProtocolError: If the current pipette has no tip loaded.

    Returns:
        str: Generic message.
    """
    logging.info("Parsing MIX action.")

    # TODO: discuss whether it is better to handle the tip automatically
    #       or to raise this error.
    has_tip = self.pipette.tip_loaded
    if not has_tip:
        raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

    # Clearance level on Z first, then zero the pipette axis.
    self.fast_safe_height(action)
    self.macroPipetteZero(action, i)

    # Go over the target content and downward into the tube, then mix.
    self.builder.macroGoTo(action, i)
    self.macroMix(action, operation_time=operation_time)

    # Back up to clearance level.
    # NOTE(review): the action is passed twice here, unlike in the first call;
    # confirm against the signature of "fast_safe_height".
    self.fast_safe_height(action, action)

    logging.info("Done parsing MIX action.")
    return f"Processed MIX action with index {i}."
Sample MIX action: { 'args': { 'count': 3, 'item': '5x16_1.5_rack 1', 'of': 'content', # Can also be 'of': "tip" 'percentage': 50, 'coords': {"x": 123, "y": 123, "z": 123}, # Will override 'selector' 'selector': {'by': 'name', 'value': 'tube1'}}, 'cmd': 'MIX' }
Sample "tip": { "index": 1, "maxVolume": 160, "name": "tip1", "position": { "col": 1, # Or: "x": 10? "row": 1 # Or: "y": 30? }, "tags": [], "type": "tip", "volume": 0 }
Note that the tool must have been set by a previous tip pickup action.
def macroGoToAndPipette(self, action, i, operation_time=20.0)-
Expand source code
def macroGoToAndPipette(self, action, i, operation_time=20.0):
    """Move to a tube and load liquid.

    LOAD_LIQUID action interpreter: moves to the XYZ position and then
    actuates the pipette through "macroPipette".

    Sample action:
        {
            "cmd": "LOAD_LIQUID",
            "args": {
                "item": "5x16_1.5_rack 1",
                "selector": {"by": "name", "value": "tube1"},
                "volume": 100,  # in microliters
                "tool": "P200"
            }
        }

    Likely platformless action:
        {
            "cmd": "LOAD_LIQUID",
            "args": {
                "content": {"volume": 200.0},
                "volume": 100,  # in microliters
                "tool": "P200"
            }
        }

    Primary move sequence:
        1. move to clearance level on Z
        2. move to next tube location on XY
        3. move to Z at loading height
        4. load liquid
        5. move to clearance level on Z

    Args:
        action (dict): The action definition.
        i (int): The index of the action.
        operation_time (float, optional): Timeout handed to "update_exec_opts".
            Defaults to 20.0.

    Raises:
        ProtocolError: If the current pipette has no tip loaded.

    Returns:
        str: Generic message.
    """
    # TODO: discuss whether it is better to handle the tip automatically
    #       or to raise this error.
    if not self.pipette.tip_loaded:
        raise ProtocolError(f"Cannot load or dispense without a tip. Action index: {i}")

    # Clearance level on Z.
    self.fast_safe_height(action)
    # Over the target content and downward into the tube.
    self.builder.macroGoTo(action, i)
    # Actuate the pipette tool.
    self.macroPipette(action, i)
    # Back to clearance level on Z.
    # NOTE(review): the action is passed twice here, unlike in the first call;
    # confirm against the signature of "fast_safe_height".
    self.fast_safe_height(action, action)

    # Extend the operation time.
    self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

    return f"Processed macroGoToAndPipette action with index {i}"
Sample action: { "cmd": "LOAD_LIQUID", "args": { "item": "5x16_1.5_rack 1", "selector": {"by": "name", "value": "tube1"}, "volume": 100, # in microliters "tool": "P200" } }
Likely platformless action: { "cmd": "LOAD_LIQUID", "args": { "content": {"volume": 200.0}, "volume": 100, # in microliters "tool": "P200" } }
Primary move sequence: 1. move to clearance level on Z 2. move to next tube location on XY 3. move to Z at loading height 4. load liquid 5. move to clearance level on Z
def macroGoToAndPour(self, action, i, operation_time=20.0)-
Expand source code
def macroGoToAndPour(self, action, i, operation_time=20.0):
    """Move to a tube and dispense liquid.

    Handles "DROP_LIQUID" actions: the action's volume is overwritten in place
    with its negative magnitude, and the work is delegated to "macroGoToAndPipette".

    Args:
        action (dict): The action definition; "args"/"volume" is modified in place.
        i (int): The index of the action.
        operation_time (float, optional): Forwarded to "macroGoToAndPipette".
            Defaults to 20.0.

    Returns:
        The value returned by "macroGoToAndPipette".
    """
    arguments = action["args"]
    # Force a negative volume, whatever the sign of the incoming value.
    magnitude = abs(float(arguments["volume"]))
    arguments["volume"] = -magnitude
    return self.macroGoToAndPipette(action, i, operation_time=operation_time)
def macroMix(self, action: dict, operation_time=0.0)-
Expand source code
def macroMix(self, action: dict, operation_time=0.0):
    """Execute pipetting moves for a mix action.

    Each mix cycle draws a volume and then dispenses a slightly larger one, so
    that the last dispense reaches the full mixing volume computed by
    "calculate_mix_vol".

    Args:
        action (dict): The MIX action definition; "args"/"count" sets the number
            of mix cycles. A count of zero or less produces no pipetting moves.
        operation_time (float, optional): Base time allotment. The timeout sent
            to "update_exec_opts" is this base plus one more allotment per mix
            cycle. Defaults to 0.0.
    """
    # Comments for GCODE.
    comments = []
    # Calculate the mixing volume.
    mix_vol = self.calculate_mix_vol(action, comments)
    # Extend commands with comments.
    self.builder.extend_commands(comments)

    # Number of load/drop cycles.
    mix_count = int(action["args"]["count"])

    # Calculate volume and increments.
    factor = 0.2  # Fraction of the volume that will be used for increasing mixing.
    vol = mix_vol * (1 - factor)  # Initial volume for mixing.
    # Guard the division: with no cycles there is nothing to increment.
    vol_increment = mix_vol * (factor / mix_count) if mix_count > 0 else 0.0

    # Accumulate the timeout linearly, in a separate variable. Adding the
    # running total to itself would double it on every cycle.
    # TODO: Link this to the flow speed and total volume.
    total_time = operation_time

    # Do the mixing.
    for cycle in range(mix_count):
        total_time += operation_time

        logging.debug(f"Generating mix move {cycle} of {mix_count}.")

        # Load.
        mix_action_up = {"args": {"volume": vol}}  # in microliters
        self.macroPipette(mix_action_up, cycle)
        self.builder.extend_commands(mix_action_up.get("GCODE", []))

        # NOTE: The dropped volume will be greater than the loaded, to
        #       avoid having unmixed solution remainder in the tip. In the final
        #       iteration, the dropped volume will be the one in the action.
        vol += vol_increment

        # Drop.
        mix_action_dw = {"args": {"volume": -vol}}  # in microliters
        self.macroPipette(mix_action_dw, cycle)
        self.builder.extend_commands(mix_action_dw.get("GCODE", []))

    # Extend the operation time.
    self.controller.machine.update_exec_opts(
        action=action,
        timeout=total_time,
        add_timeout=True
    )
Args
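action:dict- The MIX action definition; its "args" must include "count" (the number of mix cycles).
operation_time:float, optional- Base time used to extend the action's execution timeout. Defaults to 0.0.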
def macroPickNextTip(self, action, i, operation_time=20.0)-
Expand source code
def macroPickNextTip(self, action, i, operation_time=20.0):
    """Move to clearance height and fit a tip.

    Sample platform-based action:
        {
            "cmd": "PICK_TIP",
            "args": {
                "item": "200ul_tip_rack_MULTITOOL 1",
                "tool": "P200"
            }
        }

    Sample coordinate-based (platformless) action:
        {
            "cmd": "PICK_TIP",
            "args": {
                "position": {"x": 10.5,"y": 20.5,"z": 0.5},
                "tool": "P200",
                "tip": {"length": 50.0, "maxVolume": 200, "volume": 0}
            }
        }

    Args:
        action (dict): The action definition.
        i (int): The index of the action.
        operation_time (float, optional): Timeout handed to "update_exec_opts".
            Defaults to 20.0.

    Raises:
        ProtocolError: If a tip is already loaded.

    Returns:
        dict: The same 'action' object that was passed in.
    """
    # TODO: discuss whether it is better to discard the loaded tip automatically
    #       or to raise this error.
    if self.pipette.tip_loaded:
        raise ProtocolError(f"Cannot pick tip if one is already placed! Action index: {i}")

    # Move to clearance level on Z.
    self.builder.macroMoveSafeHeight()

    # Get the tip and its coordinates (supports "position" key specification),
    # and generate the GCODE to pick it up.
    tip_and_coords = self.getTipAndCoords(action, i)
    next_tip, tip_x, tip_y, tip_z = tip_and_coords
    self.pickNextTip(next_tip, tip_x, tip_y, tip_z)

    # Raise to safety.
    self.builder.macroMoveSafeHeight()

    # Extend the operation time.
    self.controller.machine.update_exec_opts(action, timeout=operation_time, add_timeout=True)

    return action
Will throw an error if a tip is already loaded.
Sample platform-based action: { "cmd": "PICK_TIP", "args": { "item": "200ul_tip_rack_MULTITOOL 1", "tool": "P200" } }
Sample coordinate-based (platformless) action: { "cmd": "PICK_TIP", "args": { "position": {"x": 10.5,"y": 20.5,"z": 0.5}, "tool": "P200", "tip": {"length": 50.0, "maxVolume": 200, "volume": 0} } }
def macroPipette(self,
action: dict,
i: int,
backlash_correction: float = 0.0,
tip_priming_correction: bool = False,
back_pour_correction: float = 0.0,
under_draw_correction: float = 0.0,
extra_draw_volume: float = 0.0,
capillary_correction: bool = False)-
Expand source code
def macroPipette( self, action:dict, i:int, backlash_correction:float=0.0, tip_priming_correction:bool=False, back_pour_correction:bool=0.0, under_draw_correction:float=0.0, extra_draw_volume:float=0.0, capillary_correction:bool=False): """ Pipetting works incrementally for now (i.e. relative volumes, not absolute volumes). Negative values mean 'dispense' or 'lower' the axis (see functions in driverSaxis.py). """ # Get the target volume. volume = action["args"]["volume"] # Flag indicating if this is a draw or dispense move. is_drawing = self.builder.sign(volume) > 0 # Comment GCODE message = f"Calculating pipetting move for {volume} uL volume for action {i}" message += f", using tool '{self.builder.current_tool}'." logging.info(message) # Check if the tip is new/empty. first_draw = self.pipette.state["tip_vol"] is None logging.debug(f"Computed move properties: is_drawing={is_drawing} first_draw={first_draw} (tip_vol={self.pipette.state['tip_vol']})") self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action) # Read options from action arguments. # Prioritize the arguments, then the config, and defaulting to arguments if not there. backlash_correction = backlash_correction or float(self.pipette.get("backlash_correction", backlash_correction)) tip_priming_correction = tip_priming_correction or bool(self.pipette.get("tip_priming_correction", tip_priming_correction)) back_pour_correction = back_pour_correction or float(self.pipette.get("back_pour_correction", back_pour_correction)) under_draw_correction = under_draw_correction or float(self.pipette.get("under_draw_correction", under_draw_correction)) extra_draw_volume = extra_draw_volume or float(self.pipette.get("extra_draw_volume", extra_draw_volume)) capillary_correction = capillary_correction or bool(self.pipette.get("capillary_correction", capillary_correction)) # Log the corrections. 
logging.debug(f"Corrections: backlash_correction={backlash_correction}, tip_priming_correction={tip_priming_correction}, back_pour_correction={back_pour_correction}, under_draw_correction={under_draw_correction}, capillary_correction={capillary_correction}") # Compensate mechanical backlash. if backlash_correction: # Backlash compensation. self.compensate_backlash(volume, backlash_correction, action, i) # Compensate for tip-wetting issues. if is_drawing and first_draw: if tip_priming_correction: # Compensation for under-drawing on first load, by pre-wetting. self.compensate_first_draw(volume, action, i) if extra_draw_volume: # Compensation for under-drawing on first load, by over-drawing a bit. self.compensate_extra_draw(extra_draw_volume, action, i) # Compensation for capillary effects for low final tip volume during dispensing. # Apply only for regular dispensing moves. if capillary_correction and self.builder.sign(volume) == -1 and not first_draw: self.compensate_low_volume(volume, action, i) # Comment GCODE message = f"Pipetting {volume} uL for action {i}." logging.info(message) self.builder.extend_commands(commands=[self.gcode.comment(message)], action=action) # Compute axis displacement from the target pipetting volume. # Default pipetting mode should be incremental (i.e. relative) volume. self.displace_volume(volume, action) # Under-draw correction if is_drawing and under_draw_correction: self.compensate_underdraw(action, i) # Back-pour correction if is_drawing and back_pour_correction: self.compensate_backpour(action, i) return f"Processed pipetting action with index {i}."Pipetting works incrementally for now (i.e. relative volumes, not absolute volumes). Negative values mean 'dispense' or 'lower' the axis (see functions in driverSaxis.py).
def macroPipetteHome(self, which_tool='all')-
Expand source code
def macroPipetteHome(self, which_tool="all"):
    """Generate and queue the homing commands for one pipette, or for all of them.

    Args:
        which_tool (str, optional): Either the name of a tool in 'self.pipettes',
            or "all". Defaults to "all".

    Returns:
        list: The homing commands gathered from each tool's "home" method.
    """
    # Resolve the names of the tools to home.
    tool_names = list(self.pipettes) if which_tool == "all" else [which_tool]

    # Gather the homing commands of each tool, in order.
    homing_commands = []
    for name in tool_names:
        homing_commands.extend(self.pipettes[name].home())

    # Announce, then queue the commands in the builder.
    message = f"Homing pipettes {tool_names}"
    logging.info(message)
    announcement = [self.gcode.comment(message)]
    self.builder.extend_commands(announcement)
    self.builder.extend_commands(homing_commands)

    return homing_commands
Args
which_tool:str, optional- Either a valid name for a tool, or "all". Defaults to "all".
def macroPipetteZero(self, action=None, i=None)-
Expand source code
def macroPipetteZero(self, action=None, i=None):
    """Move the pipette to the zero position (without re-homing).

    Note: this now requires that the pipette homes upwards to the axis origin (i.e. at 0)
    and the retraction distance is considered the zero position for pipetting.

    Args:
        action (dict, optional): The action definition, forwarded to "displace_volume".
        i (int, optional): The index of the action. Only used in messages.

    Raises:
        ProtocolError: If, after the displacement, the pipette's "vol" or "s"
            state is further than 1e-10 from zero.
    """
    # Reverse net displacement
    volume = -self.pipette.state["vol"]

    # Early return if the volume is zero.
    if not volume:
        message = f"Pipette is already zeroed at volume={volume} (action index {i})."
        logging.debug(message)
        self.builder.extend_commands([self.gcode.comment(message)])
        return

    # Logs.
    message = f"Zeroing pipette with volume={volume} (action index {i})."
    logging.debug(message)
    self.builder.extend_commands([self.gcode.comment(message)])

    # Execute the displacement.
    self.displace_volume(volume, action)

    # Check for final volume near zero.
    # The state is read through the "state" attribute, as in the rest of this
    # method, so that building the error message cannot fail on its own.
    final_state = self.pipette.state
    state_vol = abs(final_state["vol"])
    state_s = abs(final_state["s"])
    threshold = 1e-10
    if (state_vol > threshold) or (state_s > threshold):
        raise ProtocolError("Pipette was zeroed, but reference states did not match zero. " +
                            "Expected vol=0 and s=0 but got:" +
                            f" vol={final_state['vol']}" +
                            f" s={final_state['s']}")
Note: this now requires that the pipette homes upwards to the axis origin (i.e. at 0) and the retraction distance is considered the zero position for pipetting.
def pickNextTip(self, next_tip, tip_x, tip_y, tip_z)-
Expand source code
def pickNextTip(self, next_tip, tip_x, tip_y, tip_z):
    """Generate GCODE to place a tip.

    Args:
        next_tip (dict): Data for the target tip.
        tip_x (float): X coordinate of the tip, without tool offsets.
        tip_y (float): Y coordinate of the tip, without tool offsets.
        tip_z (float): Z coordinate of the tip (active height, i.e. absolute fitting height), without tool offsets.

    Raises:
        ProtocolError: If a tip is already loaded.

    Returns:
        list: The generated commands, which are also appended to the builder.

    TODO: Test adding a "vibration" if possible (i have not found this necessary).
    """
    # TODO: discuss whether it is better to discard the loaded tip automatically
    #       or to raise this error.
    if self.pipette.tip_loaded:
        raise ProtocolError("Cannot pick tip if one is already placed!")

    gcode = self.gcode
    commands = [gcode.comment("Start tip pickup moves.")]

    # Add XYZ tool offsets to the tip's active center.
    # Nothing else is added, because no tip is loaded.
    x, y, z = self.builder.addToolOffsets(x=tip_x, y=tip_y, z=tip_z)

    # Move over the tip on XY.
    commands += gcode.G1(x=x, y=y, absolute=True)

    # Initial and final fit coordinates.
    z_pre_fit, z_final = self.calcTipFitZ(content_top_z=z, next_tip=next_tip)

    # Move over the tip on Z, partially inserting the tip holder
    # if using a non-terminal tip stage.
    commands += gcode.G1(z=z_pre_fit, absolute=True)

    if self.pipette.can_probe:
        # Probe for the tip: total distance of the probing move.
        z_scan_dist = self.calcTipProbeDistZ(next_tip)
        # NOTE: This is a relative move.
        logging.info(f"Probing for tip with: z_scan_dist={z_scan_dist}")
        commands += gcode.gcodeProbeDown(z_scan=-z_scan_dist)
    else:
        # Fit the tip blindly, without probing.
        logging.info(f"Picking tip with: z_final={z_final}")
        # TODO: add a feedrate config parameter for tip fitting.
        commands += gcode.G1(z=z_final, f=10, absolute=True)

    # TODO: Seal the tip by pressing a little bit very slowly two times.

    # Hand the commands over to the builder.
    self.builder.extend_commands(commands)

    # Flag the loaded tip, and reset its volume tracking
    # (None rather than zero, which marks the tip as new).
    self.pipette.tip_loaded = next_tip
    self.pipette.state["tip_vol"] = None

    return commands
Args
next_tip:dict- Data for the target tip.
tip_x:float- X coordinate of the tip, without tool offsets.
tip_y:float- Y coordinate of the tip, without tool offsets.
tip_z:float- Z coordinate of the tip (active height, i.e. absolute fitting height), without tool offsets.
TODO: Test adding a "vibration" if possible (i have not found this necessary).
def update_pipette_state(self, volume, displacement)-
Expand source code
def update_pipette_state(self, volume, displacement):
    """Record a pipetting move in the state of the current pipette.

    Args:
        volume: Volume of the move; added to the pipette's "vol" and "tip_vol"
            states, and to the "volume" of the loaded tip.
        displacement: Axis displacement of the move; added to the "s" state.
    """
    tool = self.pipette
    state = tool.state

    # A new tip has no volume yet (None): start counting from zero.
    if state["tip_vol"] is None:
        state["tip_vol"] = 0

    # Accumulate the move.
    state["s"] += displacement
    for key in ("vol", "tip_vol"):
        state[key] += volume

    # Keep the tip's own data in sync.
    tip = tool.tip_loaded
    tip["volume"] = volume + tip.get("volume", 0.0)

    # Remember the direction of the current move.
    state["lastdir"] = self.builder.sign(volume)

    # Print state
    logging.debug(f"Current state of tool {self.builder.current_tool}: " + pformat(tool.state))
def volume_to_displacement(self, volume: float, mode='incremental')-
Expand source code
def volume_to_displacement(self, volume: float, mode="incremental"):
    """Convert volume (uL) to shaft displacement (mm).

    Args:
        volume (float): Volume to convert.
        mode (str, optional): Only "incremental" is implemented. Defaults to "incremental".

    Raises:
        ProtocolError: If 'mode' is anything other than "incremental".

    Returns:
        The volume divided by the pipette's "scaler", and then multiplied by
        its "scaler_adjust".
    """
    # Get scaler properties.
    tool = self.pipette
    scaler = tool.scaler
    scaler_adjust = tool.scaler_adjust

    # TODO: implement "absolute" pipetting volumes.
    if mode != "incremental":
        raise ProtocolError(f"Invalid mode: '{mode}'")

    # Units are in microliters, they must be converted;
    # the linear adjustment is applied afterwards.
    displacement = (volume / scaler) * scaler_adjust

    # Comment GCODE
    message = (f"Converted {volume} uL to {displacement} mm"
               f" with scaler {scaler} and adjust factor {scaler_adjust}")
    logging.debug(message)
    self.builder.extend_commands(commands=[self.gcode.comment(message)])

    return displacement
Inherited members