HHH-13872 - Make the Java Stream close the underlying ScrollableResultsIterator upon calling a terminal operation
This commit is contained in:
parent
3c67d521e1
commit
d9a335c429
|
@ -165,9 +165,10 @@ include::{sourcedir}/HQLTest.java[tags=jpql-api-positional-parameter-example]
|
|||
It's good practice not to mix parameter binding forms in a given query.
|
||||
====
|
||||
|
||||
In terms of execution, JPA `Query` offers 2 different methods for retrieving a result set.
|
||||
In terms of execution, JPA `Query` offers 3 different methods for retrieving a result set.
|
||||
|
||||
* `Query#getResultList()` - executes the select query and returns back the list of results.
|
||||
* `Query#getResultStream()` - executes the select query and returns back a `Stream` over the results.
|
||||
* `Query#getSingleResult()` - executes the select query and returns a single result. If there were more than one result an exception is thrown.
|
||||
|
||||
[[jpql-api-list-example]]
|
||||
|
@ -179,6 +180,15 @@ include::{sourcedir}/HQLTest.java[tags=jpql-api-list-example]
|
|||
----
|
||||
====
|
||||
|
||||
[[jpql-api-stream-example]]
|
||||
.JPA `getResultStream()` result
|
||||
====
|
||||
[source, JAVA, indent=0]
|
||||
----
|
||||
include::{sourcedir}/HQLTest.java[tags=jpql-api-stream-example]
|
||||
----
|
||||
====
|
||||
|
||||
[[jpql-api-unique-result-example]]
|
||||
.JPA `getSingleResult()`
|
||||
====
|
||||
|
@ -414,6 +424,29 @@ include::{sourcedir}/HQLTest.java[tags=hql-api-stream-example]
|
|||
Just like with `ScrollableResults`, you should always close a Hibernate `Stream` either explicitly or using a https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html[try-with-resources] block.
|
||||
====
|
||||
|
||||
[[jpql-api-stream]]
|
||||
==== Query streaming
|
||||
|
||||
Since version 2.2, the JPA `Query` interface offers support for returning a `Stream` via the `getResultStream` method.
|
||||
|
||||
Just like the `scroll` method, you can use a try-with-resources block to close the `Stream`
|
||||
prior to closing the currently running Persistence Context.
|
||||
|
||||
Since Hibernate 5.4, the `Stream` is also closed when calling a terminal operation,
|
||||
as illustrated by the following example.
|
||||
|
||||
[[jpql-api-stream-terminal-operation]]
|
||||
====
|
||||
[source, JAVA, indent=0]
|
||||
----
|
||||
include::{sourcedir}/HQLTest.java[tags=jpql-api-stream-terminal-operation]
|
||||
----
|
||||
====
|
||||
|
||||
The `Stream` is closed automatically after calling the `collect` method,
|
||||
since there is no reason to keep the underlying JDBC `ResultSet` open
|
||||
if the `Stream` cannot be reused.
|
||||
|
||||
[[hql-case-sensitivity]]
|
||||
=== Case Sensitivity
|
||||
|
||||
|
|
|
@ -10,7 +10,6 @@ import java.math.BigDecimal;
|
|||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
@ -43,7 +42,6 @@ import org.hibernate.userguide.model.PhoneType;
|
|||
import org.hibernate.userguide.model.WireTransferPayment;
|
||||
|
||||
import org.hibernate.testing.DialectChecks;
|
||||
import org.hibernate.testing.FailureExpected;
|
||||
import org.hibernate.testing.RequiresDialect;
|
||||
import org.hibernate.testing.RequiresDialectFeature;
|
||||
import org.hibernate.testing.SkipForDialect;
|
||||
|
@ -673,6 +671,38 @@ public class HQLTest extends BaseEntityManagerFunctionalTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_jpql_api_stream_example() {
|
||||
doInJPA( this::entityManagerFactory, entityManager -> {
|
||||
//tag::jpql-api-stream-example[]
|
||||
try(Stream<Person> personStream = entityManager.createQuery(
|
||||
"select p " +
|
||||
"from Person p " +
|
||||
"where p.name like :name", Person.class )
|
||||
.setParameter( "name", "J%" )
|
||||
.getResultStream()) {
|
||||
List<Person> persons = personStream
|
||||
.skip( 5 )
|
||||
.limit( 5 )
|
||||
.collect( Collectors.toList() );
|
||||
}
|
||||
//end::jpql-api-stream-example[]
|
||||
|
||||
// tag::jpql-api-stream-terminal-operation[]
|
||||
List<Person> persons = entityManager.createQuery(
|
||||
"select p " +
|
||||
"from Person p " +
|
||||
"where p.name like :name", Person.class )
|
||||
.setParameter( "name", "J%" )
|
||||
.getResultStream()
|
||||
.skip( 5 )
|
||||
.limit( 5 )
|
||||
.collect( Collectors.toList() );
|
||||
|
||||
//end::jpql-api-stream-terminal-operation[]
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_jpql_api_single_result_example() {
|
||||
doInJPA( this::entityManagerFactory, entityManager -> {
|
||||
|
|
|
@ -375,6 +375,15 @@ public final class ReflectHelper {
|
|||
}
|
||||
}
|
||||
|
||||
public static Method getMethod(Class clazz, String methodName, Class... paramTypes) {
|
||||
try {
|
||||
return clazz.getMethod( methodName, paramTypes );
|
||||
}
|
||||
catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static Field findField(Class containerClass, String propertyName) {
|
||||
if ( containerClass == null ) {
|
||||
throw new IllegalArgumentException( "Class on which to find field [" + propertyName + "] cannot be null" );
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.hibernate.engine.spi.SharedSessionContractImplementor;
|
|||
import org.hibernate.engine.spi.TypedValue;
|
||||
import org.hibernate.graph.GraphSemantic;
|
||||
import org.hibernate.graph.RootGraph;
|
||||
import org.hibernate.graph.internal.RootGraphImpl;
|
||||
import org.hibernate.graph.spi.RootGraphImplementor;
|
||||
import org.hibernate.hql.internal.QueryExecutionRequestException;
|
||||
import org.hibernate.internal.EmptyScrollableResults;
|
||||
|
@ -64,7 +65,6 @@ import org.hibernate.internal.HEMLogging;
|
|||
import org.hibernate.internal.util.collections.ArrayHelper;
|
||||
import org.hibernate.jpa.QueryHints;
|
||||
import org.hibernate.jpa.TypedParameterValue;
|
||||
import org.hibernate.graph.internal.RootGraphImpl;
|
||||
import org.hibernate.jpa.internal.util.CacheModeHelper;
|
||||
import org.hibernate.jpa.internal.util.ConfigurationHelper;
|
||||
import org.hibernate.jpa.internal.util.FlushModeTypeHelper;
|
||||
|
@ -80,6 +80,7 @@ import org.hibernate.query.spi.QueryParameterBinding;
|
|||
import org.hibernate.query.spi.QueryParameterBindings;
|
||||
import org.hibernate.query.spi.QueryParameterListBinding;
|
||||
import org.hibernate.query.spi.ScrollableResultsImplementor;
|
||||
import org.hibernate.query.spi.StreamDecorator;
|
||||
import org.hibernate.transform.ResultTransformer;
|
||||
import org.hibernate.type.Type;
|
||||
|
||||
|
@ -1512,8 +1513,10 @@ public abstract class AbstractProducedQuery<R> implements QueryImplementor<R> {
|
|||
final ScrollableResultsIterator<R> iterator = new ScrollableResultsIterator<>( scrollableResults );
|
||||
final Spliterator<R> spliterator = Spliterators.spliteratorUnknownSize( iterator, Spliterator.NONNULL );
|
||||
|
||||
final Stream<R> stream = StreamSupport.stream( spliterator, false );
|
||||
stream.onClose( scrollableResults::close );
|
||||
final Stream<R> stream = new StreamDecorator(
|
||||
StreamSupport.stream( spliterator, false ),
|
||||
scrollableResults::close
|
||||
);
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,318 @@
|
|||
/*
|
||||
* Hibernate, Relational Persistence for Idiomatic Java
|
||||
*
|
||||
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
|
||||
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
|
||||
*/
|
||||
package org.hibernate.query.spi;
|
||||
|
||||
import java.util.DoubleSummaryStatistics;
|
||||
import java.util.OptionalDouble;
|
||||
import java.util.PrimitiveIterator;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.DoubleBinaryOperator;
|
||||
import java.util.function.DoubleConsumer;
|
||||
import java.util.function.DoubleFunction;
|
||||
import java.util.function.DoublePredicate;
|
||||
import java.util.function.DoubleToIntFunction;
|
||||
import java.util.function.DoubleToLongFunction;
|
||||
import java.util.function.DoubleUnaryOperator;
|
||||
import java.util.function.ObjDoubleConsumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.DoubleStream;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.hibernate.Incubating;
|
||||
|
||||
/**
|
||||
* The {@link DoubleStreamDecorator} wraps a Java {@link DoubleStream} and registers a {@code closeHandler}
|
||||
* which is passed further to any resulting {@link Stream}.
|
||||
* <p>
|
||||
* The goal of the {@link DoubleStreamDecorator} is to close the underlying {@link DoubleStream} upon
|
||||
* calling a terminal operation.
|
||||
*
|
||||
* @author Vlad Mihalcea
|
||||
* @since 5.4
|
||||
*/
|
||||
@Incubating
|
||||
public class DoubleStreamDecorator implements DoubleStream {
|
||||
|
||||
private final DoubleStream delegate;
|
||||
|
||||
private Runnable closeHandler;
|
||||
|
||||
public DoubleStreamDecorator(
|
||||
DoubleStream delegate,
|
||||
Runnable closeHandler) {
|
||||
this.delegate = delegate;
|
||||
this.closeHandler = closeHandler;
|
||||
this.delegate.onClose( closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream filter(DoublePredicate predicate) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.filter( predicate ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream map(DoubleUnaryOperator mapper) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.map( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
|
||||
return new StreamDecorator<>(
|
||||
delegate.mapToObj( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream mapToInt(DoubleToIntFunction mapper) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.mapToInt( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream mapToLong(DoubleToLongFunction mapper) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.mapToLong( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.flatMap( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream distinct() {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.distinct(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream sorted() {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.sorted(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream peek(DoubleConsumer action) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.peek( action ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream limit(long maxSize) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.limit( maxSize ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream skip(long n) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.skip( n ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEach(DoubleConsumer action) {
|
||||
delegate.forEach( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachOrdered(DoubleConsumer action) {
|
||||
delegate.forEachOrdered( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public double[] toArray() {
|
||||
double[] result = delegate.toArray();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double reduce(double identity, DoubleBinaryOperator op) {
|
||||
double result = delegate.reduce( identity, op );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble reduce(DoubleBinaryOperator op) {
|
||||
OptionalDouble result = delegate.reduce( op );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> R collect(
|
||||
Supplier<R> supplier, ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner) {
|
||||
R result = delegate.collect( supplier, accumulator, combiner );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public double sum() {
|
||||
double result = delegate.sum();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble min() {
|
||||
OptionalDouble result = delegate.min();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble max() {
|
||||
OptionalDouble result = delegate.max();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count() {
|
||||
long result = delegate.count();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble average() {
|
||||
OptionalDouble result = delegate.average();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleSummaryStatistics summaryStatistics() {
|
||||
DoubleSummaryStatistics result = delegate.summaryStatistics();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean anyMatch(DoublePredicate predicate) {
|
||||
boolean result = delegate.anyMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allMatch(DoublePredicate predicate) {
|
||||
boolean result = delegate.allMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean noneMatch(DoublePredicate predicate) {
|
||||
boolean result = delegate.noneMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble findFirst() {
|
||||
OptionalDouble result = delegate.findFirst();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble findAny() {
|
||||
OptionalDouble result = delegate.findAny();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Double> boxed() {
|
||||
return new StreamDecorator<>(
|
||||
delegate.boxed(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream sequential() {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.sequential(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream parallel() {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.parallel(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream unordered() {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.unordered(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream onClose(Runnable closeHandler) {
|
||||
this.closeHandler = closeHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrimitiveIterator.OfDouble iterator() {
|
||||
return delegate.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spliterator.OfDouble spliterator() {
|
||||
return delegate.spliterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isParallel() {
|
||||
return delegate.isParallel();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,333 @@
|
|||
/*
|
||||
* Hibernate, Relational Persistence for Idiomatic Java
|
||||
*
|
||||
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
|
||||
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
|
||||
*/
|
||||
package org.hibernate.query.spi;
|
||||
|
||||
import java.util.IntSummaryStatistics;
|
||||
import java.util.OptionalDouble;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.PrimitiveIterator;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.IntBinaryOperator;
|
||||
import java.util.function.IntConsumer;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.IntPredicate;
|
||||
import java.util.function.IntToDoubleFunction;
|
||||
import java.util.function.IntToLongFunction;
|
||||
import java.util.function.IntUnaryOperator;
|
||||
import java.util.function.ObjIntConsumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.DoubleStream;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.hibernate.Incubating;
|
||||
|
||||
/**
|
||||
* The {@link IntStreamDecorator} wraps a Java {@link IntStream} and registers a {@code closeHandler}
|
||||
* which is passed further to any resulting {@link Stream}.
|
||||
* <p>
|
||||
* The goal of the {@link IntStreamDecorator} is to close the underlying {@link IntStream} upon
|
||||
* calling a terminal operation.
|
||||
*
|
||||
* @author Vlad Mihalcea
|
||||
* @since 5.4
|
||||
*/
|
||||
@Incubating
|
||||
public class IntStreamDecorator implements IntStream {
|
||||
|
||||
private final IntStream delegate;
|
||||
|
||||
private Runnable closeHandler;
|
||||
|
||||
public IntStreamDecorator(
|
||||
IntStream delegate,
|
||||
Runnable closeHandler) {
|
||||
this.delegate = delegate;
|
||||
this.closeHandler = closeHandler;
|
||||
this.delegate.onClose( closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream filter(IntPredicate predicate) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.filter( predicate ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream map(IntUnaryOperator mapper) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.map( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <U> Stream<U> mapToObj(IntFunction<? extends U> mapper) {
|
||||
return new StreamDecorator<>(
|
||||
delegate.mapToObj( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream mapToLong(IntToLongFunction mapper) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.mapToLong( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream mapToDouble(IntToDoubleFunction mapper) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.mapToDouble( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream flatMap(IntFunction<? extends IntStream> mapper) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.flatMap( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream distinct() {
|
||||
return new IntStreamDecorator(
|
||||
delegate.distinct(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream sorted() {
|
||||
return new IntStreamDecorator(
|
||||
delegate.sorted(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream peek(IntConsumer action) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.peek( action ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream limit(long maxSize) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.limit( maxSize ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream skip(long n) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.skip( n ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEach(IntConsumer action) {
|
||||
delegate.forEach( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachOrdered(IntConsumer action) {
|
||||
delegate.forEachOrdered( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int[] toArray() {
|
||||
int[] result = delegate.toArray();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int reduce(int identity, IntBinaryOperator op) {
|
||||
int result = delegate.reduce( identity, op );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalInt reduce(IntBinaryOperator op) {
|
||||
OptionalInt result = delegate.reduce( op );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> R collect(
|
||||
Supplier<R> supplier, ObjIntConsumer<R> accumulator, BiConsumer<R, R> combiner) {
|
||||
R result = delegate.collect( supplier, accumulator, combiner );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sum() {
|
||||
int result = delegate.sum();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalInt min() {
|
||||
OptionalInt result = delegate.min();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalInt max() {
|
||||
OptionalInt result = delegate.max();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count() {
|
||||
long result = delegate.count();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble average() {
|
||||
OptionalDouble result = delegate.average();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntSummaryStatistics summaryStatistics() {
|
||||
IntSummaryStatistics result = delegate.summaryStatistics();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean anyMatch(IntPredicate predicate) {
|
||||
boolean result = delegate.anyMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allMatch(IntPredicate predicate) {
|
||||
boolean result = delegate.allMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean noneMatch(IntPredicate predicate) {
|
||||
boolean result = delegate.noneMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalInt findFirst() {
|
||||
OptionalInt result = delegate.findFirst();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalInt findAny() {
|
||||
OptionalInt result = delegate.findAny();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream asLongStream() {
|
||||
LongStream result = delegate.asLongStream();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream asDoubleStream() {
|
||||
DoubleStream result = delegate.asDoubleStream();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Integer> boxed() {
|
||||
return new StreamDecorator<>(
|
||||
delegate.boxed(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream sequential() {
|
||||
return new IntStreamDecorator(
|
||||
delegate.sequential(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream parallel() {
|
||||
return new IntStreamDecorator(
|
||||
delegate.parallel(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream unordered() {
|
||||
return new IntStreamDecorator(
|
||||
delegate.unordered(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream onClose(Runnable closeHandler) {
|
||||
this.closeHandler = closeHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrimitiveIterator.OfInt iterator() {
|
||||
return delegate.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spliterator.OfInt spliterator() {
|
||||
return delegate.spliterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isParallel() {
|
||||
return delegate.isParallel();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,326 @@
|
|||
/*
|
||||
* Hibernate, Relational Persistence for Idiomatic Java
|
||||
*
|
||||
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
|
||||
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
|
||||
*/
|
||||
package org.hibernate.query.spi;
|
||||
|
||||
import java.util.LongSummaryStatistics;
|
||||
import java.util.OptionalDouble;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.PrimitiveIterator;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.LongBinaryOperator;
|
||||
import java.util.function.LongConsumer;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.LongPredicate;
|
||||
import java.util.function.LongToDoubleFunction;
|
||||
import java.util.function.LongToIntFunction;
|
||||
import java.util.function.LongUnaryOperator;
|
||||
import java.util.function.ObjLongConsumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.DoubleStream;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.hibernate.Incubating;
|
||||
|
||||
/**
|
||||
* The {@link LongStreamDecorator} wraps a Java {@link LongStream} and registers a {@code closeHandler}
|
||||
* which is passed further to any resulting {@link Stream}.
|
||||
*
|
||||
* The goal of the {@link LongStreamDecorator} is to close the underlying {@link LongStream} upon
|
||||
* calling a terminal operation.
|
||||
*
|
||||
* @author Vlad Mihalcea
|
||||
* @since 5.4
|
||||
*/
|
||||
@Incubating
|
||||
public class LongStreamDecorator implements LongStream {
|
||||
|
||||
private final LongStream delegate;
|
||||
|
||||
private Runnable closeHandler;
|
||||
|
||||
public LongStreamDecorator(
|
||||
LongStream delegate,
|
||||
Runnable closeHandler) {
|
||||
this.delegate = delegate;
|
||||
this.closeHandler = closeHandler;
|
||||
this.delegate.onClose( closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream filter(LongPredicate predicate) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.filter( predicate ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream map(LongUnaryOperator mapper) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.map( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
|
||||
return new StreamDecorator<>(
|
||||
delegate.mapToObj( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream mapToInt(LongToIntFunction mapper) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.mapToInt( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream mapToDouble(LongToDoubleFunction mapper) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.mapToDouble( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream flatMap(LongFunction<? extends LongStream> mapper) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.flatMap( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream distinct() {
|
||||
return new LongStreamDecorator(
|
||||
delegate.distinct(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream sorted() {
|
||||
return new LongStreamDecorator(
|
||||
delegate.sorted(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream peek(LongConsumer action) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.peek( action ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream limit(long maxSize) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.limit( maxSize ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream skip(long n) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.skip( n ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEach(LongConsumer action) {
|
||||
delegate.forEach( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachOrdered(LongConsumer action) {
|
||||
delegate.forEachOrdered( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long[] toArray() {
|
||||
long[] result = delegate.toArray();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long reduce(long identity, LongBinaryOperator op) {
|
||||
long result = delegate.reduce( identity, op );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalLong reduce(LongBinaryOperator op) {
|
||||
OptionalLong result = delegate.reduce( op );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> R collect(
|
||||
Supplier<R> supplier, ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner) {
|
||||
R result = delegate.collect( supplier, accumulator, combiner );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long sum() {
|
||||
long result = delegate.sum();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalLong min() {
|
||||
OptionalLong result = delegate.min();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalLong max() {
|
||||
OptionalLong result = delegate.max();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count() {
|
||||
long result = delegate.count();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalDouble average() {
|
||||
OptionalDouble result = delegate.average();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongSummaryStatistics summaryStatistics() {
|
||||
LongSummaryStatistics result = delegate.summaryStatistics();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean anyMatch(LongPredicate predicate) {
|
||||
boolean result = delegate.anyMatch(predicate);
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allMatch(LongPredicate predicate) {
|
||||
boolean result = delegate.allMatch(predicate);
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean noneMatch(LongPredicate predicate) {
|
||||
boolean result = delegate.noneMatch(predicate);
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalLong findFirst() {
|
||||
OptionalLong result = delegate.findFirst();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalLong findAny() {
|
||||
OptionalLong result = delegate.findAny();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream asDoubleStream() {
|
||||
DoubleStream result = delegate.asDoubleStream();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Long> boxed() {
|
||||
return new StreamDecorator<>(
|
||||
delegate.boxed(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream sequential() {
|
||||
return new LongStreamDecorator(
|
||||
delegate.sequential(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream parallel() {
|
||||
return new LongStreamDecorator(
|
||||
delegate.parallel(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream unordered() {
|
||||
return new LongStreamDecorator(
|
||||
delegate.unordered(),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream onClose(Runnable closeHandler) {
|
||||
this.closeHandler = closeHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrimitiveIterator.OfLong iterator() {
|
||||
return delegate.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spliterator.OfLong spliterator() {
|
||||
return delegate.spliterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isParallel() {
|
||||
return delegate.isParallel();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,340 @@
|
|||
/*
|
||||
* Hibernate, Relational Persistence for Idiomatic Java
|
||||
*
|
||||
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
|
||||
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
|
||||
*/
|
||||
package org.hibernate.query.spi;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.Optional;
|
||||
import java.util.Spliterator;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.BinaryOperator;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.IntFunction;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.ToDoubleFunction;
|
||||
import java.util.function.ToIntFunction;
|
||||
import java.util.function.ToLongFunction;
|
||||
import java.util.stream.Collector;
|
||||
import java.util.stream.DoubleStream;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.hibernate.HibernateException;
|
||||
import org.hibernate.Incubating;
|
||||
import org.hibernate.internal.util.ReflectHelper;
|
||||
|
||||
/**
|
||||
* The {@link StreamDecorator} wraps a Java {@link Stream} and registers a {@code closeHandler}
|
||||
* which is passed further to any resulting {@link Stream}.
|
||||
*
|
||||
* The goal of the {@link StreamDecorator} is to close the underlying {@link Stream} upon
|
||||
* calling a terminal operation.
|
||||
*
|
||||
* @author Vlad Mihalcea
|
||||
* @since 5.4
|
||||
*/
|
||||
@Incubating
|
||||
public class StreamDecorator<R> implements Stream<R> {
|
||||
|
||||
private final Stream<R> delegate;
|
||||
|
||||
private Runnable closeHandler;
|
||||
|
||||
public StreamDecorator(
|
||||
Stream<R> delegate,
|
||||
Runnable closeHandler) {
|
||||
this.delegate = delegate;
|
||||
this.closeHandler = closeHandler;
|
||||
this.delegate.onClose( closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> filter(Predicate<? super R> predicate) {
|
||||
return new StreamDecorator<R>( delegate.filter( predicate ), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R1> Stream<R1> map(Function<? super R, ? extends R1> mapper) {
|
||||
return new StreamDecorator<>( delegate.map( mapper ), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream mapToInt(ToIntFunction<? super R> mapper) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.mapToInt( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream mapToLong(ToLongFunction<? super R> mapper) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.mapToLong( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream mapToDouble(ToDoubleFunction<? super R> mapper) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.mapToDouble( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R1> Stream<R1> flatMap(Function<? super R, ? extends Stream<? extends R1>> mapper) {
|
||||
return new StreamDecorator<>( delegate.flatMap( mapper ), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntStream flatMapToInt(Function<? super R, ? extends IntStream> mapper) {
|
||||
return new IntStreamDecorator(
|
||||
delegate.flatMapToInt( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStream flatMapToLong(Function<? super R, ? extends LongStream> mapper) {
|
||||
return new LongStreamDecorator(
|
||||
delegate.flatMapToLong( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DoubleStream flatMapToDouble(Function<? super R, ? extends DoubleStream> mapper) {
|
||||
return new DoubleStreamDecorator(
|
||||
delegate.flatMapToDouble( mapper ),
|
||||
closeHandler
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> distinct() {
|
||||
return new StreamDecorator<>( delegate.distinct(), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> sorted() {
|
||||
return new StreamDecorator<>( delegate.sorted(), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> sorted(Comparator<? super R> comparator) {
|
||||
return new StreamDecorator<>( delegate.sorted( comparator ), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> peek(Consumer<? super R> action) {
|
||||
return new StreamDecorator<>( delegate.peek( action ), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> limit(long maxSize) {
|
||||
return new StreamDecorator<>( delegate.limit( maxSize ), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> skip(long n) {
|
||||
return new StreamDecorator<>( delegate.skip( n ), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEach(Consumer<? super R> action) {
|
||||
delegate.forEach( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void forEachOrdered(Consumer<? super R> action) {
|
||||
delegate.forEachOrdered( action );
|
||||
close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
Object[] result = delegate.toArray();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <A> A[] toArray(IntFunction<A[]> generator) {
|
||||
A[] result = delegate.toArray( generator );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public R reduce(R identity, BinaryOperator<R> accumulator) {
|
||||
R result = delegate.reduce( identity, accumulator );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<R> reduce(BinaryOperator<R> accumulator) {
|
||||
Optional<R> result = delegate.reduce( accumulator );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <U> U reduce(
|
||||
U identity, BiFunction<U, ? super R, U> accumulator, BinaryOperator<U> combiner) {
|
||||
U result = delegate.reduce( identity, accumulator, combiner );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R1> R1 collect(
|
||||
Supplier<R1> supplier, BiConsumer<R1, ? super R> accumulator, BiConsumer<R1, R1> combiner) {
|
||||
R1 result = delegate.collect( supplier, accumulator, combiner );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R1, A> R1 collect(Collector<? super R, A, R1> collector) {
|
||||
R1 result = delegate.collect( collector );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<R> min(Comparator<? super R> comparator) {
|
||||
Optional<R> result = delegate.min( comparator );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<R> max(Comparator<? super R> comparator) {
|
||||
Optional<R> result = delegate.max( comparator );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count() {
|
||||
long result = delegate.count();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean anyMatch(Predicate<? super R> predicate) {
|
||||
boolean result = delegate.anyMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allMatch(Predicate<? super R> predicate) {
|
||||
boolean result = delegate.allMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean noneMatch(Predicate<? super R> predicate) {
|
||||
boolean result = delegate.noneMatch( predicate );
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<R> findFirst() {
|
||||
Optional<R> result = delegate.findFirst();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<R> findAny() {
|
||||
Optional<R> result = delegate.findAny();
|
||||
close();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<R> iterator() {
|
||||
return delegate.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spliterator<R> spliterator() {
|
||||
return delegate.spliterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isParallel() {
|
||||
return delegate.isParallel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> sequential() {
|
||||
return new StreamDecorator<>( delegate.sequential(), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> parallel() {
|
||||
return new StreamDecorator<>( delegate.parallel(), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> unordered() {
|
||||
return new StreamDecorator<>( delegate.unordered(), closeHandler );
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<R> onClose(Runnable closeHandler) {
|
||||
this.closeHandler = closeHandler;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
delegate.close();
|
||||
}
|
||||
|
||||
//Methods added to JDK 9
|
||||
|
||||
public Stream<R> takeWhile(Predicate<? super R> predicate) {
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
Stream<R> result = (Stream<R>)
|
||||
ReflectHelper.getMethod( Stream.class, "takeWhile", Predicate.class )
|
||||
.invoke( delegate, predicate );
|
||||
return new StreamDecorator<>( result, closeHandler );
|
||||
}
|
||||
catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new HibernateException( e );
|
||||
}
|
||||
}
|
||||
|
||||
public Stream<R> dropWhile(Predicate<? super R> predicate) {
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
Stream<R> result = (Stream<R>)
|
||||
ReflectHelper.getMethod( Stream.class, "dropWhile", Predicate.class )
|
||||
.invoke( delegate, predicate );
|
||||
return new StreamDecorator<>( result, closeHandler );
|
||||
}
|
||||
catch (IllegalAccessException | InvocationTargetException e) {
|
||||
throw new HibernateException( e );
|
||||
}
|
||||
}
|
||||
}
|
|
@ -49,7 +49,7 @@ public class BasicStreamTest extends BaseNonConfigCoreFunctionalTestCase {
|
|||
final Stream<MyEntity> stream = session.createQuery( "from MyEntity", MyEntity.class ).stream();
|
||||
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( true ) );
|
||||
stream.forEach( System.out::println );
|
||||
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( true ) );
|
||||
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( false ) );
|
||||
stream.close();
|
||||
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( false ) );
|
||||
|
||||
|
|
|
@ -6,27 +6,34 @@
|
|||
*/
|
||||
package org.hibernate.test.stream.basic;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.persistence.Entity;
|
||||
import javax.persistence.Id;
|
||||
import javax.persistence.Table;
|
||||
import javax.persistence.Tuple;
|
||||
import javax.persistence.criteria.CriteriaBuilder;
|
||||
import javax.persistence.criteria.CriteriaQuery;
|
||||
import javax.persistence.criteria.Root;
|
||||
|
||||
import org.hibernate.Session;
|
||||
import org.hibernate.boot.MetadataSources;
|
||||
import org.hibernate.engine.spi.SessionImplementor;
|
||||
import org.hibernate.dialect.H2Dialect;
|
||||
import org.hibernate.engine.jdbc.spi.JdbcCoordinator;
|
||||
import org.hibernate.engine.spi.SharedSessionContractImplementor;
|
||||
import org.hibernate.internal.util.ReflectHelper;
|
||||
import org.hibernate.resource.jdbc.ResourceRegistry;
|
||||
|
||||
import org.hibernate.testing.RequiresDialect;
|
||||
import org.hibernate.testing.TestForIssue;
|
||||
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hibernate.testing.junit4.ExtraAssertions.assertTyping;
|
||||
import static org.hibernate.testing.transaction.TransactionUtil.doInHibernate;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* @author Steve Ebersole
|
||||
|
@ -70,12 +77,180 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
|
|||
} );
|
||||
}
|
||||
|
||||
@Test
|
||||
@TestForIssue( jiraKey = "HHH-13872")
|
||||
@RequiresDialect(H2Dialect.class)
|
||||
public void testStreamCloseOnTerminalOperation() {
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
session.createQuery( "delete from MyEntity" ).executeUpdate();
|
||||
|
||||
for ( int i = 1; i <= 10; i++ ) {
|
||||
MyEntity e = new MyEntity();
|
||||
e.id = i;
|
||||
e.name = "Test";
|
||||
session.persist( e );
|
||||
}
|
||||
} );
|
||||
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
Stream<MyEntity> stream = session
|
||||
.createQuery( "SELECT me FROM MyEntity me" )
|
||||
.getResultStream();
|
||||
|
||||
ResourceRegistry resourceRegistry = resourceRegistry(session);
|
||||
assertTrue( resourceRegistry.hasRegisteredResources() );
|
||||
|
||||
List<MyEntity> entities = stream.collect( Collectors.toList() ) ;
|
||||
assertEquals(10, entities.size());
|
||||
|
||||
assertFalse( resourceRegistry.hasRegisteredResources() );
|
||||
} );
|
||||
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
Stream<MyEntity> stream = session
|
||||
.createQuery( "SELECT me FROM MyEntity me" )
|
||||
.getResultStream();
|
||||
|
||||
ResourceRegistry resourceRegistry = resourceRegistry(session);
|
||||
assertTrue( resourceRegistry.hasRegisteredResources() );
|
||||
|
||||
int sum = stream.mapToInt( MyEntity::getId ).sum();
|
||||
assertEquals(55, sum);
|
||||
|
||||
assertFalse( resourceRegistry.hasRegisteredResources() );
|
||||
} );
|
||||
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
Stream<MyEntity> stream = session
|
||||
.createQuery( "SELECT me FROM MyEntity me" )
|
||||
.getResultStream();
|
||||
|
||||
ResourceRegistry resourceRegistry = resourceRegistry(session);
|
||||
assertTrue( resourceRegistry.hasRegisteredResources() );
|
||||
|
||||
long result = stream.mapToLong( entity -> entity.id * 10 ).min().getAsLong();
|
||||
assertEquals(10, result);
|
||||
|
||||
assertFalse( resourceRegistry.hasRegisteredResources() );
|
||||
} );
|
||||
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
Stream<MyEntity> stream = session
|
||||
.createQuery( "SELECT me FROM MyEntity me" )
|
||||
.getResultStream();
|
||||
|
||||
ResourceRegistry resourceRegistry = resourceRegistry(session);
|
||||
assertTrue( resourceRegistry.hasRegisteredResources() );
|
||||
|
||||
double result = stream.mapToDouble( entity -> entity.id * 0.1D ).max().getAsDouble();
|
||||
assertEquals(1, result, 0.1);
|
||||
|
||||
assertFalse( resourceRegistry.hasRegisteredResources() );
|
||||
} );
|
||||
|
||||
//Test call close explicitly
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
|
||||
try (Stream<Long> stream = session
|
||||
.createQuery( "SELECT me.id FROM MyEntity me" )
|
||||
.getResultStream()) {
|
||||
|
||||
ResourceRegistry resourceRegistry = resourceRegistry( session );
|
||||
assertTrue( resourceRegistry.hasRegisteredResources() );
|
||||
|
||||
Object[] result = stream.sorted().skip( 5 ).limit( 5 ).toArray();
|
||||
assertEquals( 5, result.length );
|
||||
assertEquals( 6, result[0] );
|
||||
assertEquals( 10, result[4] );
|
||||
|
||||
assertFalse( resourceRegistry.hasRegisteredResources() );
|
||||
}
|
||||
} );
|
||||
|
||||
//Test Java 9 Stream methods
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
Method takeWhileMethod = ReflectHelper.getMethod( Stream.class, "takeWhile", Predicate.class );
|
||||
|
||||
if ( takeWhileMethod != null ) {
|
||||
try (Stream<Long> stream = session
|
||||
.createQuery( "SELECT me.id FROM MyEntity me" )
|
||||
.getResultStream()) {
|
||||
|
||||
ResourceRegistry resourceRegistry = resourceRegistry( session );
|
||||
assertTrue( resourceRegistry.hasRegisteredResources() );
|
||||
|
||||
Predicate<Integer> predicate = id -> id <= 5;
|
||||
|
||||
Stream<Integer> takeWhileStream = (Stream<Integer>) takeWhileMethod.invoke( stream, predicate );
|
||||
|
||||
List<Integer> result = takeWhileStream.collect( Collectors.toList() );
|
||||
|
||||
assertEquals( 5, result.size() );
|
||||
assertTrue( result.contains( 1 ) );
|
||||
assertTrue( result.contains( 3 ) );
|
||||
assertTrue( result.contains( 5 ) );
|
||||
|
||||
assertFalse( resourceRegistry.hasRegisteredResources() );
|
||||
}
|
||||
catch (IllegalAccessException | InvocationTargetException e) {
|
||||
fail( "Could not execute takeWhile because of " + e.getMessage() );
|
||||
}
|
||||
}
|
||||
} );
|
||||
|
||||
doInHibernate( this::sessionFactory, session -> {
|
||||
Method dropWhileMethod = ReflectHelper.getMethod( Stream.class, "dropWhile", Predicate.class );
|
||||
|
||||
if ( dropWhileMethod != null ) {
|
||||
try (Stream<Long> stream = session
|
||||
.createQuery( "SELECT me.id FROM MyEntity me" )
|
||||
.getResultStream()) {
|
||||
|
||||
ResourceRegistry resourceRegistry = resourceRegistry( session );
|
||||
assertTrue( resourceRegistry.hasRegisteredResources() );
|
||||
|
||||
Predicate<Integer> predicate = id -> id <= 5;
|
||||
|
||||
Stream<Integer> dropWhileStream = (Stream<Integer>) dropWhileMethod.invoke( stream, predicate );
|
||||
|
||||
List<Integer> result = dropWhileStream.collect( Collectors.toList() );
|
||||
|
||||
assertEquals( 5, result.size() );
|
||||
assertTrue( result.contains( 6 ) );
|
||||
assertTrue( result.contains( 8 ) );
|
||||
assertTrue( result.contains( 10 ) );
|
||||
|
||||
assertFalse( resourceRegistry.hasRegisteredResources() );
|
||||
}
|
||||
catch (IllegalAccessException | InvocationTargetException e) {
|
||||
fail( "Could not execute takeWhile because of " + e.getMessage() );
|
||||
}
|
||||
}
|
||||
} );
|
||||
}
|
||||
|
||||
private ResourceRegistry resourceRegistry(Session session) {
|
||||
SharedSessionContractImplementor sharedSessionContractImplementor = (SharedSessionContractImplementor) session;
|
||||
JdbcCoordinator jdbcCoordinator = sharedSessionContractImplementor.getJdbcCoordinator();
|
||||
return jdbcCoordinator.getLogicalConnection().getResourceRegistry();
|
||||
}
|
||||
|
||||
@Entity(name = "MyEntity")
|
||||
@Table(name="MyEntity")
|
||||
public static class MyEntity {
|
||||
|
||||
@Id
|
||||
public Integer id;
|
||||
|
||||
public String name;
|
||||
|
||||
public Integer getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue