HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonMinor

Select external resource to feed data into a processing function

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
resourceintofunctionfeedexternalselectprocessingdata

Problem

I wrote an application a while back to plan and manage drones races. (Well, it could be anything with identification numbers going through checkpoints.) At that time, communication between the gates (checkpoints) and the device running this application was done through XBee radios. When a gate detected a drone, it sent an XBee dataframe containing the gate identification letter and the number ID of the detected drone.

The application was responsible of reading and interpretting these XBee messages. It was done through the xbee module. I also included an stdin reader for debug/testing purposes.

For a new event, it was decided to switch from XBee communication to WiFi. So I was asked to handle messages through UDP datagrams. I would like to put the data readers and their selection at startup to review. The whole code is being accessible on GitHub and some other parts might end up as new questions in a near future.

droneracer.py

```
import os

from argparse import ArgumentParser
import drone_racer

# Be sure to be at the right place for relative path of images in Gtk
os.chdir(os.path.dirname(os.path.abspath(__file__)))

parser = ArgumentParser(description='Interface graphique "Drone Racer"')

# GUI args
parser.add_argument('--fancy-title', dest='fancy', action='store_true',
help='Utilise une barre de titre un peu plus Gtk3')

# XBee args
parser.add_argument('--serial-port', dest='serial', metavar='FILE',
default=None, help='Spécifie le port série à utiliser '
'pour récupérer les informations provenant du XBee')
parser.add_argument('--zigbee', dest='zigbee', action='store_true',
help='Spécifie si le module XBee est un ZigBee')
parser.add_argument('--baudrate', dest='baudrate', metavar='BPS',
type=int, default=9600, help='Débit du port série '
'utilisé pour la connexion avec le module XBee')

# UDP args
parser.add_argument('--use-udp', dest='udp

Solution

Improve consistency

Duck-typing can be useful in some situation, but letting users that may want to provide their own reader for their own devices figure out the internals of your particular duck-typing is not very friendly.

Look at how you define your reader in droneracer.py:

if args.serial is not None:
    reader = drone_racer.XBeeReader(
            args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
    reader = drone_racer.UDPReader(args.port)
else:
    reader = drone_racer.StdInReader


Why construct an (factory) object in the first two cases and use the class object in the third one? How are we supposed to make the difference between the two?

You could, instead, have BaseReader objects callable (instead of initializable) with the callback function and start the thread at this moment only. It gives you more flexibility on how to initialize your objects and has two advantages:

  • classes that acts as both the thread and its initializer can be merged (so you can remove the UDPReader factory);



  • classes that need an external initializer (such as xbee.XBee or xbee.ZigBee) can use a factory and provide the exact same interface than BaseReader derived classes.



Improve documentation

PEP8 recommend to limit docstring length to 72 characters.

You document classes and methods but do not provide a docstring to populate the modules __doc__ from.

You could use the __all__ variable in drone_racer/__init__.py to improve the module's help. Classes that you include in this list will have their documentation merged to the one of the module when using help(drone_racer). It will also limit the amount of objects imported when/if using from drone_racer import *.

Rewriting

drone_racer/threads.py

``
"""Collection of classes to crete threaded objects allowing to read
data from various sources.

Readers should be created with whatever parameter they require and
then allow to be called with a callback function. This call return
the threaded object reading data.

These threaded objects are started immediatly and monitor incomming
data to normalize them before feeding them into the callback function.
They can easily be halted using their
stop` method.
"""

import os
import sys
import socket
from threading import Thread
from select import select
try:
from serial import Serial
from xbee import XBee, ZigBee
except ImportError:
XBee = None

class BaseReader(Thread):
"""Base class for custom data readers."""

def __init__(self):
"""Spawn a thread that will continuously read data for drones
statuses.
"""
super().__init__(name="reader")

def __call__(self, update_function):
"""Starts the thread with the given callback function to
process data with.

Parameter:
- update_function: the function that will be called each time
a valid data is read.
"""
self._update_data = update_function
self._should_continue = True
self.start()
# Return ourselves to allow for duck typing and other classes
# to return other kind of objects (see XBeeReader).
return self

def run(self):
"""The main action of the thread.

Wait for data, read them and send them to the rest of the
application for further computation.
"""
while self._should_continue:
try:
gate, drone = self.read_new_value()
except TypeError:
pass
else:
self._process_value(gate, drone)

def stop(self):
"""Signal that the thread has to stop reading its inputs."""
self._should_continue = False

def read_new_value(self):
"""Read input data and return them as a tuple (gate identifier,
drone number). Subclasses must implement this method.
"""
raise NotImplementedError("Subclasses must implement this method")

def _process_value(self, gate, drone):
"""Send input data to the rest of the application.

Parameters:
- gate: the gate identification letter(s)
- drone: the drone identification number (0-based)
"""
if drone
try:
gate, drone = msg.split(b':')
gate = gate.decode()
# Compensate for the drone numbering vs. its indexing
drone = int(drone) - 1
except (UnicodeError, ValueError) as e:
print('Le message', msg, 'a été reçu mais n’est pas'
'compris par l’application.', file=sys.stderr)
print(e, file=sys.stderr)
else:
return gate, drone

if XBee is None:
class XBeeReader(BaseReader):
"""Read data from a serial port bound to an XBee.
Dummy implementation because xbee module could not be loaded.
"""

def __init__(self, *args, **kwargs):
"""Accepts argum

