patternpythonMinor
Clustering 16 million records in parallel
Viewed 0 times
clusteringmillionrecordsparallel
Problem
I have a dataset with 16 million rows, which may grow to upwards of 30 million. I am using the
Desktop Specs : Corei5 -QuadCore , 4GB RAM
FG DataSet (16 million rows)
AB (1.3 million rows)
Locations (6600 rows)
Code
```
num_cores = detectCores() -1
cl = makeCluster(num_cores)
clusterExport(cl,varlist = c("FG","AB","sites","distancematrix")
,envir=environment())
results = parLapply(cl,1:nrow(AB),function(i){
row = AB[i,2]
filtered = subset(FG,FG$R == AB[i,2])
sites = merge(filtered , locations , by.x = "T" , by.y = "T" , all.x = FALSE)
resultdf =unique(data.frame(sites$NAME,sites$NEWLong,sites$NEWLat))
if ((nrow(resultdf))==0)
{
VAL = data.frame("AN" = AB[i,2] ,"SCORE" = 0 ,"SITES" = 0,"DISTANCE" = 0)
}
else if ((nrow(resultdf) > 0) & (nrow(resultdf) = 4) & (nrow(resultdf) <= 10 ))
{
alldistance = round(distanceMatrix(resultdf))
if (sum(alldistance) == 0)
{
VAL = data.frame("AN" = AB[i,2] ,"SCORE"= 1 ,"SITES" = nrow(resultdf),"DISTANCE"=sum(alldistance))
}
else
{
value = nrow(resultdf)-1
require(fpc)
clustervaluePAMK = pamk(alldistance,krange = 1:value, criterion = "asw" ,critout = TRUE , usepam=FALSE, ns = 2)
clustervaluePAMK = clustervaluePAMK$nc
VAL2 = data.frame("AN" = AB[i,2] ,"SCORE"= clustervaluePAMK ,"SITES" = nrow(resultdf),"DISTANCE"=sum(alldistance))
}
}
else
{
alldistance = round(distanceMatrix(resultdf))
if (sum(alldistanc
parLapply to run across three cores in R, but it's taking two days to run to completion. When I try smaller datasets of about 60,000 rows, it takes less than 5 minutes to run. What may be the cause of this disparity?
Desktop Specs: Core i5 quad-core, 4 GB RAM
FG DataSet (16 million rows)
"Id","R","T"
"1","12","43963"
"2","12","50273"
"3","12","40805"
"4","13","50273"
"5","13","40805"
"6","14","40805"
AB (1.3 million rows)
"Id","R",
"1","12"
"2","13"
"3","14"
"4","15"
Locations (6600 rows)
T,NEWLong,NEWLat,SITENAME,
43963,-77.108995,17.942062,HARBOUR TOWN
50273,-77.108995,17.942062,NEW MEADOWS
40805,-77.108995,17.942062,ISLE AVENUE
Code
```
num_cores = detectCores() -1
cl = makeCluster(num_cores)
clusterExport(cl,varlist = c("FG","AB","sites","distancematrix")
,envir=environment())
results = parLapply(cl,1:nrow(AB),function(i){
row = AB[i,2]
filtered = subset(FG,FG$R == AB[i,2])
sites = merge(filtered , locations , by.x = "T" , by.y = "T" , all.x = FALSE)
resultdf =unique(data.frame(sites$NAME,sites$NEWLong,sites$NEWLat))
if ((nrow(resultdf))==0)
{
VAL = data.frame("AN" = AB[i,2] ,"SCORE" = 0 ,"SITES" = 0,"DISTANCE" = 0)
}
else if ((nrow(resultdf) > 0) & (nrow(resultdf) = 4) & (nrow(resultdf) <= 10 ))
{
alldistance = round(distanceMatrix(resultdf))
if (sum(alldistance) == 0)
{
VAL = data.frame("AN" = AB[i,2] ,"SCORE"= 1 ,"SITES" = nrow(resultdf),"DISTANCE"=sum(alldistance))
}
else
{
value = nrow(resultdf)-1
require(fpc)
clustervaluePAMK = pamk(alldistance,krange = 1:value, criterion = "asw" ,critout = TRUE , usepam=FALSE, ns = 2)
clustervaluePAMK = clustervaluePAMK$nc
VAL2 = data.frame("AN" = AB[i,2] ,"SCORE"= clustervaluePAMK ,"SITES" = nrow(resultdf),"DISTANCE"=sum(alldistance))
}
}
else
{
alldistance = round(distanceMatrix(resultdf))
if (sum(alldistanc
Solution
Hard to tell without sample data, but let's start with a cleaned-up
version, as there's so much duplicated code here and the formatting is
inconsistent.
good.
e.g.
assignments to
Refactored, it looks like this, which almost fits on a single
screen:
`require(fpc)
cl = makeCluster(detectCores() - 1)
clusterExport(cl, varlist = c("FG", "AB", "sites", "distancematrix"), envir = environment())
results = parLapply(cl, 1:nrow(AB), function(i) {
row = AB[i, 2]
filtered = subset(FG, FG$R == row)
sites = merge(filtered, locations, by.x = "T", by.y = "T", all.x = FALSE)
resultdf = unique(data.frame(sites$NAME, sites$NEWLong, sites$NEWLat))
n = nrow(resultdf)
if (n == 0)
{
data.frame("AN" = row, "SCORE" = 0, "SITES" = 0, "DISTANCE" = 0)
}
else
{
alldistance = round(distanceMatrix(resultdf))
s = sum(alldistance)
if (n
version, as there's so much duplicated code here and the formatting is
inconsistent.
- The `require` should probably go to the top?
- `AB[i, 2]` and `nrow(resultdf)` are run more than once and that's not
  good.
- Some expressions are the same in multiple branches and can be merged,
  e.g. `alldistance = ...` and `sum(alldistance)`.
- AFAIK `parLapply` just uses the return value of the function, so the
  assignments to `VAL` and `VAL2` are super confusing.

Refactored, it looks like this, which almost fits on a single
screen:
`require(fpc)
cl = makeCluster(detectCores() - 1)
clusterExport(cl, varlist = c("FG", "AB", "sites", "distancematrix"), envir = environment())
results = parLapply(cl, 1:nrow(AB), function(i) {
row = AB[i, 2]
filtered = subset(FG, FG$R == row)
sites = merge(filtered, locations, by.x = "T", by.y = "T", all.x = FALSE)
resultdf = unique(data.frame(sites$NAME, sites$NEWLong, sites$NEWLat))
n = nrow(resultdf)
if (n == 0)
{
data.frame("AN" = row, "SCORE" = 0, "SITES" = 0, "DISTANCE" = 0)
}
else
{
alldistance = round(distanceMatrix(resultdf))
s = sum(alldistance)
if (n
Context
StackExchange Code Review Q#136658, answer score: 4
Revisions (0)
No revisions yet.