# Source code for lab_utils.database

""" Basic interface to a `PostgreSQL <https://www.postgresql.org/>`_ database.

The module consists of a main class :class:`Database <Database>` which
implements methods for connection and disconnection, table verification
and data insertion.

The database settings are read from a
:doc:`config file<../examples/conf/conf_database>`
using the standard library :obj:`configparser`.
"""

import configparser
import logging
from pkg_resources import resource_filename
from os.path import abspath, expanduser
from enum import Enum

import psycopg2
from psycopg2 import sql


class DataType(Enum):
    """ Data types that may be used when creating new columns.

    Only the SQL type names listed here are allowed: they end up verbatim
    inside SQL statements, so keeping them in a fixed, hard-coded set avoids
    injecting arbitrary text into a query. See
    `here <https://www.postgresqltutorial.com/postgresql-data-types/>`_ for
    more information.
    """

    bool = 'BOOLEAN'  #: Boolean
    short = 'SMALLINT'  #: Integer, 2 bytes (-32,768 to +32,767)
    int = 'INTEGER'  #: Integer, 4 bytes (-2,147,483,648 to +2,147,483,647)
    long = 'BIGINT'  #: Integer, 8 bytes (-9,223,372,036,854,775,808 to +9,223,372,036,854,775,807)
    float = 'FLOAT(24)'  #: Floating-point number, 4 bytes
    double = 'FLOAT(53)'  #: Floating-point number, 8 bytes
    string = 'TEXT'  #: String of unlimited length
    time = 'TIMESTAMPTZ'  #: Time stamp that includes time zone information

    def __str__(self) -> str:
        """ Renders the data type as its SQL spelling.

        Returns
        -------
        str
            The SQL type name, e.g. ``'INTEGER'``
        """
        sql_spelling = self.value
        return '{}'.format(sql_spelling)
class Constraint(Enum):
    """ Constraints that may be attached to new columns.

    The constraints are hard-coded on purpose: they are pasted into SQL
    statements, and a fixed list keeps arbitrary text out of the queries.
    Each value is a template whose ``{column_name}`` field is filled in with
    :meth:`str.format`.
    """

    positive = ''' CHECK({column_name} >= 0) '''  #: The value must be greater than or equal to 0
    positive_strict = ''' CHECK({column_name} > 0) '''  #: The value must be strictly greater than 0

    def __str__(self) -> str:
        """ Renders the constraint as its SQL template.

        Returns
        -------
        str
            The SQL ``CHECK`` clause, still containing the ``{column_name}`` field
        """
        return '%s' % (self.value,)