Code Snippets

if args.serial is not None:
    reader = drone_racer.XBeeReader(
            args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
    reader = drone_racer.UDPReader(args.port)
else:
    reader = drone_racer.StdInReader
"""Collection of classes to crete threaded objects allowing to read
data from various sources.

Readers should be created with whatever parameter they require and
then allow to be called with a callback function. This call return
the threaded object reading data.

These threaded objects are started immediatly and monitor incomming
data to normalize them before feeding them into the callback function.
They can easily be halted using their `stop` method.
"""


import os
import sys
import socket
from threading import Thread
from select import select
try:
    from serial import Serial
    from xbee import XBee, ZigBee
except ImportError:
    XBee = None


class BaseReader(Thread):
    """Base class for custom data readers."""

    def __init__(self):
        """Spawn a thread that will continuously read data for drones
        statuses.
        """
        super().__init__(name="reader")

    def __call__(self, update_function):
        """Starts the thread with the given callback function to
        process data with.

        Parameter:
          - update_function: the function that will be called each time
            a valid data is read.
        """
        self._update_data = update_function
        self._should_continue = True
        self.start()
        # Return ourselves to allow for duck typing and other classes
        # to return other kind of objects (see XBeeReader).
        return self

    def run(self):
        """The main action of the thread.

        Wait for data, read them and send them to the rest of the
        application for further computation.
        """
        while self._should_continue:
            try:
                gate, drone = self.read_new_value()
            except TypeError:
                pass
            else:
                self._process_value(gate, drone)

    def stop(self):
        """Signal that the thread has to stop reading its inputs."""
        self._should_continue = False

    def read_new_value(self):
        """Read input data and return them as a tuple (gate identifier,
        drone number). Subclasses must implement this method.
        """
        raise NotImplementedError("Subclasses must implement this method")

    def _process_value(self, gate, drone):
        """Send input data to the rest of the application.

        Parameters:
          - gate: the gate identification letter(s)
          - drone: the drone identification number (0-based)
        """
        if drone < 0:
            return
        self._update_data(gate, drone)


class StdInReader(BaseReader):
    """Read data from stdin. Primarily used for tests and debug."""

    def read_new_value(self):
        """Read input data and return them as a tuple (gate identifier,
        drone number).

        Convert data such as "0 1" to the tuple ('A', 1).
        """
        raw = input('[@] ').split()
        try:
            gate, drone = raw
            return chr(int(gate) + ord('A')), int(drone)
        except ValueError:
"""Pubilc interface to the various components defined in this package.

Allows to construct the GUI responsible of the whole application
and to select a reader from the built-in ones.
"""


from .ui import DroneRacer as Application
from .threads import StdInReader, XBeeReader, UDPReader


__all__ = [
    'Application',
    'StdInReader',
    'XBeeReader',
    'UDPReader',
]
"""Drone Racer is a project primarily developed for the DroneFest
organized as part of the FabLab Festival 2015. Its aim is to provide
an all-in-one interface for races organizers to:

  - create different events for drones competition;
  - register contestants and their associated drones;
  - classify drones into categories;
  - create several routes with their own set of rules for each event;
  - setup and monitor races on a designated route;
  - gather statistics on races for drivers, event or kind of route.

To reduce the overhead of having extraneous services for database
access, Drone Racer makes use of the python's built-in sqlite module.
It uses it to store informations on the contestants, the drones, the
different type of routes and the races leaderboards.

Additionally, setup, updates & leaderboard for each race can be sent
to a RESTful API for the audience.
"""


import os

from argparse import ArgumentParser
import drone_racer


# Be sure to be at the right place for relative path of images in Gtk
os.chdir(os.path.dirname(os.path.abspath(__file__)))

parser = ArgumentParser(description='Interface graphique "Drone Racer"')

# GUI args
parser.add_argument('--fancy-title', dest='fancy', action='store_true',
                    help='Utilise une barre de titre un peu plus Gtk3')

# XBee args
parser.add_argument('--serial-port', dest='serial', metavar='FILE',
                    default=None, help='Spécifie le port série à utiliser '
                    'pour récupérer les informations provenant du XBee')
parser.add_argument('--zigbee', dest='zigbee', action='store_true',
                    help='Spécifie si le module XBee est un ZigBee')
parser.add_argument('--baudrate', dest='baudrate', metavar='BPS',
                    type=int, default=9600, help='Débit du port série '
                    'utilisé pour la connexion avec le module XBee')

# UDP args
parser.add_argument('--use-udp', dest='udp', action='store_true',
                    help='Spécifie si la communication doit se faire '
                    'par datagrames UDP.')
parser.add_argument('--port', dest='port', metavar='NUM', type=int,
                    default=4387, help='Port à utiliser pour l’écoute UDP')

# Choose the appropriate reader
args = parser.parse_args()
if args.serial is not None:
    reader = drone_racer.XBeeReader(
            args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
    reader = drone_racer.UDPReader(args.port)
else:
    reader = drone_racer.StdInReader()

# Launch the GUI (which will, in turn, start the reader)
app = drone_racer.Application(reader, args.fancy)
app.run()

Context

StackExchange Code Review Q#115181, answer score: 3

Revisions (0)

No revisions yet.