"""
Copyright (C) 2020 Vanessa Sochat.
This Source Code Form is subject to the terms of the
Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
from rse.utils.file import read_file
from datetime import datetime
import os
import json
import tempfile
import subprocess
class Capturing:
    """Capture output from stdout and stderr into temporary files.

    This is based off of github.com/vsoch/gridtest but modified
    to write files. The stderr and stdout are set to temporary files when
    the context is entered, and they are closed when we exit. This
    means expected usage looks like:

        with Capturing() as capture:
            process = subprocess.Popen(
                cmd, stdout=capture.stdout, stderr=capture.stderr
            )

    The output and error are then retrieved by reading the files, and are
    exposed as properties to the client:

        capture.out
        capture.err

    Cleanup (an explicit call to ``cleanup()``) means deleting these files,
    if they exist. Exiting the context only closes them.
    """

    def __enter__(self):
        """Open both temporary files and reset the collected output lists."""
        self.set_stdout()
        self.set_stderr()
        self.output = []
        self.error = []
        return self

    @staticmethod
    def _open_tempfile():
        """Create a temporary file and return it opened for text writing.

        ``tempfile.mkstemp`` hands back an already-open OS-level descriptor
        along with the path. We only want the path (so that the ``name`` of
        the returned file object is the path), so the descriptor is closed
        right away instead of being leaked.
        """
        descriptor, path = tempfile.mkstemp()
        os.close(descriptor)
        return open(path, "w")

    def set_stdout(self):
        """Point ``self.stdout`` at a fresh, writable temporary file."""
        self.stdout = self._open_tempfile()

    def set_stderr(self):
        """Point ``self.stderr`` at a fresh, writable temporary file."""
        self.stderr = self._open_tempfile()

    def __exit__(self, *args):
        """Close both temporary files; the files themselves are kept on disk."""
        # Make sure stdout is closed even if closing stderr fails.
        try:
            self.stderr.close()
        finally:
            self.stdout.close()

    @property
    def out(self):
        """Return output stream. Returns empty string if the file doesn't exist.

        Returns: the content of the stdout file as produced by ``read_file``
        (NOTE(review): read_file is a project helper not visible here, so
        its exact return type should be confirmed).
        """
        if os.path.exists(self.stdout.name):
            return read_file(self.stdout.name)
        return ""

    @property
    def err(self):
        """Return error stream. Returns empty string if the file doesn't exist.

        Returns: the content of the stderr file as produced by ``read_file``
        (NOTE(review): read_file is a project helper not visible here, so
        its exact return type should be confirmed).
        """
        if os.path.exists(self.stderr.name):
            return read_file(self.stderr.name)
        return ""

    def cleanup(self):
        """Delete the stdout and stderr temporary files if they still exist."""
        for filename in [self.stdout.name, self.stderr.name]:
            if os.path.exists(filename):
                os.remove(filename)
class ParserBase:
    """A parser base exists to extract and format repository metadata.

    Subclasses are expected to override ``name`` and to implement
    ``_set_uid``, ``get_url`` and ``get_description``.
    """

    name = "base"

    def __init__(self, uid=None):
        """Set a unique id that includes parser name (type) and unique identifier.

        Arguments:
         - uid: an optional resource identifier, handed to ``set_uid``.
        """
        self.uid = None
        if uid is not None:
            self.set_uid(uid)
        # Only create an empty data store if one is not already present
        # (for example as a class attribute).
        if not hasattr(self, "data"):
            self.data = {}

    def set_uid(self, uid):
        """Given a unique resource identifier, set it for the parser.

        The identifier is first normalized by ``_set_uid`` and then stored
        as ``<parser name>/<identifier>``.
        """
        uid = self._set_uid(uid)
        self.uid = "%s/%s" % (self.name, uid)

    def _set_uid(self, uid):
        """Given a uri from the user, parse the consistent identifier (e.g.,
        in the case of GitHub a repository username and name).

        Raises NotImplementedError here; subclasses must implement it.
        """
        raise NotImplementedError

    def load(self, data):
        """If a repository has already been instantiated, we might want to load
        data into a parser to interact with it.

        Arguments:
         - data: the data itself, or a json string that is parsed first.
        """
        if isinstance(data, str):
            data = json.loads(data)
        self.data = data

    def _export_common(self):
        """Export common repo variables such as timestamp when it was updated.
        This might include envars at some point, but we'd need to be careful.

        Returns (dict): a ``timestamp`` key holding the current local time
        as a string.
        """
        return {"timestamp": str(datetime.now())}

    def get_url(self, data):
        """A common function for a parser to return the html url for the
        upper level of metadata. Must be implemented by subclasses.
        """
        raise NotImplementedError

    def get_description(self, data):
        """A common function for a parser to return a description.
        Must be implemented by subclasses.
        """
        raise NotImplementedError

    def export(self):
        """Return data as a dictionary that can be serialized to json. This
        is intended to save to the software database. Any important parser
        specific metadata should be added to self.data.

        Values in self.data take precedence over the common values on
        key collisions.
        """
        # Get common context (currently just the timestamp)
        common = self._export_common()
        common.update(self.data)
        return common

    def capture(self, cmd):
        """Capture is a helper function to capture a shell command. We
        use Capturing and then save attributes like the pid, output, error
        to it, and return to the calling function. For example:

            capture = self.capture(cmd)
            self.pid = capture.pid
            self.returncode = capture.returncode
            self.out = capture.output
            self.err = capture.error

        Arguments:
         - cmd: the command, passed unchanged to subprocess.Popen (no shell
                is requested, so this is normally a list of arguments).

        Returns: the Capturing instance with pid, returncode, output and
        error set. The temporary capture files are removed before returning,
        also when launching or reading fails.
        """
        # Capturing provides temporary output and error files
        with Capturing() as capture:
            try:
                process = subprocess.Popen(
                    cmd,
                    stdout=capture.stdout,
                    stderr=capture.stderr,
                    universal_newlines=True,
                )
                capture.pid = process.pid

                # Block until the process finishes instead of spinning on
                # poll(). Output goes to files rather than pipes, so the
                # child cannot stall on a full pipe buffer while we wait.
                returncode = process.wait()

                # Collect the non-empty output and error entries.
                # NOTE(review): decode is not defined on ParserBase in this
                # file; it presumably has to be provided by a subclass or
                # mixin -- confirm before relying on capture().
                capture.output += [x for x in self.decode(capture.out) if x]
                capture.error += [x for x in self.decode(capture.err) if x]
            finally:
                # Always remove the temporary files, even on failure
                capture.cleanup()

        # Save final return code
        capture.returncode = returncode
        return capture

    def get_setting(self, key, default=None):
        """Get a setting, meaning that we first check the environment, then
        the config file, and then (if provided) a default.

        Arguments:
         - key: the name of the setting. The environment variable that is
                checked is RSE_<PARSER NAME>_<KEY>, upper cased.
         - default: returned when neither the environment nor the config
                    provides a value.
        """
        # First preference to environment
        envar = ("RSE_%s_%s" % (self.name, key)).upper()
        envar = os.environ.get(envar)
        if envar is not None:
            return envar

        # Next preference to config setting
        parser = "parser.%s" % self.name

        # Parsers instantiated separate from database won't have config
        if not hasattr(self, "config"):
            return default

        # No section for this parser in the config
        if parser not in self.config.config:
            return default

        if key in self.config.config[parser]:
            return self.config.get(parser, key)
        return default

    def summary(self):
        """Return a short label: the parser name, plus the uid when it is set."""
        if self.uid:
            return "[%s][%s]" % (self.name, self.uid)
        return "[%s]" % self.name