Module pipettin-piper.piper.plugins.pcr_template

Functions

def load_plugin(controller: "'Controller'", **kwargs)
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Plugin entry point: build the plugin object and hand it back to the caller.

    Every plugin is expected to expose a function named 'load_plugin', which
    instantiates the plugin's class and returns it to the main Commander class.

    Args:
        controller: Controller object passed on to the plugin's constructor.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The new TemplateRouter instance.

    Raises:
        PluginError: If the plugin fails to load for any reason.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")
    try:
        # Return straight from the try block; any failure is re-raised below.
        return TemplateRouter(controller=controller)
    except Exception as e:
        msg = f"Failed to load with error: {e}"
        logging.error(msg)
        raise PluginError(msg) from e

Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and return it to the main Commander class. If they fail to load, they must raise a PluginError exception.

Classes

class TemplateRouter (controller=None, config: dict = None, verbose=False)
Expand source code
class TemplateRouter(Plugin):
    """Generate a protocol and workspace objects for PCR reactions.

    Receives data from a 'PCR template form', sent by the GUI through SocketIO,
    and automatically generates steps to prepare the reactions using the "mix" module.

    The preparation steps are then translated to protocol steps and workspace objects,
    which are returned through SocketIO, and are used by the UI.
    """

    config: dict
    verbose: bool = False
    sio: socketio.AsyncClient = None

    # Field names for platforms in the template data.
    tmplt_tip_plat = 'tipsPlatform'
    tmplt_trash_plat = 'trashPlatform'
    tmplt_pcr_tube = 'PCRtubePlatform'
    tmplt_rgt_tube = 'tube15Platform'

    def __init__(self, controller=None, config:dict=None, verbose=False):

        self.config: dict = {}
        if config is not None:
            self.config = config

        self.verbose = verbose

        # Save controller.
        self.controller: Controller = controller

        # Handlers for different kinds of protocol template envents.
        self.template_handlers = {
            'PCR Mix': self.process_pcr_template,
            'Well Plate Prep': self.process_well_plate_prep
        }

        if self.controller is not None:
            # Save configuration.
            self.config = self.controller.config
            self.verbose = self.controller.verbose

            # Save database tools.
            # NOTE: Removed attribute. It makes it hard to update DB tools from elsewhere.
            #self.database_tools=controller.database_tools

            # Register default socketio event handlers.
            self.register_sio_callbacks()
        else:
            print("Warning: no controller configured.")

        # Set status.
        self._status = True

    # SOCKET.IO CALLBACKS AND EVENTS SECTION ####
    protocol_template_event = 'protocol_template'
    def register_sio_callbacks(self):
        """Function to register socketio event callbacks, typically sent by the Pipettin GUI."""
        if self.controller.comms.sio:
            # Register template handler.
            @self.controller.comms.sio.on(self.protocol_template_event)
            async def protocol_template(data):
                return self.process_template(data)
        else:
            logging.error("Failed to register event handlers. The SIO comms object is undefined.")

    def process_template(self, data):
        """Receives a protocol template and current workspace from the GUI/backend and sends new ones.
        The incoming SocketIO event expects a response (with a timeout), which is the return value of this function.
        """
        # Logs.
        logging.info(f"Received template command for protocol '{data['protocol']['name']}' and workspace '{data['workspace']['name']}'.")
        logging.debug(f"Received template command with data:\n{pformat(data)}")

        try:
            # Look for a handler function.
            template_type = data['protocol']['template']
            template_handler = self.template_handlers.get(template_type, None)
            # Generate the output.
            if template_handler:
                result = template_handler(template_data=deepcopy(data))
                # Send our congrats!
                msg = f"Successfully processed '{template_type}' template for protocol '{data['protocol']['name']}' and workspace '{data['workspace']['name']}'."
                self.controller.send_alert(msg, alert_type="message")
                logging.info(msg)
                logging.debug(f"This is the result:\n{pformat(result)}")
            else:
                logging.warning(f"Unknown template type: '{template_type}'. Returning input data.")
                logging.debug(f"Input protocol data:\n{pformat(data['protocol'])}")
                result = deepcopy(data)

        except Exception as e:
            protocol_name = data.get('protocol', {}).get('name', None)
            msg = f"Error during template processing for protocol '{protocol_name}': {pformat(e)}\n" + traceback.format_exc()
            self.controller.send_alert(msg, alert_type="message")
            logging.error(msg + "\n" + pformat(data))
            # return the received data as is.
            return data

        logging.debug(f"Processed template command with result:\n{pformat(result)}")
        return result

    @staticmethod
    def scrub_mongo_stuff(data: dict):
        """Strip MongoDB bookkeeping keys from "data" **in place**, and return it.

        The keys passed to "scrub" are "__v", "createdAt" and "updatedAt".
        """
        # NOTE: The "_id" keys are retained on purpose. Scrubbing the _id from
        #       the template data causes a UI error.
        for mongo_key in ("__v", "createdAt", "updatedAt"):
            scrub(data, mongo_key)
        return data

    def process_well_plate_prep(self, template_data: dict):
        """Process a 'Well Plate Prep' protocol template event.

        Reads the CSV tables uploaded with the template ("layout", "strains" and "stimuli"),
        replaces the contents of the well plate, reagent rack and tip rack items of the
        workspace, and builds one tag-based transfer step per strain/stimulus tag.

        Args:
            template_data: Template event data with "protocol" and "workspace" entries.
                It is modified in place (workspace item contents and protocol steps).

        Returns:
            A dict with the new "protocol" and "workspace", plus the "well_data" and
            "well_data_dict" entries (added for tests).

        Raises:
            StopIteration: If an item named in the template definition is not found in
                the workspace, or if an item has no containers.

        TO-DO:
            1. Generate contents for a 384 well-plate platform.
                - Tag all as "targets"
                - Use "strain" and "stimuli" data to add more tags.
            2. Generate contents for a tube rack, for "strain" and "stimuli" data.
            3. Generate high-level steps to transfer "strains" and "stimuli" by tag.
        """

        # Debug dump of the input.
        # NOTE(review): unlike make_protocol, this dump is not gated by "self.verbose" - confirm intended.
        dump_json("/tmp/template_data.json", template_data)

        # Extract the data (CSV tables keyed by field id).
        file_data = self.parse_file_data(template_data)

        # Get the plate layout in long format, sorted by its third column (the cell value).
        well_data = self.parse_well_plate(file_data["layout"], sort_col_idx=2)
        well_data_dict = self.well_list_to_dict(well_data)
        # Strain and stimuli data.
        strain_data = self.parse_well_plate(file_data["strains"])
        strain_data_dict = self.well_list_to_dict(strain_data)
        stimuli_data = self.parse_well_plate(file_data["stimuli"])
        stimuli_data_dict = self.well_list_to_dict(stimuli_data)

        # Get item names
        template_definition = template_data['protocol']['templateDefinition']
        well_plate_name = template_definition["well_plate"]
        reagent_rack_name = template_definition["reagent_rack"]
        tip_rack_name = template_definition["tip_rack"]
        trash_name = template_definition["trash"]

        # Volumes transferred to each well, for stimuli and for strains respectively.
        stimuli_volume = template_definition["stimuli_volume"]
        culture_volume = template_definition["cell_volume"]

        # Get items (the first workspace item with a matching name).
        workspace = template_data["workspace"]
        well_plate_item = next(i for i in workspace["items"] if i["name"] == well_plate_name)
        reagent_rack_item = next(i for i in workspace["items"] if i["name"] == reagent_rack_name)
        tip_rack_item = next(i for i in workspace["items"] if i["name"] == tip_rack_name)
        # trash_item = next(i for i in workspace["items"] if i["name"] == trash_name)

        # Get first container as default
        well_plate_container = next(c for c in well_plate_item["platformData"]["containers"])
        reagent_rack_container = next(c for c in reagent_rack_item["platformData"]["containers"])
        tip_rack_container = next(c for c in tip_rack_item["platformData"]["containers"])

        # Generate contents.
        contents = []
        # Number of wells using each strain/stimulus tag.
        strains, stimuli = {}, {}
        # Unpack the row and column identifiers
        for col_id, row_id, _ in well_data:
            # Calculate the main index for the content
            main_index = datautils.get_index(
                col_id = col_id-1,  # Convert to zero-based index
                row_id = row_id,
                ncols=well_plate_item["platformData"]["wellsColumns"],
                nrows=well_plate_item["platformData"]["wellsRows"]
            )
            # Get the strain and stimulus tags. Wells missing from a table get the "default" tag.
            strain_tag = "strain-" + str(strain_data_dict.get(row_id, {}).get(col_id, "default"))
            stimulus_tag = "stimulus-" + str(stimuli_data_dict.get(row_id, {}).get(col_id, "default"))
            # Generate the content using the base_content function
            row_index = datautils.row_to_index(row_id)  # Convert the row letter(s) to an index (0-based)
            content = newt.contents.base_content(
                container=well_plate_container["container"],
                # Convert index to 1-based.
                index=main_index+1,
                position={"col": col_id, "row": row_index+1},
                name=f"Content_{row_id}{col_id}",  # Example name
                tags=[strain_tag, stimulus_tag],  # Add any tags if needed
                volume=0
            )
            # Append the content.
            contents.append(content)
            # Count strain tags, to insert reagent tubes later.
            strains.setdefault(strain_tag, 0)
            strains[strain_tag] += 1
            # Count stimuli tags, to insert reagent tubes later.
            stimuli.setdefault(stimulus_tag, 0)
            stimuli[stimulus_tag] += 1

        # Override the platform item's contents.
        well_plate_item["content"] = contents

        # Make a list of tags, counts, and the target volume for each one.
        # Each entry is: [tag name, number of wells, volume per well, label].
        tag_counts = [[t, c, culture_volume, "strain"] for t, c in strains.items()]
        tag_counts += [[t, c, stimuli_volume, "stimulus"] for t, c in stimuli.items()]

        # Add contents to tube rack.
        tubes = []
        tips = []
        for tag_name, tag_count, volume, label in tag_counts:
            # Create and append a tube for each source.
            tube_index = len(tubes) + 1
            col, row = datautils.get_colrow(tube_index,
                                            reagent_rack_item["platformData"]["wellsColumns"],
                                            reagent_rack_item["platformData"]["wellsRows"])
            tube = newt.contents.base_content(
                container=reagent_rack_container["container"],
                # "tube_index" is already 1-based.
                index=tube_index,
                position={"col": col, "row": row},
                name=f"Content_{row}{col}",  # Example name
                tags=[tag_name, label],  # Add any tags if needed
                volume=tag_count*volume # As much as needed
            )
            tubes.append(tube)

            # Create and append a tip for each tube.
            # NOTE: This should be enough as the tips are meant to be reused for each source.
            tip_index = len(tips) + 1
            col, row = datautils.get_colrow(tip_index,
                                            tip_rack_item["platformData"]["wellsColumns"],
                                            tip_rack_item["platformData"]["wellsRows"])
            tip = newt.contents.base_content(
                container=tip_rack_container["container"],
                # "tip_index" is already 1-based.
                index=tip_index,
                position={"col": col, "row": row},
                name=f"Content_{row}{col}",
                volume=0
            )
            tips.append(tip)

        # Override the platform item's contents.
        reagent_rack_item["content"] = tubes
        tip_rack_item["content"] = tips

        # Make a tip element, common to all steps.
        tip_element = make_tip(
            item=tip_rack_name,
            mode=self.config.get("pcr_template", {}).get("tip_mode", "reuse"),
            discardItem=trash_name)

        # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests).
        tip_container = None
        # Get the tip's container definition.
        if self.controller:
            tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"])

        # Create and append steps.
        steps = []
        for tag_name, tag_count, volume, _ in tag_counts:
            # Guess the tool. The configured default is only used when there is no controller.
            tool_name = self.config.get("pcr_template", {}).get("default_tool", None)
            if self.controller:
                tool_name = self.controller.plugins["pipettes"].guess_pipette(
                    volume=volume, tip_container=tip_container,
                    repeats=tag_count, prioritize_accuracy=True)
            # Create step. The same tag selects both the source tube and the target wells.
            # NOTE(review): every step gets the same name (rack to plate); "tag_name" is
            #               not part of it - confirm this is intended.
            step = step_transfer(target_value=tag_name, source_value=tag_name, volume=volume,
                                 target_platform=well_plate_name, source_platform=reagent_rack_name,
                                 tool=tool_name,
                                 tip=tip_element,
                                 order=len(steps)+1,
                                 name=f"Add {reagent_rack_name} to {well_plate_name}",
                                 target_by="tag", source_by="tag")
            steps.append(step)

        # Save steps, overwriting any previous value.
        template_data['protocol']['steps'] = steps

        # Generate the protocol, using the originals as defaults.
        protocol = newt.hl_protocols.protocol_hl(**template_data['protocol'])

        # Generate the output.
        result = {"protocol": protocol, "workspace": workspace}

        # Cleanup
        result = self.scrub_mongo_stuff(result)

        # Push data into the result (for tests).
        result["well_data"] = well_data
        result["well_data_dict"] = well_data_dict

        # Debug dump of the output (also not gated by "self.verbose").
        dump_json("/tmp/well_data_dict.json", result)

        return result

    @staticmethod
    def parse_well_plate(data, sort_col_idx: int = None) -> list:
        """Convert the raw well-plate matrix-format data into long-format.
        Example output:
            [
                [1, "B", 1],
                [1, "C", 2],
                ...
            ]
        """
        # Iterate over the columns and rows
        result = []
        for col_index, row_values in data.items():
            if col_index == '384':  # Skip the first key as it contains row headers
                continue
            for row_index, value in enumerate(row_values):
                if value != "0":  # Only include non-zero values
                    # '384' contains the row labels; so row letter = data['384'][row_index]
                    result.append([int(col_index), data['384'][row_index], value])

        if sort_col_idx is not None:
            # Sort the result list by the processing order value (third element in each sublist)
            result = sorted(result, key=lambda x: x[sort_col_idx])

        # Return the result
        return result

    @staticmethod
    def well_list_to_dict(data) -> dict:
        """Convert a list of wells to dict format for easy retrieving of data by col/row name
        Example:
            well_data["B"][2]
        """
        # Initialize the nested dictionary
        nested_dict = OrderedDict()
        # Populate the nested dictionary
        for col, row, order in data:
            if row not in nested_dict:
                nested_dict[row] = {}
            nested_dict[row][col] = order

        # Return the result
        return nested_dict

    @staticmethod
    def parse_file_data(template_data: dict):
        """Extract CSV data from the template data."""
        fields_data = template_data["protocol"]["templateDefinition"]
        fields = template_data["protocol"]["templateFields"]["fields"]

        file_data = {}
        for field in fields:
            # Look for file upload fields.
            if field["field_type"] == "fileUpload":
                field_id = field["field_id"]
                # Look for file upload data.
                try:
                    field_data = next(d for k, d in fields_data.items() if k == field_id)
                except StopIteration:
                    logging.error(f"Failed to obtain file data for field id '{field_id}', missing in the template data.")
                # Parse and save CSV data if any.
                if field_data["type"] == "text/csv":
                    file_data[field_id] = decode_csv(field_data["data"])

        logging.info(f"Parsed CSV data from fields: {list(file_data)}")
        logging.debug(f"CSV file data:\n{pformat(file_data)}")

        return file_data

    def process_pcr_template(self, template_data):
        """Handle a 'PCR Mix' protocol template event.

        Returns:
            A dict with the generated "protocol" and "workspace", after removing
            the MongoDB bookkeeping keys from them.
        """
        # Generate new workspace and protocol steps.
        workspace, protocol = self.prepare_objects(template_data)

        # Assemble the output, and clean it up in place.
        output = dict(protocol=protocol, workspace=workspace)
        self.scrub_mongo_stuff(output)
        return output

    def prepare_objects(self, template_data: dict):
        """Build the workspace and protocol for a 'PCR Mix' template.

        Chains the intermediate stages: recipe, draft contents and preparations,
        workspace, and finally the protocol.

        Returns:
            A (workspace, protocol) tuple.
        """
        # Stage 1: the Recipe object.
        recipe: Recipe = self.prepare_recipe(template_data)

        # Stage 2: draft contents and preparation steps.
        draft_contents, preparations = self.prepare_contents(recipe)

        # Stage 3: the workspace, populated from the draft contents.
        new_workspace = self.prepare_workspace(template_data, draft_contents)

        # Stage 4: the protocol, built from the preparation steps.
        new_protocol = self.make_protocol(preparations, draft_contents, template_data)

        return new_workspace, new_protocol

    debug_steps_file = '/tmp/steps.json'
    def make_protocol(self, preps, contents, template_data: dict):
        """Build the high-level pipetting protocol for a 'PCR Mix' template.

        Args:
            preps: Preparations, converted to transfer steps by "preps_to_steps".
            contents: Pre-content data, used to decide the platform of each tube.
            template_data: Template event data. Its "protocol" entry is updated in place:
                the steps are replaced (any previous ones are kept under "steps-old"),
                and a missing "workspace" is filled in with the workspace name.

        Returns:
            The protocol generated by "protocol_hl" from the updated protocol data.
        """
        protocol_data = template_data['protocol']
        definition = protocol_data['templateDefinition']
        pcr_platform = definition[self.tmplt_pcr_tube]
        reagents_platform = definition[self.tmplt_rgt_tube]
        tips_platform = definition[self.tmplt_tip_plat]

        # Get the tip rack's workspace item and its first container.
        tip_rack_item = next(
            item for item in template_data["workspace"]["items"]
            if item["name"] == tips_platform)
        tip_rack_container = next(iter(tip_rack_item["platformData"]["containers"]))

        # Make a tip element, common to all steps.
        tip_element = make_tip(
            mode=self.config.get("pcr_template", {}).get("tip_mode", "reuse"),
            item=tips_platform,
            discardItem=definition[self.tmplt_trash_plat])

        # Get the tip's container definition.
        # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests).
        if self.controller:
            tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"])
        else:
            tip_container = None

        # Prepare the protocol steps.
        steps = self.preps_to_steps(preps, contents, tip_element, tip_container, pcr_platform, reagents_platform)

        # Write output for debugging.
        if self.verbose:
            dump_json(self.debug_steps_file, steps)

        # Keep a backup of any steps that are about to be replaced.
        if protocol_data.get('steps', None):
            logging.info('make_protocol: replacing previous protocol steps. Storing old steps in a "steps-old" key.')
            protocol_data['steps-old'] = protocol_data['steps']

        if not protocol_data.get("workspace", None):
            logging.info('make_protocol: the input protocol does not have a workspace. Getting its name from the template data.')
            protocol_data["workspace"] = template_data["workspace"].get("name", None)

        # Save steps, overwriting any previous value.
        protocol_data['steps'] = steps

        # Generate the protocol, using the originals as defaults.
        return newt.hl_protocols.protocol_hl(**protocol_data)

    def preps_to_steps(self, preps: dict, contents: list, tip_element, tip_container, pcr_platform, reagents_platform):
        """Funciones para generar pasos "TRANSFER" para un protocolo HL."""
        steps = []
        steps.extend(self.prep_to_step(preps, contents, tip_element, tip_container, pcr_platform, reagents_platform))

        for prep in preps["derivatives"]:
            steps.extend(self.prep_to_step(prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=len(steps)))

        return steps

    def prep_to_step(self, prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=0):
        """Funciones para generar pasos "TRANSFER" para un protocolo HL."""

        steps = []

        target_content_name = prep["name"]

        # Sort sources from high to low volume.
        sources = [ {"component": k, "volume": v} for k,v in prep["recipe"].items() ]
        def getval(d):
            return d["volume"]
        sources.sort(key=getval, reverse=True)

        for source in sources:
            source_content_name = source["component"]
            volume = source["volume"]
            # Get the "depth" level of the PCR tubes platform.
            max_depth = max([d["level"] for d in contents])

            # Set source platform.
            source_platform = "any"
            for content in contents:
                if source_content_name == content["name"]:
                    if content["level"] == max_depth:
                        source_platform = pcr_platform
                    else:
                        source_platform = reagents_platform
                    break

            # Set target platform.
            target_platform = "any"
            for content in contents:
                if target_content_name == content["name"]:
                    if content["level"] == max_depth:
                        target_platform = pcr_platform
                    else:
                        target_platform = reagents_platform
                    break

            # Guess the tool.
            tool_name = self.config.get("pcr_template", {}).get("default_tool", None)
            if self.controller:
                tool_name = self.controller.plugins["pipettes"].guess_pipette(
                    volume=volume, tip_container=tip_container,
                    repeats=1, prioritize_accuracy=True)

            # Create and append steps.
            steps.append(step_transfer(target_value=target_content_name, source_value=source_content_name, volume=volume,
                                       target_platform=target_platform, source_platform=source_platform,
                                       tip=tip_element,
                                       tool=tool_name,
                                       order=len(steps)+1+initial_order,
                                       name=f"Add {source_content_name} to {target_content_name}",
                                       target_by="name", source_by="name"))

        return steps

    def prepare_workspace(self, template_data, contents):
        """Generate workspace platforms and contents for a 'PCR Mix' template.

        The PCR tubes and reagent tubes items are looked up in the workspace by name
        (and emptied), or created and appended if missing. Each pre-content is then
        turned into a tube: terminal ones go to the PCR tubes item, and the rest
        to the reagent tubes item.

        Args:
            template_data: Template event data. Its "workspace" entry is modified in place.
            contents: Pre-content data, with "name", "volume", and optionally
                "terminal" and "tags" entries.

        Returns:
            The updated workspace (the same object found in the template data).
        """
        template_definition = template_data['protocol']['templateDefinition']
        workspace = template_data['workspace']

        # Get target platforms by name if found, and clear their contents.
        # NOTE: Existing items are reused in order to keep any properties sent by the GUI.
        pcr_tubes_item = None
        reagents_item = None
        for item in workspace['items']:
            item_name = item.get('name', None)
            if item_name == template_definition[self.tmplt_pcr_tube]:
                # This is the PCR tubes platform.
                pcr_tubes_item = item
                pcr_tubes_item["content"].clear()
            elif item_name == template_definition[self.tmplt_rgt_tube]:
                # This is the reagent tubes platform.
                reagents_item = item
                reagents_item["content"].clear()

        # If no platforms were found, then create and insert them.
        if not pcr_tubes_item:
            pcr_tubes_item = newt.platform_items.base_platform_item(
                name=template_definition[self.tmplt_pcr_tube], # TODO: PCRtubePlatformItem ??
                platform=template_definition[self.tmplt_pcr_tube],
                position={"x": 0, "y": 0, "z": 0})
            workspace['items'].append(pcr_tubes_item)
        if not reagents_item:
            reagents_item = newt.platform_items.base_platform_item(
                name=template_definition[self.tmplt_rgt_tube], # TODO: tube15PlatformItem ??
                platform=template_definition[self.tmplt_rgt_tube],
                position={"x": 0, "y": 0, "z": 0})
            workspace['items'].append(reagents_item)

        # In a PCR mix scenario, the terminal solutions (with the target composition
        # and volume) should end up in the "pcr tubes" platform, and the rest can go
        # in the "reagents" platform.
        # NOTE: The precalculated "terminal" property is used for this. Using the maximum
        #       "level" was buggy, because not all terminal leaves have the same number
        #       of levels, as derived from the number of cuts of their cluster.

        # Populate the platform items with contents.
        for content in contents:
            # Pick the platform item, and the config key that may override its container.
            if content.get("terminal", None):
                target_item, container_key = pcr_tubes_item, "pcr_container"
            else:
                target_item, container_key = reagents_item, "reagent_container"
            content_list = target_item["content"]
            platform_data = target_item["platformData"]

            # The platform's first container is the default.
            default_container = platform_data["containers"][0]["container"]
            container_name = self.config.get("pcr_template", {}).get(container_key, default_container)

            # Make the content, at the next free (1-based) index of the item.
            index = len(content_list) + 1
            col, row = datautils.get_colrow(index,
                                            platform_data["wellsColumns"],
                                            platform_data["wellsRows"])
            tube = newt.contents.base_content(
                container=container_name,
                index=index,
                position={"col": col, "row": row},
                name=content["name"],
                tags=content.get("tags", []),
                volume=content["volume"])
            # Append the content to the platform item's content list.
            content_list.append(tube)

        # Return the updated workspace
        return workspace

    # Define recursive function.
    def get_amounts(self, prep: dict, mix_names: list, amounts: dict = None):
        """Recursive function to get the amounts of each reagent.

        This function will modify the "prep" dict in place, remember to pass a deep copy:
            amounts, new_preps = self.get_amounts(deepcopy(preps), mix_names)
        """
        if amounts is None:
            amounts = {}
        # First, replace names of "variable components" when they became "fixed".
        fixed_comps = prep.get("fixed_component_reference")
        # NOTE: Example data for "fixed_comps":
        #       "fixed_component_reference": {
        #           "rvPrimers": "3"
        #       },
        recipe = prep.get("recipe")
        reduced_fixed_comps = []
        if fixed_comps:
            for comp_name, comp_variant in fixed_comps.items():
                comp_amount = recipe.pop(comp_name)
                # Make a new name for the component, appending its variant name.
                new_comp_name = f"{comp_name}_{comp_variant}"
                recipe[new_comp_name] = comp_amount
                reduced_fixed_comps.append(new_comp_name)
        prep.setdefault("reduced_fixed_comps", [])
        prep["reduced_fixed_comps"] += reduced_fixed_comps
        # Recurse into derivative preps.
        if prep["derivatives"]:
            for derivative in prep["derivatives"]:
                derivative["reduced_fixed_comps"] = deepcopy(prep["reduced_fixed_comps"])
                amounts, _ = self.get_amounts(prep=derivative, mix_names=mix_names, amounts=amounts)
        # Sum up the amounts.
        for comp_name, comp_amount in recipe.items():
            # Remove mix-type components (i.e. non-reagent) from the recipe.
            if comp_name in mix_names:
                continue
            amounts[comp_name] = comp_amount + amounts.get(comp_name, 0.0)

        return amounts, prep

    def flatten_derivatives(self, data: dict, result_list: list=None, level:int=0):
        if result_list is None:
            parent = deepcopy(data)
            parent["derivatives"] = [d["name"] for d in parent["derivatives"]]
            parent["level"] = level
            result_list = [parent]
        children = data.get("derivatives", [])
        if children:
            for child in children:
                new_child = deepcopy(child)
                new_child["derivatives"] = [d["name"] for d in new_child["derivatives"]]
                new_child["level"] = level+1
                result_list.append(new_child)
                self.flatten_derivatives(data=child, result_list=result_list, level=level+1)
        return result_list

    debug_preps_file = '/tmp/preps.json'
    debug_reduced_preps_file = '/tmp/preps_red.json'
    debug_newpreps_file = '/tmp/new_preps.json'
    flat_new_preparations_file = '/tmp/flat_new_preparations.json'
    debug_precontents_file = '/tmp/pre_contents.json'
    def prepare_contents(self, master_recipe):
        """Build the draft contents and the preparation tree for a 'PCR Mix' template.

        Args:
            master_recipe: Recipe object from which the "MixPlan" is made.

        Returns:
            A "(pre_contents, new_preparations)" tuple. "pre_contents" is a list of dicts
            with "name", "volume", "level", "tags" and "terminal" keys: reagents first,
            followed by the mixes and reaction tubes. "new_preparations" is the preparation
            tree as returned by "get_amounts".
        """
        plan = MixPlan(master_recipe)

        # NOTE: "make_preparation" must run before reading "plan.preparations".
        preparations = plan.make_preparation()
        # The reduced and flattened preparations.
        reduced_preps: list = plan.preparations

        if self.verbose:
            # Debugging output.
            dump_json(self.debug_preps_file, preparations)
            dump_json(self.debug_reduced_preps_file, reduced_preps)

        # Names of the mixes, needed to tell them apart from the reagents.
        mix_names = [reduced["name"] for reduced in reduced_preps]

        # Total amount of each (non-mix) component. A deep copy is passed
        # because "get_amounts" modifies its input in place.
        amounts, new_preparations = self.get_amounts(deepcopy(preparations), mix_names)
        flat_new_preparations = self.flatten_derivatives(new_preparations)

        if self.verbose:
            # Debugging output.
            dump_json(self.debug_newpreps_file, new_preparations)
            dump_json(self.flat_new_preparations_file, flat_new_preparations)

        pre_contents = []

        # Reagents come first. They get "level 0" so that they can be placed
        # in the reagents tube rack later on.
        for reagent_name, total_volume in amounts.items():
            base_names = master_recipe.recipe_component_names
            # Tag each reagent with its own name, with "reagent", and with the
            # base name of every component its name starts with.
            tags = [reagent_name, "reagent"]
            tags += [base for base in base_names if str(reagent_name).startswith(str(base))]
            pre_contents.append({
                "level": 0,
                "name": reagent_name,
                "volume": total_volume,
                "terminal": False,
                # Drop duplicated tags.
                "tags": list(set(tags)),
            })

        # Every name used as an ingredient by some recipe. Example: {"fwPrimers", "templates", ...}
        used_as_ingredient = set().union(*(reduced["recipe"].keys() for reduced in reduced_preps))

        # A terminal mix is one that is not an ingredient of any recipe.
        # TODO: Patch "mix" to get this information directly from there,
        #       instead of inferring it here.
        terminal_names = [name for name in mix_names if name not in used_as_ingredient]

        # Mixes and reaction tubes.
        # NOTE: These are only drafts, used to make the proper contents later on.
        for prep in flat_new_preparations:
            terminal = prep["name"] in terminal_names
            if terminal:
                tags = list(prep["reduced_fixed_comps"])
            else:
                tags = ["mix", f"level{prep['level']}"]
            pre_contents.append({
                "name": prep["name"],
                "volume": sum(prep["recipe"].values()),
                "level": prep["level"],
                "tags": tags,
                "terminal": terminal
            })

        if self.verbose:
            # Debugging output.
            dump_json(self.debug_precontents_file, pre_contents)

        # NOTE: "new_preparations" is returned instead of "preparations" because the
        #       names of the (variable) components were updated in it, matching the
        #       names used in "pre_contents".
        return pre_contents, new_preparations

    def prepare_recipe(self, template_data):
        """Build the master 'Recipe' object from the template data sent by the GUI.

        The component groups are read from the "components" list of the template
        definition. Example components:
            [{'fwPrimers': ['1', '2'],
                'name': ['G1'],
                'rvPrimers': ['5'],
                'templates': ['A1']},
            {'fwPrimers': ['3'],
                'name': ['G2'],
                'rvPrimers': ['6'],
                'templates': ['A1',
                            'A2']}]

        The PCR-specific parameters are read from the template definition too,
        and converted with "float":
            'bufferFinal': '1',
            'bufferStock': '10',
            'dntpsFinal': '1',
            'dntpsStock': '20',
            'finalVol': '20',
            'polVol': '0.2',
            'primerFinal': '1',
            'primerStock': '20',
            'templateVol': '1',
            'volLossCompensation': '0.2'
        """
        template_def = template_data['protocol']['templateDefinition']

        def as_float(key):
            # Read one parameter of the template definition as a float.
            return float(template_def[key])

        # NOTE: work on a copy. The "name" entries are popped below, and that
        #       must not propagate to the templateDefinition.
        components = deepcopy(template_def["components"])

        # NOTE: this is equivalent to generating the "component groups" by hand:
        # component_groups = groups(
        #     reaccion1 = group( template="A550", pFw = c(667, 999), pRv = 668 ),
        #     reaccion2 = group( template="A550", pFw = 667, pRv = c(668, 770), dmso=10 )
        # )
        groups_dict = {}
        for component in components:
            # Take the "name" out, as a string (the UI might send it as a list if it has commas).
            group_name = str(component.pop('name'))
            # The remaining entries are the components of the group.
            groups_dict[group_name] = group(**component)
        component_groups = groups(**groups_dict)

        if self.verbose:
            print(f"Parsed component_groups:\n{pformat(component_groups)}")

        # NOTE: properties from the GUI are mapped to their counterparts in "Recipe".
        return Recipe(
            component_groups = component_groups,
            volume = as_float('finalVol'),
            extra_fraction = as_float('volLossCompensation') + 1.0,
            stock_solutions_x = comps(
                buffer = as_float('bufferStock'),
                dNTPs = as_float('dntpsStock'),
                fwPrimers = as_float('primerStock'),
                rvPrimers = as_float('primerStock')
            ),
            stock_solutions_vol = comps(
                templates = as_float('templateVol'),
                pol = as_float('polVol')
            ),
            target_solution = comps(
                buffer = as_float('bufferFinal'),
                dNTPs = as_float('dntpsFinal'),
                fwPrimers = as_float('primerFinal'),
                rvPrimers = as_float('primerFinal')
            )
            # variable_components=comps(
            #     pol = c("Taq", "Pfu")
            # )
        )

