# Source code for tiny_thingy

from thingy import DatabaseThingy
from tinydb import TinyDB, Query
from tinydb.database import Document

# Shared module-level TinyDB ``Query`` instance.
# NOTE(review): presumably exposed so callers can build query expressions
# (e.g. ``q.name == "x"``) without creating their own Query — confirm intent.
q = Query()


class Thingy(DatabaseThingy):
    """Represents a JSON object in a table"""

    def __init__(self, *args, **kwargs):
        """Build the object, remembering the id of a TinyDB document if given.

        Any positional argument that is a TinyDB ``Document`` has its
        ``doc_id`` copied onto the instance (if several are passed, the last
        one wins). All arguments are then forwarded unchanged to the parent
        initializer.
        """
        for arg in args:
            if isinstance(arg, Document):
                self.doc_id = arg.doc_id
        super().__init__(*args, **kwargs)

    @classmethod
    def _get_table(cls, database, table_name):
        """Return the table named ``table_name`` from ``database``."""
        return database.table(table_name)

    @classmethod
    def _get_table_name(cls, table):
        """Return the name of ``table``."""
        return table.name

    @classmethod
    def use_database(cls, filename):
        """Initialize a database for the class using the specified filename"""
        cls._database = TinyDB(filename)

    @classmethod
    def create(cls, obj):
        """Directly insert an object in the table

        Returns whatever the table's ``insert`` returns for ``obj``.
        """
        return cls.table.insert(obj)

    @classmethod
    def find(cls, query=None, doc_ids=None):
        """Return a list of objects matching the query

        :param query: optional condition passed to the table's ``search``;
            when omitted, every document of the table is considered.
        :param doc_ids: optional iterable of document ids; when given, only
            documents whose ``doc_id`` is among them are returned (and
            ``query``, if any, must match as well).
        :return: a list of instances of ``cls``, one per matching document.
        """
        if doc_ids is not None:
            return cls._find_by_id(query, doc_ids)
        if query is None:
            documents = cls.table.all()
        else:
            documents = cls.table.search(query)
        return [cls(doc) for doc in documents]

    @classmethod
    def _find_by_id(cls, query, doc_ids):
        """Return the objects whose ``doc_id`` is in ``doc_ids``.

        When ``query`` is not ``None`` it must match too. Both conditions are
        evaluated against the stored documents (exactly like ``find`` does
        when no ``doc_ids`` are given), and the matches are wrapped in
        ``cls`` only once, afterwards.
        """
        # Snapshot the ids: constant-time membership tests, and safe even if
        # the caller handed us a one-shot iterable.
        wanted = frozenset(doc_ids)

        def _matches(doc):
            if doc.doc_id not in wanted:
                return False
            return query is None or bool(query(doc))

        return [cls(doc) for doc in cls.table.search(_matches)]

    @classmethod
    def find_one(cls, query=None, doc_id=None):
        """Return a single object matching the query

        :param query: optional condition, as for ``find``.
        :param doc_id: optional document id restricting the lookup.
        :return: the first matching object, or ``None`` if nothing matches.
        """
        if doc_id is not None:
            result = cls.find(query, doc_ids=[doc_id])
        else:
            result = cls.find(query)
        return next(iter(result), None)

    @classmethod
    def count(cls):
        """Count the number of objects in the table"""
        return len(cls.table)

    @classmethod
    def remove(cls, query=None, doc_ids=None):
        """Remove the objects matching the query

        ``query`` and ``doc_ids`` are handed to the table's ``remove`` as-is;
        its return value is returned unchanged.
        """
        return cls.table.remove(query, doc_ids=doc_ids)

    @classmethod
    def inplace_update(cls, fields, query=None, doc_ids=None):
        """Directly update objects matching the query with given fields

        ``fields``, ``query`` and ``doc_ids`` are handed to the table's
        ``update`` as-is; its return value is returned unchanged.
        """
        return cls.table.update(fields, query, doc_ids=doc_ids)

    def save(self):
        """Save the object in the table

        The instance attributes (minus ``doc_id``) are written to the table:
        as an update of the existing document when the instance already has
        a ``doc_id``, as a new insertion otherwise, in which case the id
        returned by the insertion is stored on the instance.

        :return: the instance itself.
        """
        data = self.__dict__.copy()
        # ``doc_id`` identifies the document; it is not part of its content.
        doc_id = data.pop("doc_id", None)
        if doc_id is not None:
            self.get_table().update(data, doc_ids=[doc_id])
        else:
            self.doc_id = self.get_table().insert(data)
        return self

    def delete(self):
        """Delete the object from the table"""
        self.remove(doc_ids=[self.doc_id])
# Module-level alias for the classmethod, bound to the base ``Thingy`` class:
# calling ``use_database(filename)`` sets ``Thingy._database``.
use_database = Thingy.use_database