HHH-14621 Introduce new methods on EventListenerGroup which allow Hibernate Reactive to fire events more efficiently

This commit is contained in:
Sanne Grinovero 2021-05-18 22:05:17 +01:00 committed by Sanne Grinovero
parent 8b9b5e7e68
commit eb6c68cdc6
2 changed files with 106 additions and 0 deletions

View File

@ -11,8 +11,12 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.hibernate.event.service.spi.DuplicationStrategy; import org.hibernate.event.service.spi.DuplicationStrategy;
@ -33,8 +37,10 @@ import org.jboss.logging.Logger;
* @author Sanne Grinovero * @author Sanne Grinovero
*/ */
class EventListenerGroupImpl<T> implements EventListenerGroup<T> { class EventListenerGroupImpl<T> implements EventListenerGroup<T> {
private static final Logger log = Logger.getLogger( EventListenerGroupImpl.class ); private static final Logger log = Logger.getLogger( EventListenerGroupImpl.class );
private static final Set<DuplicationStrategy> DEFAULT_DUPLICATION_STRATEGIES = Collections.unmodifiableSet( makeDefaultDuplicationStrategy() ); private static final Set<DuplicationStrategy> DEFAULT_DUPLICATION_STRATEGIES = Collections.unmodifiableSet( makeDefaultDuplicationStrategy() );
private static final CompletableFuture COMPLETED = CompletableFuture.completedFuture( null );
private final EventType<T> eventType; private final EventType<T> eventType;
private final CallbackRegistry callbackRegistry; private final CallbackRegistry callbackRegistry;
@ -114,6 +120,54 @@ class EventListenerGroupImpl<T> implements EventListenerGroup<T> {
} }
} }
@Override
public <R, U, RL> CompletionStage<R> fireEventOnEachListener(
final U event,
final Function<RL, Function<U, CompletionStage<R>>> fun) {
CompletionStage<R> ret = COMPLETED;
final T[] ls = listeners;
if ( ls != null && ls.length != 0 ) {
for ( T listener : ls ) {
//to preserve atomicity of the Session methods
//call apply() from within the arg of thenCompose()
ret = ret.thenCompose( v -> fun.apply( (RL) listener ).apply( event ) );
}
}
return ret;
}
@Override
public <R, U, RL, X> CompletionStage<R> fireEventOnEachListener(
U event, X param, Function<RL, BiFunction<U, X, CompletionStage<R>>> fun) {
CompletionStage<R> ret = COMPLETED;
final T[] ls = listeners;
if ( ls != null && ls.length != 0 ) {
for ( T listener : ls ) {
//to preserve atomicity of the Session methods
//call apply() from within the arg of thenCompose()
ret = ret.thenCompose( v -> fun.apply( (RL) listener ).apply( event, param ) );
}
}
return ret;
}
@Override
public <R, U, RL> CompletionStage<R> fireLazyEventOnEachListener(
final Supplier<U> eventSupplier,
final Function<RL, Function<U, CompletionStage<R>>> fun) {
CompletionStage<R> ret = COMPLETED;
final T[] ls = listeners;
if ( ls != null && ls.length != 0 ) {
final U event = eventSupplier.get();
for ( T listener : ls ) {
//to preserve atomicity of the Session methods
//call apply() from within the arg of thenCompose()
ret = ret.thenCompose( v -> fun.apply( (RL) listener ).apply( event ) );
}
}
return ret;
}
@Override @Override
public void addDuplicationStrategy(DuplicationStrategy strategy) { public void addDuplicationStrategy(DuplicationStrategy strategy) {
if ( duplicationStrategies == DEFAULT_DUPLICATION_STRATEGIES ) { if ( duplicationStrategies == DEFAULT_DUPLICATION_STRATEGIES ) {

View File

@ -8,7 +8,10 @@ package org.hibernate.event.service.spi;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.hibernate.Incubating; import org.hibernate.Incubating;
@ -113,4 +116,53 @@ public interface EventListenerGroup<T> extends Serializable {
@Incubating @Incubating
<U,X> void fireEventOnEachListener(final U event, X param, final EventActionWithParameter<T,U,X> actionOnEvent); <U,X> void fireEventOnEachListener(final U event, X param, final EventActionWithParameter<T,U,X> actionOnEvent);
/**
* Similar to {@link #fireEventOnEachListener(Object, Function)}, but Reactive friendly: it chains
* processing of the same event on each Reactive Listener, and returns a {@link CompletionStage} of type R.
* The various generic types allow using this for each concrete event type and flexible return types.
* <p>Used by Hibernate Reactive</p>
* @param event The event being fired
* @param fun The function combining each event listener with the event
* @param <R> the return type of the returned CompletionStage
* @param <U> the type of the event being fired on each listener
* @param <RL> the type of ReactiveListener: each listener of type T will be casted to it.
* @return the composite completion stage of invoking fun(event) on each listener.
*/
@Incubating
<R, U, RL> CompletionStage<R> fireEventOnEachListener(final U event, final Function<RL, Function<U, CompletionStage<R>>> fun);
/**
* Similar to {@link #fireEventOnEachListener(Object, Object, Function)}, but Reactive friendly: it chains
* processing of the same event on each Reactive Listener, and returns a {@link CompletionStage} of type R.
* The various generic types allow using this for each concrete event type and flexible return types.
* <p>Used by Hibernate Reactive</p>
* @param event The event being fired
* @param fun The function combining each event listener with the event
* @param <R> the return type of the returned CompletionStage
* @param <U> the type of the event being fired on each listener
* @param <RL> the type of ReactiveListener: each listener of type T will be casted to it.
* @param <X> an additional parameter to be passed to the function fun
* @return the composite completion stage of invoking fun(event) on each listener.
*/
@Incubating
public <R, U, RL, X> CompletionStage<R> fireEventOnEachListener(U event, X param, Function<RL, BiFunction<U, X, CompletionStage<R>>> fun);
/**
* Similar to {@link #fireLazyEventOnEachListener(Supplier, BiConsumer)}, but Reactive friendly: it chains
* processing of the same event on each Reactive Listener, and returns a {@link CompletionStage} of type R.
* The various generic types allow using this for each concrete event type and flexible return types.
* <p>This variant expects a Supplier of the event, rather than the event directly; this is useful for the
* event types which are commonly configured with no listeners at all, so to allow skipping creating the
* event; use only for event types which are known to be expensive while the listeners are commonly empty.</p>
* <p>Used by Hibernate Reactive</p>
* @param eventSupplier A supplier able to produce the actual event
* @param fun The function combining each event listener with the event
* @param <R> the return type of the returned CompletionStage
* @param <U> the type of the event being fired on each listener
* @param <RL> the type of ReactiveListener: each listener of type T will be casted to it.
* @return the composite completion stage of invoking fun(event) on each listener.
*/
@Incubating
<R, U, RL> CompletionStage<R> fireLazyEventOnEachListener(final Supplier<U> eventSupplier, final Function<RL, Function<U, CompletionStage<R>>> fun);
} }