Module pipettin-piper.piper.datatools.mongo

Functions

def load_datatools(controller: Controller)
Expand source code
def load_datatools(controller: Controller):
    """Create a MongoObjects instance for a controller and register it on it.

    Args:
        controller (Controller): Controller whose `database_tools` attribute will be set.

    Returns:
        The value of `controller.database_tools` after the new instance was assigned.

    Raises:
        DataError: If the controller already has database tools set.
    """
    if not controller.database_tools:
        # Set the database_tools property to an instance of MongoObjects.
        controller.database_tools = MongoObjects(controller=controller)
        return controller.database_tools
    raise DataError(f"Database tools already set in controller to: {controller.database_tools!s}")
def load_from_config(**db_config: dict)
Expand source code
def load_from_config(**db_config : dict):
    """Build a MongoObjects instance from keyword configuration.

    Args:
        **db_config: Keyword arguments forwarded unchanged to the MongoObjects constructor.

    Returns:
        MongoObjects: The newly created instance.
    """
    mongo_tools = MongoObjects(**db_config)
    return mongo_tools

Classes

class MongoObjects (database_url=None, database_name=None, env_file=None, controller=None, verbose=True)
Expand source code
class MongoObjects(DataTools):
    """A class holding and querying data about the machine, objects on it, and its configuration.
    Requires MongoDB as a data backend.
    """

    verbose: bool = False
    controller: Controller
    database_url: str = 'localhost:27017'
    database_name: str = 'pipettin'

    env_file: str
    """Path to the UI's '.env' file, containing DATABASE_NAME and DATABASE_URI variables."""

    def __init__(self,
                 database_url=None,
                 database_name=None,
                 env_file=None,
                 controller=None,
                 verbose=True):

        self.verbose = verbose
        self.config = {"database": {
            "database_url": self.database_url,
            "database_name": self.database_name
            }
        }

        # Get the config from the controller, if any.
        if controller:
            self.controller = controller
            self.verbose = controller.verbose

            # Update connection details.
            self.config["database"].update(controller.config["database"])

            # Use input argument if set, else use the config.
            if env_file:
                self.env_file = env_file
            else:
                self.env_file = self.config["database"].get("env_file", None)

            # Load variables from ".env" file.
            if self.env_file is not None:
                self.update_from_env_file(self.config, self.env_file)

        # Set "database_url" from config, defaulting to input arguments.
        self.database_url = self.config["database"]["database_url"]
        self.database_name = self.config["database"]["database_name"]

        # Override connection details from init arguments.
        if database_url:
            self.database_url = database_url
        if database_name:
            self.database_name = database_name

        self.setup_db(self.database_url, self.database_name)

    def make_mongo_url(self, database_url):
        """Wrap a database address into a 'mongodb://<address>/' connection URL."""
        return 'mongodb://{}/'.format(database_url)

    @property
    def mongo_url(self):
        """Current MongoDB connection URL"""
        return self.make_mongo_url(self.database_url)

    # DATABASE SETUP ############

    def setup_db(self, database_url, database_name):
        """Create the MongoDB client, select the database and prepare its collections.

        Builds the connection URL with `make_mongo_url`, creates the client, selects
        `database_name`, stores a handle to each collection on the instance and
        ensures a unique index on each of them.

        Args:
            database_url (str): The MongoDB connection address, as accepted by `make_mongo_url`.
            database_name (str): The name of the database to connect to.

        Raises:
            pymongo.errors.InvalidName: If `database_name` is not a valid database name.
            DataError: If ensuring the indexes fails for any reason other than duplicate keys.

        Note:
            A `pymongo.errors.DuplicateKeyError` raised while ensuring the indexes is
            logged and not re-raised; the indexes that follow the failing one are skipped.
        """

        try:
            # Connect and select database name.
            mongo_url = self.make_mongo_url(database_url)
            logging.info(f"Connecting to mongodb at: '{mongo_url}'")
            self.client = pymongo.MongoClient(mongo_url)
            # The databases in the MongoClient are accessed with a dict-style syntax.
            self.db = self.client[database_name]
            # Only report the database once it has actually been selected.
            logging.info(f"Connected to database: '{database_name}'")

            # Collection handles.
            self.collectionProtocols: Collection = self.db['protocols']
            self.collectionHlProtocols: Collection = self.db['hLprotocols']
            self.collectionWorkspaces: Collection = self.db['workspaces']
            self.collectionPlatforms: Collection = self.db['platforms']
            self.collectionContainers: Collection = self.db['containers']
            self.collectionTools: Collection = self.db['tools']
            self.collectionSettings: Collection = self.db['settings']

            # Unique index keys per collection, in creation order: "hLprotocols" is
            # unique by name and workspace, the rest of the collections by name only.
            # See: https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_index
            unique_indexes = (
                (self.collectionHlProtocols, ('name', 'workspace')),
                (self.collectionProtocols, ('name',)),
                (self.collectionWorkspaces, ('name',)),
                (self.collectionPlatforms, ('name',)),
                (self.collectionContainers, ('name',)),
                (self.collectionTools, ('name',)),
                (self.collectionSettings, ('name',)),
            )

            try:
                for collection, fields in unique_indexes:
                    collection.create_index(
                        [(field, pymongo.ASCENDING) for field in fields],
                        unique=True)
            except pymongo.errors.DuplicateKeyError as e:
                # Deliberately best-effort: the database stays usable without the index.
                logging.error(f"Failed to ensure indexes on database {database_name}. You have duplicate object IDs: {e}")
            except Exception as e:
                msg = f"Failed to ensure index with error: {e}"
                logging.error(msg)
                raise DataError(msg) from e

        except pymongo.errors.InvalidName as e:
            msg = f"Invalid database name '{database_name}' ({e})."
            logging.error(msg)
            # Re-raise: without a database none of the collection attributes exist,
            # so continuing would only defer the failure to the first query.
            raise
        except DataError as e:
            logging.error(f"Failed connect to the database or to load objects from it: {e}\n" + traceback.format_exc())
            raise

    # Main class methods ####

    #### POP METHODS ####

    def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int):
        """Remove a content from a platform item by index, deleting it form the DB."""
        try:
            workspace = self.getWorkspaceByName(workspace_name)
            item = self.getWorkspaceItemByName(workspace=workspace, item_name=item_name)
            content = item["content"][content_idx]

            # Delete.
            # mongo_id = content["_id"]
            # result = self.db.workspaces.find_one_and_delete(
            #     # {"_id": ObjectId(str(mongo_id))}
            #     # { "items.content._id": ObjectId(str(mongo_id)) }
            #     {"name": workspace_name, "items.name": item_name, "items.content.name": "tip1"}
            # )

            # Delete by update.
            result = self.db.workspaces.update_one(
                { "name": workspace_name, "items.name": item_name },
                { "$pull": { "items.$.content": content } }
            )
            result = result.modified_count

            # Find one and delete by update.
            # result = self.db.workspaces.find_one_and_update(
            #     { "name": workspace_name, "items.name": item_name },
            #     { "$pull": { "items.$.content": content } },
            #     return_document=True  # This returns the document after the update
            # )

            if result:
                logging.debug(f"Deleted content from the DB: {content}")
            else:
                logging.warning(f"Failed to delete content: {content}")
        except Exception as e:
            msg = f"Failed to pull content {content_idx}"
            msg += f" from item '{item_name}' in workspace '{workspace_name}'. Error: {e}"
            logging.error(msg)
            raise DataError(msg) from e

        return result

    #### GET METHODS ####

    def listProtocols(self):
        """Method to get protocols from MongoDB as a list, log protocol names, and return them."""
        protocols = list(self.collectionProtocols.find())

        logging.debug(f"Found the following protocols: {[p['name'] for p in protocols]}")

        return protocols

    def listHlProtocols(self):
        """Method to get high-level protocols from MongoDB as a list, log protocol names, and return them."""
        hl_protocols = list(self.collectionHlProtocols.find())

        logging.debug(f"Found the following high-level protocols: {[p['name'] for p in hl_protocols]}")

        return hl_protocols

    def listWorkspaces(self):
        """Method to get workspaces from MongoDB as a list, log names, and return them."""
        workspaces = list(self.collectionWorkspaces.find())

        logging.debug(f"Found the following workspaces: {[w['name'] for w in workspaces]}")

        return workspaces

    def listPlatforms(self):
        """Method to get platforms from MongoDB as a list, log names, and return them."""
        platforms = list(self.collectionPlatforms.find())

        logging.debug(f"Found the following platforms: {[pl['name'] for pl in platforms]}")

        return platforms

    def listContainers(self):
        """Method to get 'containers' from MongoDB as a list, log names, and return them."""
        containers = list(self.collectionContainers.find())

        logging.debug(f"Found the following containers: {[t['name'] for t in containers]}")

        return containers

    def listTools(self):
        """Method to get tools from MongoDB as a list, log names, and return them."""
        tools = list(self.collectionTools.find())

        logging.debug(f"Found the following tools: {[t['name'] for t in tools]}")

        return tools

    def listSettings(self):
        """Method to get 'settings' from MongoDB as a list, log names, and return them."""
        settings = list(self.collectionSettings.find())

        logging.debug(f"Found {len(settings)} set(s) of settings.")

        if settings is None:
            pass
        elif len(settings) > 1:
            logging.debug("More than one set of settings was retreived from the database.")

        return settings

    #### CUSTOM UPDATE METHODS ####

    def updateObject(self, collection_name: str, document_id: str, field: str, data):
        """Set one field of a document, selected by its MongoDB ID.

        Args:
            collection_name (str): Name of the collection holding the document.
            document_id (str): MongoDB ID of the document; converted with `ObjectId(str(...))`.
            field (str): Field to set, used as the key of a "$set" update.
            data: New value for the field.

        Example:
            m.updateObject(
                collection_name="protocols",
                document_id=action["_id"],
                field="actions.0.sarasa",
                data={"hola": 3}
            )

        References:
        - https://www.w3schools.com/python/python_mongodb_update.asp
        - https://stackoverflow.com/a/4374288/11524079
        - https://www.mongodb.com/docs/manual/reference/operator/update/set/
        - https://www.mongodb.com/docs/manual/core/document/#std-label-document-dot-notation
        - https://stackoverflow.com/a/46608956/11524079
        """
        # Get the collection.
        target_collection = self.db[collection_name]

        # Select by ID and set the requested field.
        id_filter = {"_id": ObjectId(str(document_id))}
        target_collection.find_one_and_update(id_filter, {"$set": {field: data}})

    def updateActionBy(self, action: dict, protocol_name: str,
                       new_data: dict, field: str = None,
                       find_by: str = "index"):
        """Update an action's data"""
        logging.info(f"Updating action with {find_by}={action[find_by]} from protocol '{protocol_name}' with new_data={new_data} and field={field}")

        # Get the collection.
        protocols = self.db["protocols"]

        # Target field (mongo syntax).
        target = "actions.$"
        if field is not None:
            target += f".{field}"

        # Update.
        result = protocols.update_one(
            { "name": protocol_name, f"actions.{find_by}": action[find_by] },
            {"$set": {target: new_data}}
        )
        # Handle failure to update.
        if not result.modified_count:
            msg = f"Failed to update action by {find_by}={action[find_by]}."
            logging.error(msg)
            raise DataError(msg)

    def updateObjectBy(self, query: dict, collection_name: str, data: dict):
        """Find one object in a collection by the 'query' selector and update its data"""
        # Get the collection.
        collection: Collection = self.db[collection_name]
        # Find one object and update it.
        collection.find_one_and_update(query, {'$set': data})

    def updateObjectByName(self, collection_name: str, document_name: str, data: dict):
        """Find one object in a collection by the 'name' key and update its data"""
        # Select an object by its "name" key and update it.
        self.updateObjectBy(query={"name": document_name}, collection_name=collection_name, data=data)

    def updateToolByName(self, tool_name: str, tool_data: dict):
        """Find one tool by the 'name' key and update its data"""
        # Select a tool by its "name" key and update it.
        self.updateObjectByName(collection_name="tools", document_name=tool_name, data=tool_data)

