Source code for rse.main

"""

Copyright (C) 2020 Vanessa Sochat.

This Source Code Form is subject to the terms of the
Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""

from rse.main.config import Config
from rse.defaults import RSE_DATABASE, RSE_PARSERS, RSE_CONFIG_FILE

from rse.exceptions import RepoNotFoundError, RepoMetadataExistError
from rse.main.database import init_db
from rse.utils.prompt import confirm, choice_prompt
from rse.utils.file import read_file
from rse.utils.command import get_github_username
from rse.utils.urls import repository_regex
from rse.main.parsers import get_parser
from rse.main.criteria import get_criteria
from rse.main.taxonomy import get_taxonomy
from rse.logger.message import bot as message

import json
import logging
import os
import re

# Module-level logger used throughout this module for debug/info/warning/error output.
bot = logging.getLogger("rse.main")
# Pattern that Encyclopedia.clear() applies with re.search to decide whether a
# target is a single software identifier (as opposed to a parser name).
parser_regex = "github"


class Encyclopedia:
    """An encyclopedia is one or more namespaces to store research software.

    By default, we create a structure on the filesystem, however an sqlite
    database (or other) can be used.
    """

    def __init__(self, config_file=None, database=None, generate=False):
        """create a software repository. We take a config file, which should
        sit at the root of the repository, and then parse the subfolders
        accordingly.

        Arguments:
         - config_file (str) : config file path, RSE_CONFIG_FILE if not provided
         - database (str)    : database specification, handed to initdb
         - generate (bool)   : forwarded to Config as its generate option
        """
        self.config = Config(config_file or RSE_CONFIG_FILE, generate=generate)
        self.config_dir = os.path.dirname(self.config.configfile)
        self.initdb(database)

    def initdb(self, database):
        """setup the database specification and create the database client.

        The database type is the first non-empty value of: the argument,
        RSE_DATABASE, the "database" entry of the DEFAULT config section, and
        finally "filesystem". A type that does not start with a supported
        prefix produces a warning and falls back to "filesystem".

        Arguments:
         - database (str) : a string to specify the database setup
        """
        self.database = (
            database
            or RSE_DATABASE
            or self.config.get("DEFAULT", "database")
            or "filesystem"
        )
        database_string = self.config.get("DEFAULT", "databaseconnect")
        bot.debug("Database: %s" % self.database)

        # Supported database options (str.startswith accepts a tuple)
        valid = ("sqlite", "postgresql", "mysql+pymysql", "filesystem")
        if not self.database.startswith(valid):
            bot.warning(
                "%s is not yet a supported type, saving to filesystem." % self.database
            )
            self.database = "filesystem"

        # Create database client with functions for database type
        self.db = init_db(
            self.database,
            config_dir=self.config_dir,
            database_string=database_string,
            config=self.config,
        )

    def exists(self, uid):
        """based on a unique identifier, determine if software exists in the
        database. The identifier is first resolved with get_parser, and the
        parser's uid is what the database is asked about.
        """
        parser = get_parser(uid, config=self.config)
        return self.db.exists(parser.uid)

    def list(self, name=None):
        """A wrapper to the database list_repos function. Optionally take
        a whole parser name (e.g., github) or just a specific uid. No
        parser indicates that we list everything.
        """
        return self.db.list_repos(name)

    def list_criteria(self):
        """Get a listing of criteria from the rse API. The result of
        get_criteria is stored on the instance and reused on later calls.
        """
        if not hasattr(self, "criteria"):
            self.criteria = get_criteria()
        return self.criteria

    def list_taxonomy(self):
        """Get a listing of a flattened taxonomy from the rse API. The result
        of get_taxonomy is stored on the instance and reused on later calls.
        """
        if not hasattr(self, "taxonomy"):
            self.taxonomy = get_taxonomy()
        return self.taxonomy

    def bulk_add(self, filename):
        """Given a filename with a single list of repos, add each.

        Blank lines are skipped. Returns the list of repos that were actually
        added; identifiers already in the database (for which add returns
        None) are left out. A filename that does not exist gives an empty list.
        """
        repos = []
        if os.path.exists(filename):
            for name in read_file(filename):
                uid = name.strip()
                if not uid:
                    continue
                repo = self.add(uid, quiet=True)
                # add returns None when the uid already exists
                if repo is not None:
                    repos.append(repo)
        return repos

    def bulk_update(self, filename, rewrite=False):
        """Given a filename with a single list of repos, update each.

        Blank lines are skipped. Returns the list of repos that were updated;
        identifiers that are not found (for which update returns None) are
        left out. A filename that does not exist gives an empty list.
        """
        repos = []
        if os.path.exists(filename):
            for name in read_file(filename):
                uid = name.strip()
                if not uid:
                    continue
                try:
                    repo = self.update(uid, rewrite=rewrite)
                except RepoNotFoundError:
                    # update normally logs and returns None itself; this only
                    # guards against an override that lets the error escape
                    continue
                if repo is not None:
                    repos.append(repo)
        return repos

    def add(self, uid, quiet=False):
        """A wrapper to add a repository to the software database.

        Returns the result of the database add, or None if the uid already
        exists (an error is logged in that case unless quiet is True).
        """
        if not self.exists(uid):
            return self.db.add(uid)
        if not quiet:
            bot.error(f"{uid} already exists in the database.")
        return None

    def get(self, uid=None):
        """A wrapper to get a repo id from the database. If an id is not
        provided, will return the last updated repo based on timestamp
        of file or database (behavior of the database client's get).
        """
        return self.db.get(uid)

    def get_or_create(self, uid):
        """A wrapper to the database client's get_or_create for the uid."""
        return self.db.get_or_create(uid)

    def clear(self, target=None, noprompt=False):
        """clear takes a target, and that can be a uid, parser, or none.
        We ask the user for confirmation unless noprompt is True. If the
        user declines, nothing is deleted and None is returned.

        Raises:
         - RuntimeError : the target is not empty, not a known parser, and
                          does not match parser_regex
        """
        # Case 1: no target indicates clearing all
        if not target:
            if noprompt or confirm(
                "This will delete all software in the database, are you sure?"
            ):
                return self.db.clear()

        # Case 2: it's a parser
        elif target in RSE_PARSERS:
            if noprompt or confirm(
                f"This will delete all {target} software in the database, are you sure?"
            ):
                return self.db.delete_parser(target)

        # Case 3, it's a specific software identifier
        elif re.search(parser_regex, target):
            if noprompt or confirm(
                f"This will delete software {target}, are you sure?"
            ):
                return self.db.delete_repo(target)

        else:
            raise RuntimeError(f"Unrecognized {target} to clear")

    def update(self, uid, rewrite=False):
        """Update an existing software repository. Returns the repo, or None
        (after logging an error) if it does not exist.
        """
        try:
            repo = self.get(uid)
            self.db.update(repo, rewrite=rewrite)
            bot.info(f"{repo.uid} has been updated.")
            return repo
        except RepoNotFoundError:
            bot.error(f"{uid} does not exist.")

    def label(self, uid, key, value, force=False):
        """Update an existing software repository with a specific label.
        Returns the repo, or None (after logging an error) if the repo does
        not exist or the label is refused with RepoMetadataExistError.
        """
        try:
            repo = self.get(uid)
            self.db.label(repo, key, value, force=force)
            bot.info(f"{repo.uid} has been updated.")
            return repo
        except RepoMetadataExistError:
            bot.error(
                f"{repo.uid} already has value for {key}. Use --force to overwrite."
            )
        except RepoNotFoundError:
            bot.error(f"{uid} does not exist.")

    def search(self, query, taxonomy=None, criteria=None):
        """Search across commands and general metadata for a string of
        interest. The query and filters are handed to the database client's
        search; the original notes say regular expressions are supported and
        that search is only available for non-filesystem databases.

        Returns the results if there are any, otherwise logs a message and
        returns None.
        """
        results = self.db.search(query, taxonomy=taxonomy, criteria=criteria)
        if results:
            return results
        bot.info(f"No results matching {query}")

    # Topics

    def topics(self, pattern=None):
        """return a sorted list of unique topics, optionally keeping only
        those that match a pattern (re.search).
        """
        topics = set()
        for name in self.list():
            repo = self.get(name[0])

            # Relational needs to load from string
            data = repo.data
            if isinstance(data, str):
                data = json.loads(data)

            # Get the list of topics, optionally filter by a pattern
            topiclist = data.get("topics", [])
            if pattern is not None:
                topiclist = [t for t in topiclist if re.search(pattern, t)]
            topics.update(topiclist)

        return sorted(topics)

    def repos_by_topics(self, topics):
        """return a sorted list of repository uids that have at least one of
        the given topics in their parser metadata.
        """
        wanted = set(topics)
        repos = []
        for name in self.list():
            repo = self.get(name[0])
            topiclist = repo.parser.get_metadata().get("topics", [])
            if wanted.intersection(topiclist):
                repos.append(repo.uid)
        return sorted(repos)

    # Save Handlers

    def save_criteria(self, repo):
        """Given a repository that can be a handle to a filesystem entry or
        database, save the criteria.
        """
        # The filesystem database saves at the end
        if hasattr(repo, "save_criteria"):
            repo.save_criteria()

        # Relational saves the database item
        else:
            self.db.update(repo)

    def save_taxonomy(self, repo, username, uids):
        """Given a repository that can be a handle to a filesystem entry or
        database, save the taxonomy uids chosen by username.
        """
        if hasattr(repo, "save_taxonomy"):
            repo.taxonomy[username] = uids
            repo.save_taxonomy()
        else:
            repo.update_taxonomy(username, uids)
            self.db.update(repo)

    # Metrics

    def analyze_bulk(
        self,
        cthresh=0.5,
        tthresh=1,
        taxonomy_uids=None,
        criteria_uids=None,
        include_empty=False,
    ):
        """run analyze over every listed repository and return the list of
        results. Results with neither taxonomy nor criteria entries are
        dropped unless include_empty is True.
        """
        results = []
        for repo in self.list():
            result = self.analyze(
                repo[0],
                cthresh=cthresh,
                tthresh=tthresh,
                taxonomy_uids=taxonomy_uids,
                criteria_uids=criteria_uids,
            )
            if not result["taxonomy"] and not result["criteria"] and not include_empty:
                continue
            results.append(result)
        return results

    @staticmethod
    def _criteria_verdicts(criteria, criteria_uids, cthresh):
        """Turn {criterion: {username: "yes"|"no"}} into {criterion: verdict}.

        Only criteria named in criteria_uids are considered. The verdict is
        "yes" when the fraction of "yes" votes is at least cthresh, otherwise
        "no". A criterion without any votes gets no verdict.
        """
        verdicts = {}
        for name, votes in criteria.items():
            # Skip criteria if not important
            if name not in criteria_uids:
                continue
            tally = {"yes": 0, "no": 0}
            total = 0
            for response in votes.values():
                tally[response] += 1
                total += 1
            # Nothing to divide by: there is no answer for this criterion
            if total == 0:
                continue
            verdicts[name] = "yes" if tally["yes"] / total >= cthresh else "no"
        return verdicts

    @staticmethod
    def _taxonomy_counts(taxonomy, taxonomy_uids, tthresh):
        """Turn {username: [category, ...]} into {category: count}.

        When taxonomy_uids is non-empty only those categories are counted,
        otherwise all of them are. Categories whose count is below tthresh
        are left out.
        """
        counts = {}
        for categories in taxonomy.values():
            for category in categories:
                if taxonomy_uids and category not in taxonomy_uids:
                    continue
                counts[category] = counts.get(category, 0) + 1

        # Include those above the requested threshold
        return {name: count for name, count in counts.items() if count >= tthresh}

    def analyze(
        self, repo, cthresh=0.5, tthresh=1, taxonomy_uids=None, criteria_uids=None
    ):
        """analyze takes a repository and calculates a "final answer" based
        on user provided thresholds.

        Arguments:
         - repo (str)           : the repository identifier
         - cthresh (float)      : minimum fraction of "yes" votes for a "yes"
         - tthresh (int)        : minimum number of votes to keep a category
         - taxonomy_uids (list) : only count these categories (default all)
         - criteria_uids (list) : only count these criteria (default all
                                  criteria from list_criteria)

        Returns a dict with the keys "repo", "criteria" and "taxonomy".
        """
        # If the criteria list isn't defined, use all
        if not criteria_uids:
            criteria_uids = [x["uid"] for x in self.list_criteria()]

        parser = get_parser(repo, config=self.config)
        record = self.get(parser.uid)

        return {
            "repo": parser.uid,
            "criteria": self._criteria_verdicts(
                record.get_criteria(), criteria_uids, cthresh
            ),
            "taxonomy": self._taxonomy_counts(
                record.get_taxonomy(), taxonomy_uids, tthresh
            ),
        }

    @staticmethod
    def _summarize_repo(repo, metrics):
        """Add the annotation counts of one repository to metrics (in place).

        metrics must already have the "users", "taxonomy" and "criteria"
        dictionaries. A repository without criteria and without taxonomy is
        ignored. Per repository we record the total yes/no votes over all of
        its criteria and the number of votes per taxonomy category; per user
        we count the repositories annotated for criteria and for taxonomy.
        """
        if not repo.criteria and not repo.taxonomy:
            return

        def user_entry(user):
            return metrics["users"].setdefault(
                user, {"criteria-annotations": 0, "taxonomy-annotations": 0}
            )

        # Criteria: one running yes/no tally for the whole repository. Dict
        # keys are used as an ordered set of usernames.
        criteria = repo.get_criteria()
        tally = {"yes": 0, "no": 0} if criteria else {}
        criteria_users = {}
        for votes in criteria.values():
            criteria_users.update(dict.fromkeys(votes))
            for vote in votes.values():
                tally[vote] += 1

        # Taxonomy: number of users that selected each category
        categories_count = {}
        taxonomy_users = {}
        for username, categories in repo.get_taxonomy().items():
            taxonomy_users[username] = None
            for category in categories:
                categories_count[category] = categories_count.get(category, 0) + 1

        # Each user counts once per repository and annotation type
        for user in criteria_users:
            user_entry(user)["criteria-annotations"] += 1
        for user in taxonomy_users:
            user_entry(user)["taxonomy-annotations"] += 1

        # Don't add empty entries
        if repo.taxonomy:
            metrics["taxonomy"][repo.uid] = categories_count
        if repo.criteria:
            metrics["criteria"][repo.uid] = tally

    def summary(self, repo=None):
        """Summarize metrics for the entire database if repo is not defined,
        or one specific repository.

        Returns a dict with "repos" (a count) or "repo" (the uid), the
        "taxonomy-count" and "criteria-count" of the listings, per-repository
        "taxonomy" and "criteria" counts, per-user annotation counts under
        "users", and "users-count".
        """
        if repo is None:
            repos = self.list()
            metrics = {"repos": len(repos)}
        else:
            parser = get_parser(repo, config=self.config)
            repos = [[parser.uid]]
            metrics = {"repo": parser.uid}

        # Add taxonomy and criteria items
        metrics["taxonomy-count"] = len(self.list_taxonomy())
        metrics["criteria-count"] = len(self.list_criteria())
        metrics["users"] = {}
        metrics["taxonomy"] = {}
        metrics["criteria"] = {}

        # Count annotations for each repository
        for entry in repos:
            parser = get_parser(entry[0], config=self.config)
            self._summarize_repo(self.get(parser.uid), metrics)

        # Add unique users
        metrics["users-count"] = len(metrics["users"])
        return metrics

    # Annotation

    def annotate(self, username, atype, unseen_only=True, repo=None, save=False):
        """Annotate the encyclopedia, either for criteria or taxonomy.
        A username is required for the namespace; when it is empty,
        get_github_username is used to look one up.

        Arguments:
         - username (str) : the user's GitHub username
         - atype (str) : the annotation type ("criteria" or "taxonomy")
         - unseen_only (bool): annotate only items not seen by username
         - repo (str) : annotate a particular software repository
         - save (bool) : passed on to the annotation function

        Returns the annotations, or None (after logging an error) for an
        unknown annotation type.
        """
        # git config user.name
        if not username:
            username = get_github_username()

        if atype == "criteria":
            return self.annotate_criteria(username, unseen_only, repo, save)
        elif atype == "taxonomy":
            return self.annotate_taxonomy(username, unseen_only, repo, save)
        bot.error(f"Unknown annotation type, {atype}.")

    @staticmethod
    def _pop_repository_match(lines, pattern, stop_line):
        """Consume lines from the front of the list until one matches pattern.

        Returns the match object; the matching line and everything before it
        have been removed from lines. Returns None if a line containing
        stop_line comes first (that line is consumed and not searched) or if
        the lines run out.
        """
        match = None
        while lines:
            line = lines.pop(0)
            if stop_line in line:
                break
            match = re.search(pattern, line)
            if match:
                break
        return match

    def _import_annotation(self, input_file, username, stop_line="## Criteria"):
        """A general helper (private) function to import an annotation,
        meaning we find the repository named in the file and return it along
        with the remaining lines for parsing.

        Raises:
         - RuntimeError      : username or input_file is missing, or no
                               repository is found before stop_line
         - FileNotFoundError : input_file does not exist
        """
        if not username or not input_file:
            raise RuntimeError(
                "A username and input file are required to import annotation criteria."
            )

        if not os.path.exists(input_file):
            raise FileNotFoundError(input_file)

        # Find the repository name
        lines = read_file(input_file)
        match = self._pop_repository_match(lines, repository_regex, stop_line)

        # Retrieve the match
        if not match:
            raise RuntimeError(f"repository pattern not found in {input_file}")

        reponame = match.group()
        parser = get_parser(reponame, config=self.config)
        repo = self.get(parser.uid)
        return repo, lines

    def import_criteria_annotation(self, input_file, username):
        """Given a text file that has a bullet list of (some checked) criteria
        as might be generated in a GitHub issue, read in the file and the
        username to do an annotation. If a user has already done an
        annotation, his or her record is updated.
        """
        repo, lines = self._import_annotation(input_file, username)

        # Now iterate through checklist, update
        for line in lines:
            uid = line.split("criteria-")[-1].strip()
            if "[x]" in line:
                repo.update_criteria(uid, username, "yes")
                print(f"Updating {repo.uid}: {uid}->yes")
            elif re.search(r"\[]|\[ \]", line):
                repo.update_criteria(uid, username, "no")
                print(f"Updating {repo.uid}: {uid}->no")

        # Save the criteria
        self.save_criteria(repo)

    def import_taxonomy_annotation(self, input_file, username):
        """Given a text file that has a bullet list of (some checked) taxonomy
        items as might be generated in a GitHub issue, read in the file and
        the username to do an annotation. The checked items replace the
        user's taxonomy record for the repository.
        """
        repo, lines = self._import_annotation(
            input_file, username, stop_line="## Taxonomy"
        )

        # Now iterate through checklist, update
        uids = []
        for line in lines:
            if "RSE-taxonomy" not in line or "[x]" not in line:
                continue
            uid = line.split("]")[-1].strip()
            if uid.startswith("RSE-taxonomy") and uid not in uids:
                print(f"{repo.uid} adding {uid}")
                uids.append(uid)

        self.save_taxonomy(repo, username, uids)

    def yield_criteria_annotation_repos(self, username, unseen_only=True, repo=None):
        """Given a username, repository, and preference for seen / unseen,
        yield (repository, criteria item) pairs to annotate. If a repository
        is specified, unseen_only is treated as False.
        """
        if repo is None:
            repos = self.list()
        else:
            parser = get_parser(repo, config=self.config)
            repos = [[parser.uid]]
            unseen_only = False

        # yield combinations that don't exist yet, repo first to save changes
        for name in repos:
            repo = self.get(name[0])
            for item in self.list_criteria():
                if not unseen_only or not repo.has_criteria_annotation(
                    item["uid"], username
                ):
                    yield repo, item

    def yield_taxonomy_annotation_repos(self, username, unseen_only=True, repo=None):
        """Given a username, repository, and preference for seen / unseen,
        yield a repository to annotate. If a repository is specified,
        unseen_only is treated as False.
        """
        if repo is None:
            repos = self.list()
        else:
            parser = get_parser(repo, config=self.config)
            repos = [[parser.uid]]
            unseen_only = False

        # yield repositories that are not annotated yet (or all of them)
        for name in repos:
            repo = self.get(name[0])
            if not unseen_only or not repo.has_taxonomy_annotation(username):
                yield repo

    def annotate_criteria(self, username, unseen_only=True, repo=None, save=False):
        """Annotate criteria, meaning we iterate over repos and criteria that
        match the user request, namely to annotate unseen only, or just
        a particular repository. If the repository is specified, unseen_only
        is assumed False.

        Returns a dict of repository uid -> criteria for each repository
        that was presented. Repositories are saved only when save is True.
        """
        annotations = {}
        last = None

        for repo, criteria in self.yield_criteria_annotation_repos(
            username, unseen_only, repo
        ):
            # Only print repository if not seen yet
            if not last or repo.uid != last.uid:

                # If we have a last repo, we need to save progress
                if last is not None:
                    if save is True:
                        self.save_criteria(last)
                    annotations[last.uid] = last.criteria

                message.info(f"\n{repo.url} [{repo.description}]:")
                last = repo

            response = choice_prompt(
                criteria["name"],
                choices=["y", "Y", "n", "N", "s", "S", "skip"],
                choice_prefix="y/n or s to skip",
            )

            # The user can skip an answer if wanted
            if response in ["s", "S", "skip"]:
                continue
            repo.update_criteria(criteria["uid"], username, response)

        # Save the last repository
        if last is not None:
            if save is True:
                self.save_criteria(last)
            annotations[last.uid] = last.criteria

        return annotations

    def annotate_taxonomy(self, username, unseen_only=True, repo=None, save=False):
        """Annotate taxonomy, meaning we iterate over repos that match the
        user request, namely to annotate unseen only, or just a particular
        repository. If the repository is specified, unseen_only is assumed
        False.

        Returns a dict of repository uid -> taxonomy for each repository that
        was annotated (skipped repositories are not included).
        """
        annotations = {}

        # Retrieve the full taxonomy
        items = self.list_taxonomy()
        choices = [str(i) for i, _ in enumerate(items)] + ["s", "S", "skip"]
        prefix = "0:%s or s to skip" % (len(items) - 1)

        for repo in self.yield_taxonomy_annotation_repos(username, unseen_only, repo):

            message.info(f"\n{repo.url} [{repo.description}]:")
            print("How would you categorize this software? [enter one or more numbers]")
            for i, t in enumerate(items):
                example = t.get("example", "")
                name = t.get("name", "")
                if name and example:
                    print(f"[{i}] {name} ({example})")
                elif name:
                    print(f"[{i}] {name}")

            response = choice_prompt(
                "Please enter one or more numbers, separated by spaces",
                choices=choices,
                choice_prefix=prefix,
                multiple=True,
            )

            if response in ["s", "S", "skip"]:
                continue

            # Get the unique ids, in the order entered. split() without an
            # argument tolerates repeated spaces between the numbers.
            picks = dict.fromkeys(response.split())
            uids = [items[int(x)]["uid"] for x in picks if 0 <= int(x) < len(items)]

            # Filesystem database we write filename to repository folder
            self.save_taxonomy(repo, username, uids)
            annotations[repo.uid] = repo.taxonomy

        return annotations