HiveBrain v1.2.0
pattern · python · Minor

Importing database of 4 million rows into Pandas DataFrame

Submitted by: @import:stackexchange-codereview

Problem

I am using the following code to import a database table into a DataFrame:

import datetime as dt

import pandas as pd
import pandas.io.sql as psql


def import_db_table(chunk_size, offset):
    dfs_ct = []
    j = 0
    start = dt.datetime.now()
    df = pd.DataFrame()

    while True:
        sql_ct = "SELECT * FROM my_table limit %d offset %d" % (chunk_size, offset)
        dfs_ct.append(psql.read_sql_query(sql_ct, connection))
        offset += chunk_size

        if len(dfs_ct[-1]) < chunk_size:
            break

        df = pd.concat(dfs_ct) 

        # Convert columns to datetime
        columns = ['col1', 'col2', 'col3','col4', 'col5', 'col6',
                   'col7', 'col8', 'col9', 'col10', 'col11', 'col12',
                   'col13', 'col14', 'col15']

        for column in columns:
            df[column] = pd.to_datetime(df[column], errors='coerce')

        # Remove the uninteresting columns
        columns_remove = ['col42', 'col43', 'col67','col52', 'col39', 'col48','col49', 'col50', 'col60', 'col61', 'col62', 'col63', 'col64','col75', 'col80']

        for c in df.columns:
            if c not in columns_remove:
                df = df.drop(c, axis=1) 

        j+=1
        print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunk_size))

    return df


I am calling it with:

df = import_db_table(100000, 0)


This seems very slow: the first 100000 rows import in 7 seconds, but after 1 million rows each further 100000 rows takes 40-50 seconds or more. Could this be improved somehow? I am using PostgreSQL and Python 3.5.

7 seconds: completed 100000 rows
17 seconds: completed 200000 rows
30 seconds: completed 300000 rows
47 seconds: completed 400000 rows
69 seconds: completed 500000 rows
92 seconds: completed 600000 rows
121 seconds: completed 700000 rows
153 seconds: completed 800000 rows
188 seconds: completed 900000 rows
228 seconds: completed 1000000 rows
271 seconds: completed 1100000 rows
318 seconds: completed 1200000 rows
368 seconds: completed 13000

Solution

Your code

def import_db_table(chunk_size, offset):


It doesn't look like you need to pass offset to this function. All it does is let you read from a given row to the end of the table. I would omit it, or at least give it a default value of 0. It also looks like connection should be a parameter rather than a global variable.
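A minimal sketch of that signature, assuming the connection is passed in explicitly and offset defaults to 0. An in-memory SQLite database stands in for PostgreSQL here, and the helper name read_chunk and the id column are hypothetical. The ORDER BY is an addition: PostgreSQL does not guarantee row order without one, so LIMIT/OFFSET pages could otherwise overlap or skip rows.

```python
import sqlite3

import pandas as pd


def read_chunk(connection, chunk_size, offset=0):
    """Read one page of my_table; offset defaults to the first row."""
    # Hypothetical id column gives LIMIT/OFFSET a stable row order.
    sql = "SELECT * FROM my_table ORDER BY id LIMIT %d OFFSET %d" % (chunk_size, offset)
    return pd.read_sql_query(sql, connection)


# Stand-in database: 25 rows in an in-memory SQLite table.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE my_table (id INTEGER, value TEXT)")
connection.executemany(
    "INSERT INTO my_table VALUES (?, ?)", [(i, str(i)) for i in range(25)]
)
connection.commit()

first = read_chunk(connection, 10)            # offset omitted, starts at row 0
last = read_chunk(connection, 10, offset=20)  # only 5 rows remain
```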

dfs_ct = []
j = 0
start = dt.datetime.now()
df = pd.DataFrame()

while True:
    sql_ct = "SELECT * FROM my_table limit %d offset %d" % (chunk_size, offset)
    dfs_ct.append(psql.read_sql_query(sql_ct, connection))

    offset += chunk_size

    if len(dfs_ct[-1]) < chunk_size:
        break


As written, the body of the while loop should end here, with the concatenation moved after it. You can also restructure the reading as a generator that yields one chunk at a time instead of appending each chunk to a list. For example:

Code suggestions

import pandas.io.sql as psql


def generate_df_pieces(connection, chunk_size, offset=0):
    while True:
        sql_ct = "SELECT * FROM my_table limit %d offset %d" % (chunk_size, offset)
        df_piece = psql.read_sql_query(sql_ct, connection)

        # don't yield an empty data frame
        if not df_piece.shape[0]:
            break
        yield df_piece

        # don't make an unnecessary database query
        if df_piece.shape[0] < chunk_size:
            break

        offset += chunk_size


Then you can call:

df = pd.concat(generate_df_pieces(connection, chunk_size, offset=offset))


pd.concat accepts any iterable of DataFrames, so the generator can be passed to it directly. Be aware, though, that pd.concat turns the iterable into a list before concatenating, so every df_piece is still held in memory until the final frame is built; the real saving is that the concatenation now happens once, at the end, instead of on every iteration.
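A small check of how pd.concat treats a generator, using hypothetical in-memory chunks instead of a database. Each piece comes with its own 0-based index, just as each read_sql_query result does, so passing ignore_index=True is worth considering if you want one clean 0..n-1 index in the final frame:

```python
import pandas as pd


def pieces():
    # Hypothetical chunks; each has its own 0-based index, like a
    # fresh read_sql_query result.
    for start in (0, 3, 6):
        yield pd.DataFrame({"id": range(start, start + 3)})


# pd.concat accepts a generator directly.
default_index = pd.concat(pieces())
# ignore_index=True renumbers rows 0..n-1 instead of repeating 0, 1, 2.
clean_index = pd.concat(pieces(), ignore_index=True)
```

Without ignore_index, the labels 0, 1, 2 repeat once per chunk, which makes later label-based lookups such as df.loc[0] return several rows.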

Back to your code

df = pd.concat(dfs_ct)


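You're rebuilding the entire DataFrame from the whole list on every pass through the loop, so iteration k copies all k chunks read so far, and the datetime conversion and column drops below then run over that ever-growing frame as well. That is why each chunk takes longer than the one before. If this were outside of the loop it would make sense.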
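A back-of-the-envelope model makes the growth visible. This rough sketch only counts rows copied by pd.concat; it ignores query time and the datetime conversion and column drops, which also run over the whole frame on every pass. The function name rows_copied is hypothetical.

```python
def rows_copied(chunk_size, n_chunks, concat_every_iteration):
    """Rough count of rows pd.concat copies while reading n_chunks chunks."""
    if concat_every_iteration:
        # Iteration k re-concatenates all k chunks read so far.
        return sum(k * chunk_size for k in range(1, n_chunks + 1))
    # A single concat after the loop copies each row once.
    return n_chunks * chunk_size


print(rows_copied(100000, 12, True))   # 7800000
print(rows_copied(100000, 12, False))  # 1200000
```

For the 1200000 rows in the log above, that is 7800000 rows copied instead of 1200000, and the work per iteration grows linearly with the number of chunks already read, which fits the steadily widening gaps between the timestamps.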

# Convert columns to datetime
columns = ['col1', 'col2', 'col3', 'col4', 'col5', 'col6',
           'col7', 'col8', 'col9', 'col10', 'col11', 'col12',
           'col13', 'col14', 'col15']

for column in columns:
    df[column] = pd.to_datetime(df[column], errors='coerce')

# Remove the uninteresting columns
columns_remove = ['col42', 'col43', 'col67', 'col52', 'col39', 'col48', 'col49', 'col50', 'col60', 'col61', 'col62', 'col63', 'col64', 'col75', 'col80']

for c in df.columns:
    if c not in columns_remove:
        df = df.drop(c, axis=1)


This part could be done inside the loop (or the generator function) or outside it. Dropping columns is a good thing to do inside, because then the big DataFrame you build never grows larger than you need. If you can list only the columns you want in the SQL query, that is even better, since less data is sent over the connection.
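A sketch of both ideas together, assuming the hypothetical column names below: the query lists only the wanted columns, and each chunk's datetime columns are converted inside the generator before it is yielded. An in-memory SQLite table stands in for PostgreSQL, and the id column used for ORDER BY is an assumption (PostgreSQL needs a stable order for LIMIT/OFFSET paging).

```python
import sqlite3

import pandas as pd

# Hypothetical column names; substitute the columns you actually need.
DATE_COLUMNS = ["created", "updated"]
KEEP_COLUMNS = ["id"] + DATE_COLUMNS


def generate_df_pieces(connection, chunk_size, offset=0):
    # Only the wanted columns are requested, so less data crosses the connection.
    column_list = ", ".join(KEEP_COLUMNS)
    while True:
        sql = "SELECT %s FROM my_table ORDER BY id LIMIT %d OFFSET %d" % (
            column_list, chunk_size, offset)
        df_piece = pd.read_sql_query(sql, connection)
        if df_piece.empty:
            break
        # Convert while the chunk is still small; unparseable values become NaT.
        for column in DATE_COLUMNS:
            df_piece[column] = pd.to_datetime(df_piece[column], errors="coerce")
        yield df_piece
        if len(df_piece) < chunk_size:
            break
        offset += chunk_size


# Stand-in database with an extra "junk" column the query never fetches.
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE my_table (id INTEGER, created TEXT, updated TEXT, junk TEXT)")
rows = [
    (i, "2017-05-%02d" % (i + 1), "not a date" if i % 3 == 2 else "2017-06-01", "x")
    for i in range(7)
]
connection.executemany("INSERT INTO my_table VALUES (?, ?, ?, ?)", rows)
connection.commit()

df = pd.concat(generate_df_pieces(connection, chunk_size=3), ignore_index=True)
```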

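Another point about df.drop is that by default it returns a new DataFrame, and your loop calls it once per column, copying your huge DataFrame each time. You can pass inplace=True, though pandas typically still builds the new frame internally and then swaps it in, so the bigger saving is that df.drop accepts a whole list of columns to drop in one call: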

Code suggestions

df.drop(columns_remove, axis=1, inplace=True)


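removes all the listed columns without looping and copying df over and over. Note that your loop actually does the opposite of what its comment says: because of the condition if c not in columns_remove, it drops every column that is not in the list, including the datetime columns converted just above. You can also use: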

columns_remove_numbers = [ ... ]  # list the column numbers
columns_remove = df.columns[columns_remove_numbers]


That way you don't have to type all those strings.
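A small sketch of the single-call drop and of picking columns by position, using a hypothetical four-column frame in place of your data. It also shows which columns the loop in the question actually keeps:

```python
import pandas as pd

# Hypothetical frame standing in for one chunk of my_table.
df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4], "col42": [5, 6], "col43": [7, 8]})

# Pick the columns by position instead of typing their names.
columns_remove_numbers = [2, 3]
columns_remove = df.columns[columns_remove_numbers]

# One call drops every listed column: no per-column loop, no repeated copies.
trimmed = df.drop(columns_remove, axis=1)

# The question's loop does the opposite: it drops every column that is
# NOT in columns_remove, so only the listed columns survive.
looped = df
for c in df.columns:
    if c not in columns_remove:
        looped = looped.drop(c, axis=1)
```

If keeping exactly those columns was the intent, df[columns_remove] selects them in one step.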

Back to your code

j += 1
print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunk_size))


If you use the generator version, you could move this progress report into the generator function to keep track of performance.
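A sketch of that, keeping the same message format as your code. The report parameter is a hypothetical addition so the messages can be captured (it defaults to print), and rows_read counts the rows actually returned, whereas j*chunk_size overstates the final, partial chunk. An in-memory SQLite table and a hypothetical id column stand in for your PostgreSQL table.

```python
import datetime as dt
import sqlite3

import pandas as pd


def generate_df_pieces(connection, chunk_size, offset=0, report=print):
    """Yield chunks of my_table, reporting progress after each one."""
    start = dt.datetime.now()
    rows_read = 0
    while True:
        # Hypothetical id column gives LIMIT/OFFSET a stable row order.
        sql_ct = "SELECT * FROM my_table ORDER BY id LIMIT %d OFFSET %d" % (chunk_size, offset)
        df_piece = pd.read_sql_query(sql_ct, connection)
        if df_piece.empty:
            break
        # Count rows actually read, so a short final chunk is reported correctly.
        rows_read += len(df_piece)
        report("{} seconds: completed {} rows".format(
            (dt.datetime.now() - start).seconds, rows_read))
        yield df_piece
        if len(df_piece) < chunk_size:
            break
        offset += chunk_size


# Stand-in database: 25 rows in an in-memory SQLite table.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE my_table (id INTEGER, value TEXT)")
connection.executemany(
    "INSERT INTO my_table VALUES (?, ?)", [(i, str(i)) for i in range(25)])
connection.commit()

messages = []
df = pd.concat(generate_df_pieces(connection, 10, report=messages.append),
               ignore_index=True)
```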


Context

StackExchange Code Review Q#162402, answer score: 3