A class holding and querying data about the machine, objects on it, and its configuration. Requires MongoDB as a data backend.

Ancestors

Class variables

var controller : Controller
var database_name : str
var database_url : str
var env_file : str

Path to the UI's '.env' file, containing DATABASE_NAME and DATABASE_URI variables.

var verbose : bool

Instance variables

prop mongo_url
Expand source code
@property
def mongo_url(self):
    """MongoDB connection URL built from the instance's current `database_url`."""
    current_address = self.database_url
    return self.make_mongo_url(current_address)

Current MongoDB connection URL

Methods

def listContainers(self)
Expand source code
def listContainers(self):
    """Return every document of the 'containers' collection as a list.

    The names of the containers found are written to the debug log.
    """
    found = list(self.collectionContainers.find())
    names = [doc['name'] for doc in found]
    logging.debug(f"Found the following containers: {names}")
    return found

Method to get 'containers' from MongoDB as a list, log names, and return them.

def listHlProtocols(self)
Expand source code
def listHlProtocols(self):
    """Return every document of the 'hLprotocols' collection as a list.

    The names of the high-level protocols found are written to the debug log.
    """
    found = list(self.collectionHlProtocols.find())
    names = [doc['name'] for doc in found]
    logging.debug(f"Found the following high-level protocols: {names}")
    return found

