h******b 发帖数: 6055 | 1 用NodeJS写了一个简单的log file processor。 几百个gzip文档,每个压缩以后都有
半gig左右。
我用了nodejs的csv/zlib插件来stream这些文档,写成新的gzip。速度很慢,结果发现
CPU使用还不到15%,查了一下发现是因为nodejs同时只用一个core, 我公司的i7完全排
不上用场。
有什么简单的多线程处理方法吗? | c*********e 发帖数: 16335 | 2 CPU使用还不到15% ?
【在 h******b 的大作中提到】 : 用NodeJS写了一个简单的log file processor。 几百个gzip文档,每个压缩以后都有 : 半gig左右。 : 我用了nodejs的csv/zlib插件来stream这些文档,写成新的gzip。速度很慢,结果发现 : CPU使用还不到15%,查了一下发现是因为nodejs同时只用一个core, 我公司的i7完全排 : 不上用场。 : 有什么简单的多线程处理方法吗?
| h******b 发帖数: 6055 | 3 var directory = require('fs');
function processFile(filename) {
// dependencies
var fs = require('fs');
var zlib = require('zlib');
var csv = require('csv');
// filenames
var sourceFileName = filename;
mainFileName = 'main_' + filename;
// streams
var reader = fs.createReadStream(__dirname + '/original/' + sourceFileName),
writer = fs.createWriteStream(__dirname + '/transformed/' + mainFileName),
gunzip = zlib.createGunzip(),
gzip = zlib.createGzip();
//Main File
csv()
.from.stream(reader.pipe(gunzip))
.to.stream(gzip) // Use GZIP output
.transform(function (row) {
//*** BUSINESS LOGIC OMITTED ***
return row;
});
gzip.pipe(writer); //Use GZIP output
}
//Read Original Directory
directory.readdir(__dirname + '/original/', function (err, files) {
if (!err)
console.log("Starting batch job for: " + files.length + " files");
for (var i = 0; i < files.length; i++) {
console.log("Processing: " + files[i]);
processFile(files[i]);
}
}
else
throw err;
}); | d****n 发帖数: 1637 | 4 why not batch([]).parallel()? | h******b 发帖数: 6055 | 5 我先去掉了那个batch manager,更新了一下代码。 性能和使用他没有区别。 可能我
引用的不对。
用一个简单的for loop循环,我现在同时process四个500mb的gzip, CPU的使用,node
只占了15%。
我理解是需要用cluster的node module才能利用所有的CPU核? 怎样才能加速运行呢
? 我是ssd的盘不存在io速度问题。
【在 d****n 的大作中提到】 : why not batch([]).parallel()?
| h******b 发帖数: 6055 | 6 嗯您要是有空请看一下结构,我把代码精简到最少了。同时处理压缩后四个500gb的
gzip,nodejs的CPU只占了15%。 电脑没拿来做任何其他事情。 我希望能够占60%以上
的CPU加快处理速度。
谷歌了一下,好像是因为node一个app只能占一个核? 除非你写多线程代码?最理想
的是每个核处理一个文档,完成以后再处理下一个。
十个包子求助!
【在 c*********e 的大作中提到】 : CPU使用还不到15% ?
| e*******o 发帖数: 4654 | 7 https://github.com/adambom/parallel.js
看看这个? parallel map 一下就完了。
我用python,perl, R, scala 都搞过,都是差不多加一行代码,就是没搞个node
我用的机器12core,python,perl, r 都能搞到差不多100%, scala 最简单,加个.
par 就完了,不过有点不好控制,不同地方加par 效率大不一样。 | k****i 发帖数: 101 | 8 var fs = require('fs'),
zlib = require('zlib'),
csv = require('csv'),
P = require('paralleljs'),
ind = process.argv[2] + '/',
outd = process.argv[3] + '/',
files = fs.readdirSync(ind),
p = new P(files),
ins = function(file){return fs.createReadStream(ind+file)},
gnuzip = zlib.createGnuzip(),
parse = csv.parse(),
transform = csv.transform(function(rdata){/*todo*/}),
gzip = zip.createGzip(),
out = function(file){return fs.createWriteStream(outd+file)},
run = function(file){
return ins
.pipe(gnuzip)
.pipe(parse)
.pipe(transform)
.pipe(gzip)
.pipe(out)
};
p.map(run)
【在 e*******o 的大作中提到】 : https://github.com/adambom/parallel.js : 看看这个? parallel map 一下就完了。 : 我用python,perl, R, scala 都搞过,都是差不多加一行代码,就是没搞个node : 我用的机器12core,python,perl, r 都能搞到差不多100%, scala 最简单,加个. : par 就完了,不过有点不好控制,不同地方加par 效率大不一样。
| S*******n 发帖数: 305 | | h******b 发帖数: 6055 | 10 非常感谢这个思路! 明天一早试试!
【在 k****i 的大作中提到】 : var fs = require('fs'), : zlib = require('zlib'), : csv = require('csv'), : P = require('paralleljs'), : ind = process.argv[2] + '/', : outd = process.argv[3] + '/', : files = fs.readdirSync(ind), : p = new P(files), : ins = function(file){return fs.createReadStream(ind+file)}, : gnuzip = zlib.createGnuzip(),
| | | h******b 发帖数: 6055 | 11 不好意思,我用您原生代码,出来一个"ins is not defined"的错误。
应该怎么改? 比较菜不是特别熟悉您这种写法。ins显然已经define了。
还有就是如果我directory里面有几百个文档他会试图同时处理几百个吗? 感觉会out
of memory啊。能够控制worker数量吗? 还是paralleljs自动优化到最大?
【在 k****i 的大作中提到】 : var fs = require('fs'), : zlib = require('zlib'), : csv = require('csv'), : P = require('paralleljs'), : ind = process.argv[2] + '/', : outd = process.argv[3] + '/', : files = fs.readdirSync(ind), : p = new P(files), : ins = function(file){return fs.createReadStream(ind+file)}, : gnuzip = zlib.createGnuzip(),
| e*******o 发帖数: 4654 | 12 应该可以设置。
一般默认跟core 数量一致。
不知道你为啥用node来干这个活。完全是自找麻烦。
out
【在 h******b 的大作中提到】 : 不好意思,我用您原生代码,出来一个"ins is not defined"的错误。 : 应该怎么改? 比较菜不是特别熟悉您这种写法。ins显然已经define了。 : 还有就是如果我directory里面有几百个文档他会试图同时处理几百个吗? 感觉会out : of memory啊。能够控制worker数量吗? 还是paralleljs自动优化到最大?
| k****i 发帖数: 101 | 13 是有问题,paralleljs这里用不适合,也没人维护了。
out
【在 h******b 的大作中提到】 : 不好意思,我用您原生代码,出来一个"ins is not defined"的错误。 : 应该怎么改? 比较菜不是特别熟悉您这种写法。ins显然已经define了。 : 还有就是如果我directory里面有几百个文档他会试图同时处理几百个吗? 感觉会out : of memory啊。能够控制worker数量吗? 还是paralleljs自动优化到最大?
| h**********c 发帖数: 4120 | 14 windows NT kernel 可以自动feed mutlicore, 忘了叫啥了。 | s*********y 发帖数: 6151 | | s*i 发帖数: 5025 | 16 var cluster = require('cluster'),
os = require('os');
if (cluster.isMaster) {
let files = ['1', '2', '3', '4', '5', '6', '7'];
let trySpawnNewWorker = function () {
if (files.length > 0) {
let params = {};
params.file = files.shift();
cluster.fork(params); //params becomes part of env
}
}
for (let i = 0; i < os.cpus().length; i++) {
trySpawnNewWorker();
}
// if one dies, spawn another to continue processing files
cluster.on('exit', () => trySpawnNewWorker());
} else {
let file = process.env['file'];
console.log('new worker, file=', file);
// emulating long file tasks
setTimeout(() => { console.log('done:', file); process.disconnect(); },
2000);
} | c*********e 发帖数: 16335 | 17 parallel.js怎么只能在firefox,chrome上用,ie, safari,opera都不能用?
【在 e*******o 的大作中提到】 : https://github.com/adambom/parallel.js : 看看这个? parallel map 一下就完了。 : 我用python,perl, R, scala 都搞过,都是差不多加一行代码,就是没搞个node : 我用的机器12core,python,perl, r 都能搞到差不多100%, scala 最简单,加个. : par 就完了,不过有点不好控制,不同地方加par 效率大不一样。
| c*********e 发帖数: 16335 | 18 node.js是单线程,async的。你确定你知道node.js是干什么的吗?
【在 h******b 的大作中提到】 : 用NodeJS写了一个简单的log file processor。 几百个gzip文档,每个压缩以后都有 : 半gig左右。 : 我用了nodejs的csv/zlib插件来stream这些文档,写成新的gzip。速度很慢,结果发现 : CPU使用还不到15%,查了一下发现是因为nodejs同时只用一个core, 我公司的i7完全排 : 不上用场。 : 有什么简单的多线程处理方法吗?
| Y**G 发帖数: 1089 | 19 NodeJS拿手的是大量的I/O但是CPU light的活,如果是CPU heavy的,可以在设一个服
务器单独处理计算吧。
【在 h******b 的大作中提到】 : 用NodeJS写了一个简单的log file processor。 几百个gzip文档,每个压缩以后都有 : 半gig左右。 : 我用了nodejs的csv/zlib插件来stream这些文档,写成新的gzip。速度很慢,结果发现 : CPU使用还不到15%,查了一下发现是因为nodejs同时只用一个core, 我公司的i7完全排 : 不上用场。 : 有什么简单的多线程处理方法吗?
| k****i 发帖数: 101 | 20 // tested, should work :)
(main = () => {
var cluster = require('cluster'),
fs = require('fs')
if (cluster.isMaster) {
(check = () => {
var mand = 'Usage: node thisScript inputDir outputDir ',
opt = '[optional: numberOfWorkerProcesses; default:
numberOfCpuCores]'
if (process.argv.length < 4) {
console.log(mand + opt)
process.exit(1)
}
})();
(schedule = () => {
var ind = process.argv[2],
files = fs.readdirSync(ind),
forkWorker = () => {
if (files.length > 0) {
var params = {
ind : ind,
outd: process.argv[3],
file: files.shift()
}
cluster.fork(params)
}
},
os = require('os'),
cores = os.cpus().length,
workers = process.argv[4]
for (var i = 0; i < (workers || cores); i++) forkWorker()
cluster.on('exit', () => { forkWorker() })
})()
} else {
(work = () => {
var zlib = require('zlib'),
csv = require('csv'),
file = process.env.file,
ins = fs.createReadStream(process.env.ind + '/' + file),
gunzip = zlib.createGunzip(),
parse = csv.parse(),
transform = csv.transform((rdata) => { return rdata.map((v) => {
return v * 10 }) }),
stringify = csv.stringify(),
gzip = zlib.createGzip(),
out = fs.createWriteStream(process.env.outd + '/' + file),
disconnect = () => { process.disconnect() }
ins
.pipe(gunzip)
.pipe(parse)
.pipe(transform)
.pipe(stringify)
.pipe(gzip)
.pipe(out)
.on('finish', disconnect)
})()
}
})()
【在 s*i 的大作中提到】 : var cluster = require('cluster'), : os = require('os'); : if (cluster.isMaster) { : let files = ['1', '2', '3', '4', '5', '6', '7']; : let trySpawnNewWorker = function () { : if (files.length > 0) { : let params = {}; : params.file = files.shift(); : cluster.fork(params); //params becomes part of env : }
| | | h******b 发帖数: 6055 | 21 感谢! 回头试试。
【在 s*i 的大作中提到】 : var cluster = require('cluster'), : os = require('os'); : if (cluster.isMaster) { : let files = ['1', '2', '3', '4', '5', '6', '7']; : let trySpawnNewWorker = function () { : if (files.length > 0) { : let params = {}; : params.file = files.shift(); : cluster.fork(params); //params becomes part of env : }
| h******b 发帖数: 6055 | 22 两位大牛真的好厉害,这个()=>是ES6的东西啊,一下子干净了很多。 我得好好学
习一下。
您的代码,我跑了一下出来了一个:
invalid opening quote at line 1, 是需要什么package吗?
sui的代码我倒是能跑,不过缺了gzip/csv的部分,我得自己试试手写。
【在 k****i 的大作中提到】 : // tested, should work :) : (main = () => { : var cluster = require('cluster'), : fs = require('fs') : if (cluster.isMaster) { : (check = () => { : var mand = 'Usage: node thisScript inputDir outputDir ', : opt = '[optional: numberOfWorkerProcesses; default: : numberOfCpuCores]' : if (process.argv.length < 4) {
| k****i 发帖数: 101 | 23 测试用的输入文件:
1,2,3
运行后输出文件:
10,20,30
另外只须装csv包。如果你的文件是其他格式内容,csv部分就得改下。
【在 h******b 的大作中提到】 : 两位大牛真的好厉害,这个()=>是ES6的东西啊,一下子干净了很多。 我得好好学 : 习一下。 : 您的代码,我跑了一下出来了一个: : invalid opening quote at line 1, 是需要什么package吗? : sui的代码我倒是能跑,不过缺了gzip/csv的部分,我得自己试试手写。
| h******b 发帖数: 6055 | 24 测试成功,非常感谢大牛们。
我用的是csv的旧版,不怕CSV里面的double quote,但api也不一样。 你用的是新的
csv, 我更新了包以后加了相关parse option就过关了。
非常感谢。 学了很多。 你这个fat arrow的使用当真是耳目一新。 es6真心一目了
然。
跑了一下三个500gb的文档,CPU使用轻松破50%,速度提升的一塌糊涂。
上面说node不适合做这种运算的,作为文科生实在是没时间学各种语言/插件。 我是
拿javascript作为万能语言来使用的,平常编程时间本来就不多,多不如精啊。 何况
本来也不是多复杂的活,node如果没办法做这种多线程数据处理也不用混了。
【在 k****i 的大作中提到】 : 测试用的输入文件: : 1,2,3 : 运行后输出文件: : 10,20,30 : 另外只须装csv包。如果你的文件是其他格式内容,csv部分就得改下。
|
|