Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
603 views
in Technique[技术] by (71.8m points)

postgresql - foreach %dopar% + RPostgreSQL

I am using RPostgreSQL to connect to a local database. The setup works just fine on my Linux machine. R 2.11.1, Postgres 8.4.

I was playing with the 'foreach' with the multicore (doMC) parallel backend to wrap some repetitive queries (numbering a few thousand) and appending the results into a data structure. Curiously enough, it works if I use %do% but fails when I switch to %dopar%, with the exception when there is only one iteration (as shown below)

I wondered whether it had something to do with a single connection object, so I created 10 connection objects and depending on what 'i' was, a certain con object was given for that query, depending on i modulo 10. (indicated below by just 2 connection objects). The expression which is evaluated eval(expr.01), contains/is the query which depends on what 'i' is.

I can't make sense of these particular error messages. I am wondering whether there is any way to make this work.

Thanks.
Vishal Belsare

R snippet follows:

> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
> id.qed2.foreach
[[1]]
  [1]   411   414  2140  2406  4490  4507  4519  4570  4571  4572  4703  4731
[109] 48765 84312 91797

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
> 

EDIT: I changed a few things, (still unsuccessful), but a few things come to light. Connection objects made in the loop and not 'disconnected' via dbDisconnect, lead to hanging connections as evident by the /var/log for Postgres. A few new error messages show up when I do this:

> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE, 
.packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL"); 
con0 <- dbConnect(drv0, dbname='nseindia');
list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters);
dbDisconnect(con0)})
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

It's more efficient to create the database connection once per worker, rather than once per task. Unfortunately, mclapply doesn't provide a mechanism for initializing the workers before executing tasks, so it's not easy to do this using the doMC backend, but if you use the doParallel backend, you can initialize the workers using clusterEvalQ. Here's an example of how to restructure the code:

library(doParallel)
cl <- makePSOCKcluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(DBI)
  library(RPostgreSQL)
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname="nsdq")
  NULL
})

id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE,
                          .noexport="con",
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  lst <- eval(expr.01)  #contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}

clusterEvalQ(cl, {
  dbDisconnect(con)
})

Since doParallel and clusterEvalQ are using the same cluster object cl, the foreach loop will have access to the database connection object con when executing the tasks.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...