Skip to content

Commit 3920890

Browse files
DATAREDIS-1226 - Polishing.
Remove justId from XClaimOptions and set the flag inside xClaimJustId to avoid errors when processing justId inside xClaim having a different return type. Original Pull Request: #567
1 parent d2dce68 commit 3920890

File tree

6 files changed

+52
-38
lines changed

6 files changed

+52
-38
lines changed

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,6 @@ default List<ByteRecord> xClaim(byte[] key, String group, String newOwner, Durat
229229

230230
/**
231231
* @author Christoph Strobl
232-
* @author Dengliming
233232
* @since 2.3
234233
*/
235234
class XClaimOptions {
@@ -240,18 +239,16 @@ class XClaimOptions {
240239
private final @Nullable Instant unixTime;
241240
private final @Nullable Long retryCount;
242241
private final boolean force;
243-
private final boolean justId;
244242

245243
private XClaimOptions(List<RecordId> ids, Duration minIdleTime, @Nullable Duration idleTime,
246-
@Nullable Instant unixTime, @Nullable Long retryCount, boolean force, boolean justId) {
244+
@Nullable Instant unixTime, @Nullable Long retryCount, boolean force) {
247245

248246
this.ids = new ArrayList<>(ids);
249247
this.minIdleTime = minIdleTime;
250248
this.idleTime = idleTime;
251249
this.unixTime = unixTime;
252250
this.retryCount = retryCount;
253251
this.force = force;
254-
this.justId = justId;
255252
}
256253

257254
/**
@@ -284,7 +281,7 @@ public static XClaimOptionsBuilder minIdleMs(long millis) {
284281
* @return {@code this}.
285282
*/
286283
public XClaimOptions idle(Duration idleTime) {
287-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
284+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
288285
}
289286

290287
/**
@@ -295,7 +292,7 @@ public XClaimOptions idle(Duration idleTime) {
295292
* @return {@code this}.
296293
*/
297294
public XClaimOptions time(Instant unixTime) {
298-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
295+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
299296
}
300297

301298
/**
@@ -305,7 +302,7 @@ public XClaimOptions time(Instant unixTime) {
305302
* @return new instance of {@link XClaimOptions}.
306303
*/
307304
public XClaimOptions retryCount(long retryCount) {
308-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
305+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
309306
}
310307

311308
/**
@@ -315,16 +312,7 @@ public XClaimOptions retryCount(long retryCount) {
315312
* @return new instance of {@link XClaimOptions}.
316313
*/
317314
public XClaimOptions force() {
318-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true, justId);
319-
}
320-
321-
/**
322-
* Set the JUSTID flag.
323-
*
324-
* @return new instance of {@link XClaimOptions}.
325-
*/
326-
public XClaimOptions justId() {
327-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, true);
315+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true);
328316
}
329317

