Source code for tokio.connectors.common

"""
Common methods and classes used by connectors
"""

import os
import sys
import gzip
import json
import errno
import tarfile
import warnings
import mimetypes
import subprocess
from tokio.common import isstr

class CacheableDict(dict):
    """Generic class to support connectors that are dicts that can be cached as JSON

    When deriving from this class, the child object will have to define its own
    ``load_native()`` method to be invoked when ``input_file`` is not JSON.
    """

    def __init__(self, input_file=None):
        """Either initialize as empty or load from cache

        Args:
            input_file (str): Path to either a JSON file representing the dict
                or a native file that will be parsed into a JSON-compatible
                format
        """
        super(CacheableDict, self).__init__()
        self.input_file = input_file
        self.load()

    def load(self, input_file=None):
        """Wrapper around the filetype-specific loader.

        Infer the type of input being given, dispatch the correct loading
        function, and populate keys/values.

        Args:
            input_file (str or None): The input file to load.  If not
                specified, uses whatever self.input_file is

        Raises:
            OSError: If the input file does not exist.
        """
        if input_file:
            self.input_file = input_file

        # Nothing to load; the object simply stays empty
        if self.input_file is None:
            return

        if not os.path.exists(self.input_file):
            raise OSError("Input file %s does not exist" % self.input_file)

        # Try the JSON cache format first; anything that does not parse as
        # JSON is handed to the native parser supplied by the derived class
        try:
            self.load_json()
        except ValueError:
            self.load_native()

    def load_native(self, input_file=None):
        """Parse an uncached, native object

        This is a stub that should be overloaded on derived classes.

        Args:
            input_file (str or None): The input file to load.  If not
                specified, uses whatever self.input_file is
        """
        pass

    def load_json(self, input_file=None):
        """Loads input from serialized JSON

        Load the serialized format of this object, encoded as a json
        dictionary.  This is the converse of the save_cache() method.

        Args:
            input_file (str or None): The input file to load.  If not
                specified, uses whatever self.input_file is

        Raises:
            ValueError: If the file contents are not valid JSON.
        """
        if input_file:
            self.input_file = input_file

        # Only the file name is inspected to decide whether to gunzip
        _, encoding = mimetypes.guess_type(self.input_file)
        open_func = gzip.open if encoding == 'gzip' else open

        # Close the handle deterministically instead of leaking it
        with open_func(self.input_file, 'rt') as input_fp:
            contents = json.load(input_fp)

        # Go through item assignment so derived classes that override
        # __setitem__ still see every key
        for key, val in contents.items():
            self[key] = val

    def save_cache(self, output_file=None, **kwargs):
        """Serializes self into a JSON output.

        Save the dictionary in a JSON file.  This output can be read back in
        using load_json().

        Args:
            output_file (str or None): Path to file to which json should be
                written.  If None, write to stdout.  Default is None.
            kwargs (dict): Additional arguments to be passed to json.dumps()
        """
        if output_file is None:
            self._save_cache(sys.stdout, **kwargs)
        else:
            with open(output_file, 'w') as output:
                self._save_cache(output, **kwargs)

    def _save_cache(self, output, **kwargs):
        """Generates serialized representation of self

        Args:
            output: Object with a ``.write()`` method into which the
                serialized form of self will be passed
            kwargs (dict): Additional arguments to be passed to json.dumps()
        """
        output.write(json.dumps(self, **kwargs))
