Module pipettin-piper.piper.plugins.pcr_template
Functions
def load_plugin(controller: "'Controller'", **kwargs)-
Expand source code
def load_plugin(controller: "Controller", **kwargs): """ Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception. """ logging.debug(f"load_plugin: loading {plugin_name} plugin.") try: class_instance = TemplateRouter(controller=controller) except Exception as e: msg = f"Failed to load with error: {e}" logging.error(msg) raise PluginError(msg) from e return class_instancePlugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception.
Classes
class TemplateRouter (controller=None, config: dict = None, verbose=False)-
Expand source code
class TemplateRouter(Plugin): """Generate a protocol and workspace objects for PCR reactions. Receives data from a 'PCR template form', sent the GUI through SocketIO, and automatically generates steps to prepare the reactions using the "mix" module. The preparation steps are then translated to protocol steps and workspace objects, which are returned through SocketIO, and are used by the UI. """ config: dict verbose: bool = False sio: socketio.AsyncClient = None # Field names for platforms in the template data. tmplt_tip_plat = 'tipsPlatform' tmplt_trash_plat = 'trashPlatform' tmplt_pcr_tube = 'PCRtubePlatform' tmplt_rgt_tube = 'tube15Platform' def __init__(self, controller=None, config:dict=None, verbose=False): self.config: dict = {} if config is not None: self.config = config self.verbose = verbose # Save controller. self.controller: Controller = controller # Handlers for different kinds of protocol template envents. self.template_handlers = { 'PCR Mix': self.process_pcr_template, 'Well Plate Prep': self.process_well_plate_prep } if self.controller is not None: # Save configuration. self.config = self.controller.config self.verbose = self.controller.verbose # Save database tools. # NOTE: Removed attribute. It makes it hard to update DB tools from elsewhere. #self.database_tools=controller.database_tools # Register default socketio event handlers. self.register_sio_callbacks() else: print("Warning: no controller configured.") # Set status. self._status = True # SOCKET.IO CALLBACKS AND EVENTS SECTION #### protocol_template_event = 'protocol_template' def register_sio_callbacks(self): """Function to register socketio event callbacks, typically sent by the Pipettin GUI.""" if self.controller.comms.sio: # Register template handler. @self.controller.comms.sio.on(self.protocol_template_event) async def protocol_template(data): return self.process_template(data) else: logging.error("Failed to register event handlers. The SIO comms object is undefined.") def process_template(self, data): """Receives a protocol template and current workspace from the GUI/backend and sends new ones. The incoming SocketIO event expects a response (with a timeout), which is the return value of this function. """ # Logs. logging.info(f"Received template command for protocol '{data['protocol']['name']}' and workspace '{data['workspace']['name']}'.") logging.debug(f"Received template command with data:\n{pformat(data)}") try: # Look for a handler function. template_type = data['protocol']['template'] template_handler = self.template_handlers.get(template_type, None) # Generate the output. if template_handler: result = template_handler(template_data=deepcopy(data)) # Send our congrats! msg = f"Successfully processed '{template_type}' template for protocol '{data['protocol']['name']}' and workspace '{data['workspace']['name']}'." self.controller.send_alert(msg, alert_type="message") logging.info(msg) logging.debug(f"This is the result:\n{pformat(result)}") else: logging.warning(f"Unknown template type: '{template_type}'. Returning input data.") logging.debug(f"Input protocol data:\n{pformat(data['protocol'])}") result = deepcopy(data) except Exception as e: protocol_name = data.get('protocol', {}).get('name', None) msg = f"Error during template processing for protocol '{protocol_name}': {pformat(e)}\n" + traceback.format_exc() self.controller.send_alert(msg, alert_type="message") logging.error(msg + "\n" + pformat(data)) # return the received data as is. return data logging.debug(f"Processed template command with result:\n{pformat(result)}") return result @staticmethod def scrub_mongo_stuff(data: dict): """Remove MongoDB stuff from dictionary data **in place**.""" scrub(data, "__v") scrub(data, "createdAt") scrub(data, "updatedAt") # NOTE: Retain the "_id"s. Scrubbing the _id from the template data causes a UI error. # scrub(data, "_id") return data def process_well_plate_prep(self, template_data: dict): """Processes a 'Well Plate Prep' protocol template event. TO-DO: 1. Generate contents for a 384 well-plate platform. - Tag all as "targets" - Use "strain" and "stimuli" data to add more tags. 2. Generate contents for a tube rack, for "strain" and "stimuli" data. 3. Generate high-level steps to transfer "strains" and "stimuli" by tag. """ dump_json("/tmp/template_data.json", template_data) # Extract the data. file_data = self.parse_file_data(template_data) # Get data. well_data = self.parse_well_plate(file_data["layout"], sort_col_idx=2) well_data_dict = self.well_list_to_dict(well_data) # Strain and stimuli data. strain_data = self.parse_well_plate(file_data["strains"]) strain_data_dict = self.well_list_to_dict(strain_data) stimuli_data = self.parse_well_plate(file_data["stimuli"]) stimuli_data_dict = self.well_list_to_dict(stimuli_data) # Get item names template_definition = template_data['protocol']['templateDefinition'] well_plate_name = template_definition["well_plate"] reagent_rack_name = template_definition["reagent_rack"] tip_rack_name = template_definition["tip_rack"] trash_name = template_definition["trash"] stimuli_volume = template_definition["stimuli_volume"] culture_volume = template_definition["cell_volume"] # Get items workspace = template_data["workspace"] well_plate_item = next(i for i in workspace["items"] if i["name"] == well_plate_name) reagent_rack_item = next(i for i in workspace["items"] if i["name"] == reagent_rack_name) tip_rack_item = next(i for i in workspace["items"] if i["name"] == tip_rack_name) # trash_item = next(i for i in workspace["items"] if i["name"] == trash_name) # Get first container as default well_plate_container = next(c for c in well_plate_item["platformData"]["containers"]) reagent_rack_container = next(c for c in reagent_rack_item["platformData"]["containers"]) tip_rack_container = next(c for c in tip_rack_item["platformData"]["containers"]) # Generate contents. contents = [] strains, stimuli = {}, {} # Unpack the row and column identifiers for col_id, row_id, _ in well_data: # Calculate the main index for the content main_index = datautils.get_index( col_id = col_id-1, # Convert to zero-based index row_id = row_id, ncols=well_plate_item["platformData"]["wellsColumns"], nrows=well_plate_item["platformData"]["wellsRows"] ) # Get strain tag: strain_tag = "strain-" + str(strain_data_dict.get(row_id, {}).get(col_id, "default")) stimulus_tag = "stimulus-" + str(stimuli_data_dict.get(row_id, {}).get(col_id, "default")) # Generate the content using the base_content function row_index = datautils.row_to_index(row_id) # Convert the row letter(s) to an index (0-based) content = newt.contents.base_content( container=well_plate_container["container"], # Convert index to 1-based. index=main_index+1, position={"col": col_id, "row": row_index+1}, name=f"Content_{row_id}{col_id}", # Example name tags=[strain_tag, stimulus_tag], # Add any tags if needed volume=0 ) # Append the content. contents.append(content) # Count strain tags, to insert reagent tubes later. strains.setdefault(strain_tag, 0) strains[strain_tag] += 1 # Count stimuli tags, to insert reagent tubes later. stimuli.setdefault(stimulus_tag, 0) stimuli[stimulus_tag] += 1 # Override the platform item's contents. well_plate_item["content"] = contents # Make a list of tags, counts, and the target volume for each one. tag_counts = [[t, c, culture_volume, "strain"] for t, c in strains.items()] tag_counts += [[t, c, stimuli_volume, "stimulus"] for t, c in stimuli.items()] # Add contents to tube rack. tubes = [] tips = [] for tag_name, tag_count, volume, label in tag_counts: # Create and append a tube for each source. tube_index = len(tubes) + 1 col, row = datautils.get_colrow(tube_index, reagent_rack_item["platformData"]["wellsColumns"], reagent_rack_item["platformData"]["wellsRows"]) tube = newt.contents.base_content( container=reagent_rack_container["container"], # Convert index to 1-based. index=tube_index, position={"col": col, "row": row}, name=f"Content_{row}{col}", # Example name tags=[tag_name, label], # Add any tags if needed volume=tag_count*volume # As much as needed ) tubes.append(tube) # Create and append a tip for each tube. # NOTE: This should be enough as the tips are meant to be reused for each source. tip_index = len(tips) + 1 col, row = datautils.get_colrow(tip_index, tip_rack_item["platformData"]["wellsColumns"], tip_rack_item["platformData"]["wellsRows"]) tip = newt.contents.base_content( container=tip_rack_container["container"], # Convert index to 1-based. index=tip_index, position={"col": col, "row": row}, name=f"Content_{row}{col}", volume=0 ) tips.append(tip) # Override the platform item's contents. reagent_rack_item["content"] = tubes tip_rack_item["content"] = tips # Make a tip element, common to all steps. tip_element = make_tip( item=tip_rack_name, mode=self.config.get("pcr_template", {}).get("tip_mode", "reuse"), discardItem=trash_name) # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests). tip_container = None # Get the tip's container definition. if self.controller: tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"]) # Create and append steps. steps = [] for tag_name, tag_count, volume, _ in tag_counts: # Guess the tool. tool_name = self.config.get("pcr_template", {}).get("default_tool", None) if self.controller: tool_name = self.controller.plugins["pipettes"].guess_pipette( volume=volume, tip_container=tip_container, repeats=tag_count, prioritize_accuracy=True) # Create step. step = step_transfer(target_value=tag_name, source_value=tag_name, volume=volume, target_platform=well_plate_name, source_platform=reagent_rack_name, tool=tool_name, tip=tip_element, order=len(steps)+1, name=f"Add {reagent_rack_name} to {well_plate_name}", target_by="tag", source_by="tag") steps.append(step) # Save steps, overwriting any previous value. template_data['protocol']['steps'] = steps # Generate the protocol, using the originals as defaults. protocol = newt.hl_protocols.protocol_hl(**template_data['protocol']) # Generate the output. result = {"protocol": protocol, "workspace": workspace} # Cleanup result = self.scrub_mongo_stuff(result) # Push data into the result (for tests). result["well_data"] = well_data result["well_data_dict"] = well_data_dict dump_json("/tmp/well_data_dict.json", result) return result @staticmethod def parse_well_plate(data, sort_col_idx: int = None) -> list: """Convert the raw well-plate matrix-format data into long-format. Example output: [ [1, "B", 1], [1, "C", 2], ... ] """ # Iterate over the columns and rows result = [] for col_index, row_values in data.items(): if col_index == '384': # Skip the first key as it contains row headers continue for row_index, value in enumerate(row_values): if value != "0": # Only include non-zero values # '384' contains the row labels; so row letter = data['384'][row_index] result.append([int(col_index), data['384'][row_index], value]) if sort_col_idx is not None: # Sort the result list by the processing order value (third element in each sublist) result = sorted(result, key=lambda x: x[sort_col_idx]) # Return the result return result @staticmethod def well_list_to_dict(data) -> dict: """Convert a list of wells to dict format for easy retrieving of data by col/row name Example: well_data["B"][2] """ # Initialize the nested dictionary nested_dict = OrderedDict() # Populate the nested dictionary for col, row, order in data: if row not in nested_dict: nested_dict[row] = {} nested_dict[row][col] = order # Return the result return nested_dict @staticmethod def parse_file_data(template_data: dict): """Extract CSV data from the template data.""" fields_data = template_data["protocol"]["templateDefinition"] fields = template_data["protocol"]["templateFields"]["fields"] file_data = {} for field in fields: # Look for file upload fields. if field["field_type"] == "fileUpload": field_id = field["field_id"] # Look for file upload data. try: field_data = next(d for k, d in fields_data.items() if k == field_id) except StopIteration: logging.error(f"Failed to obtain file data for field id '{field_id}', missing in the template data.") # Parse and save CSV data if any. if field_data["type"] == "text/csv": file_data[field_id] = decode_csv(field_data["data"]) logging.info(f"Parsed CSV data from fields: {list(file_data)}") logging.debug(f"CSV file data:\n{pformat(file_data)}") return file_data def process_pcr_template(self, template_data): """Processes a 'PCR Mix' protocol template event.""" # Generate new workspace and protocol steps. workspace, protocol = self.prepare_objects(template_data) # Generate the output. result = {"protocol": protocol, "workspace": workspace} # Cleanup. self.scrub_mongo_stuff(result) return result def prepare_objects(self, template_data: dict): """Generate data and intermediate objects for a 'PCR Mix' template.""" # Prepare the Recipe object. master_recipe: Recipe = self.prepare_recipe(template_data) # Generate draft contents and preparation steps. pre_contents, preps = self.prepare_contents(master_recipe) # Prepare the workspace. workspace = self.prepare_workspace(template_data, pre_contents) # Prepare the protocol. protocol = self.make_protocol(preps, pre_contents, template_data) return workspace, protocol debug_steps_file = '/tmp/steps.json' def make_protocol(self, preps, contents, template_data: dict): """Generate a high-level pipetting protocol for a 'PCR Mix' template.""" template_definition = template_data['protocol']['templateDefinition'] pcr_platform = template_definition[self.tmplt_pcr_tube] reagents_platform = template_definition[self.tmplt_rgt_tube] tips_platform = template_definition[self.tmplt_tip_plat] # Get the pit rack's platform data and first container. workspace = template_data["workspace"] tip_rack_item = next(i for i in workspace["items"] if i["name"] == tips_platform) tip_rack_container = next(c for c in tip_rack_item["platformData"]["containers"]) # Make a tip element, common to all steps. tip_element = make_tip( mode=self.config.get("pcr_template", {}).get("tip_mode", "reuse"), item=tips_platform, discardItem=template_definition[self.tmplt_trash_plat]) # Get the tip's container definition. # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests). tip_container = None if self.controller: tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"]) # Prepare the protocol steps. steps = self.preps_to_steps(preps, contents, tip_element, tip_container, pcr_platform, reagents_platform) # Write output for debugging. if self.verbose: dump_json(self.debug_steps_file, steps) if template_data['protocol'].get('steps', None): logging.info('make_protocol: replacing previous protocol steps. Storing old steps in a "steps-old" key.') template_data['protocol']['steps-old'] = template_data['protocol']['steps'] if not template_data['protocol'].get("workspace", None): logging.info('make_protocol: the input protocol does not have a workspace. Getting its name from the template data.') template_data['protocol']["workspace"] = template_data["workspace"].get("name", None) # Save steps, overwriting any previous value. template_data['protocol']['steps'] = steps # Generate the protocol, using the originals as defaults. protocol = newt.hl_protocols.protocol_hl(**template_data['protocol']) return protocol def preps_to_steps(self, preps: dict, contents: list, tip_element, tip_container, pcr_platform, reagents_platform): """Funciones para generar pasos "TRANSFER" para un protocolo HL.""" steps = [] steps.extend(self.prep_to_step(preps, contents, tip_element, tip_container, pcr_platform, reagents_platform)) for prep in preps["derivatives"]: steps.extend(self.prep_to_step(prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=len(steps))) return steps def prep_to_step(self, prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=0): """Funciones para generar pasos "TRANSFER" para un protocolo HL.""" steps = [] target_content_name = prep["name"] # Sort sources from high to low volume. sources = [ {"component": k, "volume": v} for k,v in prep["recipe"].items() ] def getval(d): return d["volume"] sources.sort(key=getval, reverse=True) for source in sources: source_content_name = source["component"] volume = source["volume"] # Get the "depth" level of the PCR tubes platform. max_depth = max([d["level"] for d in contents]) # Set source platform. source_platform = "any" for content in contents: if source_content_name == content["name"]: if content["level"] == max_depth: source_platform = pcr_platform else: source_platform = reagents_platform break # Set target platform. target_platform = "any" for content in contents: if target_content_name == content["name"]: if content["level"] == max_depth: target_platform = pcr_platform else: target_platform = reagents_platform break # Guess the tool. tool_name = self.config.get("pcr_template", {}).get("default_tool", None) if self.controller: tool_name = self.controller.plugins["pipettes"].guess_pipette( volume=volume, tip_container=tip_container, repeats=1, prioritize_accuracy=True) # Create and append steps. steps.append(step_transfer(target_value=target_content_name, source_value=source_content_name, volume=volume, target_platform=target_platform, source_platform=source_platform, tip=tip_element, tool=tool_name, order=len(steps)+1+initial_order, name=f"Add {source_content_name} to {target_content_name}", target_by="name", source_by="name")) return steps def prepare_workspace(self, template_data, contents): """Generate workspace platformns and contents for a 'PCR Mix' template.""" # Make default platforms (in case the workspace does not have them). template_definition = template_data['protocol']['templateDefinition'] workspace = template_data['workspace'] # Get target platforms by name if found, and clear their contents. # NOTE: This overrides the defaults set above, in order to reuse # any properties sent by the GUI. pcr_tubes_item = None reagents_item = None for i, item in enumerate(workspace['items']): # Look for the PCR tubes paltform. if item.get('name', None) == template_definition[self.tmplt_pcr_tube]: pcr_tubes_item = workspace['items'][i] pcr_tubes_item["content"].clear() # Look for the reagent tubes paltform. elif item.get('name', None) == template_definition[self.tmplt_rgt_tube]: reagents_item = workspace['items'][i] reagents_item["content"].clear() # If no platforms were found, then create and insert them. if not pcr_tubes_item: pcr_tubes_item = newt.platform_items.base_platform_item( name=template_definition[self.tmplt_pcr_tube], # TODO: PCRtubePlatformItem ?? platform=template_definition[self.tmplt_pcr_tube], position={"x": 0, "y": 0, "z": 0}) workspace['items'].append(pcr_tubes_item) if not reagents_item: reagents_item = newt.platform_items.base_platform_item( name=template_definition[self.tmplt_rgt_tube], # TODO: tube15PlatformItem ?? platform=template_definition[self.tmplt_rgt_tube], position={"x": 0, "y": 0, "z": 0}) workspace['items'].append(reagents_item) # The "level" key indicates the depth of the mix. # The "deepest" mix corresponds to the terminal solution, # with the target composition and volume. # In a PCR mix scenario, contents of this depth should end up # in the "pcr tubes" platform, and the rest can go in the "reagents" platform. # NOTE: This was replaced by a precalculated property: "terminal". # Using "max_depth" was buggy, because not all terminal leaves have the same # number of levels, as derived from the number of cuts of their cluster. #max_depth = max([d["level"] for d in contents]) # Populate the platform items with contents. for i, content in enumerate(contents): # Get the appropriate platform depending on which kind of tube this is. # That is, figure out if this is a "PCR tube" or a "reagent tube", by looking # at its "level" property. PCR tubes will be terminal tubes. if content.get("terminal", None): content_list = pcr_tubes_item["content"] platform_data = pcr_tubes_item["platformData"] # Get default container container_name = platform_data["containers"][0]["container"] container_name = self.config.get("pcr_template", {}).get("pcr_container", container_name) else: content_list = reagents_item["content"] platform_data = reagents_item["platformData"] # Get default container container_name = platform_data["containers"][0]["container"] container_name = self.config.get("pcr_template", {}).get("reagent_container", container_name) # Make content index = len(content_list) + 1 col, row = datautils.get_colrow(index, platform_data["wellsColumns"], platform_data["wellsRows"]) tube = newt.contents.base_content( container=container_name, index=index, position={"col": col, "row": row}, name=content["name"], tags=content.get("tags", []), volume=content["volume"]) # Append the content to the platform iteal: bool = item["name"]m's content list. content_list.append(tube) # Return the updated workspace return workspace # Define recursive function. def get_amounts(self, prep: dict, mix_names: list, amounts: dict = None): """Recursive function to get the amounts of each reagent. This function will modify the "prep" dict in place, remember to pass a deep copy: amounts, new_preps = self.get_amounts(deepcopy(preps), mix_names) """ if amounts is None: amounts = {} # First, replace names of "variable components" when they became "fixed". fixed_comps = prep.get("fixed_component_reference") # NOTE: Example data for "fixed_comps": # "fixed_component_reference": { # "rvPrimers": "3" # }, recipe = prep.get("recipe") reduced_fixed_comps = [] if fixed_comps: for comp_name, comp_variant in fixed_comps.items(): comp_amount = recipe.pop(comp_name) # Make a new name for the component, appending its variant name. new_comp_name = f"{comp_name}_{comp_variant}" recipe[new_comp_name] = comp_amount reduced_fixed_comps.append(new_comp_name) prep.setdefault("reduced_fixed_comps", []) prep["reduced_fixed_comps"] += reduced_fixed_comps # Recurse into derivative preps. if prep["derivatives"]: for derivative in prep["derivatives"]: derivative["reduced_fixed_comps"] = deepcopy(prep["reduced_fixed_comps"]) amounts, _ = self.get_amounts(prep=derivative, mix_names=mix_names, amounts=amounts) # Sum up the amounts. for comp_name, comp_amount in recipe.items(): # Remove mix-type components (i.e. non-reagent) from the recipe. if comp_name in mix_names: continue amounts[comp_name] = comp_amount + amounts.get(comp_name, 0.0) return amounts, prep def flatten_derivatives(self, data: dict, result_list: list=None, level:int=0): if result_list is None: parent = deepcopy(data) parent["derivatives"] = [d["name"] for d in parent["derivatives"]] parent["level"] = level result_list = [parent] children = data.get("derivatives", []) if children: for child in children: new_child = deepcopy(child) new_child["derivatives"] = [d["name"] for d in new_child["derivatives"]] new_child["level"] = level+1 result_list.append(new_child) self.flatten_derivatives(data=child, result_list=result_list, level=level+1) return result_list debug_preps_file = '/tmp/preps.json' debug_reduced_preps_file = '/tmp/preps_red.json' debug_newpreps_file = '/tmp/new_preps.json' flat_new_preparations_file = '/tmp/flat_new_preparations.json' debug_precontents_file = '/tmp/pre_contents.json' def prepare_contents(self, master_recipe): """Generate preparation steps and pre-content data for a 'PCR Mix' template.""" # Make a mix plan object. mix_plan = MixPlan(master_recipe) # Make the recipes and preps. # Requires running "make_preparation" to generate the "mix_plan.preparations" object. preparations = mix_plan.make_preparation() # Get the reduced and flattened preparations.abs reduced_preps: list = mix_plan.preparations # Write output for debugging. if self.verbose: dump_json(self.debug_preps_file, preparations) dump_json(self.debug_reduced_preps_file, reduced_preps) # Calculate total amounts of each (non-mix) component: # Get the names of mixes. mix_names = [d["name"] for d in reduced_preps] # It's magic! amounts, new_preparations = self.get_amounts(deepcopy(preparations), mix_names) flat_new_preparations = self.flatten_derivatives(new_preparations) # Write output for debugging. if self.verbose: dump_json(self.debug_newpreps_file, new_preparations) dump_json(self.flat_new_preparations_file, flat_new_preparations) # List for pre-content data. pre_contents=[] # Add reagents as contents. for k, v in amounts.items(): reagent = {"level": 0, "name": k, "volume": v, "terminal": False} # Use the reagent name as tag name. reagent["tags"] = [k] + ["reagent"] # Get the base names of components, and add them as content tags. reagent_names = master_recipe.recipe_component_names reagent["tags"] += [t for t in reagent_names if str(k).startswith(str(t))] # Make unique. reagent["tags"] = list(set(reagent["tags"])) # Use "level 0" to let the pre_contents be placed in the reagents tube rack later on. pre_contents.append(reagent) # Prepare a list with the names of terminal mixes. all_mix_ingredients = set() # Example: ["fwPrimers", "templates", ...] for reduced_prep in reduced_preps: # Update the set keeping it unique: https://stackoverflow.com/a/32482852 all_mix_ingredients.update(reduced_prep["recipe"].keys()) # A terminal item is not an ingredient in any recipe. terminal_set = [n for n in mix_names if n not in all_mix_ingredients] # TODO: Patch "mix" to get this information directly from there, # instead of inferring it here. # Generate mixes and reaction tubes, and add them to the pre_contents list. # NOTE: These will not be "proper" content yet. They will be used to make a proper ones later on. for item in flat_new_preparations: tags = [] is_terminal = item["name"] in terminal_set if not is_terminal: tags += ["mix", f"level{item['level']}"] else: tags += item["reduced_fixed_comps"] pre_content = { "name": item["name"], "volume": sum(item["recipe"].values()), "level": item["level"], "tags": tags, "terminal": is_terminal } pre_contents.append(pre_content) # Write output for debugging. if self.verbose: dump_json(self.debug_precontents_file, pre_contents) # NOTE: Here the "new_preparations" is returned instead of the "preps", because # the (variable) component names in the pre_contents have been updated # and the same applies to the components in preps. return pre_contents, new_preparations def prepare_recipe(self, template_data): """ Generate a 'Recipe' object from the data sent by the GUI's template. Example components: [{'fwPrimers': ['1', '2'], 'name': ['G1'], 'rvPrimers': ['5'], 'templates': ['A1']}, {'fwPrimers': ['3'], 'name': ['G2'], 'rvPrimers': ['6'], 'templates': ['A1', 'A2']}] PCR-specific parameters for the recipe: 'bufferFinal': '1', 'bufferStock': '10', 'dntpsFinal': '1', 'dntpsStock': '20', 'finalVol': '20', 'polVol': '0.2', 'primerFinal': '1', 'primerStock': '20', 'templateVol': '1', 'volLossCompensation': '0.2' """ # Get the template data. template_def = template_data['protocol']['templateDefinition'] # NOTE: a copy is needed because content names are "popped" below, # would otherwise propagate to the templateDefinition. components = deepcopy(template_def["components"]) # NOTE: "component groups" would normally be generated like this: # component_groups = groups( # reaccion1 = group( template="A550", pFw = c(667, 999), pRv = 668 ), # reaccion2 = group( template="A550", pFw = 667, pRv = c(668, 770), dmso=10 ) # ) # NOTE: an equivalent behaviour: groups_dict = {} for d in components: d_name = d.pop('name') # Remove the "name" attribute and save it. d_name = str(d_name) # Ensure that it is a string (the UI might send it as a list if it has commas). groups_dict[d_name] = group(**d) # Expand the components and add a group. component_groups = groups(**groups_dict) # Expand the groups. if self.verbose: print(f"Parsed component_groups:\n{pformat(component_groups)}") # NOTE: here properties from the GUI are passed to their counterparts in "Recipe". master_recipe = Recipe( component_groups = component_groups, volume = float(template_def['finalVol']), extra_fraction = float(template_def['volLossCompensation']) + 1.0, stock_solutions_x = comps( buffer = float(template_def['bufferStock']), dNTPs = float(template_def['dntpsStock']), fwPrimers = float(template_def['primerStock']), rvPrimers = float(template_def['primerStock']) ), stock_solutions_vol = comps( templates = float(template_def['templateVol']), pol = float(template_def['polVol']) ), target_solution = comps( buffer = float(template_def['bufferFinal']), dNTPs = float(template_def['dntpsFinal']), fwPrimers = float(template_def['primerFinal']), rvPrimers = float(template_def['primerFinal']) ) # variable_components=comps( # pol = c("Taq", "Pfu") # ) ) return master_recipeGenerate a protocol and workspace objects for PCR reactions.
Receives data from a 'PCR template form', sent the GUI through SocketIO, and automatically generates steps to prepare the reactions using the "mix" module.
The preparation steps are then translated to protocol steps and workspace objects, which are returned through SocketIO, and are used by the UI.
Ancestors
Class variables
var config : dictvar debug_newpreps_filevar debug_precontents_filevar debug_preps_filevar debug_reduced_preps_filevar debug_steps_filevar flat_new_preparations_filevar protocol_template_eventvar sio : socketio.async_client.AsyncClientvar tmplt_pcr_tubevar tmplt_rgt_tubevar tmplt_tip_platvar tmplt_trash_platvar verbose : bool
Static methods
def parse_file_data(template_data: dict)-
Expand source code
@staticmethod def parse_file_data(template_data: dict): """Extract CSV data from the template data.""" fields_data = template_data["protocol"]["templateDefinition"] fields = template_data["protocol"]["templateFields"]["fields"] file_data = {} for field in fields: # Look for file upload fields. if field["field_type"] == "fileUpload": field_id = field["field_id"] # Look for file upload data. try: field_data = next(d for k, d in fields_data.items() if k == field_id) except StopIteration: logging.error(f"Failed to obtain file data for field id '{field_id}', missing in the template data.") # Parse and save CSV data if any. if field_data["type"] == "text/csv": file_data[field_id] = decode_csv(field_data["data"]) logging.info(f"Parsed CSV data from fields: {list(file_data)}") logging.debug(f"CSV file data:\n{pformat(file_data)}") return file_dataExtract CSV data from the template data.
def parse_well_plate(data, sort_col_idx: int = None) ‑> list-
Expand source code
@staticmethod def parse_well_plate(data, sort_col_idx: int = None) -> list: """Convert the raw well-plate matrix-format data into long-format. Example output: [ [1, "B", 1], [1, "C", 2], ... ] """ # Iterate over the columns and rows result = [] for col_index, row_values in data.items(): if col_index == '384': # Skip the first key as it contains row headers continue for row_index, value in enumerate(row_values): if value != "0": # Only include non-zero values # '384' contains the row labels; so row letter = data['384'][row_index] result.append([int(col_index), data['384'][row_index], value]) if sort_col_idx is not None: # Sort the result list by the processing order value (third element in each sublist) result = sorted(result, key=lambda x: x[sort_col_idx]) # Return the result return resultConvert the raw well-plate matrix-format data into long-format. Example output: [ [1, "B", 1], [1, "C", 2], … ]
def scrub_mongo_stuff(data: dict)-
Expand source code
@staticmethod def scrub_mongo_stuff(data: dict): """Remove MongoDB stuff from dictionary data **in place**.""" scrub(data, "__v") scrub(data, "createdAt") scrub(data, "updatedAt") # NOTE: Retain the "_id"s. Scrubbing the _id from the template data causes a UI error. # scrub(data, "_id") return dataRemove MongoDB stuff from dictionary data in place.
def well_list_to_dict(data) ‑> dict-
Expand source code
@staticmethod def well_list_to_dict(data) -> dict: """Convert a list of wells to dict format for easy retrieving of data by col/row name Example: well_data["B"][2] """ # Initialize the nested dictionary nested_dict = OrderedDict() # Populate the nested dictionary for col, row, order in data: if row not in nested_dict: nested_dict[row] = {} nested_dict[row][col] = order # Return the result return nested_dictConvert a list of wells to dict format for easy retrieving of data by col/row name
Example
well_data["B"][2]
Methods
def flatten_derivatives(self, data: dict, result_list: list = None, level: int = 0)-
Expand source code
def flatten_derivatives(self, data: dict, result_list: list=None, level:int=0): if result_list is None: parent = deepcopy(data) parent["derivatives"] = [d["name"] for d in parent["derivatives"]] parent["level"] = level result_list = [parent] children = data.get("derivatives", []) if children: for child in children: new_child = deepcopy(child) new_child["derivatives"] = [d["name"] for d in new_child["derivatives"]] new_child["level"] = level+1 result_list.append(new_child) self.flatten_derivatives(data=child, result_list=result_list, level=level+1) return result_list def get_amounts(self, prep: dict, mix_names: list, amounts: dict = None)-
Expand source code
def get_amounts(self, prep: dict, mix_names: list, amounts: dict = None): """Recursive function to get the amounts of each reagent. This function will modify the "prep" dict in place, remember to pass a deep copy: amounts, new_preps = self.get_amounts(deepcopy(preps), mix_names) """ if amounts is None: amounts = {} # First, replace names of "variable components" when they became "fixed". fixed_comps = prep.get("fixed_component_reference") # NOTE: Example data for "fixed_comps": # "fixed_component_reference": { # "rvPrimers": "3" # }, recipe = prep.get("recipe") reduced_fixed_comps = [] if fixed_comps: for comp_name, comp_variant in fixed_comps.items(): comp_amount = recipe.pop(comp_name) # Make a new name for the component, appending its variant name. new_comp_name = f"{comp_name}_{comp_variant}" recipe[new_comp_name] = comp_amount reduced_fixed_comps.append(new_comp_name) prep.setdefault("reduced_fixed_comps", []) prep["reduced_fixed_comps"] += reduced_fixed_comps # Recurse into derivative preps. if prep["derivatives"]: for derivative in prep["derivatives"]: derivative["reduced_fixed_comps"] = deepcopy(prep["reduced_fixed_comps"]) amounts, _ = self.get_amounts(prep=derivative, mix_names=mix_names, amounts=amounts) # Sum up the amounts. for comp_name, comp_amount in recipe.items(): # Remove mix-type components (i.e. non-reagent) from the recipe. if comp_name in mix_names: continue amounts[comp_name] = comp_amount + amounts.get(comp_name, 0.0) return amounts, prepRecursive function to get the amounts of each reagent.
This function will modify the "prep" dict in place, remember to pass a deep copy: amounts, new_preps = self.get_amounts(deepcopy(preps), mix_names)
def make_protocol(self, preps, contents, template_data: dict)-
Expand source code
def make_protocol(self, preps, contents, template_data: dict): """Generate a high-level pipetting protocol for a 'PCR Mix' template.""" template_definition = template_data['protocol']['templateDefinition'] pcr_platform = template_definition[self.tmplt_pcr_tube] reagents_platform = template_definition[self.tmplt_rgt_tube] tips_platform = template_definition[self.tmplt_tip_plat] # Get the pit rack's platform data and first container. workspace = template_data["workspace"] tip_rack_item = next(i for i in workspace["items"] if i["name"] == tips_platform) tip_rack_container = next(c for c in tip_rack_item["platformData"]["containers"]) # Make a tip element, common to all steps. tip_element = make_tip( mode=self.config.get("pcr_template", {}).get("tip_mode", "reuse"), item=tips_platform, discardItem=template_definition[self.tmplt_trash_plat]) # Get the tip's container definition. # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests). tip_container = None if self.controller: tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"]) # Prepare the protocol steps. steps = self.preps_to_steps(preps, contents, tip_element, tip_container, pcr_platform, reagents_platform) # Write output for debugging. if self.verbose: dump_json(self.debug_steps_file, steps) if template_data['protocol'].get('steps', None): logging.info('make_protocol: replacing previous protocol steps. Storing old steps in a "steps-old" key.') template_data['protocol']['steps-old'] = template_data['protocol']['steps'] if not template_data['protocol'].get("workspace", None): logging.info('make_protocol: the input protocol does not have a workspace. Getting its name from the template data.') template_data['protocol']["workspace"] = template_data["workspace"].get("name", None) # Save steps, overwriting any previous value. template_data['protocol']['steps'] = steps # Generate the protocol, using the originals as defaults. protocol = newt.hl_protocols.protocol_hl(**template_data['protocol']) return protocolGenerate a high-level pipetting protocol for a 'PCR Mix' template.
def prep_to_step(self,
prep,
contents,
tip_element,
tip_container,
pcr_platform,
reagents_platform,
initial_order=0)-
Expand source code
def prep_to_step(self, prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=0): """Funciones para generar pasos "TRANSFER" para un protocolo HL.""" steps = [] target_content_name = prep["name"] # Sort sources from high to low volume. sources = [ {"component": k, "volume": v} for k,v in prep["recipe"].items() ] def getval(d): return d["volume"] sources.sort(key=getval, reverse=True) for source in sources: source_content_name = source["component"] volume = source["volume"] # Get the "depth" level of the PCR tubes platform. max_depth = max([d["level"] for d in contents]) # Set source platform. source_platform = "any" for content in contents: if source_content_name == content["name"]: if content["level"] == max_depth: source_platform = pcr_platform else: source_platform = reagents_platform break # Set target platform. target_platform = "any" for content in contents: if target_content_name == content["name"]: if content["level"] == max_depth: target_platform = pcr_platform else: target_platform = reagents_platform break # Guess the tool. tool_name = self.config.get("pcr_template", {}).get("default_tool", None) if self.controller: tool_name = self.controller.plugins["pipettes"].guess_pipette( volume=volume, tip_container=tip_container, repeats=1, prioritize_accuracy=True) # Create and append steps. steps.append(step_transfer(target_value=target_content_name, source_value=source_content_name, volume=volume, target_platform=target_platform, source_platform=source_platform, tip=tip_element, tool=tool_name, order=len(steps)+1+initial_order, name=f"Add {source_content_name} to {target_content_name}", target_by="name", source_by="name")) return stepsFunciones para generar pasos "TRANSFER" para un protocolo HL.
def prepare_contents(self, master_recipe)-
Expand source code
def prepare_contents(self, master_recipe): """Generate preparation steps and pre-content data for a 'PCR Mix' template.""" # Make a mix plan object. mix_plan = MixPlan(master_recipe) # Make the recipes and preps. # Requires running "make_preparation" to generate the "mix_plan.preparations" object. preparations = mix_plan.make_preparation() # Get the reduced and flattened preparations.abs reduced_preps: list = mix_plan.preparations # Write output for debugging. if self.verbose: dump_json(self.debug_preps_file, preparations) dump_json(self.debug_reduced_preps_file, reduced_preps) # Calculate total amounts of each (non-mix) component: # Get the names of mixes. mix_names = [d["name"] for d in reduced_preps] # It's magic! amounts, new_preparations = self.get_amounts(deepcopy(preparations), mix_names) flat_new_preparations = self.flatten_derivatives(new_preparations) # Write output for debugging. if self.verbose: dump_json(self.debug_newpreps_file, new_preparations) dump_json(self.flat_new_preparations_file, flat_new_preparations) # List for pre-content data. pre_contents=[] # Add reagents as contents. for k, v in amounts.items(): reagent = {"level": 0, "name": k, "volume": v, "terminal": False} # Use the reagent name as tag name. reagent["tags"] = [k] + ["reagent"] # Get the base names of components, and add them as content tags. reagent_names = master_recipe.recipe_component_names reagent["tags"] += [t for t in reagent_names if str(k).startswith(str(t))] # Make unique. reagent["tags"] = list(set(reagent["tags"])) # Use "level 0" to let the pre_contents be placed in the reagents tube rack later on. pre_contents.append(reagent) # Prepare a list with the names of terminal mixes. all_mix_ingredients = set() # Example: ["fwPrimers", "templates", ...] for reduced_prep in reduced_preps: # Update the set keeping it unique: https://stackoverflow.com/a/32482852 all_mix_ingredients.update(reduced_prep["recipe"].keys()) # A terminal item is not an ingredient in any recipe. terminal_set = [n for n in mix_names if n not in all_mix_ingredients] # TODO: Patch "mix" to get this information directly from there, # instead of inferring it here. # Generate mixes and reaction tubes, and add them to the pre_contents list. # NOTE: These will not be "proper" content yet. They will be used to make a proper ones later on. for item in flat_new_preparations: tags = [] is_terminal = item["name"] in terminal_set if not is_terminal: tags += ["mix", f"level{item['level']}"] else: tags += item["reduced_fixed_comps"] pre_content = { "name": item["name"], "volume": sum(item["recipe"].values()), "level": item["level"], "tags": tags, "terminal": is_terminal } pre_contents.append(pre_content) # Write output for debugging. if self.verbose: dump_json(self.debug_precontents_file, pre_contents) # NOTE: Here the "new_preparations" is returned instead of the "preps", because # the (variable) component names in the pre_contents have been updated # and the same applies to the components in preps. return pre_contents, new_preparationsGenerate preparation steps and pre-content data for a 'PCR Mix' template.
def prepare_objects(self, template_data: dict)-
Expand source code
def prepare_objects(self, template_data: dict): """Generate data and intermediate objects for a 'PCR Mix' template.""" # Prepare the Recipe object. master_recipe: Recipe = self.prepare_recipe(template_data) # Generate draft contents and preparation steps. pre_contents, preps = self.prepare_contents(master_recipe) # Prepare the workspace. workspace = self.prepare_workspace(template_data, pre_contents) # Prepare the protocol. protocol = self.make_protocol(preps, pre_contents, template_data) return workspace, protocolGenerate data and intermediate objects for a 'PCR Mix' template.
def prepare_recipe(self, template_data)-
Expand source code
def prepare_recipe(self, template_data): """ Generate a 'Recipe' object from the data sent by the GUI's template. Example components: [{'fwPrimers': ['1', '2'], 'name': ['G1'], 'rvPrimers': ['5'], 'templates': ['A1']}, {'fwPrimers': ['3'], 'name': ['G2'], 'rvPrimers': ['6'], 'templates': ['A1', 'A2']}] PCR-specific parameters for the recipe: 'bufferFinal': '1', 'bufferStock': '10', 'dntpsFinal': '1', 'dntpsStock': '20', 'finalVol': '20', 'polVol': '0.2', 'primerFinal': '1', 'primerStock': '20', 'templateVol': '1', 'volLossCompensation': '0.2' """ # Get the template data. template_def = template_data['protocol']['templateDefinition'] # NOTE: a copy is needed because content names are "popped" below, # would otherwise propagate to the templateDefinition. components = deepcopy(template_def["components"]) # NOTE: "component groups" would normally be generated like this: # component_groups = groups( # reaccion1 = group( template="A550", pFw = c(667, 999), pRv = 668 ), # reaccion2 = group( template="A550", pFw = 667, pRv = c(668, 770), dmso=10 ) # ) # NOTE: an equivalent behaviour: groups_dict = {} for d in components: d_name = d.pop('name') # Remove the "name" attribute and save it. d_name = str(d_name) # Ensure that it is a string (the UI might send it as a list if it has commas). groups_dict[d_name] = group(**d) # Expand the components and add a group. component_groups = groups(**groups_dict) # Expand the groups. if self.verbose: print(f"Parsed component_groups:\n{pformat(component_groups)}") # NOTE: here properties from the GUI are passed to their counterparts in "Recipe". master_recipe = Recipe( component_groups = component_groups, volume = float(template_def['finalVol']), extra_fraction = float(template_def['volLossCompensation']) + 1.0, stock_solutions_x = comps( buffer = float(template_def['bufferStock']), dNTPs = float(template_def['dntpsStock']), fwPrimers = float(template_def['primerStock']), rvPrimers = float(template_def['primerStock']) ), stock_solutions_vol = comps( templates = float(template_def['templateVol']), pol = float(template_def['polVol']) ), target_solution = comps( buffer = float(template_def['bufferFinal']), dNTPs = float(template_def['dntpsFinal']), fwPrimers = float(template_def['primerFinal']), rvPrimers = float(template_def['primerFinal']) ) # variable_components=comps( # pol = c("Taq", "Pfu") # ) ) return master_recipeGenerate a 'Recipe' object from the data sent by the GUI's template.
Example components: [{'fwPrimers': ['1', '2'], 'name': ['G1'], 'rvPrimers': ['5'], 'templates': ['A1']}, {'fwPrimers': ['3'], 'name': ['G2'], 'rvPrimers': ['6'], 'templates': ['A1', 'A2']}]
PCR-specific parameters for the recipe: 'bufferFinal': '1', 'bufferStock': '10', 'dntpsFinal': '1', 'dntpsStock': '20', 'finalVol': '20', 'polVol': '0.2', 'primerFinal': '1', 'primerStock': '20', 'templateVol': '1', 'volLossCompensation': '0.2'
def prepare_workspace(self, template_data, contents)-
Expand source code
def prepare_workspace(self, template_data, contents): """Generate workspace platformns and contents for a 'PCR Mix' template.""" # Make default platforms (in case the workspace does not have them). template_definition = template_data['protocol']['templateDefinition'] workspace = template_data['workspace'] # Get target platforms by name if found, and clear their contents. # NOTE: This overrides the defaults set above, in order to reuse # any properties sent by the GUI. pcr_tubes_item = None reagents_item = None for i, item in enumerate(workspace['items']): # Look for the PCR tubes paltform. if item.get('name', None) == template_definition[self.tmplt_pcr_tube]: pcr_tubes_item = workspace['items'][i] pcr_tubes_item["content"].clear() # Look for the reagent tubes paltform. elif item.get('name', None) == template_definition[self.tmplt_rgt_tube]: reagents_item = workspace['items'][i] reagents_item["content"].clear() # If no platforms were found, then create and insert them. if not pcr_tubes_item: pcr_tubes_item = newt.platform_items.base_platform_item( name=template_definition[self.tmplt_pcr_tube], # TODO: PCRtubePlatformItem ?? platform=template_definition[self.tmplt_pcr_tube], position={"x": 0, "y": 0, "z": 0}) workspace['items'].append(pcr_tubes_item) if not reagents_item: reagents_item = newt.platform_items.base_platform_item( name=template_definition[self.tmplt_rgt_tube], # TODO: tube15PlatformItem ?? platform=template_definition[self.tmplt_rgt_tube], position={"x": 0, "y": 0, "z": 0}) workspace['items'].append(reagents_item) # The "level" key indicates the depth of the mix. # The "deepest" mix corresponds to the terminal solution, # with the target composition and volume. # In a PCR mix scenario, contents of this depth should end up # in the "pcr tubes" platform, and the rest can go in the "reagents" platform. # NOTE: This was replaced by a precalculated property: "terminal". # Using "max_depth" was buggy, because not all terminal leaves have the same # number of levels, as derived from the number of cuts of their cluster. #max_depth = max([d["level"] for d in contents]) # Populate the platform items with contents. for i, content in enumerate(contents): # Get the appropriate platform depending on which kind of tube this is. # That is, figure out if this is a "PCR tube" or a "reagent tube", by looking # at its "level" property. PCR tubes will be terminal tubes. if content.get("terminal", None): content_list = pcr_tubes_item["content"] platform_data = pcr_tubes_item["platformData"] # Get default container container_name = platform_data["containers"][0]["container"] container_name = self.config.get("pcr_template", {}).get("pcr_container", container_name) else: content_list = reagents_item["content"] platform_data = reagents_item["platformData"] # Get default container container_name = platform_data["containers"][0]["container"] container_name = self.config.get("pcr_template", {}).get("reagent_container", container_name) # Make content index = len(content_list) + 1 col, row = datautils.get_colrow(index, platform_data["wellsColumns"], platform_data["wellsRows"]) tube = newt.contents.base_content( container=container_name, index=index, position={"col": col, "row": row}, name=content["name"], tags=content.get("tags", []), volume=content["volume"]) # Append the content to the platform iteal: bool = item["name"]m's content list. content_list.append(tube) # Return the updated workspace return workspaceGenerate workspace platformns and contents for a 'PCR Mix' template.
def preps_to_steps(self,
preps: dict,
contents: list,
tip_element,
tip_container,
pcr_platform,
reagents_platform)-
Expand source code
def preps_to_steps(self, preps: dict, contents: list, tip_element, tip_container, pcr_platform, reagents_platform): """Funciones para generar pasos "TRANSFER" para un protocolo HL.""" steps = [] steps.extend(self.prep_to_step(preps, contents, tip_element, tip_container, pcr_platform, reagents_platform)) for prep in preps["derivatives"]: steps.extend(self.prep_to_step(prep, contents, tip_element, tip_container, pcr_platform, reagents_platform, initial_order=len(steps))) return stepsFunciones para generar pasos "TRANSFER" para un protocolo HL.
def process_pcr_template(self, template_data)-
Expand source code
def process_pcr_template(self, template_data): """Processes a 'PCR Mix' protocol template event.""" # Generate new workspace and protocol steps. workspace, protocol = self.prepare_objects(template_data) # Generate the output. result = {"protocol": protocol, "workspace": workspace} # Cleanup. self.scrub_mongo_stuff(result) return resultProcesses a 'PCR Mix' protocol template event.
def process_template(self, data)-
Expand source code
def process_template(self, data): """Receives a protocol template and current workspace from the GUI/backend and sends new ones. The incoming SocketIO event expects a response (with a timeout), which is the return value of this function. """ # Logs. logging.info(f"Received template command for protocol '{data['protocol']['name']}' and workspace '{data['workspace']['name']}'.") logging.debug(f"Received template command with data:\n{pformat(data)}") try: # Look for a handler function. template_type = data['protocol']['template'] template_handler = self.template_handlers.get(template_type, None) # Generate the output. if template_handler: result = template_handler(template_data=deepcopy(data)) # Send our congrats! msg = f"Successfully processed '{template_type}' template for protocol '{data['protocol']['name']}' and workspace '{data['workspace']['name']}'." self.controller.send_alert(msg, alert_type="message") logging.info(msg) logging.debug(f"This is the result:\n{pformat(result)}") else: logging.warning(f"Unknown template type: '{template_type}'. Returning input data.") logging.debug(f"Input protocol data:\n{pformat(data['protocol'])}") result = deepcopy(data) except Exception as e: protocol_name = data.get('protocol', {}).get('name', None) msg = f"Error during template processing for protocol '{protocol_name}': {pformat(e)}\n" + traceback.format_exc() self.controller.send_alert(msg, alert_type="message") logging.error(msg + "\n" + pformat(data)) # return the received data as is. return data logging.debug(f"Processed template command with result:\n{pformat(result)}") return resultReceives a protocol template and current workspace from the GUI/backend and sends new ones. The incoming SocketIO event expects a response (with a timeout), which is the return value of this function.
def process_well_plate_prep(self, template_data: dict)-
Expand source code
def process_well_plate_prep(self, template_data: dict): """Processes a 'Well Plate Prep' protocol template event. TO-DO: 1. Generate contents for a 384 well-plate platform. - Tag all as "targets" - Use "strain" and "stimuli" data to add more tags. 2. Generate contents for a tube rack, for "strain" and "stimuli" data. 3. Generate high-level steps to transfer "strains" and "stimuli" by tag. """ dump_json("/tmp/template_data.json", template_data) # Extract the data. file_data = self.parse_file_data(template_data) # Get data. well_data = self.parse_well_plate(file_data["layout"], sort_col_idx=2) well_data_dict = self.well_list_to_dict(well_data) # Strain and stimuli data. strain_data = self.parse_well_plate(file_data["strains"]) strain_data_dict = self.well_list_to_dict(strain_data) stimuli_data = self.parse_well_plate(file_data["stimuli"]) stimuli_data_dict = self.well_list_to_dict(stimuli_data) # Get item names template_definition = template_data['protocol']['templateDefinition'] well_plate_name = template_definition["well_plate"] reagent_rack_name = template_definition["reagent_rack"] tip_rack_name = template_definition["tip_rack"] trash_name = template_definition["trash"] stimuli_volume = template_definition["stimuli_volume"] culture_volume = template_definition["cell_volume"] # Get items workspace = template_data["workspace"] well_plate_item = next(i for i in workspace["items"] if i["name"] == well_plate_name) reagent_rack_item = next(i for i in workspace["items"] if i["name"] == reagent_rack_name) tip_rack_item = next(i for i in workspace["items"] if i["name"] == tip_rack_name) # trash_item = next(i for i in workspace["items"] if i["name"] == trash_name) # Get first container as default well_plate_container = next(c for c in well_plate_item["platformData"]["containers"]) reagent_rack_container = next(c for c in reagent_rack_item["platformData"]["containers"]) tip_rack_container = next(c for c in tip_rack_item["platformData"]["containers"]) # Generate contents. contents = [] strains, stimuli = {}, {} # Unpack the row and column identifiers for col_id, row_id, _ in well_data: # Calculate the main index for the content main_index = datautils.get_index( col_id = col_id-1, # Convert to zero-based index row_id = row_id, ncols=well_plate_item["platformData"]["wellsColumns"], nrows=well_plate_item["platformData"]["wellsRows"] ) # Get strain tag: strain_tag = "strain-" + str(strain_data_dict.get(row_id, {}).get(col_id, "default")) stimulus_tag = "stimulus-" + str(stimuli_data_dict.get(row_id, {}).get(col_id, "default")) # Generate the content using the base_content function row_index = datautils.row_to_index(row_id) # Convert the row letter(s) to an index (0-based) content = newt.contents.base_content( container=well_plate_container["container"], # Convert index to 1-based. index=main_index+1, position={"col": col_id, "row": row_index+1}, name=f"Content_{row_id}{col_id}", # Example name tags=[strain_tag, stimulus_tag], # Add any tags if needed volume=0 ) # Append the content. contents.append(content) # Count strain tags, to insert reagent tubes later. strains.setdefault(strain_tag, 0) strains[strain_tag] += 1 # Count stimuli tags, to insert reagent tubes later. stimuli.setdefault(stimulus_tag, 0) stimuli[stimulus_tag] += 1 # Override the platform item's contents. well_plate_item["content"] = contents # Make a list of tags, counts, and the target volume for each one. tag_counts = [[t, c, culture_volume, "strain"] for t, c in strains.items()] tag_counts += [[t, c, stimuli_volume, "stimulus"] for t, c in stimuli.items()] # Add contents to tube rack. tubes = [] tips = [] for tag_name, tag_count, volume, label in tag_counts: # Create and append a tube for each source. tube_index = len(tubes) + 1 col, row = datautils.get_colrow(tube_index, reagent_rack_item["platformData"]["wellsColumns"], reagent_rack_item["platformData"]["wellsRows"]) tube = newt.contents.base_content( container=reagent_rack_container["container"], # Convert index to 1-based. index=tube_index, position={"col": col, "row": row}, name=f"Content_{row}{col}", # Example name tags=[tag_name, label], # Add any tags if needed volume=tag_count*volume # As much as needed ) tubes.append(tube) # Create and append a tip for each tube. # NOTE: This should be enough as the tips are meant to be reused for each source. tip_index = len(tips) + 1 col, row = datautils.get_colrow(tip_index, tip_rack_item["platformData"]["wellsColumns"], tip_rack_item["platformData"]["wellsRows"]) tip = newt.contents.base_content( container=tip_rack_container["container"], # Convert index to 1-based. index=tip_index, position={"col": col, "row": row}, name=f"Content_{row}{col}", volume=0 ) tips.append(tip) # Override the platform item's contents. reagent_rack_item["content"] = tubes tip_rack_item["content"] = tips # Make a tip element, common to all steps. tip_element = make_tip( item=tip_rack_name, mode=self.config.get("pcr_template", {}).get("tip_mode", "reuse"), discardItem=trash_name) # TODO: Defaulting to "None" in case this is being used as a standalone module (e.g. for tests). tip_container = None # Get the tip's container definition. if self.controller: tip_container = self.controller.database_tools.getContainerByName(tip_rack_container["container"]) # Create and append steps. steps = [] for tag_name, tag_count, volume, _ in tag_counts: # Guess the tool. tool_name = self.config.get("pcr_template", {}).get("default_tool", None) if self.controller: tool_name = self.controller.plugins["pipettes"].guess_pipette( volume=volume, tip_container=tip_container, repeats=tag_count, prioritize_accuracy=True) # Create step. step = step_transfer(target_value=tag_name, source_value=tag_name, volume=volume, target_platform=well_plate_name, source_platform=reagent_rack_name, tool=tool_name, tip=tip_element, order=len(steps)+1, name=f"Add {reagent_rack_name} to {well_plate_name}", target_by="tag", source_by="tag") steps.append(step) # Save steps, overwriting any previous value. template_data['protocol']['steps'] = steps # Generate the protocol, using the originals as defaults. protocol = newt.hl_protocols.protocol_hl(**template_data['protocol']) # Generate the output. result = {"protocol": protocol, "workspace": workspace} # Cleanup result = self.scrub_mongo_stuff(result) # Push data into the result (for tests). result["well_data"] = well_data result["well_data_dict"] = well_data_dict dump_json("/tmp/well_data_dict.json", result) return resultProcesses a 'Well Plate Prep' protocol template event. TO-DO: 1. Generate contents for a 384 well-plate platform. - Tag all as "targets" - Use "strain" and "stimuli" data to add more tags. 2. Generate contents for a tube rack, for "strain" and "stimuli" data. 3. Generate high-level steps to transfer "strains" and "stimuli" by tag.
def register_sio_callbacks(self)-
Expand source code
def register_sio_callbacks(self): """Function to register socketio event callbacks, typically sent by the Pipettin GUI.""" if self.controller.comms.sio: # Register template handler. @self.controller.comms.sio.on(self.protocol_template_event) async def protocol_template(data): return self.process_template(data) else: logging.error("Failed to register event handlers. The SIO comms object is undefined.")Function to register socketio event callbacks, typically sent by the Pipettin GUI.
Inherited members