diff --git a/hibernate-core/src/main/java/org/hibernate/event/service/internal/EventListenerGroupImpl.java b/hibernate-core/src/main/java/org/hibernate/event/service/internal/EventListenerGroupImpl.java index 8bdcf3710f..fb6255eb84 100644 --- a/hibernate-core/src/main/java/org/hibernate/event/service/internal/EventListenerGroupImpl.java +++ b/hibernate-core/src/main/java/org/hibernate/event/service/internal/EventListenerGroupImpl.java @@ -11,8 +11,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import org.hibernate.event.service.spi.DuplicationStrategy; @@ -33,8 +37,10 @@ import org.jboss.logging.Logger; * @author Sanne Grinovero */ class EventListenerGroupImpl implements EventListenerGroup { + private static final Logger log = Logger.getLogger( EventListenerGroupImpl.class ); private static final Set DEFAULT_DUPLICATION_STRATEGIES = Collections.unmodifiableSet( makeDefaultDuplicationStrategy() ); + private static final CompletableFuture COMPLETED = CompletableFuture.completedFuture( null ); private final EventType eventType; private final CallbackRegistry callbackRegistry; @@ -114,6 +120,54 @@ class EventListenerGroupImpl implements EventListenerGroup { } } + @Override + public CompletionStage fireEventOnEachListener( + final U event, + final Function>> fun) { + CompletionStage ret = COMPLETED; + final T[] ls = listeners; + if ( ls != null && ls.length != 0 ) { + for ( T listener : ls ) { + //to preserve atomicity of the Session methods + //call apply() from within the arg of thenCompose() + ret = ret.thenCompose( v -> fun.apply( (RL) listener ).apply( event ) ); + } + } + return ret; + } + + @Override + public CompletionStage fireEventOnEachListener( + U event, X param, Function>> fun) { + CompletionStage ret = COMPLETED; + final T[] ls = listeners; + if ( ls != null && ls.length != 0 ) { + for ( T listener : ls ) { + //to preserve atomicity of the Session methods + //call apply() from within the arg of thenCompose() + ret = ret.thenCompose( v -> fun.apply( (RL) listener ).apply( event, param ) ); + } + } + return ret; + } + + @Override + public CompletionStage fireLazyEventOnEachListener( + final Supplier eventSupplier, + final Function>> fun) { + CompletionStage ret = COMPLETED; + final T[] ls = listeners; + if ( ls != null && ls.length != 0 ) { + final U event = eventSupplier.get(); + for ( T listener : ls ) { + //to preserve atomicity of the Session methods + //call apply() from within the arg of thenCompose() + ret = ret.thenCompose( v -> fun.apply( (RL) listener ).apply( event ) ); + } + } + return ret; + } + @Override public void addDuplicationStrategy(DuplicationStrategy strategy) { if ( duplicationStrategies == DEFAULT_DUPLICATION_STRATEGIES ) { diff --git a/hibernate-core/src/main/java/org/hibernate/event/service/spi/EventListenerGroup.java b/hibernate-core/src/main/java/org/hibernate/event/service/spi/EventListenerGroup.java index df62868c48..84872efe28 100644 --- a/hibernate-core/src/main/java/org/hibernate/event/service/spi/EventListenerGroup.java +++ b/hibernate-core/src/main/java/org/hibernate/event/service/spi/EventListenerGroup.java @@ -8,7 +8,10 @@ package org.hibernate.event.service.spi; import java.io.Serializable; import java.util.Map; +import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; import org.hibernate.Incubating; @@ -113,4 +116,53 @@ public interface EventListenerGroup extends Serializable { @Incubating void fireEventOnEachListener(final U event, X param, final EventActionWithParameter actionOnEvent); + /** + * Similar to {@link #fireEventOnEachListener(Object, Function)}, but Reactive friendly: it chains + * processing of the same event on each Reactive Listener, and returns a {@link CompletionStage} of type R. + * The various generic types allow using this for each concrete event type and flexible return types. + *

Used by Hibernate Reactive

+ * @param event The event being fired + * @param fun The function combining each event listener with the event + * @param the return type of the returned CompletionStage + * @param the type of the event being fired on each listener + * @param the type of ReactiveListener: each listener of type T will be casted to it. + * @return the composite completion stage of invoking fun(event) on each listener. + */ + @Incubating + CompletionStage fireEventOnEachListener(final U event, final Function>> fun); + + /** + * Similar to {@link #fireEventOnEachListener(Object, Object, Function)}, but Reactive friendly: it chains + * processing of the same event on each Reactive Listener, and returns a {@link CompletionStage} of type R. + * The various generic types allow using this for each concrete event type and flexible return types. + *

Used by Hibernate Reactive

+ * @param event The event being fired + * @param fun The function combining each event listener with the event + * @param the return type of the returned CompletionStage + * @param the type of the event being fired on each listener + * @param the type of ReactiveListener: each listener of type T will be casted to it. + * @param an additional parameter to be passed to the function fun + * @return the composite completion stage of invoking fun(event) on each listener. + */ + @Incubating + public CompletionStage fireEventOnEachListener(U event, X param, Function>> fun); + + /** + * Similar to {@link #fireLazyEventOnEachListener(Supplier, BiConsumer)}, but Reactive friendly: it chains + * processing of the same event on each Reactive Listener, and returns a {@link CompletionStage} of type R. + * The various generic types allow using this for each concrete event type and flexible return types. + *

This variant expects a Supplier of the event, rather than the event directly; this is useful for the + * event types which are commonly configured with no listeners at all, so to allow skipping creating the + * event; use only for event types which are known to be expensive while the listeners are commonly empty.

+ *

Used by Hibernate Reactive

+ * @param eventSupplier A supplier able to produce the actual event + * @param fun The function combining each event listener with the event + * @param the return type of the returned CompletionStage + * @param the type of the event being fired on each listener + * @param the type of ReactiveListener: each listener of type T will be casted to it. + * @return the composite completion stage of invoking fun(event) on each listener. + */ + @Incubating + CompletionStage fireLazyEventOnEachListener(final Supplier eventSupplier, final Function>> fun); + }