Skip to content

Commit 15452c9

Browse files
Define output format
1 parent 0c164eb commit 15452c9

File tree

8 files changed

+209
-21
lines changed

8 files changed

+209
-21
lines changed

progress-tracking/src/main/java/org/neo4j/gds/mem/GraphStoreMemoryContainer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.neo4j.gds.api.graph.store.catalog.GraphStoreAddedEvent;
2323
import org.neo4j.gds.api.graph.store.catalog.GraphStoreRemovedEvent;
2424

25+
import java.util.HashSet;
26+
import java.util.Optional;
27+
import java.util.Set;
2528
import java.util.concurrent.ConcurrentHashMap;
2629
import java.util.concurrent.atomic.AtomicLong;
2730
import java.util.stream.Stream;
@@ -56,11 +59,25 @@ Stream<UserEntityMemory> listGraphs(String user){
5659
.getOrDefault(user, EMPTY_HASH_MAP)
5760
.entrySet()
5861
.stream()
59-
.map( entry -> new UserEntityMemory(user, entry.getKey(), entry.getValue()));
62+
.map( entry -> UserEntityMemory.createGraph(user, entry.getKey(), entry.getValue()));
6063
}
6164

6265
Stream<UserEntityMemory> listGraphs(){
6366
return graphStoresMemory.keySet().stream().flatMap(this::listGraphs);
6467
}
6568

69+
long memoryOfGraphs(String user){
70+
return graphStoresMemory
71+
.getOrDefault(user, EMPTY_HASH_MAP)
72+
.values()
73+
.stream()
74+
.reduce(0L, Long::sum);
75+
}
76+
77+
Set<String> graphUsers(Optional<Set<String>> inputUsers){
78+
Set<String> users = inputUsers.orElseGet(HashSet::new);
79+
users.addAll(graphStoresMemory.keySet());
80+
return users;
81+
}
82+
6683
}

progress-tracking/src/main/java/org/neo4j/gds/mem/MemoryTracker.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import org.neo4j.gds.core.utils.progress.UserTask;
2929
import org.neo4j.gds.logging.Log;
3030

31+
import java.util.Optional;
32+
import java.util.stream.Stream;
33+
3134
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
3235

