Skip to content

Commit df51a8d

Browse files
DarthMaxsoerenreichardt
authored andcommitted
Eagerly consume RelationshipStream in RelationshipStreamExportTask
Co-authored-by: Sören Reichardt <soren.reichardt@neo4j.com>
1 parent e225bc2 commit df51a8d

File tree

2 files changed

+159
-0
lines changed

2 files changed

+159
-0
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package org.neo4j.gds.collections.hsl;
2+
3+
import org.neo4j.gds.collections.ArrayUtil;
4+
import org.neo4j.gds.collections.DrainingIterator;
5+
import org.neo4j.gds.collections.PageUtil;
6+
import org.neo4j.gds.mem.Estimate;
7+
8+
import java.lang.reflect.Array;
9+
import java.util.Arrays;
10+
import java.util.stream.Stream;
11+
12+
13+
public final class HugeSparseObjectList<E> {
14+
private static final int PAGE_SHIFT = 12;
15+
16+
private static final int PAGE_SIZE = 1 << PAGE_SHIFT;
17+
18+
private static final int PAGE_MASK = PAGE_SIZE - 1;
19+
20+
private static final long PAGE_SIZE_IN_BYTES = Estimate.sizeOfLongArray(PAGE_SIZE);
21+
private final Class<E> clazz;
22+
23+
private E[][] pages;
24+
25+
private final E defaultValue;
26+
27+
public HugeSparseObjectList(E defaultValue, long initialCapacity, Class<E> clazz) {
28+
this.clazz = clazz;
29+
int numPages = PageUtil.pageIndex(initialCapacity, PAGE_SHIFT);
30+
this.pages = (E[][]) Array.newInstance(clazz, numPages, PAGE_SIZE);
31+
this.defaultValue = defaultValue;
32+
}
33+
34+
public long capacity() {
35+
int numPages = pages.length;
36+
return ((long) numPages) << PAGE_SHIFT;
37+
}
38+
39+
public E get(long index) {
40+
int pageIndex = PageUtil.pageIndex(index, PAGE_SHIFT);
41+
int indexInPage = PageUtil.indexInPage(index, PAGE_MASK);
42+
if (pageIndex < pages.length) {
43+
E[] page = pages[pageIndex];
44+
if (page != null) {
45+
return page[indexInPage];
46+
}
47+
}
48+
return defaultValue;
49+
}
50+
51+
public boolean contains(long index) {
52+
int pageIndex = PageUtil.pageIndex(index, PAGE_SHIFT);
53+
if (pageIndex < pages.length) {
54+
E[] page = pages[pageIndex];
55+
if (page != null) {
56+
int indexInPage = PageUtil.indexInPage(index, PAGE_MASK);
57+
return !page[indexInPage].equals(defaultValue);
58+
}
59+
}
60+
return false;
61+
}
62+
63+
public DrainingIterator<E[]> drainingIterator() {
64+
return new DrainingIterator<>(pages, PAGE_SIZE);
65+
}
66+
67+
public void forAll(LongObjectConsumer<E> consumer) {
68+
E[][] pages = this.pages;
69+
for (int pageIndex = 0; pageIndex < pages.length; pageIndex++) {
70+
E[] page = pages[pageIndex];
71+
if (page == null) {
72+
continue;
73+
}
74+
for (int indexInPage = 0; indexInPage < page.length; indexInPage++) {
75+
E value = page[indexInPage];
76+
if (value.equals(defaultValue)) {
77+
continue;
78+
}
79+
long index = ((long) pageIndex << PAGE_SHIFT) | (long) indexInPage;
80+
consumer.consume(index, value);
81+
}
82+
}
83+
}
84+
85+
public void set(long index, E value) {
86+
int pageIndex = PageUtil.pageIndex(index, PAGE_SHIFT);
87+
int indexInPage = PageUtil.indexInPage(index, PAGE_MASK);
88+
getPage(pageIndex)[indexInPage] = value;
89+
}
90+
91+
public Stream<E> stream() {
92+
return Arrays.stream(this.pages).filter(obj -> !(obj == null)).flatMap(Arrays::stream);
93+
}
94+
95+
public boolean setIfAbsent(long index, E value) {
96+
int pageIndex = PageUtil.pageIndex(index, PAGE_SHIFT);
97+
int indexInPage = PageUtil.indexInPage(index, PAGE_MASK);
98+
E[] page = getPage(pageIndex);
99+
E currentValue = page[indexInPage];
100+
if (currentValue.equals(defaultValue)) {
101+
page[indexInPage] = value;
102+
return true;
103+
}
104+
return false;
105+
}
106+
107+
private E[] getPage(int pageIndex) {
108+
if (pageIndex >= pages.length) {
109+
grow(pageIndex + 1);
110+
}
111+
E[] page = pages[pageIndex];
112+
if (page == null) {
113+
page = allocateNewPage(pageIndex);
114+
}
115+
return page;
116+
}
117+
118+
private void grow(int minNewSize) {
119+
if (minNewSize <= pages.length) {
120+
return;
121+
}
122+
int newSize = ArrayUtil.oversize(minNewSize, Estimate.BYTES_OBJECT_REF);
123+
this.pages = Arrays.copyOf(this.pages, newSize);
124+
}
125+
126+
private E[] allocateNewPage(int pageIndex) {
127+
E[] page = (E[]) Array.newInstance(clazz, PAGE_SIZE);
128+
if (defaultValue != null) {
129+
Arrays.fill(page, defaultValue);
130+
}
131+
this.pages[pageIndex] = page;
132+
return page;
133+
}
134+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.collections.hsl;
21+
22+
@FunctionalInterface
23+
public interface LongObjectConsumer<E> {
24+
void consume(long index, E value);
25+
}

0 commit comments

Comments
 (0)