Skip to content

Commit 627c8a9

Browse files
Create AtomicWorkingSet
Co-authored-by: Ioannis Panagiotas <ioannis.panagiotas@neo4j.com>
1 parent 6a74470 commit 627c8a9

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.maxflow;
21+
22+
import org.neo4j.gds.collections.ha.HugeLongArray;
23+
import org.neo4j.gds.core.utils.paged.HugeLongArrayQueue;
24+
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
public class AtomicWorkingSet {
28+
private final HugeLongArray workingSet;
29+
private final AtomicLong index;
30+
private final AtomicLong size;
31+
32+
public AtomicWorkingSet(long capacity) {
33+
workingSet = HugeLongArray.newArray(capacity);
34+
index = new AtomicLong(0);
35+
size = new AtomicLong(0);
36+
}
37+
38+
boolean isEmpty() {
39+
return size.get() == index.get();
40+
}
41+
42+
void resetIdx() {
43+
index.set(0L);
44+
}
45+
46+
void reset() {
47+
resetIdx();
48+
size.set(0L);
49+
}
50+
51+
long size() {
52+
return size.get();
53+
}
54+
55+
void push(long value) {
56+
var idx = size.getAndIncrement();
57+
workingSet.set(idx, value);
58+
}
59+
60+
void batchPush(HugeLongArrayQueue queue) {
61+
long idx = size.getAndAdd(queue.size());
62+
while(!queue.isEmpty()) {
63+
var node = queue.remove();
64+
workingSet.set(idx++, node);
65+
}
66+
}
67+
68+
long getAndAdd(long batchSize) {
69+
return index.getAndAdd(batchSize);
70+
}
71+
72+
long unsafePeek(long idx) {
73+
return workingSet.get(idx);
74+
}
75+
76+
long pop() {
77+
var idx = index.getAndIncrement();
78+
if (idx < size.get()) {
79+
return workingSet.get(idx);
80+
} else {
81+
// index.decrementAndGet(); //undo increment
82+
return -1L;
83+
}
84+
}
85+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.maxflow;
21+
22+
import org.junit.jupiter.api.Test;
23+
import org.neo4j.gds.core.utils.paged.HugeLongArrayQueue;
24+
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertFalse;
27+
import static org.junit.jupiter.api.Assertions.assertTrue;
28+
29+
class AtomicWorkingSetTest {
30+
@Test
31+
void shouldPushAndPullElements() {
32+
var workingSet = new AtomicWorkingSet(5);
33+
34+
workingSet.push(42);
35+
workingSet.push(43);
36+
37+
assertEquals(42, workingSet.pop());
38+
assertEquals(43, workingSet.pop());
39+
assertEquals(-1L, workingSet.pop()); // Should return -1 when no more elements
40+
}
41+
42+
@Test
43+
void shouldBatchPushElements() {
44+
var workingSet = new AtomicWorkingSet(5);
45+
var queue = HugeLongArrayQueue.newQueue(5);
46+
queue.add(1);
47+
queue.add(2);
48+
queue.add(3);
49+
50+
workingSet.batchPush(queue);
51+
52+
assertEquals(1, workingSet.pop());
53+
assertEquals(2, workingSet.pop());
54+
assertEquals(3, workingSet.pop());
55+
assertEquals(-1L, workingSet.pop());
56+
}
57+
58+
@Test
59+
void shouldReportEmptyCorrectly() {
60+
var workingSet = new AtomicWorkingSet(5);
61+
62+
assertTrue(workingSet.isEmpty());
63+
64+
workingSet.push(1);
65+
assertFalse(workingSet.isEmpty());
66+
67+
workingSet.pop();
68+
assertTrue(workingSet.isEmpty());
69+
}
70+
71+
@Test
72+
void shouldHandleCapacityLimit() {
73+
var workingSet = new AtomicWorkingSet(2);
74+
75+
workingSet.push(1);
76+
workingSet.push(2);
77+
78+
assertEquals(1, workingSet.pop());
79+
assertEquals(2, workingSet.pop());
80+
assertEquals(-1L, workingSet.pop());
81+
}
82+
}

0 commit comments

Comments
 (0)