Loading a 40 GB Twitter JSON TAR file from archive.org into PostgreSQL
Problem
I wrote the script below to load data from the Twitter JSON archive on archive.org into a PostgreSQL database.
I'm looking for optimizations in the code. It currently runs at ~1.7 seconds per file (of 50,000 files), or about 3 million rows per hour.
Would I be looking at multithreading? The first run took approximately 12 hours (see profiler below).
Also, my setup is as follows:
- Python 2.7, 32-bit, Windows 8
- PostgreSQL running on an external USB 3.0 hard drive
- TAR files are on that same USB hard drive
```
"""
Read the output of an extracted TAR twitter archive from:
https://archive.org/details/twitterstream
"""
import bz2
import datetime
import json
import os
import profile
import psycopg2
from pprint import pprint

with open("postgresConnecString.txt", 'r') as f:
    DB_CONNECTIONSTRING = f.readline()
conn = psycopg2.connect(DB_CONNECTIONSTRING)
CACHE_DIR = "H:/Twitter datastream/PYTHONCACHE"

def load_bz2_json(filename):
    """ Takes a bz2 filename, returns the tweets as a list of tweet dictionaries"""
    with open(filename, 'rb') as f:
        s = f.read()
        lines = bz2.decompress(s).split("\n")
    tweets = []
    for line in lines:
        try:
            if line == "":
                num_lines -= 1
                continue
            tweets.append(json.loads(line))
        except: # I'm kind of lenient as I have millions of tweets, most errors were due to encoding or so)
            continue
    return tweets

def load_tweet(tweet, tweets_saved):
    """Takes a tweet (dictionary) and upserts its contents to a PostgreSQL database"""
    try:
        tweet_id = tweet['id']
        tweet_text = tweet['text']
        tweet_locale = tweet['lang']
        created_at = tweet['created_at']
    except KeyError:
        return tweets_saved
    data = {'tweet_id': tweet_id,
            'tweet_text': tweet_text,
            'tweet_locale': tweet_locale,
            'created_at_str': created_at,
            'date_loaded': datetime.datet
```
Solution
Improving throughput
If you want to get the best possible throughput, you need to decompose this into a few processes.
The decompression can be handled with bzcat, available for Windows as part of the GnuWin tools.
psycopg2's execute sends commands to the database synchronously and one at a time. The executemany method also issues commands synchronously and one at a time. Rather than attempt to get around this in Python, you could instead use PostgreSQL's console-based tool, psql.
The approach would be to move your JSON-parsing and record-generating code into a standalone Python script that reads the uncompressed JSON records from stdin and writes CSV data to stdout, which is then piped to a psql process that reads it as part of a \copy (refer to the documentation for psql and the \copy command). The transform script will look something like:
```
import csv
import datetime
import json
import sys

def json_to_tuple(tweet):
    return (tweet['id'],
            tweet['text'],
            tweet.get('lang','\\N'), # \N is NULL placeholder for PSQL COPY
            tweet['created_at'],
            datetime.datetime.now(), # FIXME: Format for PSQL COPY
            json.dumps(tweet)
            )

# Keep only records that carry an 'id'; other stream entries are skipped.
tweets = (tweet for tweet in map(json.loads, sys.stdin) if 'id' in tweet)
csvwriter = csv.writer(sys.stdout)
csvwriter.writerows(map(json_to_tuple, tweets))
```
You would then modify your driver script to walk through all of the compressed data files and pass them through the pipeline (refer to Popen in the subprocess module).
You could also process the files directly from the .tar file:
```
tar xvfO archiveteam-twitter-stream-2014-02.tar --wildcards '*.bz2' | bzcat | python3 transform.py | psql -1f copy.sql
```
Encoding problems
It's not clear which version of Python you are using, but I suspect that calling .split("\n") is the source of the encoding errors. In Python 3.4.0, that raises a "TypeError: Type str doesn't support the buffer API", but it's possible that in other versions you could end up with an object with the wrong encoding. You should not be encountering any errors with the source files.
Your load_bz2_json function could be vastly simplified to:
```
def load_bz2_json(filename):
    with bz2.BZ2File(filename,'r') as f:
        for line in f:
            tweet = json.loads(line)
            yield tweet
```
If the above code still produces encoding errors, comment below.
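If a few records still fail to parse, one option is a slightly more defensive variant of the generator that skips blank lines (as the question's code does) and any line that is not valid UTF-8 JSON. This is only a sketch: the name iter_bz2_json is made up for illustration, and silently dropping bad lines is an assumption about what you want, not something the archive format requires.

```python
import bz2
import json


def iter_bz2_json(filename):
    """Yield one decoded JSON object per parseable line of a bz2 file."""
    with bz2.BZ2File(filename, "r") as f:
        for raw_line in f:
            if not raw_line.strip():
                continue  # blank line, nothing to parse
            try:
                # Decode explicitly so the same code behaves alike on
                # Python 2.7 and Python 3.
                tweet = json.loads(raw_line.decode("utf-8"))
            except ValueError:
                # Covers both invalid UTF-8 (UnicodeDecodeError) and
                # malformed JSON; both are subclasses of ValueError.
                continue
            yield tweet
```

Because it is a generator, only one line is held in memory at a time, which may matter on a 32-bit Python. Counting the skipped lines instead of ignoring them would tell you whether the errors are rare enough to disregard.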
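Returning to the driver script suggested under Improving throughput, the following is a minimal sketch of how Popen could chain the processes for each compressed file. The helper names find_bz2_files and run_pipeline are hypothetical, and the commands in the usage comment assume that bzcat and psql are on PATH and that transform.py and copy.sql exist as described above.

```python
import os
import subprocess


def find_bz2_files(root):
    """Yield the paths of all .bz2 files below root, in sorted order."""
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # makes os.walk visit subdirectories in a stable order
        for name in sorted(filenames):
            if name.endswith(".bz2"):
                yield os.path.join(dirpath, name)


def run_pipeline(input_path, commands):
    """Run `cmd1 < input_path | cmd2 | ...` and return every exit code."""
    procs = []
    with open(input_path, "rb") as source:
        upstream = source
        for index, command in enumerate(commands):
            is_last = index == len(commands) - 1
            proc = subprocess.Popen(
                command,
                stdin=upstream,
                stdout=None if is_last else subprocess.PIPE,
            )
            if procs:
                # The new child holds its own copy of the pipe; close the
                # parent's copy so an early exit downstream is noticed upstream.
                procs[-1].stdout.close()
            procs.append(proc)
            upstream = proc.stdout
    return [proc.wait() for proc in procs]


# Hypothetical usage (not executed here):
#
# PIPELINE = [["bzcat"], ["python3", "transform.py"], ["psql", "-1f", "copy.sql"]]
# for path in find_bz2_files("H:/Twitter datastream/PYTHONCACHE"):
#     exit_codes = run_pipeline(path, PIPELINE)
#     if any(exit_codes):
#         print("pipeline failed for %s: %r" % (path, exit_codes))
```

Starting a fresh psql for every file keeps the sketch simple but adds process start-up and connection overhead per file; the tar-based one-liner above avoids that by streaming everything through a single pipeline.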
Context
StackExchange Code Review Q#77022, answer score: 3