# Source code for rse.main.database.relational

"""

Copyright (C) 2020 Vanessa Sochat.

This Source Code Form is subject to the terms of the
Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""

from rse.exceptions import (
    MissingDatabaseString,
    NoReposError,
    MultipleReposExistError,
    RepoNotFoundError,
)
from rse.main.database.base import Database
from rse.main.parsers import get_parser
from rse.main.parsers.base import ParserBase

from sqlalchemy import create_engine, desc
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import or_

import logging
import json

# Module-level logger; the database methods in this file report through it.
bot = logging.getLogger("rse.main.database.relational")


class RelationalDatabase(Database):
    """A relational database backend built on SQLAlchemy.

    The database type is not fixed by this class: it is passed at init time
    as the ``database`` keyword (for example postgresql or a mysql driver)
    and joined with ``database_string`` to form the engine url. The sqlite
    database also uses this class, but defines a custom init function to
    handle the rse.db file.
    """

    def __init__(self, config_dir, config=None, **kwargs):
        """Store the settings, build the database url and create the database.

        Arguments:
         - config_dir: accepted but not used by this initializer.
         - config: stored on the instance and handed to get_parser by the
           other methods.
         - database (keyword): the database type, used as the url scheme.
         - database_string (keyword): the remainder of the url (required).

        Raises MissingDatabaseString if database_string is missing or empty.
        """
        self.database = kwargs.get("database")
        self.config = config

        database_string = kwargs.get("database_string")
        if not database_string:
            raise MissingDatabaseString

        # The database url includes the type and string
        self.db = f"{self.database}://{database_string}"
        self.create_database()

    def create_database(self):
        """Create the database based on the url string in self.db.

        self.db must be defined before this is called. Sets self.engine,
        self.session (a scoped session bound to the engine) and self.Base,
        and attaches a ``query`` property to Base so that models can be
        queried as ``Model.query``.
        """
        from rse.main.database.models import Base

        self.engine = create_engine(self.db)
        self.session = scoped_session(
            sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        )
        Base.query = self.session.query_property()
        Base.metadata.create_all(bind=self.engine)
        self.Base = Base

    # Global

    def exists(self, uid):
        """Return True if a repo with the parser-resolved uid is stored."""
        from rse.main.database.models import SoftwareRepository

        parser = get_parser(uid, config=self.config)
        repo = SoftwareRepository.query.filter(
            SoftwareRepository.uid == parser.uid
        ).first()
        return repo is not None

    def get_or_create(self, uid):
        """Return the stored repo for a uid, adding it first if it is missing.

        The result is whatever self.add returns when the repo has to be
        created, which may be None if the parser yields no metadata.
        """
        from rse.main.database.models import SoftwareRepository

        parser = get_parser(uid, config=self.config)
        repo = SoftwareRepository.query.filter(
            SoftwareRepository.uid == parser.uid
        ).first()
        if not repo:
            repo = self.add(uid)
        return repo

    def clear(self):
        """Clear (delete) all repos and commit. Always returns True.

        This could be improved to cascade instead.
        """
        from rse.main.database.models import SoftwareRepository

        SoftwareRepository.query.delete()
        self.session.commit()
        return True

    # Add or Update requires executor

    def add(self, uid):
        """Create a new repo based on a uid that matches to a parser.

        Returns the new repo with its ``parser`` attribute set to the parser
        object, or None if the repo already exists or the parser returned no
        metadata.
        """
        from rse.main.database.models import SoftwareRepository

        parser = get_parser(uid, config=self.config)
        if self.exists(parser.uid):
            return None

        data = parser.get_metadata()

        # If it's a parser handoff, continue with the parser that was returned
        if isinstance(data, ParserBase):
            parser = data
            data = parser.data

        if not data:
            return None

        repo = SoftwareRepository(
            uid=parser.uid, parser=parser.name, data=json.dumps(parser.export())
        )
        self.session.add(repo)
        self.session.commit()
        bot.info(f"{parser.uid} was added to the the database.")

        # NOTE(review): this replaces the ``parser`` value given to the
        # constructor above (the parser name) with the parser object itself,
        # after the commit. Confirm against the model that this is intended.
        repo.parser = parser
        return repo

    def update(self, repo, updates=None, rewrite=False):
        """Update a repo with a json dictionary.

        Arguments:
         - repo: a repo whose ``parser`` attribute is a parser object.
         - updates: optional dictionary of extra values to store. It is
           copied, so the caller's dictionary is not modified. Values
           exported by the parser take precedence over it.
         - rewrite: if True, discard the previously stored data instead of
           merging into it.

        Returns the repo on success. A return of None indicates non-success
        (the parser returned no metadata).
        """
        if not repo.parser.get_metadata():
            return None

        # Work on a copy so the caller's dictionary is left untouched
        merged = dict(updates or {})
        merged.update(repo.parser.export())

        # Load the previous data to update
        data = {}
        if repo.data and not rewrite:
            data = json.loads(repo.data)
        data.update(merged)

        repo.data = json.dumps(data)
        self.session.add(repo)
        self.session.commit()
        return repo

    def label(self, repo, key, value, force=False):
        """Update a repository with a specific key/value pair.

        Raises RuntimeError if the key is already present in the stored data
        and force is False. Returns the repo after committing.
        """
        data = json.loads(repo.data) if repo.data else {}

        if key in data and not force:
            raise RuntimeError(
                f"{key} is already defined for {repo.uid}. Use --force to overwrite."
            )

        data[key] = value
        bot.debug(f"Adding key {key}:{value}")
        repo.data = json.dumps(data)
        self.session.add(repo)
        self.session.commit()
        return repo

    # Get, delete, etc. only require uid

    def get(self, uid=None):
        """Get a repo based on a uid, or the last updated repo if none is given.

        If there is no exact match for the uid, a case-insensitive partial
        match is attempted, and a single partial match is returned.

        Raises:
         - NoReposError: no uid was provided and no repos are stored.
         - MultipleReposExistError: more than one partial match was found.
         - RepoNotFoundError: no exact or partial match was found.

        The returned repo has its ``parser`` attribute set to the parser
        object for its uid.
        """
        from rse.main.database.models import SoftwareRepository

        # Retrieve either the last repo, or the one with a specific uid
        if not uid:
            repo = (
                self.session.query(SoftwareRepository)
                .order_by(desc("timestamp"))
                .first()
            )
            # first() gives None when nothing is stored, so this check has
            # to come before repo.uid is read
            if not repo:
                raise NoReposError
            parser = get_parser(repo.uid, config=self.config)
        else:
            parser = get_parser(uid, config=self.config)
            repo = SoftwareRepository.query.filter(
                SoftwareRepository.uid == parser.uid
            ).first()

            # If an exact match isn't there, look for partial match
            if not repo:
                pattern = "%" + parser.uid + "%"
                partial = self.session.query(SoftwareRepository).filter(
                    SoftwareRepository.uid.ilike(pattern)
                )
                results = self.session.execute(partial).fetchall()

                if len(results) == 1:
                    # NOTE(review): relies on the first column of the row
                    # being the uid - confirm against the model.
                    return self.get(results[0][0])
                if len(results) > 1:
                    raise MultipleReposExistError(parser.uid)
                raise RepoNotFoundError(parser.uid)

        repo.parser = parser
        return repo

    def delete_repo(self, uid):
        """Delete a repo based on a specific repo id.

        Returns True once the repo is removed. Lookup errors raised by
        self.get are not caught here.
        """
        from rse.main.database.models import SoftwareRepository

        repo = self.get(uid)
        if not repo:
            bot.error(f"{uid} does not exist in the database.")
            return False

        SoftwareRepository.query.filter(SoftwareRepository.uid == repo.uid).delete()
        self.session.commit()
        bot.info(f"{uid} has been removed.")
        return True

    def delete_parser(self, name):
        """Delete all repos for a parser, based on parser's name (str).

        Returns True if at least one repo was deleted, False otherwise.
        """
        from rse.main.database.models import SoftwareRepository

        # NOTE(review): this filters on ``parser_name`` while add() builds
        # the model with a ``parser`` keyword - confirm both exist.
        deleted_items = False
        for repo in SoftwareRepository.query.filter(
            SoftwareRepository.parser_name == name
        ):
            deleted_items = True
            self.session.delete(repo)

        self.session.commit()
        return deleted_items

    def list_repos(self, name=None):
        """List repo uids, optionally limited to one parser name.

        Returns a list of single-item rows, one ``[uid]`` per repo.
        """
        from rse.main.database.models import SoftwareRepository

        if name:
            # NOTE(review): this filters on ``parser_name`` while add()
            # builds the model with a ``parser`` keyword - confirm both exist.
            repos = SoftwareRepository.query.filter(
                SoftwareRepository.parser_name == name
            )
        else:
            repos = SoftwareRepository.query.all()

        return [[repo.uid] for repo in repos]

    def search(self, query, taxonomy=None, criteria=None):
        """Search across the database for a particular query.

        The query is matched case-insensitively as a substring of either the
        stored data or the uid. The taxonomy and criteria arguments are
        accepted but not used by this implementation.

        Returns a lookup of results: an empty dictionary if no query is
        given, otherwise ``{query: rows}`` with one three-item row per match.
        """
        from rse.main.database.models import SoftwareRepository

        # Required to have a query
        if not query:
            return {}

        # Ensure that query can be part of a larger string
        expression = "%" + query + "%"
        statement = self.session.query(SoftwareRepository).filter(
            or_(
                SoftwareRepository.data.ilike(expression),
                SoftwareRepository.uid.ilike(expression),
            )
        )

        # Rows are presumably (uid, datetime, executor) - verify against the
        # model. Items 1 and 2 are swapped and converted to str on output.
        rows = self.session.execute(statement).fetchall()
        return {query: [[r[0], str(r[2]), str(r[1])] for r in rows]}