HHH-14449 : ResultStream closing is not properly handled

This commit is contained in:
Gautham Kumar 2021-03-17 18:25:56 +05:30 committed by Steve Ebersole
parent 5324db9f25
commit b4bf58188c
7 changed files with 338 additions and 49 deletions

View File

@ -1584,12 +1584,10 @@ public abstract class AbstractProducedQuery<R> implements QueryImplementor<R> {
final ScrollableResultsIterator<R> iterator = new ScrollableResultsIterator<>( scrollableResults );
final Spliterator<R> spliterator = Spliterators.spliteratorUnknownSize( iterator, Spliterator.NONNULL );
final Stream<R> stream = new StreamDecorator(
return new StreamDecorator<>(
StreamSupport.stream( spliterator, false ),
scrollableResults::close
iterator::close
);
return stream;
}
@Override

View File

@ -41,15 +41,13 @@ import org.hibernate.Incubating;
public class DoubleStreamDecorator implements DoubleStream {
private final DoubleStream delegate;
private Runnable closeHandler;
private final Runnable closeHandler;
public DoubleStreamDecorator(
DoubleStream delegate,
Runnable closeHandler) {
this.delegate = delegate;
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
this.delegate = delegate.onClose( closeHandler );
}
@Override
@ -292,7 +290,7 @@ public class DoubleStreamDecorator implements DoubleStream {
@Override
public DoubleStream onClose(Runnable closeHandler) {
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
return this;
}

View File

@ -42,15 +42,13 @@ import org.hibernate.Incubating;
public class IntStreamDecorator implements IntStream {
private final IntStream delegate;
private Runnable closeHandler;
private final Runnable closeHandler;
public IntStreamDecorator(
IntStream delegate,
Runnable closeHandler) {
this.delegate = delegate;
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
this.delegate = delegate.onClose( closeHandler );
}
@Override
@ -307,7 +305,7 @@ public class IntStreamDecorator implements IntStream {
@Override
public IntStream onClose(Runnable closeHandler) {
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
return this;
}

View File

@ -42,15 +42,13 @@ import org.hibernate.Incubating;
public class LongStreamDecorator implements LongStream {
private final LongStream delegate;
private Runnable closeHandler;
private final Runnable closeHandler;
public LongStreamDecorator(
LongStream delegate,
Runnable closeHandler) {
this.delegate = delegate;
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
this.delegate = delegate.onClose( closeHandler );
}
@Override
@ -226,21 +224,21 @@ public class LongStreamDecorator implements LongStream {
@Override
public boolean anyMatch(LongPredicate predicate) {
boolean result = delegate.anyMatch(predicate);
boolean result = delegate.anyMatch( predicate );
close();
return result;
}
@Override
public boolean allMatch(LongPredicate predicate) {
boolean result = delegate.allMatch(predicate);
boolean result = delegate.allMatch( predicate );
close();
return result;
}
@Override
public boolean noneMatch(LongPredicate predicate) {
boolean result = delegate.noneMatch(predicate);
boolean result = delegate.noneMatch( predicate );
close();
return result;
}
@ -300,7 +298,7 @@ public class LongStreamDecorator implements LongStream {
@Override
public LongStream onClose(Runnable closeHandler) {
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
return this;
}

View File

@ -46,15 +46,13 @@ import org.hibernate.internal.util.ReflectHelper;
public class StreamDecorator<R> implements Stream<R> {
private final Stream<R> delegate;
private Runnable closeHandler;
private final Runnable closeHandler;
public StreamDecorator(
Stream<R> delegate,
Runnable closeHandler) {
this.delegate = delegate;
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
this.delegate = delegate.onClose( closeHandler );
}
@Override
@ -301,7 +299,7 @@ public class StreamDecorator<R> implements Stream<R> {
@Override
public Stream<R> onClose(Runnable closeHandler) {
this.closeHandler = closeHandler;
this.delegate.onClose( closeHandler );
return this;
}

View File

@ -6,6 +6,7 @@
*/
package org.hibernate.test.stream.basic;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.persistence.Entity;
import javax.persistence.Id;
@ -25,6 +26,7 @@ import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hibernate.testing.junit4.ExtraAssertions.assertTyping;
import static org.hibernate.testing.transaction.TransactionUtil.doInHibernate;
@ -123,6 +125,53 @@ public class BasicStreamTest extends BaseNonConfigCoreFunctionalTestCase {
} );
}
@Test
public void basicStreamTestWithExplicitOnClose() {
Session session = openSession();
session.getTransaction().begin();
AtomicInteger onCloseCount = new AtomicInteger();
// mainly we want to make sure that closing the Stream releases the ScrollableResults too
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator()
.getLogicalConnection()
.getResourceRegistry()
.hasRegisteredResources(), is( false ) );
assertThat( onCloseCount.get(), equalTo( 0 ) );
final Stream<MyEntity> stream = session.createQuery( "from MyEntity", MyEntity.class ).stream().onClose(
onCloseCount::incrementAndGet );
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator()
.getLogicalConnection()
.getResourceRegistry()
.hasRegisteredResources(), is( true ) );
assertThat( onCloseCount.get(), equalTo( 0 ) );
stream.forEach( System.out::println );
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator()
.getLogicalConnection()
.getResourceRegistry()
.hasRegisteredResources(), is( false ) );
assertThat( onCloseCount.get(), equalTo( 1 ) );
stream.close();
assertThat( ( (SessionImplementor) session ).getJdbcCoordinator()
.getLogicalConnection()
.getResourceRegistry()
.hasRegisteredResources(), is( false ) );
assertThat( onCloseCount.get(), equalTo( 1 ) );
session.getTransaction().commit();
session.close();
}
@Entity(name = "MyEntity")
@Table(name="MyEntity")
public static class MyEntity {

View File

@ -8,7 +8,12 @@ package org.hibernate.test.stream.basic;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -28,6 +33,7 @@ import org.hibernate.testing.TestForIssue;
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hibernate.testing.junit4.ExtraAssertions.assertTyping;
import static org.hibernate.testing.transaction.TransactionUtil.doInHibernate;
import static org.junit.Assert.assertEquals;
@ -78,8 +84,8 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
}
@Test
@TestForIssue( jiraKey = "HHH-13872")
@RequiresDialect(H2Dialect.class)
@TestForIssue( jiraKey = {"HHH-13872", "HHH-14449"})
public void testStreamCloseOnTerminalOperation() {
doInHibernate( this::sessionFactory, session -> {
session.createQuery( "delete from MyEntity" ).executeUpdate();
@ -92,10 +98,116 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
}
} );
Runnable noOp = () -> {
// do nothing
};
// run without onClose callbacks
this.runTerminalOperationTests(noOp, Collections.emptyList(), noOp, false, false);
AtomicInteger onClose1Count = new AtomicInteger();
AtomicInteger onClose2Count = new AtomicInteger();
AtomicInteger onClose3Count = new AtomicInteger();
// run with chained onClose callbacks
this.runTerminalOperationTests(
() -> {
// prepare
onClose1Count.set( 0 );
onClose2Count.set( 0 );
onClose3Count.set( 0 );
},
Arrays.asList(
onClose1Count::incrementAndGet, // onClose1 logic
onClose2Count::incrementAndGet, // onClose2 logic
onClose3Count::incrementAndGet // onClose3 logic
),
() -> {
// assertion
assertThat( onClose1Count ).hasValue( 1 );
assertThat( onClose2Count ).hasValue( 1 );
assertThat( onClose3Count ).hasValue( 1 );
},
false, // no flatMap before onClose
false // no flatMap after onClose
);
this.runTerminalOperationTests(
() -> {
// prepare
onClose1Count.set( 0 );
onClose2Count.set( 0 );
onClose3Count.set( 0 );
},
Arrays.asList(
onClose1Count::incrementAndGet, // onClose1 logic
onClose2Count::incrementAndGet, // onClose2 logic
onClose3Count::incrementAndGet // onClose3 logic
),
() -> {
// assertion
assertThat( onClose1Count ).hasValue( 1 );
assertThat( onClose2Count ).hasValue( 1 );
assertThat( onClose3Count ).hasValue( 1 );
},
true, // run a flatMap operation before onClose
false // no flatMap after onClose
);
this.runTerminalOperationTests(
() -> {
// prepare
onClose1Count.set( 0 );
onClose2Count.set( 0 );
onClose3Count.set( 0 );
},
Arrays.asList(
onClose1Count::incrementAndGet, // onClose1 logic
onClose2Count::incrementAndGet, // onClose2 logic
onClose3Count::incrementAndGet // onClose3 logic
),
() -> {
// assertion
assertThat( onClose1Count ).hasValue( 1 );
assertThat( onClose2Count ).hasValue( 1 );
assertThat( onClose3Count ).hasValue( 1 );
},
false, // no flatMap before onClose
true // run a flatMap operation after onClose
);
this.runTerminalOperationTests(
() -> {
// prepare
onClose1Count.set( 0 );
onClose2Count.set( 0 );
onClose3Count.set( 0 );
},
Arrays.asList(
onClose1Count::incrementAndGet, // onClose1 logic
onClose2Count::incrementAndGet, // onClose2 logic
onClose3Count::incrementAndGet // onClose3 logic
),
() -> {
// assertion
assertThat( onClose1Count ).hasValue( 1 );
assertThat( onClose2Count ).hasValue( 1 );
assertThat( onClose3Count ).hasValue( 1 );
},
true, // run a flatMap operation before onClose
true // run a flatMap operation after onClose
);
}
private void runTerminalOperationTests(
Runnable prepare, List<Runnable> onCloseCallbacks, Runnable onCloseAssertion,
boolean flatMapBefore, boolean flatMapAfter) {
// collect as list
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = session
.createQuery( "SELECT me FROM MyEntity me" )
.getResultStream();
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
@ -104,12 +216,90 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
assertEquals(10, entities.size());
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
// forEach (TestCase based on attachment EntityManagerIllustrationTest.java in HHH-14449)
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = session
.createQuery( "SELECT me FROM MyEntity me" )
.getResultStream();
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
AtomicInteger count = new AtomicInteger();
stream.forEach(myEntity -> count.incrementAndGet());
assertEquals(10, count.get());
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
// filter (always true) + forEach (TestCase based on attachment EntityManagerIllustrationTest.java in HHH-14449)
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
AtomicInteger count = new AtomicInteger();
stream.filter(Objects::nonNull).forEach(myEntity -> count.incrementAndGet());
assertEquals(10, count.get());
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
// filter (partially true) + forEach (TestCase based on attachment EntityManagerIllustrationTest.java in HHH-14449)
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
AtomicInteger count = new AtomicInteger();
stream.filter(entity -> entity.getId() % 2 == 0).forEach(myEntity -> count.incrementAndGet());
assertEquals(5, count.get());
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
// multiple chained operations (TestCase based on attachment EntityManagerIllustrationTest.java in HHH-14449)
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
AtomicInteger count = new AtomicInteger();
stream
.filter(Objects::nonNull)
.map(Optional::of)
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(myEntity -> count.incrementAndGet());
assertEquals(10, count.get());
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
// mapToInt
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
@ -118,12 +308,13 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
assertEquals(55, sum);
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
// mapToLong
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = session
.createQuery( "SELECT me FROM MyEntity me" )
.getResultStream();
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
@ -132,12 +323,13 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
assertEquals(10, result);
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
// mapToDouble
doInHibernate( this::sessionFactory, session -> {
Stream<MyEntity> stream = session
.createQuery( "SELECT me FROM MyEntity me" )
.getResultStream();
Stream<MyEntity> stream = getMyEntityStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter);
ResourceRegistry resourceRegistry = resourceRegistry(session);
assertTrue( resourceRegistry.hasRegisteredResources() );
@ -146,14 +338,14 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
assertEquals(1, result, 0.1);
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
} );
//Test call close explicitly
doInHibernate( this::sessionFactory, session -> {
try (Stream<Long> stream = session
.createQuery( "SELECT me.id FROM MyEntity me" )
.getResultStream()) {
try (Stream<Long> stream = getLongStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter)) {
ResourceRegistry resourceRegistry = resourceRegistry( session );
assertTrue( resourceRegistry.hasRegisteredResources() );
@ -164,6 +356,8 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
assertEquals( 10, result[4] );
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
}
} );
@ -172,9 +366,7 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
Method takeWhileMethod = ReflectHelper.getMethod( Stream.class, "takeWhile", Predicate.class );
if ( takeWhileMethod != null ) {
try (Stream<Long> stream = session
.createQuery( "SELECT me.id FROM MyEntity me" )
.getResultStream()) {
try (Stream<Long> stream = getLongStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter)) {
ResourceRegistry resourceRegistry = resourceRegistry( session );
assertTrue( resourceRegistry.hasRegisteredResources() );
@ -191,6 +383,8 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
assertTrue( result.contains( 5 ) );
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
}
catch (IllegalAccessException | InvocationTargetException e) {
fail( "Could not execute takeWhile because of " + e.getMessage() );
@ -202,9 +396,7 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
Method dropWhileMethod = ReflectHelper.getMethod( Stream.class, "dropWhile", Predicate.class );
if ( dropWhileMethod != null ) {
try (Stream<Long> stream = session
.createQuery( "SELECT me.id FROM MyEntity me" )
.getResultStream()) {
try (Stream<Long> stream = getLongStream(prepare, session, onCloseCallbacks, flatMapBefore, flatMapAfter)) {
ResourceRegistry resourceRegistry = resourceRegistry( session );
assertTrue( resourceRegistry.hasRegisteredResources() );
@ -221,6 +413,8 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
assertTrue( result.contains( 10 ) );
assertFalse( resourceRegistry.hasRegisteredResources() );
onCloseAssertion.run();
}
catch (IllegalAccessException | InvocationTargetException e) {
fail( "Could not execute takeWhile because of " + e.getMessage() );
@ -229,6 +423,62 @@ public class JpaStreamTest extends BaseNonConfigCoreFunctionalTestCase {
} );
}
private static Stream<MyEntity> getMyEntityStream(
Runnable prepare,
Session session,
List<Runnable> onCloseCallbacks,
boolean flatMapBefore,
boolean flatMapAfter) {
return getStream(
prepare,
session,
"SELECT me FROM MyEntity me",
onCloseCallbacks,
flatMapBefore,
flatMapAfter
);
}
private static Stream<Long> getLongStream(
Runnable prepare,
Session session,
List<Runnable> onCloseCallbacks,
boolean flatMapBefore,
boolean flatMapAfter) {
return getStream(
prepare,
session,
"SELECT me.id FROM MyEntity me",
onCloseCallbacks,
flatMapBefore,
flatMapAfter
);
}
@SuppressWarnings("unchecked")
private static <T> Stream<T> getStream(
Runnable prepare, Session session, String queryString,
List<Runnable> onCloseCallbacks, boolean flatMapBefore, boolean flatMapAfter) {
prepare.run();
Stream<T> stream = session.createQuery( queryString ).getResultStream();
if ( flatMapBefore ) {
stream = stream.flatMap( Stream::of );
}
for ( Runnable callback : onCloseCallbacks ) {
stream = stream.onClose( callback );
}
if ( flatMapAfter ) {
stream = stream.flatMap( Stream::of );
}
return stream;
}
private ResourceRegistry resourceRegistry(Session session) {
SharedSessionContractImplementor sharedSessionContractImplementor = (SharedSessionContractImplementor) session;
JdbcCoordinator jdbcCoordinator = sharedSessionContractImplementor.getJdbcCoordinator();