Skip to content

Commit d6a3281

Browse files
committed
DATACASS-809 - Add queryForStream methods to CqlOperations.
CqlOperations now exposes queryForStream methods to consume a query as java.util.stream.Stream accepting a RowMapper.
1 parent 4f332dc commit d6a3281

File tree

8 files changed

+254
-27
lines changed

8 files changed

+254
-27
lines changed

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.function.Consumer;
2020
import java.util.function.Function;
2121
import java.util.stream.Stream;
22-
import java.util.stream.StreamSupport;
2322

2423
import org.springframework.beans.BeansException;
2524
import org.springframework.context.ApplicationContext;
@@ -374,10 +373,8 @@ public <T> Stream<T> stream(Statement<?> statement, Class<T> entityClass) throws
374373
Assert.notNull(statement, "Statement must not be null");
375374
Assert.notNull(entityClass, "Entity type must not be null");
376375

377-
ResultSet resultSet = getCqlOperations().queryForResultSet(statement);
378-
379-
return StreamSupport.stream(resultSet.spliterator(), false)
380-
.map(getMapper(entityClass, entityClass, EntityQueryUtils.getTableName(statement)));
376+
Function<Row, T> mapper = getMapper(entityClass, entityClass, EntityQueryUtils.getTableName(statement));
377+
return getCqlOperations().queryForStream(statement, (row, rowNum) -> mapper.apply(row));
381378
}
382379

383380
// -------------------------------------------------------------------------
@@ -453,10 +450,8 @@ <T> Stream<T> doStream(Query query, Class<?> entityClass, CqlIdentifier tableNam
453450
StatementBuilder<Select> select = getStatementFactory().select(query, getRequiredPersistentEntity(entityClass),
454451
tableName);
455452

456-
ResultSet resultSet = getCqlOperations().queryForResultSet(select.build());
457-
458453
Function<Row, T> mapper = getMapper(entityClass, returnType, tableName);
459-
return StreamSupport.stream(resultSet.map(mapper).spliterator(), false);
454+
return getCqlOperations().queryForStream(select.build(), (row, rowNum) -> mapper.apply(row));
460455
}
461456

462457
/* (non-Javadoc)

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/AsyncCqlTemplate.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -752,8 +752,7 @@ protected AsyncPreparedStatementCreator newAsyncPreparedStatementCreator(String
752752
}
753753

754754
/**
755-
* Constructs a new instance of the {@link ResultSetExtractor} initialized with and adapting the given
756-
* {@link RowCallbackHandler}.
755+
* Constructs a new instance of the {@link ResultSetExtractor} adapting the given {@link RowCallbackHandler}.
757756
*
758757
* @param rowCallbackHandler {@link RowCallbackHandler} to adapt as a {@link ResultSetExtractor}.
759758
* @return a {@link ResultSetExtractor} implementation adapting an instance of the {@link RowCallbackHandler}.
@@ -767,8 +766,7 @@ protected AsyncRowCallbackHandlerResultSetExtractor newAsyncResultSetExtractor(
767766
}
768767

769768
/**
770-
* Constructs a new instance of the {@link ResultSetExtractor} initialized with and adapting the given
771-
* {@link RowMapper}.
769+
* Constructs a new instance of the {@link ResultSetExtractor} adapting the given {@link RowMapper}.
772770
*
773771
* @param rowMapper {@link RowMapper} to adapt as a {@link ResultSetExtractor}.
774772
* @return a {@link ResultSetExtractor} implementation adapting an instance of the {@link RowMapper}.

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CqlOperations.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818
import java.util.Collection;
1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.stream.Stream;
22+
23+
import org.springframework.dao.DataAccessException;
24+
import org.springframework.dao.IncorrectResultSizeDataAccessException;
25+
import org.springframework.lang.Nullable;
2126

2227
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
2328
import com.datastax.oss.driver.api.core.cql.ResultSet;
2429
import com.datastax.oss.driver.api.core.cql.Row;
2530
import com.datastax.oss.driver.api.core.cql.Statement;
2631

27-
import org.springframework.dao.DataAccessException;
28-
import org.springframework.dao.IncorrectResultSizeDataAccessException;
29-
import org.springframework.lang.Nullable;
30-
3132
/**
3233
* Interface specifying a basic set of CQL operations. Implemented by {@link CqlTemplate}. Not often used directly, but
3334
* a useful option to enhance testability, as it can easily be mocked or stubbed.
@@ -195,6 +196,20 @@ public interface CqlOperations {
195196
*/
196197
<T> List<T> query(String cql, RowMapper<T> rowMapper, Object... args) throws DataAccessException;
197198

