Skip to content

Commit 7077ce3

Browse files
committed
Updated scheduler implementation
1 parent ab6160d commit 7077ce3

File tree

1 file changed

+45
-31
lines changed

1 file changed

+45
-31
lines changed

src/processes/scheduler.js

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,58 @@
11
"use strict";
22

3-
//Scheduler. Borrowed and modified from RxJS's Default Scheduler.
4-
//While it is probably more robust, this should fit the needs for
5-
//this project.
3+
const MAX_REDUCTIONS_PER_PROCESS = 8;
4+
5+
class ProcessQueue {
6+
constructor(pid){
7+
this.pid = pid;
8+
this.tasks = [];
9+
}
10+
11+
empty(){
12+
return this.tasks.length === 0;
13+
}
14+
15+
add(task){
16+
this.tasks.push(task);
17+
}
18+
19+
next(){
20+
return this.tasks.shift();
21+
}
22+
}
623

7-
//TODO: Use a fair scheduling implementation
824
class Scheduler {
925
constructor(throttle = 0){
10-
this.nextTaskId = 1;
11-
this.tasks = {}
1226
this.isRunning = false;
1327
this.invokeLater = function (callback) { setTimeout(callback, throttle); }
28+
this.queues = {};
29+
this.run();
1430
}
1531

16-
removeFromScheduler(taskId){
17-
delete this.tasks[taskId];
32+
addToQueue(pid, task){
33+
if(!this.queues[pid]){
34+
this.queues[pid] = new ProcessQueue(pid);
35+
}
36+
37+
this.queues[pid].add(task);
1838
}
1939

2040
removePid(pid){
21-
//prevent further execution while removing tasks
22-
//with matching pids
2341
this.isRunning = true;
2442

25-
for(let taskId of Object.keys(this.tasks)){
26-
if(this.tasks[taskId] && this.tasks[taskId][0] === pid){
27-
this.removeFromScheduler(taskId);
28-
}
29-
}
43+
delete this.queues[pid];
3044

3145
this.isRunning = false;
3246
}
3347

34-
runTask(taskId){
48+
run(){
3549
if (this.isRunning) {
36-
this.invokeLater(() => { this.runTask(taskId); });
50+
this.invokeLater(() => { this.run(); });
3751
} else {
38-
if(this.tasks[taskId]){
39-
40-
let [pid, task] = this.tasks[taskId];
41-
42-
if (task) {
52+
for(let pid of Object.keys(this.queues)){
53+
let reductions = 0;
54+
while(this.queues[pid] && !this.queues[pid].empty() && reductions < MAX_REDUCTIONS_PER_PROCESS){
55+
let task = this.queues[pid].next();
4356
this.isRunning = true;
4457

4558
let result;
@@ -51,29 +64,30 @@ class Scheduler {
5164
result = e;
5265
}
5366

54-
this.removeFromScheduler(taskId);
5567
this.isRunning = false;
5668

5769
if (result instanceof Error) {
5870
throw result;
5971
}
60-
}
6172

73+
reductions++;
74+
}
6275
}
76+
77+
this.invokeLater(() => { this.run(); });
6378
}
6479
}
6580

6681
addToScheduler(pid, task, dueTime = 0) {
67-
let id = this.nextTaskId ++;
68-
this.tasks[id] = [ pid, task ];
69-
7082
if(dueTime === 0){
71-
this.invokeLater(() => { this.runTask(id); });
83+
this.invokeLater(() => {
84+
this.addToQueue(pid, task);
85+
});
7286
}else{
73-
setTimeout(() => { this.runTask(id); }, dueTime);
87+
setTimeout(() => {
88+
this.addToQueue(pid, task);
89+
}, dueTime);
7490
}
75-
76-
return id;
7791
};
7892

7993
schedule(pid, task){

0 commit comments

Comments
 (0)