Method to get high-level protocols from MongoDB as a list, log protocol names, and return them.

def listPlatforms(self)
Expand source code
def listPlatforms(self):
    """Return every document of the 'platforms' collection as a list.

    The names of the platforms found are written to the debug log.
    """
    found = list(self.collectionPlatforms.find())
    names = [doc['name'] for doc in found]
    logging.debug(f"Found the following platforms: {names}")
    return found

Method to get platforms from MongoDB as a list, log names, and return them.

def listProtocols(self)
Expand source code
def listProtocols(self):
    """Return every document of the 'protocols' collection as a list.

    The names of the protocols found are written to the debug log.
    """
    found = list(self.collectionProtocols.find())
    names = [doc['name'] for doc in found]
    logging.debug(f"Found the following protocols: {names}")
    return found

Method to get protocols from MongoDB as a list, log protocol names, and return them.

def listSettings(self)
Expand source code
def listSettings(self):
    """Return every document of the 'settings' collection as a list.

    The number of documents found is written to the debug log, with an extra
    debug message when more than one set of settings was found.
    """
    settings = list(self.collectionSettings.find())

    logging.debug(f"Found {len(settings)} set(s) of settings.")

    # list() never returns None, so only the "several sets" case needs a check.
    if len(settings) > 1:
        logging.debug("More than one set of settings was retreived from the database.")

    return settings