class SubprocessOutputDict(dict):
    """Generic class to support connectors that parse the output of a subprocess

    When deriving from this class, the child object will have to

    1. Define subprocess_cmd after initializing this parent object
    2. Define self.__repr__ (if necessary)
    3. Define its own self.load_str
    4. Define any introspective analysis methods

    The constructor only records its arguments; nothing is loaded until
    ``load()`` is called.
    """

    def __init__(self, cache_file=None, from_string=None, silent_errors=False):
        """Record where the subprocess output should come from

        Args:
            cache_file (str or None): Path to a text file containing
                previously captured subprocess output
            from_string (str or None): Subprocess output to parse directly
            silent_errors (bool): If True, discard the subprocess's stderr
        """
        super(SubprocessOutputDict, self).__init__()
        self.cache_file = cache_file
        self.silent_errors = silent_errors
        self.from_string = from_string
        self.subprocess_cmd = []

    def load(self, cache_file=None):
        """Load based on initialization state of object

        The sources are tried in order of precedence: ``from_string``, then
        the cache file, then the subprocess command.

        Args:
            cache_file (str or None): The cached input file to load.  If not
                specified, uses whatever self.cache_file is
        """
        if cache_file:
            self.cache_file = cache_file

        if self.from_string is not None:
            self.load_str(self.from_string)
        elif self.cache_file:
            self.load_cache()
        elif self.subprocess_cmd:
            self._load_subprocess()

    def _load_subprocess(self, *args):
        """Run a subprocess and pass its stdout to a self-initializing parser

        Args:
            *args: Extra command-line arguments appended to
                ``self.subprocess_cmd`` for this invocation only

        Raises:
            OSError: If the command cannot be executed; when the executable
                is missing, the message names the command.
        """
        cmd = self.subprocess_cmd
        if args:
            # Build a new list; ``cmd += args`` would extend
            # self.subprocess_cmd in place and make the extra arguments
            # pile up across calls
            cmd = list(cmd) + list(args)

        try:
            if self.silent_errors:
                with open(os.devnull, 'w') as devnull:
                    output_str = subprocess.check_output(cmd, stderr=devnull)
            else:
                output_str = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as error:
            # A nonzero exit is not fatal; parse whatever output was produced
            warnings.warn("%s returned nonzero exit code (%d)" % (cmd, error.returncode))
            output_str = error.output
        except OSError as error:
            if error.errno == errno.ENOENT:
                raise type(error)(error.errno, "%s command not found" % cmd[0])
            raise

        if isstr(output_str):
            # Python 2 - subprocess.check_output returns a string
            self.load_str(output_str)
        else:
            # Python 3 - subprocess.check_output returns encoded bytes
            self.load_str(output_str.decode())

    def load_cache(self, cache_file=None):
        """Load subprocess output from a cached text file

        Args:
            cache_file (str or None): The cached input file to load.  If not
                specified, uses whatever self.cache_file is
        """
        if cache_file:
            self.cache_file = cache_file

        # Only the file name is inspected to decide whether to gunzip
        _, encoding = mimetypes.guess_type(self.cache_file)
        if encoding == 'gzip':
            input_fp = gzip.open(self.cache_file, 'rt')
        else:
            input_fp = open(self.cache_file, 'r')

        # Ensure the handle is closed even if load_str() raises
        with input_fp:
            self.load_str(input_fp.read())

    def load_str(self, input_str):
        """Load subprocess output from a string

        This default implementation stores the text verbatim under the
        ``_raw`` key; derived classes are expected to override it.

        Args:
            input_str (str): The text that came from the subprocess's stdout
                and should be parsed by this method.
        """
        self['_raw'] = input_str

    def save_cache(self, output_file=None):
        """Serialize subprocess output to a text file

        The text written is ``str(self)``.

        Args:
            output_file (str): Path to a file to which the output cache should
                be written.  If None, write to stdout.
        """
        if output_file is None:
            sys.stdout.write(str(self))
        else:
            with open(output_file, 'w') as output_fp:
                output_fp.write(str(self))
