From d9a335c429ef812bc9726e9d467597ebae2b81a5 Mon Sep 17 00:00:00 2001 From: Vlad Mihalcea Date: Thu, 20 Feb 2020 11:16:11 +0200 Subject: [PATCH] HHH-13872 - Make the Java Stream close the underlying ScrollableResultsIterator upon calling a terminal operation --- .../userguide/chapters/query/hql/HQL.adoc | 35 +- .../org/hibernate/userguide/hql/HQLTest.java | 34 +- .../internal/util/ReflectHelper.java | 9 + .../query/internal/AbstractProducedQuery.java | 9 +- .../query/spi/DoubleStreamDecorator.java | 318 ++++++++++++++++ .../query/spi/IntStreamDecorator.java | 333 +++++++++++++++++ .../query/spi/LongStreamDecorator.java | 326 +++++++++++++++++ .../hibernate/query/spi/StreamDecorator.java | 340 ++++++++++++++++++ .../test/stream/basic/BasicStreamTest.java | 2 +- .../test/stream/basic/JpaStreamTest.java | 191 +++++++++- 10 files changed, 1582 insertions(+), 15 deletions(-) create mode 100644 hibernate-core/src/main/java/org/hibernate/query/spi/DoubleStreamDecorator.java create mode 100644 hibernate-core/src/main/java/org/hibernate/query/spi/IntStreamDecorator.java create mode 100644 hibernate-core/src/main/java/org/hibernate/query/spi/LongStreamDecorator.java create mode 100644 hibernate-core/src/main/java/org/hibernate/query/spi/StreamDecorator.java diff --git a/documentation/src/main/asciidoc/userguide/chapters/query/hql/HQL.adoc b/documentation/src/main/asciidoc/userguide/chapters/query/hql/HQL.adoc index 2db548b9ee..17902a1f96 100644 --- a/documentation/src/main/asciidoc/userguide/chapters/query/hql/HQL.adoc +++ b/documentation/src/main/asciidoc/userguide/chapters/query/hql/HQL.adoc @@ -165,9 +165,10 @@ include::{sourcedir}/HQLTest.java[tags=jpql-api-positional-parameter-example] It's good practice not to mix parameter binding forms in a given query. ==== -In terms of execution, JPA `Query` offers 2 different methods for retrieving a result set. +In terms of execution, JPA `Query` offers 3 different methods for retrieving a result set. * `Query#getResultList()` - executes the select query and returns back the list of results. +* `Query#getResultStream()` - executes the select query and returns back a `Stream` over the results. * `Query#getSingleResult()` - executes the select query and returns a single result. If there were more than one result an exception is thrown. [[jpql-api-list-example]] @@ -179,6 +180,15 @@ include::{sourcedir}/HQLTest.java[tags=jpql-api-list-example] ---- ==== +[[jpql-api-stream-example]] +.JPA `getResultStream()` result +==== +[source, JAVA, indent=0] +---- +include::{sourcedir}/HQLTest.java[tags=jpql-api-stream-example] +---- +==== + [[jpql-api-unique-result-example]] .JPA `getSingleResult()` ==== @@ -414,6 +424,29 @@ include::{sourcedir}/HQLTest.java[tags=hql-api-stream-example] Just like with `ScrollableResults`, you should always close a Hibernate `Stream` either explicitly or using a https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html[try-with-resources] block. ==== +[[jpql-api-stream]] +==== Query streaming + +Since version 2.2, the JPA `Query` interface offers support for returning a `Stream` via the `getResultStream` method. + +Just like the `scroll` method, you can use a try-with-resources block to close the `Stream` +prior to closing the currently running Persistence Context. + +Since Hibernate 5.4, the `Stream` is also closed when calling a terminal operation, +as illustrated by the following example. + +[[jpql-api-stream-terminal-operation]] +==== +[source, JAVA, indent=0] +---- +include::{sourcedir}/HQLTest.java[tags=jpql-api-stream-terminal-operation] +---- +==== + +The `Stream` is closed automatically after calling the `collect` method, +since there is no reason to keep the underlying JDBC `ResultSet` open +if the `Stream` cannot be reused. + [[hql-case-sensitivity]] === Case Sensitivity diff --git a/documentation/src/test/java/org/hibernate/userguide/hql/HQLTest.java b/documentation/src/test/java/org/hibernate/userguide/hql/HQLTest.java index 702ed53632..9867a70d0a 100644 --- a/documentation/src/test/java/org/hibernate/userguide/hql/HQLTest.java +++ b/documentation/src/test/java/org/hibernate/userguide/hql/HQLTest.java @@ -10,7 +10,6 @@ import java.math.BigDecimal; import java.sql.Timestamp; import java.time.LocalDateTime; import java.time.ZoneOffset; -import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; @@ -43,7 +42,6 @@ import org.hibernate.userguide.model.PhoneType; import org.hibernate.userguide.model.WireTransferPayment; import org.hibernate.testing.DialectChecks; -import org.hibernate.testing.FailureExpected; import org.hibernate.testing.RequiresDialect; import org.hibernate.testing.RequiresDialectFeature; import org.hibernate.testing.SkipForDialect; @@ -673,6 +671,38 @@ public class HQLTest extends BaseEntityManagerFunctionalTestCase { }); } + @Test + public void test_jpql_api_stream_example() { + doInJPA( this::entityManagerFactory, entityManager -> { + //tag::jpql-api-stream-example[] + try(Stream personStream = entityManager.createQuery( + "select p " + + "from Person p " + + "where p.name like :name", Person.class ) + .setParameter( "name", "J%" ) + .getResultStream()) { + List persons = personStream + .skip( 5 ) + .limit( 5 ) + .collect( Collectors.toList() ); + } + //end::jpql-api-stream-example[] + + // tag::jpql-api-stream-terminal-operation[] + List persons = entityManager.createQuery( + "select p " + + "from Person p " + + "where p.name like :name", Person.class ) + .setParameter( "name", "J%" ) + .getResultStream() + .skip( 5 ) + .limit( 5 ) + .collect( Collectors.toList() ); + + //end::jpql-api-stream-terminal-operation[] + }); + } + @Test public void test_jpql_api_single_result_example() { doInJPA( this::entityManagerFactory, entityManager -> { diff --git a/hibernate-core/src/main/java/org/hibernate/internal/util/ReflectHelper.java b/hibernate-core/src/main/java/org/hibernate/internal/util/ReflectHelper.java index bdade10f2d..be5a0caacc 100644 --- a/hibernate-core/src/main/java/org/hibernate/internal/util/ReflectHelper.java +++ b/hibernate-core/src/main/java/org/hibernate/internal/util/ReflectHelper.java @@ -375,6 +375,15 @@ public final class ReflectHelper { } } + public static Method getMethod(Class clazz, String methodName, Class... paramTypes) { + try { + return clazz.getMethod( methodName, paramTypes ); + } + catch (Exception e) { + return null; + } + } + public static Field findField(Class containerClass, String propertyName) { if ( containerClass == null ) { throw new IllegalArgumentException( "Class on which to find field [" + propertyName + "] cannot be null" ); diff --git a/hibernate-core/src/main/java/org/hibernate/query/internal/AbstractProducedQuery.java b/hibernate-core/src/main/java/org/hibernate/query/internal/AbstractProducedQuery.java index 374410912a..2c0eafd11a 100644 --- a/hibernate-core/src/main/java/org/hibernate/query/internal/AbstractProducedQuery.java +++ b/hibernate-core/src/main/java/org/hibernate/query/internal/AbstractProducedQuery.java @@ -56,6 +56,7 @@ import org.hibernate.engine.spi.SharedSessionContractImplementor; import org.hibernate.engine.spi.TypedValue; import org.hibernate.graph.GraphSemantic; import org.hibernate.graph.RootGraph; +import org.hibernate.graph.internal.RootGraphImpl; import org.hibernate.graph.spi.RootGraphImplementor; import org.hibernate.hql.internal.QueryExecutionRequestException; import org.hibernate.internal.EmptyScrollableResults; @@ -64,7 +65,6 @@ import org.hibernate.internal.HEMLogging; import org.hibernate.internal.util.collections.ArrayHelper; import org.hibernate.jpa.QueryHints; import org.hibernate.jpa.TypedParameterValue; -import org.hibernate.graph.internal.RootGraphImpl; import org.hibernate.jpa.internal.util.CacheModeHelper; import org.hibernate.jpa.internal.util.ConfigurationHelper; import org.hibernate.jpa.internal.util.FlushModeTypeHelper; @@ -80,6 +80,7 @@ import org.hibernate.query.spi.QueryParameterBinding; import org.hibernate.query.spi.QueryParameterBindings; import org.hibernate.query.spi.QueryParameterListBinding; import org.hibernate.query.spi.ScrollableResultsImplementor; +import org.hibernate.query.spi.StreamDecorator; import org.hibernate.transform.ResultTransformer; import org.hibernate.type.Type; @@ -1512,8 +1513,10 @@ public abstract class AbstractProducedQuery implements QueryImplementor { final ScrollableResultsIterator iterator = new ScrollableResultsIterator<>( scrollableResults ); final Spliterator spliterator = Spliterators.spliteratorUnknownSize( iterator, Spliterator.NONNULL ); - final Stream stream = StreamSupport.stream( spliterator, false ); - stream.onClose( scrollableResults::close ); + final Stream stream = new StreamDecorator( + StreamSupport.stream( spliterator, false ), + scrollableResults::close + ); return stream; } diff --git a/hibernate-core/src/main/java/org/hibernate/query/spi/DoubleStreamDecorator.java b/hibernate-core/src/main/java/org/hibernate/query/spi/DoubleStreamDecorator.java new file mode 100644 index 0000000000..9035a68289 --- /dev/null +++ b/hibernate-core/src/main/java/org/hibernate/query/spi/DoubleStreamDecorator.java @@ -0,0 +1,318 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.query.spi; + +import java.util.DoubleSummaryStatistics; +import java.util.OptionalDouble; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.DoubleBinaryOperator; +import java.util.function.DoubleConsumer; +import java.util.function.DoubleFunction; +import java.util.function.DoublePredicate; +import java.util.function.DoubleToIntFunction; +import java.util.function.DoubleToLongFunction; +import java.util.function.DoubleUnaryOperator; +import java.util.function.ObjDoubleConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import org.hibernate.Incubating; + +/** + * The {@link DoubleStreamDecorator} wraps a Java {@link DoubleStream} and registers a {@code closeHandler} + * which is passed further to any resulting {@link Stream}. + *