Generate a protocol and workspace objects for PCR reactions.

Receives data from a 'PCR template form', sent by the GUI through SocketIO, and automatically generates steps to prepare the reactions using the "mix" module.

The preparation steps are then translated to protocol steps and workspace objects, which are returned through SocketIO, and are used by the UI.

Ancestors

Class variables

var config : dict
var debug_newpreps_file
var debug_precontents_file
var debug_preps_file
var debug_reduced_preps_file
var debug_steps_file
var flat_new_preparations_file
var protocol_template_event
var sio : socketio.async_client.AsyncClient
var tmplt_pcr_tube
var tmplt_rgt_tube
var tmplt_tip_plat
var tmplt_trash_plat
var verbose : bool

Static methods

def parse_file_data(template_data: dict)
Expand source code
@staticmethod
def parse_file_data(template_data: dict):
    """Extract CSV data from the template data.

    Every field of type "fileUpload" listed in the template fields is looked up
    (by its "field_id") in the template definition. The data of the fields whose
    "type" is "text/csv" is decoded with "decode_csv".

    Args:
        template_data: Template data with the "templateDefinition" and
            "templateFields" entries under its "protocol" key.

    Returns:
        A dict with the decoded CSV data, by field id. File upload fields without
        data in the template definition are logged as errors and skipped.
    """
    fields_data = template_data["protocol"]["templateDefinition"]
    fields = template_data["protocol"]["templateFields"]["fields"]

    file_data = {}
    for field in fields:
        # Only file upload fields are of interest.
        if field["field_type"] != "fileUpload":
            continue
        field_id = field["field_id"]
        # Look for file upload data.
        try:
            field_data = fields_data[field_id]
        except KeyError:
            logging.error(f"Failed to obtain file data for field id '{field_id}', missing in the template data.")
            # Skip this field. Falling through would use an unbound name,
            # or the stale data of a previous field.
            continue
        # Parse and save CSV data if any.
        if field_data["type"] == "text/csv":
            file_data[field_id] = decode_csv(field_data["data"])

    logging.info(f"Parsed CSV data from fields: {list(file_data)}")
    logging.debug(f"CSV file data:\n{pformat(file_data)}")

    return file_data

