
Optimizing for data import in Neo4j using py2neo

Submitted by: @import:stackexchange-codereview

Problem

Here is my code for importing a .csv file into a Neo4j graph using py2neo and Cypher statements. I've noticed that it slows down significantly as the graph grows: it takes several seconds just to process ~10,000 lines. I'm curious whether there are any glaring mistakes or improvements to be made.

Before doing any of the import, I created indexes for every node label and property, following the advice in this blog.
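As a rough illustration of the index setup described above, the sketch below only builds the Cypher strings; it does not talk to a server. The helper name `index_statements` and the label/property names in `SCHEMA` are assumptions modelled on the CSV columns, not taken from the referenced blog. The `CREATE INDEX ON :Label(property)` form is the syntax of the Neo4j 2.x era; newer releases use a different form.

```python
def index_statements(schema):
    """Build one CREATE INDEX statement per (label, property) pair.

    `schema` maps a node label to the list of properties to index.
    """
    statements = []
    for label in sorted(schema):
        for prop in schema[label]:
            statements.append("CREATE INDEX ON :%s(%s)" % (label, prop))
    return statements


# Hypothetical schema: one lookup property per label.
SCHEMA = {
    "Character": ["Character"],
    "House": ["House"],
    "Territory": ["Territory"],
    "Region": ["Region"],
}

for statement in index_statements(SCHEMA):
    print(statement)
```

Each string would then be sent to the server once, before the import starts, for example through the same py2neo Cypher interface that the import itself uses.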

The machine I'm working on runs Windows 7 with 65 GB of RAM, so it should certainly be able to handle a large graph. Uploading a 20-million-line CSV took weeks, so there has to be something that can be improved.

I am currently stuck with Windows and can't switch over to Linux. Unfortunately, I also can't use Jexp's batch importer.

```
import csv
import sys
import os
from py2neo import neo4j, node, rel, cypher
import time

def main():

    f = "C:file_path/file.csv"

    graph_db = neo4j.GraphDatabaseService("http://localhost:7474/db/data/")

    with open(f, 'r+') as in_file:
        reader = csv.reader(in_file, delimiter = ',')
        next(reader, None) # skip headers
        batch = neo4j.WriteBatch(graph_db)

        try:
            i = 0
            j = 0
            for row in reader:
                if row:
                    if (i == 10000):
                        print j, "processed"
                        i = 0
                    i += 1
                    j += 1
                    character = strip(row[0])
                    first_name = strip(row[1])
                    last_name = strip(row[2])
                    actor = strip(row[3])
                    character_birth = strip(row[4])
                    character_death = strip(row[5])
                    allegiance = strip(row[6])
                    house = strip(row[7])
                    territory = strip(row[8])
                    region = strip(row[9])

                    query = neo4j.CypherQuery(graph_db,
                    # (the rest of this snippet is missing from the source)
```


Solution

Here is the updated solution with batching. Unfortunately, I found that the request times out if the batch size is much larger than 1000, and I'm not sure what causes that:

```
import csv
from py2neo import Graph
import time

def main():

    f = "C:file_path/file.csv"
    graph = Graph("http://localhost:7474/db/data/")
    with open(f, 'r') as in_file:  # read-only access is enough

        reader = csv.reader(in_file, delimiter=',')
        next(reader, None)  # skip headers
        batch = graph.cypher.begin()  # open the first transaction

        try:
            i = 0  # rows appended to the current transaction
            j = 0  # rows processed in total
            for row in reader:
                if row:
                    character = strip(row[0])
                    first_name = strip(row[1])
                    last_name = strip(row[2])
                    actor = strip(row[3])
                    character_birth = strip(row[4])
                    character_death = strip(row[5])
                    allegiance = strip(row[6])
                    house = strip(row[7])
                    territory = strip(row[8])
                    region = strip(row[9])
                    query = """
                        merge (character:Character {Character: {a}, First_Name:{b}, Last_Name:{c}, Actor:{d}, Birth:{e}, Death:{f}})
                        merge (house:House{House:{g}, Allegiance:{h}})
                        merge (territory:Territory {Territory: {i}})
                        merge (region:Region {Region: {j}})
                        merge (character)-[:Of_House{House:{k}}]-(house)-[:Is_From]->(territory)-[:Is_In]->(region)
                    """
                    batch.append(query, {
                        "a": character, "b": first_name, "c": last_name,
                        "d": actor, "e": character_birth, "f": character_death,
                        "g": house, "h": allegiance, "i": territory,
                        "j": region, "k": house,
                    })
                    i += 1
                    j += 1
                # sends pending statements to the server; the transaction stays open
                batch.process()

                if i == 1000:  # commits a batch every 1000 lines read
                    batch.commit()
                    print j, "lines processed"
                    i = 0
                    batch = graph.cypher.begin()
            else:
                # for/else: runs once the loop finishes; commits the remaining lines
                batch.commit()
            print j, "lines processed"

        except Exception as e:
            print e, row, reader.line_num

def strip(string):
    # replaces every non-ASCII (and NUL) character in a cell with a space
    return ''.join([c if 0 < ord(c) < 128 else ' ' for c in string])

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time() - start
    print "Time to complete:", end
```
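
The counter-based batching in the solution can also be written as a small generator that groups rows into fixed-size chunks, which keeps the "commit every N rows, then commit the remainder" logic separate from the database calls. This is only a sketch: `chunked`, `import_rows`, and the `submit` callback are hypothetical names, and `submit` stands in for whatever opens a transaction, appends one statement per row, and commits.

```python
from itertools import islice


def chunked(rows, size):
    """Yield lists of at most `size` items from any iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def import_rows(rows, submit, batch_size=1000):
    """Call `submit` once per chunk and return the number of rows seen.

    `submit` is a placeholder for the code that writes one batch
    to the database (one transaction per call).
    """
    total = 0
    for chunk in chunked(rows, batch_size):
        submit(chunk)
        total += len(chunk)
    return total
```

With this shape the final partial batch needs no special case, and the batch size (1000 in the solution, because larger batches timed out) becomes a single parameter that is easy to tune.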

Context

StackExchange Code Review Q#75842, answer score: 6