+ * The goal of the {@link DoubleStreamDecorator} is to close the underlying {@link DoubleStream} upon + * calling a terminal operation. + * + * @author Vlad Mihalcea + * @since 5.4 + */ +@Incubating +public class DoubleStreamDecorator implements DoubleStream { + + private final DoubleStream delegate; + + private Runnable closeHandler; + + public DoubleStreamDecorator( + DoubleStream delegate, + Runnable closeHandler) { + this.delegate = delegate; + this.closeHandler = closeHandler; + this.delegate.onClose( closeHandler ); + } + + @Override + public DoubleStream filter(DoublePredicate predicate) { + return new DoubleStreamDecorator( + delegate.filter( predicate ), + closeHandler + ); + } + + @Override + public DoubleStream map(DoubleUnaryOperator mapper) { + return new DoubleStreamDecorator( + delegate.map( mapper ), + closeHandler + ); + } + + @Override + public Stream mapToObj(DoubleFunction mapper) { + return new StreamDecorator<>( + delegate.mapToObj( mapper ), + closeHandler + ); + } + + @Override + public IntStream mapToInt(DoubleToIntFunction mapper) { + return new IntStreamDecorator( + delegate.mapToInt( mapper ), + closeHandler + ); + } + + @Override + public LongStream mapToLong(DoubleToLongFunction mapper) { + return new LongStreamDecorator( + delegate.mapToLong( mapper ), + closeHandler + ); + } + + @Override + public DoubleStream flatMap(DoubleFunction mapper) { + return new DoubleStreamDecorator( + delegate.flatMap( mapper ), + closeHandler + ); + } + + @Override + public DoubleStream distinct() { + return new DoubleStreamDecorator( + delegate.distinct(), + closeHandler + ); + } + + @Override + public DoubleStream sorted() { + return new DoubleStreamDecorator( + delegate.sorted(), + closeHandler + ); + } + + @Override + public DoubleStream peek(DoubleConsumer action) { + return new DoubleStreamDecorator( + delegate.peek( action ), + closeHandler + ); + } + + @Override + public DoubleStream limit(long maxSize) { + return new DoubleStreamDecorator( + delegate.limit( maxSize ), + closeHandler + ); + } + + @Override + public DoubleStream skip(long n) { + return new DoubleStreamDecorator( + delegate.skip( n ), + closeHandler + ); + } + + @Override + public void forEach(DoubleConsumer action) { + delegate.forEach( action ); + close(); + } + + @Override + public void forEachOrdered(DoubleConsumer action) { + delegate.forEachOrdered( action ); + close(); + } + + @Override + public double[] toArray() { + double[] result = delegate.toArray(); + close(); + return result; + } + + @Override + public double reduce(double identity, DoubleBinaryOperator op) { + double result = delegate.reduce( identity, op ); + close(); + return result; + } + + @Override + public OptionalDouble reduce(DoubleBinaryOperator op) { + OptionalDouble result = delegate.reduce( op ); + close(); + return result; + } + + @Override + public R collect( + Supplier supplier, ObjDoubleConsumer accumulator, BiConsumer combiner) { + R result = delegate.collect( supplier, accumulator, combiner ); + close(); + return result; + } + + @Override + public double sum() { + double result = delegate.sum(); + close(); + return result; + } + + @Override + public OptionalDouble min() { + OptionalDouble result = delegate.min(); + close(); + return result; + } + + @Override + public OptionalDouble max() { + OptionalDouble result = delegate.max(); + close(); + return result; + } + + @Override + public long count() { + long result = delegate.count(); + close(); + return result; + } + + @Override + public OptionalDouble average() { + OptionalDouble result = delegate.average(); + close(); + return result; + } + + @Override + public DoubleSummaryStatistics summaryStatistics() { + DoubleSummaryStatistics result = delegate.summaryStatistics(); + close(); + return result; + } + + @Override + public boolean anyMatch(DoublePredicate predicate) { + boolean result = delegate.anyMatch( predicate ); + close(); + return result; + } + + @Override + public boolean allMatch(DoublePredicate predicate) { + boolean result = delegate.allMatch( predicate ); + close(); + return result; + } + + @Override + public boolean noneMatch(DoublePredicate predicate) { + boolean result = delegate.noneMatch( predicate ); + close(); + return result; + } + + @Override + public OptionalDouble findFirst() { + OptionalDouble result = delegate.findFirst(); + close(); + return result; + } + + @Override + public OptionalDouble findAny() { + OptionalDouble result = delegate.findAny(); + close(); + return result; + } + + @Override + public Stream boxed() { + return new StreamDecorator<>( + delegate.boxed(), + closeHandler + ); + } + + @Override + public DoubleStream sequential() { + return new DoubleStreamDecorator( + delegate.sequential(), + closeHandler + ); + } + + @Override + public DoubleStream parallel() { + return new DoubleStreamDecorator( + delegate.parallel(), + closeHandler + ); + } + + @Override + public DoubleStream unordered() { + return new DoubleStreamDecorator( + delegate.unordered(), + closeHandler + ); + } + + @Override + public DoubleStream onClose(Runnable closeHandler) { + this.closeHandler = closeHandler; + return this; + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public PrimitiveIterator.OfDouble iterator() { + return delegate.iterator(); + } + + @Override + public Spliterator.OfDouble spliterator() { + return delegate.spliterator(); + } + + @Override + public boolean isParallel() { + return delegate.isParallel(); + } +} diff --git a/hibernate-core/src/main/java/org/hibernate/query/spi/IntStreamDecorator.java b/hibernate-core/src/main/java/org/hibernate/query/spi/IntStreamDecorator.java new file mode 100644 index 0000000000..d8050af658 --- /dev/null +++ b/hibernate-core/src/main/java/org/hibernate/query/spi/IntStreamDecorator.java @@ -0,0 +1,333 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.query.spi; + +import java.util.IntSummaryStatistics; +import java.util.OptionalDouble; +import java.util.OptionalInt; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.IntBinaryOperator; +import java.util.function.IntConsumer; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; +import java.util.function.IntToDoubleFunction; +import java.util.function.IntToLongFunction; +import java.util.function.IntUnaryOperator; +import java.util.function.ObjIntConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import org.hibernate.Incubating; + +/** + * The {@link IntStreamDecorator} wraps a Java {@link IntStream} and registers a {@code closeHandler} + * which is passed further to any resulting {@link Stream}. + *

+ * The goal of the {@link IntStreamDecorator} is to close the underlying {@link IntStream} upon + * calling a terminal operation. + * + * @author Vlad Mihalcea + * @since 5.4 + */ +@Incubating +public class IntStreamDecorator implements IntStream { + + private final IntStream delegate; + + private Runnable closeHandler; + + public IntStreamDecorator( + IntStream delegate, + Runnable closeHandler) { + this.delegate = delegate; + this.closeHandler = closeHandler; + this.delegate.onClose( closeHandler ); + } + + @Override + public IntStream filter(IntPredicate predicate) { + return new IntStreamDecorator( + delegate.filter( predicate ), + closeHandler + ); + } + + @Override + public IntStream map(IntUnaryOperator mapper) { + return new IntStreamDecorator( + delegate.map( mapper ), + closeHandler + ); + } + + @Override + public Stream mapToObj(IntFunction mapper) { + return new StreamDecorator<>( + delegate.mapToObj( mapper ), + closeHandler + ); + } + + @Override + public LongStream mapToLong(IntToLongFunction mapper) { + return new LongStreamDecorator( + delegate.mapToLong( mapper ), + closeHandler + ); + } + + @Override + public DoubleStream mapToDouble(IntToDoubleFunction mapper) { + return new DoubleStreamDecorator( + delegate.mapToDouble( mapper ), + closeHandler + ); + } + + @Override + public IntStream flatMap(IntFunction mapper) { + return new IntStreamDecorator( + delegate.flatMap( mapper ), + closeHandler + ); + } + + @Override + public IntStream distinct() { + return new IntStreamDecorator( + delegate.distinct(), + closeHandler + ); + } + + @Override + public IntStream sorted() { + return new IntStreamDecorator( + delegate.sorted(), + closeHandler + ); + } + + @Override + public IntStream peek(IntConsumer action) { + return new IntStreamDecorator( + delegate.peek( action ), + closeHandler + ); + } + + @Override + public IntStream limit(long maxSize) { + return new IntStreamDecorator( + delegate.limit( maxSize ), + closeHandler + ); + } + + @Override + public IntStream skip(long n) { + return new IntStreamDecorator( + delegate.skip( n ), + closeHandler + ); + } + + @Override + public void forEach(IntConsumer action) { + delegate.forEach( action ); + close(); + } + + @Override + public void forEachOrdered(IntConsumer action) { + delegate.forEachOrdered( action ); + close(); + } + + @Override + public int[] toArray() { + int[] result = delegate.toArray(); + close(); + return result; + } + + @Override + public int reduce(int identity, IntBinaryOperator op) { + int result = delegate.reduce( identity, op ); + close(); + return result; + } + + @Override + public OptionalInt reduce(IntBinaryOperator op) { + OptionalInt result = delegate.reduce( op ); + close(); + return result; + } + + @Override + public R collect( + Supplier supplier, ObjIntConsumer accumulator, BiConsumer combiner) { + R result = delegate.collect( supplier, accumulator, combiner ); + close(); + return result; + } + + @Override + public int sum() { + int result = delegate.sum(); + close(); + return result; + } + + @Override + public OptionalInt min() { + OptionalInt result = delegate.min(); + close(); + return result; + } + + @Override + public OptionalInt max() { + OptionalInt result = delegate.max(); + close(); + return result; + } + + @Override + public long count() { + long result = delegate.count(); + close(); + return result; + } + + @Override + public OptionalDouble average() { + OptionalDouble result = delegate.average(); + close(); + return result; + } + + @Override + public IntSummaryStatistics summaryStatistics() { + IntSummaryStatistics result = delegate.summaryStatistics(); + close(); + return result; + } + + @Override + public boolean anyMatch(IntPredicate predicate) { + boolean result = delegate.anyMatch( predicate ); + close(); + return result; + } + + @Override + public boolean allMatch(IntPredicate predicate) { + boolean result = delegate.allMatch( predicate ); + close(); + return result; + } + + @Override + public boolean noneMatch(IntPredicate predicate) { + boolean result = delegate.noneMatch( predicate ); + close(); + return result; + } + + @Override + public OptionalInt findFirst() { + OptionalInt result = delegate.findFirst(); + close(); + return result; + } + + @Override + public OptionalInt findAny() { + OptionalInt result = delegate.findAny(); + close(); + return result; + } + + @Override + public LongStream asLongStream() { + LongStream result = delegate.asLongStream(); + close(); + return result; + } + + @Override + public DoubleStream asDoubleStream() { + DoubleStream result = delegate.asDoubleStream(); + close(); + return result; + } + + @Override + public Stream boxed() { + return new StreamDecorator<>( + delegate.boxed(), + closeHandler + ); + } + + @Override + public IntStream sequential() { + return new IntStreamDecorator( + delegate.sequential(), + closeHandler + ); + } + + @Override + public IntStream parallel() { + return new IntStreamDecorator( + delegate.parallel(), + closeHandler + ); + } + + @Override + public IntStream unordered() { + return new IntStreamDecorator( + delegate.unordered(), + closeHandler + ); + } + + @Override + public IntStream onClose(Runnable closeHandler) { + this.closeHandler = closeHandler; + return this; + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public PrimitiveIterator.OfInt iterator() { + return delegate.iterator(); + } + + @Override + public Spliterator.OfInt spliterator() { + return delegate.spliterator(); + } + + @Override + public boolean isParallel() { + return delegate.isParallel(); + } +} diff --git a/hibernate-core/src/main/java/org/hibernate/query/spi/LongStreamDecorator.java b/hibernate-core/src/main/java/org/hibernate/query/spi/LongStreamDecorator.java new file mode 100644 index 0000000000..b34c020316 --- /dev/null +++ b/hibernate-core/src/main/java/org/hibernate/query/spi/LongStreamDecorator.java @@ -0,0 +1,326 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.query.spi; + +import java.util.LongSummaryStatistics; +import java.util.OptionalDouble; +import java.util.OptionalLong; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.LongBinaryOperator; +import java.util.function.LongConsumer; +import java.util.function.LongFunction; +import java.util.function.LongPredicate; +import java.util.function.LongToDoubleFunction; +import java.util.function.LongToIntFunction; +import java.util.function.LongUnaryOperator; +import java.util.function.ObjLongConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import org.hibernate.Incubating; + +/** + * The {@link LongStreamDecorator} wraps a Java {@link LongStream} and registers a {@code closeHandler} + * which is passed further to any resulting {@link Stream}. + * + * The goal of the {@link LongStreamDecorator} is to close the underlying {@link LongStream} upon + * calling a terminal operation. + * + * @author Vlad Mihalcea + * @since 5.4 + */ +@Incubating +public class LongStreamDecorator implements LongStream { + + private final LongStream delegate; + + private Runnable closeHandler; + + public LongStreamDecorator( + LongStream delegate, + Runnable closeHandler) { + this.delegate = delegate; + this.closeHandler = closeHandler; + this.delegate.onClose( closeHandler ); + } + + @Override + public LongStream filter(LongPredicate predicate) { + return new LongStreamDecorator( + delegate.filter( predicate ), + closeHandler + ); + } + + @Override + public LongStream map(LongUnaryOperator mapper) { + return new LongStreamDecorator( + delegate.map( mapper ), + closeHandler + ); + } + + @Override + public Stream mapToObj(LongFunction mapper) { + return new StreamDecorator<>( + delegate.mapToObj( mapper ), + closeHandler + ); + } + + @Override + public IntStream mapToInt(LongToIntFunction mapper) { + return new IntStreamDecorator( + delegate.mapToInt( mapper ), + closeHandler + ); + } + + @Override + public DoubleStream mapToDouble(LongToDoubleFunction mapper) { + return new DoubleStreamDecorator( + delegate.mapToDouble( mapper ), + closeHandler + ); + } + + @Override + public LongStream flatMap(LongFunction mapper) { + return new LongStreamDecorator( + delegate.flatMap( mapper ), + closeHandler + ); + } + + @Override + public LongStream distinct() { + return new LongStreamDecorator( + delegate.distinct(), + closeHandler + ); + } + + @Override + public LongStream sorted() { + return new LongStreamDecorator( + delegate.sorted(), + closeHandler + ); + } + + @Override + public LongStream peek(LongConsumer action) { + return new LongStreamDecorator( + delegate.peek( action ), + closeHandler + ); + } + + @Override + public LongStream limit(long maxSize) { + return new LongStreamDecorator( + delegate.limit( maxSize ), + closeHandler + ); + } + + @Override + public LongStream skip(long n) { + return new LongStreamDecorator( + delegate.skip( n ), + closeHandler + ); + } + + @Override + public void forEach(LongConsumer action) { + delegate.forEach( action ); + close(); + } + + @Override + public void forEachOrdered(LongConsumer action) { + delegate.forEachOrdered( action ); + close(); + } + + @Override + public long[] toArray() { + long[] result = delegate.toArray(); + close(); + return result; + } + + @Override + public long reduce(long identity, LongBinaryOperator op) { + long result = delegate.reduce( identity, op ); + close(); + return result; + } + + @Override + public OptionalLong reduce(LongBinaryOperator op) { + OptionalLong result = delegate.reduce( op ); + close(); + return result; + } + + @Override + public R collect( + Supplier supplier, ObjLongConsumer accumulator, BiConsumer combiner) { + R result = delegate.collect( supplier, accumulator, combiner ); + close(); + return result; + } + + @Override + public long sum() { + long result = delegate.sum(); + close(); + return result; + } + + @Override + public OptionalLong min() { + OptionalLong result = delegate.min(); + close(); + return result; + } + + @Override + public OptionalLong max() { + OptionalLong result = delegate.max(); + close(); + return result; + } + + @Override + public long count() { + long result = delegate.count(); + close(); + return result; + } + + @Override + public OptionalDouble average() { + OptionalDouble result = delegate.average(); + close(); + return result; + } + + @Override + public LongSummaryStatistics summaryStatistics() { + LongSummaryStatistics result = delegate.summaryStatistics(); + close(); + return result; + } + + @Override + public boolean anyMatch(LongPredicate predicate) { + boolean result = delegate.anyMatch(predicate); + close(); + return result; + } + + @Override + public boolean allMatch(LongPredicate predicate) { + boolean result = delegate.allMatch(predicate); + close(); + return result; + } + + @Override + public boolean noneMatch(LongPredicate predicate) { + boolean result = delegate.noneMatch(predicate); + close(); + return result; + } + + @Override + public OptionalLong findFirst() { + OptionalLong result = delegate.findFirst(); + close(); + return result; + } + + @Override + public OptionalLong findAny() { + OptionalLong result = delegate.findAny(); + close(); + return result; + } + + @Override + public DoubleStream asDoubleStream() { + DoubleStream result = delegate.asDoubleStream(); + close(); + return result; + } + + @Override + public Stream boxed() { + return new StreamDecorator<>( + delegate.boxed(), + closeHandler + ); + } + + @Override + public LongStream sequential() { + return new LongStreamDecorator( + delegate.sequential(), + closeHandler + ); + } + + @Override + public LongStream parallel() { + return new LongStreamDecorator( + delegate.parallel(), + closeHandler + ); + } + + @Override + public LongStream unordered() { + return new LongStreamDecorator( + delegate.unordered(), + closeHandler + ); + } + + @Override + public LongStream onClose(Runnable closeHandler) { + this.closeHandler = closeHandler; + return this; + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public PrimitiveIterator.OfLong iterator() { + return delegate.iterator(); + } + + @Override + public Spliterator.OfLong spliterator() { + return delegate.spliterator(); + } + + @Override + public boolean isParallel() { + return delegate.isParallel(); + } +} diff --git a/hibernate-core/src/main/java/org/hibernate/query/spi/StreamDecorator.java b/hibernate-core/src/main/java/org/hibernate/query/spi/StreamDecorator.java new file mode 100644 index 0000000000..526b4ee9b9 --- /dev/null +++ b/hibernate-core/src/main/java/org/hibernate/query/spi/StreamDecorator.java @@ -0,0 +1,340 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.query.spi; + +import java.lang.reflect.InvocationTargetException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Optional; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.Collector; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +import org.hibernate.HibernateException; +import org.hibernate.Incubating; +import org.hibernate.internal.util.ReflectHelper; + +/** + * The {@link StreamDecorator} wraps a Java {@link Stream} and registers a {@code closeHandler} + * which is passed further to any resulting {@link Stream}. + * + * The goal of the {@link StreamDecorator} is to close the underlying {@link Stream} upon + * calling a terminal operation. + * + * @author Vlad Mihalcea + * @since 5.4 + */ +@Incubating +public class StreamDecorator implements Stream { + + private final Stream delegate; + + private Runnable closeHandler; + + public StreamDecorator( + Stream delegate, + Runnable closeHandler) { + this.delegate = delegate; + this.closeHandler = closeHandler; + this.delegate.onClose( closeHandler ); + } + + @Override + public Stream filter(Predicate predicate) { + return new StreamDecorator( delegate.filter( predicate ), closeHandler ); + } + + @Override + public Stream map(Function mapper) { + return new StreamDecorator<>( delegate.map( mapper ), closeHandler ); + } + + @Override + public IntStream mapToInt(ToIntFunction mapper) { + return new IntStreamDecorator( + delegate.mapToInt( mapper ), + closeHandler + ); + } + + @Override + public LongStream mapToLong(ToLongFunction mapper) { + return new LongStreamDecorator( + delegate.mapToLong( mapper ), + closeHandler + ); + } + + @Override + public DoubleStream mapToDouble(ToDoubleFunction mapper) { + return new DoubleStreamDecorator( + delegate.mapToDouble( mapper ), + closeHandler + ); + } + + @Override + public Stream flatMap(Function> mapper) { + return new StreamDecorator<>( delegate.flatMap( mapper ), closeHandler ); + } + + @Override + public IntStream flatMapToInt(Function mapper) { + return new IntStreamDecorator( + delegate.flatMapToInt( mapper ), + closeHandler + ); + } + + @Override + public LongStream flatMapToLong(Function mapper) { + return new LongStreamDecorator( + delegate.flatMapToLong( mapper ), + closeHandler + ); + } + + @Override + public DoubleStream flatMapToDouble(Function mapper) { + return new DoubleStreamDecorator( + delegate.flatMapToDouble( mapper ), + closeHandler + ); + } + + @Override + public Stream distinct() { + return new StreamDecorator<>( delegate.distinct(), closeHandler ); + } + + @Override + public Stream sorted() { + return new StreamDecorator<>( delegate.sorted(), closeHandler ); + } + + @Override + public Stream sorted(Comparator comparator) { + return new StreamDecorator<>( delegate.sorted( comparator ), closeHandler ); + } + + @Override + public Stream peek(Consumer action) { + return new StreamDecorator<>( delegate.peek( action ), closeHandler ); + } + + @Override + public Stream limit(long maxSize) { + return new StreamDecorator<>( delegate.limit( maxSize ), closeHandler ); + } + + @Override + public Stream skip(long n) { + return new StreamDecorator<>( delegate.skip( n ), closeHandler ); + } + + @Override + public void forEach(Consumer action) { + delegate.forEach( action ); + close(); + } + + @Override + public void forEachOrdered(Consumer action) { + delegate.forEachOrdered( action ); + close(); + } + + @Override + public Object[] toArray() { + Object[] result = delegate.toArray(); + close(); + return result; + } + + @Override + public A[] toArray(IntFunction generator) { + A[] result = delegate.toArray( generator ); + close(); + return result; + } + + @Override + public R reduce(R identity, BinaryOperator accumulator) { + R result = delegate.reduce( identity, accumulator ); + close(); + return result; + } + + @Override + public Optional reduce(BinaryOperator accumulator) { + Optional result = delegate.reduce( accumulator ); + close(); + return result; + } + + @Override + public U reduce( + U identity, BiFunction accumulator, BinaryOperator combiner) { + U result = delegate.reduce( identity, accumulator, combiner ); + close(); + return result; + } + + @Override + public R1 collect( + Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { + R1 result = delegate.collect( supplier, accumulator, combiner ); + close(); + return result; + } + + @Override + public R1 collect(Collector collector) { + R1 result = delegate.collect( collector ); + close(); + return result; + } + + @Override + public Optional min(Comparator comparator) { + Optional result = delegate.min( comparator ); + close(); + return result; + } + + @Override + public Optional max(Comparator comparator) { + Optional result = delegate.max( comparator ); + close(); + return result; + } + + @Override + public long count() { + long result = delegate.count(); + close(); + return result; + } + + @Override + public boolean anyMatch(Predicate predicate) { + boolean result = delegate.anyMatch( predicate ); + close(); + return result; + } + + @Override + public boolean allMatch(Predicate predicate) { + boolean result = delegate.allMatch( predicate ); + close(); + return result; + } + + @Override + public boolean noneMatch(Predicate predicate) { + boolean result = delegate.noneMatch( predicate ); + close(); + return result; + } + + @Override + public Optional findFirst() { + Optional result = delegate.findFirst(); + close(); + return result; + } + + @Override + public Optional findAny() { + Optional result = delegate.findAny(); + close(); + return result; + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public Spliterator spliterator() { + return delegate.spliterator(); + } + + @Override + public boolean isParallel() { + return delegate.isParallel(); + } + + @Override + public Stream sequential() { + return new StreamDecorator<>( delegate.sequential(), closeHandler ); + } + + @Override + public Stream parallel() { + return new StreamDecorator<>( delegate.parallel(), closeHandler ); + } + + @Override + public Stream unordered() { + return new StreamDecorator<>( delegate.unordered(), closeHandler ); + } + + @Override + public Stream onClose(Runnable closeHandler) { + this.closeHandler = closeHandler; + return this; + } + + @Override + public void close() { + delegate.close(); + } + + //Methods added to JDK 9 + + public Stream takeWhile(Predicate predicate) { + try { + @SuppressWarnings("unchecked") + Stream result = (Stream) + ReflectHelper.getMethod( Stream.class, "takeWhile", Predicate.class ) + .invoke( delegate, predicate ); + return new StreamDecorator<>( result, closeHandler ); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new HibernateException( e ); + } + } + + public Stream dropWhile(Predicate predicate) { + try { + @SuppressWarnings("unchecked") + Stream result = (Stream) + ReflectHelper.getMethod( Stream.class, "dropWhile", Predicate.class ) + .invoke( delegate, predicate ); + return new StreamDecorator<>( result, closeHandler ); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new HibernateException( e ); + } + } +} diff --git a/hibernate-core/src/test/java/org/hibernate/test/stream/basic/BasicStreamTest.java b/hibernate-core/src/test/java/org/hibernate/test/stream/basic/BasicStreamTest.java index 94df2adaf0..471a193899 100644 --- a/hibernate-core/src/test/java/org/hibernate/test/stream/basic/BasicStreamTest.java +++ b/hibernate-core/src/test/java/org/hibernate/test/stream/basic/BasicStreamTest.java @@ -49,7 +49,7 @@ public class BasicStreamTest extends BaseNonConfigCoreFunctionalTestCase { final Stream stream = session.createQuery( "from MyEntity", MyEntity.class ).stream(); assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( true ) ); stream.forEach( System.out::println ); - assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( true ) ); + assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( false ) ); stream.close(); assertThat( ( (SessionImplementor) session ).getJdbcCoordinator().getLogicalConnection().getResourceRegistry().hasRegisteredResources(), is( false ) ); diff --git a/hibernate-core/src/test/java/org/hibernate/test/stream/basic/JpaStreamTest.java b/hibernate-core/src/test/java/org/hibernate/test/stream/basic/JpaStreamTest.java index 0c23546988..14fefac446 100644 --- a/hibernate-core/src/test/java/org/hibernate/test/stream/basic/JpaStreamTest.java +++ b/hibernate-core/src/test/java/org/hibernate/test/stream/basic/JpaStreamTest.java @@ -6,27 +6,34 @@ */ package org.hibernate.test.stream.basic; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.Table; -import javax.persistence.Tuple; -import javax.persistence.criteria.CriteriaBuilder; -import javax.persistence.criteria.CriteriaQuery; -import javax.persistence.criteria.Root; import org.hibernate.Session; -import org.hibernate.boot.MetadataSources; -import org.hibernate.engine.spi.SessionImplementor; +import org.hibernate.dialect.H2Dialect; +import org.hibernate.engine.jdbc.spi.JdbcCoordinator; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.internal.util.ReflectHelper; +import org.hibernate.resource.jdbc.ResourceRegistry; +import org.hibernate.testing.RequiresDialect; import org.hibernate.testing.TestForIssue; import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase; import org.junit.Test; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; import static org.hibernate.testing.junit4.ExtraAssertions.assertTyping; import static org.hibernate.testing.transaction.TransactionUtil.doInHibernate; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @author Steve Ebersole @@ -70,12 +77,180 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase { } ); } + @Test + @TestForIssue( jiraKey = "HHH-13872") + @RequiresDialect(H2Dialect.class) + public void testStreamCloseOnTerminalOperation() { + doInHibernate( this::sessionFactory, session -> { + session.createQuery( "delete from MyEntity" ).executeUpdate(); + + for ( int i = 1; i <= 10; i++ ) { + MyEntity e = new MyEntity(); + e.id = i; + e.name = "Test"; + session.persist( e ); + } + } ); + + doInHibernate( this::sessionFactory, session -> { + Stream stream = session + .createQuery( "SELECT me FROM MyEntity me" ) + .getResultStream(); + + ResourceRegistry resourceRegistry = resourceRegistry(session); + assertTrue( resourceRegistry.hasRegisteredResources() ); + + List entities = stream.collect( Collectors.toList() ) ; + assertEquals(10, entities.size()); + + assertFalse( resourceRegistry.hasRegisteredResources() ); + } ); + + doInHibernate( this::sessionFactory, session -> { + Stream stream = session + .createQuery( "SELECT me FROM MyEntity me" ) + .getResultStream(); + + ResourceRegistry resourceRegistry = resourceRegistry(session); + assertTrue( resourceRegistry.hasRegisteredResources() ); + + int sum = stream.mapToInt( MyEntity::getId ).sum(); + assertEquals(55, sum); + + assertFalse( resourceRegistry.hasRegisteredResources() ); + } ); + + doInHibernate( this::sessionFactory, session -> { + Stream stream = session + .createQuery( "SELECT me FROM MyEntity me" ) + .getResultStream(); + + ResourceRegistry resourceRegistry = resourceRegistry(session); + assertTrue( resourceRegistry.hasRegisteredResources() ); + + long result = stream.mapToLong( entity -> entity.id * 10 ).min().getAsLong(); + assertEquals(10, result); + + assertFalse( resourceRegistry.hasRegisteredResources() ); + } ); + + doInHibernate( this::sessionFactory, session -> { + Stream stream = session + .createQuery( "SELECT me FROM MyEntity me" ) + .getResultStream(); + + ResourceRegistry resourceRegistry = resourceRegistry(session); + assertTrue( resourceRegistry.hasRegisteredResources() ); + + double result = stream.mapToDouble( entity -> entity.id * 0.1D ).max().getAsDouble(); + assertEquals(1, result, 0.1); + + assertFalse( resourceRegistry.hasRegisteredResources() ); + } ); + + //Test call close explicitly + doInHibernate( this::sessionFactory, session -> { + + try (Stream stream = session + .createQuery( "SELECT me.id FROM MyEntity me" ) + .getResultStream()) { + + ResourceRegistry resourceRegistry = resourceRegistry( session ); + assertTrue( resourceRegistry.hasRegisteredResources() ); + + Object[] result = stream.sorted().skip( 5 ).limit( 5 ).toArray(); + assertEquals( 5, result.length ); + assertEquals( 6, result[0] ); + assertEquals( 10, result[4] ); + + assertFalse( resourceRegistry.hasRegisteredResources() ); + } + } ); + + //Test Java 9 Stream methods + doInHibernate( this::sessionFactory, session -> { + Method takeWhileMethod = ReflectHelper.getMethod( Stream.class, "takeWhile", Predicate.class ); + + if ( takeWhileMethod != null ) { + try (Stream stream = session + .createQuery( "SELECT me.id FROM MyEntity me" ) + .getResultStream()) { + + ResourceRegistry resourceRegistry = resourceRegistry( session ); + assertTrue( resourceRegistry.hasRegisteredResources() ); + + Predicate predicate = id -> id <= 5; + + Stream takeWhileStream = (Stream) takeWhileMethod.invoke( stream, predicate ); + + List result = takeWhileStream.collect( Collectors.toList() ); + + assertEquals( 5, result.size() ); + assertTrue( result.contains( 1 ) ); + assertTrue( result.contains( 3 ) ); + assertTrue( result.contains( 5 ) ); + + assertFalse( resourceRegistry.hasRegisteredResources() ); + } + catch (IllegalAccessException | InvocationTargetException e) { + fail( "Could not execute takeWhile because of " + e.getMessage() ); + } + } + } ); + + doInHibernate( this::sessionFactory, session -> { + Method dropWhileMethod = ReflectHelper.getMethod( Stream.class, "dropWhile", Predicate.class ); + + if ( dropWhileMethod != null ) { + try (Stream stream = session + .createQuery( "SELECT me.id FROM MyEntity me" ) + .getResultStream()) { + + ResourceRegistry resourceRegistry = resourceRegistry( session ); + assertTrue( resourceRegistry.hasRegisteredResources() ); + + Predicate predicate = id -> id <= 5; + + Stream dropWhileStream = (Stream) dropWhileMethod.invoke( stream, predicate ); + + List result = dropWhileStream.collect( Collectors.toList() ); + + assertEquals( 5, result.size() ); + assertTrue( result.contains( 6 ) ); + assertTrue( result.contains( 8 ) ); + assertTrue( result.contains( 10 ) ); + + assertFalse( resourceRegistry.hasRegisteredResources() ); + } + catch (IllegalAccessException | InvocationTargetException e) { + fail( "Could not execute takeWhile because of " + e.getMessage() ); + } + } + } ); + } + + private ResourceRegistry resourceRegistry(Session session) { + SharedSessionContractImplementor sharedSessionContractImplementor = (SharedSessionContractImplementor) session; + JdbcCoordinator jdbcCoordinator = sharedSessionContractImplementor.getJdbcCoordinator(); + return jdbcCoordinator.getLogicalConnection().getResourceRegistry(); + } + @Entity(name = "MyEntity") @Table(name="MyEntity") public static class MyEntity { + @Id public Integer id; + public String name; + + public Integer getId() { + return id; + } + + public String getName() { + return name; + } } }