Skip to content

Commit e31d88e

Browse files
committed
add job ids as correlation ids for log messages
1 parent a77f077 commit e31d88e

File tree

7 files changed

+123
-52
lines changed

7 files changed

+123
-52
lines changed

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/BatchingProgressLogger.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class BatchingProgressLogger implements ProgressLogger {
3636
public static final long MAXIMUM_LOG_INTERVAL = (long) Math.pow(2, 13);
3737

3838
private final LoggerForProgressTracking log;
39+
private final JobId jobId;
3940
private final Concurrency concurrency;
4041
private long taskVolume;
4142
private long batchSize;
@@ -58,12 +59,13 @@ static long calculateBatchSize(long taskVolume, Concurrency concurrency) {
5859
return Math.max(1, BitUtil.nextHighestPowerOfTwo(batchSize));
5960
}
6061

61-
public BatchingProgressLogger(LoggerForProgressTracking log, Task task, Concurrency concurrency) {
62-
this(log, task, calculateBatchSize(task, concurrency), concurrency);
62+
public BatchingProgressLogger(LoggerForProgressTracking log, JobId jobId, Task task, Concurrency concurrency) {
63+
this(log, jobId, task, calculateBatchSize(task, concurrency), concurrency);
6364
}
6465

65-
public BatchingProgressLogger(LoggerForProgressTracking log, Task task, long batchSize, Concurrency concurrency) {
66+
public BatchingProgressLogger(LoggerForProgressTracking log, JobId jobId, Task task, long batchSize, Concurrency concurrency) {
6667
this.log = log;
68+
this.jobId = jobId;
6769
this.taskVolume = task.getProgress().volume();
6870
this.batchSize = batchSize;
6971
this.taskName = task.description();
@@ -136,44 +138,38 @@ private synchronized void doLogPercentage(Supplier<String> msgFactory, long prog
136138
}
137139

138140
private void logProgress(int nextPercentage) {
139-
logInfo(formatWithLocale("[%s] %s %d%%", Thread.currentThread().getName(), taskName, nextPercentage));
141+
logMessage(formatWithLocale("%d%%", nextPercentage));
140142
}
141143

142144
private void logProgressWithMessage(int nextPercentage, String msg) {
143-
logInfo(
144-
formatWithLocale("[%s] %s %d%% %s", Thread.currentThread().getName(), taskName, nextPercentage, msg)
145-
);
145+
logMessage(formatWithLocale("%d%% %s", nextPercentage, msg));
146146
}
147147

148148
@Override
149149
public void logMessage(String msg) {
150-
log.info("[%s] %s %s", Thread.currentThread().getName(), taskName, msg);
150+
log.info("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, msg);
151151
}
152152

153153
@Override
154154
public void logMessage(Supplier<String> msg) {
155155
logMessage(Objects.requireNonNull(msg.get()));
156156
}
157157

158-
private void logInfo(String message) {
159-
log.info(message);
160-
}
161-
162158
@Override
163159
public void logDebug(Supplier<String> msg) {
164160
if (log.isDebugEnabled()) {
165-
log.debug("[%s] %s %s", Thread.currentThread().getName(), taskName, msg.get());
161+
log.debug("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, msg.get());
166162
}
167163
}
168164

169165
@Override
170166
public void logWarning(String message) {
171-
log.warn("[%s] %s %s", Thread.currentThread().getName(), taskName, message);
167+
log.warn("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, message);
172168
}
173169

174170
@Override
175171
public void logError(String message) {
176-
log.error("[%s] %s %s", Thread.currentThread().getName(), taskName, message);
172+
log.error("[%s] [%s] %s %s", jobId.asString(), Thread.currentThread().getName(), taskName, message);
177173
}
178174

179175
@Override

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressLogger.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.jetbrains.annotations.Nullable;
2323
import org.neo4j.gds.core.concurrency.Concurrency;
2424
import org.neo4j.gds.core.utils.progress.BatchingProgressLogger;
25+
import org.neo4j.gds.core.utils.progress.JobId;
2526
import org.neo4j.gds.core.utils.progress.ProgressLogger;
2627

2728
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
@@ -31,14 +32,13 @@ public class TaskProgressLogger extends BatchingProgressLogger {
3132
private final Task baseTask;
3233
private final TaskVisitor loggingLeafTaskVisitor;
3334

34-
TaskProgressLogger(LoggerForProgressTracking log, Task baseTask, Concurrency concurrency) {
35-
super(log, baseTask, concurrency);
35+
TaskProgressLogger(LoggerForProgressTracking log, JobId jobId, Task baseTask, Concurrency concurrency) {
36+
super(log, jobId, baseTask, concurrency);
3637
this.baseTask = baseTask;
3738
this.loggingLeafTaskVisitor = new LoggingLeafTaskVisitor(this);
38-
3939
}
40-
TaskProgressLogger(LoggerForProgressTracking log, Task baseTask, Concurrency concurrency, TaskVisitor leafTaskVisitor) {
41-
super(log, baseTask, concurrency);
40+
TaskProgressLogger(LoggerForProgressTracking log, JobId jobId, Task baseTask, Concurrency concurrency, TaskVisitor leafTaskVisitor) {
41+
super(log, jobId, baseTask, concurrency);
4242
this.baseTask = baseTask;
4343
this.loggingLeafTaskVisitor = leafTaskVisitor;
4444
}

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ public TaskProgressTracker(
6868
TaskRegistryFactory taskRegistryFactory,
6969
UserLogRegistryFactory userLogRegistryFactory
7070
) {
71-
this(baseTask, jobId, taskRegistryFactory, new TaskProgressLogger(log, baseTask, concurrency), userLogRegistryFactory);
71+
this(baseTask, jobId, taskRegistryFactory, new TaskProgressLogger(log, jobId, baseTask, concurrency), userLogRegistryFactory);
7272
}
7373

74-
protected TaskProgressTracker(
74+
public TaskProgressTracker(
7575
Task baseTask,
7676
JobId jobId,
7777
TaskRegistryFactory taskRegistryFactory,

progress-tracking/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskTreeProgressTracker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public TaskTreeProgressTracker(
4040
taskRegistryFactory,
4141
new TaskProgressLogger(
4242
log,
43+
jobId,
4344
baseTask,
4445
concurrency,
4546
new PassThroughTaskVisitor()

progress-tracking/src/test/java/org/neo4j/gds/core/utils/BatchingProgressLoggerTest.java renamed to progress-tracking/src/test/java/org/neo4j/gds/core/utils/progress/BatchingProgressLoggerTest.java

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* You should have received a copy of the GNU General Public License
1818
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1919
*/
20-
package org.neo4j.gds.core.utils;
20+
package org.neo4j.gds.core.utils.progress;
2121

2222
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
2323
import org.eclipse.collections.impl.utility.ListIterate;
@@ -28,10 +28,11 @@
2828
import org.neo4j.gds.core.concurrency.Concurrency;
2929
import org.neo4j.gds.core.concurrency.RunWithConcurrency;
3030
import org.neo4j.gds.core.utils.logging.LoggerForProgressTrackingAdapter;
31-
import org.neo4j.gds.core.utils.progress.BatchingProgressLogger;
31+
import org.neo4j.gds.core.utils.progress.tasks.LeafTask;
3232
import org.neo4j.gds.core.utils.progress.tasks.LoggerForProgressTracking;
3333
import org.neo4j.gds.core.utils.progress.tasks.Tasks;
3434
import org.neo4j.gds.logging.GdsTestLog;
35+
import org.neo4j.gds.logging.Log;
3536
import org.neo4j.gds.mem.BitUtil;
3637
import org.neo4j.gds.utils.CloseableThreadLocal;
3738

@@ -47,6 +48,9 @@
4748
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4849
import static org.assertj.core.api.Assertions.fail;
4950
import static org.junit.jupiter.api.Assertions.assertEquals;
51+
import static org.mockito.Mockito.mock;
52+
import static org.mockito.Mockito.verify;
53+
import static org.mockito.Mockito.when;
5054
import static org.neo4j.gds.assertj.Extractors.removingThreadId;
5155
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
5256

@@ -61,6 +65,7 @@ void mustLogProgressOnlyAfterBatchSizeInvocations() {
6165
var concurrency = new Concurrency(1);
6266
var logger = new BatchingProgressLogger(
6367
new LoggerForProgressTrackingAdapter(log),
68+
new JobId("some job id"),
6469
Tasks.leaf("foo", taskVolume),
6570
batchSize,
6671
concurrency
@@ -72,13 +77,13 @@ void mustLogProgressOnlyAfterBatchSizeInvocations() {
7277
}
7378

7479
var threadName = Thread.currentThread().getName();
75-
var messageTemplate = "[%s] foo %d%% %d";
80+
var messageTemplate = "[%s] [%s] foo %d%% %d";
7681
var expectedMessages = List.of(
77-
formatWithLocale(messageTemplate, threadName, 1 * batchSize * 100 / taskVolume, 1 * batchSize - 1),
78-
formatWithLocale(messageTemplate, threadName, 2 * batchSize * 100 / taskVolume, 2 * batchSize - 1),
79-
formatWithLocale(messageTemplate, threadName, 3 * batchSize * 100 / taskVolume, 3 * batchSize - 1),
80-
formatWithLocale(messageTemplate, threadName, 4 * batchSize * 100 / taskVolume, 4 * batchSize - 1),
81-
formatWithLocale(messageTemplate, threadName, 5 * batchSize * 100 / taskVolume, 5 * batchSize - 1)
82+
formatWithLocale(messageTemplate, "some job id", threadName, 1 * batchSize * 100 / taskVolume, 1 * batchSize - 1),
83+
formatWithLocale(messageTemplate, "some job id", threadName, 2 * batchSize * 100 / taskVolume, 2 * batchSize - 1),
84+
formatWithLocale(messageTemplate, "some job id", threadName, 3 * batchSize * 100 / taskVolume, 3 * batchSize - 1),
85+
formatWithLocale(messageTemplate, "some job id", threadName, 4 * batchSize * 100 / taskVolume, 4 * batchSize - 1),
86+
formatWithLocale(messageTemplate, "some job id", threadName, 5 * batchSize * 100 / taskVolume, 5 * batchSize - 1)
8287
);
8388

8489
var messages = log.getMessages("info");
@@ -94,6 +99,7 @@ void mustLogProgressOnlyAfterHittingOrExceedingBatchSize() {
9499
var concurrency = new Concurrency(1);
95100
var logger = new BatchingProgressLogger(
96101
new LoggerForProgressTrackingAdapter(log),
102+
new JobId("a job id"),
97103
Tasks.leaf("foo", taskVolume),
98104
batchSize,
99105
concurrency
@@ -104,7 +110,7 @@ void mustLogProgressOnlyAfterHittingOrExceedingBatchSize() {
104110
}
105111

106112
var threadName = Thread.currentThread().getName();
107-
var messageTemplate = "[%s] foo %d%%";
113+
var messageTemplate = "[%s] [%s] foo %d%%";
108114

109115
var progressSteps = IntStream
110116
.iterate(0, i -> i < taskVolume, i -> i + progressStep)
@@ -114,7 +120,7 @@ void mustLogProgressOnlyAfterHittingOrExceedingBatchSize() {
114120

115121
var expectedMessages = loggedProgressSteps.stream()
116122
.skip(1)
117-
.map(i -> formatWithLocale(messageTemplate, threadName, i * 100 / taskVolume))
123+
.map(i -> formatWithLocale(messageTemplate, "a job id", threadName, i * 100 / taskVolume))
118124
.collect(Collectors.toList());
119125

120126
var messages = log.getMessages("info");
@@ -151,7 +157,11 @@ void shouldLogAfterResetWhereACallCountHigherThanBatchSizeIsLeftBehind() {
151157
var taskVolume = 1337;
152158

153159
var log = new GdsTestLog();
154-
var logger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), Tasks.leaf("Test", taskVolume), concurrency); // batchSize is 13
160+
var logger = new BatchingProgressLogger(
161+
new LoggerForProgressTrackingAdapter(log),
162+
new JobId(),
163+
Tasks.leaf("Test", taskVolume),
164+
concurrency); // batchSize is 13
155165
logger.reset(taskVolume);
156166
logger.logProgress(20); // callCount is 20, call count after logging == 20 - 13 = 7
157167
assertThat(log.getMessages(TestLog.INFO))
@@ -168,7 +178,7 @@ void shouldLogAfterResetWhereACallCountHigherThanBatchSizeIsLeftBehind() {
168178
void log100Percent() {
169179
var log = new GdsTestLog();
170180
var concurrency = new Concurrency(1);
171-
var testProgressLogger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), Tasks.leaf("Test"), concurrency);
181+
var testProgressLogger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), new JobId(), Tasks.leaf("Test"), concurrency);
172182
testProgressLogger.reset(1337);
173183
testProgressLogger.logFinishPercentage();
174184
assertThat(log.getMessages(TestLog.INFO))
@@ -180,7 +190,7 @@ void log100Percent() {
180190
void shouldLog100OnlyOnce() {
181191
var log = new GdsTestLog();
182192
var concurrency = new Concurrency(1);
183-
var testProgressLogger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), Tasks.leaf("Test"), concurrency);
193+
var testProgressLogger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), new JobId(), Tasks.leaf("Test"), concurrency);
184194
testProgressLogger.reset(1);
185195
testProgressLogger.logProgress(1);
186196
testProgressLogger.logFinishPercentage();
@@ -193,7 +203,7 @@ void shouldLog100OnlyOnce() {
193203
void shouldNotExceed100Percent() {
194204
var log = new GdsTestLog();
195205
var concurrency = new Concurrency(1);
196-
var testProgressLogger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), Tasks.leaf("Test"), concurrency);
206+
var testProgressLogger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), new JobId(), Tasks.leaf("Test"), concurrency);
197207
testProgressLogger.reset(1);
198208
testProgressLogger.logProgress(1); // reaches 100 %
199209
testProgressLogger.logProgress(1); // exceeds 100 %
@@ -206,6 +216,7 @@ void shouldNotExceed100Percent() {
206216
void closesThreadLocal() {
207217
var logger = new BatchingProgressLogger(
208218
LoggerForProgressTracking.noOpLog(),
219+
new JobId(),
209220
Tasks.leaf("foo", 42),
210221
new Concurrency(1)
211222
);
@@ -227,7 +238,7 @@ void closesThreadLocal() {
227238

228239
private static List<Integer> performLogging(long taskVolume, Concurrency concurrency) {
229240
var log = new GdsTestLog();
230-
var logger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), Tasks.leaf("Test", taskVolume), concurrency);
241+
var logger = new BatchingProgressLogger(new LoggerForProgressTrackingAdapter(log), new JobId("the_job_id"), Tasks.leaf("Test", taskVolume), concurrency);
231242
logger.reset(taskVolume);
232243

233244
var batchSize = (int) BitUtil.ceilDiv(taskVolume, concurrency.value());
@@ -249,9 +260,69 @@ private static List<Integer> performLogging(long taskVolume, Concurrency concurr
249260
return log
250261
.getMessages(TestLog.INFO)
251262
.stream()
252-
.map(progress -> progress.split(" ")[2].replace("%", ""))
263+
.map(progress -> progress.split(" ")[3].replace("%", ""))
253264
.map(Integer::parseInt)
254265
.collect(Collectors.toList());
255266
}
256267

268+
@Test
269+
void shouldPrependCorrelationIdToInfoLogMessages() {
270+
var log = mock(Log.class);
271+
var batchingProgressLogger = new BatchingProgressLogger(
272+
new LoggerForProgressTrackingAdapter(log),
273+
JobId.parse("my job id"),
274+
new LeafTask("Monsieur Alfonse", 42),
275+
new Concurrency(87)
276+
);
277+
278+
batchingProgressLogger.logMessage("Swiftly, and with style");
279+
280+
verify(log).info("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
281+
}
282+
283+
@Test
284+
void shouldPrependCorrelationIdToDebugLogMessages() {
285+
var log = mock(Log.class);
286+
var batchingProgressLogger = new BatchingProgressLogger(
287+
new LoggerForProgressTrackingAdapter(log),
288+
JobId.parse("my job id"),
289+
new LeafTask("Monsieur Alfonse", 42),
290+
new Concurrency(87)
291+
);
292+
293+
when(log.isDebugEnabled()).thenReturn(true);
294+
batchingProgressLogger.logDebug("Swiftly, and with style");
295+
296+
verify(log).debug("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
297+
}
298+
299+
@Test
300+
void shouldPrependCorrelationIdToWarningLogMessages() {
301+
var log = mock(Log.class);
302+
var batchingProgressLogger = new BatchingProgressLogger(
303+
new LoggerForProgressTrackingAdapter(log),
304+
JobId.parse("my job id"),
305+
new LeafTask("Monsieur Alfonse", 42),
306+
new Concurrency(87)
307+
);
308+
309+
batchingProgressLogger.logWarning("Swiftly, and with style");
310+
311+
verify(log).warn("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
312+
}
313+
314+
@Test
315+
void shouldPrependCorrelationIdToErrorLogMessages() {
316+
var log = mock(Log.class);
317+
var batchingProgressLogger = new BatchingProgressLogger(
318+
new LoggerForProgressTrackingAdapter(log),
319+
JobId.parse("my job id"),
320+
new LeafTask("Monsieur Alfonse", 42),
321+
new Concurrency(87)
322+
);
323+
324+
batchingProgressLogger.logError("Swiftly, and with style");
325+
326+
verify(log).error("[%s] [%s] %s %s", "my job id", "Test worker", "Monsieur Alfonse", "Swiftly, and with style");
327+
}
257328
}

progress-tracking/src/test/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressLoggerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.junit.jupiter.api.Test;
2323
import org.neo4j.gds.core.concurrency.Concurrency;
24+
import org.neo4j.gds.core.utils.progress.JobId;
2425

2526
import java.util.List;
2627

@@ -35,7 +36,7 @@ void shouldNotEliminateParentTaskIfCommonPreffix(){
3536
var taskAB = Tasks.task("A B", List.of(taskA));
3637
var task = Tasks.task("T", List.of(taskA));
3738

38-
var logger =new TaskProgressLogger(LoggerForProgressTracking.noOpLog(),task,new Concurrency(1));
39+
var logger =new TaskProgressLogger(LoggerForProgressTracking.noOpLog(), new JobId(),task,new Concurrency(1));
3940

4041
assertThatNoException().isThrownBy(
4142
()-> {

0 commit comments

Comments
 (0)