patternpythonMinor
Display Sensor Data to Web
Viewed 0 times
displaydatasensorweb
Problem
Last time I worked on a DHT11 sensor and got a lot of feedback, this time I have added another sensor (the BMP180), stored both of them in MySQL, and am now displaying them to the web.
The Raspberry Pi now has an HTTP server running on port 80, and I've built a small little UI to display all my sensor data.
So this is all provided by a few files, one of them is a JavaScript file, one is a CSS file, one is an HTML file, and the rest are python files.
We'll start with the JavaScript for the graphs. This is pretty simple and self explanatory, the biggest issue I have is I need to break this down further so that there's no duplication. This is
`function buildGraph(svgTitle, table, title) {
var svg = d3.select(svgTitle);
var margin = {top: 20, right: 20, bottom: 30, left: 40};
var width = +svg.attr("width") - margin.left - margin.right;
var height = +svg.attr("height") - margin.top - margin.bottom;
var g = svg.append("g").attr("transform", "translate(" + margin.left + "," + margin.top + ")");
var parseTime = d3.timeParse("%Y-%m-%d %H:%M:%S");
var x = d3.scaleTime().rangeRound([0, width]);
var y = d3.scaleLinear().rangeRound([height, 0]);
var line = d3.line()
.curve(d3.curveMonotoneX)
//.curve(d3.curveBasis)
.x(function(d) { return x(d.Logged); })
.y(function(d) { return y(d.Value); });
var area = d3.area()
.curve(d3.curveMonotoneX)
//.curve(d3.curveBasis)
.x(function(d) { return x(d.Logged); })
.y1(function(d) { return y(d.Value); });
d3.tsv("history.py?LogType=" + table, function(d) {
d.Logged = parseTime(d.Logged);
d.Value = +d.Value;
return d;
}, function(error, data) {
if (error) throw error;
x.domain(d3.extent(data, function(d) { return d.Logged; }));
var yS = y.domain(d3.extent(data, function(d) { return d.Value; })).nice();
area.y0(y(yS.domain()[0]));
g.append("g")
The Raspberry Pi now has an HTTP server running on port 80, and I've built a small little UI to display all my sensor data.
So this is all provided by a few files, one of them is a JavaScript file, one is a CSS file, one is an HTML file, and the rest are python files.
We'll start with the JavaScript for the graphs. This is pretty simple and self explanatory, the biggest issue I have is I need to break this down further so that there's no duplication. This is
graphs.js:`function buildGraph(svgTitle, table, title) {
var svg = d3.select(svgTitle);
var margin = {top: 20, right: 20, bottom: 30, left: 40};
var width = +svg.attr("width") - margin.left - margin.right;
var height = +svg.attr("height") - margin.top - margin.bottom;
var g = svg.append("g").attr("transform", "translate(" + margin.left + "," + margin.top + ")");
var parseTime = d3.timeParse("%Y-%m-%d %H:%M:%S");
var x = d3.scaleTime().rangeRound([0, width]);
var y = d3.scaleLinear().rangeRound([height, 0]);
var line = d3.line()
.curve(d3.curveMonotoneX)
//.curve(d3.curveBasis)
.x(function(d) { return x(d.Logged); })
.y(function(d) { return y(d.Value); });
var area = d3.area()
.curve(d3.curveMonotoneX)
//.curve(d3.curveBasis)
.x(function(d) { return x(d.Logged); })
.y1(function(d) { return y(d.Value); });
d3.tsv("history.py?LogType=" + table, function(d) {
d.Logged = parseTime(d.Logged);
d.Value = +d.Value;
return d;
}, function(error, data) {
if (error) throw error;
x.domain(d3.extent(data, function(d) { return d.Logged; }));
var yS = y.domain(d3.extent(data, function(d) { return d.Value; })).nice();
area.y0(y(yS.domain()[0]));
g.append("g")
Solution
You’ve got at least two bits of somehow duplicated code in your Python files. Better build a utility module to reduce that.
utils.py
You can then simplify your scripts:
latest.py
history.py
Writting that, I realize that there are even more duplicated code, let's refactor the hell out of it. You need:
Start and end arguments are simple and are done similarly to
You may optionnaly also use the connection's context-manager that provides you with a cursor open a transaction and auto commit/rollback it when leaving the block (even though it's not useful on a SELECT statement, it's also a good habit to have).
Also don't forget to
Lastly, formatting the response can be done by providing the formatting function an iterable of the response content. The easiest one to come up with is a list, but it can also be any kind of generator.
All in all, I would write:
utils.py
```
from datetime import datetime, timedelta
import cgi
import pymysql
REQUEST_ARGS = cgi.FieldStorage()
METRICS_FORMATTERS = {
"H
utils.py
import pymysql
METRICS_FORMATTERS = {
"HumidityLog": lambda x: x,
"BmpTempLog": lambda c: c * 1.8 + 32,
"PressureLog": lambda p: p * 0.0002953,
"AltitudeLog": lambda m: m * 3.28084,
"TemperatureLog": lambda c: c * 1.8 + 32,
"SeaPressureLog": lambda p: p * 0.0002953,
}
def connect_to_database():
return pymysql.connect(
db='WeatherLog',
user='WeatherLog',
passwd='Weather!',
host='localhost')You can then simplify your scripts:
latest.py
#!/usr/bin/python
from datetime import datetime, timedelta
import cgi
import utils
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_DAY_LIMIT = 7
DEFAULT_MINUTE_GROUPING = 1
if __name__ == '__main__':
print "Content-type: text/html"
print
args = cgi.FieldStorage()
start_date = (datetime.utcnow() - timedelta(days=DEFAULT_DAY_LIMIT)).strftime(DATE_FORMAT)
end_date = datetime.utcnow().strftime(DATE_FORMAT)
minute_group = DEFAULT_MINUTE_GROUPING
if "Start" in args:
start_date = args["Start"].value
if "End" in args:
end_date = args["End"].value
conn = utils.connect_to_database()
cursor = conn.cursor()
for table, formatter in utils.METRICS_FORMATTERS.iteritems():
cursor.execute("""
SELECT
Logged, AVG(Value)
FROM
{0}
WHERE
Logged >= '{1}' AND Logged <= '{2}'
GROUP BY
ROUND(UNIX_TIMESTAMP(Logged)/({3} * 60))
ORDER BY
Logged DESC
LIMIT
1;""".format(table, start_date, end_date, minute_group))
for row in cursor.fetchall():
print "{2}\t{0}\t{1}".format(row[0], formatter(row[1]), table[:-3])history.py
#!/usr/bin/python
from datetime import datetime, timedelta
import cgi
import utils
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_DAY_LIMIT = 7
DEFAULT_MINUTE_GROUPING = 3
if __name__ == '__main__':
print "Content-type: text/html"
print
args = cgi.FieldStorage()
start_date = (datetime.utcnow() - timedelta(days = DEFAULT_DAY_LIMIT)).strftime(DATE_FORMAT)
end_date = datetime.utcnow().strftime(DATE_FORMAT)
minute_group = DEFAULT_MINUTE_GROUPING
table = "TemperatureLog"
formatter = lambda c: c * 1.8 + 32
if "LogType" in args:
log_type = args["LogType"].value + "Log"
try:
formatter = utils.METRICS_FORMATTERS[log_type]
except KeyError:
pass
else:
table = log_type
if "Start" in args:
start_date = args["Start"].value
if "End" in args:
end_date = args["End"].value
conn = connect_to_database()
cursor = conn.cursor()
cursor.execute("""
SELECT
Logged, AVG(Value)
FROM
{0}
WHERE
Logged >= '{1}' AND Logged <= '{2}'
GROUP BY
ROUND(UNIX_TIMESTAMP(Logged)/({3} * 60));""".format(table, start_date, end_date, minute_group))
didfirst = 0
print "Logged\tValue"
for row in cursor.fetchall():
didfirst = 1
print "{0}\t{1}".format(row[0], formatter(row[1]))Writting that, I realize that there are even more duplicated code, let's refactor the hell out of it. You need:
- A way to retrieve start and end dates out of the query string or sensible defaults;
- A way to perform a
SELECTstatement;
- A way to format the response (with headers first, always).
Start and end arguments are simple and are done similarly to
connect_to_database(). Performing the query, though, can benefit from some advices. For starter, and even though the raspberry is on your network with only you accessing it, it always feel bad to see SQL queries with user-provided values injected directly into it. You should take the habit to use parametrized statements; pymysql using the 'format' kind of markers, you can use:cursor.execute("""
SELECT Logged, Avg(Value)
FROM %s
WHERE Logged >= '%s' AND Logged <= '%s'
GROUP BY ROUND(UNIX_TIMESTAMP(Logged)/(%s * 60));
""", (table, start_date, end_date, minute_group))You may optionnaly also use the connection's context-manager that provides you with a cursor open a transaction and auto commit/rollback it when leaving the block (even though it's not useful on a SELECT statement, it's also a good habit to have).
Also don't forget to
close() your connection once you’re done with it.Lastly, formatting the response can be done by providing the formatting function an iterable of the response content. The easiest one to come up with is a list, but it can also be any kind of generator.
All in all, I would write:
utils.py
```
from datetime import datetime, timedelta
import cgi
import pymysql
REQUEST_ARGS = cgi.FieldStorage()
METRICS_FORMATTERS = {
"H
Code Snippets
import pymysql
METRICS_FORMATTERS = {
"HumidityLog": lambda x: x,
"BmpTempLog": lambda c: c * 1.8 + 32,
"PressureLog": lambda p: p * 0.0002953,
"AltitudeLog": lambda m: m * 3.28084,
"TemperatureLog": lambda c: c * 1.8 + 32,
"SeaPressureLog": lambda p: p * 0.0002953,
}
def connect_to_database():
return pymysql.connect(
db='WeatherLog',
user='WeatherLog',
passwd='Weather!',
host='localhost')#!/usr/bin/python
from datetime import datetime, timedelta
import cgi
import utils
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_DAY_LIMIT = 7
DEFAULT_MINUTE_GROUPING = 1
if __name__ == '__main__':
print "Content-type: text/html"
print
args = cgi.FieldStorage()
start_date = (datetime.utcnow() - timedelta(days=DEFAULT_DAY_LIMIT)).strftime(DATE_FORMAT)
end_date = datetime.utcnow().strftime(DATE_FORMAT)
minute_group = DEFAULT_MINUTE_GROUPING
if "Start" in args:
start_date = args["Start"].value
if "End" in args:
end_date = args["End"].value
conn = utils.connect_to_database()
cursor = conn.cursor()
for table, formatter in utils.METRICS_FORMATTERS.iteritems():
cursor.execute("""
SELECT
Logged, AVG(Value)
FROM
{0}
WHERE
Logged >= '{1}' AND Logged <= '{2}'
GROUP BY
ROUND(UNIX_TIMESTAMP(Logged)/({3} * 60))
ORDER BY
Logged DESC
LIMIT
1;""".format(table, start_date, end_date, minute_group))
for row in cursor.fetchall():
print "{2}\t{0}\t{1}".format(row[0], formatter(row[1]), table[:-3])#!/usr/bin/python
from datetime import datetime, timedelta
import cgi
import utils
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_DAY_LIMIT = 7
DEFAULT_MINUTE_GROUPING = 3
if __name__ == '__main__':
print "Content-type: text/html"
print
args = cgi.FieldStorage()
start_date = (datetime.utcnow() - timedelta(days = DEFAULT_DAY_LIMIT)).strftime(DATE_FORMAT)
end_date = datetime.utcnow().strftime(DATE_FORMAT)
minute_group = DEFAULT_MINUTE_GROUPING
table = "TemperatureLog"
formatter = lambda c: c * 1.8 + 32
if "LogType" in args:
log_type = args["LogType"].value + "Log"
try:
formatter = utils.METRICS_FORMATTERS[log_type]
except KeyError:
pass
else:
table = log_type
if "Start" in args:
start_date = args["Start"].value
if "End" in args:
end_date = args["End"].value
conn = connect_to_database()
cursor = conn.cursor()
cursor.execute("""
SELECT
Logged, AVG(Value)
FROM
{0}
WHERE
Logged >= '{1}' AND Logged <= '{2}'
GROUP BY
ROUND(UNIX_TIMESTAMP(Logged)/({3} * 60));""".format(table, start_date, end_date, minute_group))
didfirst = 0
print "Logged\tValue"
for row in cursor.fetchall():
didfirst = 1
print "{0}\t{1}".format(row[0], formatter(row[1]))cursor.execute("""
SELECT Logged, Avg(Value)
FROM %s
WHERE Logged >= '%s' AND Logged <= '%s'
GROUP BY ROUND(UNIX_TIMESTAMP(Logged)/(%s * 60));
""", (table, start_date, end_date, minute_group))from datetime import datetime, timedelta
import cgi
import pymysql
REQUEST_ARGS = cgi.FieldStorage()
METRICS_FORMATTERS = {
"HumidityLog": lambda x: x,
"BmpTempLog": lambda c: c * 1.8 + 32,
"PressureLog": lambda p: p * 0.0002953,
"AltitudeLog": lambda m: m * 3.28084,
"TemperatureLog": lambda c: c * 1.8 + 32,
"SeaPressureLog": lambda p: p * 0.0002953,
}
def connect_to_database():
return pymysql.connect(
db='WeatherLog',
user='WeatherLog',
passwd='Weather!',
host='localhost')
def select_from_database(cursor, table, start_date, end_date, minute_grouping):
cursor.execute("""
SELECT Logged, Avg(Value)
FROM %s
WHERE Logged >= '%s' AND Logged <= '%s'
GROUP BY ROUND(UNIX_TIMESTAMP(Logged)/(%s * 60));
""", (table, start_date, end_date, minute_grouping))
return cursor.fetchall()
def date_from_now(days_limit, fmt):
earlier = datetime.utcnow() - timedelta(days=days_limit)
return earlier.strftime(fmt)
def retrieve_start_and_end_date(days_limit=7, fmt="%Y-%m-%d %H:%M:%S"):
date_start = REQUEST_ARGS.getvalue('Start') if 'Start' in REQUEST_ARGS else date_from_now(days_limit, fmt)
end_date = REQUEST_ARGS.getvalue('End') if 'End' in REQUEST_ARGS else datetime.utcnow().strftime(fmt)
return date_start, date_end
def send_response(content, headers=["Content-type: text/html"]):
for header in headers:
print header
print
for line in content:
print lineContext
StackExchange Code Review Q#156832, answer score: 3
Revisions (0)
No revisions yet.