199+
/**
200+
* Query given CQL to create a prepared statement from CQL and a list of arguments to bind to the query, mapping each
201+
* row to a Java object via a {@link RowMapper} and turning it into an iterable {@link Stream}.
202+
*
203+
* @param cql static CQL to execute, must not be empty or {@literal null}.
204+
* @param rowMapper object that will map one object per row
205+
* @param args arguments to bind to the query (leaving it to the {@link PreparedStatement} to guess the corresponding
206+
* CQL type)
207+
* @return the result {@link Stream}, containing mapped objects
208+
* @throws DataAccessException if there is any problem executing the query.
209+
* @since 3.1
210+
*/
211+
<T> Stream<T> queryForStream(String cql, RowMapper<T> rowMapper, Object... args) throws DataAccessException;
212+
198213
/**
199214
* Query using a prepared statement, reading the {@link ResultSet} with a {@link ResultSetExtractor}.
200215
*
@@ -537,6 +552,22 @@ <T> List<T> query(String cql, @Nullable PreparedStatementBinder psb, RowMapper<T
537552
*/
538553
<T> List<T> query(Statement<?> statement, RowMapper<T> rowMapper) throws DataAccessException;
539554

555+
/**
556+
* Execute a query given static CQL, mapping each row to a Java object via a {@link RowMapper} and turning it into an
557+
* iterable {@link Stream}.
558+
* <p>
559+
* Uses a CQL Statement, not a {@link PreparedStatement}. If you want to execute a static query with a
560+
* {@link PreparedStatement}, use the overloaded {@code query} method with {@literal null} as argument array.
561+
*
562+
* @param statement static CQL {@link Statement}, must not be {@literal null}.
563+
* @param rowMapper object that will map one object per row, must not be {@literal null}.
564+
* @return the result {@link Stream}, containing mapped objects.
565+
* @throws DataAccessException if there is any problem executing the query.
566+
* @since 3.1
567+
* @see #queryForStream(String, RowMapper, Object...)
568+
*/
569+
<T> Stream<T> queryForStream(Statement<?> statement, RowMapper<T> rowMapper) throws DataAccessException;
570+
540571
/**
541572
* Execute a query for a result {@link List}, given static CQL.
542573
* <p>
@@ -729,6 +760,20 @@ void query(PreparedStatementCreator preparedStatementCreator, RowCallbackHandler
729760
<T> List<T> query(PreparedStatementCreator preparedStatementCreator, RowMapper<T> rowMapper)
730761
throws DataAccessException;
731762

763+
/**
764+
* Query using a prepared statement, mapping each row to a Java object via a {@link RowMapper} and turning it into an
765+
* iterable {@link Stream}.
766+
*
767+
* @param preparedStatementCreator object that can create a {@link PreparedStatement} given a
768+
* {@link com.datastax.oss.driver.api.core.CqlSession}, must not be {@literal null}.
769+
* @param rowMapper object that will map one object per row, must not be {@literal null}.
770+
* @return the result {@link Stream}, containing mapped objects.
771+
* @throws DataAccessException if there is any problem executing the query.
772+
* @since 3.1
773+
*/
774+
<T> Stream<T> queryForStream(PreparedStatementCreator preparedStatementCreator, RowMapper<T> rowMapper)
775+
throws DataAccessException;
776+
732777
/**
733778
* Query using a prepared statement and a {@link PreparedStatementBinder} implementation that knows how to bind values
734779
* to the query, reading the {@link ResultSet} with a {@link ResultSetExtractor}.
@@ -777,6 +822,24 @@ void query(PreparedStatementCreator preparedStatementCreator, @Nullable Prepared
777822
<T> List<T> query(PreparedStatementCreator preparedStatementCreator, @Nullable PreparedStatementBinder psb,
778823
RowMapper<T> rowMapper) throws DataAccessException;
779824

825+
/**
826+
* Query using a prepared statement and a {@link PreparedStatementBinder} implementation that knows how to bind values
827+
* to the query, mapping each row to a Java object via a {@link RowMapper} and turning it into an iterable
828+
* {@link Stream}.
829+
*
830+
* @param preparedStatementCreator object that can create a {@link PreparedStatement} given a
831+
* {@link com.datastax.oss.driver.api.core.CqlSession}, must not be {@literal null}.
832+
* @param psb object that knows how to set values on the prepared statement. If this is {@literal null}, the CQL will
833+
* be assumed to contain no bind parameters. Even if there are no bind parameters, this object may be used to
834+
* set fetch size and other performance options.
835+
* @param rowMapper object that will map one object per row, must not be {@literal null}.
836+
* @return the result {@link Stream}, containing mapped objects.
837+
* @throws DataAccessException if there is any problem executing the query.
838+
* @since 3.1
839+
*/
840+
<T> Stream<T> queryForStream(PreparedStatementCreator preparedStatementCreator, @Nullable PreparedStatementBinder psb,
841+
RowMapper<T> rowMapper) throws DataAccessException;
842+
780843
// -------------------------------------------------------------------------
781844
// Methods dealing with cluster metadata
782845
// -------------------------------------------------------------------------

spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/cql/CqlTemplate.java

Lines changed: 133 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818
import java.util.Collection;
1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.Spliterator;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.function.Consumer;
2124
import java.util.function.Function;
25+
import java.util.stream.Stream;
2226
import java.util.stream.StreamSupport;
2327

2428
import org.springframework.dao.DataAccessException;
@@ -316,6 +320,16 @@ public <T> List<T> query(Statement<?> statement, RowMapper<T> rowMapper) throws
316320
return query(statement, newResultSetExtractor(rowMapper));
317321
}
318322

323+
/*
324+
* (non-Javadoc)
325+
* @see org.springframework.data.cassandra.core.cqlOperations#queryForStream(com.datastax.oss.driver.api.core.cql.Statement, org.springframework.data.cassandra.core.cql.RowMapper)
326+
*/
327+
@Override
328+
public <T> Stream<T> queryForStream(Statement<?> statement, RowMapper<T> rowMapper) throws DataAccessException {
329+
// noinspection ConstantConditions
330+
return query(statement, newStreamExtractor(rowMapper));
331+
}
332+
319333
/*
320334
* (non-Javadoc)
321335
* @see org.springframework.data.cassandra.core.cqlOperations#queryForList(com.datastax.oss.driver.api.core.cql.Statement)
@@ -342,7 +356,6 @@ public <T> List<T> queryForList(Statement<?> statement, Class<T> elementType) th
342356
*/
343357
@Override
344358
public Map<String, Object> queryForMap(Statement<?> statement) throws DataAccessException {
345-
// noinspection ConstantConditions
346359
return queryForObject(statement, newColumnMapRowMapper());
347360
}
348361

@@ -485,6 +498,17 @@ public <T> List<T> query(PreparedStatementCreator preparedStatementCreator, RowM
485498
return query(preparedStatementCreator, null, newResultSetExtractor(rowMapper));
486499
}
487500