Method to get 'settings' from MongoDB as a list, log names, and return them.

def listTools(self)
Expand source code
def listTools(self):
    """Return every document of the 'tools' collection as a list.

    The names of the tools found are written to the debug log.
    """
    found = list(self.collectionTools.find())
    names = [doc['name'] for doc in found]
    logging.debug(f"Found the following tools: {names}")
    return found

Method to get tools from MongoDB as a list, log names, and return them.

def listWorkspaces(self)
Expand source code
def listWorkspaces(self):
    """Return every document of the 'workspaces' collection as a list.

    The names of the workspaces found are written to the debug log.
    """
    found = list(self.collectionWorkspaces.find())
    names = [doc['name'] for doc in found]
    logging.debug(f"Found the following workspaces: {names}")
    return found

Method to get workspaces from MongoDB as a list, log names, and return them.

def make_mongo_url(self, database_url)
Expand source code
def make_mongo_url(self, database_url):
    """Wrap a database address into a 'mongodb://<address>/' connection URL."""
    return 'mongodb://{}/'.format(database_url)

Compose a MongoDB connection URL from the database URL

def setup_db(self, database_url, database_name)
Expand source code
def setup_db(self, database_url, database_name):
    """Create the MongoDB client, select the database and prepare its collections.

    Builds the connection URL with `make_mongo_url`, creates the client, selects
    `database_name`, stores a handle to each collection on the instance and
    ensures a unique index on each of them.

    Args:
        database_url (str): The MongoDB connection address, as accepted by `make_mongo_url`.
        database_name (str): The name of the database to connect to.

    Raises:
        pymongo.errors.InvalidName: If `database_name` is not a valid database name.
        DataError: If ensuring the indexes fails for any reason other than duplicate keys.

    Note:
        A `pymongo.errors.DuplicateKeyError` raised while ensuring the indexes is
        logged and not re-raised; the indexes that follow the failing one are skipped.
    """

    try:
        # Connect and select database name.
        mongo_url = self.make_mongo_url(database_url)
        logging.info(f"Connecting to mongodb at: '{mongo_url}'")
        self.client = pymongo.MongoClient(mongo_url)
        # The databases in the MongoClient are accessed with a dict-style syntax.
        self.db = self.client[database_name]
        # Only report the database once it has actually been selected.
        logging.info(f"Connected to database: '{database_name}'")

        # Collection handles.
        self.collectionProtocols: Collection = self.db['protocols']
        self.collectionHlProtocols: Collection = self.db['hLprotocols']
        self.collectionWorkspaces: Collection = self.db['workspaces']
        self.collectionPlatforms: Collection = self.db['platforms']
        self.collectionContainers: Collection = self.db['containers']
        self.collectionTools: Collection = self.db['tools']
        self.collectionSettings: Collection = self.db['settings']

        # Unique index keys per collection, in creation order: "hLprotocols" is
        # unique by name and workspace, the rest of the collections by name only.
        # See: https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_index
        unique_indexes = (
            (self.collectionHlProtocols, ('name', 'workspace')),
            (self.collectionProtocols, ('name',)),
            (self.collectionWorkspaces, ('name',)),
            (self.collectionPlatforms, ('name',)),
            (self.collectionContainers, ('name',)),
            (self.collectionTools, ('name',)),
            (self.collectionSettings, ('name',)),
        )

        try:
            for collection, fields in unique_indexes:
                collection.create_index(
                    [(field, pymongo.ASCENDING) for field in fields],
                    unique=True)
        except pymongo.errors.DuplicateKeyError as e:
            # Deliberately best-effort: the database stays usable without the index.
            logging.error(f"Failed to ensure indexes on database {database_name}. You have duplicate object IDs: {e}")
        except Exception as e:
            msg = f"Failed to ensure index with error: {e}"
            logging.error(msg)
            raise DataError(msg) from e

    except pymongo.errors.InvalidName as e:
        msg = f"Invalid database name '{database_name}' ({e})."
        logging.error(msg)
        # Re-raise: without a database none of the collection attributes exist,
        # so continuing would only defer the failure to the first query.
        raise
    except DataError as e:
        logging.error(f"Failed connect to the database or to load objects from it: {e}\n" + traceback.format_exc())
        raise

