diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java new file mode 100644 index 0000000000..c25c3092c1 --- /dev/null +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java @@ -0,0 +1,89 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.cache.infinispan.access; + +import org.infinispan.commands.CommandsFactory; +import org.infinispan.factories.annotations.Inject; +import org.infinispan.factories.annotations.Start; +import org.infinispan.interceptors.base.BaseRpcInterceptor; +import org.infinispan.jmx.JmxStatisticsExposer; +import org.infinispan.jmx.annotations.DataType; +import org.infinispan.jmx.annotations.ManagedAttribute; +import org.infinispan.jmx.annotations.ManagedOperation; +import org.infinispan.jmx.annotations.MeasurementType; +import org.infinispan.jmx.annotations.Parameter; +import org.infinispan.remoting.inboundhandler.DeliverOrder; +import org.infinispan.remoting.rpc.ResponseMode; +import org.infinispan.remoting.rpc.RpcOptions; +import org.infinispan.remoting.transport.Address; +import org.infinispan.statetransfer.StateTransferManager; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor implements JmxStatisticsExposer { + private final AtomicLong invalidations = new AtomicLong(0); + protected CommandsFactory commandsFactory; + protected StateTransferManager stateTransferManager; + protected boolean statisticsEnabled; + protected RpcOptions syncRpcOptions; + protected RpcOptions asyncRpcOptions; + + @Inject + public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager) { + this.commandsFactory = commandsFactory; + this.stateTransferManager = stateTransferManager; + } + + @Start + private void start() { + this.setStatisticsEnabled(cacheConfiguration.jmxStatistics().enabled()); + syncRpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build(); + asyncRpcOptions = rpcManager.getDefaultRpcOptions(false); + } + + @ManagedOperation( + description = "Resets statistics gathered by this component", + displayName = "Reset statistics" + ) + public void resetStatistics() { + invalidations.set(0); + } + + @ManagedAttribute( + displayName = "Statistics enabled", + description = "Enables or disables the gathering of statistics by this component", + dataType = DataType.TRAIT, + writable = true + ) + public boolean getStatisticsEnabled() { + return this.statisticsEnabled; + } + + public void setStatisticsEnabled(@Parameter(name = "enabled", description = "Whether statistics should be enabled or disabled (true/false)") boolean enabled) { + this.statisticsEnabled = enabled; + } + + @ManagedAttribute( + description = "Number of invalidations", + displayName = "Number of invalidations", + measurementType = MeasurementType.TRENDSUP + ) + public long getInvalidations() { + return invalidations.get(); + } + + protected void incrementInvalidations() { + if (statisticsEnabled) { + invalidations.incrementAndGet(); + } + } + + protected List
getMembers() { + return stateTransferManager.getCacheTopology().getMembers(); + } +} diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java index 9126da0603..2ca51196f3 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java @@ -8,7 +8,6 @@ package org.hibernate.cache.infinispan.access; import org.hibernate.cache.infinispan.util.CacheCommandInitializer; import org.hibernate.cache.infinispan.util.InfinispanMessageLogger; -import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.FlagAffectedCommand; import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.InvalidateCommand; @@ -21,18 +20,8 @@ import org.infinispan.commons.util.InfinispanCollections; import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; import org.infinispan.factories.annotations.Inject; -import org.infinispan.factories.annotations.Start; import org.infinispan.interceptors.InvalidationInterceptor; -import org.infinispan.interceptors.base.BaseRpcInterceptor; -import org.infinispan.jmx.JmxStatisticsExposer; -import org.infinispan.jmx.annotations.DataType; import org.infinispan.jmx.annotations.MBean; -import org.infinispan.jmx.annotations.ManagedAttribute; -import org.infinispan.jmx.annotations.ManagedOperation; -import org.infinispan.jmx.annotations.MeasurementType; -import org.infinispan.jmx.annotations.Parameter; - -import java.util.concurrent.atomic.AtomicLong; /** * This interceptor should completely replace default InvalidationInterceptor. @@ -45,12 +34,9 @@ import java.util.concurrent.atomic.AtomicLong; * @author Galder ZamarreƱo */ @MBean(objectName = "Invalidation", description = "Component responsible for invalidating entries on remote caches when entries are written to locally.") -public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements JmxStatisticsExposer { - private final AtomicLong invalidations = new AtomicLong(0); +public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor { private final PutFromLoadValidator putFromLoadValidator; - private CommandsFactory commandsFactory; private CacheCommandInitializer commandInitializer; - private boolean statisticsEnabled; private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(InvalidationInterceptor.class); @@ -59,16 +45,10 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements } @Inject - public void injectDependencies(CommandsFactory commandsFactory, CacheCommandInitializer commandInitializer) { - this.commandsFactory = commandsFactory; + public void injectDependencies(CacheCommandInitializer commandInitializer) { this.commandInitializer = commandInitializer; } - @Start - private void start() { - this.setStatisticsEnabled(cacheConfiguration.jmxStatistics().enabled()); - } - @Override public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { if (!isPutForExternalRead(command)) { @@ -93,7 +73,7 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements if (!isLocalModeForced(command)) { // just broadcast the clear command - this is simplest! if (ctx.isOriginLocal()) { - rpcManager.invokeRemotely(null, command, rpcManager.getDefaultRpcOptions(defaultSynchronous)); + rpcManager.invokeRemotely(getMembers(), command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions); } } return retval; @@ -132,13 +112,7 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand); } - rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(isSynchronous(command))); - } - } - - private void incrementInvalidations() { - if (statisticsEnabled) { - invalidations.incrementAndGet(); + rpcManager.invokeRemotely(getMembers(), invalidateCommand, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions); } } @@ -150,35 +124,4 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements return false; } - @ManagedOperation( - description = "Resets statistics gathered by this component", - displayName = "Reset statistics" - ) - public void resetStatistics() { - invalidations.set(0); - } - - @ManagedAttribute( - displayName = "Statistics enabled", - description = "Enables or disables the gathering of statistics by this component", - dataType = DataType.TRAIT, - writable = true - ) - public boolean getStatisticsEnabled() { - return this.statisticsEnabled; - } - - public void setStatisticsEnabled(@Parameter(name = "enabled", description = "Whether statistics should be enabled or disabled (true/false)") boolean enabled) { - this.statisticsEnabled = enabled; - } - - @ManagedAttribute( - description = "Number of invalidations", - displayName = "Number of invalidations", - measurementType = MeasurementType.TRENDSUP - ) - public long getInvalidations() { - return invalidations.get(); - } - } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java index e1855a250a..1340971a80 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java @@ -13,9 +13,16 @@ import org.hibernate.cache.infinispan.util.EndInvalidationCommand; import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.context.InvocationContext; import org.infinispan.factories.annotations.Inject; +import org.infinispan.factories.annotations.Start; import org.infinispan.interceptors.base.BaseCustomInterceptor; import org.infinispan.remoting.inboundhandler.DeliverOrder; +import org.infinispan.remoting.rpc.ResponseMode; import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.rpc.RpcOptions; +import org.infinispan.remoting.transport.Address; +import org.infinispan.statetransfer.StateTransferManager; + +import java.util.List; /** * Non-transactional counterpart of {@link TxPutFromLoadInterceptor}. @@ -30,6 +37,8 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor { private final PutFromLoadValidator putFromLoadValidator; private CacheCommandInitializer commandInitializer; private RpcManager rpcManager; + private StateTransferManager stateTransferManager; + private RpcOptions asyncUnordered; public NonTxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) { this.putFromLoadValidator = putFromLoadValidator; @@ -37,9 +46,15 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor { } @Inject - public void injectDependencies(CacheCommandInitializer commandInitializer, RpcManager rpcManager) { + public void injectDependencies(CacheCommandInitializer commandInitializer, RpcManager rpcManager, StateTransferManager stateTransferManager) { this.commandInitializer = commandInitializer; this.rpcManager = rpcManager; + this.stateTransferManager = stateTransferManager; + } + + @Start + public void start() { + asyncUnordered = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build(); } @Override @@ -56,6 +71,7 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor { assert sessionTransactionId != null; EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand( cacheName, keys, sessionTransactionId); - rpcManager.invokeRemotely(null, endInvalidationCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE)); + List
members = stateTransferManager.getCacheTopology().getMembers(); + rpcManager.invokeRemotely(members, endInvalidationCommand, asyncUnordered); } } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java index 9e241900c8..b0720ed3c7 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java @@ -11,12 +11,10 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; import org.hibernate.cache.infinispan.util.InfinispanMessageLogger; import org.infinispan.commands.AbstractVisitor; -import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.FlagAffectedCommand; import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.control.LockControlCommand; @@ -33,16 +31,8 @@ import org.infinispan.context.Flag; import org.infinispan.context.InvocationContext; import org.infinispan.context.impl.LocalTxInvocationContext; import org.infinispan.context.impl.TxInvocationContext; -import org.infinispan.factories.annotations.Inject; -import org.infinispan.factories.annotations.Start; -import org.infinispan.interceptors.base.BaseRpcInterceptor; -import org.infinispan.jmx.JmxStatisticsExposer; -import org.infinispan.jmx.annotations.DataType; import org.infinispan.jmx.annotations.MBean; -import org.infinispan.jmx.annotations.ManagedAttribute; -import org.infinispan.jmx.annotations.ManagedOperation; -import org.infinispan.jmx.annotations.MeasurementType; -import org.infinispan.jmx.annotations.Parameter; +import org.infinispan.remoting.transport.Address; /** * This interceptor acts as a replacement to the replication interceptor when the CacheImpl is configured with @@ -58,24 +48,9 @@ import org.infinispan.jmx.annotations.Parameter; * @since 4.0 */ @MBean(objectName = "Invalidation", description = "Component responsible for invalidating entries on remote caches when entries are written to locally.") -public class TxInvalidationInterceptor extends BaseRpcInterceptor implements JmxStatisticsExposer { - - private final AtomicLong invalidations = new AtomicLong( 0 ); - private CommandsFactory commandsFactory; - private boolean statisticsEnabled; - +public class TxInvalidationInterceptor extends BaseInvalidationInterceptor { private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog( TxInvalidationInterceptor.class ); - @Inject - public void injectDependencies(CommandsFactory commandsFactory) { - this.commandsFactory = commandsFactory; - } - - @Start - private void start() { - this.setStatisticsEnabled( cacheConfiguration.jmxStatistics().enabled() ); - } - @Override public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { if ( !isPutForExternalRead( command ) ) { @@ -100,7 +75,7 @@ public class TxInvalidationInterceptor extends BaseRpcInterceptor implements Jmx if ( !isLocalModeForced( command ) ) { // just broadcast the clear command - this is simplest! if ( ctx.isOriginLocal() ) { - rpcManager.invokeRemotely( null, command, rpcManager.getDefaultRpcOptions( defaultSynchronous ) ); + rpcManager.invokeRemotely( getMembers(), command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions ); } } return retval; @@ -137,8 +112,9 @@ public class TxInvalidationInterceptor extends BaseRpcInterceptor implements Jmx if ( ctx.isOriginLocal() ) { //unlock will happen async as it is a best effort boolean sync = !command.isUnlock(); - ( (LocalTxInvocationContext) ctx ).remoteLocksAcquired( rpcManager.getTransport().getMembers() ); - rpcManager.invokeRemotely( null, command, rpcManager.getDefaultRpcOptions( sync ) ); + List
members = getMembers(); + ( (LocalTxInvocationContext) ctx ).remoteLocksAcquired(members); + rpcManager.invokeRemotely(members, command, sync ? syncRpcOptions : asyncRpcOptions ); } return retVal; } @@ -244,13 +220,7 @@ public class TxInvalidationInterceptor extends BaseRpcInterceptor implements Jmx // but this does not impact consistency and the speed benefit is worth it. command = commandsFactory.buildPrepareCommand( txCtx.getGlobalTransaction(), Collections.singletonList( invalidateCommand ), true ); } - rpcManager.invokeRemotely( null, command, rpcManager.getDefaultRpcOptions( synchronous ) ); - } - - private void incrementInvalidations() { - if ( statisticsEnabled ) { - invalidations.incrementAndGet(); - } + rpcManager.invokeRemotely( getMembers(), command, synchronous ? syncRpcOptions : asyncRpcOptions ); } private boolean isPutForExternalRead(FlagAffectedCommand command) { @@ -260,36 +230,4 @@ public class TxInvalidationInterceptor extends BaseRpcInterceptor implements Jmx } return false; } - - @ManagedOperation( - description = "Resets statistics gathered by this component", - displayName = "Reset statistics" - ) - public void resetStatistics() { - invalidations.set( 0 ); - } - - @ManagedAttribute( - displayName = "Statistics enabled", - description = "Enables or disables the gathering of statistics by this component", - dataType = DataType.TRAIT, - writable = true - ) - public boolean getStatisticsEnabled() { - return this.statisticsEnabled; - } - - public void setStatisticsEnabled( - @Parameter(name = "enabled", description = "Whether statistics should be enabled or disabled (true/false)") boolean enabled) { - this.statisticsEnabled = enabled; - } - - @ManagedAttribute( - description = "Number of invalidations", - displayName = "Number of invalidations", - measurementType = MeasurementType.TRENDSUP - ) - public long getInvalidations() { - return invalidations.get(); - } } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java index b5c28cd7a6..6552b9230c 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java @@ -6,6 +6,7 @@ */ package org.hibernate.cache.infinispan.access; +import java.util.List; import java.util.Set; import org.hibernate.cache.infinispan.util.CacheCommandInitializer; @@ -21,9 +22,14 @@ import org.infinispan.commands.write.WriteCommand; import org.infinispan.container.DataContainer; import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.factories.annotations.Inject; +import org.infinispan.factories.annotations.Start; import org.infinispan.interceptors.base.BaseRpcInterceptor; import org.infinispan.remoting.inboundhandler.DeliverOrder; +import org.infinispan.remoting.rpc.ResponseMode; import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.rpc.RpcOptions; +import org.infinispan.remoting.transport.Address; +import org.infinispan.statetransfer.StateTransferManager; /** * Intercepts transactions in Infinispan, calling {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} @@ -40,6 +46,8 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor { private RpcManager rpcManager; private CacheCommandInitializer cacheCommandInitializer; private DataContainer dataContainer; + private StateTransferManager stateTransferManager; + private RpcOptions asyncUnordered; public TxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) { this.putFromLoadValidator = putFromLoadValidator; @@ -47,10 +55,16 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor { } @Inject - public void injectDependencies(RpcManager rpcManager, CacheCommandInitializer cacheCommandInitializer, DataContainer dataContainer) { + public void injectDependencies(RpcManager rpcManager, CacheCommandInitializer cacheCommandInitializer, DataContainer dataContainer, StateTransferManager stateTransferManager) { this.rpcManager = rpcManager; this.cacheCommandInitializer = cacheCommandInitializer; this.dataContainer = dataContainer; + this.stateTransferManager = stateTransferManager; + } + + @Start + public void start() { + asyncUnordered = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build(); } // We need to intercept PrepareCommand, not InvalidateCommand since the interception takes @@ -135,7 +149,8 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor { if (!affectedKeys.isEmpty()) { EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand( cacheName, affectedKeys.toArray(), ctx.getGlobalTransaction()); - rpcManager.invokeRemotely(null, commitCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE)); + List
members = stateTransferManager.getCacheTopology().getMembers(); + rpcManager.invokeRemotely(members, commitCommand, asyncUnordered); } } }