HiveBrain v1.2.0
Get Started
← Back to all entries
patternpythonMinor

Clustering 16 million records in parallel

Submitted by: @import:stackexchange-codereview··
0
Viewed 0 times
clusteringmillionrecordsparallel

Problem

I have a dataset with 16 million rows and may increase upwards of 30 million. I am using the parLapply to run across three cores in R. But it's taking two days to run to completion. When I try smaller datasets of about 60,000 it takes less than 5 minutes to run, what may be cause of this disparity.

Desktop Specs : Corei5 -QuadCore , 4GB RAM

FG DataSet (16 million rows)

"Id","R","T"
"1","12","43963"
"2","12","50273"
"3","12","40805" 
"4","13","50273"
"5","13","40805"
"6","14","40805"


AB (1.3 million rows)

"Id","R",
"1","12"   
"2","13"
"3","14"   
"4","15"


Locations (6600 rows)

T,NEWLong,NEWLat,SITENAME,
43963,-77.108995,17.942062,HARBOUR TOWN
50273,-77.108995,17.942062,NEW MEADOWS
40805,-77.108995,17.942062,ISLE AVENUE


Code

```
num_cores = detectCores() -1
cl = makeCluster(num_cores)
clusterExport(cl,varlist = c("FG","AB","sites","distancematrix")
,envir=environment())

results = parLapply(cl,1:nrow(AB),function(i){
row = AB[i,2]
filtered = subset(FG,FG$R == AB[i,2])
sites = merge(filtered , locations , by.x = "T" , by.y = "T" , all.x = FALSE)
resultdf =unique(data.frame(sites$NAME,sites$NEWLong,sites$NEWLat))

if ((nrow(resultdf))==0)
{

VAL = data.frame("AN" = AB[i,2] ,"SCORE" = 0 ,"SITES" = 0,"DISTANCE" = 0)
}
else if ((nrow(resultdf) > 0) & (nrow(resultdf) = 4) & (nrow(resultdf) <= 10 ))
{

alldistance = round(distanceMatrix(resultdf))
if (sum(alldistance) == 0)
{
VAL = data.frame("AN" = AB[i,2] ,"SCORE"= 1 ,"SITES" = nrow(resultdf),"DISTANCE"=sum(alldistance))
}
else
{
value = nrow(resultdf)-1
require(fpc)
clustervaluePAMK = pamk(alldistance,krange = 1:value, criterion = "asw" ,critout = TRUE , usepam=FALSE, ns = 2)
clustervaluePAMK = clustervaluePAMK$nc
VAL2 = data.frame("AN" = AB[i,2] ,"SCORE"= clustervaluePAMK ,"SITES" = nrow(resultdf),"DISTANCE"=sum(alldistance))

}

}
else
{
alldistance = round(distanceMatrix(resultdf))
if (sum(alldistanc

Solution

Hard to tell without sample data, but lets start with a cleaned up
version as there's soo much duplicated code here and the formatting is
inconsistent.

  • The require should probably go to the top?



  • AB[i, 2], nrow(resultdf) are run more than once and that's not


good.

  • Some expressions are the same in multiple branches and can be merged,


e.g. alldistance = ... and sum(alldistance).

  • AFAIK parLapply just uses the return value of the function, so the


assignments to VAR and VAR2 are super confusing.

All refactored that looks like this now, which almost fits into a single
screen now:

`require(fpc)

cl = makeCluster(detectCores() - 1)
clusterExport(cl, varlist = c("FG", "AB", "sites", "distancematrix"), envir = environment())

results = parLapply(cl, 1:nrow(AB), function(i) {
row = AB[i, 2]
filtered = subset(FG, FG$R == row)
sites = merge(filtered, locations, by.x = "T", by.y = "T", all.x = FALSE)
resultdf = unique(data.frame(sites$NAME, sites$NEWLong, sites$NEWLat))

n = nrow(resultdf)

if (n == 0)
{
data.frame("AN" = row, "SCORE" = 0, "SITES" = 0, "DISTANCE" = 0)
}
else
{
alldistance = round(distanceMatrix(resultdf))
s = sum(alldistance)

if (n

Context

StackExchange Code Review Q#136658, answer score: 4

Revisions (0)

No revisions yet.