Extract CSV data from the template data.

def parse_well_plate(data, sort_col_idx: int = None) ‑> list
Expand source code
@staticmethod
def parse_well_plate(data, sort_col_idx: int = None) -> list:
    """Reshape well-plate data from matrix format into long format.

    Each key of "data" other than '384' is a column index, holding one value per row.
    The '384' key holds the row labels. Values equal to "0" are left out.

    Args:
        data: Mapping with the matrix-format data.
        sort_col_idx: Optional index of the element to sort the output rows by.

    Returns:
        A list of "[column, row label, value]" lists. Example output:
            [
                [1, "B", 1],
                [1, "C", 2],
                ...
            ]
    """
    long_format = [
        # The '384' entry holds the row labels, so the row letter is data['384'][row_index].
        [int(column), data['384'][row_index], cell]
        for column, cells in data.items()
        if column != '384'  # This key holds the row headers, not a column.
        for row_index, cell in enumerate(cells)
        if cell != "0"  # Only include non-zero values.
    ]

    if sort_col_idx is not None:
        # Sort by the requested element of each sublist.
        long_format.sort(key=lambda entry: entry[sort_col_idx])

    return long_format

Convert the raw well-plate matrix-format data into long-format. Example output: [ [1, "B", 1], [1, "C", 2], … ]

