""" Basic interface to a `PostgreSQL <https://www.postgresql.org/>`_ database.
The module consists of a main class :class:`Database <Database>` which
implements methods for connection and disconnection, table verification
and data insertion.
The database settings are set with a
:doc:`config file<../examples/conf/conf_database>`
and the standard library :obj:`configparser`.
"""
import configparser
import logging
from pkg_resources import resource_filename
from enum import Enum
import psycopg2
from psycopg2 import sql
[docs]class DataType(Enum):
""" List of accepted data types for new columns.
The types are hard-coded for safety reasons: SQL
insertions are potentially dangerous. See
`here <https://www.postgresqltutorial.com/postgresql-data-types/>`_
for more information.
"""
bool = 'BOOLEAN' #: Boolean
short = 'SMALLINT' #: Integer (2 bytes, range is -32,768 to +32,767)
int = 'INTEGER' #: Integer (4 bytes, range is -2,147,483,648 to +2,147,483,647)
long = 'BIGINT' #: Integer (8 bytes, range is -9,223,372,036,854,775,808 to +9,223,372,036,854,775,807)
float = 'FLOAT(24)' #: Floating-point number, 4 bytes
double = 'FLOAT(53)' #: Floating-point number, 8 bytes
string = 'TEXT' #: String, unlimited length
time = 'TIMESTAMPTZ' #: Time stamp, with time zone information
def __str__(self) -> str:
""" Parse the data type as an SQL string
Returns
-------
str
The SQL query
"""
return str(self.value)
[docs]class Constraint(Enum):
""" List of accepted constraints for new columns.
The constraints are hard-coded for safety reasons:
SQL insertions are potentially dangerous.
"""
positive = ''' CHECK({column_name} >= 0) ''' #: The variable must be greater or equal to 0
positive_strict = ''' CHECK({column_name} > 0) ''' #: The variable must be strictly positive
def __str__(self) -> str:
""" Parse the constraint as an SQL string
Returns
-------
str
The SQL query
"""
return str(self.value)
[docs]class Database:
"""Manages connections and operations with a PostgreSQL database.
The class is based on the library :obj:`psycopg2` and
on this `tutorial
<https://www.postgresqltutorial.com/postgresql-python/>`_.
"""
_host: str = 'localhost' #: The host name where the database is located
_port: int = 5432 #: Connection port
_database: str = 'beam' #: The database name to connect to
_user: str = 'cw-beam' #: User name
_passfile: str = '~/.pgpass' #: Location of the pgpass file with the credentials
_connection = None #: Connection object returned by psycopg2.connect()
_db_version: str = '' #: Database version
_cursor = None #: Cursor provided by _connection.cursor() to execute an SQL query
# List of predefined SQL queries
_query_createTimeTable = '''
CREATE TABLE {table_name} (
time TIMESTAMPTZ NOT NULL {default}
)
'''
_query_makeTimescaleHypertable = '''
SELECT create_hypertable (
%(table_name)s,
%(key)s
);
'''
_query_checkColumn = '''
SELECT EXISTS
(SELECT 1
FROM information_schema.columns
WHERE
table_schema = %(table_schema)s
AND
table_name = %(table_name)s
AND
column_name = %(column_name)s
);
'''
_query_checkTable = '''
SELECT EXISTS
(SELECT 1
FROM information_schema.columns
WHERE
table_schema = %(table_schema)s
AND
table_name = %(table_name)s)
;
'''
_query_addColumn = '''
ALTER TABLE {table_name}
ADD COLUMN {column_name}
{data_type}
{checks}
;
'''
_query_insertData = '''
INSERT INTO {table_name} ({columns})
VALUES ({data})
;
'''
[docs] def __init__(self, config_file: str = None):
""" Initializes the :class:`Database` object. If a
:paramref:`configuration file name<Database.__init__.config_file>`
is given, the constructor calls the method
:meth:`~.Database.config` and overrides the default attributes
Parameters
----------
config_file : str, optional
Configuration file name, default is `None`. See
:doc:`here<../../../examples/conf/conf_database>`
for a configuration file example
Raises
------
:class:`configparser.Error`
If a configuration file name was given, the method
:meth:`config` can fail raising this exception.
"""
# Read config file, if given
if config_file is not None:
self.config(config_file)
def __del__(self):
""" Closes the connection to the database, if it was ever open.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
self.close()
[docs] def config(self, config_file: str = resource_filename(__name__, 'conf/database.ini')):
""" Loads the configuration.
The method reads the :paramref:`config_file`
using the library :obj:`configparser`. The
structure of the file is shown in the
:ref:`examples section<configuration-files>`.
Parameters
----------
config_file : str, optional
:doc:`Configuration file name<../../../examples/conf/conf_database>`,
default is 'conf/database.ini'
Raises
------
:class:`configparser.Error`
Error while parsing the file, e.g. no file was found,
a parameter is missing or it has an invalid value.
"""
# Use a logger named like the module itself
logger = logging.getLogger(__name__)
logger.info("Loading configuration file %s", config_file)
try:
# Initialize config parser and read file
config_parser = configparser.ConfigParser()
config_parser.read(config_file)
# Assign values to class attributes
self._host = config_parser.get(section='Overall', option='host', fallback='localhost')
self._port = config_parser.getint(section='Overall', option='port', fallback=5432)
self._database = config_parser.get(section='Overall', option='database', fallback='beam')
self._user = config_parser.get(section='Overall', option='user', fallback='cw-beam')
self._passfile = config_parser.get(section='Overall', option='passfile', fallback='~/.pgpass')
except configparser.Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def connect(self, print_version: bool = False):
""" Connects to the database.
If the connection was successful and the flag
:paramref:`~Database.connect.print` was set, it also
prints the database version as a connection check.
Parameters
----------
print_version : bool, optional
Print the database version, default is False.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
# Use a logger named like the module itself
logger = logging.getLogger(__name__)
logger.info('Connecting to database <%s> on host <%s> over port <%d> as user <%s>',
self._database,
self._host,
self._port,
self._user)
try:
# Connect to the database
logger.debug(self._passfile)
self._connection = psycopg2.connect(
dbname=self._database,
user=self._user,
host=self._host,
port=self._port,
passfile=self._passfile)
# Read database version, serves as connection check
self._cursor = self._connection.cursor()
self._cursor.execute('SELECT version()')
self._db_version = self._cursor.fetchone()
except psycopg2.Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
logger.exception("{}: {}".format(type(e).__name__, e))
raise
else:
if print_version:
logger.info('Connection successful, database version %s', self._db_version)
else:
logger.info('Connection successful')
[docs] def close(self):
""" Closes the connection to the database.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
# Close, if the connection had been opened
if self._connection is not None:
logger = logging.getLogger(__name__)
logger.info('Closing connection to database %s', self._database)
try:
self._connection.close()
except psycopg2.Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
logger.exception("{}: {}".format(type(e).__name__, e))
raise
finally:
self._connection = None
[docs] def create_timescale_db(self, table_name: str, default_now: bool = True):
""" Creates a
`TimescaleDB <https://docs.timescale.com/latest/main>`_ table.
The table has a single column named 'time'
with type 'TIMESTAMPTZ'. If the flag
:paramref:`~Database.create_timescale_db.default_now`
is set (default is 'True'), the column 'time'
will default to 'NOW()'
Parameters
----------
table_name : str
The name of the table to be checked
default_now : bool, optional
Set the 'time' column default to 'NOW()', default is True.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = logging.getLogger(__name__)
if self._connection is None:
raise psycopg2.DatabaseError('Trying to access database without initialization')
try:
# Prepare query for table creation
if default_now:
logger.debug('Creating TimescaleDB table \'%s\' with default \'time\' NOW()', table_name)
query = sql.SQL(self._query_createTimeTable.format(
table_name=table_name,
default=sql.SQL('DEFAULT NOW()').as_string(self._connection)
))
else:
logger.debug('Creating TimescaleDB table \'%s\' without default \'time\'', table_name)
query = sql.SQL(self._query_createTimeTable.format(
table_name=table_name,
default=''
))
# Create the table table
self._cursor.execute(query)
# Make the table a TimescaleDB Hypertable
self._cursor.execute(
self._query_makeTimescaleHypertable,
{
'table_name': table_name,
'key': 'time'
}
)
self._connection.commit()
except psycopg2.Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def check_table(self, table_name):
""" Checks if a table exists.
Parameters
----------
table_name : str
The name of the table to be checked
Returns
-------
bool
True if the table exists, False otherwise.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = logging.getLogger(__name__)
if self._connection is None:
raise psycopg2.DatabaseError('Trying to access database without initialization')
try:
logger.debug('Checking if table %s exists', table_name)
self._cursor.execute(
self._query_checkTable,
{'table_schema': 'public',
'table_name': table_name,
}
)
reply = self._cursor.fetchone()
if reply[0]:
logger.debug('Table %s does exist', table_name)
return True
else:
logger.debug('Table %s does not exist', table_name)
return False
except psycopg2.Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def check_column(self, table_name, column_name):
""" Checks if a column exists in a given table.
Parameters
----------
table_name : str
The table where the column has to be checked
column_name : str
The column to be checked
Returns
-------
bool
True if the column exists, False otherwise.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = logging.getLogger(__name__)
if self._connection is None:
raise psycopg2.DatabaseError('Trying to access a database without initialization')
try:
logger.info('Checking if column %s exists in table %s', column_name, table_name)
self._cursor.execute(
self._query_checkColumn,
{'table_schema': 'public',
'table_name': table_name,
'column_name': column_name,
}
)
reply = self._cursor.fetchone()
if reply[0]:
logger.debug('Column %s does exist in table %s', column_name, table_name)
return True
else:
logger.debug('Column %s does not exist in table %s', column_name, table_name)
return False
except psycopg2.Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def new_column(
self,
table_name: str,
column_name: str,
data_type: DataType,
constraints: list = None,
):
""" Creates a new column in a given table.
If the column already exists, it just returns True.
If the table does not exist, returns False.
Parameters
----------
table_name : str
Name of the table where the column has to be created
column_name : str
Name of the column to be created
data_type : :class:`DataType`
Data type of the new column
constraints : list, optional
List of :class:`Constraints<Constraint>`, default is 'None'
Raises
------
:class:`TypeError`
Invalid constraint or data type
:class:`ValueError`
Invalid constraint or data type
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = logging.getLogger(__name__)
if self._connection is None:
raise psycopg2.DatabaseError('Trying to access database without initialization')
logger.info('Creating column %s in table %s', column_name, table_name)
# First let's check if the table exists
if not self.check_table(table_name):
raise psycopg2.IntegrityError(
'Trying to create a column in table %s, which does not exist'.format(table_name))
# Check if the column already exists
if self.check_column(table_name, column_name):
logger.info('Column %s already exists in table %s', column_name, table_name)
return
# Now let's send the SQL query
try:
if constraints is None:
constraint_list = sql.SQL(' ')
else:
constraint_list = sql.SQL(' ').join(
sql.SQL(str(k).format(
table_name=table_name,
column_name=column_name,
)) for k in constraints
)
query = sql.SQL(self._query_addColumn.format(
table_name=table_name,
column_name=column_name,
data_type=str(data_type),
checks=constraint_list.as_string(self._connection),
)
)
self._cursor.execute(query)
self._connection.commit()
# Check if the column was created
if self.check_column(table_name, column_name):
logger.debug('Column %s successfully created in table %s', column_name, table_name)
else:
raise psycopg2.OperationalError(
'Column {c} could not be created in table {t}'.format(c=column_name, t=table_name))
except (psycopg2.Error, TypeError, ValueError, KeyError) as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def new_entry(
self,
table_name: str,
columns: list,
data: list,
check_columns: bool = False,
):
""" Inserts data into a given table.
See :doc:`this example<../../../examples/ex_database>`
for usage examples
Parameters
----------
table_name : str
Name of the table where the data has to be inserted
columns : list[str]
List of columns names corresponding to the data
data : list
Values of the new data entry
check_columns : bool, optional
Check that columns exist before insertion, default is False
Raises
------
:class:`TypeError`
Invalid data
:class:`ValueError`
Invalid data
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = logging.getLogger(__name__)
if self._connection is None:
raise psycopg2.DatabaseError('Trying to access database without initialization')
logger.debug('Inserting data into table %s', table_name)
# Check if the table and the columns exist
if check_columns:
if not self.check_table(table_name):
raise psycopg2.IntegrityError(
'Table %s does not exist'.format(table_name))
for column in columns:
if not self.check_column(table_name, column):
raise psycopg2.IntegrityError(
'Column %s does not exist in table %s'.format(column, table_name))
# Check the list sizes are the same
if len(data) != len(columns):
raise psycopg2.DataError('Data and column list have different sizes')
# Now let's send the SQL query
try:
column_list = sql.SQL(', ').join(
sql.Identifier(n) for n in columns
)
values_list = sql.SQL(', ').join(
sql.Placeholder() for __ in columns
)
query = sql.SQL(self._query_insertData.format(
table_name=table_name,
columns=column_list.as_string(self._connection),
data=values_list.as_string(self._connection),
)
)
self._cursor.execute(query, data)
self._connection.commit()
except (psycopg2.Error, TypeError, ValueError) as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise