Skip to content

Commit d2dce68

Browse files
denglimingchristophstrobl
authored andcommitted
DATAREDIS-1226 - Add support for xClaimJustId using Lettuce.
Original Pull Request: #567
1 parent 0279c25 commit d2dce68

File tree

5 files changed

+50
-15
lines changed

5 files changed

+50
-15
lines changed

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ default List<ByteRecord> xClaim(byte[] key, String group, String newOwner, Durat
229229

230230
/**
231231
* @author Christoph Strobl
232+
* @author Dengliming
232233
* @since 2.3
233234
*/
234235
class XClaimOptions {
@@ -239,16 +240,18 @@ class XClaimOptions {
239240
private final @Nullable Instant unixTime;
240241
private final @Nullable Long retryCount;
241242
private final boolean force;
243+
private final boolean justId;
242244

243245
private XClaimOptions(List<RecordId> ids, Duration minIdleTime, @Nullable Duration idleTime,
244-
@Nullable Instant unixTime, @Nullable Long retryCount, boolean force) {
246+
@Nullable Instant unixTime, @Nullable Long retryCount, boolean force, boolean justId) {
245247

246248
this.ids = new ArrayList<>(ids);
247249
this.minIdleTime = minIdleTime;
248250
this.idleTime = idleTime;
249251
this.unixTime = unixTime;
250252
this.retryCount = retryCount;
251253
this.force = force;
254+
this.justId = justId;
252255
}
253256

254257
/**
@@ -281,7 +284,7 @@ public static XClaimOptionsBuilder minIdleMs(long millis) {
281284
* @return {@code this}.
282285
*/
283286
public XClaimOptions idle(Duration idleTime) {
284-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
287+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
285288
}
286289

287290
/**
@@ -292,7 +295,7 @@ public XClaimOptions idle(Duration idleTime) {
292295
* @return {@code this}.
293296
*/
294297
public XClaimOptions time(Instant unixTime) {
295-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
298+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
296299
}
297300

298301
/**
@@ -302,7 +305,7 @@ public XClaimOptions time(Instant unixTime) {
302305
* @return new instance of {@link XClaimOptions}.
303306
*/
304307
public XClaimOptions retryCount(long retryCount) {
305-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force);
308+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, justId);
306309
}
307310

308311
/**
@@ -312,7 +315,16 @@ public XClaimOptions retryCount(long retryCount) {
312315
* @return new instance of {@link XClaimOptions}.
313316
*/
314317
public XClaimOptions force() {
315-
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true);
318+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, true, justId);
319+
}
320+
321+
/**
322+
* Set the JUSTID flag.
323+
*
324+
* @return new instance of {@link XClaimOptions}.
325+
*/
326+
public XClaimOptions justId() {
327+
return new XClaimOptions(ids, minIdleTime, idleTime, unixTime, retryCount, force, true);
316328
}
317329

318330
/**
@@ -381,6 +393,15 @@ public boolean isForce() {
381393
return force;
382394
}
383395

396+
/**
397+
* Get the JUSTID flag.
398+
*
399+
* @return
400+
*/
401+
public boolean isJustId() {
402+
return justId;
403+
}
404+
384405
public static class XClaimOptionsBuilder {
385406

386407
private final Duration minIdleTime;
@@ -404,7 +425,7 @@ public XClaimOptions ids(List<?> ids) {
404425
.map(it -> it instanceof RecordId ? (RecordId) it : RecordId.of(it.toString()))
405426
.collect(Collectors.toList());
406427

407-
return new XClaimOptions(idList, minIdleTime, null, null, null, false);
428+
return new XClaimOptions(idList, minIdleTime, null, null, null, false, false);
408429
}
409430

410431
/**

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,6 @@ public Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStream
123123
@Override
124124
public Flux<CommandResponse<XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<XClaimCommand> commands) {
125125

126-
if (true /* TODO: set the JUSTID flag */ ) {
127-
throw new UnsupportedOperationException("Lettuce does not support XCLAIM with JUSTID. (Ref: lettuce-io#1233)");
128-
}
129-
130126
return connection.execute(cmd -> Flux.from(commands).map(command -> {
131127

132128
String[] ids = command.getOptions().getIdsAsStringArray();

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* @author Mark Paluch
5050
* @author Tugdual Grall
5151
* @author Dejan Jankov
52+
* @author Dengliming
5253
* @since 2.2
5354
*/
5455
class LettuceStreamCommands implements RedisStreamCommands {
@@ -136,10 +137,6 @@ public List<RecordId> xClaimJustId(byte[] key, String group, String newOwner, XC
136137
LettuceConverters.toBytes(newOwner));
137138
XClaimArgs args = StreamConverters.toXClaimArgs(options);
138139

139-
if (true /* TODO: set the JUSTID flag */ ) {
140-
throw new UnsupportedOperationException("Lettuce does not support XCLAIM with JUSTID. (Ref: lettuce-io#1233)");
141-
}
142-
143140
try {
144141
if (isPipelined()) {
145142

src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ public XClaimArgs convert(XClaimOptions source) {
236236
if (source.getUnixTime() != null) {
237237
args.time(source.getUnixTime());
238238
}
239-
239+
if (source.isJustId()) {
240+
args.justid();
241+
}
240242
return args;
241243

242244
}

src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
* @author Mark Paluch
4646
* @author Christoph Strobl
4747
* @author Tugdual Grall
48+
* @author Dengliming
4849
*/
4950
public class LettuceReactiveStreamCommandsTests extends LettuceReactiveCommandsTestsBase {
5051

@@ -488,4 +489,22 @@ public void xinfoConsumersNoConsumer() {
488489
connection.streamCommands().xInfoConsumers(KEY_1_BBUFFER, "my-group").as(StepVerifier::create).verifyComplete();
489490
}
490491

492+
@Test // DATAREDIS-1226
493+
public void xClaimJustId() {
494+
495+
String initialMessage = nativeCommands.xadd(KEY_1, KEY_1, VALUE_1);
496+
nativeCommands.xgroupCreate(XReadArgs.StreamOffset.from(KEY_1, initialMessage), "my-group");
497+
498+
String expected = nativeCommands.xadd(KEY_1, KEY_2, VALUE_2);
499+
500+
connection.streamCommands()
501+
.xReadGroup(Consumer.from("my-group", "my-consumer"),
502+
StreamOffset.create(KEY_1_BBUFFER, ReadOffset.lastConsumed())) //
503+
.delayElements(Duration.ofMillis(5)).next() //
504+
.flatMapMany(record -> connection.streamCommands().xClaimJustId(KEY_1_BBUFFER, "my-group", "my-consumer",
505+
XClaimOptions.minIdle(Duration.ofMillis(1)).ids(record.getId()).justId())
506+
).as(StepVerifier::create) //
507+
.assertNext(it -> assertThat(it.getValue()).isEqualTo(expected)) //
508+
.verifyComplete();
509+
}
491510
}

0 commit comments

Comments
 (0)