def scrub_mongo_stuff(data: dict)
Expand source code
@staticmethod
def scrub_mongo_stuff(data: dict):
    """Scrub the MongoDB bookkeeping keys from "data" **in place**, and return it."""
    # NOTE: The "_id"s are retained on purpose. Scrubbing the _id from the
    #       template data causes a UI error.
    for mongo_key in ("__v", "createdAt", "updatedAt"):
        scrub(data, mongo_key)
    return data

Remove MongoDB stuff from dictionary data in place.

def well_list_to_dict(data) ‑> dict
Expand source code
@staticmethod
def well_list_to_dict(data) -> dict:
    """Index a list of wells by row and then by column.

    Args:
        data: Iterable of "(col, row, order)" items.

    Returns:
        An OrderedDict of dicts, such that the value of a well is retrieved with:
            well_data["B"][2]
    """
    by_row = OrderedDict()
    for col, row, order in data:
        # Make the inner dict the first time a row shows up.
        by_row.setdefault(row, {})[col] = order
    return by_row

Convert a list of wells to dict format for easy retrieving of data by col/row name

Example

well_data["B"][2]

Methods

def flatten_derivatives(self, data: dict, result_list: list = None, level: int = 0)
Expand source code
def flatten_derivatives(self, data: dict, result_list: list=None, level:int=0):
    """Flatten a tree of nested "derivatives" into a list, parents before children.

    Each entry of the output is a deep copy of a node, in which the "derivatives"
    are replaced by their names, and a "level" key holds the depth of the node.

    Args:
        data: The node to flatten, with its nested "derivatives".
        result_list: List to append to. Left as None by the initial call, in which
            case a new list is started with the entry for "data".
        level: Depth of "data".

    Returns:
        The list of flattened entries.
    """
    def summarize(node, depth):
        # Copy the node, keeping only the names of its derivatives.
        entry = deepcopy(node)
        entry["derivatives"] = [d["name"] for d in entry["derivatives"]]
        entry["level"] = depth
        return entry

    if result_list is None:
        result_list = [summarize(data, level)]

    for child in data.get("derivatives", []) or []:
        result_list.append(summarize(child, level + 1))
        # Descendants of the child are added right after it.
        self.flatten_derivatives(data=child, result_list=result_list, level=level + 1)

    return result_list
