Skip to content

Commit 3ae40aa

Browse files
committed
Use Reactor's expand(…) operator for recursive pagination.
We now use Mono.expand(…) to recursively read paginated results. Using Mono.expand(…) avoids deep stack recursion and therefore doesn't lead to StackOverflowError. Closes #1215
1 parent 2b49f2c commit 3ae40aa

File tree

1 file changed

+9
-41
lines changed

1 file changed

+9
-41
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/session/DefaultBridgedReactiveSession.java

Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717

1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
20-
import reactor.core.publisher.MonoProcessor;
2120
import reactor.core.scheduler.Scheduler;
2221

2322
import java.util.Collections;
@@ -33,6 +32,7 @@
3332
import org.springframework.data.cassandra.ReactiveSession;
3433
import org.springframework.util.Assert;
3534

35+
import com.datastax.oss.driver.api.core.AsyncPagingIterable;
3636
import com.datastax.oss.driver.api.core.CqlIdentifier;
3737
import com.datastax.oss.driver.api.core.CqlSession;
3838
import com.datastax.oss.driver.api.core.context.DriverContext;
@@ -226,52 +226,20 @@ static class DefaultReactiveResultSet implements ReactiveResultSet {
226226

227227
@Override
228228
public Flux<Row> rows() {
229-
return getRows(Mono.just(this.resultSet));
230-
}
231-
232-
@Override
233-
public Flux<Row> availableRows() {
234-
return toRows(this.resultSet);
235-
}
236229

237-
private Flux<Row> getRows(Mono<AsyncResultSet> nextResults) {
238-
239-
return nextResults.flatMapMany(it -> {
240-
241-
Flux<Row> rows = toRows(it);
242-
243-
if (!it.hasMorePages()) {
244-
return rows;
230+
return Mono.just(this.resultSet).expand(asyncResultSet -> {
231+
if (asyncResultSet.hasMorePages()) {
232+
return Mono.fromCompletionStage(asyncResultSet.fetchNextPage());
245233
}
246-
247-
MonoProcessor<AsyncResultSet> processor = MonoProcessor.create();
248-
249-
return rows.doOnComplete(() -> fetchMore(it.fetchNextPage(), processor)).concatWith(getRows(processor));
250-
});
234+
return Mono.empty();
235+
}).flatMapIterable(AsyncPagingIterable::currentPage);
251236
}
252237

253-
static Flux<Row> toRows(AsyncResultSet resultSet) {
238+
@Override
239+
public Flux<Row> availableRows() {
254240
return Flux.fromIterable(resultSet.currentPage());
255241
}
256242

257-
static void fetchMore(CompletionStage<AsyncResultSet> future, MonoProcessor<AsyncResultSet> sink) {
258-
259-
try {
260-
261-
future.whenComplete((rs, err) -> {
262-
263-
if (err != null) {
264-
sink.onError(err);
265-
} else {
266-
sink.onNext(rs);
267-
sink.onComplete();
268-
}
269-
});
270-
271-
} catch (Exception cause) {
272-
sink.onError(cause);
273-
}
274-
}
275243

276244
@Override
277245
public ColumnDefinitions getColumnDefinitions() {

0 commit comments

Comments
 (0)