501+
/*
502+
* (non-Javadoc)
503+
* @see org.springframework.data.cassandra.core.cqlOperations#query(org.springframework.data.cassandra.core.cql.PreparedStatementCreator, org.springframework.data.cassandra.core.cql.RowMapper)
504+
*/
505+
@Override
506+
public <T> Stream<T> queryForStream(PreparedStatementCreator preparedStatementCreator, RowMapper<T> rowMapper)
507+
throws DataAccessException {
508+
// noinspection ConstantConditions
509+
return query(preparedStatementCreator, null, newStreamExtractor(rowMapper));
510+
}
511+
488512
/*
489513
* (non-Javadoc)
490514
* @see org.springframework.data.cassandra.core.cqlOperations#query(org.springframework.data.cassandra.core.cql.PreparedStatementCreator, org.springframework.data.cassandra.core.cql.PreparedStatementBinder, org.springframework.data.cassandra.core.cql.ResultSetExtractor)
@@ -544,6 +568,17 @@ public <T> List<T> query(PreparedStatementCreator preparedStatementCreator, @Nul
544568
return query(preparedStatementCreator, psb, newResultSetExtractor(rowMapper));
545569
}
546570

571+
/*
572+
* (non-Javadoc)
573+
* @see org.springframework.data.cassandra.core.cqlOperations#queryForStream(org.springframework.data.cassandra.core.cql.PreparedStatementCreator, org.springframework.data.cassandra.core.cql.PreparedStatementBinder, org.springframework.data.cassandra.core.cql.RowMapper)
574+
*/
575+
@Override
576+
public <T> Stream<T> queryForStream(PreparedStatementCreator preparedStatementCreator,
577+
@Nullable PreparedStatementBinder psb, RowMapper<T> rowMapper) throws DataAccessException {
578+
// noinspection ConstantConditions
579+
return query(preparedStatementCreator, psb, newStreamExtractor(rowMapper));
580+
}
581+
547582
/*
548583
* (non-Javadoc)
549584
* @see org.springframework.data.cassandra.core.cqlOperations#query(java.lang.String, org.springframework.data.cassandra.core.cql.ResultSetExtractor, java.lang.Object[])
@@ -574,6 +609,16 @@ public <T> List<T> query(String cql, RowMapper<T> rowMapper, Object... args) thr
574609
return query(newPreparedStatementCreator(cql), newPreparedStatementBinder(args), newResultSetExtractor(rowMapper));
575610
}
576611

612+
/*
613+
* (non-Javadoc)
614+
* @see org.springframework.data.cassandra.core.cqlOperations#queryForStream(java.lang.String, org.springframework.data.cassandra.core.cql.RowMapper, java.lang.Object[])
615+
*/
616+
@Override
617+
public <T> Stream<T> queryForStream(String cql, RowMapper<T> rowMapper, Object... args) throws DataAccessException {
618+
// noinspection ConstantConditions
619+
return query(newPreparedStatementCreator(cql), newPreparedStatementBinder(args), newStreamExtractor(rowMapper));
620+
}
621+
577622
/*
578623
* (non-Javadoc)
579624
* @see org.springframework.data.cassandra.core.cqlOperations#query(java.lang.String, org.springframework.data.cassandra.core.cql.PreparedStatementBinder, org.springframework.data.cassandra.core.cql.ResultSetExtractor)
@@ -732,8 +777,7 @@ protected PreparedStatementCreator newPreparedStatementCreator(String cql) {
732777
}
733778

734779
/**
735-
* Constructs a new instance of the {@link ResultSetExtractor} initialized with and adapting the given
736-
* {@link RowCallbackHandler}.
780+
* Constructs a new instance of the {@link ResultSetExtractor} adapting the given {@link RowCallbackHandler}.
737781
*
738782
* @param rowCallbackHandler {@link RowCallbackHandler} to adapt as a {@link ResultSetExtractor}.
739783
* @return a {@link ResultSetExtractor} implementation adapting an instance of the {@link RowCallbackHandler}.
@@ -746,8 +790,7 @@ protected RowCallbackHandlerResultSetExtractor newResultSetExtractor(RowCallback
746790
}
747791

748792
/**
749-
* Constructs a new instance of the {@link ResultSetExtractor} initialized with and adapting the given
750-
* {@link RowMapper}.
793+
* Constructs a new instance of the {@link ResultSetExtractor} adapting the given {@link RowMapper}.
751794
*
752795
* @param rowMapper {@link RowMapper} to adapt as a {@link ResultSetExtractor}.
753796
* @return a {@link ResultSetExtractor} implementation adapting an instance of the {@link RowMapper}.
@@ -760,8 +803,7 @@ protected <T> RowMapperResultSetExtractor<T> newResultSetExtractor(RowMapper<T>
760803
}
761804

762805
/**
763-
* Constructs a new instance of the {@link ResultSetExtractor} initialized with and adapting the given
764-
* {@link RowMapper}.
806+
* Constructs a new instance of the {@link ResultSetExtractor} adapting the given {@link RowMapper}.
765807
*
766808
* @param rowMapper {@link RowMapper} to adapt as a {@link ResultSetExtractor}.
767809
* @param rowsExpected number of expected rows in the {@link ResultSet}.
@@ -774,6 +816,19 @@ protected <T> RowMapperResultSetExtractor<T> newResultSetExtractor(RowMapper<T>
774816
return new RowMapperResultSetExtractor<>(rowMapper, rowsExpected);
775817
}
776818

819+
/**
820+
* Constructs a new instance of the {@link ResultSetExtractor} adapting the given {@link RowMapper}.
821+
*
822+
* @param rowMapper {@link RowMapper} to adapt as a {@link ResultSetExtractor}.
823+
* @return a {@link ResultSetExtractor} implementation adapting an instance of the {@link RowMapper}.
824+
* @see ResultSetExtractor
825+
* @see RowCallbackHandler
826+
* @since 3.1
827+
*/
828+
protected <T> ResultSetExtractor<Stream<T>> newStreamExtractor(RowMapper<T> rowMapper) {
829+
return resultSet -> new ResultSetSpliterator<>(resultSet, rowMapper).stream();
830+
}
831+
777832
private CqlSession getCurrentSession() {
778833

779834
SessionFactory sessionFactory = getSessionFactory();
@@ -801,10 +856,79 @@ protected RowCallbackHandlerResultSetExtractor(RowCallbackHandler rowCallbackHan
801856
@Nullable
802857
public Object extractData(ResultSet resultSet) {
803858

804-
StreamSupport.stream(resultSet.spliterator(), false).forEach(rowCallbackHandler::processRow);
805-
859+
resultSet.forEach(rowCallbackHandler::processRow);
806860
return null;
807861
}
808862
}
809863

864+
/**
865+
* Spliterator for queryForStream adaptation of a {@link ResultSet} to a {@link Stream}.
866+
*
867+
* @since 3.1
868+
*/
869+
private static class ResultSetSpliterator<T> implements Spliterator<T> {
870+
871+
private final Spliterator<Row> delegate;
872+
873+
private final RowMapper<T> rowMapper;
874+
875+
private final AtomicInteger counter;
876+
877+
public ResultSetSpliterator(ResultSet rs, RowMapper<T> rowMapper) {
878+
this.delegate = rs.spliterator();
879+
this.rowMapper = rowMapper;
880+
this.counter = new AtomicInteger();
881+
}
882+
883+
private ResultSetSpliterator(Spliterator<Row> delegate, RowMapper<T> rowMapper, AtomicInteger counter) {
884+
this.delegate = delegate;
885+
this.rowMapper = rowMapper;
886+
this.counter = counter;
887+
}
888+
889+
/*
890+
* (non-Javadoc)
891+
* @see java.util.Spliterator#tryAdvance(java.util.function.Consumer)
892+
*/
893+
@Override
894+
public boolean tryAdvance(Consumer<? super T> action) {
895+
return this.delegate.tryAdvance(row -> action.accept(this.rowMapper.mapRow(row, this.counter.incrementAndGet())));
896+
}
897+
898+
/*
899+
* (non-Javadoc)
900+
* @see java.util.Spliterator#trySplit()
901+
*/
902+
@Override
903+
@Nullable
904+
public Spliterator<T> trySplit() {
905+
return new ResultSetSpliterator<>(delegate.trySplit(), this.rowMapper, this.counter);
906+
}
907+
908+
/*
909+
* (non-Javadoc)
910+
* @see java.util.Spliterator#estimateSize()
911+
*/
912+
@Override
913+
public long estimateSize() {
914+
return Long.MAX_VALUE;
915+
}
916+
917+
/*
918+
* (non-Javadoc)
919+
* @see java.util.Spliterator#characteristics()
920+
*/
921+
@Override
922+
public int characteristics() {
923+
return Spliterator.ORDERED;
924+
}
925+
926+
/**
927+
* @return
928+
*/
929+
public Stream<T> stream() {
930+
return StreamSupport.stream(this, false);
931+
}
932+
}
933+
810934
}

0 commit comments

Comments
 (0)