Select external resource to feed data into a processing function
Problem
I wrote an application a while back to plan and manage drone races. (Well, it could be anything with identification numbers going through checkpoints.) At that time, communication between the gates (checkpoints) and the device running this application was done through XBee radios. When a gate detected a drone, it sent an XBee data frame containing the gate identification letter and the ID number of the detected drone.
The application was responsible for reading and interpreting these XBee messages. It was done through the `xbee` module. I also included an stdin reader for debug/testing purposes.
For a new event, it was decided to switch from XBee communication to WiFi. So I was asked to handle messages through UDP datagrams. I would like to put the data readers and their selection at startup up for review. The whole code is accessible on GitHub and some other parts might end up as new questions in the near future.
droneracer.py
```
import os
from argparse import ArgumentParser

import drone_racer


# Be sure to be at the right place for relative path of images in Gtk
os.chdir(os.path.dirname(os.path.abspath(__file__)))

parser = ArgumentParser(description='Interface graphique "Drone Racer"')
# GUI args
parser.add_argument('--fancy-title', dest='fancy', action='store_true',
                    help='Utilise une barre de titre un peu plus Gtk3')
# XBee args
parser.add_argument('--serial-port', dest='serial', metavar='FILE',
                    default=None, help='Spécifie le port série à utiliser '
                    'pour récupérer les informations provenant du XBee')
parser.add_argument('--zigbee', dest='zigbee', action='store_true',
                    help='Spécifie si le module XBee est un ZigBee')
parser.add_argument('--baudrate', dest='baudrate', metavar='BPS',
                    type=int, default=9600, help='Débit du port série '
                    'utilisé pour la connexion avec le module XBee')
# UDP args
parser.add_argument('--use-udp', dest='udp
```
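For reference, the UDP side can be sketched roughly as follows. This is only an illustration under assumptions: each datagram is assumed to carry `gate:drone` (for instance `b'A:3'`), drone numbers are assumed to be 1-based on the wire, and the helper names `parse_message` and `read_datagram` are hypothetical rather than taken from the project. Using `select` with a timeout keeps the read from blocking forever, so a reading thread can regularly check whether it has been asked to stop.

```python
import select
import socket


def parse_message(msg):
    """Turn a datagram such as b'A:3' into ('A', 2).

    Returns None when the message cannot be understood.
    """
    try:
        gate, drone = msg.split(b':')
        # Assumed convention: drones are numbered from 1, indexed from 0
        return gate.decode(), int(drone) - 1
    except (UnicodeError, ValueError):
        return None


def read_datagram(sock, timeout=0.5):
    """Wait up to `timeout` seconds for one datagram on `sock`.

    Returns the parsed (gate, drone) tuple, or None if nothing usable
    arrived in time.
    """
    readable, _, _ = select.select([sock], [], [], timeout)
    if not readable:
        return None
    msg, _address = sock.recvfrom(1024)
    return parse_message(msg)
```

A caller would create the socket with `socket.socket(socket.AF_INET, socket.SOCK_DGRAM)`, bind it to the listening port, and call `read_datagram` in a loop.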
Solution
Improve consistency
Duck-typing can be useful in some situations, but leaving users who may want to provide their own reader for their own devices to figure out the internals of your particular duck-typing is not very friendly.
Look at how you define your reader in `droneracer.py`:
```
if args.serial is not None:
    reader = drone_racer.XBeeReader(
            args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
    reader = drone_racer.UDPReader(args.port)
else:
    reader = drone_racer.StdInReader
```
Why construct a (factory) object in the first two cases and use the class object in the third one? How are we supposed to tell the difference between the two?
You could, instead, make `BaseReader` objects callable (instead of initializable) with the callback function, and start the thread only at that moment. It gives you more flexibility in how you initialize your objects and has two advantages:
- classes that act as both the thread and its initializer can be merged (so you can remove the `UDPReader` factory);
- classes that need an external initializer (such as `xbee.XBee` or `xbee.ZigBee`) can use a factory and provide the exact same interface as `BaseReader`-derived classes.
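To make the idea concrete, here is a minimal sketch of that interface. All names in it (`CallableReader`, `FixedReader`, `ExternalDevice`, `external_reader_factory`) are hypothetical and only serve the illustration; the stand-in device is not the real `xbee` API. A reader is configured when constructed, and only calling it with the callback starts the thread; a plain factory can offer the same "call me with a callback" step for objects that need their callback at creation time.

```python
import threading


class CallableReader(threading.Thread):
    """Hypothetical base: configure in __init__, start in __call__."""

    def __init__(self):
        super().__init__(name="reader", daemon=True)
        self._stop_requested = threading.Event()
        self._callback = None

    def __call__(self, callback):
        # Attach the callback and start reading only now
        self._callback = callback
        self.start()
        return self

    def run(self):
        while not self._stop_requested.is_set():
            value = self.read_new_value()
            if value is not None:
                self._callback(*value)

    def stop(self):
        self._stop_requested.set()

    def read_new_value(self):
        raise NotImplementedError("Subclasses must implement this method")


class FixedReader(CallableReader):
    """Illustrative reader replaying a fixed list of (gate, drone) pairs."""

    def __init__(self, values):
        super().__init__()
        self._values = list(values)

    def read_new_value(self):
        if not self._values:
            self.stop()  # nothing left to read
            return None
        return self._values.pop(0)


class ExternalDevice:
    """Stand-in for a third-party object that wants its callback at creation."""

    def __init__(self, port, callback):
        self.port = port
        self.callback = callback

    def stop(self):
        pass  # a real device would release its resources here


def external_reader_factory(port):
    """Hypothetical factory exposing the same 'call with a callback' step."""
    def start(callback):
        return ExternalDevice(port, callback)
    return start


# Both kinds of reader are used the same way by the caller
received = []
reader = FixedReader([("A", 0), ("B", 1)])
reader = reader(lambda gate, drone: received.append((gate, drone)))
reader.join(timeout=2)
```

Whichever object is selected at startup, the rest of the application only has to call it with its callback and keep the returned object around so it can call `stop()` later.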
Improve documentation
PEP8 recommends limiting docstring lines to 72 characters.
You document classes and methods but do not provide a docstring to populate the module's `__doc__` from.
You could use the `__all__` variable in `drone_racer/__init__.py` to improve the module's help. Classes that you include in this list will have their documentation merged into the module's when using `help(drone_racer)`. It will also limit the number of objects imported when/if using `from drone_racer import *`.
Rewriting
drone_racer/threads.py
```
"""Collection of classes to create threaded objects allowing to read
data from various sources.

Readers should be created with whatever parameter they require and
then allow to be called with a callback function. This call returns
the threaded object reading data.

These threaded objects are started immediately and monitor incoming
data to normalize them before feeding them into the callback function.
They can easily be halted using their `stop` method.
"""

import os
import sys
import socket
from threading import Thread
from select import select

try:
    from serial import Serial
    from xbee import XBee, ZigBee
except ImportError:
    XBee = None


class BaseReader(Thread):
    """Base class for custom data readers."""

    def __init__(self):
        """Spawn a thread that will continuously read data for drones
        statuses.
        """
        super().__init__(name="reader")

    def __call__(self, update_function):
        """Starts the thread with the given callback function to
        process data with.

        Parameter:
          - update_function: the function that will be called each time
            a valid data is read.
        """
        self._update_data = update_function
        self._should_continue = True
        self.start()
        # Return ourselves to allow for duck typing and other classes
        # to return other kind of objects (see XBeeReader).
        return self

    def run(self):
        """The main action of the thread.

        Wait for data, read them and send them to the rest of the
        application for further computation.
        """
        while self._should_continue:
            try:
                gate, drone = self.read_new_value()
            except TypeError:
                pass
            else:
                self._process_value(gate, drone)

    def stop(self):
        """Signal that the thread has to stop reading its inputs."""
        self._should_continue = False

    def read_new_value(self):
        """Read input data and return them as a tuple (gate identifier,
        drone number). Subclasses must implement this method.
        """
        raise NotImplementedError("Subclasses must implement this method")

    def _process_value(self, gate, drone):
        """Send input data to the rest of the application.

        Parameters:
          - gate: the gate identification letter(s)
          - drone: the drone identification number (0-based)
        """
        if drone < 0:
            return
        self._update_data(gate, drone)

# [...]

        try:
            gate, drone = msg.split(b':')
            gate = gate.decode()
            # Compensate for the drone numbering vs. its indexing
            drone = int(drone) - 1
        except (UnicodeError, ValueError) as e:
            print('Le message', msg, 'a été reçu mais n’est pas '
                  'compris par l’application.', file=sys.stderr)
            print(e, file=sys.stderr)
        else:
            return gate, drone


if XBee is None:
    class XBeeReader(BaseReader):
        """Read data from a serial port bound to an XBee.

        Dummy implementation because xbee module could not be loaded.
        """

        def __init__(self, *args, **kwargs):
            """Accepts argum
```
Code Snippets
```
if args.serial is not None:
    reader = drone_racer.XBeeReader(
            args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
    reader = drone_racer.UDPReader(args.port)
else:
    reader = drone_racer.StdInReader
```
drone_racer/threads.py
```
"""Collection of classes to create threaded objects allowing to read
data from various sources.

Readers should be created with whatever parameter they require and
then allow to be called with a callback function. This call returns
the threaded object reading data.

These threaded objects are started immediately and monitor incoming
data to normalize them before feeding them into the callback function.
They can easily be halted using their `stop` method.
"""

import os
import sys
import socket
from threading import Thread
from select import select

try:
    from serial import Serial
    from xbee import XBee, ZigBee
except ImportError:
    XBee = None


class BaseReader(Thread):
    """Base class for custom data readers."""

    def __init__(self):
        """Spawn a thread that will continuously read data for drones
        statuses.
        """
        super().__init__(name="reader")

    def __call__(self, update_function):
        """Starts the thread with the given callback function to
        process data with.

        Parameter:
          - update_function: the function that will be called each time
            a valid data is read.
        """
        self._update_data = update_function
        self._should_continue = True
        self.start()
        # Return ourselves to allow for duck typing and other classes
        # to return other kind of objects (see XBeeReader).
        return self

    def run(self):
        """The main action of the thread.

        Wait for data, read them and send them to the rest of the
        application for further computation.
        """
        while self._should_continue:
            try:
                gate, drone = self.read_new_value()
            except TypeError:
                pass
            else:
                self._process_value(gate, drone)

    def stop(self):
        """Signal that the thread has to stop reading its inputs."""
        self._should_continue = False

    def read_new_value(self):
        """Read input data and return them as a tuple (gate identifier,
        drone number). Subclasses must implement this method.
        """
        raise NotImplementedError("Subclasses must implement this method")

    def _process_value(self, gate, drone):
        """Send input data to the rest of the application.

        Parameters:
          - gate: the gate identification letter(s)
          - drone: the drone identification number (0-based)
        """
        if drone < 0:
            return
        self._update_data(gate, drone)


class StdInReader(BaseReader):
    """Read data from stdin. Primarily used for tests and debug."""

    def read_new_value(self):
        """Read input data and return them as a tuple (gate identifier,
        drone number).

        Convert data such as "0 1" to the tuple ('A', 1).
        """
        raw = input('[@] ').split()
        try:
            gate, drone = raw
            return chr(int(gate) + ord('A')), int(drone)
        except ValueError:
```
drone_racer/__init__.py
```
"""Public interface to the various components defined in this package.

Allows to construct the GUI responsible of the whole application
and to select a reader from the built-in ones.
"""

from .ui import DroneRacer as Application
from .threads import StdInReader, XBeeReader, UDPReader


__all__ = [
    'Application',
    'StdInReader',
    'XBeeReader',
    'UDPReader',
]
```
droneracer.py
```
"""Drone Racer is a project primarily developed for the DroneFest
organized as part of the FabLab Festival 2015. Its aim is to provide
an all-in-one interface for race organizers to:
  - create different events for drones competition;
  - register contestants and their associated drones;
  - classify drones into categories;
  - create several routes with their own set of rules for each event;
  - setup and monitor races on a designated route;
  - gather statistics on races for drivers, event or kind of route.

To reduce the overhead of having extraneous services for database
access, Drone Racer makes use of the python's built-in sqlite module.
It uses it to store information on the contestants, the drones, the
different type of routes and the races leaderboards.

Additionally, setup, updates & leaderboard for each race can be sent
to a RESTful API for the audience.
"""

import os
from argparse import ArgumentParser

import drone_racer


# Be sure to be at the right place for relative path of images in Gtk
os.chdir(os.path.dirname(os.path.abspath(__file__)))

parser = ArgumentParser(description='Interface graphique "Drone Racer"')
# GUI args
parser.add_argument('--fancy-title', dest='fancy', action='store_true',
                    help='Utilise une barre de titre un peu plus Gtk3')
# XBee args
parser.add_argument('--serial-port', dest='serial', metavar='FILE',
                    default=None, help='Spécifie le port série à utiliser '
                    'pour récupérer les informations provenant du XBee')
parser.add_argument('--zigbee', dest='zigbee', action='store_true',
                    help='Spécifie si le module XBee est un ZigBee')
parser.add_argument('--baudrate', dest='baudrate', metavar='BPS',
                    type=int, default=9600, help='Débit du port série '
                    'utilisé pour la connexion avec le module XBee')
# UDP args
parser.add_argument('--use-udp', dest='udp', action='store_true',
                    help='Spécifie si la communication doit se faire '
                    'par datagrames UDP.')
parser.add_argument('--port', dest='port', metavar='NUM', type=int,
                    default=4387, help='Port à utiliser pour l’écoute UDP')

# Choose the appropriate reader
args = parser.parse_args()
if args.serial is not None:
    reader = drone_racer.XBeeReader(
            args.serial, args.baudrate, zigbee=args.zigbee)
elif args.udp:
    reader = drone_racer.UDPReader(args.port)
else:
    reader = drone_racer.StdInReader()

# Launch the GUI (which will, in turn, start the reader)
app = drone_racer.Application(reader, args.fancy)
app.run()
```
Context
StackExchange Code Review Q#115181, answer score: 3