class Database:
    """Manages connections and operations with a PostgreSQL database.

    The class is based on the :obj:`psycopg2` library and on this
    `tutorial <https://www.postgresqltutorial.com/postgresql-python/>`_.

    Notes
    -----
    Table and column names are inserted into the SQL statements as plain,
    unquoted text (only the column names given to :meth:`new_entry` are
    quoted as identifiers). They must come from trusted code, never from
    external input. PostgreSQL folds unquoted names to lower case, whereas
    :meth:`check_table` and :meth:`check_column` compare names exactly, so
    lower-case names are recommended.
    """

    host: str = 'localhost'  #: The host name where the database is located
    port: int = 5432  #: Connection port
    database: str = 'beam'  #: The database name to connect to
    user: str = 'cw-beam'  #: User name
    passfile: str = '~/.pgpass'  #: Location of the pgpass file with the credentials
    logger: logging.Logger = None  #: Single logger for the whole class.
    connection = None  #: Connection object returned by psycopg2.connect()
    db_version: str = ''  #: Database version string, as returned by ``SELECT version()``
    cursor = None  #: Cursor provided by connection.cursor() to execute SQL queries

    # List of predefined SQL queries.
    # Templates with {fields} are completed with str.format before being sent;
    # templates with %(name)s placeholders are passed to cursor.execute() with
    # a parameter dictionary, so psycopg2 quotes those values.
    _query_createTimeTable = '''
        CREATE TABLE {table_name} (
            time TIMESTAMPTZ NOT NULL {default}
        )
        '''

    _query_makeTimescaleHypertable = '''
        SELECT create_hypertable (
            %(table_name)s,
            %(key)s
        );
        '''

    _query_checkColumn = '''
        SELECT EXISTS (SELECT 1 FROM information_schema.columns
            WHERE table_schema = %(table_schema)s
            AND table_name = %(table_name)s
            AND column_name = %(column_name)s
        );
        '''

    # information_schema.tables (not .columns) so that tables without any
    # column are also found
    _query_checkTable = '''
        SELECT EXISTS (SELECT 1 FROM information_schema.tables
            WHERE table_schema = %(table_schema)s
            AND table_name = %(table_name)s
        );
        '''

    _query_addColumn = '''
        ALTER TABLE {table_name}
        ADD COLUMN {column_name} {data_type} {checks}
        ;
        '''

    _query_insertData = '''
        INSERT INTO {table_name} ({columns})
        VALUES ({data})
        ;
        '''

    def __init__(self, config_file: str = None):
        """ Initializes the :class:`Database` object.

        If a :paramref:`configuration file name<Database.__init__.config_file>`
        is given, the constructor calls the method :meth:`~.Database.config`
        and overrides the default attributes.

        Parameters
        ----------
        config_file : str, optional
            Configuration file name, default is `None`. See
            :doc:`here<../../../examples/conf/conf_database>` for a
            configuration file example

        Raises
        ------
        :class:`configparser.Error`
            If a configuration file name was given, the method :meth:`config`
            can fail raising this exception.
        """
        if config_file is not None:
            # config() also names the logger after the loaded settings
            self.config(config_file)
        else:
            # Use a single logger for all database related messages
            self._set_logger()

    def __del__(self):
        """ Closes the connection to the database, if it was ever open.

        Exceptions raised here are not propagated by Python; the interpreter
        only prints an "Exception ignored" warning.

        Raises
        ------
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        self.close()

    def _set_logger(self):
        """ Points :attr:`logger` to a logger named after the database, host and port. """
        self.logger = logging.getLogger('DB \'{d}\' on {h}:{p}'.format(
            d=self.database,
            h=self.host,
            p=self.port
        ))

    def config(self, config_file: str = resource_filename(__name__, 'conf/database.ini')):
        """ Loads the configuration.

        Reads the :paramref:`config_file` using the :obj:`configparser`
        library. The structure of the file is shown in the
        :ref:`examples section<configuration-files>`.

        Parameters
        ----------
        config_file : str, optional
            :doc:`Configuration file name<../../../examples/conf/conf_database>`,
            default is 'conf/database.ini'

        Raises
        ------
        :class:`configparser.Error`
            Error while parsing the file, e.g. no file was found, a parameter
            is missing or it has an invalid value.
        """
        # Expand configuration file path
        config_file = abspath(expanduser(config_file))

        # Temporary logger name, until the settings are known
        self.logger = logging.getLogger('database')

        try:
            self.logger.info("Loading configuration file %s", config_file)

            # Initialize config parser and read file. ConfigParser.read()
            # skips missing files silently; the get() calls below then raise
            # configparser.NoSectionError.
            config_parser = configparser.ConfigParser()
            config_parser.read(config_file)

            # Assign values to instance attributes
            self.host = config_parser.get(section='Overall', option='host')
            self.port = config_parser.getint(section='Overall', option='port')
            self.database = config_parser.get(section='Overall', option='database')
            self.user = config_parser.get(section='Overall', option='user')
            self.passfile = config_parser.get(section='Overall', option='passfile')

            # Expand passfile path
            self.passfile = abspath(expanduser(self.passfile))

            # Rename logger according to the loaded settings
            self._set_logger()
            self.logger.info('Configuration loaded')

        except configparser.Error as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            raise

        except BaseException as e:
            # Undefined exception, full traceback to be printed
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            raise

    def connect(self, print_version: bool = False):
        """ Connects to the database.

        If the connection was successful and the flag
        :paramref:`~Database.connect.print_version` was set, it also prints
        the database version as a connection check. The version string is
        stored in :attr:`db_version` in any case.

        Parameters
        ----------
        print_version : bool, optional
            Print the database version, default is False.

        Raises
        ------
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        self.logger.info(
            'Connecting to database <%s> on host <%s> over port <%d> as user <%s>',
            self.database,
            self.host,
            self.port,
            self.user
        )
        try:
            # Connect to the database
            self.connection = psycopg2.connect(
                dbname=self.database,
                user=self.user,
                host=self.host,
                port=self.port,
                passfile=self.passfile
            )

            # Read database version, serves as connection check
            self.cursor = self.connection.cursor()
            self.cursor.execute('SELECT version()')
            # fetchone() returns a row tuple; keep only the version string
            self.db_version = self.cursor.fetchone()[0]

        except psycopg2.Error as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            raise

        except BaseException as e:
            # Undefined exception, full traceback to be printed
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            raise

        else:
            if print_version:
                self.logger.info('Connection successful, database version: %s', self.db_version)
            else:
                self.logger.info('Connection successful')
                self.logger.debug('Database version: %s', self.db_version)

    def close(self):
        """ Closes the connection to the database.

        Does nothing if no connection is open. The references to the
        connection and its cursor are dropped even if closing fails.

        Raises
        ------
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        if self.connection is None:
            return

        self.logger.info('Closing connection to database <%s>', self.database)
        try:
            self.connection.close()

        except psycopg2.Error as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            raise

        except BaseException as e:
            # Undefined exception, full traceback to be printed
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            raise

        else:
            self.logger.info('Connection to database <%s> closed', self.database)

        finally:
            # A closed (or broken) connection and its cursor cannot be reused
            self.connection = None
            self.cursor = None

    def _rollback(self):
        """ Rolls back the current transaction after a failed operation.

        Without this, psycopg2 keeps the connection in an aborted transaction
        and rejects every following command. Best effort: a failing rollback
        is logged and does not mask the original exception.
        """
        if self.connection is None:
            return
        try:
            self.connection.rollback()
        except psycopg2.Error as e:
            self.logger.error('Rollback failed: %s: %s', type(e).__name__, e)

    def create_timescale_db(self, table_name: str, default_now: bool = True):
        """ Creates a `TimescaleDB <https://docs.timescale.com/latest/main>`_ table.

        The table has a single column named 'time' with type 'TIMESTAMPTZ'.
        If the flag :paramref:`~Database.create_timescale_db.default_now` is
        set (default is 'True'), the column 'time' will default to 'NOW()'.
        The table is then converted into a hypertable partitioned on 'time'.

        Parameters
        ----------
        table_name : str
            The name of the table to be created
        default_now : bool, optional
            Set the 'time' column default to 'NOW()', default is `True`.

        Raises
        ------
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        # Check database status
        if self.connection is None:
            raise psycopg2.DatabaseError('Trying to access database without initialization')

        try:
            # Prepare query for table creation
            if default_now:
                self.logger.info('Creating TimescaleDB table <%s> with default <time> NOW()', table_name)
                default = 'DEFAULT NOW()'
            else:
                self.logger.info('Creating TimescaleDB table <%s> without default <time>', table_name)
                default = ''
            query = sql.SQL(self._query_createTimeTable.format(
                table_name=table_name,
                default=default,
            ))

            # Create the table
            self.logger.debug('Sending query: %s', query.as_string(self.connection))
            self.cursor.execute(query)

            # Make the table a TimescaleDB Hypertable, in the same transaction
            self.cursor.execute(
                self._query_makeTimescaleHypertable,
                {
                    'table_name': table_name,
                    'key': 'time'
                }
            )
            self.connection.commit()
            self.logger.info('TimescaleDB table <%s> created', table_name)

        except psycopg2.Error as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

        except BaseException as e:
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

    def check_table(self, table_name: str) -> bool:
        """ Checks if a table exists in the 'public' schema.

        Parameters
        ----------
        table_name : str
            The name of the table to be checked (compared exactly)

        Returns
        -------
        bool:
            `True` if the table exists, `False` otherwise.

        Raises
        ------
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        # Check database status
        if self.connection is None:
            raise psycopg2.DatabaseError('Trying to access database without initialization')

        try:
            self.logger.debug('Checking if table <%s> exists', table_name)
            self.cursor.execute(
                self._query_checkTable,
                {
                    'table_schema': 'public',
                    'table_name': table_name,
                }
            )
            reply = self.cursor.fetchone()
            if reply[0]:
                self.logger.debug('Table <%s> does exist', table_name)
                return True
            self.logger.debug('Table <%s> does not exist', table_name)
            return False

        except psycopg2.Error as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

        except BaseException as e:
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            raise

    def check_column(self, table_name: str, column_name: str) -> bool:
        """ Checks if a column exists in a given table of the 'public' schema.

        Parameters
        ----------
        table_name : str
            The table where the column has to be checked (compared exactly)
        column_name : str
            The column to be checked (compared exactly)

        Returns
        -------
        bool:
            `True` if the column exists, `False` otherwise.

        Raises
        ------
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        # Check database status
        if self.connection is None:
            raise psycopg2.DatabaseError('Trying to access a database without initialization')

        try:
            self.logger.debug('Checking if column <%s> exists in table <%s>', column_name, table_name)
            self.cursor.execute(
                self._query_checkColumn,
                {
                    'table_schema': 'public',
                    'table_name': table_name,
                    'column_name': column_name,
                }
            )
            reply = self.cursor.fetchone()
            if reply[0]:
                self.logger.debug('Column <%s> does exist in table <%s>', column_name, table_name)
                return True
            self.logger.debug('Column <%s> does not exist in table <%s>', column_name, table_name)
            return False

        except psycopg2.Error as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

        except BaseException as e:
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            raise

    def new_column(
            self,
            table_name: str,
            column_name: str,
            data_type: DataType,
            constraints: list = None,
    ):
        """ Creates a new column in a given table.

        If the column already exists, it just returns. If the table does not
        exist or the column could not be created, it raises a
        :class:`psycopg2.Error`.

        Parameters
        ----------
        table_name : str
            Name of the table where the column has to be created
        column_name : str
            Name of the column to be created
        data_type : :class:`DataType`
            Data type of the new column. The exact SQL value of a
            :class:`DataType` member (e.g. ``'INTEGER'``) is also accepted.
        constraints : list, optional
            List of :class:`Constraints<Constraint>`, default is 'None'

        Raises
        ------
        :class:`TypeError`
            Invalid constraint or data type
        :class:`ValueError`
            Invalid constraint or data type
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        # Check database status
        if self.connection is None:
            raise psycopg2.DatabaseError('Trying to access database without initialization')

        # Check if the table exists
        if not self.check_table(table_name):
            raise psycopg2.IntegrityError(
                'Trying to create a column in table <{}>, which does not exist'.format(table_name))

        # Check if the column already exists
        if self.check_column(table_name, column_name):
            self.logger.info('Column <%s> already exists in table <%s>', column_name, table_name)
            return

        # Now let's send the SQL query
        self.logger.info('Creating column <%s> in table <%s>', column_name, table_name)
        try:
            # Enforce the whitelists: the Enum constructors accept a member or
            # its exact value and raise ValueError for anything else, so no
            # free text reaches the query through these arguments
            data_type = DataType(data_type)
            checks = ' '.join(
                str(Constraint(k)).format(
                    table_name=table_name,
                    column_name=column_name,
                )
                for k in (constraints or [])
            )

            query = sql.SQL(self._query_addColumn.format(
                table_name=table_name,
                column_name=column_name,
                data_type=str(data_type),
                checks=checks,
            ))
            self.logger.debug('Sending query: %s', query.as_string(self.connection))
            self.cursor.execute(query)
            self.connection.commit()

            # Check if the column was created
            if self.check_column(table_name, column_name):
                self.logger.info('Column <%s> successfully created in table <%s>', column_name, table_name)
            else:
                raise psycopg2.OperationalError(
                    'Column <{}> could not be created in table <{}>'.format(column_name, table_name)
                )

        except (psycopg2.Error, TypeError, ValueError, KeyError) as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

        except BaseException as e:
            # Undefined exception, full traceback to be printed
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

    def new_entry(
            self,
            table_name: str,
            columns: list,
            data: list,
            check_columns: bool = False,
    ):
        """ Inserts data into a given table.

        See :doc:`this example<../../../examples/ex_database>` for usage
        examples. Column names are quoted as SQL identifiers and the values
        are sent as query parameters.

        Parameters
        ----------
        table_name : str
            Name of the table where the data has to be inserted
        columns : list[str]
            List of columns names corresponding to the data
        data : list
            Values of the new data entry
        check_columns : bool, optional
            Check that columns exist before insertion, default is False

        Raises
        ------
        :class:`TypeError`
            Invalid data
        :class:`ValueError`
            Invalid data
        :class:`psycopg2.Error`
            Base exception for all kind of database errors
        """
        if self.connection is None:
            raise psycopg2.DatabaseError('Trying to access database without initialization')

        # Check the list sizes are the same
        if len(data) != len(columns):
            raise psycopg2.DataError('Data and column lists have different sizes')

        self.logger.debug('Inserting data into table <%s>: ', table_name)
        for c, d in zip(columns, data):
            # Lazy %-formatting: nothing is rendered unless DEBUG is enabled
            self.logger.debug('\t%-25s%s', c, d)

        # Check if the table and the columns exist
        if check_columns:
            if not self.check_table(table_name):
                raise psycopg2.IntegrityError(
                    'Table {} does not exist'.format(table_name))
            for column in columns:
                if not self.check_column(table_name, column):
                    raise psycopg2.IntegrityError(
                        'Column <{}> does not exist in table <{}>'.format(column, table_name))

        # Now let's send the SQL query
        try:
            column_list = sql.SQL(', ').join(
                sql.Identifier(n) for n in columns
            )
            values_list = sql.SQL(', ').join(
                sql.Placeholder() for __ in columns
            )
            query = sql.SQL(self._query_insertData.format(
                table_name=table_name,
                columns=column_list.as_string(self.connection),
                data=values_list.as_string(self.connection),
            ))
            self.logger.debug('Executing query: %s', query.as_string(self.connection))
            self.logger.debug('with data: %s', data)
            self.cursor.execute(query, data)
            self.connection.commit()

        except (psycopg2.Error, TypeError, ValueError) as e:
            self.logger.error("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

        except BaseException as e:
            self.logger.exception("{}: {}".format(type(e).__name__, e))
            self._rollback()
            raise

        else:
            self.logger.debug('Data successfully committed')