Skip to content

Commit 1ff4193

Browse files
committed
Add lock-free, growable HugeAtomicBitSet
1 parent c43268d commit 1ff4193

File tree

3 files changed

+310
-0
lines changed

3 files changed

+310
-0
lines changed

collections/src/main/java/org/neo4j/gds/mem/HugeArrays.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ public static int indexInPage(long index) {
3838
return (int) (index & PAGE_MASK);
3939
}
4040

41+
public static int pageIndex(long index, int pageShift) {
42+
return (int) (index >>> pageShift);
43+
}
44+
45+
public static int indexInPage(long index, long pageMask) {
46+
return (int) (index & pageMask);
47+
}
48+
4149
public static int exclusiveIndexOfPage(long index) {
4250
return 1 + (int) ((index - 1L) & PAGE_MASK);
4351
}
@@ -52,6 +60,12 @@ public static int numberOfPages(long capacity) {
5260
return (int) numPages;
5361
}
5462

63+
public static int numberOfPages(long capacity, int pageShift, long pageMask) {
64+
final long numPages = (capacity + pageMask) >>> pageShift;
65+
assert numPages <= Integer.MAX_VALUE : "pageSize=" + (1 << pageShift) + " is too small for capacity: " + capacity;
66+
return (int) numPages;
67+
}
68+
5569
private HugeArrays() {
5670
throw new UnsupportedOperationException("No instances");
5771
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.core.utils.paged;
21+
22+
import org.neo4j.gds.mem.BitUtil;
23+
import org.neo4j.gds.mem.HugeArrays;
24+
25+
import java.util.concurrent.atomic.AtomicLongArray;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
28+
public final class HugeAtomicPagedBitSet {
29+
30+
// Each page stores 2^PAGE_SHIFT_BITS entries.
31+
// Word-size is 64 bit (long), which means we
32+
// store 2^(PAGE_SHIFT_BITS - 6) words per page.
33+
static final int PAGE_SHIFT_BITS = 14;
34+
// Number of bits per word (long).
35+
private static final int NUM_BITS = Long.SIZE;
36+
private static final int BIT_MASK = NUM_BITS - 1;
37+
38+
private final int pageSize; // words per page
39+
private final int pageShift; // word-aligned page shift
40+
private final long pageMask; // word-aligned page mask
41+
42+
// We need to atomically update the reference to the
43+
// actual pages since multiple threads try to add a
44+
// new page at the same time and only once must succeed.
45+
private final AtomicReference<Pages> pages;
46+
47+
public static HugeAtomicPagedBitSet create(long bitSize) {
48+
// Number of words required to represent the bit size.
49+
long wordSize = BitUtil.ceilDiv(bitSize, NUM_BITS);
50+
51+
// Parameters for long pages representing the bits.
52+
int pageShift = PAGE_SHIFT_BITS - 6; // 2^6 == 64 Bits for a long
53+
int pageSize = 1 << pageShift;
54+
long pageMask = pageSize - 1;
55+
// We allocate in pages of fixed size, so the last page
56+
// might have extra space, which is fine as this is a
57+
// growing data structure anyway. The capacity will be
58+
// larger than the specified size.
59+
int pageCount = HugeArrays.numberOfPages(wordSize, pageShift, pageMask);
60+
61+
return new HugeAtomicPagedBitSet(pageCount, pageSize, pageShift, pageMask);
62+
}
63+
64+
private HugeAtomicPagedBitSet(int pageCount, int pageSize, int pageShift, long pageMask) {
65+
this.pageSize = pageSize;
66+
this.pageShift = pageShift;
67+
this.pageMask = pageMask;
68+
this.pages = new AtomicReference<>(new Pages(pageCount, pageSize));
69+
}
70+
71+
public void set(long index) {
72+
long longIndex = index >>> 6;
73+
int pageIndex = HugeArrays.pageIndex(longIndex, pageShift);
74+
int wordIndex = HugeArrays.indexInPage(longIndex, pageMask);
75+
int bitIndex = (int) (index & BIT_MASK);
76+
77+
var page = getPage(pageIndex);
78+
long bitMask = 1L << bitIndex;
79+
80+
long oldWord = page.get(wordIndex);
81+
while (true) {
82+
long newWord = oldWord | bitMask;
83+
if (newWord == oldWord) {
84+
// nothing to set
85+
return;
86+
}
87+
long currentWord = page.compareAndExchange(wordIndex, oldWord, newWord);
88+
if (currentWord == oldWord) {
89+
// CAS successful
90+
return;
91+
}
92+
// CAS unsuccessful, try again
93+
oldWord = currentWord;
94+
}
95+
}
96+
97+
public boolean get(long index) {
98+
long longIndex = index >>> 6;
99+
int pageIndex = HugeArrays.pageIndex(longIndex, pageShift);
100+
int wordIndex = HugeArrays.indexInPage(longIndex, pageMask);
101+
int bitIndex = (int) (index & BIT_MASK);
102+
103+
var page = getPage(pageIndex);
104+
long bitMask = 1L << bitIndex;
105+
return (page.get(wordIndex) & bitMask) != 0;
106+
}
107+
108+
public long cardinality() {
109+
final Pages pages = this.pages.get();
110+
final long pageCount = pages.length();
111+
final long pageSize = this.pageSize;
112+
113+
long setBitCount = 0;
114+
115+
for (int pageIndex = 0; pageIndex < pageCount; pageIndex++) {
116+
var page = pages.getPage(pageIndex);
117+
for (int wordIndex = 0; wordIndex < pageSize; wordIndex++) {
118+
long word = page.get(wordIndex);
119+
setBitCount += Long.bitCount(word);
120+
}
121+
}
122+
123+
return setBitCount;
124+
}
125+
126+
public long capacity() {
127+
return pages.get().length() * (1L << pageShift);
128+
}
129+
130+
private AtomicLongArray getPage(int pageIndex) {
131+
var pages = this.pages.get();
132+
133+
while (pages.length() <= pageIndex) {
134+
// We need to grow the number of pages to fit the requested page index.
135+
// This needs to happen in a loop since we can't guarantee that if the
136+
// current thread is not successful in updating the pages, the newly
137+
// created pages contain enough space.
138+
var newPages = new Pages(pages, pageIndex + 1, this.pageSize);
139+
// Atomically updating the reference. If we're successful, the witness will
140+
// be the prior `pages` value, and we're done. If we're unsuccessful, we
141+
// already read the new `pages` value due to CAX call and repeat with that one.
142+
var witness = this.pages.compareAndExchange(pages, newPages);
143+
144+
if (pages == witness) {
145+
// Success.
146+
pages = newPages;
147+
} else {
148+
// Throw away the created pages and try again with the new current value.
149+
pages = witness;
150+
}
151+
}
152+
153+
return pages.getPage(pageIndex);
154+
}
155+
156+
private static final class Pages {
157+
158+
private final AtomicLongArray[] pages;
159+
160+
private Pages(int pageCount, int pageSize) {
161+
var pages = new AtomicLongArray[pageCount];
162+
163+
for (int pageIndex = 0; pageIndex < pageCount; pageIndex++) {
164+
pages[pageIndex] = new AtomicLongArray(pageSize);
165+
}
166+
167+
this.pages = pages;
168+
}
169+
170+
private Pages(Pages oldPages, int newPageCount, int pageSize) {
171+
var pages = new AtomicLongArray[newPageCount];
172+
173+
// We transfer the existing pages to the new pages.
174+
final int oldPageCount = oldPages.length();
175+
System.arraycopy(oldPages.pages, 0, pages, 0, oldPageCount);
176+
// And add new pages for the remaining ones until we reach the page count.
177+
// This is potential garbage since the thread creating those might not win
178+
// the race to grow the pages.
179+
for (int pageIndex = oldPageCount; pageIndex < newPageCount; pageIndex++) {
180+
pages[pageIndex] = new AtomicLongArray(pageSize);
181+
}
182+
183+
this.pages = pages;
184+
}
185+
186+
private AtomicLongArray getPage(int pageIndex) {
187+
return pages[pageIndex];
188+
}
189+
190+
private int length() {
191+
return this.pages.length;
192+
}
193+
}
194+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.core.utils.paged;
21+
22+
import org.junit.jupiter.api.Test;
23+
import org.neo4j.gds.core.concurrency.RunWithConcurrency;
24+
import org.neo4j.gds.core.utils.partition.PartitionUtils;
25+
26+
import java.util.List;
27+
import java.util.Optional;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.neo4j.gds.core.utils.paged.HugeAtomicPagedBitSet.PAGE_SHIFT_BITS;
31+
32+
class HugeAtomicPagedBitSetTest {
33+
34+
@Test
35+
void testSinglePageBitSet() {
36+
var atomicBitSet = HugeAtomicPagedBitSet.create(42);
37+
assertThat(atomicBitSet.get(23)).isFalse();
38+
atomicBitSet.set(23);
39+
assertThat(atomicBitSet.get(23)).isTrue();
40+
}
41+
42+
@Test
43+
void testMultiPageBitSet() {
44+
long size = 2 * (1L << PAGE_SHIFT_BITS) + 42; // 3 pages
45+
46+
var atomicBitSet = HugeAtomicPagedBitSet.create(size);
47+
// page 0
48+
assertThat(atomicBitSet.get(23)).isFalse();
49+
atomicBitSet.set(23);
50+
assertThat(atomicBitSet.get(23)).isTrue();
51+
// page 1
52+
assertThat(atomicBitSet.get((1L << PAGE_SHIFT_BITS) + 23)).isFalse();
53+
atomicBitSet.set((1L << PAGE_SHIFT_BITS) + 23);
54+
assertThat(atomicBitSet.get((1L << PAGE_SHIFT_BITS) + 23)).isTrue();
55+
// page 2
56+
assertThat(atomicBitSet.get(2 * (1L << PAGE_SHIFT_BITS) + 23)).isFalse();
57+
atomicBitSet.set(2 * (1L << PAGE_SHIFT_BITS) + 23);
58+
assertThat(atomicBitSet.get(2 * (1L << PAGE_SHIFT_BITS) + 23)).isTrue();
59+
}
60+
61+
@Test
62+
void testCardinality() {
63+
long size = 2 * (1L << PAGE_SHIFT_BITS) + 42; // 3 pages
64+
65+
var atomicBitSet = HugeAtomicPagedBitSet.create(size);
66+
assertThat(atomicBitSet.cardinality()).isEqualTo(0);
67+
// page 0
68+
atomicBitSet.set(23);
69+
assertThat(atomicBitSet.cardinality()).isEqualTo(1);
70+
// page 1
71+
atomicBitSet.set((1L << PAGE_SHIFT_BITS) + 23);
72+
assertThat(atomicBitSet.cardinality()).isEqualTo(2);
73+
// page 2
74+
atomicBitSet.set(2 * (1L << PAGE_SHIFT_BITS) + 23);
75+
assertThat(atomicBitSet.cardinality()).isEqualTo(3);
76+
}
77+
78+
@Test
79+
void writingAndGrowingShouldBeThreadSafe() {
80+
int concurrency = 8;
81+
int nodeCount = 1_000_000;
82+
83+
var bitSet = HugeAtomicPagedBitSet.create(0);
84+
85+
List<Runnable> tasks = PartitionUtils.rangePartition(concurrency, nodeCount, (partition) -> () -> {
86+
long startNode = partition.startNode();
87+
long endNode = partition.startNode() + partition.nodeCount();
88+
for (var i = startNode; i < endNode; i++) {
89+
bitSet.set(i);
90+
}
91+
}, Optional.empty());
92+
93+
RunWithConcurrency
94+
.builder()
95+
.tasks(tasks)
96+
.concurrency(concurrency)
97+
.build()
98+
.run();
99+
100+
assertThat(bitSet.cardinality()).isEqualTo(nodeCount);
101+
}
102+
}

0 commit comments

Comments
 (0)