
Better multithreading with network IO and database queries


Problem

I'm writing a script that:

  • fetches a list of URLs from a database (about 10,000 URLs)
  • downloads all the pages and inserts them into the database
  • parses the code
  • if some condition holds, does further inserts into the database
I have a quad-core Xeon with hyper-threading, so a total of 8 hardware threads are available, and I'm running 64-bit Linux.

I'm using cStringIO as a buffer, pycurl to fetch the pages, BeautifulSoup to parse them, and MySQLdb to interact with the database.

I've simplified the code below (removing all the try/except blocks, the parsing operations, ...).

```
import cStringIO, threading, MySQLdb.cursors, pycurl

NUM_THREADS = 100
lock_list = threading.Lock()
lock_query = threading.Lock()

db = MySQLdb.connect(host="...", user="...", passwd="...", db="...", cursorclass=MySQLdb.cursors.DictCursor)
cur = db.cursor()
cur.execute("SELECT...")
rows = cur.fetchall()
rows = [x for x in rows]  # convert to a list so it's editable

class MyThread(threading.Thread):
    def run(self):
        """ initialize a StringIO object and a pycurl object """

        while True:
            lock_list.acquire()  # acquire the lock to extract a url
            if not rows:  # list is empty, no more url to process
                lock_list.release()
                break
            row = rows.pop()
            lock_list.release()

            """ download the page with pycurl and do some check """

            """ WARNING: possible bottleneck if all the pycurl
                connections are waiting for the timeout """

            lock_query.acquire()
            cur.execute("INSERT INTO ...")  # insert the full page into the database
            db.commit()
            lock_query.release()

            """do some parse with BeautifulSoup using the StringIO object"""

            if something is not None:
                lock_query.acquire()
                cur.execute("INSERT INTO ...")  # insert the result of parsing into the database
                db.commit()
                lock_query.release()

# create and start all the threads
threads = []
for i in range(NUM_THREADS):
    t = MyThread()
    t.start()
    threads.append(t)

# wait for threads to finish
for t in threads:
    t.join()
```

Solution

Take a look at the eventlet library. It'll let you write code that fetches all the web pages in parallel without ever explicitly implementing threads or locking.
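Purely as an illustration of that suggestion (this sketch isn't from the original answer), a green-pool version of the fetch loop might look like this; `fetch`, the pool size of 100, and `urls` are placeholders, with `urls` standing in for the list loaded from the database:

```
import eventlet
from eventlet.green import urllib2  # cooperative (non-blocking) version of urllib2

def fetch(url):
    # blocking IO inside a green thread yields to the other green threads
    return urllib2.urlopen(url).read()

urls = ["http://example.com"]   # placeholder for the list loaded from the database
pool = eventlet.GreenPool(100)  # at most 100 fetches in flight at once
for body in pool.imap(fetch, urls):
    pass  # store and parse each page as it arrives
```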

```
import cStringIO, threading, MySQLdb.cursors, pycurl

NUM_THREADS = 100
lock_list = threading.Lock()
lock_query = threading.Lock()
```


Purist that I am, I wouldn't make these locks global.
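A minimal sketch of what that would look like (my illustration, not the answer's code): hand the locks to each thread through its constructor instead of reading globals:

```
import threading

class MyThread(threading.Thread):
    def __init__(self, lock_list, lock_query):
        threading.Thread.__init__(self)  # explicit base init, the usual Python 2 idiom
        self.lock_list = lock_list       # used when popping urls off the shared list
        self.lock_query = lock_query     # used when touching the shared cursor
```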

```
db = MySQLdb.connect(host = "...", user = "...", passwd = "...", db = "...", cursorclass=MySQLdb.cursors.DictCursor)
cur = db.cursor()
cur.execute("SELECT...")
rows = cur.fetchall()
rows = [x for x in rows]  # convert to a list so it's editable
```


It would make more sense to do this sort of thing after you've defined your classes; at least, that would be Python's convention.
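For illustration (a hypothetical rearrangement, keeping the original connection placeholders), the setup would move into a `main()` behind the usual guard, below the class definitions:

```
import threading
import MySQLdb, MySQLdb.cursors

class MyThread(threading.Thread):
    pass  # body as in the original

def main():
    db = MySQLdb.connect(host="...", user="...", passwd="...", db="...",
                         cursorclass=MySQLdb.cursors.DictCursor)
    cur = db.cursor()
    cur.execute("SELECT...")
    rows = list(cur.fetchall())
    # ... create the threads and hand them their work here ...

if __name__ == '__main__':
    main()
```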

```
class MyThread(threading.Thread):
    def run(self):
        """ initialize a StringIO object and a pycurl object """
```


That's pretty much the most terrible description of this function I could have come up with. (You seem to be thinking of it as a comment, but by convention this should be a docstring, and it should describe the function.)
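Something along these lines (wording mine, purely illustrative) would turn it into a docstring that actually describes `run`:

```
import threading

class MyThread(threading.Thread):
    def run(self):
        """Pop URLs off the shared list, download each page, store and parse it."""
```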

```
        while True:
            lock_list.acquire()  # acquire the lock to extract a url
            if not rows:  # list is empty, no more url to process
                lock_list.release()
                break
            row = rows.pop()
            lock_list.release()
```


It'd be a lot simpler to use a queue. It'd basically do all of that part for you.
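For instance (a sketch of my own, not code from the answer), a `Queue.Queue` would replace the list and both lock acquisitions around it; `url_queue` and the worker body are placeholders:

```
import Queue  # Python 2; renamed to queue in Python 3
import threading

url_queue = Queue.Queue()
for row in rows:          # rows: the result of the SELECT above
    url_queue.put(row)

class MyThread(threading.Thread):
    def run(self):
        while True:
            try:
                row = url_queue.get_nowait()  # thread-safe pop, no explicit lock
            except Queue.Empty:
                break  # no more urls to process
            # ... download and process row ...
```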

""" download the page with pycurl and do some check """

            """ WARNING: possible bottleneck if all the pycurl
                connections are waiting for the timeout """

            lock_query.acquire()
            cur.execute("INSERT INTO ...")  # insert the full page into the database
            db.commit()
            lock_query.release()


It'd be better to put this data in another queue and have a database thread take care of it. This works, but I think the multi-queue approach would be cleaner.
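A minimal sketch of that multi-queue approach, assuming a `None` sentinel marks the end of the work (all names here are hypothetical, not the answer's):

```
import Queue
import threading

result_queue = Queue.Queue()  # fetch threads put (sql, params) tuples here

class DbWriter(threading.Thread):
    """Single thread that owns the connection, so no lock_query is needed."""

    def __init__(self, db):
        threading.Thread.__init__(self)
        self.db = db

    def run(self):
        cur = self.db.cursor()
        while True:
            item = result_queue.get()   # blocks until work arrives
            if item is None:            # sentinel: the fetch threads are done
                break
            sql, params = item
            cur.execute(sql, params)
            self.db.commit()
```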

"""do some parse with BeautifulSoup using the StringIO object"""

            if something is not None:
                lock_query.acquire()
                cur.execute("INSERT INTO ...")  # insert the result of parsing into the database
                db.commit()
                lock_query.release()


Same here. Note that in Python there's no point in trying to split up CPU-bound processing using threads: the GIL means you'll get no advantage.
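If the parsing ever did become the bottleneck, the usual escape hatch (not something the answer spells out) is the `multiprocessing` module, which uses processes rather than threads and so sidesteps the GIL; a minimal sketch with a hypothetical `parse_page` function:

```
import multiprocessing

def parse_page(html):
    # stand-in for the real BeautifulSoup work; each worker process
    # has its own interpreter and its own GIL
    return len(html)

if __name__ == '__main__':
    pages = ["<html>...</html>"]      # placeholder for the downloaded pages
    pool = multiprocessing.Pool()     # defaults to one worker per core
    results = pool.map(parse_page, pages)
```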

```
# create and start all the threads
threads = []
for i in range(NUM_THREADS):
    t = MyThread()
    t.start()
    threads.append(t)

# wait for threads to finish
for t in threads:
    t.join()
```


Context

StackExchange Code Review Q#18618, answer score: 2