class SubprocessOutputList(list):
    """Generic class to support connectors that parse the output of a subprocess

    When deriving from this class, the child object will have to

    1. Define subprocess_cmd after initializing this parent object
    2. Define self.__repr__ (if necessary)
    3. Define its own self.load_str
    4. Define any introspective analysis methods

    The constructor only records its arguments; nothing is loaded until
    ``load()`` is called.
    """

    def __init__(self, cache_file=None, from_string=None, silent_errors=False):
        """Record where the subprocess output should come from

        Args:
            cache_file (str or None): Path to a text file containing
                previously captured subprocess output
            from_string (str or None): Subprocess output to parse directly
            silent_errors (bool): If True, discard the subprocess's stderr
        """
        super(SubprocessOutputList, self).__init__()
        self.cache_file = cache_file
        self.silent_errors = silent_errors
        self.from_string = from_string
        self.subprocess_cmd = []

    def load(self, cache_file=None):
        """Load based on initialization state of object

        The sources are tried in order of precedence: ``from_string``, then
        the cache file, then the subprocess command.

        Args:
            cache_file (str or None): The cached input file to load.  If not
                specified, uses whatever self.cache_file is
        """
        if cache_file:
            self.cache_file = cache_file

        if self.from_string is not None:
            self.load_str(self.from_string)
        elif self.cache_file:
            self.load_cache()
        elif self.subprocess_cmd:
            self._load_subprocess()

    def _load_subprocess(self, *args):
        """Run a subprocess and pass its stdout to a self-initializing parser

        Args:
            *args: Extra command-line arguments appended to
                ``self.subprocess_cmd`` for this invocation only

        Raises:
            OSError: If the command cannot be executed; when the executable
                is missing, the message names the command.
        """
        cmd = self.subprocess_cmd
        if args:
            # Build a new list; ``cmd += args`` would extend
            # self.subprocess_cmd in place and make the extra arguments
            # pile up across calls
            cmd = list(cmd) + list(args)

        try:
            if self.silent_errors:
                with open(os.devnull, 'w') as devnull:
                    output_str = subprocess.check_output(cmd, stderr=devnull)
            else:
                output_str = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as error:
            # A nonzero exit is not fatal; parse whatever output was produced
            warnings.warn("%s returned nonzero exit code (%d)" % (cmd, error.returncode))
            output_str = error.output
        except OSError as error:
            if error.errno == errno.ENOENT:
                raise type(error)(error.errno, "%s command not found" % cmd[0])
            raise

        if isstr(output_str):
            # Python 2 - subprocess.check_output returns a string
            self.load_str(output_str)
        else:
            # Python 3 - subprocess.check_output returns encoded bytes
            self.load_str(output_str.decode())

    def load_cache(self, cache_file=None):
        """Load subprocess output from a cached text file

        Args:
            cache_file (str or None): The cached input file to load.  If not
                specified, uses whatever self.cache_file is
        """
        if cache_file:
            self.cache_file = cache_file

        # Only the file name is inspected to decide whether to gunzip
        _, encoding = mimetypes.guess_type(self.cache_file)
        if encoding == 'gzip':
            input_fp = gzip.open(self.cache_file, 'rt')
        else:
            input_fp = open(self.cache_file, 'r')

        # Ensure the handle is closed even if load_str() raises
        with input_fp:
            self.load_str(input_fp.read())

    def load_str(self, input_str):
        """Load subprocess output from a string

        This default implementation appends each line of the text as one
        element of the list; derived classes are expected to override it.
        (A list cannot be indexed by a string key, so the dict-style
        ``self['_raw'] = ...`` assignment would always raise TypeError.)

        Args:
            input_str (str): The text that came from the subprocess's stdout
                and should be parsed by this method.
        """
        self.extend(input_str.splitlines())

    def save_cache(self, output_file=None):
        """Serialize subprocess output to a text file

        The text written is ``str(self)``.

        Args:
            output_file (str): Path to a file to which the output cache should
                be written.  If None, write to stdout.
        """
        if output_file is None:
            sys.stdout.write(str(self))
        else:
            with open(output_file, 'w') as output_fp:
                output_fp.write(str(self))
def walk_file_collection(input_source):
    """Iterate over every regular file held by a directory or a tarfile.

    Each member of `input_source` is reported as a tuple of its name, its
    last modification time, and an open handle to its contents.

    Neither the yielded handles nor the tar archive are closed by this
    generator; callers should close each handle once they are done with it.
    Directory members are opened in text mode, whereas tarfile members are
    the objects returned by ``TarFile.extractfile()``.

    Args:
        input_source (str): A path to either a directory containing files or
            a tarfile containing files.

    Yields:
        tuple: Attributes for a member of `input_source` with the following
        data:

        * str: path of the member (joined onto the walked directory, or the
          member's name inside the tarfile)
        * float: last modification time expressed as seconds since epoch
        * file: handle to access the member's contents
    """
    if not os.path.isdir(input_source):
        # Anything that is not a directory is treated as a tar archive
        compression = mimetypes.guess_type(input_source)[1]
        tar_mode = 'r:gz' if compression == 'gzip' else 'r'
        archive = tarfile.open(input_source, tar_mode)
        for entry in archive.getmembers():
            # Skip directories, links, and other non-regular members
            if not entry.isfile():
                continue
            yield (entry.name, entry.mtime, archive.extractfile(entry))
        return

    for dirpath, _, basenames in os.walk(input_source):
        for basename in basenames:
            full_path = os.path.join(dirpath, basename)
            yield (full_path, os.path.getmtime(full_path), open(full_path, 'r'))