def get_amounts(self, prep: dict, mix_names: list, amounts: dict = None)
Expand source code
def get_amounts(self, prep: dict, mix_names: list, amounts: dict = None):
    """Recursive function to get the amounts of each reagent.

    This function will modify the "prep" dict in place, remember to pass a deep copy:
        amounts, new_preps = self.get_amounts(deepcopy(preps), mix_names)
    """
    if amounts is None:
        amounts = {}
    # First, replace names of "variable components" when they became "fixed".
    fixed_comps = prep.get("fixed_component_reference")
    # NOTE: Example data for "fixed_comps":
    #       "fixed_component_reference": {
    #           "rvPrimers": "3"
    #       },
    recipe = prep.get("recipe")
    reduced_fixed_comps = []
    if fixed_comps:
        for comp_name, comp_variant in fixed_comps.items():
            comp_amount = recipe.pop(comp_name)
            # Make a new name for the component, appending its variant name.
            new_comp_name = f"{comp_name}_{comp_variant}"
            recipe[new_comp_name] = comp_amount
            reduced_fixed_comps.append(new_comp_name)
    prep.setdefault("reduced_fixed_comps", [])
    prep["reduced_fixed_comps"] += reduced_fixed_comps
    # Recurse into derivative preps.
    if prep["derivatives"]:
        for derivative in prep["derivatives"]:
            derivative["reduced_fixed_comps"] = deepcopy(prep["reduced_fixed_comps"])
            amounts, _ = self.get_amounts(prep=derivative, mix_names=mix_names, amounts=amounts)
    # Sum up the amounts.
    for comp_name, comp_amount in recipe.items():
        # Remove mix-type components (i.e. non-reagent) from the recipe.
        if comp_name in mix_names:
            continue
        amounts[comp_name] = comp_amount + amounts.get(comp_name, 0.0)

    return amounts, prep

Recursive function to get the amounts of each reagent.

This function will modify the "prep" dict in place, remember to pass a deep copy: amounts, new_preps = self.get_amounts(deepcopy(preps), mix_names)

def make_protocol(self, preps, contents, template_data: dict)
Expand source code
def make_protocol(self, preps, contents, template_data: dict):
    """Build the high-level pipetting protocol for a 'PCR Mix' template.

    The steps are generated with "preps_to_steps" and saved to the "protocol"
    entry of "template_data" (in place), which is then passed to "protocol_hl".

    Args:
        preps: Preparations to convert to steps.
        contents: Contents data, passed along to "preps_to_steps".
        template_data: Template data with "protocol" and "workspace" entries.

    Returns:
        The output of "newt.hl_protocols.protocol_hl".
    """
    protocol_data = template_data['protocol']
    template_definition = protocol_data['templateDefinition']
    pcr_platform = template_definition[self.tmplt_pcr_tube]
    reagents_platform = template_definition[self.tmplt_rgt_tube]
    tips_platform = template_definition[self.tmplt_tip_plat]

    # Find the tip rack in the workspace, and take its first container.
    workspace = template_data["workspace"]
    tip_rack_item = next(item for item in workspace["items"] if item["name"] == tips_platform)
    tip_rack_container = next(iter(tip_rack_item["platformData"]["containers"]))

    # All steps share the same tip element.
    tip_mode = self.config.get("pcr_template", {}).get("tip_mode", "reuse")
    tip_element = make_tip(
        mode=tip_mode,
        item=tips_platform,
        discardItem=template_definition[self.tmplt_trash_plat])

    # The container definition of the tips is only available through the controller.
    # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests).
    if self.controller:
        tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"])
    else:
        tip_container = None

    steps = self.preps_to_steps(preps, contents, tip_element, tip_container, pcr_platform, reagents_platform)

    if self.verbose:
        # Debugging output.
        dump_json(self.debug_steps_file, steps)

    # Keep a backup of any previous steps, which are replaced below.
    previous_steps = protocol_data.get('steps', None)
    if previous_steps:
        logging.info('make_protocol: replacing previous protocol steps. Storing old steps in a "steps-old" key.')
        protocol_data['steps-old'] = previous_steps

    # A protocol without a workspace gets the name of the one in the template data.
    if not protocol_data.get("workspace", None):
        logging.info('make_protocol: the input protocol does not have a workspace. Getting its name from the template data.')
        protocol_data["workspace"] = workspace.get("name", None)

    protocol_data['steps'] = steps

    # The original protocol data is used as defaults for the new protocol.
    return newt.hl_protocols.protocol_hl(**protocol_data)

Generate a high-level pipetting protocol for a 'PCR Mix' template.

