Better multithreading with network IO and database queries
Problem
I'm writing a script that:
- fetch a list of urls from a db (about 10000 urls)
- download all the pages and insert them into the db
- parse the code
- if (some condition) do other inserts into the db

I have a Xeon quad-core with hyper-threading, so a total of 8 threads available, and I'm under Linux (64 bit).

I'm using cStringIO as a buffer, pycurl to fetch the pages, BeautifulSoup to parse them, and MySQLdb to interact with the database.

I tried to simplify the code below (removing all the try/except, parsing operations, ...).
```
import cStringIO, threading, MySQLdb.cursors, pycurl

NUM_THREADS = 100

lock_list = threading.Lock()
lock_query = threading.Lock()

db = MySQLdb.connect(host = "...", user = "...", passwd = "...", db = "...", cursorclass=MySQLdb.cursors.DictCursor)
cur = db.cursor()
cur.execute("SELECT...")
rows = cur.fetchall()
rows = [x for x in rows]  # convert to a list so it's editable

class MyThread(threading.Thread):
    def run(self):
        """ initialize a StringIO object and a pycurl object """
        while True:
            lock_list.acquire()  # acquire the lock to extract a url
            if not rows:  # list is empty, no more url to process
                lock_list.release()
                break
            row = rows.pop()
            lock_list.release()

            """ download the page with pycurl and do some check """

            """ WARNING: possible bottleneck if all the pycurl
                connections are waiting for the timeout """

            lock_query.acquire()
            cur.execute("INSERT INTO ...")  # insert the full page into the database
            db.commit()
            lock_query.release()

            """do some parse with BeautifulSoup using the StringIO object"""

            if something is not None:
                lock_query.acquire()
                cur.execute("INSERT INTO ...")  # insert the result of parsing into the database
                db.commit()
                lock_query.release()

# create and start all the threads
threads = []
for i in range(NUM_THREADS):
    t = MyThread()
    t.start()
    threads.append(t)

# wait for threads to finish
for t in threads:
    t.join()
```
Solution
Take a look at the eventlet library. It'll let you write code that fetches all the web pages in parallel without ever explicitly implementing threads or locking.
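For instance, here is a minimal sketch using eventlet's GreenPool (Python 2, to match the rest of the code; the `urls` list is a stand-in for the rows fetched from the database):

```
import eventlet
from eventlet.green import urllib2  # cooperative, non-blocking drop-in for urllib2

def fetch(url):
    return url, urllib2.urlopen(url).read()

urls = ["http://example.com/"]      # stand-in for the list fetched from the db
pool = eventlet.GreenPool(200)      # up to 200 concurrent green threads
for url, body in pool.imap(fetch, urls):
    print "%s: %d bytes" % (url, len(body))
```

Because `eventlet.green.urllib2` cooperatively yields on network IO, each `fetch` blocks only its own green thread, so no explicit locking is needed around the shared work list.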
If you stick with threads, though, a few notes on the code as written:
```
import cStringIO, threading, MySQLdb.cursors, pycurl

NUM_THREADS = 100

lock_list = threading.Lock()
lock_query = threading.Lock()
```

The purist that I am, I wouldn't make these locks global.
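One way to do that (a sketch, not part of the original answer) is to create the locks in the main code and pass them to each thread explicitly:

```
class MyThread(threading.Thread):
    def __init__(self, lock_list, lock_query):
        threading.Thread.__init__(self)
        # each thread receives its references explicitly instead of
        # reaching out to module-level globals
        self.lock_list = lock_list
        self.lock_query = lock_query
```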
```
db = MySQLdb.connect(host = "...", user = "...", passwd = "...", db = "...", cursorclass=MySQLdb.cursors.DictCursor)
cur = db.cursor()
cur.execute("SELECT...")
rows = cur.fetchall()
rows = [x for x in rows]  # convert to a list so it's editable
```

It would make more sense to do this sort of thing after you've defined your classes. At least that would be Python's convention.
```
class MyThread(threading.Thread):
    def run(self):
        """ initialize a StringIO object and a pycurl object """
```

That's pretty much the most terrible description of this function I could have come up with. (You seem to be thinking of that as a comment, but by convention this should be a docstring, and describe the function.)
```
        while True:
            lock_list.acquire()  # acquire the lock to extract a url
            if not rows:  # list is empty, no more url to process
                lock_list.release()
                break
            row = rows.pop()
            lock_list.release()
```

It'd be a lot simpler to use a queue. It'd basically do all of that part for you.
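A sketch of what that could look like with Python 2's `Queue` module (names as above; the placement inside `run()` is indicated by a comment):

```
import Queue

url_queue = Queue.Queue()
for row in rows:        # fill the queue once, before the threads start
    url_queue.put(row)

# each thread's run() loop then becomes:
while True:
    try:
        row = url_queue.get_nowait()  # thread-safe pop, no explicit lock needed
    except Queue.Empty:
        break                         # queue drained: no more urls to process
    # ... download and process `row` ...
```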
""" download the page with pycurl and do some check """
""" WARNING: possible bottleneck if all the pycurl
connections are waiting for the timeout """
lock_query.acquire()
cur.execute("INSERT INTO ...") # insert the full page into the database
db.commit()
lock_query.release()It'd be better to put this data in another queue and have a database thread take care of it. This works, but I think the multi-queue approach would be cleaner.
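A sketch of that multi-queue idea (the names are hypothetical): downloader threads only put results onto a queue, and a single writer thread owns the database connection, which also removes the need for lock_query.

```
import Queue, threading

result_queue = Queue.Queue()
STOP = object()  # sentinel, pushed once all downloaders have finished

def db_writer():
    # the only thread that ever touches `cur` and `db`
    while True:
        item = result_queue.get()
        if item is STOP:
            break
        cur.execute("INSERT INTO ...")  # placeholder query, as in the original
        db.commit()

writer = threading.Thread(target=db_writer)
writer.start()
# downloader threads then just do: result_queue.put(page_data)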
"""do some parse with BeautifulSoup using the StringIO object"""
if something is not None:
lock_query.acquire()
cur.execute("INSERT INTO ...") # insert the result of parsing into the database
db.commit()
lock_query.release()Same here. Note that there is no point in python of trying to split up processing using threads. The GIL means you'll get no advatange.
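If the parsing itself ever became the bottleneck, the usual way around the GIL is a process pool; a minimal illustration (the function body is a hypothetical stand-in for the real BeautifulSoup work):

```
import multiprocessing

def parse_page(html):
    # CPU-bound work runs in a separate process, outside the GIL
    return len(html)  # stand-in for the real parsing

if __name__ == '__main__':
    pages = ["<html>...</html>"] * 8       # stand-in for the downloaded pages
    pool = multiprocessing.Pool()          # one worker process per core by default
    print pool.map(parse_page, pages)
```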
```
# create and start all the threads
threads = []
for i in range(NUM_THREADS):
    t = MyThread()
    t.start()
    threads.append(t)

# wait for threads to finish
for t in threads:
    t.join()
```
Context
StackExchange Code Review Q#18618, answer score: 2