我正试图从串行方法转向并行方法来完成一些大型的多变量时间序列分析任务data.table
.该表包含许多不同组的数据,我正在尝试使用该包从for
循环移动到循环以利用安装的多核处理器.foreach
doParallel
我遇到的问题与内存以及新R进程似乎消耗大量内存有关.我认为正在发生的事情是data.table
包含所有数据的大型数据被复制到每个新进程中,因此我用完RAM并且Windows开始交换到磁盘.
我创建了一个简化的可重现的示例,它复制了我的问题,但循环内部的数据更少,分析更少.如果存在的解决方案只能按需将数据分配给工作进程,或者共享核心之间已经使用的内存,那将是理想的选择.或者,可能已存在某种解决方案将大数据拆分为4个块并将其传递给核心,以便它们具有可用的子集.
之前已经在Stackoverflow上发布了类似的问题,但我无法使用提供的bigmemory
解决方案,因为我的数据包含字符字段.我将进一步研究这个iterators
方案,但是我很欣赏那些在实践中遇到这个问题经验的成员的建议.
rm(list=ls()) library(data.table) num.series = 40 # can customise the size of the problem (x10 eats my RAM) num.periods = 200 # can customise the size of the problem (x10 eats my RAM) dt.all = data.table( grp = rep(1:num.series,each=num.periods), pd = rep(1:num.periods, num.series), y = rnorm(num.series * num.periods), x1 = rnorm(num.series * num.periods), x2 = rnorm(num.series * num.periods) ) dt.all[,y_lag := c(NA, head(y, -1)), by = c("grp")] f_lm = function(dt.sub, grp) { my.model = lm("y ~ y_lag + x1 + x2 ", data = dt.sub) coef = summary(my.model)$coefficients data.table(grp, variable = rownames(coef), coef) } library(doParallel) registerDoParallel(4) foreach(grp=unique(dt.all$grp), .packages="data.table", .combine="rbind") %dopar% { dt.sub = dt.all[grp == grp] f_lm(dt.sub, grp) } detach(package:doParallel)
Steve Weston.. 5
迭代器可以帮助减少需要传递给并行程序的worker的内存量.由于您正在使用data.table包,因此最好使用迭代器并组合针对data.table对象优化的函数.例如,这是一个类似于isplit
data.table对象的函数:
isplitDT <- function(x, colname, vals) { colname <- as.name(colname) ival <- iter(vals) nextEl <- function() { val <- nextElem(ival) list(value=eval(bquote(x[.(colname) == .(val)])), key=val) } obj <- list(nextElem=nextEl) class(obj) <- c('abstractiter', 'iter') obj }
请注意,它与它不完全兼容isplit
,因为参数和返回值略有不同.可能还有一种更好的方法来对data.table进行子集化,但我认为这比使用更有效isplit
.
以下是您使用的示例isplitDT
和使用rbindlist
组合data.tables 的组合函数rbind
:
dtcomb <- function(...) { rbindlist(list(...)) } results <- foreach(dt.sub=isplitDT(dt.all, 'grp', unique(dt.all$grp)), .combine='dtcomb', .multicombine=TRUE, .packages='data.table') %dopar% { f_lm(dt.sub$value, dt.sub$key) }
更新
我编写了一个新的迭代器函数isplitDT2
,它的执行效果要好得多,isplitDT
但要求data.table有一个键:
isplitDT2 <- function(x, vals) { ival <- iter(vals) nextEl <- function() { val <- nextElem(ival) list(value=x[val], key=val) } obj <- list(nextElem=nextEl) class(obj) <- c('abstractiter', 'iter') obj }
这被称为:
setkey(dt.all, grp) results <- foreach(dt.sub=isplitDT2(dt.all, levels(dt.all$grp)), .combine='dtcomb', .multicombine=TRUE, .packages='data.table') %dopar% { f_lm(dt.sub$value, dt.sub$key) }
这使用二进制搜索来进行子集dt.all
而不是向量扫描,因此更有效.但是,我不知道为什么isplitDT
会使用更多的内存.由于您正在使用doParallel
,它不会在发送任务时动态调用迭代器,您可能希望尝试拆分dt.all
然后删除它以减少内存使用量:
dt.split <- as.list(isplitDT2(dt.all, levels(dt.all$grp))) rm(dt.all) gc() results <- foreach(dt.sub=dt.split, .combine='dtcomb', .multicombine=TRUE, .packages='data.table') %dopar% { f_lm(dt.sub$value, dt.sub$key) }
这可能有助于减少主进程在执行foreach循环期间所需的内存量,同时仍然只将所需数据发送给工作者.如果你仍然有内存问题,你也可以尝试使用doMPI或doRedis,它们都可以根据需要获得迭代器值,而不是一次性获得迭代器值,从而提高内存效率.
迭代器可以帮助减少需要传递给并行程序的worker的内存量.由于您正在使用data.table包,因此最好使用迭代器并组合针对data.table对象优化的函数.例如,这是一个类似于isplit
data.table对象的函数:
isplitDT <- function(x, colname, vals) { colname <- as.name(colname) ival <- iter(vals) nextEl <- function() { val <- nextElem(ival) list(value=eval(bquote(x[.(colname) == .(val)])), key=val) } obj <- list(nextElem=nextEl) class(obj) <- c('abstractiter', 'iter') obj }
请注意,它与它不完全兼容isplit
,因为参数和返回值略有不同.可能还有一种更好的方法来对data.table进行子集化,但我认为这比使用更有效isplit
.
以下是您使用的示例isplitDT
和使用rbindlist
组合data.tables 的组合函数rbind
:
dtcomb <- function(...) { rbindlist(list(...)) } results <- foreach(dt.sub=isplitDT(dt.all, 'grp', unique(dt.all$grp)), .combine='dtcomb', .multicombine=TRUE, .packages='data.table') %dopar% { f_lm(dt.sub$value, dt.sub$key) }
更新
我编写了一个新的迭代器函数isplitDT2
,它的执行效果要好得多,isplitDT
但要求data.table有一个键:
isplitDT2 <- function(x, vals) { ival <- iter(vals) nextEl <- function() { val <- nextElem(ival) list(value=x[val], key=val) } obj <- list(nextElem=nextEl) class(obj) <- c('abstractiter', 'iter') obj }
这被称为:
setkey(dt.all, grp) results <- foreach(dt.sub=isplitDT2(dt.all, levels(dt.all$grp)), .combine='dtcomb', .multicombine=TRUE, .packages='data.table') %dopar% { f_lm(dt.sub$value, dt.sub$key) }
这使用二进制搜索来进行子集dt.all
而不是向量扫描,因此更有效.但是,我不知道为什么isplitDT
会使用更多的内存.由于您正在使用doParallel
,它不会在发送任务时动态调用迭代器,您可能希望尝试拆分dt.all
然后删除它以减少内存使用量:
dt.split <- as.list(isplitDT2(dt.all, levels(dt.all$grp))) rm(dt.all) gc() results <- foreach(dt.sub=dt.split, .combine='dtcomb', .multicombine=TRUE, .packages='data.table') %dopar% { f_lm(dt.sub$value, dt.sub$key) }
这可能有助于减少主进程在执行foreach循环期间所需的内存量,同时仍然只将所需数据发送给工作者.如果你仍然有内存问题,你也可以尝试使用doMPI或doRedis,它们都可以根据需要获得迭代器值,而不是一次性获得迭代器值,从而提高内存效率.