330318
/**
@@ -393,15 +381,6 @@ public boolean isForce() {
393381
return force;
394382
}
395383

396-
/**
397-
* Get the JUSTID flag.
398-
*
399-
* @return
400-
*/
401-
public boolean isJustId() {
402-
return justId;
403-
}
404-
405384
public static class XClaimOptionsBuilder {
406385

407386
private final Duration minIdleTime;
@@ -425,7 +404,7 @@ public XClaimOptions ids(List<?> ids) {
425404
.map(it -> it instanceof RecordId ? (RecordId) it : RecordId.of(it.toString()))
426405
.collect(Collectors.toList());
427406

428-
return new XClaimOptions(idList, minIdleTime, null, null, null, false, false);
407+
return new XClaimOptions(idList, minIdleTime, null, null, null, false);
429408
}
430409

431410
/**

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,7 @@ public Flux<CommandResponse<XClaimCommand, Flux<RecordId>>> xClaimJustId(Publish
128128
String[] ids = command.getOptions().getIdsAsStringArray();
129129
io.lettuce.core.Consumer<ByteBuffer> from = io.lettuce.core.Consumer
130130
.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getNewOwner()));
131-
XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions());
132-
131+
XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions()).justid();
133132
Flux<RecordId> result = cmd.xclaim(command.getKey(), from, args, ids).map(it -> RecordId.of(it.getId()));
134133
return new CommandResponse<>(command, result);
135134
}));

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public List<RecordId> xClaimJustId(byte[] key, String group, String newOwner, XC
135135
String[] ids = options.getIdsAsStringArray();
136136
io.lettuce.core.Consumer<byte[]> from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group),
137137
LettuceConverters.toBytes(newOwner));
138-
XClaimArgs args = StreamConverters.toXClaimArgs(options);
138+
XClaimArgs args = StreamConverters.toXClaimArgs(options).justid();
139139

140140
try {
141141
if (isPipelined()) {

src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,6 @@ public XClaimArgs convert(XClaimOptions source) {
236236
if (source.getUnixTime() != null) {
237237
args.time(source.getUnixTime());
238238
}
239-
if (source.isJustId()) {
240-
args.justid();
241-
}
242239
return args;
243240

244241
}

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTestSuite.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,30 @@
2020

2121
import io.lettuce.core.RedisClient;
2222
import io.lettuce.core.XAddArgs;
23+
import io.lettuce.core.XClaimArgs;
2324
import io.lettuce.core.api.StatefulRedisConnection;
2425
import io.lettuce.core.api.async.RedisAsyncCommands;
2526
import io.lettuce.core.api.sync.RedisCommands;
2627
import io.lettuce.core.codec.RedisCodec;
2728

2829
import java.lang.reflect.InvocationTargetException;
30+
import java.time.Duration;
2931
import java.util.Collections;
3032

3133
import org.junit.Before;
3234
import org.junit.Test;
3335
import org.junit.runner.RunWith;
3436
import org.junit.runners.Suite;
35-
3637
import org.mockito.ArgumentCaptor;
3738
import org.springframework.dao.InvalidDataAccessResourceUsageException;
3839
import org.springframework.data.redis.connection.AbstractConnectionUnitTestBase;
3940
import org.springframework.data.redis.connection.RedisServerCommands.ShutdownOption;
4041
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
42+
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
4143
import org.springframework.data.redis.connection.lettuce.LettuceConnectionUnitTestSuite.LettuceConnectionUnitTests;
4244
import org.springframework.data.redis.connection.lettuce.LettuceConnectionUnitTestSuite.LettucePipelineConnectionUnitTests;
4345
import org.springframework.data.redis.connection.stream.MapRecord;
46+
import org.springframework.test.util.ReflectionTestUtils;
4447

4548
/**
4649
* @author Christoph Strobl
@@ -173,15 +176,51 @@ public void translatesPipelineUnknownExceptions() throws Exception {
173176
@Test // DATAREDIS-1122
174177
public void xaddShouldHonorMaxlen() {
175178

176-
MapRecord<byte[], byte[], byte[]> record = MapRecord.create("key".getBytes(),
177-
Collections.emptyMap());
179+
MapRecord<byte[], byte[], byte[]> record = MapRecord.create("key".getBytes(), Collections.emptyMap());
178180

179181
connection.streamCommands().xAdd(record, XAddOptions.maxlen(100));
180182
ArgumentCaptor<XAddArgs> args = ArgumentCaptor.forClass(XAddArgs.class);
181-
verify(syncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
183+
if (connection.isPipelined()) {
184+
verify(asyncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
185+
} else {
186+
verify(syncCommandsMock, times(1)).xadd(any(), args.capture(), anyMap());
187+
}
182188

183189
assertThat(args.getValue()).extracting("maxlen").isEqualTo(100L);
184190
}
191+
192+
@Test // DATAREDIS-1226
193+
public void xClaimShouldNotAddJustIdFlagToArgs() {
194+
195+
connection.streamCommands().xClaim("key".getBytes(), "group", "owner",
196+
XClaimOptions.minIdle(Duration.ofMillis(100)).ids("1-1"));
197+
ArgumentCaptor<XClaimArgs> args = ArgumentCaptor.forClass(XClaimArgs.class);
198+
199+
if (connection.isPipelined()) {
200+
verify(asyncCommandsMock).xclaim(any(), any(), args.capture(), any());
201+
} else {
202+
verify(syncCommandsMock).xclaim(any(), any(), args.capture(), any());
203+
}
204+
205+
assertThat(ReflectionTestUtils.getField(args.getValue(), "justid")).isEqualTo(false);
206+
207+
}
208+
209+
@Test // DATAREDIS-1226
210+
public void xClaimJustIdShouldAddJustIdFlagToArgs() {
211+
212+
connection.streamCommands().xClaimJustId("key".getBytes(), "group", "owner",
213+
XClaimOptions.minIdle(Duration.ofMillis(100)).ids("1-1"));
214+
ArgumentCaptor<XClaimArgs> args = ArgumentCaptor.forClass(XClaimArgs.class);
215+
216+
if (connection.isPipelined()) {
217+
verify(asyncCommandsMock).xclaim(any(), any(), args.capture(), any());
218+
} else {
219+
verify(syncCommandsMock).xclaim(any(), any(), args.capture(), any());
220+
}
221+
222+
assertThat(ReflectionTestUtils.getField(args.getValue(), "justid")).isEqualTo(true);
223+
}
185224
}
186225

187226
public static class LettucePipelineConnectionUnitTests extends LettuceConnectionUnitTests {

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ public void xClaimJustId() {
502502
StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) //
503503
.delayElements(Duration.ofMillis(5)).next() //
504504
.flatMapMany(record -> connection.streamCommands().xClaimJustId(KEY_1_BBUFFER, "my-group", "my-consumer",
505-
XClaimOptions.minIdle(Duration.ofMillis(1)).ids(record.getId()).justId())
505+
XClaimOptions.minIdle(Duration.ofMillis(1)).ids(record.getId()))
506506
).as(StepVerifier::create) //
507507
.assertNext(it -> assertThat(it.getValue()).isEqualTo(expected)) //
508508
.verifyComplete();

0 commit comments

Comments
 (0)