Establish a connection to the MongoDB database and configure collections.

This method sets up the connection to a MongoDB instance using the given database_url and database_name. It stores a handle to each collection on the instance and ensures that the appropriate unique indexes are created. Both arguments are required: the defaults stored in the instance are resolved by the constructor, which then passes them to this method.

Args

database_url : str
The MongoDB connection address.
database_name : str
The name of the database to connect to.

Raises

pymongo.errors.InvalidName
Raised if the provided database_name is invalid.
pymongo.errors.DuplicateKeyError
Raised when duplicate keys are found while creating indexes.
DataError
Raised if there are errors during the connection or index creation process.
def updateActionBy(self,
action: dict,
protocol_name: str,
new_data: dict,
field: str = None,
find_by: str = 'index')
Expand source code
def updateActionBy(self, action: dict, protocol_name: str,
                   new_data: dict, field: str = None,
                   find_by: str = "index"):
    """Update an action of a protocol, or one field of that action.

    The action is located in the "actions" list of the protocol named
    `protocol_name` by matching its `find_by` key against `action[find_by]`.

    Args:
        action (dict): Action used for the lookup; only `action[find_by]` is read.
        protocol_name (str): Value of the "name" key of the protocol holding the action.
        new_data (dict): Data written with "$set".
        field (str, optional): Field of the action to set. When None, the whole
            matched action is replaced by `new_data`.
        find_by (str): Key of the action used to locate it. Defaults to "index".

    Raises:
        DataError: If no protocol/action pair matches the query.
    """
    lookup_value = action[find_by]
    logging.info(f"Updating action with {find_by}={lookup_value} from protocol '{protocol_name}' with new_data={new_data} and field={field}")

    # Get the collection.
    protocols = self.db["protocols"]

    # Target field (mongo syntax).
    target = "actions.$"
    if field is not None:
        target += f".{field}"

    # Update.
    result = protocols.update_one(
        {"name": protocol_name, f"actions.{find_by}": lookup_value},
        {"$set": {target: new_data}}
    )
    # Handle failure to update. The matched count is checked rather than the
    # modified count, because writing data identical to what is already stored
    # matches the action but modifies nothing, and that is not a failure.
    if not result.matched_count:
        msg = f"Failed to update action by {find_by}={lookup_value}."
        logging.error(msg)
        raise DataError(msg)

