HHH-11304 Invalidations are not cleared when transaction rolls back

* always use global transaction id (in transactional caches) or command invocation id (in non-transactional caches) to identifiy the invalidator
* don't use afterInvoke/afterUpdate/unlockItem to end invalidation as this is not called during rollback
** use Infinispan transaction handling or explicitly registered invalidation to hook into the process
** move invalidation calls to interceptor stack where we have the identifiers
* don't use deprecated methods for commands marshalling

(cherry picked from commit 294ba74c76)

Conflicts:
	hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/InvalidationCacheAccessDelegate.java
	hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationCacheAccessDelegate.java
	hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java
	hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
	hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxInvalidationCacheAccessDelegate.java
	hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java
	hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/InvalidationTest.java
This commit is contained in:
Radim Vansa 2016-12-06 18:23:25 +01:00 committed by Gail Badner
parent 6d7a14e4a2
commit 560d6b59c6
15 changed files with 290 additions and 313 deletions

View File

@ -6,7 +6,10 @@
*/ */
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.context.Flag;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start; import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.BaseRpcInterceptor; import org.infinispan.interceptors.base.BaseRpcInterceptor;
@ -29,14 +32,16 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
private final AtomicLong invalidations = new AtomicLong(0); private final AtomicLong invalidations = new AtomicLong(0);
protected CommandsFactory commandsFactory; protected CommandsFactory commandsFactory;
protected StateTransferManager stateTransferManager; protected StateTransferManager stateTransferManager;
protected String cacheName;
protected boolean statisticsEnabled; protected boolean statisticsEnabled;
protected RpcOptions syncRpcOptions; protected RpcOptions syncRpcOptions;
protected RpcOptions asyncRpcOptions; protected RpcOptions asyncRpcOptions;
@Inject @Inject
public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager) { public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager, Cache cache) {
this.commandsFactory = commandsFactory; this.commandsFactory = commandsFactory;
this.stateTransferManager = stateTransferManager; this.stateTransferManager = stateTransferManager;
this.cacheName = cache.getName();
} }
@Start @Start
@ -86,4 +91,11 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
protected List<Address> getMembers() { protected List<Address> getMembers() {
return stateTransferManager.getCacheTopology().getMembers(); return stateTransferManager.getCacheTopology().getMembers();
} }
protected boolean isPutForExternalRead(FlagAffectedCommand command) {
if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
return true;
}
return false;
}
} }

View File

