# Source code for lab_utils.socket_comm

""" Server/client communication via TCP sockets.
The module implements TCP communication between
a daemon-like :class:`Server` and a simple
:class:`Client`.

The :class:`Server` class is meant to be
run as a daemon-like app. The user should
override the :meth:`~Server.create_parser`
method to define the daemon behaviour upon
reception of a message from a :class:`Client`.
The base class provides support for the
message 'quit', which will terminate the
daemon. Any other message will be met with
a help-like reply.

The :class:`Client` class communicates with
the :class:`Server` sending a text string.

The :class:`ArgumentParser` class and
:class:`MessageError` exception are
necessary to override some unwanted
default behaviour of the
:obj:`argparse` library.

The module is based upon `this tutorial
<https://pymotw.com/2/socket/tcp.html>`_.

Attributes
----------
buffer_size: int, 1024
    Maximum length of a transmitted message, in bytes
"""

import socket
import configparser
import logging
from pkg_resources import resource_filename
from zc.lockfile import LockFile, LockError
import argparse

# Maximum number of bytes requested in a single socket recv() call,
# used by both the Server and the Client when reading from the socket
buffer_size: int = 1024


class ArgumentParser(argparse.ArgumentParser):
    """ Argument parser that raises instead of using the stock error handling.

    The only change with respect to :obj:`argparse.ArgumentParser` is the
    :meth:`error` method, which raises a :class:`MessageError`.
    """

    def error(self, message: str):
        """ Reports a parsing error by raising an exception.

        Parameters
        ----------
        message : str
            Description of the parsing problem.

        Raises
        ------
        :class:`MessageError`
            Always, carrying the given :paramref:`message`.
        """
        failure = MessageError(message)
        raise failure
class MessageError(Exception):
    """ Invalid message.

    Raised by :meth:`ArgumentParser.error` when a message cannot be parsed.

    The class derives from :class:`Exception` (and therefore still from
    :class:`BaseException`), so that both dedicated ``except MessageError``
    handlers and generic ``except Exception`` handlers can catch it.
    """
