Skip to content

Commit c640893

Browse files
authored
Merge pull request #11181 from lassewesth/logcorrelationm
add job ids as correlation ids for log messages
2 parents 8e6b03c + c951925 commit c640893

File tree

8 files changed

+238
-85
lines changed

8 files changed

+238
-85
lines changed

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/BatchingProgressLogger.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class BatchingProgressLogger implements ProgressLogger {
3636
public static final long MAXIMUM_LOG_INTERVAL = (long) Math.pow(2, 13);
3737

3838
private final LoggerForProgressTracking log;
39+
private final JobId jobId;
3940
private final Concurrency concurrency;
4041
private long taskVolume;
4142
private long batchSize;
@@ -58,12 +59,13 @@ static long calculateBatchSize(long taskVolume, Concurrency concurrency) {
5859
return Math.max(1, BitUtil.nextHighestPowerOfTwo(batchSize));
5960
}
6061

61-
public BatchingProgressLogger(LoggerForProgressTracking log, Task task, Concurrency concurrency) {
62-
this(log, task, calculateBatchSize(task, concurrency), concurrency);
62+
public BatchingProgressLogger(LoggerForProgressTracking log, JobId jobId, Task task, Concurrency concurrency) {
63+
this(log, jobId, task, calculateBatchSize(task, concurrency), concurrency);
6364
}
6465

65-
public BatchingProgressLogger(LoggerForProgressTracking log, Task task, long batchSize, Concurrency concurrency) {
66+
public BatchingProgressLogger(LoggerForProgressTracking log, JobId jobId, Task task, long batchSize, Concurrency concurrency) {
6667
this.log = log;
68+
this.jobId = jobId;
6769
this.taskVolume = task.getProgress().volume();
6870
this.batchSize = batchSize;
6971
this.taskName = task.description();
@@ -136,44 +138,38 @@ private synchronized void doLogPercentage(Supplier<String> msgFactory, long prog
136138
}
137139

138140
private void logProgress(int nextPercentage) {
139-
logInfo(formatWithLocale("[%s] %s %d%%", Thread.currentThread().getName(), taskName, nextPercentage));
141+
logMessage(formatWithLocale("%d%%", nextPercentage));
140142
}
141143

142144
private void logProgressWithMessage(int nextPercentage, String msg) {
143-
logInfo(
144-
formatWithLocale("[%s] %s %d%% %s", Thread.currentThread().getName(), taskName, nextPercentage, msg)
145-
);
145+
logMessage(formatWithLocale("%d%% %s", nextPercentage, msg));
146146
}
147147

148148
@Override
149149
public void logMessage(String msg) {
150-
log.info("[%s] %s %s", Thread.currentThread().getName(), taskName, msg);
150+
log.info("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, msg);
151151
}
152152

153153
@Override
154154
public void logMessage(Supplier<String> msg) {
155155
logMessage(Objects.requireNonNull(msg.get()));
156156
}
157157

158-
private void logInfo(String message) {
159-
log.info(message);
160-
}
161-
162158
@Override
163159
public void logDebug(Supplier<String> msg) {
164160
if (log.isDebugEnabled()) {
165-
log.debug("[%s] %s %s", Thread.currentThread().getName(), taskName, msg.get());
161+
log.debug("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, msg.get());
166162
}
167163
}
168164

169165
@Override
170166
public void logWarning(String message) {
171-
log.warn("[%s] %s %s", Thread.currentThread().getName(), taskName, message);
167+
log.warn("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, message);
172168
}
173169

174170
@Override
175171
public void logError(String message) {
176-
log.error("[%s] %s %s", Thread.currentThread().getName(), taskName, message);
172+
log.error("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, message);
177173
}
178174

179175
@Override
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.core.utils.progress.tasks;
21+
22+
import org.neo4j.gds.core.utils.progress.ProgressLogger;
23+
24+
@SuppressWarnings("ClassCanBeRecord")
25+
final class LoggingLeafTaskVisitor implements TaskVisitor {
26+
private final ProgressLogger progressLogger;
27+
28+
LoggingLeafTaskVisitor(ProgressLogger progressLogger) {
29+
this.progressLogger = progressLogger;
30+
}
31+
32+
@Override
33+
public void visitLeafTask(LeafTask leafTask) {
34+
progressLogger.logFinishPercentage();
35+
}
36+
}

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressLogger.java

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,36 @@
2222
import org.jetbrains.annotations.Nullable;
2323
import org.neo4j.gds.core.concurrency.Concurrency;
2424
import org.neo4j.gds.core.utils.progress.BatchingProgressLogger;
25+
import org.neo4j.gds.core.utils.progress.JobId;
2526
import org.neo4j.gds.core.utils.progress.ProgressLogger;
2627

27-
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
28+
import java.util.function.Supplier;
2829

29-
public class TaskProgressLogger extends BatchingProgressLogger {
30+
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
3031

32+
@SuppressWarnings("ClassCanBeRecord")
33+
public final class TaskProgressLogger implements ProgressLogger {
34+
private final BatchingProgressLogger batchingProgressLogger;
3135
private final Task baseTask;
3236
private final TaskVisitor loggingLeafTaskVisitor;
3337

34-
TaskProgressLogger(LoggerForProgressTracking log, Task baseTask, Concurrency concurrency) {
35-
super(log, baseTask, concurrency);
36-
this.baseTask = baseTask;
37-
this.loggingLeafTaskVisitor = new LoggingLeafTaskVisitor(this);
38+
static TaskProgressLogger create(LoggerForProgressTracking log, JobId jobId, Task baseTask, Concurrency concurrency) {
39+
var batchingProgressLogger = new BatchingProgressLogger(log, jobId, baseTask, concurrency);
40+
var loggingLeafTaskVisitor = new LoggingLeafTaskVisitor(batchingProgressLogger);
3841

42+
return new TaskProgressLogger(batchingProgressLogger, baseTask, loggingLeafTaskVisitor);
3943
}
40-
TaskProgressLogger(LoggerForProgressTracking log, Task baseTask, Concurrency concurrency, TaskVisitor leafTaskVisitor) {
41-
super(log, baseTask, concurrency);
44+
45+
static TaskProgressLogger create(LoggerForProgressTracking log, JobId jobId, Task baseTask, Concurrency concurrency, TaskVisitor leafTaskVisitor) {
46+
var batchingProgressLogger = new BatchingProgressLogger(log, jobId, baseTask, concurrency);
47+
48+
return new TaskProgressLogger(batchingProgressLogger, baseTask, leafTaskVisitor);
49+
}
50+
51+
private TaskProgressLogger(BatchingProgressLogger batchingProgressLogger, Task baseTask, TaskVisitor loggingLeafTaskVisitor) {
52+
this.batchingProgressLogger = batchingProgressLogger;
4253
this.baseTask = baseTask;
43-
this.loggingLeafTaskVisitor = leafTaskVisitor;
54+
this.loggingLeafTaskVisitor = loggingLeafTaskVisitor;
4455
}
4556

4657
void logBeginSubTask(Task task, Task parentTask) {
@@ -104,20 +115,13 @@ private String unboundedIterationsTaskName(
104115

105116
private String taskDescription(Task task, Task parentTask) {
106117
String taskName;
107-
if (parentTask instanceof IterativeTask) {
108-
var iterativeParentTask = (IterativeTask) parentTask;
118+
if (parentTask instanceof IterativeTask iterativeParentTask) {
109119
var iterativeTaskMode = iterativeParentTask.mode();
110-
switch (iterativeTaskMode) {
111-
case DYNAMIC:
112-
case FIXED:
113-
taskName = boundedIterationsTaskName(iterativeParentTask, task);
114-
break;
115-
case OPEN:
116-
taskName = unboundedIterationsTaskName(iterativeParentTask, task);
117-
break;
118-
default:
119-
throw new UnsupportedOperationException(formatWithLocale("Enum value %s is not supported", iterativeTaskMode));
120-
}
120+
121+
taskName = switch (iterativeTaskMode) {
122+
case DYNAMIC, FIXED -> boundedIterationsTaskName(iterativeParentTask, task);
123+
case OPEN -> unboundedIterationsTaskName(iterativeParentTask, task);
124+
};
121125
} else {
122126
taskName = taskDescription(task);
123127
}
@@ -134,17 +138,58 @@ private void log100OnLeafTaskFinish(Task task) {
134138
task.visit(loggingLeafTaskVisitor);
135139
}
136140

137-
private static final class LoggingLeafTaskVisitor implements TaskVisitor {
141+
@Override
142+
public String getTask() {
143+
return batchingProgressLogger.getTask();
144+
}
138145

139-
private final ProgressLogger progressLogger;
146+
@Override
147+
public void setTask(String task) {
148+
batchingProgressLogger.setTask(task);
149+
}
140150

141-
private LoggingLeafTaskVisitor(ProgressLogger progressLogger) {
142-
this.progressLogger = progressLogger;
143-
}
151+
@Override
152+
public void logProgress(Supplier<String> msgFactory) {
153+
batchingProgressLogger.logProgress(msgFactory);
154+
}
144155

145-
@Override
146-
public void visitLeafTask(LeafTask leafTask) {
147-
progressLogger.logFinishPercentage();
148-
}
156+
@Override
157+
public void logProgress(long progress, Supplier<String> msgFactory) {
158+
batchingProgressLogger.logProgress(progress, msgFactory);
159+
}
160+
161+
@Override
162+
public void logMessage(Supplier<String> msg) {
163+
batchingProgressLogger.logMessage(msg);
164+
}
165+
166+
@Override
167+
public void logFinishPercentage() {
168+
batchingProgressLogger.logFinishPercentage();
169+
}
170+
171+
@Override
172+
public void logDebug(Supplier<String> msg) {
173+
batchingProgressLogger.logDebug(msg);
174+
}
175+
176+
@Override
177+
public void logWarning(String msg) {
178+
batchingProgressLogger.logWarning(msg);
179+
}
180+
181+
@Override
182+
public void logError(String msg) {
183+
batchingProgressLogger.logError(msg);
184+
}
185+
186+
@Override
187+
public long reset(long newTaskVolume) {
188+
return batchingProgressLogger.reset(newTaskVolume);
189+
}
190+
191+
@Override
192+
public void release() {
193+
batchingProgressLogger.release();
149194
}
150195
}

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ public TaskProgressTracker(
6868
TaskRegistryFactory taskRegistryFactory,
6969
UserLogRegistryFactory userLogRegistryFactory
7070
) {
71-
this(baseTask, jobId, taskRegistryFactory, new TaskProgressLogger(log, baseTask, concurrency), userLogRegistryFactory);
71+
this(baseTask, jobId, taskRegistryFactory, TaskProgressLogger.create(log, jobId, baseTask, concurrency), userLogRegistryFactory);
7272
}
7373

74-
protected TaskProgressTracker(
74+
public TaskProgressTracker(
7575
Task baseTask,
7676
JobId jobId,
7777
TaskRegistryFactory taskRegistryFactory,

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskTreeProgressTracker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ public TaskTreeProgressTracker(
3838
baseTask,
3939
jobId,
4040
taskRegistryFactory,
41-
new TaskProgressLogger(
41+
TaskProgressLogger.create(
4242
log,
43+
jobId,
4344
baseTask,
4445
concurrency,
4546
new PassThroughTaskVisitor()

0 commit comments

Comments
 (0)