diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java
index c25c3092c1..a805f0b30c 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/BaseInvalidationInterceptor.java
@@ -6,7 +6,10 @@
*/
package org.hibernate.cache.infinispan.access;
+import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.FlagAffectedCommand;
+import org.infinispan.context.Flag;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
@@ -29,14 +32,16 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
private final AtomicLong invalidations = new AtomicLong(0);
protected CommandsFactory commandsFactory;
protected StateTransferManager stateTransferManager;
+ protected String cacheName;
protected boolean statisticsEnabled;
protected RpcOptions syncRpcOptions;
protected RpcOptions asyncRpcOptions;
@Inject
- public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager) {
+ public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager, Cache cache) {
this.commandsFactory = commandsFactory;
this.stateTransferManager = stateTransferManager;
+ this.cacheName = cache.getName();
}
@Start
@@ -86,4 +91,11 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
protected List
getMembers() {
return stateTransferManager.getCacheTopology().getMembers();
}
+
+ protected boolean isPutForExternalRead(FlagAffectedCommand command) {
+ if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
+ return true;
+ }
+ return false;
+ }
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationCacheAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationCacheAccessDelegate.java
index ad85ba814e..6bf32a0105 100755
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationCacheAccessDelegate.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationCacheAccessDelegate.java
@@ -122,9 +122,6 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
@Override
public void remove(SessionImplementor session, Object key) throws CacheException {
- if ( !putValidator.beginInvalidatingKey(session, key)) {
- throw log.failedInvalidatePendingPut(key, region.getName());
- }
putValidator.setCurrentSession(session);
try {
// We update whether or not the region is valid. Other nodes
@@ -173,8 +170,5 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
@Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
- if ( !putValidator.endInvalidatingKey(session, key) ) {
- log.failedEndInvalidating(key, region.getName());
- }
}
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationSynchronization.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationSynchronization.java
index 0ca6e93969..a7723472e6 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationSynchronization.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationSynchronization.java
@@ -6,7 +6,7 @@
*/
package org.hibernate.cache.infinispan.access;
-import java.util.UUID;
+import javax.transaction.Status;
/**
* Synchronization that should release the locks after invalidation is complete.
@@ -14,13 +14,14 @@ import java.util.UUID;
* @author Radim Vansa <rvansa@redhat.com>
*/
public class InvalidationSynchronization implements javax.transaction.Synchronization {
- public final UUID uuid = UUID.randomUUID();
+ public final Object lockOwner;
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
- private final Object[] keys;
+ private final Object key;
- public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
+ public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object key, Object lockOwner) {
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
- this.keys = keys;
+ this.key = key;
+ this.lockOwner = lockOwner;
}
@Override
@@ -28,6 +29,6 @@ public class InvalidationSynchronization implements javax.transaction.Synchroniz
@Override
public void afterCompletion(int status) {
- nonTxPutFromLoadInterceptor.broadcastEndInvalidationCommand(keys, uuid);
+ nonTxPutFromLoadInterceptor.endInvalidating(key, lockOwner, status == Status.STATUS_COMMITTED || status == Status.STATUS_COMMITTING);
}
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationCacheAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationCacheAccessDelegate.java
index c1a43a59e7..ac8a7bd4d7 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationCacheAccessDelegate.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationCacheAccessDelegate.java
@@ -37,12 +37,11 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
- if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
- throw log.failedInvalidatePendingPut(key, region.getName());
- }
putValidator.setCurrentSession(session);
try {
- writeCache.remove(key);
+ // NonTxInvalidationInterceptor will call beginInvalidatingWithPFER and change this to a removal because
+ // we must publish the new value only after invalidation ends.
+ writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
@@ -61,12 +60,11 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
- if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
- throw log.failedInvalidatePendingPut(key, region.getName());
- }
putValidator.setCurrentSession(session);
try {
- writeCache.remove(key);
+ // NonTxInvalidationInterceptor will call beginInvalidatingWithPFER and change this to a removal because
+ // we must publish the new value only after invalidation ends.
+ writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
@@ -74,53 +72,15 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
return true;
}
- protected boolean isCommitted(SessionImplementor session) {
- if (session.isClosed()) {
- // If the session has been closed before transaction ends, so we cannot find out
- // if the transaction was successful and if we can do the PFER.
- // As this can happen only in JTA environment, we can query the TransactionManager
- // directly here.
- TransactionManager tm = region.getTransactionManager();
- if (tm != null) {
- try {
- switch (tm.getStatus()) {
- case Status.STATUS_COMMITTED:
- case Status.STATUS_COMMITTING:
- return true;
- default:
- return false;
- }
- }
- catch (SystemException e) {
- log.debug("Failed to retrieve transaction status", e);
- return false;
- }
- }
- }
- TransactionCoordinator tc = session.getTransactionCoordinator();
- return tc != null && tc.getTransactionDriverControl().getStatus() == TransactionStatus.COMMITTED;
- }
-
- @Override
- public void unlockItem(SessionImplementor session, Object key) throws CacheException {
- if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) {
- log.failedEndInvalidating(key, region.getName());
- }
- }
-
@Override
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
- if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) {
- log.failedEndInvalidating(key, region.getName());
- }
+ // endInvalidatingKeys is called from NonTxInvalidationInterceptor, from the synchronization callback
return false;
}
@Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
- if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) {
- log.failedEndInvalidating(key, region.getName());
- }
+ // endInvalidatingKeys is called from NonTxInvalidationInterceptor, from the synchronization callback
return false;
}
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java
index 2ca51196f3..1306c44020 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java
@@ -8,7 +8,6 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
-import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
@@ -16,12 +15,14 @@ import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
-import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvalidationInterceptor;
import org.infinispan.jmx.annotations.MBean;
+import org.infinispan.util.concurrent.locks.RemoteLockCommand;
+
+import java.util.Collections;
/**
* This interceptor should completely replace default InvalidationInterceptor.
@@ -51,20 +52,47 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
- if (!isPutForExternalRead(command)) {
- return handleInvalidate(ctx, command, new Object[] { command.getKey() });
+ if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
+ return invokeNextInterceptor(ctx, command);
+ }
+ else {
+ boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner());
+ if (!isTransactional) {
+ throw new IllegalStateException("Put executed without transaction!");
+ }
+ if (!putFromLoadValidator.beginInvalidatingWithPFER(command.getKeyLockOwner(), command.getKey(), command.getValue())) {
+ log.failedInvalidatePendingPut(command.getKey(), cacheName);
+ }
+ RemoveCommand removeCommand = commandsFactory.buildRemoveCommand(command.getKey(), null, command.getFlags());
+ Object retval = invokeNextInterceptor(ctx, removeCommand);
+ if (command.isSuccessful()) {
+ invalidateAcrossCluster(command, isTransactional, command.getKey());
+ }
+ return retval;
}
- return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
- return handleInvalidate(ctx, command, new Object[] { command.getKey() });
+ throw new UnsupportedOperationException("Unexpected replace");
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
- return handleInvalidate(ctx, command, new Object[] { command.getKey() });
+ boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner());
+ if (isTransactional) {
+ if (!putFromLoadValidator.beginInvalidatingKey(command.getKeyLockOwner(), command.getKey())) {
+ log.failedInvalidatePendingPut(command.getKey(), cacheName);
+ }
+ }
+ else {
+ log.trace("This is an eviction, not invalidating anything");
+ }
+ Object retval = invokeNextInterceptor(ctx, command);
+ if (command.isSuccessful()) {
+ invalidateAcrossCluster(command, isTransactional, command.getKey());
+ }
+ return retval;
}
@Override
@@ -81,32 +109,20 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
- if (!isPutForExternalRead(command)) {
- return handleInvalidate(ctx, command, command.getMap().keySet().toArray());
- }
- return invokeNextInterceptor(ctx, command);
+ throw new UnsupportedOperationException("Unexpected putAll");
}
- private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object[] keys) throws Throwable {
- Object retval = invokeNextInterceptor(ctx, command);
- if (command.isSuccessful() && keys != null && keys.length != 0) {
- invalidateAcrossCluster(command, keys);
- }
- return retval;
- }
-
- private void invalidateAcrossCluster(FlagAffectedCommand command, Object[] keys) throws Throwable {
+ private void invalidateAcrossCluster(T command, boolean isTransactional, Object key) throws Throwable {
// increment invalidations counter if statistics maintained
incrementInvalidations();
InvalidateCommand invalidateCommand;
- Object sessionTransactionId = putFromLoadValidator.registerRemoteInvalidations(keys);
if (!isLocalModeForced(command)) {
- if (sessionTransactionId == null) {
- invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.emptySet(), keys);
+ if (isTransactional) {
+ invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
+ Collections.emptySet(), new Object[] { key }, command.getKeyLockOwner());
}
else {
- invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
- InfinispanCollections.emptySet(), keys, sessionTransactionId);
+ invalidateCommand = commandsFactory.buildInvalidateCommand(Collections.emptySet(), new Object[] { key });
}
if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
@@ -116,12 +132,4 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
}
}
- private boolean isPutForExternalRead(FlagAffectedCommand command) {
- if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
- log.trace("Put for external read called. Suppressing clustered invalidation.");
- return true;
- }
- return false;
- }
-
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java
index 09d28fe852..1d5384774c 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java
@@ -9,6 +9,8 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
+
+import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
@@ -32,6 +34,7 @@ import java.util.List;
* @author Radim Vansa <rvansa@redhat.com>
*/
public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
+ private final static InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NonTxPutFromLoadInterceptor.class);
private final String cacheName;
private final PutFromLoadValidator putFromLoadValidator;
private CacheCommandInitializer commandInitializer;
@@ -60,16 +63,20 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) {
- putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getSessionTransactionId(), key);
+ putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
}
}
return invokeNextInterceptor(ctx, command);
}
- public void broadcastEndInvalidationCommand(Object[] keys, Object sessionTransactionId) {
- assert sessionTransactionId != null;
+ public void endInvalidating(Object key, Object lockOwner, boolean successful) {
+ assert lockOwner != null;
+ if (!putFromLoadValidator.endInvalidatingKey(lockOwner, key, successful)) {
+ log.failedEndInvalidating(key, cacheName);
+ }
+
EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand(
- cacheName, keys, sessionTransactionId);
+ cacheName, new Object[] { key }, lockOwner);
List members = stateTransferManager.getCacheTopology().getMembers();
rpcManager.invokeRemotely(members, endInvalidationCommand, asyncUnordered);
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
index a6bf5d26bc..c54bbbe21a 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
@@ -7,7 +7,6 @@
package org.hibernate.cache.infinispan.access;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -622,19 +621,19 @@ public class PutFromLoadValidator {
}
}
- public Object registerRemoteInvalidations(Object[] keys) {
+ public boolean registerRemoteInvalidation(Object key, Object lockOwner) {
SessionImplementor session = currentSession.get();
TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator();
if (transactionCoordinator != null) {
if (trace) {
- log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys));
+ log.tracef("Registering synchronization on transaction in %s, cache %s: %s", lockOwnerToString(session), cache.getName(), key);
}
- InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, keys);
+ InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, key, lockOwner);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
- return sync.uuid;
+ return true;
}
// evict() command is not executed in session context
- return null;
+ return false;
}
// ---------------------------------------------------------------- Private
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationCacheAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationCacheAccessDelegate.java
index a095d59026..de79d71fad 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationCacheAccessDelegate.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationCacheAccessDelegate.java
@@ -31,16 +31,9 @@ public class TxInvalidationCacheAccessDelegate extends InvalidationCacheAccessDe
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
- if ( !putValidator.beginInvalidatingKey(session, key)) {
- throw log.failedInvalidatePendingPut(key, region.getName());
- }
- putValidator.setCurrentSession(session);
- try {
- writeCache.put(key, value);
- }
- finally {
- putValidator.resetCurrentSession();
- }
+
+ // The beginInvalidateKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
+ writeCache.put(key, value);
return true;
}
@@ -55,32 +48,21 @@ public class TxInvalidationCacheAccessDelegate extends InvalidationCacheAccessDe
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
- if ( !putValidator.beginInvalidatingKey(session, key)) {
- log.failedInvalidatePendingPut(key, region.getName());
- }
- putValidator.setCurrentSession(session);
- try {
- writeCache.put(key, value);
- }
- finally {
- putValidator.resetCurrentSession();
- }
+
+ // The beginInvalidateKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
+ writeCache.put(key, value);
return true;
}
@Override
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
- if ( !putValidator.endInvalidatingKey(session, key) ) {
- log.failedEndInvalidating(key, region.getName());
- }
+ // The endInvalidatingKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
return false;
}
@Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
- if ( !putValidator.endInvalidatingKey(session, key) ) {
- log.failedEndInvalidating(key, region.getName());
- }
+ // The endInvalidatingKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
return false;
}
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java
index eb9dbf2450..3e0d324a07 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationInterceptor.java
@@ -82,8 +82,7 @@ public class TxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
- Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray();
- return handleInvalidate( ctx, command, keys );
+ return handleInvalidate( ctx, command, command.getMap().keySet().toArray() );
}
@Override
@@ -221,12 +220,4 @@ public class TxInvalidationInterceptor extends BaseInvalidationInterceptor {
}
rpcManager.invokeRemotely( getMembers(), command, synchronous ? syncRpcOptions : asyncRpcOptions );
}
-
- private boolean isPutForExternalRead(FlagAffectedCommand command) {
- if ( command.hasFlag( Flag.PUT_FOR_EXTERNAL_READ ) ) {
- log.trace( "Put for external read called. Suppressing clustered invalidation." );
- return true;
- }
- return false;
- }
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java
index d7c12294bc..26109e48bd 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxPutFromLoadInterceptor.java
@@ -13,9 +13,12 @@ import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
-import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.DataContainer;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@@ -26,6 +29,7 @@ import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferManager;
+import org.infinispan.transaction.xa.GlobalTransaction;
import java.util.Set;
import java.util.List;
@@ -66,6 +70,31 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
asyncUnordered = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build();
}
+ private void beginInvalidating(InvocationContext ctx, Object key) {
+ TxInvocationContext txCtx = (TxInvocationContext) ctx;
+ // make sure that the command is registered in the transaction
+ txCtx.addAffectedKey(key);
+
+ GlobalTransaction globalTransaction = txCtx.getGlobalTransaction();
+ if (!putFromLoadValidator.beginInvalidatingKey(globalTransaction, key)) {
+ log.failedInvalidatePendingPut(key, cacheName);
+ }
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
+ if (!command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
+ beginInvalidating(ctx, command.getKey());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+ beginInvalidating(ctx, command.getKey());
+ return invokeNextInterceptor(ctx, command);
+ }
+
// We need to intercept PrepareCommand, not InvalidateCommand since the interception takes
// place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands
// as part of EntryWrappingInterceptor
@@ -79,38 +108,19 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
// us against concurrent modification of the collection. Therefore, we need to remove the entry
// here (even without lock!) and let possible update happen in commit phase.
for (WriteCommand wc : command.getModifications()) {
- if (wc instanceof InvalidateCommand) {
- // ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys()
- for (Object key : ((InvalidateCommand) wc).getKeys()) {
- dataContainer.remove(key);
- }
- }
- else {
- for (Object key : wc.getAffectedKeys()) {
- dataContainer.remove(key);
- }
+ for (Object key : wc.getAffectedKeys()) {
+ dataContainer.remove(key);
}
}
}
else {
for (WriteCommand wc : command.getModifications()) {
- if (wc instanceof InvalidateCommand) {
- // ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys()
- for (Object key : ((InvalidateCommand) wc).getKeys()) {
- if (log.isTraceEnabled()) {
- log.tracef("Invalidating key %s with lock owner %s", key, ctx.getLockOwner());
- }
- putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
- }
+ Set