
Loading a 40GB Twitter JSON TAR file from archive.org into PostgreSQL

Submitted by: @import:stackexchange-codereview

Problem

I wrote the script below to load data obtained from the Twitter JSON archive on archive.org into a PostgreSQL database.

I'm looking for optimizations in the code. It currently takes about 1.7 seconds per file (there are 50,000 files), or roughly 3 million rows loaded per hour.

Should I be looking at multithreading? The first run took approximately 12 hours (see profiler below).

Also, my setup is as follows:

  • Python 2.7, 32-bit, Windows 8
  • PostgreSQL running on an external USB 3.0 hard drive
  • TAR files are on that same USB hard drive

```
"""
Read the output of an extracted TAR twitter archive from:
https://archive.org/details/twitterstream
"""
import bz2
import datetime
import json
import os
import profile
import psycopg2
from pprint import pprint

with open("postgresConnecString.txt", 'r') as f:
    DB_CONNECTIONSTRING = f.readline().strip()  # drop the trailing newline

conn = psycopg2.connect(DB_CONNECTIONSTRING)
CACHE_DIR = "H:/Twitter datastream/PYTHONCACHE"

def load_bz2_json(filename):
    """Takes a bz2 filename, returns the tweets as a list of tweet dictionaries"""
    with open(filename, 'rb') as f:
        s = f.read()
    lines = bz2.decompress(s).split("\n")
    tweets = []
    for line in lines:
        if line == "":
            continue  # skip blank lines
        try:
            tweets.append(json.loads(line))
        except ValueError:  # lenient on purpose: with millions of tweets, most errors were due to encoding
            continue
    return tweets

def load_tweet(tweet, tweets_saved):
    """Takes a tweet (dictionary) and upserts its contents to a PostgreSQL database"""
    try:
        tweet_id = tweet['id']
        tweet_text = tweet['text']
        tweet_locale = tweet['lang']
        created_at = tweet['created_at']
    except KeyError:
        return tweets_saved

    data = {'tweet_id': tweet_id,
            'tweet_text': tweet_text,
            'tweet_locale': tweet_locale,
            'created_at_str': created_at,
            'date_loaded': datetime.datet
```

Solution

Improving throughput

If you want to get the best possible throughput, you need to decompose this into a few processes.

The decompression can be handled with bzcat, available for Windows as part of the GnuWin tools.
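If installing the GnuWin tools is not an option, a few lines of Python can stand in for bzcat. This is a minimal sketch assuming Python 3.3+ (for bz2.open and its multi-stream support); the function name bzcat and the 1 MiB chunk size are hypothetical choices, not part of any tool:

```python
import bz2
import shutil


def bzcat(paths, out):
    """Decompress each .bz2 file in turn and write its bytes to the binary stream out.

    Hypothetical stand-in for the bzcat command-line tool; bz2.open also
    reads multi-stream .bz2 files (several compressed streams back to back).
    """
    for path in paths:
        with bz2.open(path, "rb") as f:
            # Stream in 1 MiB chunks so a large file is never held in memory at once
            shutil.copyfileobj(f, out, 1024 * 1024)
```

Called from a small script as bzcat(sys.argv[1:], sys.stdout.buffer), it writes the decompressed data to stdout, where it can be piped onward just like the real tool's output.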

psycopg2's execute method sends commands to the database synchronously, one at a time, and executemany does the same. Rather than trying to work around this in Python, you could use PostgreSQL's command-line client, psql, instead.

The approach is to move your JSON-parsing and record-generating code into a standalone Python script that reads the uncompressed JSON records from stdin and writes CSV data to stdout. That output is piped into a psql process, which loads it with a \copy command (refer to the psql documentation for \copy). The transform script would look something like this:

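import csv
import datetime
import json
import sys

def json_to_tuple(tweet):
    return (tweet['id'],
            tweet['text'],
            tweet.get('lang'),  # None becomes an empty field, which COPY in CSV format reads as NULL
            tweet['created_at'],
            datetime.datetime.now().isoformat(),  # ISO 8601 text, accepted by COPY for a timestamp column
            json.dumps(tweet)
           )

# One JSON document per line: skip blank lines and records without an 'id' (e.g. deletion notices)
tweets = (tweet for tweet in map(json.loads, filter(str.strip, sys.stdin)) if 'id' in tweet)
# lineterminator='\n' avoids doubled '\r' on Windows, where stdout already turns '\n' into '\r\n'
csvwriter = csv.writer(sys.stdout, lineterminator='\n')
csvwriter.writerows(map(json_to_tuple, tweets))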


You would then modify your driver script to walk through all of the compressed data files and pass them through the pipeline (refer to Popen in the subprocess module).
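A minimal sketch of such a driver, assuming Python 3.5+ (for recursive glob), that bzcat and psql are on the PATH, and that psql takes its connection settings from the usual PG* environment variables (PGHOST, PGDATABASE, PGUSER, ...). run_pipeline and load_all are hypothetical names; the chaining follows the "replacing a shell pipeline" pattern from the subprocess documentation:

```python
import glob
import os
import subprocess
import sys


def run_pipeline(commands, stdout=None):
    """Run argument lists as a shell-style pipeline: cmd1 | cmd2 | ... | cmdN.

    The first command reads its own input (e.g. ["bzcat", path]); the last
    command writes to the file object `stdout`, or inherits ours if None.
    Returns the exit codes in pipeline order.
    """
    procs = []
    prev_out = None
    for i, cmd in enumerate(commands):
        is_last = i == len(commands) - 1
        proc = subprocess.Popen(cmd, stdin=prev_out,
                                stdout=stdout if is_last else subprocess.PIPE)
        if prev_out is not None:
            # The child now holds the pipe; closing our copy lets the upstream
            # process get a broken pipe if this one exits early.
            prev_out.close()
        procs.append(proc)
        prev_out = proc.stdout
    # All stages run concurrently; waiting in order is safe because the
    # parent never reads from any of the pipes itself.
    return [proc.wait() for proc in procs]


def load_all(data_dir, transform="transform.py", copy_sql="copy.sql"):
    """Feed every .bz2 file under data_dir through bzcat | transform | psql."""
    pattern = os.path.join(data_dir, "**", "*.bz2")
    for path in sorted(glob.glob(pattern, recursive=True)):
        codes = run_pipeline([
            ["bzcat", path],
            [sys.executable, transform],
            ["psql", "-1", "-f", copy_sql],  # -1: run copy.sql in a single transaction
        ])
        if any(codes):
            print("pipeline failed for", path, codes, file=sys.stderr)
```

This starts one psql (and one transaction) per file, so a bad file only rolls back its own rows. The contents of copy.sql are not given here; one assumption is a single line such as \copy tweets FROM pstdin WITH (FORMAT csv), where tweets is a hypothetical table whose columns match the six fields of json_to_tuple in order. In a script run with -f, FROM stdin would read rows from the script file itself, which is why pstdin (psql's own standard input) is used.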

You could also process the files directly from the .tar file:

tar xvfO archiveteam-twitter-stream-2014-02.tar --wildcards '*.bz2' | bzcat | python3 transform.py | psql -1f copy.sql
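Where GNU tar and bzcat are not available, Python's standard tarfile and bz2 modules can do the same streaming read without extracting anything to disk. This is a minimal sketch assuming Python 3.3+ (so BZ2File accepts a file object) and one JSON document per line; iter_tweets_from_tar is a hypothetical name, and its output could feed json_to_tuple in place of the stdin generator:

```python
import bz2
import json
import tarfile


def iter_tweets_from_tar(tar_path):
    """Yield tweet dicts from every *.bz2 member of a .tar without extracting to disk."""
    with tarfile.open(tar_path, "r") as tar:
        for member in tar:
            if not (member.isfile() and member.name.endswith(".bz2")):
                continue
            # extractfile returns a file object that reads the member straight from the archive
            with bz2.BZ2File(tar.extractfile(member)) as f:
                for line in f:
                    if not line.strip():
                        continue  # skip blank lines
                    tweet = json.loads(line.decode("utf-8"))
                    if "id" in tweet:  # skip records without an id, as the transform script does
                        yield tweet
```

This keeps everything in one process, so it gives up the parallelism of the shell pipeline; it is mainly useful when the command-line tools are missing.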


Encoding problems

It's not clear which version of Python you are using, but I suspect that calling .split("\n") on the decompressed data is the source of the encoding errors. In Python 3.4.0 that raises "TypeError: Type str doesn't support the buffer API", and in other versions you could end up with an object in the wrong encoding. The source files themselves should not produce any errors.
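The Python 3 behaviour described above can be reproduced in a few lines: bz2.decompress returns bytes, so either the separator must be bytes too or the data must be decoded first. This sketch assumes the files are UTF-8; the two sample records are made up:

```python
import bz2

# Two made-up records with non-ASCII text, compressed like the archive files
compressed = bz2.compress('{"text": "caf\u00e9"}\n{"text": "\u65e5\u672c"}\n'.encode("utf-8"))

data = bz2.decompress(compressed)  # bytes in Python 3, not str
try:
    data.split("\n")  # str separator on bytes data
except TypeError as exc:
    print("TypeError:", exc)

# Either split on a bytes separator and decode each line...
lines_a = [line.decode("utf-8") for line in data.split(b"\n") if line]
# ...or decode the whole buffer once and split the resulting str
lines_b = [line for line in data.decode("utf-8").split("\n") if line]
```

Both lists hold the same two str lines, ready for json.loads.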

Your load_bz2_json function could be vastly simplified to:

import bz2
import json

def load_bz2_json(filename):
    with bz2.BZ2File(filename, 'r') as f:
        for line in f:  # lines come back as bytes
            if line.strip():  # skip blank lines
                yield json.loads(line.decode('utf-8'))


If the above code still produces encoding errors, comment below.

Context

StackExchange Code Review Q#77022, answer score: 3