class Server:
    """ Daemon-like TCP Server.

    The user should override the :meth:`create_parser` method, which
    defines the server behaviour upon reception of a message from a
    :class:`Client`.

    Note that the constructor itself calls :meth:`run`, so creating a
    :class:`Server` object blocks until the server stops.

    .. _LockFile: https://pypi.org/project/zc.lockfile/
    """

    # Flags
    quit_flag: bool = False  #: Internal flag to stop the daemon.

    # TCP configuration
    host: str = 'localhost'  #: Host address.
    port: int = 1507  #: Connection port.
    sock: socket.SocketType = None  #: Connection socket.
    address: int = None  #: Bound port, second item of ``sock.getsockname()``.
    max_backlog: int = 1  #: TCP connection queue.

    # Server variables
    logger: logging.Logger = None  #: Single logger for the whole class.
    message: str = ''  #: Message from the client.
    reply: str = ''  #: Reply to the client.

    # Daemon
    pid_file_name: str = '/tmp/socket_comm.pid'  #: The PID file name
    lock: LockFile = None  #: LockFile object.

    # Message parsing
    namespace: argparse.Namespace = None  #: Attribute container
    parser: ArgumentParser = None  #: Argument parser.
    # noinspection PyProtectedMember
    sp: argparse._SubParsersAction = None  #: Argument subparser

    def __init__(self, config_file: str = None, pid_file_name: str = None):
        """ Initializes and runs the :class:`Server` object.

        The constructor calls the :meth:`config` method to read out
        the server attributes, and initializes the :attr:`logger` and
        the message :attr:`parser`. The method :meth:`daemonize` tries
        to lock the PID file, and finally :meth:`run` is called, which
        starts an endless loop listening on the specified TCP
        :attr:`port`. The constructor therefore only returns once the
        server has stopped.

        Parameters
        ----------
        config_file : str, optional
            Configuration file, default is `None`.

        pid_file_name : str, optional
            If given, overrides the default
            :attr:`PID file name<pid_file_name>`.

        Raises
        ------
        :class:`configparser.Error`
            Configuration file error

        :class:`LockError`
            The PID file could not be locked (see `here
            <https://pypi.org/project/zc.lockfile/>`_).

        :class:`OSError`
            Various socket errors, e.g. address or timeout
        """
        # Read the configuration file, if given
        if config_file is not None:
            self.config(config_file)

        # Override PID file
        if pid_file_name is not None:
            self.pid_file_name = pid_file_name

        # Use a single logger for all server messages
        self.logger = logging.getLogger(
            'Server on {h}:{p}'.format(h=self.host, p=self.port)
        )

        # Initialize parser
        self.create_parser()

        # Lock the PID file, raise LockError if it fails
        self.daemonize()

        # Run the server
        self.run()

    def __del__(self):
        """ Releases the PID lock file and the TCP socket. """
        # Check the socket itself rather than the bound address, so the
        # socket is also closed when it was created but bind() failed
        if self.sock is not None:
            self.logger.info('Closing socket')
            self.sock.close()

        if self.lock is not None:
            self.logger.info('Releasing PID file')
            self.lock.close()

    def daemonize(self):
        """ Locks a PID file to ensure that a single instance
        of the server is running. Based on the (poorly documented)
        `zc.lockfile <https://pypi.org/project/zc.lockfile/>`_ package.

        Raises
        ------
        :class:`LockError`
            The PID file could not be locked.
        """
        try:
            self.logger.info('Locking PID file %s', self.pid_file_name)
            self.lock = LockFile(
                path=self.pid_file_name,
                content_template='{pid};{hostname}'
            )
        except LockError as e:
            self.logger.error('%s: %s', type(e).__name__, e)
            raise

    def config(self, filename: str):
        """ Loads the server configuration from a file.

        The options ``host`` and ``port`` are read from the section
        ``Overall``. A file that cannot be read (e.g. because it does not
        exist) is skipped by :obj:`configparser`; in that case a warning
        is logged and the fallback values ``'localhost'`` and ``1507``
        are used.

        Parameters
        ----------
        filename : str
            The file name to be read.

        Raises
        ------
        :class:`configparser.Error`
            If an error happened while parsing the file.

        :class:`ValueError`
            If the option ``port`` is not a valid integer.
        """
        # Use a logger named like the module itself
        logger = logging.getLogger(__name__)
        logger.info("Loading configuration file %s", filename)

        try:
            # Initialize the config parser and read the file;
            # read() returns the list of files it could actually parse
            config_parser = configparser.ConfigParser()
            if not config_parser.read(filename):
                logger.warning(
                    "Configuration file %s could not be read, using fallback values",
                    filename
                )

            # Assign values to class attributes
            self.host = config_parser.get(
                section='Overall', option='host', fallback='localhost'
            )
            self.port = config_parser.getint(
                section='Overall', option='port', fallback=1507
            )

        except configparser.Error as e:
            logger.error('%s: %s', type(e).__name__, e)
            raise

        except BaseException as e:
            # Undefined exception, full traceback to be printed
            logger.exception('%s: %s', type(e).__name__, e)
            raise

    def run(self):
        """ Starts the server.

        The server will run in an endless loop until the message
        'quit' is received. Clients can connect to the TCP port and
        send a text string. The message will be parsed by the
        :attr:`parser`, which will call the respective function. If
        the message is invalid, a help string will be sent to the
        client.

        Errors while serving a single client are logged and the server
        keeps listening; errors while setting up the socket or while
        accepting connections are logged and re-raised.

        Raises
        ------
        :class:`OSError`
            Various socket errors, e.g. address or timeout
        """
        try:
            # Bind the server to the address
            self.logger.info('Binding to address %s:%s', self.host, self.port)
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind((self.host, self.port))
            self.address = self.sock.getsockname()[1]
            self.sock.listen(self.max_backlog)
            self.logger.info('Server is now listening, send a \'quit\' message to stop it')

            # Endless loop
            while not self.quit_flag:
                # Accept connection
                connection, client_address = self.sock.accept()

                try:
                    # Receive message
                    self.logger.debug('Client connected from %s', client_address)
                    self.message = connection.recv(buffer_size).decode().rstrip()
                    self.logger.debug('Message: %s', self.message.split())

                    # Parse message and call appropriate task
                    self._process_message()

                    # Send reply to client
                    # TODO: check that len(self.reply) < buffer_size
                    connection.sendall(self.reply.encode())

                except OSError as e:
                    self.logger.error('%s: %s', type(e).__name__, e)
                    self.logger.error("Server could recover and is still listening")

                except BaseException as e:
                    self.logger.exception('%s: %s', type(e).__name__, e)
                    self.logger.error("Unexpected exception, server could recover and is still listening")

                finally:
                    # Close connection
                    self.logger.debug('Closing connection to client %s', client_address)
                    connection.close()

        except OSError as e:
            self.logger.error('%s: %s', type(e).__name__, e)
            raise

        except BaseException as e:
            self.logger.exception("Unexpected exception")
            self.logger.exception('%s: %s', type(e).__name__, e)
            raise

    def _process_message(self):
        """ Parses :attr:`message`, calls the matching task and
        updates :attr:`reply` and :attr:`quit_flag` accordingly.

        Invalid messages, including an empty one, set :attr:`reply` to
        a help text. Any other failure sets :attr:`reply` to an error
        notice. The method itself does not propagate these errors.
        """
        # NOTE(review): self.reply is not cleared here, so a task that
        # does not set a reply sends the previous one again - confirm
        # whether that is intended.
        try:
            self.namespace = self.parser.parse_args(
                args=self.message.split(),
            )

            # A message that selects no subparser (e.g. an empty one)
            # yields a namespace without 'func': treat it as invalid
            task = getattr(self.namespace, 'func', None)
            if task is None:
                raise MessageError('no task specified')
            task()

            # If the message is 'quit', make sure we quit regardless
            # of what the user did to the quit() method. Subparsers
            # that define no 'which' default are simply not 'quit'.
            self.quit_flag = getattr(self.namespace, 'which', None) == 'quit'

        except MessageError as e:
            # Invalid message
            self.logger.warning('%s: %s', type(e).__name__, e)
            self.reply = '\'{m}\' is an invalid message\n{h}'.format(
                m=self.message,
                h=self.parser.format_help()
            )
            self.logger.debug('Sending help message to the client')

        except BaseException as e:
            # Unknown error. BaseException is caught on purpose:
            # argparse may call sys.exit(), e.g. for a '-h' option of
            # a subparser, and that must not stop the daemon.
            self.logger.exception('%s: %s', type(e).__name__, e)
            self.reply = 'Error! Check daemon log!\n{}: {}'.format(type(e).__name__, e)

        else:
            # All good
            self.logger.debug('Sending reply: %s', self.reply)

    def create_parser(self):
        """ Configures the message :attr:`parser`.

        Upon reception of a message, the :attr:`parser` will call the
        function registered for it. Other arguments given to the
        parser will be available in the :attr:`namespace` attribute.
        As an example, the subparser for the message 'quit' is
        implemented. The user should override the method :meth:`quit`,
        as well as implement other methods for the particular daemon
        tasks.

        Every subparser is expected to define a ``func`` default, which
        is the task called by :meth:`run`. The ``which`` default is
        used to recognise the 'quit' message.
        """
        # Initialize the parser
        self.parser = ArgumentParser(
            prog='daemon',
            description='socket_comm daemon example, the user should override this',
            add_help=False,
            epilog='epilogs are also valid',
        )
        self.sp = self.parser.add_subparsers(title='message')

        # One subparser per task: 1. QUIT
        sp_quit = self.sp.add_parser(
            name='quit',
            help='stops the daemon'
        )
        sp_quit.set_defaults(
            func=self.quit,
            which='quit'
        )

    def quit(self):
        """ User-defined task example.

        The method is called by the :attr:`parser` when the message
        'quit' is received. For the base class, it just says goodbye
        to the client. Users should override it to do proper clean-up
        of their daemon.
        """
        self.reply = 'Goodbye!'