@ -122,9 +122,6 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
@Override @Override
public void remove(SessionImplementor session, Object key) throws CacheException { public void remove(SessionImplementor session, Object key) throws CacheException {
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session); putValidator.setCurrentSession(session);
try { try {
// We update whether or not the region is valid. Other nodes // We update whether or not the region is valid. Other nodes
@ -173,8 +170,5 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
@Override @Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException { public void unlockItem(SessionImplementor session, Object key) throws CacheException {
if ( !putValidator.endInvalidatingKey(session, key) ) {
log.failedEndInvalidating(key, region.getName());
}
} }
} }

View File

@ -6,7 +6,7 @@
*/ */
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;
import java.util.UUID; import javax.transaction.Status;
/** /**
* Synchronization that should release the locks after invalidation is complete. * Synchronization that should release the locks after invalidation is complete.
@ -14,13 +14,14 @@ import java.util.UUID;
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
public class InvalidationSynchronization implements javax.transaction.Synchronization { public class InvalidationSynchronization implements javax.transaction.Synchronization {
public final UUID uuid = UUID.randomUUID(); public final Object lockOwner;
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor; private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
private final Object[] keys; private final Object key;
public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) { public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object key, Object lockOwner) {
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor; this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
this.keys = keys; this.key = key;
this.lockOwner = lockOwner;
} }
@Override @Override
@ -28,6 +29,6 @@ public class InvalidationSynchronization implements javax.transaction.Synchroniz
@Override @Override
public void afterCompletion(int status) { public void afterCompletion(int status) {
nonTxPutFromLoadInterceptor.broadcastEndInvalidationCommand(keys, uuid); nonTxPutFromLoadInterceptor.endInvalidating(key, lockOwner, status == Status.STATUS_COMMITTED || status == Status.STATUS_COMMITTING);
} }
} }

View File

@ -37,12 +37,11 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction // We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert // (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction). // ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session); putValidator.setCurrentSession(session);
try { try {
writeCache.remove(key); // NonTxInvalidationInterceptor will call beginInvalidatingWithPFER and change this to a removal because
// we must publish the new value only after invalidation ends.
writeCache.put(key, value);
} }
finally { finally {
putValidator.resetCurrentSession(); putValidator.resetCurrentSession();
@ -61,12 +60,11 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction // We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update // (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction). // ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session); putValidator.setCurrentSession(session);
try { try {
writeCache.remove(key); // NonTxInvalidationInterceptor will call beginInvalidatingWithPFER and change this to a removal because
// we must publish the new value only after invalidation ends.
writeCache.put(key, value);
} }
finally { finally {
putValidator.resetCurrentSession(); putValidator.resetCurrentSession();
@ -74,53 +72,15 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
return true; return true;
} }
protected boolean isCommitted(SessionImplementor session) {
if (session.isClosed()) {
// If the session has been closed before transaction ends, so we cannot find out
// if the transaction was successful and if we can do the PFER.
// As this can happen only in JTA environment, we can query the TransactionManager
// directly here.
TransactionManager tm = region.getTransactionManager();
if (tm != null) {
try {
switch (tm.getStatus()) {
case Status.STATUS_COMMITTED:
case Status.STATUS_COMMITTING:
return true;
default:
return false;
}
}
catch (SystemException e) {
log.debug("Failed to retrieve transaction status", e);
return false;
}
}
}
TransactionCoordinator tc = session.getTransactionCoordinator();
return tc != null && tc.getTransactionDriverControl().getStatus() == TransactionStatus.COMMITTED;
}
@Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) {
log.failedEndInvalidating(key, region.getName());
}
}
@Override @Override
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) { public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) { // endInvalidatingKeys is called from NonTxInvalidationInterceptor, from the synchronization callback
log.failedEndInvalidating(key, region.getName());
}
return false; return false;
} }
@Override @Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) { public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) { // endInvalidatingKeys is called from NonTxInvalidationInterceptor, from the synchronization callback
log.failedEndInvalidating(key, region.getName());
}
return false; return false;
} }
} }

View File

@ -8,7 +8,6 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer; import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger; import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutKeyValueCommand;
@ -16,12 +15,14 @@ import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand; import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand; import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvalidationInterceptor; import org.infinispan.interceptors.InvalidationInterceptor;
import org.infinispan.jmx.annotations.MBean; import org.infinispan.jmx.annotations.MBean;
import org.infinispan.util.concurrent.locks.RemoteLockCommand;
import java.util.Collections;
/** /**
* This interceptor should completely replace default InvalidationInterceptor. * This interceptor should completely replace default InvalidationInterceptor.
@ -51,20 +52,47 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override @Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!isPutForExternalRead(command)) { if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
return handleInvalidate(ctx, command, new Object[] { command.getKey() }); return invokeNextInterceptor(ctx, command);
}
else {
boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner());
if (!isTransactional) {
throw new IllegalStateException("Put executed without transaction!");
}
if (!putFromLoadValidator.beginInvalidatingWithPFER(command.getKeyLockOwner(), command.getKey(), command.getValue())) {
log.failedInvalidatePendingPut(command.getKey(), cacheName);
}
RemoveCommand removeCommand = commandsFactory.buildRemoveCommand(command.getKey(), null, command.getFlags());
Object retval = invokeNextInterceptor(ctx, removeCommand);
if (command.isSuccessful()) {
invalidateAcrossCluster(command, isTransactional, command.getKey());
}
return retval;
} }
return invokeNextInterceptor(ctx, command);
} }
@Override @Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
return handleInvalidate(ctx, command, new Object[] { command.getKey() }); throw new UnsupportedOperationException("Unexpected replace");
} }
@Override @Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable { public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return handleInvalidate(ctx, command, new Object[] { command.getKey() }); boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner());
if (isTransactional) {
if (!putFromLoadValidator.beginInvalidatingKey(command.getKeyLockOwner(), command.getKey())) {
log.failedInvalidatePendingPut(command.getKey(), cacheName);
}
}
else {
log.trace("This is an eviction, not invalidating anything");
}
Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful()) {
invalidateAcrossCluster(command, isTransactional, command.getKey());
}
return retval;
} }
@Override @Override
@ -81,32 +109,20 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override @Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable { public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
if (!isPutForExternalRead(command)) { throw new UnsupportedOperationException("Unexpected putAll");
return handleInvalidate(ctx, command, command.getMap().keySet().toArray());
}
return invokeNextInterceptor(ctx, command);
} }
private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object[] keys) throws Throwable { private <T extends WriteCommand & RemoteLockCommand> void invalidateAcrossCluster(T command, boolean isTransactional, Object key) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful() && keys != null && keys.length != 0) {
invalidateAcrossCluster(command, keys);
}
return retval;
}
private void invalidateAcrossCluster(FlagAffectedCommand command, Object[] keys) throws Throwable {
// increment invalidations counter if statistics maintained // increment invalidations counter if statistics maintained
incrementInvalidations(); incrementInvalidations();
InvalidateCommand invalidateCommand; InvalidateCommand invalidateCommand;
Object sessionTransactionId = putFromLoadValidator.registerRemoteInvalidations(keys);
if (!isLocalModeForced(command)) { if (!isLocalModeForced(command)) {
if (sessionTransactionId == null) { if (isTransactional) {
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys); invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
Collections.emptySet(), new Object[] { key }, command.getKeyLockOwner());
} }
else { else {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand( invalidateCommand = commandsFactory.buildInvalidateCommand(Collections.emptySet(), new Object[] { key });
InfinispanCollections.<Flag>emptySet(), keys, sessionTransactionId);
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand); log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
@ -116,12 +132,4 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
} }
} }
private boolean isPutForExternalRead(FlagAffectedCommand command) {
if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
log.trace("Put for external read called. Suppressing clustered invalidation.");
return true;
}
return false;
}
} }

View File

@ -9,6 +9,8 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.BeginInvalidationCommand; import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer; import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand; import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
@ -32,6 +34,7 @@ import java.util.List;
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor { public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
private final static InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NonTxPutFromLoadInterceptor.class);
private final String cacheName; private final String cacheName;
private final PutFromLoadValidator putFromLoadValidator; private final PutFromLoadValidator putFromLoadValidator;
private CacheCommandInitializer commandInitializer; private CacheCommandInitializer commandInitializer;
@ -60,16 +63,20 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable { public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) { if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) { for (Object key : command.getKeys()) {
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getSessionTransactionId(), key); putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
} }
} }
return invokeNextInterceptor(ctx, command); return invokeNextInterceptor(ctx, command);
} }
public void broadcastEndInvalidationCommand(Object[] keys, Object sessionTransactionId) { public void endInvalidating(Object key, Object lockOwner, boolean successful) {
assert sessionTransactionId != null; assert lockOwner != null;
if (!putFromLoadValidator.endInvalidatingKey(lockOwner, key, successful)) {
log.failedEndInvalidating(key, cacheName);
}
EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand( EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand(
cacheName, keys, sessionTransactionId); cacheName, new Object[] { key }, lockOwner);
List<Address> members = stateTransferManager.getCacheTopology().getMembers(); List<Address> members = stateTransferManager.getCacheTopology().getMembers();
rpcManager.invokeRemotely(members, endInvalidationCommand, asyncUnordered); rpcManager.invokeRemotely(members, endInvalidationCommand, asyncUnordered);
} }

View File

@ -7,7 +7,6 @@
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -622,19 +621,19 @@ public class PutFromLoadValidator {
} }
} }
public Object registerRemoteInvalidations(Object[] keys) { public boolean registerRemoteInvalidation(Object key, Object lockOwner) {
SessionImplementor session = currentSession.get(); SessionImplementor session = currentSession.get();
TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator(); TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator();
if (transactionCoordinator != null) { if (transactionCoordinator != null) {
if (trace) { if (trace) {
log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys)); log.tracef("Registering synchronization on transaction in %s, cache %s: %s", lockOwnerToString(session), cache.getName(), key);
} }
InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, keys); InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, key, lockOwner);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync); transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
return sync.uuid; return true;
} }
// evict() command is not executed in session context // evict() command is not executed in session context
return null; return false;
} }
// ---------------------------------------------------------------- Private // ---------------------------------------------------------------- Private

View File

@ -31,16 +31,9 @@ public class TxInvalidationCacheAccessDelegate extends InvalidationCacheAccessDe
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction // We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert // (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction). // ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw log.failedInvalidatePendingPut(key, region.getName()); // The beginInvalidateKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
} writeCache.put(key, value);
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true; return true;
} }
@ -55,32 +48,21 @@ public class TxInvalidationCacheAccessDelegate extends InvalidationCacheAccessDe
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction // We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update // (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction). // ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
log.failedInvalidatePendingPut(key, region.getName()); // The beginInvalidateKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
} writeCache.put(key, value);
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true; return true;
} }
@Override @Override
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) { public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
if ( !putValidator.endInvalidatingKey(session, key) ) { // The endInvalidatingKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
log.failedEndInvalidating(key, region.getName());
}
return false; return false;
} }
@Override @Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) { public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
if ( !putValidator.endInvalidatingKey(session, key) ) { // The endInvalidatingKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
log.failedEndInvalidating(key, region.getName());
}
return false; return false;
} }
} }

View File

@ -82,8 +82,7 @@ public class TxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override @Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable { public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray(); return handleInvalidate( ctx, command, command.getMap().keySet().toArray() );
return handleInvalidate( ctx, command, keys );
} }
@Override @Override
@ -221,12 +220,4 @@ public class TxInvalidationInterceptor extends BaseInvalidationInterceptor {
} }
rpcManager.invokeRemotely( getMembers(), command, synchronous ? syncRpcOptions : asyncRpcOptions ); rpcManager.invokeRemotely( getMembers(), command, synchronous ? syncRpcOptions : asyncRpcOptions );
} }
private boolean isPutForExternalRead(FlagAffectedCommand command) {
if ( command.hasFlag( Flag.PUT_FOR_EXTERNAL_READ ) ) {
log.trace( "Put for external read called. Suppressing clustered invalidation." );
return true;
}
return false;
}
} }

View File

@ -13,9 +13,12 @@ import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand; import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand; import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand; import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.WriteCommand; import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.DataContainer; import org.infinispan.container.DataContainer;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start; import org.infinispan.factories.annotations.Start;
@ -26,6 +29,7 @@ import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions; import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferManager; import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.transaction.xa.GlobalTransaction;
import java.util.Set; import java.util.Set;
import java.util.List; import java.util.List;
@ -66,6 +70,31 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
asyncUnordered = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build(); asyncUnordered = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build();
} }
private void beginInvalidating(InvocationContext ctx, Object key) {
TxInvocationContext txCtx = (TxInvocationContext) ctx;
// make sure that the command is registered in the transaction
txCtx.addAffectedKey(key);
GlobalTransaction globalTransaction = txCtx.getGlobalTransaction();
if (!putFromLoadValidator.beginInvalidatingKey(globalTransaction, key)) {
log.failedInvalidatePendingPut(key, cacheName);
}
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
beginInvalidating(ctx, command.getKey());
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
beginInvalidating(ctx, command.getKey());
return invokeNextInterceptor(ctx, command);
}
// We need to intercept PrepareCommand, not InvalidateCommand since the interception takes // We need to intercept PrepareCommand, not InvalidateCommand since the interception takes
// place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands // place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands
// as part of EntryWrappingInterceptor // as part of EntryWrappingInterceptor
@ -79,38 +108,19 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
// us against concurrent modification of the collection. Therefore, we need to remove the entry // us against concurrent modification of the collection. Therefore, we need to remove the entry
// here (even without lock!) and let possible update happen in commit phase. // here (even without lock!) and let possible update happen in commit phase.
for (WriteCommand wc : command.getModifications()) { for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) { for (Object key : wc.getAffectedKeys()) {
// ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys() dataContainer.remove(key);
for (Object key : ((InvalidateCommand) wc).getKeys()) {
dataContainer.remove(key);
}
}
else {
for (Object key : wc.getAffectedKeys()) {
dataContainer.remove(key);
}
} }
} }
} }
else { else {
for (WriteCommand wc : command.getModifications()) { for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) { Set<Object> keys = wc.getAffectedKeys();
// ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys() if (log.isTraceEnabled()) {
for (Object key : ((InvalidateCommand) wc).getKeys()) { log.tracef("Invalidating keys %s with lock owner %s", keys, ctx.getLockOwner());
if (log.isTraceEnabled()) {
log.tracef("Invalidating key %s with lock owner %s", key, ctx.getLockOwner());
}
putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
}
} }
else { for (Object key : keys ) {
Set<Object> keys = wc.getAffectedKeys(); putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
if (log.isTraceEnabled()) {
log.tracef("Invalidating keys %s with lock owner %s", keys, ctx.getLockOwner());
}
for (Object key : keys ) {
putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
}
} }
} }
} }
@ -142,14 +152,21 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
Set<Object> affectedKeys = ctx.getAffectedKeys(); Set<Object> affectedKeys = ctx.getAffectedKeys();
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.tracef( "Sending end invalidation for keys %s asynchronously", affectedKeys ); log.tracef( "Sending end invalidation for keys %s asynchronously, modifications are %s", affectedKeys, ctx.getCacheTransaction().getModifications());
} }
if (!affectedKeys.isEmpty()) { if (!affectedKeys.isEmpty()) {
GlobalTransaction globalTransaction = ctx.getGlobalTransaction();
EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand( EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand(
cacheName, affectedKeys.toArray(), ctx.getGlobalTransaction()); cacheName, affectedKeys.toArray(), globalTransaction);
List<Address> members = stateTransferManager.getCacheTopology().getMembers(); List<Address> members = stateTransferManager.getCacheTopology().getMembers();
rpcManager.invokeRemotely(members, commitCommand, asyncUnordered); rpcManager.invokeRemotely(members, commitCommand, asyncUnordered);
// If the transaction is not successful, *RegionAccessStrategy would not be called, therefore
// we have to end invalidation from here manually (in successful case as well)
for (Object key : affectedKeys) {
putFromLoadValidator.endInvalidatingKey(globalTransaction, key);
}
} }
} }
} }

View File

@ -6,73 +6,48 @@
*/ */
package org.hibernate.cache.infinispan.util; package org.hibernate.cache.infinispan.util;
import org.hibernate.internal.util.compare.EqualsHelper;
import org.infinispan.commands.CommandInvocationId; import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.context.Flag; import org.infinispan.context.Flag;
import org.infinispan.notifications.cachelistener.CacheNotifier; import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.remoting.transport.Address;
import java.lang.reflect.Field; import java.io.IOException;
import java.lang.reflect.Method; import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays; import java.util.Arrays;
import java.util.Objects;
import java.util.Set; import java.util.Set;
/** /**
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
public class BeginInvalidationCommand extends InvalidateCommand { public class BeginInvalidationCommand extends InvalidateCommand {
private Object sessionTransactionId; private Object lockOwner;
public BeginInvalidationCommand() { public BeginInvalidationCommand() {
} }
public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, CommandInvocationId commandInvocationId, Object[] keys, Object sessionTransactionId) { public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, CommandInvocationId commandInvocationId, Object[] keys, Object lockOwner) {
super(notifier, flags, commandInvocationId, keys); super(notifier, flags, commandInvocationId, keys);
this.sessionTransactionId = sessionTransactionId; this.lockOwner = lockOwner;
} }
public Object getSessionTransactionId() { public Object getLockOwner() {
return sessionTransactionId; return lockOwner;
} }
@Override @Override
public Object[] getParameters() { public void writeTo(ObjectOutput output) throws IOException {
if (keys == null || keys.length == 0) { super.writeTo(output);
return new Object[]{flags, sessionTransactionId, commandInvocationId, 0}; output.writeObject(lockOwner);
}
if (keys.length == 1) {
return new Object[]{flags, sessionTransactionId, commandInvocationId, 1, keys[0]};
}
Object[] retval = new Object[keys.length + 4];
retval[0] = flags;
retval[1] = sessionTransactionId;
retval[2] = commandInvocationId;
retval[3] = keys.length;
System.arraycopy(keys, 0, retval, 4, keys.length);
return retval;
} }
@Override @Override
public void setParameters(int commandId, Object[] args) { public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
if (commandId != CacheCommandIds.BEGIN_INVALIDATION) { super.readFrom(input);
throw new IllegalStateException("Invalid method id"); lockOwner = input.readObject();
}
flags = (Set<Flag>) args[0];
sessionTransactionId = args[1];
commandInvocationId = (CommandInvocationId) args[2];
int size = (Integer) args[3];
keys = new Object[size];
if (size == 1) {
keys[0] = args[4];
}
else if (size > 0) {
System.arraycopy(args, 4, keys, 0, size);
}
} }
@Override @Override
public byte getCommandId() { public byte getCommandId() {
return CacheCommandIds.BEGIN_INVALIDATION; return CacheCommandIds.BEGIN_INVALIDATION;
@ -85,7 +60,7 @@ public class BeginInvalidationCommand extends InvalidateCommand {
} }
if (o instanceof BeginInvalidationCommand) { if (o instanceof BeginInvalidationCommand) {
BeginInvalidationCommand bic = (BeginInvalidationCommand) o; BeginInvalidationCommand bic = (BeginInvalidationCommand) o;
return EqualsHelper.equals(sessionTransactionId, bic.sessionTransactionId); return Objects.equals(lockOwner, bic.lockOwner);
} }
else { else {
return false; return false;
@ -94,12 +69,12 @@ public class BeginInvalidationCommand extends InvalidateCommand {
@Override @Override
public int hashCode() { public int hashCode() {
return super.hashCode() + (sessionTransactionId == null ? 0 : sessionTransactionId.hashCode()); return super.hashCode() + (lockOwner == null ? 0 : lockOwner.hashCode());
} }
@Override @Override
public String toString() { public String toString() {
return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) + return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) +
", sessionTransactionId=" + sessionTransactionId + '}'; ", sessionTransactionId=" + lockOwner + '}';
} }
} }

View File

@ -66,12 +66,12 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
return new EvictAllCommand( regionName ); return new EvictAllCommand( regionName );
} }
public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object sessionTransactionId) { public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object lockOwner) {
return new BeginInvalidationCommand(notifier, flags, CommandInvocationId.generateId(clusteringDependentLogic.getAddress()), keys, sessionTransactionId); return new BeginInvalidationCommand(notifier, flags, CommandInvocationId.generateId(clusteringDependentLogic.getAddress()), keys, lockOwner);
} }
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) { public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
return new EndInvalidationCommand( cacheName, keys, sessionTransactionId ); return new EndInvalidationCommand( cacheName, keys, lockOwner );
} }
@Override @Override

View File

@ -6,12 +6,16 @@
*/ */
package org.hibernate.cache.infinispan.util; package org.hibernate.cache.infinispan.util;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.infinispan.commands.remote.BaseRpcCommand; import org.infinispan.commands.remote.BaseRpcCommand;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import java.util.Arrays;
/** /**
* Sent in commit phase (after DB commit) to remote nodes in order to stop invalidating * Sent in commit phase (after DB commit) to remote nodes in order to stop invalidating
* putFromLoads. * putFromLoads.
@ -20,7 +24,7 @@ import java.util.Arrays;
*/ */
public class EndInvalidationCommand extends BaseRpcCommand { public class EndInvalidationCommand extends BaseRpcCommand {
private Object[] keys; private Object[] keys;
private Object sessionTransactionId; private Object lockOwner;
private PutFromLoadValidator putFromLoadValidator; private PutFromLoadValidator putFromLoadValidator;
public EndInvalidationCommand(String cacheName) { public EndInvalidationCommand(String cacheName) {
@ -30,16 +34,16 @@ public class EndInvalidationCommand extends BaseRpcCommand {
/** /**
* @param cacheName name of the cache to evict * @param cacheName name of the cache to evict
*/ */
public EndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) { public EndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
super(cacheName); super(cacheName);
this.keys = keys; this.keys = keys;
this.sessionTransactionId = sessionTransactionId; this.lockOwner = lockOwner;
} }
@Override @Override
public Object perform(InvocationContext ctx) throws Throwable { public Object perform(InvocationContext ctx) throws Throwable {
for (Object key : keys) { for (Object key : keys) {
putFromLoadValidator.endInvalidatingKey(sessionTransactionId, key); putFromLoadValidator.endInvalidatingKey(lockOwner, key);
} }
return null; return null;
} }
@ -50,14 +54,15 @@ public class EndInvalidationCommand extends BaseRpcCommand {
} }
@Override @Override
public Object[] getParameters() { public void writeTo(ObjectOutput output) throws IOException {
return new Object[] { keys, sessionTransactionId}; MarshallUtil.marshallArray(keys, output);
output.writeObject(lockOwner);
} }
@Override @Override
public void setParameters(int commandId, Object[] parameters) { public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
keys = (Object[]) parameters[0]; keys = MarshallUtil.unmarshallArray(input, Object[]::new);
sessionTransactionId = parameters[1]; lockOwner = input.readObject();
} }
@Override @Override
@ -91,7 +96,7 @@ public class EndInvalidationCommand extends BaseRpcCommand {
if (!Arrays.equals(keys, that.keys)) { if (!Arrays.equals(keys, that.keys)) {
return false; return false;
} }
return !(sessionTransactionId != null ? !sessionTransactionId.equals(that.sessionTransactionId) : that.sessionTransactionId != null); return !(lockOwner != null ? !lockOwner.equals(that.lockOwner) : that.lockOwner != null);
} }
@ -99,7 +104,7 @@ public class EndInvalidationCommand extends BaseRpcCommand {
public int hashCode() { public int hashCode() {
int result = cacheName != null ? cacheName.hashCode() : 0; int result = cacheName != null ? cacheName.hashCode() : 0;
result = 31 * result + (keys != null ? Arrays.hashCode(keys) : 0); result = 31 * result + (keys != null ? Arrays.hashCode(keys) : 0);
result = 31 * result + (sessionTransactionId != null ? sessionTransactionId.hashCode() : 0); result = 31 * result + (lockOwner != null ? lockOwner.hashCode() : 0);
return result; return result;
} }
@ -108,7 +113,7 @@ public class EndInvalidationCommand extends BaseRpcCommand {
final StringBuilder sb = new StringBuilder("EndInvalidationCommand{"); final StringBuilder sb = new StringBuilder("EndInvalidationCommand{");
sb.append("cacheName=").append(cacheName); sb.append("cacheName=").append(cacheName);
sb.append(", keys=").append(Arrays.toString(keys)); sb.append(", keys=").append(Arrays.toString(keys));
sb.append(", sessionTransactionId=").append(sessionTransactionId); sb.append(", sessionTransactionId=").append(lockOwner);
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }

View File

@ -1,20 +1,8 @@
package org.hibernate.test.cache.infinispan.functional; package org.hibernate.test.cache.infinispan.functional;
import org.hibernate.PessimisticLockException; import java.lang.reflect.Method;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.hibernate.cache.spi.Region;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.testing.TestForIssue;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Phaser; import java.util.concurrent.Phaser;
@ -22,9 +10,26 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.hibernate.PessimisticLockException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.hibernate.testing.TestForIssue;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.junit.Test;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/** /**
* Tests specific to invalidation mode caches * Tests specific to invalidation mode caches
@ -48,8 +53,6 @@ public class InvalidationTest extends SingleNodeTest {
@Test @Test
@TestForIssue(jiraKey = "HHH-9868") @TestForIssue(jiraKey = "HHH-9868")
public void testConcurrentRemoveAndPutFromLoad() throws Exception { public void testConcurrentRemoveAndPutFromLoad() throws Exception {
Region region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
AdvancedCache entityCache = ((EntityRegionImpl) region).getCache();
final Item item = new Item( "chris", "Chris's Item" ); final Item item = new Item( "chris", "Chris's Item" );
withTxSession(s -> { withTxSession(s -> {
@ -60,8 +63,7 @@ public class InvalidationTest extends SingleNodeTest {
Phaser getPhaser = new Phaser(2); Phaser getPhaser = new Phaser(2);
HookInterceptor hook = new HookInterceptor(); HookInterceptor hook = new HookInterceptor();
AdvancedCache pendingPutsCache = entityCache.getCacheManager().getCache( AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
entityCache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE).getAdvancedCache();
pendingPutsCache.addInterceptor(hook, 0); pendingPutsCache.addInterceptor(hook, 0);
AtomicBoolean getThreadBlockedInDB = new AtomicBoolean(false); AtomicBoolean getThreadBlockedInDB = new AtomicBoolean(false);
@ -140,16 +142,105 @@ public class InvalidationTest extends SingleNodeTest {
// get thread puts the entry into cache // get thread puts the entry into cache
getThread.join(); getThread.join();
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> { withTxSession(s -> {
Item loadedItem = s.get(Item.class, item.getId()); Item loadedItem = s.get(Item.class, item.getId());
assertNull(loadedItem); assertNull(loadedItem);
}); });
} }
protected AdvancedCache getPendingPutsCache(Class<Item> entityClazz) {
EntityRegionImpl region = (EntityRegionImpl) sessionFactory().getSecondLevelCacheRegion( entityClazz.getName() );
AdvancedCache entityCache = region.getCache();
return (AdvancedCache) entityCache.getCacheManager().getCache(
entityCache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE).getAdvancedCache();
}
protected static void arriveAndAwait(Phaser phaser, int timeout) throws TimeoutException, InterruptedException { protected static void arriveAndAwait(Phaser phaser, int timeout) throws TimeoutException, InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), timeout, TimeUnit.MILLISECONDS); phaser.awaitAdvanceInterruptibly(phaser.arrive(), timeout, TimeUnit.MILLISECONDS);
} }
@TestForIssue(jiraKey = "HHH-11304")
@Test
public void testFailedInsert() throws Exception {
AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> {
Item i = new Item("inserted", "bar");
s.persist(i);
s.flush();
s.getTransaction().markRollbackOnly();
});
assertNoInvalidators(pendingPutsCache);
}
@TestForIssue(jiraKey = "HHH-11304")
@Test
public void testFailedUpdate() throws Exception {
AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
assertNoInvalidators(pendingPutsCache);
final Item item = new Item("before-update", "bar");
withTxSession(s -> s.persist(item));
withTxSession(s -> {
Item item2 = s.load(Item.class, item.getId());
assertEquals("before-update", item2.getName());
item2.setName("after-update");
s.persist(item2);
s.flush();
s.flush(); // workaround for HHH-11312
s.getTransaction().markRollbackOnly();
});
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> {
Item item3 = s.load(Item.class, item.getId());
assertEquals("before-update", item3.getName());
s.delete(item3);
});
assertNoInvalidators(pendingPutsCache);
}
@TestForIssue(jiraKey = "HHH-11304")
@Test
public void testFailedRemove() throws Exception {
AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
assertNoInvalidators(pendingPutsCache);
final Item item = new Item("before-remove", "bar");
withTxSession(s -> s.persist(item));
withTxSession(s -> {
Item item2 = s.load(Item.class, item.getId());
assertEquals("before-remove", item2.getName());
s.delete(item2);
s.flush();
s.getTransaction().markRollbackOnly();
});
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> {
Item item3 = s.load(Item.class, item.getId());
assertEquals("before-remove", item3.getName());
s.delete(item3);
});
assertNoInvalidators(pendingPutsCache);
}
protected void assertNoInvalidators(AdvancedCache<Object, Object> pendingPutsCache) throws Exception {
Method getInvalidators = null;
for (Map.Entry<Object, Object> entry : pendingPutsCache.entrySet()) {
if (getInvalidators == null) {
getInvalidators = entry.getValue().getClass().getMethod("getInvalidators");
getInvalidators.setAccessible(true);
}
Collection invalidators = (Collection) getInvalidators.invoke(entry.getValue());
if (invalidators != null) {
assertTrue("Invalidators on key " + entry.getKey() + ": " + invalidators, invalidators.isEmpty());
}
}
}
private static class HookInterceptor extends BaseCustomInterceptor { private static class HookInterceptor extends BaseCustomInterceptor {
Phaser phaser; Phaser phaser;
Thread thread; Thread thread;

View File

@ -1,65 +0,0 @@
package org.hibernate.test.cache.infinispan.util;
import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.distribution.TestAddress;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Supplier;
import static org.jgroups.util.Util.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class CacheCommandsInitializerTest {
private static CacheCommandInitializer initializer = new CacheCommandInitializer();
@BeforeClass
public static void setUp() {
ClusteringDependentLogic cdl = mock(ClusteringDependentLogic.class);
when(cdl.getAddress()).thenReturn(new TestAddress(0));
initializer.injectDependencies(null, null, cdl);
}
@Test
public void testBeginInvalidationCommand1() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{}, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testBeginInvalidationCommand2() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 1 }, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testBeginInvalidationCommand3() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 2, 3 }, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testEndInvalidationCommmand() {
EndInvalidationCommand command = initializer.buildEndInvalidationCommand("foo", new Object[] { 2, 3 }, UUID.randomUUID());
checkParameters(command, () -> new EndInvalidationCommand("foo"));
}
protected <T extends ReplicableCommand> void checkParameters(T command, Supplier<T> commandSupplier) {
Object[] parameters = command.getParameters();
ReplicableCommand newCommand = commandSupplier.get();
newCommand.setParameters(command.getCommandId(), parameters);
assertEquals(command, newCommand);
assertArrayEquals(parameters, newCommand.getParameters());
}
}