Source code for lesana.collection

import collections
import io
import logging
import os
import shutil
import uuid

import jinja2
import ruamel.yaml
import xapian
from pkg_resources import resource_filename, resource_string

from . import templating, types

logger = logging.getLogger(__name__)

try:
    import git

    git_available = True
except ImportError:
    git_available = False


[docs]class Entry(object): def __init__(self, collection, data={}, fname=None): self.collection = collection self.data = data or self.empty_data() self.fname = fname self.eid = self.data.get('eid', None) if not self.eid: if self.fname: self.eid, ext = os.path.splitext(os.path.basename(self.fname)) else: self.eid = uuid.uuid4().hex if not self.fname: self.fname = self.eid + '.yaml' def __str__(self): label = self.collection.settings.get('entry_label', None) if label: t = jinja2.Template(label) return t.render(**self.get_data()) else: return self.eid
[docs] def get_data(self): d = self.data.copy() d['eid'] = self.eid d['fname'] = self.fname d['short_id'] = self.short_id return d
[docs] def empty_data(self): data = self.collection.yaml.load("{}") for name, field in self.collection.fields.items(): if field.field.get('default', None): data[name] = field.field['default'] else: data[name] = field.empty() if field.field.get('help', None) is not None: comment = "{name} ({type}): {help}\n".format(**field.field) try: data.yaml_set_comment_before_after_key( key=name, before=comment, indent=0 ) except AttributeError: logger.warning( "Not adding comments because they are not" "supported by the yaml loader." ) valid_values = field.field.get('values', []) if valid_values: comment = "{name} ({type}): {valid_values}".format( valid_values="|".join(valid_values), **field.field ) try: data.yaml_set_comment_before_after_key( key=name, before=comment, indent=0 ) except AttributeError: logger.warning( "Not adding comments because they are not" "supported by the yaml loader." ) return data
@property def yaml_data(self): to_dump = self.data.copy() # Decimal fields can't be represented by # ruamel.yaml.RoundTripDumper, but transforming them to strings # should be enough for all cases that we need. for field in self.collection.settings['fields']: if field['type'] == 'decimal': v = to_dump.get(field['name'], '') if v is not None: to_dump[field['name']] = str(v) s_io = io.StringIO() self.collection.yaml.dump(to_dump, s_io) return s_io.getvalue() @property def idterm(self): return "Q" + self.eid @property def short_id(self): return self.eid[:8]
[docs] def validate(self): errors = [] valid = True for name, field in self.collection.fields.items(): value = self.data.get(name, None) try: self.data[name] = field.load(value) except types.LesanaValueError as e: valid = False errors.append( { 'field': name, 'error': e, } ) return valid, errors
[docs] def render(self, template, searchpath='.'): jtemplate = self.collection.get_template(template, searchpath) try: return jtemplate.render(entry=self) except jinja2.exceptions.TemplateSyntaxError as e: raise TemplatingError('Template Syntax Error: ' + str(e))
[docs] def auto(self): """ Update all fields of this entry, as required by the field settings. This is called by the reference client before an edit, so that the user can make further changes. Note that the stored file is not changed: if you need it you need to save the entry yourself. """ for name, field in self.collection.fields.items(): self.data[name] = field.auto(self.data.get(name, None))
[docs]class Collection(object): """ """ PARSER_FLAGS = ( xapian.QueryParser.FLAG_BOOLEAN | xapian.QueryParser.FLAG_PHRASE # noqa: W503 | xapian.QueryParser.FLAG_LOVEHATE # noqa: W503 | xapian.QueryParser.FLAG_WILDCARD # noqa: W503 ) def __init__(self, directory=None, itemdir='items'): self.basedir = directory or os.getcwd() self.itemdir = os.path.join(self.basedir, itemdir) self.yaml = ruamel.yaml.YAML() self.yaml.preserve_quotes = True self.yaml.typ = 'rt' try: with open(os.path.join(self.basedir, 'settings.yaml')) as fp: self.settings = self.yaml.load(fp) except FileNotFoundError: self.settings = self.yaml.load("{}") self.fields = self._load_field_types() os.makedirs(os.path.join(self.basedir, '.lesana'), exist_ok=True) if 'lang' in self.settings: try: self.stemmer = xapian.Stem(self.settings['lang']) except xapian.InvalidArgumentError: logger.warning( "Invalid language %s, in settings.yaml: using english.", self.settings['lang'], ) self.stemmer = xapian.Stem('english') else: self.stemmer = xapian.Stem('english') self._enquire = None self.entry_class = Entry def _get_subsubclasses(self, cls): for c in cls.__subclasses__(): yield c yield from self._get_subsubclasses(c) def _load_field_types(self): type_loaders = {} for t in self._get_subsubclasses(types.LesanaType): type_loaders[t.name] = t fields = {} for i, field in enumerate(self.settings.get('fields', [])): try: fields[field['name']] = type_loaders[field['type']]( field, type_loaders, # value slot 0 is used to store the filename, and we # reserve a few more slots just in case they are # needed by lesana or some derivative value_index=i + 16, ) except KeyError: # unknown fields are treated as if they were # (unvalidated) generic YAML to support working with # collections based on lesana derivatives logger.warning( "Unknown field type %s in field %s", field['type'], field['name'], ) fields[field['name']] = types.LesanaYAML(field, type_loaders) return fields def _index_file(self, fname, cache): with open(os.path.join(self.itemdir, fname)) as fp: data = self.yaml.load(fp) entry = self.entry_class(self, data, fname) valid, errors = entry.validate() if not valid: logger.warning( "Not indexing {fname}: invalid data".format(fname=fname) ) return False, errors doc = xapian.Document() self.indexer.set_document(doc) for field, loader in self.fields.items(): loader.index(doc, self.indexer, entry.data.get(field)) doc.set_data(entry.yaml_data) doc.add_boolean_term(entry.idterm) doc.add_value(0, entry.fname.encode('utf-8')) cache.replace_document(entry.idterm, doc) return True, [] @property def indexed_fields(self): fields = [] for field in self.settings['fields']: if field.get('index', '') in ['free', 'field']: prefix = field.get('prefix', 'X' + field['name'].upper()) fields.append( { 'prefix': prefix, 'name': field['name'], 'free_search': field['index'] == 'free', 'multi': field['type'] in ['list'], } ) return fields
[docs] def update_cache(self, fnames=None, reset=False): """ Update the xapian db with the data in files. ``fnames`` is a list of *basenames* of files in ``self.itemdir``. If no files have been passed, add everything. if ``reset`` the existing xapian db is deleted before indexing Return the number of files that have been added to the cache. """ if reset: shutil.rmtree(os.path.join(self.basedir, '.lesana')) os.makedirs(os.path.join(self.basedir, '.lesana'), exist_ok=True) cache = xapian.WritableDatabase( os.path.join(self.basedir, '.lesana/xapian'), xapian.DB_CREATE_OR_OPEN, ) self.indexer = xapian.TermGenerator() self.indexer.set_stemmer(self.stemmer) if not fnames: try: fnames = os.listdir(self.itemdir) except FileNotFoundError: logger.warning( "No such file or directory: {}, not updating cache".format( self.itemdir ) ) return 0 updated = 0 for fname in fnames: try: valid, errors = self._index_file(fname, cache) except IOError as e: logger.warning( "Could not load file {}: {}".format(fname, str(e)) ) else: if valid: updated += 1 else: logger.warning( "File {fname} could not be indexed: {errors}".format( fname=fname, errors=errors ) ) return updated
[docs] def save_entries(self, entries=[]): for e in entries: complete_name = os.path.join(self.itemdir, e.fname) with open(complete_name, 'w') as fp: fp.write(e.yaml_data)
[docs] def git_add_files(self, files=[]): if not git_available: logger.warning( "python3-git not available, could not initalise " + "the git repository." # noqa: W503 ) return False if not self.settings.get('git', False): logger.info("This collection is configured not to use git") return False try: repo = git.Repo(self.basedir, search_parent_directories=True) except git.exc.InvalidGitRepositoryError: logger.warning( "Could not find a git repository in {}".format(self.basedir) ) return False repo.index.add(files) return True
def _get_cache(self): try: cache = xapian.Database( os.path.join(self.basedir, '.lesana/xapian'), ) except xapian.DatabaseOpeningError: logger.info("No database found, indexing entries.") self.update_cache() cache = xapian.Database( os.path.join(self.basedir, '.lesana/xapian'), ) return cache
[docs] def render_query_template(self, query): """ Render a query template, filling it with search_aliases. """ t = jinja2.Template(query) return t.render(**self.settings.get('search_aliases', {}))
[docs] def get_search_results(self, offset=0, pagesize=12): if not self._enquire: return for match in self._enquire.get_mset(offset, pagesize): yield self._match_to_entry(match)
[docs] def get_all_search_results(self): if not self._enquire: return offset = 0 pagesize = 100 while True: mset = self._enquire.get_mset(offset, pagesize) if mset.size() == 0: break for match in mset: yield self._match_to_entry(match) offset += pagesize
[docs] def get_all_documents(self): """ Yield all documents in the collection. Note that the results can't be sorted, even if the collection has a default_sort; if you need sorted values you need to use a regular search with a query of '*' """ cache = self._get_cache() postlist = cache.postlist("") for post in postlist: doc = cache.get_document(post.docid) yield self._doc_to_entry(doc)
[docs] def get_field_values(self, field, querystring='*'): field = self.fields[field] if field.field.get('sortable', False): self.start_search(querystring) spy = xapian.ValueCountMatchSpy(field.value_index) self._enquire.add_matchspy(spy) cache = self._get_cache() self._enquire.get_mset(0, cache.get_doccount()) for v in spy.values(): yield { 'value': v.term, 'frequency': v.termfreq, } else: logger.info( "Trying to get the list of values for a non sortable field." ) logger.info( "This is going to be pretty inefficient." ) if field.field['type'] == 'list': values = [] for e in self.get_all_documents(): values.extend(e.data[field.field['name']]) else: values = ( e.data[field.field['name']] for e in self.get_all_documents() ) logger.info("Values are %s", str(values)) counter = collections.Counter(values) for v in counter.most_common(): yield { 'value': v[0], 'frequency': v[1], }
def _match_to_entry(self, match): return self._doc_to_entry(match.document) def _doc_to_entry(self, doc): fname = doc.get_value(0).decode('utf-8') data = self.yaml.load(doc.get_data()) entry = self.entry_class(self, data=data, fname=fname,) return entry
[docs] def entry_from_eid(self, eid): cache = self._get_cache() postlist = cache.postlist('Q' + eid) for pitem in postlist: return self._doc_to_entry(cache.get_document(pitem.docid)) return None
[docs] def entries_from_short_eid(self, seid): # It would be better to search for partial UIDs inside xapian, # but I still can't find a way to do it, so this is a workable # workaround on repos where the eids are stored in the # filenames. potential_eids = [ os.path.splitext(f)[0] for f in os.listdir(self.itemdir) if f.startswith(seid) ] return [self.entry_from_eid(u) for u in potential_eids if u]
[docs] def remove_entries(self, eids): cache = xapian.WritableDatabase( os.path.join(self.basedir, '.lesana/xapian'), xapian.DB_CREATE_OR_OPEN, ) for eid in eids: for entry in self.entries_from_short_eid(eid): if entry is not None: cache.delete_document(entry.idterm) self.remove_file(entry.fname) else: logger.warning("Not removing {}: no such entry".format( eid )) cache.commit() cache.close()
[docs] def remove_file(self, fname): f_path = os.path.join(self.itemdir, fname) if git_available and self.settings.get('git', False): try: repo = git.Repo(self.basedir, search_parent_directories=True) except git.exc.InvalidGitRepositoryError: logger.warning( "Could not find a git repository in {}".format( self.basedir ) ) return False repo.index.remove([f_path]) os.remove(f_path)
[docs] def update_field(self, query, field, value): self.start_search(query) changed = [] for e in self.get_all_search_results(): e.data[field] = value changed.append(e) self.save_entries(changed) self.git_add_files( [os.path.join(self.itemdir, e.fname) for e in changed] ) self.update_cache([e.fname for e in changed])
[docs] def get_template(self, template_fname, searchpath='.'): env = templating.Environment( loader=jinja2.FileSystemLoader( searchpath=searchpath, followlinks=True, ), # TODO: add autoescaping settings ) try: template = env.get_template(template_fname) except jinja2.exceptions.TemplateNotFound as e: raise TemplatingError('Could not find template ' + str(e)) return template
[docs] def entry_from_rendered_template(self, template, data): try: template = self.get_template(template) rendered = template.render(**data) except jinja2.exceptions.TemplateSyntaxError as e: raise TemplatingError(e) try: data = self.yaml.load(rendered) except ruamel.yaml.YAMLError as e: logger.warning( "The following data failed to load as YAML: \n{}".format( rendered ) ) raise TemplatingError(e) entry = self.entry_class(self, data=data) self.save_entries([entry]) return entry
[docs] @classmethod def init( cls, directory=None, git_enabled=True, edit_file=None, settings={} ): """ Initialize a lesana repository directory defaults to . if git_enabled is True, git support is enabled and if possible a git repository is initalized. edit_file is a syncronous function that runs on a filename (possibly opening the file in an editor) and should manage its own errors. """ c_dir = os.path.abspath(directory or '.') os.makedirs(c_dir, exist_ok=True) if git_enabled: # Try to initalize a git repo if git_available: repo = git.Repo.init(c_dir, bare=False) else: logger.warning( "python3-git not available, could not initalise " + "the git repository." # noqa: W503 ) repo = None # Add .lesana directory to .gitignore and add it to the # staging lesana_ignored = False try: with open(os.path.join(c_dir, '.gitignore'), 'r') as fp: for line in fp: if '.lesana' in line: lesana_ignored = True continue except FileNotFoundError: pass if not lesana_ignored: with open(os.path.join(c_dir, '.gitignore'), 'a') as fp: fp.write('#Added by lesana init\n.lesana') if repo: repo.index.add(['.gitignore']) # Add post-checkout and post-merge hook hook_source = resource_filename('lesana', 'data/post-checkout') hooks_dir = os.path.join( c_dir, '.git', 'hooks', ) checkout_hook = os.path.join(hooks_dir, 'post-checkout') merge_hook = os.path.join(hooks_dir, 'post-merge') shutil.copy(hook_source, checkout_hook) if not os.path.islink(merge_hook): os.symlink(checkout_hook, merge_hook) # If it doesn't exist, create a skeleton of settings.yaml file # then open settings.yaml for editing filepath = os.path.join(c_dir, 'settings.yaml') if not os.path.exists(filepath): skel = resource_string('lesana', 'data/settings.yaml').decode( 'utf-8' ) yaml = ruamel.yaml.YAML() skel_dict = yaml.load(skel) skel_dict['git'] = git_enabled skel_dict.update(settings) with open(filepath, 'w') as fp: yaml.dump(skel_dict, stream=fp) if edit_file: edit_file(filepath) if git_enabled and repo: repo.index.add(['settings.yaml']) coll = cls(c_dir) os.makedirs(os.path.join(coll.basedir, coll.itemdir), exist_ok=True) return coll
[docs]class TemplatingError(Exception): """ Raised when there are errors rendering a jinja template """