Skip to content

Commit 2d0e36c

Browse files
Introduce task container in mem. tracker
1 parent 701210c commit 2d0e36c

File tree

3 files changed

+77
-12
lines changed

3 files changed

+77
-12
lines changed

progress-tracking/src/main/java/org/neo4j/gds/mem/GraphStoreMemoryContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class GraphStoreMemoryContainer {
3030

3131
private final ConcurrentHashMap<String, ConcurrentHashMap<String,Long>> graphStoresMemory = new ConcurrentHashMap<>();
3232
private final AtomicLong graphStoreReservedMemory = new AtomicLong();
33-
private static final ConcurrentHashMap<String,Long> EMPTY_HASH_MAP =new ConcurrentHashMap<>();
33+
private static final ConcurrentHashMap<String,Long> EMPTY_HASH_MAP = new ConcurrentHashMap<>();
3434

3535
long addGraph(GraphStoreAddedEvent graphStoreAddedEvent){
3636
var addedGraphMemory = graphStoreAddedEvent.memoryInBytes();

progress-tracking/src/main/java/org/neo4j/gds/mem/MemoryTracker.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
*/
2020
package org.neo4j.gds.mem;
2121

22-
import com.carrotsearch.hppc.ObjectLongHashMap;
23-
import com.carrotsearch.hppc.ObjectLongMap;
24-
import com.carrotsearch.hppc.procedures.LongProcedure;
2522
import org.neo4j.gds.api.graph.store.catalog.GraphStoreAddedEvent;
2623
import org.neo4j.gds.api.graph.store.catalog.GraphStoreAddedEventListener;
2724
import org.neo4j.gds.api.graph.store.catalog.GraphStoreRemovedEvent;
@@ -31,14 +28,12 @@
3128
import org.neo4j.gds.core.utils.progress.UserTask;
3229
import org.neo4j.gds.logging.Log;
3330

34-
import java.util.concurrent.atomic.LongAdder;
35-
3631
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
3732

3833
public class MemoryTracker implements TaskStoreListener, GraphStoreAddedEventListener, GraphStoreRemovedEventListener {
3934
private final long initialMemory;
4035
private final GraphStoreMemoryContainer graphStoreMemoryContainer = new GraphStoreMemoryContainer();
41-
private final ObjectLongMap<JobId> memoryInUse = new ObjectLongHashMap<>();
36+
private final TaskMemoryContainer taskMemoryContainer = new TaskMemoryContainer();
4237
private final Log log;
4338

4439
public MemoryTracker(long initialMemory, Log log) {
@@ -53,7 +48,7 @@ public long initialMemory() {
5348

5449
public synchronized void track(JobId jobId, long memoryEstimate) {
5550
log.debug("Tracking %s: %s bytes", jobId.asString(), memoryEstimate);
56-
memoryInUse.put(jobId, memoryEstimate);
51+
taskMemoryContainer.reserve(jobId, memoryEstimate);
5752
log.debug("Available memory after tracking task: %s bytes", availableMemory());
5853
}
5954

@@ -66,22 +61,21 @@ public synchronized void tryToTrack(JobId jobId, long memoryEstimate) throws Mem
6661
}
6762

6863
public synchronized long availableMemory() {
69-
var reservedMemory = new LongAdder();
70-
memoryInUse.values().forEach((LongProcedure) reservedMemory::add);
71-
return initialMemory - (reservedMemory.longValue() + graphStoreMemoryContainer.graphStoreReservedMemory());
64+
return initialMemory - graphStoreMemoryContainer.graphStoreReservedMemory() - taskMemoryContainer.taskReservedMemory();
7265
}
7366

7467
@Override
7568
public void onTaskAdded(UserTask userTask) {
7669
// do nothing, we add the memory explicitly prior to execution
70+
taskMemoryContainer.addTask(userTask);
7771
}
7872

7973
@Override
8074
public synchronized void onTaskRemoved(UserTask userTask) {
8175
var taskDescription = userTask.task().description();
8276
log.debug("Removing task: %s", taskDescription);
8377
var jobId = userTask.jobId();
84-
var removed = memoryInUse.remove(jobId);
78+
var removed= taskMemoryContainer.removeTask(userTask);
8579
log.debug("Removed task %s (%s): %s bytes", taskDescription, jobId.asString(), removed);
8680
log.debug("Available memory after removing task: %s bytes", availableMemory());
8781
log.debug("Done removing task: %s", taskDescription);
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.mem;
21+
22+
import com.carrotsearch.hppc.ObjectLongHashMap;
23+
import com.carrotsearch.hppc.ObjectLongMap;
24+
import org.neo4j.gds.core.utils.progress.JobId;
25+
import org.neo4j.gds.core.utils.progress.UserTask;
26+
27+
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.atomic.AtomicLong;
29+
import java.util.stream.Stream;
30+
31+
class TaskMemoryContainer {
32+
33+
private final ConcurrentHashMap<String,ConcurrentHashMap<UserTask,Long>> memoryInUse = new ConcurrentHashMap<>();
34+
private final AtomicLong allocatedMemory = new AtomicLong();
35+
private final ObjectLongMap<JobId> temporaryMap =new ObjectLongHashMap<>();
36+
private static final ConcurrentHashMap<UserTask,Long> EMPTY_HASH_MAP = new ConcurrentHashMap<>();
37+
38+
void reserve(JobId jobId,long memoryAmount){
39+
temporaryMap.put(jobId,memoryAmount);
40+
allocatedMemory.addAndGet(memoryAmount);
41+
}
42+
43+
void addTask(UserTask task){
44+
var memoryAmount = temporaryMap.remove(task.jobId());
45+
memoryInUse.putIfAbsent(task.username(), new ConcurrentHashMap<>());
46+
memoryInUse.get(task.username()).put(task,memoryAmount);
47+
}
48+
49+
long removeTask(UserTask task){
50+
var mem= memoryInUse.getOrDefault(task.username(), EMPTY_HASH_MAP).remove(task);
51+
allocatedMemory.addAndGet(-mem);
52+
return mem;
53+
}
54+
55+
long taskReservedMemory(){
56+
return allocatedMemory.get();
57+
}
58+
59+
Stream<UserEntityMemory> listTasks(String user){
60+
return memoryInUse
61+
.getOrDefault(user, EMPTY_HASH_MAP)
62+
.entrySet()
63+
.stream()
64+
.map( entry -> new UserEntityMemory(user, entry.getKey().task().description(), entry.getValue()));
65+
}
66+
67+
Stream<UserEntityMemory> listTasks(){
68+
return memoryInUse.keySet().stream().flatMap(this::listTasks);
69+
}
70+
71+
}

0 commit comments

Comments
 (0)