def prep_to_step(self,
prep,
contents,
tip_element,
tip_container,
pcr_platform,
reagents_platform,
initial_order=0)
Expand source code
def prep_to_step(self, prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=0):
    """Generate the "TRANSFER" steps of a high-level (HL) protocol for one preparation.

    One step is generated for each component in the recipe of "prep", from high
    to low volume, transferring it to the content named after "prep".

    Contents flagged as "terminal" are looked for in the PCR tubes platform, and
    the rest in the reagents platform. This matches the placement of contents by
    "prepare_workspace". Contents without a "terminal" key fall back to the
    previous rule: the ones at the deepest "level" are in the PCR tubes platform.

    Args:
        prep: Preparation dict, with "name" and "recipe" entries.
        contents: List of content dicts, with "name" and "terminal" (or "level") keys.
        tip_element: Tip definition, common to all steps.
        tip_container: Container definition of the tips, used to guess the pipette.
        pcr_platform: Name of the PCR tubes platform.
        reagents_platform: Name of the reagents platform.
        initial_order: Offset for the "order" of the steps.

    Returns:
        The list of steps made by "step_transfer".
    """
    target_content_name = prep["name"]

    # Deepest level among the contents, only used by contents without a "terminal" flag.
    levels = [content["level"] for content in contents if "level" in content]
    max_depth = max(levels) if levels else None

    def platform_of(content_name):
        # Platform of the first content with a matching name, or "any" if there is none.
        for content in contents:
            if content["name"] != content_name:
                continue
            if "terminal" in content:
                in_pcr_tubes = bool(content["terminal"])
            else:
                in_pcr_tubes = max_depth is not None and content.get("level") == max_depth
            return pcr_platform if in_pcr_tubes else reagents_platform
        return "any"

    # The target is the same for all sources, so it is resolved only once.
    target_platform = platform_of(target_content_name)

    # Tool to use when there is no controller to guess it.
    default_tool = self.config.get("pcr_template", {}).get("default_tool", None)

    # Sort sources from high to low volume.
    sources = sorted(prep["recipe"].items(), key=lambda source: source[1], reverse=True)

    steps = []
    for source_content_name, volume in sources:
        # Guess the tool.
        tool_name = default_tool
        if self.controller:
            tool_name = self.controller.plugins["pipettes"].guess_pipette(
                volume=volume, tip_container=tip_container,
                repeats=1, prioritize_accuracy=True)

        # Create and append the step.
        steps.append(step_transfer(target_value=target_content_name, source_value=source_content_name, volume=volume,
                                   target_platform=target_platform,
                                   source_platform=platform_of(source_content_name),
                                   tip=tip_element,
                                   tool=tool_name,
                                   order=len(steps)+1+initial_order,
                                   name=f"Add {source_content_name} to {target_content_name}",
                                   target_by="name", source_by="name"))

    return steps

Functions to generate "TRANSFER" steps for an HL protocol.

def prepare_contents(self, master_recipe)
Expand source code
def prepare_contents(self, master_recipe):
    """Build the draft contents and the preparation tree for a 'PCR Mix' template.

    Args:
        master_recipe: Recipe object from which the "MixPlan" is made.

    Returns:
        A "(pre_contents, new_preparations)" tuple. "pre_contents" is a list of dicts
        with "name", "volume", "level", "tags" and "terminal" keys: reagents first,
        followed by the mixes and reaction tubes. "new_preparations" is the preparation
        tree as returned by "get_amounts".
    """
    plan = MixPlan(master_recipe)

    # NOTE: "make_preparation" must run before reading "plan.preparations".
    preparations = plan.make_preparation()
    # The reduced and flattened preparations.
    reduced_preps: list = plan.preparations

    if self.verbose:
        # Debugging output.
        dump_json(self.debug_preps_file, preparations)
        dump_json(self.debug_reduced_preps_file, reduced_preps)

    # Names of the mixes, needed to tell them apart from the reagents.
    mix_names = [reduced["name"] for reduced in reduced_preps]

    # Total amount of each (non-mix) component. A deep copy is passed
    # because "get_amounts" modifies its input in place.
    amounts, new_preparations = self.get_amounts(deepcopy(preparations), mix_names)
    flat_new_preparations = self.flatten_derivatives(new_preparations)

    if self.verbose:
        # Debugging output.
        dump_json(self.debug_newpreps_file, new_preparations)
        dump_json(self.flat_new_preparations_file, flat_new_preparations)

    pre_contents = []

    # Reagents come first. They get "level 0" so that they can be placed
    # in the reagents tube rack later on.
    for reagent_name, total_volume in amounts.items():
        base_names = master_recipe.recipe_component_names
        # Tag each reagent with its own name, with "reagent", and with the
        # base name of every component its name starts with.
        tags = [reagent_name, "reagent"]
        tags += [base for base in base_names if str(reagent_name).startswith(str(base))]
        pre_contents.append({
            "level": 0,
            "name": reagent_name,
            "volume": total_volume,
            "terminal": False,
            # Drop duplicated tags.
            "tags": list(set(tags)),
        })

    # Every name used as an ingredient by some recipe. Example: {"fwPrimers", "templates", ...}
    used_as_ingredient = set().union(*(reduced["recipe"].keys() for reduced in reduced_preps))

    # A terminal mix is one that is not an ingredient of any recipe.
    # TODO: Patch "mix" to get this information directly from there,
    #       instead of inferring it here.
    terminal_names = [name for name in mix_names if name not in used_as_ingredient]

    # Mixes and reaction tubes.
    # NOTE: These are only drafts, used to make the proper contents later on.
    for prep in flat_new_preparations:
        terminal = prep["name"] in terminal_names
        if terminal:
            tags = list(prep["reduced_fixed_comps"])
        else:
            tags = ["mix", f"level{prep['level']}"]
        pre_contents.append({
            "name": prep["name"],
            "volume": sum(prep["recipe"].values()),
            "level": prep["level"],
            "tags": tags,
            "terminal": terminal
        })

    if self.verbose:
        # Debugging output.
        dump_json(self.debug_precontents_file, pre_contents)

    # NOTE: "new_preparations" is returned instead of "preparations" because the
    #       names of the (variable) components were updated in it, matching the
    #       names used in "pre_contents".
    return pre_contents, new_preparations

Generate preparation steps and pre-content data for a 'PCR Mix' template.

def prepare_objects(self, template_data: dict)
Expand source code
def prepare_objects(self, template_data: dict):
    """Generate the workspace and the protocol for a 'PCR Mix' template.

    Args:
        template_data: Template data, passed to "prepare_recipe",
            "prepare_workspace" and "make_protocol".

    Returns:
        A "(workspace, protocol)" tuple.
    """
    # The recipe comes first: the draft contents and the preparations derive from it.
    recipe = self.prepare_recipe(template_data)
    draft_contents, preparations = self.prepare_contents(recipe)

    # NOTE: the workspace is prepared before the protocol.
    new_workspace = self.prepare_workspace(template_data, draft_contents)
    new_protocol = self.make_protocol(preparations, draft_contents, template_data)

    return new_workspace, new_protocol

Generate data and intermediate objects for a 'PCR Mix' template.

def prepare_recipe(self, template_data)
Expand source code
def prepare_recipe(self, template_data):
    """Build the master 'Recipe' object from the template data sent by the GUI.

    The component groups are read from the "components" list of the template
    definition. Example components:
        [{'fwPrimers': ['1', '2'],
            'name': ['G1'],
            'rvPrimers': ['5'],
            'templates': ['A1']},
        {'fwPrimers': ['3'],
            'name': ['G2'],
            'rvPrimers': ['6'],
            'templates': ['A1',
                        'A2']}]

    The PCR-specific parameters are read from the template definition too,
    and converted with "float":
        'bufferFinal': '1',
        'bufferStock': '10',
        'dntpsFinal': '1',
        'dntpsStock': '20',
        'finalVol': '20',
        'polVol': '0.2',
        'primerFinal': '1',
        'primerStock': '20',
        'templateVol': '1',
        'volLossCompensation': '0.2'
    """
    template_def = template_data['protocol']['templateDefinition']

    def as_float(key):
        # Read one parameter of the template definition as a float.
        return float(template_def[key])

    # NOTE: work on a copy. The "name" entries are popped below, and that
    #       must not propagate to the templateDefinition.
    components = deepcopy(template_def["components"])

    # NOTE: this is equivalent to generating the "component groups" by hand:
    # component_groups = groups(
    #     reaccion1 = group( template="A550", pFw = c(667, 999), pRv = 668 ),
    #     reaccion2 = group( template="A550", pFw = 667, pRv = c(668, 770), dmso=10 )
    # )
    groups_dict = {}
    for component in components:
        # Take the "name" out, as a string (the UI might send it as a list if it has commas).
        group_name = str(component.pop('name'))
        # The remaining entries are the components of the group.
        groups_dict[group_name] = group(**component)
    component_groups = groups(**groups_dict)

    if self.verbose:
        print(f"Parsed component_groups:\n{pformat(component_groups)}")

    # NOTE: properties from the GUI are mapped to their counterparts in "Recipe".
    return Recipe(
        component_groups = component_groups,
        volume = as_float('finalVol'),
        extra_fraction = as_float('volLossCompensation') + 1.0,
        stock_solutions_x = comps(
            buffer = as_float('bufferStock'),
            dNTPs = as_float('dntpsStock'),
            fwPrimers = as_float('primerStock'),
            rvPrimers = as_float('primerStock')
        ),
        stock_solutions_vol = comps(
            templates = as_float('templateVol'),
            pol = as_float('polVol')
        ),
        target_solution = comps(
            buffer = as_float('bufferFinal'),
            dNTPs = as_float('dntpsFinal'),
            fwPrimers = as_float('primerFinal'),
            rvPrimers = as_float('primerFinal')
        )
        # variable_components=comps(
        #     pol = c("Taq", "Pfu")
        # )
    )

