patternpythonMinor
Run query and insert the result to another table
Viewed 0 times
resulttheinsertqueryanotherandtablerun
Problem
```
import csv
import logging
import itertools
from random import choice
from string import ascii_lowercase
from collections import namedtuple
from tempfile import NamedTemporaryFile
DELIMITER = '\t'
ENCODING = 'utf-8'
def get_random_cursor_name(length=7):
    """
    Return a random string of ``length`` lowercase ASCII letters.

    :type length: int
    :param length: number of characters to generate (default 7).
    :return: the generated name; an empty string when ``length`` is 0.
    """
    # The loop variable is never used, so name it ``_``.
    return ''.join(choice(ascii_lowercase) for _ in range(length))
def extract_table(conn, query, itersize=2000, use_named_cursor=True):
    """
    Run ``query`` on ``conn`` and lazily yield each result row as a namedtuple.

    The namedtuple type is called ``Row``; its fields are the column names
    taken from ``cursor.description``.

    :type conn: psycopg2.connection
    :param conn: connection object returned by psycopg2.connect.
    :type query: string
    :param query: SQL query string.
    :type itersize: int
    :param itersize: Batch size. With a named cursor it is assigned to
        ``cursor.itersize`` (-1 falls back to 2000). With a client side
        cursor, -1 means ``fetchall()``; any other value is assigned to
        ``cursor.arraysize`` and rows are pulled with repeated ``fetchmany()``.
    :type use_named_cursor: boolean
    :param use_named_cursor: If true, then use server side cursor, else client side cursor.
    :return: generator of ``Row`` namedtuples; yields nothing for an empty result.
    """
    if use_named_cursor:
        with conn.cursor(get_random_cursor_name()) as cursor:
            cursor.itersize = 2000 if itersize == -1 else itersize
            cursor.execute(query)
            # NOTE(review): a row is fetched before cursor.description is
            # read, presumably because a server side cursor only exposes its
            # description after the first fetch -- confirm against psycopg2.
            first_row = cursor.fetchone()
            if first_row is None:
                # Empty result set: unpacking None below would raise
                # TypeError, so end the generator without yielding.
                return
            Row = namedtuple('Row', [desc[0] for desc in cursor.description])
            yield Row(*first_row)
            for row in cursor:
                yield Row(*row)
    else:
        with conn.cursor() as cursor:
            cursor.execute(query)
            Row = namedtuple('Row', [desc[0] for desc in cursor.description])
            if itersize == -1:
                rows = cursor.fetchall()
            else:
                cursor.arraysize = itersize
                # fetchmany() is called repeatedly until it returns an empty
                # list; the batches are flattened into one stream of rows.
                rows = itertools.chain.from_iterable(iter(cursor.fetchmany, []))
            for row in rows:
                yield Row(*row)
def copy_table(src_conn, dst_conn, src_query, dst_table_name, use_named_cursor=False, truncate_table=False):
"""
Run query and import result to another database.
We use null="" in copy_from, because csv library dumps None as "" (empty string).
:type
import csv
import logging
import itertools
from random import choice
from string import ascii_lowercase
from collections import namedtuple
from tempfile import NamedTemporaryFile
DELIMITER = '\t'
ENCODING = 'utf-8'
def get_random_cursor_name(length=7):
    """
    Return a random string of ``length`` lowercase ASCII letters.

    :type length: int
    :param length: number of characters to generate (default 7).
    :return: the generated name; an empty string when ``length`` is 0.
    """
    # The loop variable is never used, so name it ``_``.
    return ''.join(choice(ascii_lowercase) for _ in range(length))
def extract_table(conn, query, itersize=2000, use_named_cursor=True):
    """
    Run ``query`` on ``conn`` and lazily yield each result row as a namedtuple.

    The namedtuple type is called ``Row``; its fields are the column names
    taken from ``cursor.description``.

    :type conn: psycopg2.connection
    :param conn: connection object returned by psycopg2.connect.
    :type query: string
    :param query: SQL query string.
    :type itersize: int
    :param itersize: Batch size. With a named cursor it is assigned to
        ``cursor.itersize`` (-1 falls back to 2000). With a client side
        cursor, -1 means ``fetchall()``; any other value is assigned to
        ``cursor.arraysize`` and rows are pulled with repeated ``fetchmany()``.
    :type use_named_cursor: boolean
    :param use_named_cursor: If true, then use server side cursor, else client side cursor.
    :return: generator of ``Row`` namedtuples; yields nothing for an empty result.
    """
    if use_named_cursor:
        with conn.cursor(get_random_cursor_name()) as cursor:
            cursor.itersize = 2000 if itersize == -1 else itersize
            cursor.execute(query)
            # NOTE(review): a row is fetched before cursor.description is
            # read, presumably because a server side cursor only exposes its
            # description after the first fetch -- confirm against psycopg2.
            first_row = cursor.fetchone()
            if first_row is None:
                # Empty result set: unpacking None below would raise
                # TypeError, so end the generator without yielding.
                return
            Row = namedtuple('Row', [desc[0] for desc in cursor.description])
            yield Row(*first_row)
            for row in cursor:
                yield Row(*row)
    else:
        with conn.cursor() as cursor:
            cursor.execute(query)
            Row = namedtuple('Row', [desc[0] for desc in cursor.description])
            if itersize == -1:
                rows = cursor.fetchall()
            else:
                cursor.arraysize = itersize
                # fetchmany() is called repeatedly until it returns an empty
                # list; the batches are flattened into one stream of rows.
                rows = itertools.chain.from_iterable(iter(cursor.fetchmany, []))
            for row in rows:
                yield Row(*row)
def copy_table(src_conn, dst_conn, src_query, dst_table_name, use_named_cursor=False, truncate_table=False):
"""
Run query and import result to another database.
We use null="" in copy_from, because csv library dumps None as "" (empty string).
:type
Solution
psycopg2 already implements your
Since you're using
However,
The
with or without the
You can thus rewrite
Handling database state
The
The connection's context manager only handles commits and rollbacks; it does not close the connection when leaving the context block. So you can write, say:
You're also handling the creation of named cursors using a lot of duplicated code. I understand that their use differs a bit from that of client cursors, but the creation can be simplified to:
since the
You can even go further by letting users provide their own cursor name using constructs such as:
This way, you can have calls such as:
Protect from user input
As usual in SQL, you should not trust user input. As such, you might not want to use the provided query string directly. However, you do want to build the copy query yourself. For these usages, psycopg2 provides the
The copy query can thus be built using:
and then passed to
You may also be interested in providing a way to build the source query by binding parameters. In such cases, variable length arguments can be used to simplify handling of bound arguments:
You
extract_tableSince you're using
cursor.copy_from to load data into the new table, why not use something along the lines of cursor.copy_to to write them to a file? However,
cursor.copy_to will translate to COPY table_name TO 'filename' [ [ WITH ] ( option [, ...] ) ] in its current implementation. You thus need to use cursor.copy_expert and provide the whole SQL query to be executed. The
copy_expert call will write to STDOUT or read from STDIN and redirect to/from the file-like object passed as the second argument (file). I believe the query to execute should look likecopy_query = "COPY (" + query + ")"\
"TO STDOUT"\
"WITH CSV HEADER DELIMITER '\t' NULL '' ENCODING 'utf-8'"with or without the
HEADER option; not sure how copy_from would handle it. You can thus rewrite
copy_table something along the lines of:# Using format string because we trust the query, are we?
copy_query = "COPY ({}) TO STDOUT WITH CSV HEADER DELIMITER '{}' NULL '' ENCODING '{}'".format(src_query, DELIMITER, ENCODING)
with NamedTemporaryFile('w+t', encoding=ENCODING, newline='') as fp:
logging.info("Writing result to a temporary file.")
with src_conn.cursor() as cursor:
cursor.copy_expert(copy_query, fp)
fp.flush()
fp.seek(0)
logging.info("Copying result to {}".format(dst_table_name))
with dst_conn.cursor() as cursor:
if truncate_table:
logging.info("Truncating {}.".format(dst_table_name))
# It seems like we do trust the new name too
cursor.execute('TRUNCATE {};'.format(dst_table_name))
cursor.copy_from(file=fp, table=dst_table_name, sep=DELIMITER, null="")
dst_conn.commit()Handling database state
The
dst_conn.commit() at the end of copy_table is bad practice. You most likely want to use the context manager available on connection objects to handle that for you, especially if things go wrong. The connection's context manager only handles commits and rollbacks; it does not close the connection when leaving the context block. So you can write, say:
conn = psycopg2.connect(...)
with conn:
# Use cursor(s)
with conn:
# Use (other) cursor(s)
with conn:
# You get the deal
conn.close()
You're also handling the creation of named cursors using a lot of duplicated code. I understand that their use differs a bit from that of client cursors, but the creation can be simplified to:
cursor = conn.cursor(name=get_random_cursor_name() if use_named_cursor else None)since the
name argument is already None by default. You can even go further by letting users provide their own cursor name using constructs such as:
def copy_table(..., cursor_name=None, ...):
...
with src_conn:
with src_conn.cursor(name=cursor_name) as cursor:
...This way, you can have calls such as:
copy_table(src_conn, dst_conn, query, dst_table_name='foobar')
copy_table(src_conn, dst_conn, query, cursor_name='foobar', dst_table_name='foobar')
copy_table(src_conn, dst_conn, query, cursor_name=get_random_cursor_name(17), dst_table_name='foobar')
Protect from user input
As usual in SQL, you should not trust user input. As such, you might not want to use the provided query string directly. However, you do want to build the copy query yourself. For these usages, psycopg2 provides the
cursor.mogrify method. The copy query can thus be built using:
copy_query = "COPY (%s) TO STDOUT WITH CSV HEADER DELIMITER '%s' NULL '' ENCODING '%s'"and then passed to
copy_expert with:cursor.copy_expert(cursor.mogrify(copy_query, (src_query, DELIMITER, ENCODING)), fp)You may also be interested in providing a way to build the source query by binding parameters. In such cases, variable length arguments can be used to simplify handling of bound arguments:
def copy_table(query, *args, src_conn, dest_conn, dest_table_name,
cursor_name=None, truncate_table=False):
copy_query = "COPY (%s) TO STDOUT WITH CSV HEADER DELIMITER '%s' NULL '' ENCODING '%s'"
with NamedTemporaryFile('w+t', encoding=ENCODING, newline='') as fp:
logging.info("Writing result to a temporary file.")
with src_conn:
with src_conn.cursor(name=cursor_name) as cursor:
query = cursor.mogrify(query, args)
copy_query = cursor.mogrify(copy_query, (query, DELIMITER, ENCODING))
cursor.copy_expert(copy_query, fp)
fp.flush()
fp.seek(0)
logging.info("Copying result to {}".format(dst_table_name))
with dest_conn:
with dest_conn.cursor() as cursor:
if truncate_table:
logging.info("Truncating {}.".format(dst_table_name))
cursor.execute('TRUNCATE %s;', (dst_table_name,))
cursor.copy_from(file=fp, table=dst_table_name, sep=DELIMITER, null="")You
Code Snippets
copy_query = "COPY (" + query + ")"\
"TO STDOUT"\
"WITH CSV HEADER DELIMITER '\t' NULL '' ENCODING 'utf-8'"# Using format string because we trust the query, are we?
copy_query = "COPY ({}) TO STDOUT WITH CSV HEADER DELIMITER '{}' NULL '' ENCODING '{}'".format(src_query, DELIMITER, ENCODING)
with NamedTemporaryFile('w+t', encoding=ENCODING, newline='') as fp:
logging.info("Writing result to a temporary file.")
with src_conn.cursor() as cursor:
cursor.copy_expert(copy_query, fp)
fp.flush()
fp.seek(0)
logging.info("Copying result to {}".format(dst_table_name))
with dst_conn.cursor() as cursor:
if truncate_table:
logging.info("Truncating {}.".format(dst_table_name))
# It seems like we do trust the new name too
cursor.execute('TRUNCATE {};'.format(dst_table_name))
cursor.copy_from(file=fp, table=dst_table_name, sep=DELIMITER, null="")
dst_conn.commit()conn = psycopg2.connect(...)
with conn:
# Use cursor(s)
with conn:
# Use (other) cursor(s)
with conn:
# You get the deal
conn.close()cursor = conn.cursor(name=get_random_cursor_name() if use_named_cursor else None)def copy_table(..., cursor_name=None, ...):
...
with src_conn:
with src_conn.cursor(name=cursor_name) as cursor:
...Context
StackExchange Code Review Q#115863, answer score: 2
Revisions (0)
No revisions yet.