Skip to content

Commit 16a2aab

Browse files
committed
Adds reduce implementation and test.
1 parent 167c15b commit 16a2aab

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

lib/parallel.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,50 @@
144144
return this;
145145
};
146146

147+
Parallel.prototype._spawnReduceWorker = function (data, cb, done) {
148+
var that = this;
149+
var wrk = new Worker(that.options.path);
150+
wrk.postMessage(Parallel.getWorkerSource(cb));
151+
wrk.postMessage(data);
152+
wrk.onmessage = function (msg) {
153+
wrk.terminate();
154+
that.data[that.data.length] = msg.data;
155+
done();
156+
};
157+
};
158+
159+
Parallel.prototype.reduce = function (cb) {
160+
if (!this.data.length) {
161+
throw new Error('Can\'t reduce non-array data');
162+
}
163+
164+
var that = this;
165+
function done(data) {
166+
if (that.data.length === 1) {
167+
that.data = that.data[0];
168+
newOp.resolve(null, that.data);
169+
} else {
170+
that._spawnReduceWorker([that.data[0], that.data[1]], cb, done);
171+
that.data.splice(0, 2);
172+
}
173+
}
174+
175+
var newOp = new Operation();
176+
this.operation.then(function () {
177+
if (that.data.length === 1) {
178+
newOp.resolve(null, that.data[0]);
179+
} else {
180+
for (var i = 0; i < that.options.maxWorkers && i < Math.floor(that.data.length / 2); ++i) {
181+
that._spawnReduceWorker([that.data[i * 2], that.data[i * 2 + 1]], cb, done);
182+
}
183+
184+
that.data.splice(0, i * 2);
185+
}
186+
});
187+
this.operation = newOp;
188+
return this;
189+
};
190+
147191
Parallel.prototype.then = function (cb, errCb) {
148192
var that = this;
149193
var newOp = new Operation();

test/api.spec.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,30 @@
140140
});
141141
});
142142

143+
it('should execute .reduce() correctly', function () {
144+
var Parallel = require('../lib/parallel.js');
145+
var p = new Parallel([1, 2, 3]);
146+
var done = false;
147+
var result = null;
148+
149+
runs(function () {
150+
p.reduce(function (data) {
151+
return data[0] + data[1];
152+
}).then(function (data) {
153+
result = data;
154+
done = true;
155+
});
156+
});
157+
158+
waitsFor(function () {
159+
return done;
160+
}, "it should finish", 500);
161+
162+
runs(function () {
163+
expect(result).toEqual(6);
164+
});
165+
});
166+
143167
it('should process data returned from .then()', function () {
144168
var Parallel = require('../lib/parallel.js');
145169
var p = new Parallel([1, 2, 3]);

0 commit comments

Comments
 (0)