class Client:
    """ Simple TCP client to communicate with a running
    :class:`Server`. It sends a message and receives the reply
    from the server.
    """

    # Attributes
    host: str = 'localhost'  #: Host address.
    port: int = 1507  #: Connection port.

    def __init__(self,
                 config_file: str = None,
                 host: str = None,
                 port: int = None):
        """ Initializes the :class:`Client` object.

        If a :paramref:`~Client.__init__.config_file` is given, the
        constructor calls the :meth:`~.Client.config` method and
        overrides the default attributes. If the parameters
        :paramref:`host` and :paramref:`port` are given, they will
        override the configuration file.

        Parameters
        ----------
        config_file : str, optional
            Configuration file name, default is `None`.

        host : str, optional
            Host address, default is `None`.

        port : int, optional
            Connection port, default is `None`.

        Raises
        ------
        :class:`configparser.Error`
            If a configuration file name was given, the method
            :meth:`config` can fail raising this exception.
        """
        # Read the configuration file, if given
        if config_file is not None:
            self.config(config_file)

        # Override attributes, if given
        if host is not None:
            self.host = host
        if port is not None:
            self.port = port

    def config(self, config_file: str = resource_filename(__name__, 'conf/server.ini')):
        """ Loads the configuration from a file.

        The method reads the :paramref:`config_file` using the library
        :obj:`configparser`. The structure of the file is shown in the
        :ref:`examples section<configuration-files>`. The options
        ``host`` and ``port`` are read from the section ``Overall``.

        A file that cannot be read (e.g. because it does not exist) is
        skipped by :obj:`configparser`; in that case a warning is
        logged and the fallback values ``'localhost'`` and ``1507`` are
        used.

        Parameters
        ----------
        config_file : str, optional
            Configuration file name, default is the file
            ``conf/server.ini`` resolved with ``resource_filename``.

        Raises
        ------
        :class:`configparser.Error`
            Error while parsing the file.

        :class:`ValueError`
            If the option ``port`` is not a valid integer.
        """
        # Use a logger named like the module itself
        logger = logging.getLogger(__name__)
        logger.info("Loading configuration file %s", config_file)

        try:
            # Initialize the config parser and read the file;
            # read() returns the list of files it could actually parse
            config_parser = configparser.ConfigParser()
            files_read = config_parser.read(config_file)

            # Assign values to class attributes
            self.host = config_parser.get(
                section='Overall', option='host', fallback='localhost'
            )
            self.port = config_parser.getint(
                section='Overall', option='port', fallback=1507
            )

        except configparser.Error as e:
            logger.error('%s: %s', type(e).__name__, e)
            raise

        except BaseException as e:
            # Undefined exception, full traceback to be printed
            logger.exception('%s: %s', type(e).__name__, e)
            raise

        if files_read:
            logger.info("Configuration file loaded")
        else:
            # Do not claim success when nothing was actually read
            logger.warning(
                "Configuration file %s could not be read, using fallback values",
                config_file
            )

    def send_message(self, msg: str) -> str:
        """ Complete communication process.

        Connects to the server, sends a message, gets the reply and
        closes the connection. The connection is closed even if
        sending or receiving fails.

        Parameters
        ----------
        msg : str
            Message to be sent to the server.

        Raises
        ------
        :class:`OSError`
            Various socket errors, e.g. address or timeout

        Returns
        -------
        str
            Reply from the server
        """
        # Get the logger
        logger = logging.getLogger(__name__)
        logger.info('Sending message to the server: %s', msg)

        try:
            # Connect to the server; the context manager closes the
            # socket on exit, also when an exception is raised
            with socket.create_connection((self.host, self.port)) as sock:
                # Send message
                # TODO: check message length
                sock.sendall(msg.encode())

                # Get reply
                reply = sock.recv(buffer_size).decode()
                logger.info('Reply received: %s', reply)

        except OSError as e:
            logger.error('%s: %s', type(e).__name__, e)
            raise

        except BaseException as e:
            # Undefined exception, full traceback to be printed
            logger.exception('%s: %s', type(e).__name__, e)
            raise

        return reply