Generate a 'Recipe' object from the data sent by the GUI's template.

Example components: [{'fwPrimers': ['1', '2'], 'name': ['G1'], 'rvPrimers': ['5'], 'templates': ['A1']}, {'fwPrimers': ['3'], 'name': ['G2'], 'rvPrimers': ['6'], 'templates': ['A1', 'A2']}]

PCR-specific parameters for the recipe: 'bufferFinal': '1', 'bufferStock': '10', 'dntpsFinal': '1', 'dntpsStock': '20', 'finalVol': '20', 'polVol': '0.2', 'primerFinal': '1', 'primerStock': '20', 'templateVol': '1', 'volLossCompensation': '0.2'

def prepare_workspace(self, template_data, contents)
Expand source code
def prepare_workspace(self, template_data, contents):
    """Generate the workspace platforms and contents for a 'PCR Mix' template.

    The workspace in "template_data" is updated in place: the contents of the PCR
    tubes and reagents platform items are replaced by new ones, made from "contents".

    Args:
        template_data: Template data with "protocol" and "workspace" entries.
        contents: List of draft content dicts, with "name" and "volume" keys, and
            optional "tags" and "terminal" keys.

    Returns:
        The updated workspace.
    """
    template_definition = template_data['protocol']['templateDefinition']
    workspace = template_data['workspace']

    # Look for the target platform items by name, and clear their contents.
    # NOTE: Existing items are reused to keep any properties sent by the GUI.
    pcr_tubes_item = None
    reagents_item = None
    for item in workspace['items']:
        item_name = item.get('name', None)
        if item_name == template_definition[self.tmplt_pcr_tube]:
            # This is the PCR tubes platform.
            pcr_tubes_item = item
            pcr_tubes_item["content"].clear()
        elif item_name == template_definition[self.tmplt_rgt_tube]:
            # This is the reagent tubes platform.
            reagents_item = item
            reagents_item["content"].clear()

    # Create and insert the platform items that were not found.
    if not pcr_tubes_item:
        pcr_tubes_item = newt.platform_items.base_platform_item(
            name=template_definition[self.tmplt_pcr_tube], # TODO: PCRtubePlatformItem ??
            platform=template_definition[self.tmplt_pcr_tube],
            position={"x": 0, "y": 0, "z": 0})
        workspace['items'].append(pcr_tubes_item)
    if not reagents_item:
        reagents_item = newt.platform_items.base_platform_item(
            name=template_definition[self.tmplt_rgt_tube], # TODO: tube15PlatformItem ??
            platform=template_definition[self.tmplt_rgt_tube],
            position={"x": 0, "y": 0, "z": 0})
        workspace['items'].append(reagents_item)

    # NOTE: Whether a content is a "PCR tube" or a "reagent tube" is decided by its
    #       precalculated "terminal" property, and not by its "level". Using the
    #       maximum level was buggy, because not all terminal leaves have the same
    #       number of levels, as derived from the number of cuts of their cluster.
    for content in contents:
        # Terminal contents go to the PCR tubes platform, and the rest to the reagents platform.
        if content.get("terminal", None):
            target_item, container_option = pcr_tubes_item, "pcr_container"
        else:
            target_item, container_option = reagents_item, "reagent_container"
        content_list = target_item["content"]
        platform_data = target_item["platformData"]

        # The first container of the platform is the default, unless configured otherwise.
        default_container = platform_data["containers"][0]["container"]
        container_name = self.config.get("pcr_template", {}).get(container_option, default_container)

        # The new content takes the next free index of the platform.
        index = len(content_list) + 1
        col, row = datautils.get_colrow(index,
                                        platform_data["wellsColumns"],
                                        platform_data["wellsRows"])
        # Make the content and append it to the content list of the platform item.
        content_list.append(newt.contents.base_content(
            container=container_name,
            index=index,
            position={"col": col, "row": row},
            name=content["name"],
            tags=content.get("tags", []),
            volume=content["volume"]))

    return workspace

Generate workspace platforms and contents for a 'PCR Mix' template.

def preps_to_steps(self,
preps: dict,
contents: list,
tip_element,
tip_container,
pcr_platform,
reagents_platform)
Expand source code
def preps_to_steps(self, preps: dict, contents: list, tip_element, tip_container, pcr_platform, reagents_platform):
    """Generate the "TRANSFER" steps of a high-level (HL) protocol for a tree of preparations.

    The tree is walked in pre-order: the steps of a preparation come before the
    steps of its derivatives, at any depth. This is the same order in which
    "flatten_derivatives" lists the preparations.

    Args:
        preps: Root preparation dict, with nested preparations under "derivatives".
        contents: List of content dicts, passed along to "prep_to_step".
        tip_element: Tip definition, common to all steps.
        tip_container: Container definition of the tips.
        pcr_platform: Name of the PCR tubes platform.
        reagents_platform: Name of the reagents platform.

    Returns:
        The list of steps for all preparations, with a consecutive "order".
    """
    steps = []
    # Stack of preparations to process, with the next one on top.
    pending = [preps]
    while pending:
        prep = pending.pop()
        steps.extend(self.prep_to_step(prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=len(steps)))
        # Push the derivatives reversed, so that they are processed in their original order.
        pending.extend(reversed(prep.get("derivatives") or []))

    return steps

Functions to generate "TRANSFER" steps for an HL protocol.

def process_pcr_template(self, template_data)
Expand source code
def process_pcr_template(self, template_data):
    """Handle a 'PCR Mix' protocol template event.

    Args:
        template_data: Template data, passed to "prepare_objects".

    Returns:
        A dict with the new "protocol" and "workspace", after cleaning it up
        with "scrub_mongo_stuff".
    """
    # New workspace and protocol steps.
    new_workspace, new_protocol = self.prepare_objects(template_data)

    output = dict(protocol=new_protocol, workspace=new_workspace)

    # NOTE: the cleanup happens in place.
    self.scrub_mongo_stuff(output)

    return output

Processes a 'PCR Mix' protocol template event.

def process_template(self, data)
Expand source code
def process_template(self, data):
    """Receives a protocol template and current workspace from the GUI/backend and sends new ones.

    The incoming SocketIO event expects a response (with a timeout), which is the
    return value of this function.

    The handler is looked up in ``self.template_handlers`` by ``data['protocol']['template']``
    and is called with a deep copy of ``data``. Alerts are only sent when a controller is set.

    Args:
        data: Payload expected to hold "protocol" (with "name" and "template")
            and "workspace" (with "name") entries.

    Returns:
        The handler's result on success; a deep copy of ``data`` when no handler
        is registered for the template type; ``data`` itself when processing fails.
    """
    try:
        # Logs. These index into the payload, so they live inside the 'try':
        # a malformed payload must produce a response, not an unhandled exception.
        protocol_name = data['protocol']['name']
        workspace_name = data['workspace']['name']
        logging.info(f"Received template command for protocol '{protocol_name}' and workspace '{workspace_name}'.")
        logging.debug(f"Received template command with data:\n{pformat(data)}")

        # Look for a handler function.
        template_type = data['protocol']['template']
        template_handler = self.template_handlers.get(template_type, None)
        # Generate the output.
        if template_handler:
            result = template_handler(template_data=deepcopy(data))
            # Send our congrats!
            msg = f"Successfully processed '{template_type}' template for protocol '{protocol_name}' and workspace '{workspace_name}'."
            # The controller may be unset when used as a standalone module.
            if self.controller:
                self.controller.send_alert(msg, alert_type="message")
            logging.info(msg)
            logging.debug(f"This is the result:\n{pformat(result)}")
        else:
            logging.warning(f"Unknown template type: '{template_type}'. Returning input data.")
            logging.debug(f"Input protocol data:\n{pformat(data['protocol'])}")
            result = deepcopy(data)

    except Exception as e:
        # Look the name up defensively: the payload itself may be what is broken.
        protocol_data = data.get('protocol') if isinstance(data, dict) else None
        protocol_name = protocol_data.get('name', None) if isinstance(protocol_data, dict) else None
        msg = f"Error during template processing for protocol '{protocol_name}': {pformat(e)}\n" + traceback.format_exc()
        if self.controller:
            self.controller.send_alert(msg, alert_type="message")
        logging.error(msg + "\n" + pformat(data))
        # return the received data as is.
        return data

    logging.debug(f"Processed template command with result:\n{pformat(result)}")
    return result

