""" Basic interface to a `PostgreSQL <https://www.postgresql.org/>`_ database.
The module consists of a main class :class:`Database <Database>` which
implements methods for connection and disconnection, table verification
and data insertion.
The database settings are set with a
:doc:`config file<../examples/conf/conf_database>`
and the standard library :obj:`configparser`.
"""
# Imports
import configparser
from os.path import abspath, expanduser
from enum import Enum
from typing import List, Union, Tuple
# Third party
from pandas import DataFrame
from psycopg2 import (
sql,
Error as PSQL_Error,
DatabaseError,
IntegrityError,
OperationalError,
DataError,
connect,
)
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT # see https://stackoverflow.com/a/34484185
# Local packages
from lab_utils.custom_logging import getLogger
[docs]class DataType(Enum):
""" List of accepted data types for new columns.
The types are hard-coded for safety reasons: SQL
insertions are potentially dangerous. See
`here <https://www.postgresqltutorial.com/postgresql-data-types/>`_
for more information.
"""
bool = 'BOOLEAN' #: Boolean
short = 'SMALLINT' #: Integer (2 bytes, range is -32,768 to +32,767)
int = 'INTEGER' #: Integer (4 bytes, range is -2,147,483,648 to +2,147,483,647)
long = 'BIGINT' #: Integer (8 bytes, range is -9,223,372,036,854,775,808 to +9,223,372,036,854,775,807)
float = 'FLOAT(24)' #: Floating-point number, 4 bytes
double = 'FLOAT(53)' #: Floating-point number, 8 bytes
string = 'TEXT' #: String, unlimited length
time = 'TIMESTAMPTZ' #: Time stamp, with time zone information
def __str__(self) -> str:
""" Parses the data type as an SQL string.
Returns
-------
str
The SQL query
"""
return str(self.value)
[docs]class Constraint(Enum):
""" List of accepted constraints for new columns.
The constraints are hard-coded for safety reasons:
SQL insertions are potentially dangerous.
"""
positive = ''' CHECK({column_name} >= 0) ''' #: The variable must be greater or equal to 0
positive_strict = ''' CHECK({column_name} > 0) ''' #: The variable must be strictly positive
def __str__(self) -> str:
""" Parses the constraint as an SQL string.
Returns
-------
str
The SQL query
"""
return str(self.value)
[docs]class Database:
"""Manages connections and operations with a PostgreSQL database.
The class is based on the :obj:`psycopg2` library and
on this `tutorial
<https://www.postgresqltutorial.com/postgresql-python/>`_.
"""
host: str = 'localhost' #: The host name where the database is located
port: int = 5432 #: Connection port
database: str = 'beam' #: The database name to connect to
user: str = 'cw-beam' #: User name
passfile: str = '~/.pgpass' #: Location of the pgpass file with the credentials
connection = None #: Connection object returned by psycopg2.connect()
db_version: str = '' #: Database version
cursor = None #: Cursor provided by connection.cursor() to execute an SQL query
# List of predefined SQL queries
_query_createDatabase = '''
CREATE DATABASE {db_name}
WITH OWNER %(owner)s
;
'''
_query_createTimeTable = '''
CREATE TABLE {table_name} (
time TIMESTAMPTZ NOT NULL {default}
);
'''
_query_makeTimescaleHypertable = '''
SELECT create_hypertable (
%(table_name)s,
%(key)s
);
'''
# _query_createAggregateView = '''
# CREATE VIEW {table_name}_10s
# WITH (timescaledb.continuous)
# AS
# SELECT
# time_bucket('10 s', time) as time,
# {aggregate_list}
# FROM
# {table_name}
# GROUP BY 1;
#
#
# CREATE VIEW {table_name}_1min
# WITH (timescaledb.continuous)
# AS
# SELECT
# time_bucket('1 min', time) as time,
# {aggregate_list}
# FROM
# {table_name}
# GROUP BY 1;
#
#
# CREATE VIEW {table_name}_10min
# WITH (timescaledb.continuous)
# AS
# SELECT
# time_bucket('10 min', time) as time,
# {aggregate_list}
# FROM
# {table_name}
# GROUP BY 1;
# '''
_query_createAggregateView = '''
CREATE VIEW {table_name}_1min
WITH (timescaledb.continuous)
AS
SELECT
time_bucket('1 min', time) as time,
{aggregate_list}
FROM
{table_name}
GROUP BY 1;
'''
_query_checkColumn = '''
SELECT EXISTS
(SELECT 1
FROM information_schema.columns
WHERE
table_schema = %(table_schema)s
AND
table_name = %(table_name)s
AND
column_name = %(column_name)s
);
'''
_query_checkTable = '''
SELECT EXISTS
(SELECT 1
FROM information_schema.columns
WHERE
table_schema = %(table_schema)s
AND
table_name = %(table_name)s
)
;
'''
_query_addColumn = '''
ALTER TABLE {table_name}
ADD COLUMN {column_name}
{data_type}
{checks}
;
'''
_query_insertData = '''
INSERT INTO {table_name} ({columns})
VALUES ({data})
;
'''
_query_retrieveViews = '''
SELECT schemaname, viewname
FROM pg_catalog.pg_views
WHERE schemaname = 'public'
;
'''
_query_dropView = '''
DROP VIEW {view}
CASCADE;
'''
_query_retrieveColumns = '''
SELECT attrelid::regclass AS tbl,
attname AS col,
atttypid::regtype AS datatype
FROM pg_attribute
WHERE attrelid = %(full_table_id)s::regclass
AND attnum > 0
AND NOT attisdropped
ORDER BY attnum;
''' # from https://dba.stackexchange.com/a/22420
_query_retrieveLatestValue = '''
SELECT {column_list}
FROM {table_name}
ORDER BY time
DESC
LIMIT 1 ;
''' # from https://stackoverflow.com/a/52355034
_query_checkEmptyTable = '''
SELECT CASE
WHEN EXISTS (SELECT * FROM {table_name} LIMIT 1) THEN 1
ELSE 0
END
'''
_query_retrieveLatestSerialID = '''
SELECT last_value
FROM {table_name}_id_seq;
'''
_query_updateMeasurementEntry = '''
UPDATE {table_name}
SET {update_list}
WHERE id = {measurement_id}
;
'''
_query_retrieveEntry = '''
SELECT *
FROM {table_name}
WHERE {conditions}
;
'''
[docs] def __init__(self,
config_file: str = None,
host: str = None,
port: int = None,
database: str = None,
user: str = None,
passfile: str = None,
):
""" Initializes the :class:`Database` object. If a
:paramref:`configuration file name<Database.__init__.config_file>`
is given, the constructor calls the method
:meth:`~.Database.config` and overrides the default attributes
Parameters
----------
config_file : str, optional
Configuration file name, default is `None`. See
:doc:`here<../../../examples/conf/conf_database>`
for a configuration file example.
host : str, optional
Database host.
port : int, optional
Database port.
database : str, optional
Database name.
user : str, optional
Database user.
passfile : str, optional
Local file with PostgreSQL password.
Raises
------
:class:`configparser.Error`
If a configuration file name was given, the method
:meth:`config` can fail raising this exception.
"""
# Read config file, if given
if config_file is not None:
self.config(config_file)
# Override config file if other arguments are given
if host is not None:
self.host = host
if port is not None:
self.port = port
if database is not None:
self.database = database
if user is not None:
self.user = user
if passfile is not None:
self.passfile = abspath(expanduser(passfile))
def __del__(self):
""" Closes the connection to the database, if it was ever open.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
self.close()
[docs] def config(self, config_file: str):
""" Loads the configuration.
Reads the :paramref:`config_file` using the
:obj:`configparser` library. The structure
of the file is shown in the
:ref:`examples section<configuration-files>`.
Parameters
----------
config_file : str
Configuration file name.
Raises
------
:class:`configparser.Error`
Error while parsing the file, e.g. no file was found,
a parameter is missing or it has an invalid value.
"""
try:
# Expand configuration file path
config_file = abspath(expanduser(config_file))
getLogger().info("Loading configuration file %s", config_file)
# Initialize config parser and read file
config_parser = configparser.ConfigParser()
config_parser.read(config_file)
# Assign values to class attributes
self.host = config_parser.get(section='Overall', option='host')
self.port = config_parser.getint(section='Overall', option='port')
self.database = config_parser.get(section='Overall', option='database')
self.user = config_parser.get(section='Overall', option='user')
self.passfile = config_parser.get(section='Overall', option='passfile')
# Expand passfile path
self.passfile = abspath(expanduser(self.passfile))
except configparser.Error as e:
getLogger().error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
getLogger().exception("{}: {}".format(type(e).__name__, e))
raise
else:
getLogger().info('Configuration loaded')
[docs] def connect(self, print_version: bool = False):
""" Connects to the database.
If the connection was successful and the flag
:paramref:`~Database.connect.print` was set, it also
prints the database version as a connection check.
Parameters
----------
print_version : bool, optional
Print the database version, default is 'False'.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
getLogger().info(
'Connecting to database <%s> on host <%s> over port <%d> as user <%s>',
self.database,
self.host,
self.port,
self.user
)
try:
# Connect to the database
self.connection = connect(
dbname=self.database,
user=self.user,
host=self.host,
port=self.port,
passfile=self.passfile
)
# Read database version, serves as connection check
self.cursor = self.connection.cursor()
self.cursor.execute('SELECT version()')
self.db_version = self.cursor.fetchone()
except PSQL_Error as e:
getLogger().error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
getLogger().exception("{}: {}".format(type(e).__name__, e))
raise
else:
if print_version:
getLogger().info('Connection successful, database version: %s', self.db_version)
else:
getLogger().info('Connection successful')
getLogger().debug('Database version: %s', self.db_version)
[docs] def close(self):
""" Closes the connection to the database.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
# Close, if the connection had been opened
if self.connection is not None:
getLogger().info('Closing connection to database <%s>', self.database)
try:
self.connection.close()
except PSQL_Error as e:
getLogger().error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
getLogger().exception("{}: {}".format(type(e).__name__, e))
raise
else:
getLogger().info('Connection to database <%s> closed', self.database)
finally:
self.connection = None
[docs] def create_database(self,
db_name: str,
owner: str = 'postgres',
):
""" Creates a database named
:paramref:`~Database.create_database.db_name`. If
:paramref:`~Database.create_database.timescaledb_extension`
is set (default is 'True'), the TimescaleDB extension
is installed in the database to allow TimescaleDB
hypertables.
Parameters
----------
db_name : str
The name of the database to be created
owner : str, optional
Database owner, default is 'postgres'.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
try:
# Set autocommit level, see https://stackoverflow.com/a/34484185
self.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
# Prepare query for table creation
logger.info('Creating database <%s>', db_name)
query = self._query_createDatabase.format(
db_name=db_name
)
# Create the table table
logger.debug('Query: %s', self.cursor.mogrify(query).decode())
self.cursor.execute(
query,
{
'owner': owner
}
)
self.connection.commit()
logger.info('Database <%s> created', db_name)
except PSQL_Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def create_timescale_db(self,
table_name: str,
default_now: bool = True):
""" Creates a
`TimescaleDB <https://docs.timescale.com/latest/main>`_ table.
The table has a single column named 'time'
with type 'TIMESTAMPTZ'. If the flag
:paramref:`~Database.create_timescale_db.default_now`
is set (default is 'True'), the column 'time'
will default to 'NOW()'
Parameters
----------
table_name : str
The name of the table to be checked
default_now : bool, optional
Set the 'time' column default to 'NOW()', default is `True`.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
try:
# Prepare query for table creation
if default_now:
logger.info('Creating TimescaleDB table <%s> with default <time> NOW()', table_name)
query = sql.SQL(self._query_createTimeTable.format(
table_name=table_name,
default=sql.SQL('DEFAULT NOW()').as_string(self.connection)
))
else:
logger.info('Creating TimescaleDB table <%s> without default <time>', table_name)
query = sql.SQL(self._query_createTimeTable.format(
table_name=table_name,
default=''
))
# Create the table
logger.debug('Query: %s', self.cursor.mogrify(query).decode())
self.cursor.execute(query)
# Make the table a TimescaleDB Hypertable
logger.debug('Sending query: %s', self.cursor.mogrify(
self._query_makeTimescaleHypertable,
{
'table_name': table_name,
'key': 'time'
}
).decode())
self.cursor.execute(
self._query_makeTimescaleHypertable,
{
'table_name': table_name,
'key': 'time'
}
)
# Commit queries
self.connection.commit()
logger.info('TimescaleDB table <%s> created', table_name)
except PSQL_Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def check_table(self, table_name) -> bool:
""" Checks if a table exists.
Parameters
----------
table_name : str
The name of the table to be checked
Returns
-------
bool:
`True` if the table exists, `False` otherwise.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
try:
logger.debug('Checking if table <%s> exists', table_name)
logger.debug('Query: %s', self.cursor.mogrify(
self._query_checkTable,
{'table_schema': 'public',
'table_name': table_name,
}
).decode())
self.cursor.execute(
self._query_checkTable,
{'table_schema': 'public',
'table_name': table_name,
}
)
reply = self.cursor.fetchone()
if reply[0]:
logger.debug('Table <%s> does exist', table_name)
return True
else:
logger.debug('Table <%s> does not exist', table_name)
return False
except PSQL_Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def check_column(self, table_name, column_name) -> bool:
""" Checks if a column exists in a given table.
Parameters
----------
table_name : str
The table where the column has to be checked
column_name : str
The column to be checked
Returns
-------
bool:
`True` if the column exists, `False` otherwise.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access a database without initialization')
try:
logger.debug('Checking if column <%s> exists in table <%s>', column_name, table_name)
logger.debug('Query: %s', self.cursor.mogrify(
self._query_checkColumn,
{'table_schema': 'public',
'table_name': table_name,
'column_name': column_name,
}
).decode())
self.cursor.execute(
self._query_checkColumn,
{'table_schema': 'public',
'table_name': table_name,
'column_name': column_name,
}
)
reply = self.cursor.fetchone()
if reply[0]:
logger.debug('Column <%s> does exist in table <%s>', column_name, table_name)
return True
else:
logger.debug('Column <%s> does not exist in table <%s>', column_name, table_name)
return False
except PSQL_Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def new_column(
self,
table_name: str,
column_name: str,
data_type: DataType,
constraints: list = None,
):
""" Creates a new column in a given table.
If the column already exists, it just returns.
If the table does not exist or the column could not
be created, it raises a :class:`psycopg2.Error`.
Parameters
----------
table_name : str
Name of the table where the column has to be created
column_name : str
Name of the column to be created
data_type : :class:`DataType`
Data type of the new column
constraints : list, optional
List of :class:`Constraints<Constraint>`, default is 'None'
Raises
------
:class:`TypeError`
Invalid constraint or data type
:class:`ValueError`
Invalid constraint or data type
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if the table exists
if not self.check_table(table_name):
raise IntegrityError(
'Trying to create a column in table <%s>, which does not exist'.format(table_name))
# Check if the column already exists
if self.check_column(table_name, column_name):
logger.info('Column <%s> already exists in table <%s>', column_name, table_name)
return
# Now let's send the SQL query
logger.info('Creating column <%s> in table <%s>', column_name, table_name)
try:
if constraints is None:
constraint_list = sql.SQL(' ')
else:
constraint_list = sql.SQL(' ').join(
sql.SQL(str(k).format(
table_name=table_name,
column_name=column_name,
)) for k in constraints
)
query = sql.SQL(self._query_addColumn.format(
table_name=table_name,
column_name=column_name,
data_type=str(data_type),
checks=constraint_list.as_string(self.connection),
)
)
logger.debug('Query: %s', self.cursor.mogrify(query).decode())
self.cursor.execute(query)
self.connection.commit()
# Check if the column was created
if self.check_column(table_name, column_name):
logger.info('Column <%s> successfully created in table <%s>', column_name, table_name)
else:
raise OperationalError(
'Column <%s> could not be created in table <%s>', column_name, table_name
)
except (PSQL_Error, TypeError, ValueError, KeyError) as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
# Undefined exception, full traceback to be printed
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def new_entry(
self,
table_name: str,
columns: list,
data: list,
check_columns: bool = False,
):
""" Inserts data into a given table.
See :doc:`this example<../../../examples/ex_database>`
for usage examples
Parameters
----------
table_name : str
Name of the table where the data has to be inserted
columns : list[str]
List of columns names corresponding to the data
data : list
Values of the new data entry
check_columns : bool, optional
Check that columns exist before insertion, default is False
Raises
------
:class:`TypeError`
Invalid data
:class:`ValueError`
Invalid data
:class:`psycopg2.Error`
Base exception for all kind of database errors
"""
logger = getLogger()
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check the list sizes are the same
if len(data) != len(columns):
raise DataError('Data and column lists have different sizes')
# Check the list sizes are not zero
if len(data) == 0:
raise DataError('Empty data')
logger.debug('Inserting data into table <%s>: ', table_name)
for c, d in zip(columns, data):
logger.debug('\t{:<25}{}'.format(c, d))
# Check if the table and the columns exist
if check_columns:
if not self.check_table(table_name):
raise IntegrityError(
'Table {} does not exist'.format(table_name))
for column in columns:
if not self.check_column(table_name, column):
raise IntegrityError('Column <%s> does not exist in table <%s>', column, table_name)
# Now let's send the SQL query
try:
column_list = sql.SQL(', ').join(
sql.Identifier(n) for n in columns
)
values_list = sql.SQL(', ').join(
sql.Placeholder() for __ in columns
)
query = sql.SQL(self._query_insertData.format(
table_name=table_name,
columns=column_list.as_string(self.connection),
data=values_list.as_string(self.connection),
)
)
logger.debug('Query: %s', self.cursor.mogrify(query, data).decode())
self.cursor.execute(query, data)
self.connection.commit()
except (PSQL_Error, TypeError, ValueError) as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
else:
logger.debug('Data successfully committed')
[docs] def get_list_columns(
self,
table_name: str
) -> List[List[str]]:
""" Fetches the list of columns in a given table.
Returns
-------
List[List[str]]:
Column data: list of pairs <name, type>.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors. In particular,
it is raised if the table does not exist or the aggregate view
could not be created.
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if the table exists
if not self.check_table(table_name):
raise IntegrityError(
'Trying to access table <{}>, which does not exist'.format(table_name))
try:
# Retrieve list of columns
logger.debug('Retrieving list of columns from table <%s>', table_name)
logger.debug('Query: %s', self.cursor.mogrify(
self._query_retrieveColumns,
{'full_table_id': 'public.{}'.format(table_name)}
).decode())
self.cursor.execute(
self._query_retrieveColumns,
{'full_table_id': 'public.{}'.format(table_name)}
)
reply = self.cursor.fetchall()
list_of_columns = [[row[1], row[2]] for row in reply]
logger.debug('List of columns: %s', list_of_columns)
return list_of_columns
except PSQL_Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def create_aggregate_view(self,
table_name: str,
index_name: str = 'time',
recreate: bool = True,
):
""" Creates a set of
`aggregate views <https://docs.timescale.com/latest/using-timescaledb/continuous-aggregates>`_
in a given table.
Parameters
----------
table_name : str
Name of the table where the column has to be created.
index_name : str, optional
Column to be used as index for the aggregate view, default is 'time'.
recreate : bool, optional
Recreate the aggregate view if it exists, default is 'True'.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors. In particular,
it is raised if the table does not exist or the aggregate view
could not be created.
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if the table exists
if not self.check_table(table_name):
raise IntegrityError(
'Trying to create aggregate views for table <{}>, which does not exist'.format(table_name))
try:
# Retrieve list of columns
list_of_columns = self.get_list_columns(table_name)
# Check index column is present
found = False
for col in list_of_columns:
if col[0] == index_name:
found = True
list_of_columns.remove(col)
break
if not found:
raise IntegrityError('Index column <{}> not found in table <{}>'.format(index_name, table_name))
# Check that at least one numeric column was found
valid_data_types = [
'integer',
'double precision',
'real',
]
found = False
for col in list_of_columns:
if col[1] in valid_data_types:
found = True
break
if not found:
raise IntegrityError('Cannot create an aggregate view without any column')
# Create list of aggregates, picking only numeric variables
aggregate_list = ', '.join(
['AVG({column_name}) AS {column_name}_avg'.format(column_name=col[0])
for col in list_of_columns
if col[1] in valid_data_types]
)
# Check if aggregate views already exist, and drop them
if recreate:
self.cursor.execute(self._query_retrieveViews)
reply = self.cursor.fetchall()
for view in [row[1] for row in reply]:
if table_name in view:
query = sql.SQL(self._query_dropView.format(
view=view,
))
logger.debug('Dropping view <%s>', view)
logger.debug('Query: %s', self.cursor.mogrify(query).decode())
self.cursor.execute(query)
# Create aggregate view
query = sql.SQL(self._query_createAggregateView.format(
table_name=table_name,
aggregate_list=aggregate_list,
))
logger.debug('Creating aggregate views')
logger.debug('Query: %s', self.cursor.mogrify(query).decode())
self.cursor.execute(query)
self.connection.commit()
except PSQL_Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def fetch_latest_value(
self,
table_name: str,
column_name: List[str] = None
) -> tuple:
""" Retrieves the latest values of a time-ordered table.
If paramref:'column_name' is not given, all values of the table
are retrieved.
Parameters
----------
table_name : str
Name of the table where the column has to be retrieved from.
column_name : str
Column[s] to be retrieved, default is all columns.
Returns
-------
tuple:
Latest data. First element is the time of the latest entry.
Consecutive elements are the value of each column.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors.
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if the table exists
if not self.check_table(table_name):
raise IntegrityError(
'Trying to retrieve data from table <{}>, which does not exist'.format(table_name))
try:
if column_name is None:
logger.debug('Fetching latest data from all columns in table <%s>', table_name)
column_list = '*'
else:
logger.debug('Fetching latest data from column(s) <%s> in table <%s>', column_name, table_name)
column_list = 'time, ' + ', '.join(column_name)
logger.debug('Query: %s', self.cursor.mogrify(
self._query_retrieveLatestValue.format(
column_list=column_list,
table_name=table_name
)
).decode())
self.cursor.execute(
self._query_retrieveLatestValue.format(
column_list=column_list,
table_name=table_name
)
)
reply = self.cursor.fetchone()
logger.debug('Latest values: {}'.format(reply))
return reply
except PSQL_Error as e:
logger.error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
logger.exception("{}: {}".format(type(e).__name__, e))
raise
[docs] def check_empty_table(
self,
table_name: str
) -> bool:
""" Checks whether a table is empty or not
Parameters
----------
table_name : str
Name of the table where the column has to be retrieved from.
Returns
-------
bool:
True if the table is empty, False if it is not empty.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors.
"""
logger = getLogger()
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if the table exists
if not self.check_table(table_name):
raise IntegrityError(
'Trying to access table <{}>, which does not exist'.format(table_name))
logger.debug('Query: %s', self.cursor.mogrify(
self._query_checkEmptyTable.format(table_name=table_name)
).decode())
self.cursor.execute(
self._query_checkEmptyTable.format(table_name=table_name)
)
reply = self.cursor.fetchone()
logger.debug('Table exists: {}'.format(reply))
return reply[0] != 1
[docs] def fetch_next_serial_id(
self,
table_name: str,
) -> int:
""" Checks the latest ID of a serialized table and returns the next available one.
Parameters
----------
table_name : str
Name of the table to check.
Returns
-------
int:
Next available serial ID.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors.
"""
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if table is not empty
if self.check_empty_table(table_name):
return 1
# Get latest ID
getLogger().debug('Query: %s', self.cursor.mogrify(
self._query_retrieveLatestSerialID.format(table_name=table_name)
).decode())
self.cursor.execute(
self._query_retrieveLatestSerialID.format(table_name=table_name)
)
reply = self.cursor.fetchone()
getLogger().debug('Latest serial ID in table {}: {}'.format(table_name, reply))
return reply[0] + 1
[docs] def update_measurement(
self,
table_name: str,
measurement_id: int,
columns: List[str] = None,
values: List = None,
pairs: List[Union[Tuple, List]] = None
):
""" Updates the entry of a measurement.
Parameters
----------
table_name : str
Measurement type.
measurement_id: int
Measurement ID.
columns : List[str]
List of columns names corresponding to the data
values : List
Values of the new data entry
pairs: List[tuple]
List of pairs (variable, value)
Returns
-------
bool:
True if the table is empty, False if it is not empty.
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors.
"""
# Sanitize input
# If no "pairs" given, use lists of columns and values
if pairs is None:
# Check the list sizes are the same and non-empty
if values is None or columns is None:
raise DataError('Empty data')
if len(values) != len(columns):
raise DataError('Data and column lists have different sizes')
pairs = [(c, d) for c, d in zip(columns, values)]
# Check empty data
if len(pairs) == 0:
raise DataError('Empty data')
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if the table exists
if not self.check_table(table_name):
raise IntegrityError(
'Trying to access table <{}>, which does not exist'.format(table_name))
# Debug log
getLogger().info('Modifying measurement entry {} in table <{}>: '.format(measurement_id, table_name))
for pair in pairs:
getLogger().debug('\t{:<25}{}'.format(pair[0], pair[1]))
# Now let's send the SQL query
try:
update_list = ', '.join(
'\n\t\t\t{} = \'{}\''.format(pair[0], pair[1]) for pair in pairs
)
getLogger().info('Query: %s', self.cursor.mogrify(
self._query_updateMeasurementEntry.format(
table_name=table_name,
update_list=update_list,
measurement_id=measurement_id
)
).decode())
self.cursor.execute(
self._query_updateMeasurementEntry.format(
table_name=table_name,
update_list=update_list,
measurement_id=measurement_id
)
)
self.connection.commit()
except (PSQL_Error, TypeError, ValueError) as e:
getLogger().error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
getLogger().exception("{}: {}".format(type(e).__name__, e))
raise
else:
getLogger().info('Entry successfully updated')
[docs] def retrieve_entries(
self,
table_name: str,
conditions: Union[str, List[str]]
) -> DataFrame:
"""
Retrieves entries from a table with given conditions.
Parameters
----------
table_name: str
Name of the table to query
conditions: str, List[str]
Condition (or list of conditions) for data selection
Returns
-------
DataFrame
Pandas Dataframe with retrieved data, and column names as indexes
Raises
------
:class:`psycopg2.Error`
Base exception for all kind of database errors.
"""
# Sanitize input
if isinstance(conditions, str):
condition_exp = '\n\t\t\t{}'.format(conditions)
else:
condition_exp = ' AND '.join(
'\n\t\t\t{}'.format(c) for c in conditions
)
# Check database status
if self.connection is None:
raise DatabaseError('Trying to access database without initialization')
# Check if the table exists
if not self.check_table(table_name):
raise IntegrityError(
'Trying to access table <{}>, which does not exist'.format(table_name))
# Debug log
getLogger().info('Retrieving entries from table <{}> with conditions '.format(table_name, conditions))
# Now let's send the SQL query
try:
getLogger().info('Query: %s', self.cursor.mogrify(
self._query_retrieveEntry.format(
table_name=table_name,
conditions=condition_exp
)
).decode())
self.cursor.execute(
self._query_retrieveEntry.format(
table_name=table_name,
conditions=condition_exp
)
)
# Parse data
column_names = [desc[0] for desc in self.cursor.description]
data = self.cursor.fetchall()
reply = DataFrame(
data=data,
columns=column_names
)
getLogger().debug('Data: {}'.format(reply))
return reply
except (PSQL_Error, TypeError, ValueError) as e:
getLogger().error("{}: {}".format(type(e).__name__, e))
raise
except BaseException as e:
getLogger().exception("{}: {}".format(type(e).__name__, e))
raise