Update an action's data

def updateObject(self, collection_name: str, document_id: str, field: str, data)
Expand source code
def updateObject(self, collection_name: str, document_id: str, field: str, data):
    """Set one field of a document, selected by its MongoDB ID.

    Args:
        collection_name (str): Name of the collection holding the document.
        document_id (str): MongoDB ID of the document; converted with `ObjectId(str(...))`.
        field (str): Field to set, used as the key of a "$set" update.
        data: New value for the field.

    Example:
        m.updateObject(
            collection_name="protocols",
            document_id=action["_id"],
            field="actions.0.sarasa",
            data={"hola": 3}
        )

    References:
    - https://www.w3schools.com/python/python_mongodb_update.asp
    - https://stackoverflow.com/a/4374288/11524079
    - https://www.mongodb.com/docs/manual/reference/operator/update/set/
    - https://www.mongodb.com/docs/manual/core/document/#std-label-document-dot-notation
    - https://stackoverflow.com/a/46608956/11524079
    """
    # Get the collection.
    target_collection = self.db[collection_name]

    # Select by ID and set the requested field.
    id_filter = {"_id": ObjectId(str(document_id))}
    target_collection.find_one_and_update(id_filter, {"$set": {field: data}})

Find and update a field in an object from a MongoDB collection, by the objects MongoDB ID

Test

m.updateObject(collection_name="protocols", document_id=action["_id"], field="actions.0.sarasa", data={"hola": 3})

References: - https://www.w3schools.com/python/python_mongodb_update.asp - https://stackoverflow.com/a/4374288/11524079 - https://www.mongodb.com/docs/manual/reference/operator/update/set/ - https://www.mongodb.com/docs/manual/core/document/#std-label-document-dot-notation - https://stackoverflow.com/a/46608956/11524079

def updateObjectBy(self, query: dict, collection_name: str, data: dict)
Expand source code
def updateObjectBy(self, query: dict, collection_name: str, data: dict):
    """Apply a "$set" update with `data` to one document selected by `query`.

    Args:
        query (dict): Selector passed to `find_one_and_update`.
        collection_name (str): Name of the collection to update.
        data (dict): Fields and values to set on the document.
    """
    set_update = {'$set': data}
    # Get the collection, then find one object and update it.
    self.db[collection_name].find_one_and_update(query, set_update)

Find one object in a collection by the 'query' selector and update its data

def updateObjectByName(self, collection_name: str, document_name: str, data: dict)
Expand source code
def updateObjectByName(self, collection_name: str, document_name: str, data: dict):
    """Update the data of the document whose 'name' key equals `document_name`.

    Args:
        collection_name (str): Name of the collection to update.
        document_name (str): Value of the 'name' key of the target document.
        data (dict): Data forwarded to `updateObjectBy`.
    """
    name_query = {"name": document_name}
    # Delegate the update to the generic query-based method.
    self.updateObjectBy(query=name_query, collection_name=collection_name, data=data)

Find one object in a collection by the 'name' key and update its data

def updateToolByName(self, tool_name: str, tool_data: dict)
Expand source code
def updateToolByName(self, tool_name: str, tool_data: dict):
    """Update the data of the tool whose 'name' key equals `tool_name`.

    Args:
        tool_name (str): Value of the 'name' key of the target tool.
        tool_data (dict): Data forwarded to `updateObjectByName` for the "tools" collection.
    """
    tools_collection = "tools"
    # Delegate to the name-based update on the "tools" collection.
    self.updateObjectByName(collection_name=tools_collection, document_name=tool_name, data=tool_data)

Find one tool by the 'name' key and update its data

Inherited members