Build a library

In this section we will build a NOSQL API compliant library for CouchDB, using the nosqlapi library. We will start with the core objects that allow us to connect to the database and operate on it.

Note

The following example targets a NOSQL database of the Document type. By changing the application logic of the methods in each class, you can build a library for any other type of NOSQL database in the same way.

Warning

The purpose of this document is to explain how to use API class interfaces in the real world. Do not use the following example as a production library because it is very simplified and does not reflect all possible operations on a CouchDB server.

Here are the steps:

  • Create a package (a directory with an __init__.py file) named pycouch.

  • Write core.py, with the core classes and functions.

  • Write utils.py, with utility classes and functions, such as ODM objects.
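The three steps above can also be scripted. A minimal sketch using only the standard library; make_package is a hypothetical helper, and the files it creates are empty placeholders for the code in the following sections.

```python
from pathlib import Path


def make_package(root, name="pycouch"):
    """Create <root>/<name>/ containing empty __init__.py, core.py and utils.py."""
    package = Path(root) / name
    package.mkdir(parents=True, exist_ok=True)
    for module in ("__init__.py", "core.py", "utils.py"):
        # touch() creates the file if missing and leaves existing files alone
        (package / module).touch()
    return package
```

Calling make_package(".") in the project directory creates the pycouch package next to your other sources.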

Core

The core classes must provide a connection and operation interface towards the database. All CRUD operations must be supported, and further operations can be added depending on the type of database we support.

Create a core.py module.

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# core.py

"""Python library for document CouchDB server"""

import json
import urllib.error
import urllib.parse
import urllib.request

import nosqlapi

Connection class

Let’s build the server connection class. Since we are connecting to a CouchDB database, we will use its HTTP API.

We define the __init__ constructor with all the information needed to operate on the database; the connect() method will then create a Session object.

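class Connection(nosqlapi.DocConnection):
    """CouchDB connection class; connect to CouchDB server."""
    def __init__(self, host='localhost', port=5984, username=None, password=None, ssl=None, tls=None, cert=None,
                 database=None, ca_cert=None, ca_bundle=None):
        # super() already binds self, so pass only the keyword arguments
        super().__init__(host=host, port=port, username=username, password=password, ssl=ssl, tls=tls,
                         cert=cert, ca_cert=ca_cert, ca_bundle=ca_bundle)
        self.database = database
        self.method = 'https://' if self.port in (6984, 443) else 'http://'
        self.url = f'{self.method}{self.host}:{self.port}'
        if self.username and self.password:
            # urllib ignores user:password@ inside a URL, so send Basic auth with every
            # request; the urlopen() calls in this module use the installed opener
            passwords = urllib.request.HTTPPasswordMgrWithPriorAuth()
            passwords.add_password(None, self.url, self.username, self.password, is_authenticated=True)
            urllib.request.install_opener(urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(passwords)))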
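Why not simply put the credentials in the URL? urllib keeps a user:password@ prefix as part of the host name, so http.client would try to resolve a host literally called admin:secret@localhost. The sketch below shows this, then checks that an HTTPBasicAuthHandler backed by HTTPPasswordMgrWithPriorAuth adds the Authorization header before any request is sent. Nothing goes over the network, and the credentials are placeholders.

```python
import base64
import http.client
import urllib.request

# 1. Credentials embedded in the URL end up in the host name
req = urllib.request.Request('http://admin:secret@localhost:5984/shop')
conn = http.client.HTTPConnection(req.host)  # constructing it does not connect
print(conn.host, conn.port)  # admin:secret@localhost 5984

# 2. Pre-emptive Basic auth: the handler signs the request up front
passwords = urllib.request.HTTPPasswordMgrWithPriorAuth()
passwords.add_password(None, 'http://localhost:5984', 'admin', 'secret', is_authenticated=True)
handler = urllib.request.HTTPBasicAuthHandler(passwords)
signed = handler.http_request(urllib.request.Request('http://localhost:5984/shop'))
print(signed.get_header('Authorization'))  # Basic YWRtaW46c2VjcmV0
```

Because is_authenticated=True, the header is sent on the first request instead of after a 401 challenge.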

Now let’s define the close and connect methods, which close and open the database connection.

def close(self, clean=False):
    self._connected = False
    if clean:
        self.database = None
        self.host = None
        self.url = None

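def connect(self):
    session_url = self.url + f'/{self.database}'
    # urllib has no head() helper, so build a HEAD request explicitly
    req = urllib.request.Request(session_url, method='HEAD')
    try:
        status = urllib.request.urlopen(req).status
    except urllib.error.URLError:  # also catches HTTPError (4xx/5xx)
        status = None
    if status == 200:
        session = Session(connection=self, database=session_url)
        self._connected = True
        return session
    else:
        raise nosqlapi.ConnectError(f'I cannot connect to the server: {self.url}')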
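By default urllib.request.urlopen() raises urllib.error.HTTPError for 4xx and 5xx answers instead of returning them, so the else branches that build an error Response are only reached if the opener is told not to raise. One option is an HTTPErrorProcessor subclass that hands every response back unchanged. The sketch below (class names hypothetical) tries this against a throwaway local server that always answers 404. With such a processor, redirects and 401 challenges are no longer handled automatically.

```python
import http.server
import threading
import urllib.request


class KeepErrorResponses(urllib.request.HTTPErrorProcessor):
    """Return every HTTP response, including 4xx/5xx, instead of raising HTTPError."""

    def http_response(self, request, response):
        return response

    https_response = http_response


class NotFoundHandler(http.server.BaseHTTPRequestHandler):
    """Stand-in server: answers every HEAD request with 404."""

    def do_HEAD(self):
        self.send_response(404)
        self.end_headers()

    def log_message(self, *args):
        pass  # keep the output quiet


def head_status(url):
    # ProxyHandler({}) ignores *_proxy variables so the local server is reached directly
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}), KeepErrorResponses())
    with opener.open(urllib.request.Request(url, method='HEAD')) as response:
        return response.status


server = http.server.HTTPServer(('127.0.0.1', 0), NotFoundHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
status = head_status(f'http://127.0.0.1:{server.server_port}/missing_db')
server.shutdown()
server.server_close()
print(status)  # 404
```

build_opener() drops its default HTTPErrorProcessor when it receives an instance of a subclass, so passing KeepErrorResponses() is enough to replace it.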

Now let’s define the methods that operate at the database level.

def create_database(self, name, shards=8, replicas=3, partitioned=False):
    # CouchDB reads these options from the query string: q = shards, n = replicas
    url = self.url + f'/{name}?q={shards}&n={replicas}'
    if partitioned:
        url += '&partitioned=true'
    req = urllib.request.Request(url, method='PUT')
    response = urllib.request.urlopen(req)
    if response.status in (201, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.DatabaseCreationError(f'Database creation failed: {name}'),
                        header=response.getheaders())
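CouchDB reads the options of PUT /{db} from the query string: q is the number of shards, n the number of replicas, and partitioned=true creates a partitioned database. A sketch of building that request with the standard library; create_database_request is a hypothetical helper and nothing is sent over the network.

```python
import urllib.parse
import urllib.request


def create_database_request(base_url, name, shards=8, replicas=3, partitioned=False):
    """Build (but do not send) the PUT request that creates database `name`."""
    params = {'q': shards, 'n': replicas}
    if partitioned:
        params['partitioned'] = 'true'  # lowercase JSON-style boolean
    # safe='' percent-encodes '/' too, which database names may contain
    url = f"{base_url}/{urllib.parse.quote(name, safe='')}?{urllib.parse.urlencode(params)}"
    return urllib.request.Request(url, method='PUT')


req = create_database_request('http://localhost:5984', 'inventory', partitioned=True)
print(req.get_method(), req.full_url)
# PUT http://localhost:5984/inventory?q=8&n=3&partitioned=true
```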

def has_database(self, name):
    response = urllib.request.urlopen(self.url + f'/{name}')
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.DatabaseError(f'Database not found: {name}'),
                        header=response.getheaders())

def delete_database(self, name):
    req = urllib.request.Request(self.url + f'/{name}', method='DELETE')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status in (200, 202):
        return Response(data=bool(json.loads(response.read())),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.DatabaseDeletionError(f'Database deletion failed: {name}'),
                        header=response.getheaders())

def databases(self):
    response = urllib.request.urlopen(self.url + '/_all_dbs')
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.DatabaseError('Database list failed'),
                        header=response.getheaders())

def show_database(self, name):
    response = urllib.request.urlopen(self.url + f'/{name}')
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.DatabaseError(f'Database not found: {name}'),
                        header=response.getheaders())

def copy_database(self, source, target, host, user=None, password=None, create_target=True, continuous=True):
    data = {
        "_id": f"{source}to{target}",
        "source": source,
        "target": {
            "url": target,
            "auth": {
                "basic": {
                    "username": f"{user}",
                    "password": f"{password}"
                }
            }
        },
        "create_target": create_target,
        "continuous": continuous
    }
    req = urllib.request.Request(self.url + '/_replicator', data=json.dumps(data).encode('utf8'), method='POST')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    # POST /_replicator stores a replication document: 201 Created (or 202 Accepted)
    if response.status in (201, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.DatabaseError(f'Database copy error: {source} to {target}'),
                        header=response.getheaders())
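copy_database() works by writing a document into the _replicator database; CouchDB then runs the replication in the background. A sketch of that document, assuming both endpoints are given as full URLs (CouchDB 3.x expects URLs rather than bare database names); replication_document is a hypothetical helper that only adds credentials when both are given.

```python
import json


def replication_document(source_url, target_url, user=None, password=None,
                         create_target=True, continuous=True):
    """Return the JSON body to POST to /_replicator."""
    target = {"url": target_url}
    if user and password:
        # Basic credentials for the target endpoint only
        target["auth"] = {"basic": {"username": user, "password": password}}
    return {
        "source": source_url,
        "target": target,
        "create_target": create_target,
        "continuous": continuous,
    }


doc = replication_document('http://localhost:5984/orders',
                           'http://backup:5984/orders', 'admin', 'secret')
print(json.dumps(doc, indent=2))
```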

Response class

There isn’t much to do with the Response class. We inherit directly from the nosqlapi.docdb.DocResponse class.

class Response(nosqlapi.DocResponse):
    """CouchDB response class; information about a certain transaction."""
    ...

Session class

Now let’s build the Session class. This class is used for CRUD operations on a specific database. The acl property retrieves the access control lists of the current database, and therefore the read/write permissions of the current session. The indexes property retrieves all the indexes created on the current database.

class Session(nosqlapi.DocSession):
    """CouchDB session class; CRUD operation on the database."""
    @property
    def acl(self):
        response = urllib.request.urlopen(self.database + '/_security')
        if response.status == 200:
            return Response(data=json.loads(response.read()),
                            code=response.status,
                            error=None,
                            header=response.getheaders())
        else:
            return Response(data=None,
                            code=response.status,
                            error=nosqlapi.SessionACLError('ACLs error'),
                            header=response.getheaders())

    @property
    def indexes(self):
        response = urllib.request.urlopen(self.database + '/_index')
        if response.status == 200:
            return Response(data=json.loads(response.read()),
                            code=response.status,
                            error=None,
                            header=response.getheaders())
        else:
            return Response(data=None,
                            code=response.status,
                            error=nosqlapi.SessionError('Index error'),
                            header=response.getheaders())
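Every method in this class repeats the same pattern: compare the status, decode the JSON body on success, otherwise attach an exception. A sketch of factoring that into one helper; to_response is hypothetical, and the Response below is a namedtuple stand-in (an assumption, not nosqlapi’s class) so the sketch runs without a server or the nosqlapi package.

```python
import json
from collections import namedtuple

# Stand-in for the library's Response class, with the same four fields
Response = namedtuple('Response', 'data code error header')


def to_response(response, ok_codes, error):
    """Turn an http.client-style response into a Response."""
    if response.status in ok_codes:
        return Response(json.loads(response.read()), response.status, None, response.getheaders())
    return Response(None, response.status, error, response.getheaders())


class FakeResponse:
    """Minimal object with the attributes urllib responses provide."""

    def __init__(self, status, body):
        self.status, self._body = status, body

    def read(self):
        return self._body

    def getheaders(self):
        return [('Content-Type', 'application/json')]


ok = to_response(FakeResponse(200, b'{"admins": {}}'), (200,), RuntimeError('ACLs error'))
bad = to_response(FakeResponse(401, b'{"error": "unauthorized"}'), (200,), RuntimeError('ACLs error'))
print(ok.data, bad.code)  # {'admins': {}} 401
```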

Now let’s define the item_count and description properties. item_count holds the number of items affected by the CRUD operations, while description contains the database information.

@property
def item_count(self):
    return self._item_count

@property
def description(self):
    # GET /{db} returns the database information
    response = urllib.request.urlopen(self.database)
    if response.status == 200:
        self._description = json.loads(response.read())
        return self._description
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionError('Get description failed'),
                        header=response.getheaders())

Now let’s define the CRUD (Create, Read, Update, Delete) methods. Create maps to the insert and insert_many methods, Read to get, Update to update and update_many, and Delete to delete. Each CRUD method operates directly on the database to which the Connection object is connected.

def get(self, document='_all_docs', rev=None, attachment=None, partition=None, local=False, key=None):
    url = self.database
    if partition and document == '_all_docs':
        url += f'/_partition/{partition}/_all_docs'
    elif partition:
        # documents in a partitioned database have ids like "partition:docid"
        url += f'/{partition}:{document}'
    else:
        url += f'/{document}'
    if attachment:
        url += f'/{attachment}'
    if rev:
        url += f'?rev={rev}'
    elif key is not None:
        # CouchDB expects key as JSON (a string keeps its quotes), URL-encoded
        url += f'?key={urllib.parse.quote(json.dumps(key))}'
    if local:
        url = self.database + '/_local_docs'
    response = urllib.request.urlopen(url)
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionFindingError(f'Document not found: {url}'),
                        header=response.getheaders())
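The URL built by get() depends on which arguments are set. A standalone sketch of the same rules, which makes them easy to check without a server; document_url is hypothetical. CouchDB expects the key parameter to be JSON, so a string key is sent with its double quotes and then URL-encoded.

```python
import json
import urllib.parse


def document_url(database, document='_all_docs', rev=None, attachment=None,
                 partition=None, key=None):
    """Return the URL that get() would fetch from a CouchDB database URL."""
    if partition and document == '_all_docs':
        url = f'{database}/_partition/{partition}/_all_docs'
    elif partition:
        url = f'{database}/{partition}:{document}'  # partitioned ids are "partition:docid"
    else:
        url = f'{database}/{document}'
    if attachment:
        url += f'/{attachment}'
    if rev:
        url += '?' + urllib.parse.urlencode({'rev': rev})
    elif key is not None:
        url += '?' + urllib.parse.urlencode({'key': json.dumps(key)})
    return url


db = 'http://localhost:5984/shop'
print(document_url(db, key='apple'))  # http://localhost:5984/shop/_all_docs?key=%22apple%22
```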

def insert(self, name, data=None, attachment=None, partition=None):
    doc_id = f"{partition}:{name}" if partition else name
    url = self.database + f'/{doc_id}'
    if attachment:
        url += f"/{attachment}"
    # build a new body so the caller's data argument is not overwritten
    body = {"_id": doc_id}
    if data:
        body.update(data)
    req = urllib.request.Request(url,
                                 data=json.dumps(body).encode('utf8'),
                                 method='PUT')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status in (201, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionInsertingError(f'Insert document {url} with data {body} failed'),
                        header=response.getheaders())
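The JSON body of insert() must keep the caller’s data separate from the generated _id: reusing the same variable name for both would overwrite the argument before it is merged. A sketch of building the body; document_body is a hypothetical helper.

```python
def document_body(name, data=None, partition=None):
    """Return the document to PUT: the _id first, then the caller's fields."""
    doc_id = f'{partition}:{name}' if partition else name
    body = {'_id': doc_id}
    if data:
        body.update(data)  # an '_id' inside data would override the generated one
    return body


print(document_body('apple', {'price': 3}, partition='fruit'))
# {'_id': 'fruit:apple', 'price': 3}
```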

def insert_many(self, *documents):
    url = f"{self.database}/_bulk_docs"
    data = {"docs": []}
    if documents:
        data['docs'].extend(documents)
    req = urllib.request.Request(url,
                                 data=json.dumps(data).encode('utf8'),
                                 method='POST')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status == 201:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionInsertingError('Bulk insert document failed'),
                        header=response.getheaders())
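A 201 answer from _bulk_docs does not mean every document was stored: CouchDB returns one result object per document, and individual entries can carry an error such as conflict. A sketch of checking those results; failed_documents is hypothetical and the response body below is an illustrative sample, not real output.

```python
import json


def failed_documents(results):
    """Return (id, error, reason) for each failed entry of a _bulk_docs response."""
    return [(r.get('id'), r['error'], r.get('reason'))
            for r in results if 'error' in r]


# Sample body shaped like a _bulk_docs response (illustrative values)
body = b'''[
  {"ok": true, "id": "apple", "rev": "1-967a00dff5e02add41819138abb3284d"},
  {"id": "pear", "error": "conflict", "reason": "Document update conflict."}
]'''
print(failed_documents(json.loads(body)))
# [('pear', 'conflict', 'Document update conflict.')]
```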

def update(self, name, rev, data=None, partition=None):
    doc_id = f"{partition}:{name}" if partition else name
    url = self.database + f'/{doc_id}?rev={rev}'
    # build a new body so the caller's data argument is not overwritten
    body = {"_id": doc_id}
    if data:
        body.update(data)
    req = urllib.request.Request(url,
                                 data=json.dumps(body).encode('utf8'),
                                 method='PUT')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status in (201, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionUpdatingError(f'Update document {url} with data {body} failed'),
                        header=response.getheaders())

def update_many(self, *documents):
    url = f"{self.database}/_bulk_docs"
    data = {"docs": []}
    if documents:
        data['docs'].extend(documents)
    req = urllib.request.Request(url,
                                 data=json.dumps(data).encode('utf8'),
                                 method='POST')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status == 201:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionUpdatingError('Bulk update document failed'),
                        header=response.getheaders())

def delete(self, name, rev, partition=None):
    doc_id = f"{partition}:{name}" if partition else name
    url = self.database + f'/{doc_id}?rev={rev}'
    # DELETE needs no request body: the revision travels in the query string
    req = urllib.request.Request(url, method='DELETE')
    response = urllib.request.urlopen(req)
    if response.status in (200, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionDeletingError(f'Delete document {doc_id} failed'),
                        header=response.getheaders())
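Deleting a document needs only its current revision, and DELETE carries no body. A sketch that builds the request without sending it; delete_request is a hypothetical helper. CouchDB answers 200 or 202 on success and 409 Conflict if the revision is no longer the latest one.

```python
import urllib.parse
import urllib.request


def delete_request(database, doc_id, rev):
    """Build the DELETE request for one document revision."""
    path = urllib.parse.quote(doc_id, safe=':')  # keep "partition:docid" readable
    query = urllib.parse.urlencode({'rev': rev})
    return urllib.request.Request(f'{database}/{path}?{query}', method='DELETE')


req = delete_request('http://localhost:5984/shop', 'fruit:apple', '2-7051cbe5c8faecd085a3fa619e6e6337')
print(req.get_method(), req.full_url)
```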

The close method closes only the session, not the connection.

def close(self):
    self.database = None
    self._connection = None

The find method searches for data in the database. It accepts either a JSON query string or a Selector object, which helps build the query in the database’s query language.

def find(self, query):
    url = f"{self.database}/_find"
    if isinstance(query, Selector):
        query = query.build()
    # query is a JSON string such as '{"selector": {"type": "fruit"}}'
    req = urllib.request.Request(url,
                                 data=query.encode('utf8'),
                                 method='POST')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionFindingError(f'Find documents failed: {json.loads(response.read())}'),
                        header=response.getheaders())
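When find() receives a string, it must already be a JSON Mango query: an object with a selector plus optional keys such as fields, sort and limit. A sketch of building one with json.dumps instead of writing the string by hand; the field names in the selector are illustrative.

```python
import json

query = json.dumps({
    "selector": {"type": "fruit", "price": {"$lt": 5}},  # both conditions must match
    "fields": ["_id", "price"],                              # return only these fields
    "sort": [{"price": "asc"}],                              # sorting needs an index on price
    "limit": 10,
})
print(query)
```

The resulting string can be passed straight to session.find(query).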

The grant and revoke methods set and reset the permissions on the current database.

def grant(self, admins, members):
    url = f"{self.database}/_security"
    data = {"admins": admins, "members": members}
    req = urllib.request.Request(url,
                                 data=json.dumps(data).encode('utf8'),
                                 method='PUT')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionACLError(f'Grant failed: {json.loads(response.read())}'),
                        header=response.getheaders())

def revoke(self):
    url = f"{self.database}/_security"
    data = {"admins": {"names": [], "roles": []}, "members": {"names": [], "roles": []}}
    req = urllib.request.Request(url,
                                 data=json.dumps(data).encode('utf8'),
                                 method='PUT')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionACLError(f'Revoke failed: {json.loads(response.read())}'),
                        header=response.getheaders())
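grant() writes the whole security object at once, so any name or role left out is dropped; revoke() is the same call with empty lists. Be aware that in CouchDB an empty members list makes the database public, so every user can read and write its regular documents. A sketch of the object’s shape; security_object is a hypothetical helper.

```python
import json


def security_object(admin_names=(), admin_roles=(), member_names=(), member_roles=()):
    """Return a CouchDB _security document."""
    return {
        "admins": {"names": list(admin_names), "roles": list(admin_roles)},
        "members": {"names": list(member_names), "roles": list(member_roles)},
    }


# Users with role "ops" administer the database; role "staff" may read and write documents
print(json.dumps(security_object(admin_roles=["ops"], member_roles=["staff"])))
```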

Now let’s write the three methods that create, modify (including a password reset) and delete a user: new_user, set_user and delete_user.

def new_user(self, name, password, roles=None, type='user'):
    if roles is None:
        roles = []
    server = self.database.split('/')
    url = f"{server[0]}//{server[2]}/_users/org.couchdb.user:{name}"
    data = {"name": name, "password": password, "roles": roles, "type": type}
    req = urllib.request.Request(url,
                                 data=json.dumps(data).encode('utf8'),
                                 method='PUT')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status in (201, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionACLError(f'New user failed: {json.loads(response.read())}'),
                        header=response.getheaders())

def set_user(self, name, password, rev, roles=None, type='user'):
    if roles is None:
        roles = []
    server = self.database.split('/')
    url = f"{server[0]}//{server[2]}/_users/org.couchdb.user:{name}"
    data = {"name": name, "password": password, "roles": roles, "type": type}
    req = urllib.request.Request(url,
                                 data=json.dumps(data).encode('utf8'),
                                 method='PUT')
    req.add_header('Content-Type', 'application/json')
    # add_header() takes the header name and value as two arguments
    req.add_header('If-Match', rev)
    response = urllib.request.urlopen(req)
    if response.status in (201, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionACLError(f'Modify user or password failed: {json.loads(response.read())}'),
                        header=response.getheaders())

def delete_user(self, name, rev, admin=False):
    server = self.database.split('/')
    req_headers = {'Content-Type': 'application/json'}
    if admin:
        # server admins live in the node configuration, not in _users
        url = f"{server[0]}//{server[2]}/_node/_local/_config/admins/{name}"
    else:
        url = f"{server[0]}//{server[2]}/_users/org.couchdb.user:{name}"
        req_headers['If-Match'] = rev
    req = urllib.request.Request(url, headers=req_headers, method='DELETE')
    response = urllib.request.urlopen(req)
    if response.status in (200, 202):
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionACLError(f'Delete user failed: {json.loads(response.read())}'),
                        header=response.getheaders())
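new_user, set_user and delete_user derive the server address from the session’s database URL with split('/'). urllib.parse.urlsplit does the same job without depending on the position of each slash, and quote() protects user names that contain reserved characters. A sketch with hypothetical helpers server_root and user_url:

```python
import urllib.parse


def server_root(database_url):
    """'http://host:5984/db' -> 'http://host:5984'."""
    parts = urllib.parse.urlsplit(database_url)
    return f'{parts.scheme}://{parts.netloc}'


def user_url(database_url, name):
    """URL of the user's document in the _users database."""
    doc_id = urllib.parse.quote('org.couchdb.user:' + name, safe=':')
    return f'{server_root(database_url)}/_users/{doc_id}'


print(user_url('http://localhost:5984/shop', 'alice'))
# http://localhost:5984/_users/org.couchdb.user:alice
```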

We will now write the add_index and delete_index methods, which create and delete indexes on the database.

def add_index(self, name, fields):
    url = f"{self.database}/_index"
    data = {"name": name, 'type': 'json', "index": {'fields': fields}}
    req = urllib.request.Request(url,
                                 data=json.dumps(data).encode('utf8'),
                                 method='POST')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    # CouchDB answers 200 whether the index was created or already existed
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionError(f'Index creation error: {json.loads(response.read())}'),
                        header=response.getheaders())

def delete_index(self, ddoc, name):
    url = f"{self.database}/_index/{ddoc}/json/{name}"
    req = urllib.request.Request(url, method='DELETE')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    if response.status == 200:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionError(f'Index deletion error: {json.loads(response.read())}'),
                        header=response.getheaders())
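add_index() posts a Mango index definition to /{db}/_index. CouchDB replies with a result of "created" or "exists" and with the id of the design document that stores the index, which is the ddoc value delete_index() needs. A sketch of the request body and of reading the reply; the reply is a sample shaped like CouchDB’s, with an illustrative id.

```python
import json

index_body = {
    "name": "price-index",
    "type": "json",
    "index": {"fields": ["price"]},
}
payload = json.dumps(index_body).encode('utf8')  # request body for POST /{db}/_index

# Sample reply (illustrative ddoc id)
reply = json.loads(b'{"result": "created", '
                   b'"id": "_design/a5f4711fc9448864a13c81dc71e660b524d7410c", '
                   b'"name": "price-index"}')
ddoc = reply["id"]  # pass this and reply["name"] to delete_index()
print(reply["result"], ddoc)
```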

Finally, let’s add the compact method, which compacts the database files. This is useful after many updates or deletions, to reclaim the disk space used by old document revisions.

def compact(self):
    url = f"{self.database}/_compact"
    req = urllib.request.Request(url, method='POST')
    req.add_header('Content-Type', 'application/json')
    response = urllib.request.urlopen(req)
    # compaction runs in the background: CouchDB answers 202 Accepted
    if response.status == 202:
        return Response(data=json.loads(response.read()),
                        code=response.status,
                        error=None,
                        header=response.getheaders())
    else:
        return Response(data=None,
                        code=response.status,
                        error=nosqlapi.SessionError(f'Compaction error: {json.loads(response.read())}'),
                        header=response.getheaders())

Batch class

Since the Session class already performs bulk writes in insert_many and update_many, we can implement bulk reads in a Batch class.

class Batch(nosqlapi.DocBatch):
    """CouchDB batch class; multiple get from session."""

    def execute(self):
        data = {"docs": self.batch}
        url = f"{self.session.database}/_bulk_get"
        # send the list of wanted documents as the request body
        req = urllib.request.Request(url, data=json.dumps(data).encode('utf8'), method='POST')
        req.add_header('Content-Type', 'application/json')
        response = urllib.request.urlopen(req)
        if response.status == 200:
            return Response(data=json.loads(response.read()),
                            code=response.status,
                            error=None,
                            header=response.getheaders())
        else:
            return Response(data=None,
                            code=response.status,
                            error=nosqlapi.SessionError(f'Get multiple document error: {json.loads(response.read())}'),
                            header=response.getheaders())
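POST /{db}/_bulk_get expects a body that lists the wanted documents, each as an object with an id and optionally a rev, so the batch items must have that shape. A sketch of the body; bulk_get_body is hypothetical and assumes the batch holds plain document ids or (id, rev) pairs.

```python
import json


def bulk_get_body(batch):
    """Turn ids or (id, rev) pairs into a _bulk_get request body."""
    docs = []
    for item in batch:
        if isinstance(item, tuple):
            doc_id, rev = item
            docs.append({"id": doc_id, "rev": rev})
        else:
            docs.append({"id": item})
    return json.dumps({"docs": docs}).encode('utf8')


print(bulk_get_body(["apple", ("pear", "1-abc")]))
# b'{"docs": [{"id": "apple"}, {"id": "pear", "rev": "1-abc"}]}'
```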

Selector class

Finally, let’s define the last core class, Selector, which represents the shape of a query for our CouchDB server.

class Selector(nosqlapi.DocSelector):
    """CouchDB selector class; query representation."""
    pass

Utils

The utils classes and functions map objects that represent data on the CouchDB server. Objects of this kind are called ODMs (Object-Document Mappers).

Create a utils.py module.

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# utils.py

"""Python utility library for document CouchDB server"""

from nosqlapi.docdb import Database, Document, Index
from . import core  # core.py lives in the same pycouch package
import json

connect function

Let’s create a simple function that will create a Connection object for us and return a Session object. We will call it connect().

def connect(host='localhost', port=5984, username=None, password=None, ssl=None, tls=None, cert=None,
            database=None, ca_cert=None, ca_bundle=None):
    # forward the caller's arguments instead of hard-coded defaults
    conn = core.Connection(host=host, port=port, username=username, password=password, ssl=ssl, tls=tls,
                           cert=cert, database=database, ca_cert=ca_cert, ca_bundle=ca_bundle)
    return conn.connect()

ODM classes

Now let’s define a DesignDocument class, which will represent a design document in the CouchDB server.

class DesignDocument(Document):
    """Design document"""

    def __init__(self, oid=None, views=None, updates=None, filters=None, validate_doc_update=None):
        super().__init__(oid)
        self._id = self['_id'] = f'_design/{self.id}'
        self["language"] = "javascript"
        self['views'] = {}
        self['updates'] = {}
        self['filters'] = {}
        if views:
            self['views'].update(views)
        if updates:
            self['updates'].update(updates)
        if filters:
            self['filters'].update(filters)
        if validate_doc_update:
            self['validate_doc_update'] = validate_doc_update
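A design document is plain JSON: views map a view name to JavaScript map (and optional reduce) functions stored as strings. A sketch of a design document like the ones DesignDocument builds, written as a plain dict so it runs without nosqlapi; the view and field names are illustrative. Once saved, the view is queried at /{db}/_design/fruit/_view/by_price.

```python
import json

design = {
    "_id": "_design/fruit",
    "language": "javascript",
    "views": {
        "by_price": {
            # CouchDB runs this JavaScript on every document; emit() adds a row to the view
            "map": "function (doc) { if (doc.price) { emit(doc.price, doc._id); } }",
            "reduce": "_count",  # built-in reduce: counts the rows
        }
    },
}
print(json.dumps(design, indent=2))
```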

Now let’s define a PermissionDocument class, which will represent a permission document in the CouchDB server.

class PermissionDocument(Document):
    """Permission document"""

    def __init__(self, admins=None, members=None):
        super().__init__()
        self._id = None
        del self['_id']
        self["admins"] = {"names": [], "roles": []} if not admins else admins
        self['members'] = {"names": [], "roles": []} if not members else members

Note

Now that we have defined some classes that represent documents, we can adapt our methods of the Session class around these ODM types.

For more examples, clone the official nosqlapi repository: the tests folder contains examples for each type of database.

$ git clone https://github.com/MatteoGuadrini/nosqlapi.git
$ cd nosqlapi
$ python -m unittest discover tests
$ ls -l tests