Receives a protocol template and current workspace from the GUI/backend and sends new ones. The incoming SocketIO event expects a response (with a timeout), which is the return value of this function.

def process_well_plate_prep(self, template_data: dict)
Expand source code
def process_well_plate_prep(self, template_data: dict):
    """Handle a 'Well Plate Prep' protocol template event.

    From the layout, strain and stimuli tables found in the template's file data, this:
      - replaces the well plate's contents with one tagged, empty content per layout entry,
      - replaces the reagent rack's contents with one tube per distinct tag,
      - replaces the tip rack's contents with one tip per tube,
      - replaces the protocol's steps with one tag-based transfer step per distinct tag.

    The input and the final result are also dumped to JSON files under /tmp.

    TO-DO:
        1. Generate contents for a 384 well-plate platform.
            - Tag all as "targets"
            - Use "strain" and "stimuli" data to add more tags.
        2. Generate contents for a tube rack, for "strain" and "stimuli" data.
        3. Generate high-level steps to transfer "strains" and "stimuli" by tag.

    Args:
        template_data: Payload with "protocol" (including "templateDefinition")
            and "workspace" entries.

    Returns:
        The scrubbed {"protocol", "workspace"} result, extended with the
        "well_data" and "well_data_dict" entries (for tests).
    """
    dump_json("/tmp/template_data.json", template_data)

    # Parse the tables embedded in the template.
    file_data = self.parse_file_data(template_data)
    well_data = self.parse_well_plate(file_data["layout"], sort_col_idx=2)
    well_data_dict = self.well_list_to_dict(well_data)
    strain_lookup = self.well_list_to_dict(self.parse_well_plate(file_data["strains"]))
    stimulus_lookup = self.well_list_to_dict(self.parse_well_plate(file_data["stimuli"]))

    # Names and volumes chosen in the template form.
    definition = template_data['protocol']['templateDefinition']
    well_plate_name = definition["well_plate"]
    reagent_rack_name = definition["reagent_rack"]
    tip_rack_name = definition["tip_rack"]
    trash_name = definition["trash"]
    stimuli_volume = definition["stimuli_volume"]
    culture_volume = definition["cell_volume"]

    workspace = template_data["workspace"]

    def find_item(item_name):
        # First workspace item with a matching name.
        return next(i for i in workspace["items"] if i["name"] == item_name)

    well_plate_item = find_item(well_plate_name)
    reagent_rack_item = find_item(reagent_rack_name)
    tip_rack_item = find_item(tip_rack_name)

    plate_platform = well_plate_item["platformData"]
    rack_platform = reagent_rack_item["platformData"]
    tips_platform = tip_rack_item["platformData"]

    # The first container of each platform is used as the default.
    well_plate_container = next(iter(plate_platform["containers"]))
    reagent_rack_container = next(iter(rack_platform["containers"]))
    tip_rack_container = next(iter(tips_platform["containers"]))

    # One content per layout entry, counting how often each tag shows up.
    plate_contents = []
    strain_counts, stimulus_counts = {}, {}
    for col_id, row_id, _ in well_data:
        flat_index = datautils.get_index(
            col_id=col_id - 1,  # Convert to zero-based index
            row_id=row_id,
            ncols=plate_platform["wellsColumns"],
            nrows=plate_platform["wellsRows"],
        )
        # Entries missing from the strain/stimuli tables get the "default" tag.
        strain_tag = "strain-" + str(strain_lookup.get(row_id, {}).get(col_id, "default"))
        stimulus_tag = "stimulus-" + str(stimulus_lookup.get(row_id, {}).get(col_id, "default"))
        # Convert the row letter(s) to an index (0-based).
        row_index = datautils.row_to_index(row_id)
        plate_contents.append(newt.contents.base_content(
            container=well_plate_container["container"],
            index=flat_index + 1,  # Convert index to 1-based.
            position={"col": col_id, "row": row_index + 1},
            name=f"Content_{row_id}{col_id}",
            tags=[strain_tag, stimulus_tag],
            volume=0,
        ))
        # Tally the tags, to insert reagent tubes later.
        strain_counts[strain_tag] = strain_counts.get(strain_tag, 0) + 1
        stimulus_counts[stimulus_tag] = stimulus_counts.get(stimulus_tag, 0) + 1

    # Override the platform item's contents.
    well_plate_item["content"] = plate_contents

    # Tag, count, per-target volume and label for every source: strains first, then stimuli.
    tag_counts = (
        [[tag, count, culture_volume, "strain"] for tag, count in strain_counts.items()]
        + [[tag, count, stimuli_volume, "stimulus"] for tag, count in stimulus_counts.items()]
    )

    # One tube and one tip per source; both use the same 1-based slot number.
    tubes, tips = [], []
    for slot, (tag_name, tag_count, volume, label) in enumerate(tag_counts, start=1):
        col, row = datautils.get_colrow(slot,
                                        rack_platform["wellsColumns"],
                                        rack_platform["wellsRows"])
        tubes.append(newt.contents.base_content(
            container=reagent_rack_container["container"],
            index=slot,
            position={"col": col, "row": row},
            name=f"Content_{row}{col}",
            tags=[tag_name, label],
            volume=tag_count * volume,  # As much as needed
        ))

        # NOTE: This should be enough as the tips are meant to be reused for each source.
        col, row = datautils.get_colrow(slot,
                                        tips_platform["wellsColumns"],
                                        tips_platform["wellsRows"])
        tips.append(newt.contents.base_content(
            container=tip_rack_container["container"],
            index=slot,
            position={"col": col, "row": row},
            name=f"Content_{row}{col}",
            volume=0,
        ))

    # Override the platform items' contents.
    reagent_rack_item["content"] = tubes
    tip_rack_item["content"] = tips

    # Tip element shared by all steps.
    tip_element = make_tip(
        item=tip_rack_name,
        mode=self.config.get("pcr_template", {}).get("tip_mode", "reuse"),
        discardItem=trash_name)

    # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests).
    tip_container = None
    if self.controller:
        # Get the tip's container definition.
        tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"])

    # One transfer step per source, ordered from 1.
    steps = []
    for order, (tag_name, tag_count, volume, _) in enumerate(tag_counts, start=1):
        # Configured default tool, replaced by the pipettes plugin's guess when a controller exists.
        tool_name = self.config.get("pcr_template", {}).get("default_tool", None)
        if self.controller:
            tool_name = self.controller.plugins["pipettes"].guess_pipette(
                volume=volume, tip_container=tip_container,
                repeats=tag_count, prioritize_accuracy=True)
        steps.append(step_transfer(
            target_value=tag_name, source_value=tag_name, volume=volume,
            target_platform=well_plate_name, source_platform=reagent_rack_name,
            tool=tool_name,
            tip=tip_element,
            order=order,
            name=f"Add {reagent_rack_name} to {well_plate_name}",
            target_by="tag", source_by="tag"))

    # Save steps, overwriting any previous value.
    template_data['protocol']['steps'] = steps

    # Generate the protocol, using the originals as defaults.
    protocol = newt.hl_protocols.protocol_hl(**template_data['protocol'])

    # Assemble and clean up the output.
    result = self.scrub_mongo_stuff({"protocol": protocol, "workspace": workspace})

    # Push data into the result (for tests).
    result["well_data"] = well_data
    result["well_data_dict"] = well_data_dict

    dump_json("/tmp/well_data_dict.json", result)

    return result

Processes a 'Well Plate Prep' protocol template event. TO-DO: 1. Generate contents for a 384 well-plate platform. - Tag all as "targets" - Use "strain" and "stimuli" data to add more tags. 2. Generate contents for a tube rack, for "strain" and "stimuli" data. 3. Generate high-level steps to transfer "strains" and "stimuli" by tag.

def register_sio_callbacks(self)
Expand source code
def register_sio_callbacks(self):
    """Register this plugin's SocketIO event callbacks, typically triggered by the Pipettin GUI.

    The handler is attached to the controller's SIO object for the event named by
    ``self.protocol_template_event``, and replies with the return value of
    ``self.process_template``. If the SIO object is unset, an error is logged and
    nothing is registered.
    """
    sio = self.controller.comms.sio
    if not sio:
        logging.error("Failed to register event handlers. The SIO comms object is undefined.")
        return

    # Register template handler.
    @sio.on(self.protocol_template_event)
    async def protocol_template(data):
        return self.process_template(data)

Function to register socketio event callbacks, typically sent by the Pipettin GUI.

Inherited members