3336
public class MemoryTracker implements TaskStoreListener, GraphStoreAddedEventListener, GraphStoreRemovedEventListener {
@@ -64,6 +67,34 @@ public synchronized long availableMemory() {
6467
return initialMemory - graphStoreMemoryContainer.graphStoreReservedMemory() - taskMemoryContainer.taskReservedMemory();
6568
}
6669

70+
public Stream<UserEntityMemory> listUser(String user){
71+
return Stream.concat(taskMemoryContainer.listTasks(user), graphStoreMemoryContainer.listGraphs(user));
72+
}
73+
74+
public Stream<UserEntityMemory> listAll(){
75+
return Stream.concat(taskMemoryContainer.listTasks(), graphStoreMemoryContainer.listGraphs());
76+
}
77+
78+
public UserMemorySummary memorySummary(String user){
79+
return new UserMemorySummary(user,
80+
graphStoreMemoryContainer.memoryOfGraphs(user),
81+
taskMemoryContainer.memoryOfTasks(user)
82+
);
83+
}
84+
85+
public Stream<UserMemorySummary> memorySummary(){
86+
87+
var users = graphStoreMemoryContainer.graphUsers(Optional.empty());
88+
users= taskMemoryContainer.taskUsers(Optional.of(users));
89+
90+
return users.stream()
91+
.map(user -> new UserMemorySummary(
92+
user,
93+
graphStoreMemoryContainer.memoryOfGraphs(user),
94+
taskMemoryContainer.memoryOfTasks(user)
95+
));
96+
}
97+
6798
@Override
6899
public void onTaskAdded(UserTask userTask) {
69100
// do nothing, we add the memory explicitly prior to execution

progress-tracking/src/main/java/org/neo4j/gds/mem/TaskMemoryContainer.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.neo4j.gds.core.utils.progress.JobId;
2424
import org.neo4j.gds.core.utils.progress.UserTask;
2525

26+
import java.util.HashSet;
27+
import java.util.Optional;
28+
import java.util.Set;
2629
import java.util.concurrent.ConcurrentHashMap;
2730
import java.util.concurrent.atomic.AtomicLong;
2831
import java.util.stream.Stream;
@@ -42,7 +45,7 @@ void reserve(String username, String taskName, JobId jobId,long memoryAmount){
4245

4346
long removeTask(UserTask task){
4447
var mem= memoryInUse.getOrDefault(task.username(), EMPTY_HASH_MAP).remove(task.jobId()).getRight();
45-
allocatedMemory.addAndGet(mem);
48+
allocatedMemory.addAndGet(-mem);
4649
return mem;
4750
}
4851

@@ -53,13 +56,31 @@ long taskReservedMemory(){
5356
Stream<UserEntityMemory> listTasks(String user){
5457
return memoryInUse
5558
.getOrDefault(user, EMPTY_HASH_MAP)
56-
.values()
59+
.entrySet()
5760
.stream()
58-
.map(stringLongPair -> new UserEntityMemory(user, stringLongPair.getLeft(), stringLongPair.getRight()));
61+
.map(
62+
jobIdPairEntry
63+
-> UserEntityMemory.createTask(user, jobIdPairEntry.getValue().getLeft(), jobIdPairEntry.getKey(),jobIdPairEntry.getValue().getRight()));
5964
}
6065

6166
Stream<UserEntityMemory> listTasks(){
6267
return memoryInUse.keySet().stream().flatMap(this::listTasks);
6368
}
6469

70+
long memoryOfTasks(String user){
71+
return memoryInUse
72+
.getOrDefault(user, EMPTY_HASH_MAP)
73+
.values()
74+
.stream()
75+
.map(Pair::getRight)
76+
.reduce(0L, Long::sum);
77+
78+
}
79+
80+
Set<String> taskUsers(Optional<Set<String>> inputUsers){
81+
Set<String> users = inputUsers.orElseGet(HashSet::new);
82+
users.addAll(memoryInUse.keySet());
83+
return users;
84+
}
85+
6586
}

progress-tracking/src/main/java/org/neo4j/gds/mem/UserEntityMemory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,15 @@
1919
*/
2020
package org.neo4j.gds.mem;
2121

22-
public record UserEntityMemory(String user, String entity, long memoryInBytes) {
22+
import org.neo4j.gds.core.utils.progress.JobId;
23+
24+
public record UserEntityMemory(String user, String name, String entity, long memoryInBytes) {
25+
26+
static UserEntityMemory createGraph(String user, String name, long memoryInBytes){
27+
return new UserEntityMemory(user,name,"graph",memoryInBytes);
28+
}
29+
static UserEntityMemory createTask(String user, String name, JobId jobId, long memoryInBytes){
30+
return new UserEntityMemory(user, name, jobId.asString(), memoryInBytes);
31+
}
32+
2333
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.mem;
21+
22+
public record UserMemorySummary(String user, long totalGraphMemory, long totalTasksMemory) {}
23+

progress-tracking/src/test/java/org/neo4j/gds/mem/GraphStoreMemoryContainerTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ void shouldListForUser(){
5353
graphStoreMemoryContainer.addGraph(new GraphStoreAddedEvent("Bob","DB","graph3",20));
5454
var aliceList =graphStoreMemoryContainer.listGraphs("Alice").toList();
5555
assertThat(aliceList).hasSize(2);
56-
assertThat(aliceList.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("graph1","graph2");
56+
assertThat(aliceList.stream().map(UserEntityMemory::name).toList()).containsExactlyInAnyOrder("graph1","graph2");
5757
assertThat(aliceList.stream().map(UserEntityMemory::memoryInBytes).toList()).containsExactlyInAnyOrder(10L,15L);
5858

5959
}
@@ -67,9 +67,21 @@ void shouldListAll(){
6767
graphStoreMemoryContainer.addGraph(new GraphStoreAddedEvent("Bob","DB","graph3",20));
6868
var graphList =graphStoreMemoryContainer.listGraphs().toList();
6969
assertThat(graphList).hasSize(3);
70-
assertThat(graphList.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("graph1","graph2","graph3");
70+
assertThat(graphList.stream().map(UserEntityMemory::name).toList()).containsExactlyInAnyOrder("graph1","graph2","graph3");
7171
assertThat(graphList.stream().map(UserEntityMemory::memoryInBytes).toList()).containsExactlyInAnyOrder(10L,15L,20L);
7272

7373
}
7474

75+
@Test
76+
void shouldReturnMemoryForUser(){
77+
GraphStoreMemoryContainer graphStoreMemoryContainer=new GraphStoreMemoryContainer();
78+
graphStoreMemoryContainer.addGraph(new GraphStoreAddedEvent("Alice","DB","graph1",10));
79+
graphStoreMemoryContainer.addGraph(new GraphStoreAddedEvent("Alice","DB","graph2",15));
80+
81+
graphStoreMemoryContainer.addGraph(new GraphStoreAddedEvent("Bob","DB","graph3",20));
82+
assertThat(graphStoreMemoryContainer.memoryOfGraphs("Alice")).isEqualTo(25L);
83+
}
84+
85+
86+
7587
}

progress-tracking/src/test/java/org/neo4j/gds/mem/MemoryTrackerTest.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.jupiter.params.provider.Arguments;
2525
import org.junit.jupiter.params.provider.MethodSource;
2626
import org.mockito.Answers;
27+
import org.neo4j.gds.api.graph.store.catalog.GraphStoreAddedEvent;
2728
import org.neo4j.gds.core.utils.progress.JobId;
2829
import org.neo4j.gds.core.utils.progress.UserTask;
2930
import org.neo4j.gds.logging.Log;
@@ -75,20 +76,82 @@ void shouldHaveAvailableMemoryWithoutTheTrackedMemory() {
7576
.isEqualTo(7L);
7677
}
7778

79+
@Test
80+
void shouldListForUser(){
81+
var memoryTracker = new MemoryTracker(19L, Log.noOpLog());
82+
memoryTracker.track("alice","task1",new JobId("job1"), 9);
83+
memoryTracker.track("alice","task2",new JobId("job2"), 3);
84+
memoryTracker.track("bob","task3",new JobId("job3"), 5);
85+
memoryTracker.onGraphStoreAdded(new GraphStoreAddedEvent("alice","neo4j","graph1",11));
86+
var aliceList = memoryTracker.listUser("alice").toList();
87+
assertThat(aliceList.stream().map(UserEntityMemory::name).toList()).containsExactlyInAnyOrder("task1","task2","graph1");
88+
assertThat(aliceList.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("job1","job2","graph");
89+
90+
assertThat(aliceList.stream().map(UserEntityMemory::memoryInBytes).toList()).containsExactlyInAnyOrder(9L,3L,11L);
91+
}
92+
93+
@Test
94+
void shouldListForAll(){
95+
var memoryTracker = new MemoryTracker(19L, Log.noOpLog());
96+
memoryTracker.track("alice","task1",new JobId("job1"), 9);
97+
memoryTracker.track("alice","task2",new JobId("job2"), 3);
98+
memoryTracker.track("bob","task3",new JobId("job3"), 5);
99+
memoryTracker.onGraphStoreAdded(new GraphStoreAddedEvent("alice","neo4j","graph1",11));
100+
101+
var list = memoryTracker.listAll().toList();
102+
assertThat(list.stream().map(UserEntityMemory::name).toList()).containsExactlyInAnyOrder("task1","task2","task3","graph1");
103+
assertThat(list.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("job1","job2","job3","graph");
104+
105+
assertThat(list.stream().map(UserEntityMemory::memoryInBytes).toList()).containsExactlyInAnyOrder(9L,3L,5L,11L);
106+
}
107+
108+
@Test
109+
void shouldReturnMemoryForUser(){
110+
var memoryTracker = new MemoryTracker(19L, Log.noOpLog());
111+
memoryTracker.track("alice","task1",new JobId("job1"), 9);
112+
memoryTracker.track("alice","task2",new JobId("job2"), 3);
113+
memoryTracker.track("bob","task3",new JobId("job3"), 5);
114+
memoryTracker.onGraphStoreAdded(new GraphStoreAddedEvent("alice","neo4j","graph1",11));
115+
116+
var aliceMemory = memoryTracker.memorySummary("alice");
117+
assertThat(aliceMemory.totalGraphMemory()).isEqualTo(11L);
118+
assertThat(aliceMemory.totalTasksMemory()).isEqualTo(12L);
119+
120+
var bobMemory = memoryTracker.memorySummary("bob");
121+
assertThat(bobMemory.totalGraphMemory()).isEqualTo(0L);
122+
assertThat(bobMemory.totalTasksMemory()).isEqualTo(5L);
123+
124+
}
125+
126+
@Test
127+
void shouldReturnMemoryForAll(){
128+
var memoryTracker = new MemoryTracker(19L, Log.noOpLog());
129+
memoryTracker.track("alice","task1",new JobId("job1"), 9);
130+
memoryTracker.track("alice","task2",new JobId("job2"), 3);
131+
memoryTracker.track("bob","task3",new JobId("job3"), 5);
132+
memoryTracker.onGraphStoreAdded(new GraphStoreAddedEvent("alice","neo4j","graph1",11));
133+
134+
var list = memoryTracker.memorySummary().toList();
135+
136+
assertThat(list.stream()).map(UserMemorySummary::totalGraphMemory).containsExactlyInAnyOrder(11L,0L);
137+
assertThat(list.stream()).map(UserMemorySummary::totalTasksMemory).containsExactlyInAnyOrder(12L,5L);
138+
139+
}
140+
78141
@Test
79142
void shouldFreeMemoryOnTaskRemoved() {
80143
var memoryTracker = new MemoryTracker(19L, Log.noOpLog());
81144

82-
memoryTracker.track("a","b",new JobId("foo"), 9);
145+
memoryTracker.track("a","b", new JobId("foo"), 9);
83146
memoryTracker.track("a","b",new JobId("bar"), 3);
84147

85148
var userTaskMock = mock(UserTask.class, Answers.RETURNS_MOCKS);
86149
when(userTaskMock.jobId()).thenReturn(new JobId("foo"));
150+
when(userTaskMock.username()).thenReturn("a");
87151

88152
memoryTracker.onTaskRemoved(userTaskMock);
89153

90154
assertThat(memoryTracker.availableMemory())
91-
.isEqualTo(memoryTracker.availableMemory())
92155
.isEqualTo(16L);
93156
}
94157

progress-tracking/src/test/java/org/neo4j/gds/mem/TaskMemoryContainerTest.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ class TaskMemoryContainerTest {
3131
@Test
3232
void shouldReserve(){
3333
TaskMemoryContainer taskMemoryContainer=new TaskMemoryContainer();
34-
taskMemoryContainer.reserve("alice", "foo" , new JobId("JobId"), 10);
35-
taskMemoryContainer.reserve("alice", "foo2" , new JobId("JobId2"), 20);
34+
taskMemoryContainer.reserve("alice", "foo", new JobId("JobId"), 10);
35+
taskMemoryContainer.reserve("alice", "foo2", new JobId("JobId2"), 20);
3636
assertThat(taskMemoryContainer.taskReservedMemory()).isEqualTo(30L);
3737

3838
}
@@ -41,8 +41,8 @@ void shouldReserve(){
4141
void shouldRemove(){
4242
TaskMemoryContainer taskMemoryContainer=new TaskMemoryContainer();
4343
JobId jobId = new JobId("JobId");
44-
taskMemoryContainer.reserve("alice", "foo" , jobId, 10);
45-
taskMemoryContainer.reserve("alice", "foo2" , new JobId("JobId2"), 20);
44+
taskMemoryContainer.reserve("alice", "foo", jobId, 10);
45+
taskMemoryContainer.reserve("alice", "foo2", new JobId("JobId2"), 20);
4646
assertThat(taskMemoryContainer.taskReservedMemory()).isEqualTo(30L);
4747
taskMemoryContainer.removeTask(new UserTask("alice",jobId,null));
4848
assertThat(taskMemoryContainer.taskReservedMemory()).isEqualTo(20L);
@@ -52,27 +52,38 @@ void shouldRemove(){
5252
@Test
5353
void shouldListForUser(){
5454
TaskMemoryContainer taskMemoryContainer=new TaskMemoryContainer();
55-
taskMemoryContainer.reserve("alice", "foo" , new JobId("JobId1"), 10);
56-
taskMemoryContainer.reserve("alice", "foo2" , new JobId("JobId2"), 15);
57-
taskMemoryContainer.reserve("bob", "foo3" , new JobId("JobId3"), 30);
55+
taskMemoryContainer.reserve("alice", "foo", new JobId("JobId1"), 10);
56+
taskMemoryContainer.reserve("alice", "foo2", new JobId("JobId2"), 15);
57+
taskMemoryContainer.reserve("bob", "foo3", new JobId("JobId3"), 30);
5858

5959
var aliceList = taskMemoryContainer.listTasks("alice").toList();
6060
assertThat(aliceList).hasSize(2);
61-
assertThat(aliceList.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("foo","foo2");
61+
assertThat(aliceList.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("JobId1","JobId2");
6262
assertThat(aliceList.stream().map(UserEntityMemory::memoryInBytes).toList()).containsExactlyInAnyOrder(10L,15L);
6363

6464
}
6565

6666
@Test
6767
void shouldListAll(){
6868
TaskMemoryContainer taskMemoryContainer=new TaskMemoryContainer();
69-
taskMemoryContainer.reserve("alice", "foo" , new JobId("JobId1"), 10);
70-
taskMemoryContainer.reserve("alice", "foo2" , new JobId("JobId2"), 15);
71-
taskMemoryContainer.reserve("bob", "foo3" , new JobId("JobId3"), 20);
69+
taskMemoryContainer.reserve("alice", "foo", new JobId("JobId1"), 10);
70+
taskMemoryContainer.reserve("alice", "foo2", new JobId("JobId2"), 15);
71+
taskMemoryContainer.reserve("bob", "foo3", new JobId("JobId3"), 20);
7272
var taskList =taskMemoryContainer.listTasks().toList();
7373
assertThat(taskList).hasSize(3);
74-
assertThat(taskList.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("foo","foo2","foo3");
74+
assertThat(taskList.stream().map(UserEntityMemory::entity).toList()).containsExactlyInAnyOrder("JobId1","JobId2","JobId3");
7575
assertThat(taskList.stream().map(UserEntityMemory::memoryInBytes).toList()).containsExactlyInAnyOrder(10L,15L,20L);
7676

7777
}
78+
79+
@Test
80+
void shouldReturnMemoryForUser(){
81+
TaskMemoryContainer taskMemoryContainer=new TaskMemoryContainer();
82+
taskMemoryContainer.reserve("alice", "foo", new JobId("JobId1"), 10);
83+
taskMemoryContainer.reserve("alice", "foo2", new JobId("JobId2"), 15);
84+
taskMemoryContainer.reserve("bob", "foo3", new JobId("JobId3"), 30);
85+
86+
assertThat(taskMemoryContainer.memoryOfTasks("alice")).isEqualTo(25L);
87+
}
88+
7889
}

0 commit comments

Comments
 (0)