HHH-11304 Invalidations are not cleared when transaction rolls back

* always use global transaction id (in transactional caches) or command invocation id (in non-transactional caches) to identifiy the invalidator
* don't use afterInvoke/afterUpdate/unlockItem to end invalidation as this is not called during rollback
** use Infinispan transaction handling or explicitly registered invalidation to hook into the process
** move invalidation calls to interceptor stack where we have the identifiers
* don't use deprecated methods for commands marshalling
This commit is contained in:
Radim Vansa 2016-12-06 18:23:25 +01:00 committed by Gail Badner
parent 318473e59c
commit 294ba74c76
15 changed files with 278 additions and 304 deletions

View File

@ -6,7 +6,10 @@
*/
package org.hibernate.cache.infinispan.access;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.context.Flag;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
@ -29,14 +32,16 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
private final AtomicLong invalidations = new AtomicLong(0);
protected CommandsFactory commandsFactory;
protected StateTransferManager stateTransferManager;
protected String cacheName;
protected boolean statisticsEnabled;
protected RpcOptions syncRpcOptions;
protected RpcOptions asyncRpcOptions;
@Inject
public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager) {
public void injectDependencies(CommandsFactory commandsFactory, StateTransferManager stateTransferManager, Cache cache) {
this.commandsFactory = commandsFactory;
this.stateTransferManager = stateTransferManager;
this.cacheName = cache.getName();
}
@Start
@ -86,4 +91,11 @@ public abstract class BaseInvalidationInterceptor extends BaseRpcInterceptor imp
protected List<Address> getMembers() {
return stateTransferManager.getCacheTopology().getMembers();
}
protected boolean isPutForExternalRead(FlagAffectedCommand command) {
if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
return true;
}
return false;
}
}

View File

@ -123,9 +123,6 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
@Override
public void remove(SharedSessionContractImplementor session, Object key) throws CacheException {
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session);
try {
// We update whether or not the region is valid. Other nodes
@ -174,8 +171,5 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
@Override
public void unlockItem(SharedSessionContractImplementor session, Object key) throws CacheException {
if ( !putValidator.endInvalidatingKey(session, key) ) {
log.failedEndInvalidating(key, region.getName());
}
}
}

View File

@ -6,7 +6,7 @@
*/
package org.hibernate.cache.infinispan.access;
import java.util.UUID;
import javax.transaction.Status;
/**
* Synchronization that should release the locks after invalidation is complete.
@ -14,13 +14,14 @@ import java.util.UUID;
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class InvalidationSynchronization implements javax.transaction.Synchronization {
public final UUID uuid = UUID.randomUUID();
public final Object lockOwner;
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
private final Object[] keys;
private final Object key;
public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object key, Object lockOwner) {
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
this.keys = keys;
this.key = key;
this.lockOwner = lockOwner;
}
@Override
@ -28,6 +29,6 @@ public class InvalidationSynchronization implements javax.transaction.Synchroniz
@Override
public void afterCompletion(int status) {
nonTxPutFromLoadInterceptor.broadcastEndInvalidationCommand(keys, uuid);
nonTxPutFromLoadInterceptor.endInvalidating(key, lockOwner, status == Status.STATUS_COMMITTED || status == Status.STATUS_COMMITTING);
}
}

View File

@ -6,16 +6,10 @@
*/
package org.hibernate.cache.infinispan.access;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.resource.transaction.spi.TransactionCoordinator;
import org.hibernate.resource.transaction.spi.TransactionStatus;
/**
* Delegate for non-transactional caches
@ -37,12 +31,11 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session);
try {
writeCache.remove(key);
// NonTxInvalidationInterceptor will call beginInvalidatingWithPFER and change this to a removal because
// we must publish the new value only after invalidation ends.
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
@ -61,12 +54,11 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session);
try {
writeCache.remove(key);
// NonTxInvalidationInterceptor will call beginInvalidatingWithPFER and change this to a removal because
// we must publish the new value only after invalidation ends.
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
@ -74,53 +66,15 @@ public class NonTxInvalidationCacheAccessDelegate extends InvalidationCacheAcces
return true;
}
protected boolean isCommitted(SharedSessionContractImplementor session) {
if (session.isClosed()) {
// If the session has been closed before transaction ends, so we cannot find out
// if the transaction was successful and if we can do the PFER.
// As this can happen only in JTA environment, we can query the TransactionManager
// directly here.
TransactionManager tm = region.getTransactionManager();
if (tm != null) {
try {
switch (tm.getStatus()) {
case Status.STATUS_COMMITTED:
case Status.STATUS_COMMITTING:
return true;
default:
return false;
}
}
catch (SystemException e) {
log.debug("Failed to retrieve transaction status", e);
return false;
}
}
}
TransactionCoordinator tc = session.getTransactionCoordinator();
return tc != null && tc.getTransactionDriverControl().getStatus() == TransactionStatus.COMMITTED;
}
@Override
public void unlockItem(SharedSessionContractImplementor session, Object key) throws CacheException {
if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) {
log.failedEndInvalidating(key, region.getName());
}
}
@Override
public boolean afterInsert(SharedSessionContractImplementor session, Object key, Object value, Object version) {
if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) {
log.failedEndInvalidating(key, region.getName());
}
// endInvalidatingKeys is called from NonTxInvalidationInterceptor, from the synchronization callback
return false;
}
@Override
public boolean afterUpdate(SharedSessionContractImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
if ( !putValidator.endInvalidatingKey(session, key, isCommitted(session)) ) {
log.failedEndInvalidating(key, region.getName());
}
// endInvalidatingKeys is called from NonTxInvalidationInterceptor, from the synchronization callback
return false;
}
}

View File

@ -8,7 +8,6 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
@ -16,12 +15,14 @@ import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InvalidationInterceptor;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.util.concurrent.locks.RemoteLockCommand;
import java.util.Collections;
/**
* This interceptor should completely replace default InvalidationInterceptor.
@ -51,20 +52,47 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!isPutForExternalRead(command)) {
return handleInvalidate(ctx, command, new Object[] { command.getKey() });
if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
return invokeNextInterceptor(ctx, command);
}
else {
boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner());
if (!isTransactional) {
throw new IllegalStateException("Put executed without transaction!");
}
if (!putFromLoadValidator.beginInvalidatingWithPFER(command.getKeyLockOwner(), command.getKey(), command.getValue())) {
log.failedInvalidatePendingPut(command.getKey(), cacheName);
}
RemoveCommand removeCommand = commandsFactory.buildRemoveCommand(command.getKey(), null, command.getFlags());
Object retval = invokeNextInterceptor(ctx, removeCommand);
if (command.isSuccessful()) {
invalidateAcrossCluster(command, isTransactional, command.getKey());
}
return retval;
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
return handleInvalidate(ctx, command, new Object[] { command.getKey() });
throw new UnsupportedOperationException("Unexpected replace");
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return handleInvalidate(ctx, command, new Object[] { command.getKey() });
boolean isTransactional = putFromLoadValidator.registerRemoteInvalidation(command.getKey(), command.getKeyLockOwner());
if (isTransactional) {
if (!putFromLoadValidator.beginInvalidatingKey(command.getKeyLockOwner(), command.getKey())) {
log.failedInvalidatePendingPut(command.getKey(), cacheName);
}
}
else {
log.trace("This is an eviction, not invalidating anything");
}
Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful()) {
invalidateAcrossCluster(command, isTransactional, command.getKey());
}
return retval;
}
@Override
@ -81,32 +109,20 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
if (!isPutForExternalRead(command)) {
return handleInvalidate(ctx, command, command.getMap().keySet().toArray());
}
return invokeNextInterceptor(ctx, command);
throw new UnsupportedOperationException("Unexpected putAll");
}
private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object[] keys) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful() && keys != null && keys.length != 0) {
invalidateAcrossCluster(command, keys);
}
return retval;
}
private void invalidateAcrossCluster(FlagAffectedCommand command, Object[] keys) throws Throwable {
private <T extends WriteCommand & RemoteLockCommand> void invalidateAcrossCluster(T command, boolean isTransactional, Object key) throws Throwable {
// increment invalidations counter if statistics maintained
incrementInvalidations();
InvalidateCommand invalidateCommand;
Object sessionTransactionId = putFromLoadValidator.registerRemoteInvalidations(keys);
if (!isLocalModeForced(command)) {
if (sessionTransactionId == null) {
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys);
if (isTransactional) {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
Collections.emptySet(), new Object[] { key }, command.getKeyLockOwner());
}
else {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
InfinispanCollections.<Flag>emptySet(), keys, sessionTransactionId);
invalidateCommand = commandsFactory.buildInvalidateCommand(Collections.emptySet(), new Object[] { key });
}
if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
@ -116,12 +132,4 @@ public class NonTxInvalidationInterceptor extends BaseInvalidationInterceptor {
}
}
private boolean isPutForExternalRead(FlagAffectedCommand command) {
if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
log.trace("Put for external read called. Suppressing clustered invalidation.");
return true;
}
return false;
}
}

View File

@ -10,7 +10,12 @@ import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@ -33,6 +38,7 @@ import java.util.List;
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
private final static InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NonTxPutFromLoadInterceptor.class);
private final String cacheName;
private final PutFromLoadValidator putFromLoadValidator;
private CacheCommandInitializer commandInitializer;
@ -61,16 +67,20 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) {
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getSessionTransactionId(), key);
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
}
}
return invokeNextInterceptor(ctx, command);
}
public void broadcastEndInvalidationCommand(Object[] keys, Object sessionTransactionId) {
assert sessionTransactionId != null;
public void endInvalidating(Object key, Object lockOwner, boolean successful) {
assert lockOwner != null;
if (!putFromLoadValidator.endInvalidatingKey(lockOwner, key, successful)) {
log.failedEndInvalidating(key, cacheName);
}
EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand(
cacheName, keys, sessionTransactionId);
cacheName, new Object[] { key }, lockOwner);
List<Address> members = stateTransferManager.getCacheTopology().getMembers();
rpcManager.invokeRemotely(members, endInvalidationCommand, asyncUnordered);
}

View File

@ -7,7 +7,6 @@
package org.hibernate.cache.infinispan.access;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -623,19 +622,19 @@ public class PutFromLoadValidator {
}
}
public Object registerRemoteInvalidations(Object[] keys) {
public boolean registerRemoteInvalidation(Object key, Object lockOwner) {
SharedSessionContractImplementor session = currentSession.get();
TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator();
if (transactionCoordinator != null) {
if (trace) {
log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys));
log.tracef("Registering synchronization on transaction in %s, cache %s: %s", lockOwnerToString(session), cache.getName(), key);
}
InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, keys);
InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, key, lockOwner);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
return sync.uuid;
return true;
}
// evict() command is not executed in session context
return null;
return false;
}
// ---------------------------------------------------------------- Private

View File

@ -31,16 +31,9 @@ public class TxInvalidationCacheAccessDelegate extends InvalidationCacheAccessDe
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
// The beginInvalidateKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
writeCache.put(key, value);
return true;
}
@ -55,32 +48,21 @@ public class TxInvalidationCacheAccessDelegate extends InvalidationCacheAccessDe
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
log.failedInvalidatePendingPut(key, region.getName());
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
// The beginInvalidateKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
writeCache.put(key, value);
return true;
}
@Override
public boolean afterInsert(SharedSessionContractImplementor session, Object key, Object value, Object version) {
if ( !putValidator.endInvalidatingKey(session, key) ) {
log.failedEndInvalidating(key, region.getName());
}
// The endInvalidatingKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
return false;
}
@Override
public boolean afterUpdate(SharedSessionContractImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
if ( !putValidator.endInvalidatingKey(session, key) ) {
log.failedEndInvalidating(key, region.getName());
}
// The endInvalidatingKey(...) is called from TxPutFromLoadInterceptor because we need the global transaction id.
return false;
}
}

View File

@ -83,8 +83,7 @@ public class TxInvalidationInterceptor extends BaseInvalidationInterceptor {
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray();
return handleInvalidate( ctx, command, keys );
return handleInvalidate( ctx, command, command.getMap().keySet().toArray() );
}
@Override
@ -222,12 +221,4 @@ public class TxInvalidationInterceptor extends BaseInvalidationInterceptor {
}
rpcManager.invokeRemotely( getMembers(), command, synchronous ? syncRpcOptions : asyncRpcOptions );
}
private boolean isPutForExternalRead(FlagAffectedCommand command) {
if ( command.hasFlag( Flag.PUT_FOR_EXTERNAL_READ ) ) {
log.trace( "Put for external read called. Suppressing clustered invalidation." );
return true;
}
return false;
}
}

View File

@ -17,9 +17,12 @@ import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.DataContainer;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@ -30,6 +33,7 @@ import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.transaction.xa.GlobalTransaction;
/**
* Intercepts transactions in Infinispan, calling {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)}
@ -67,6 +71,31 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
asyncUnordered = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build();
}
private void beginInvalidating(InvocationContext ctx, Object key) {
TxInvocationContext txCtx = (TxInvocationContext) ctx;
// make sure that the command is registered in the transaction
txCtx.addAffectedKey(key);
GlobalTransaction globalTransaction = txCtx.getGlobalTransaction();
if (!putFromLoadValidator.beginInvalidatingKey(globalTransaction, key)) {
log.failedInvalidatePendingPut(key, cacheName);
}
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
beginInvalidating(ctx, command.getKey());
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
beginInvalidating(ctx, command.getKey());
return invokeNextInterceptor(ctx, command);
}
// We need to intercept PrepareCommand, not InvalidateCommand since the interception takes
// place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands
// as part of EntryWrappingInterceptor
@ -80,38 +109,19 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
// us against concurrent modification of the collection. Therefore, we need to remove the entry
// here (even without lock!) and let possible update happen in commit phase.
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
dataContainer.remove(key);
}
}
else {
for (Object key : wc.getAffectedKeys()) {
dataContainer.remove(key);
}
for (Object key : wc.getAffectedKeys()) {
dataContainer.remove(key);
}
}
}
else {
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
if (log.isTraceEnabled()) {
log.tracef("Invalidating key %s with lock owner %s", key, ctx.getLockOwner());
}
putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
}
Set<Object> keys = wc.getAffectedKeys();
if (log.isTraceEnabled()) {
log.tracef("Invalidating keys %s with lock owner %s", keys, ctx.getLockOwner());
}
else {
Set<Object> keys = wc.getAffectedKeys();
if (log.isTraceEnabled()) {
log.tracef("Invalidating keys %s with lock owner %s", keys, ctx.getLockOwner());
}
for (Object key : keys ) {
putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
}
for (Object key : keys ) {
putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
}
}
}
@ -143,14 +153,21 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
Set<Object> affectedKeys = ctx.getAffectedKeys();
if (log.isTraceEnabled()) {
log.tracef( "Sending end invalidation for keys %s asynchronously", affectedKeys );
log.tracef( "Sending end invalidation for keys %s asynchronously, modifications are %s", affectedKeys, ctx.getCacheTransaction().getModifications());
}
if (!affectedKeys.isEmpty()) {
GlobalTransaction globalTransaction = ctx.getGlobalTransaction();
EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand(
cacheName, affectedKeys.toArray(), ctx.getGlobalTransaction());
cacheName, affectedKeys.toArray(), globalTransaction);
List<Address> members = stateTransferManager.getCacheTopology().getMembers();
rpcManager.invokeRemotely(members, commitCommand, asyncUnordered);
// If the transaction is not successful, *RegionAccessStrategy would not be called, therefore
// we have to end invalidation from here manually (in successful case as well)
for (Object key : affectedKeys) {
putFromLoadValidator.endInvalidatingKey(globalTransaction, key);
}
}
}
}

View File

@ -6,73 +6,48 @@
*/
package org.hibernate.cache.infinispan.util;
import org.hibernate.internal.util.compare.EqualsHelper;
import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.context.Flag;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.remoting.transport.Address;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class BeginInvalidationCommand extends InvalidateCommand {
private Object sessionTransactionId;
private Object lockOwner;
public BeginInvalidationCommand() {
}
public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, CommandInvocationId commandInvocationId, Object[] keys, Object sessionTransactionId) {
public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, CommandInvocationId commandInvocationId, Object[] keys, Object lockOwner) {
super(notifier, flags, commandInvocationId, keys);
this.sessionTransactionId = sessionTransactionId;
this.lockOwner = lockOwner;
}
public Object getSessionTransactionId() {
return sessionTransactionId;
public Object getLockOwner() {
return lockOwner;
}
@Override
public Object[] getParameters() {
if (keys == null || keys.length == 0) {
return new Object[]{flags, sessionTransactionId, commandInvocationId, 0};
}
if (keys.length == 1) {
return new Object[]{flags, sessionTransactionId, commandInvocationId, 1, keys[0]};
}
Object[] retval = new Object[keys.length + 4];
retval[0] = flags;
retval[1] = sessionTransactionId;
retval[2] = commandInvocationId;
retval[3] = keys.length;
System.arraycopy(keys, 0, retval, 4, keys.length);
return retval;
public void writeTo(ObjectOutput output) throws IOException {
super.writeTo(output);
output.writeObject(lockOwner);
}
@Override
public void setParameters(int commandId, Object[] args) {
if (commandId != CacheCommandIds.BEGIN_INVALIDATION) {
throw new IllegalStateException("Invalid method id");
}
flags = (Set<Flag>) args[0];
sessionTransactionId = args[1];
commandInvocationId = (CommandInvocationId) args[2];
int size = (Integer) args[3];
keys = new Object[size];
if (size == 1) {
keys[0] = args[4];
}
else if (size > 0) {
System.arraycopy(args, 4, keys, 0, size);
}
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
super.readFrom(input);
lockOwner = input.readObject();
}
@Override
public byte getCommandId() {
return CacheCommandIds.BEGIN_INVALIDATION;
@ -85,7 +60,7 @@ public class BeginInvalidationCommand extends InvalidateCommand {
}
if (o instanceof BeginInvalidationCommand) {
BeginInvalidationCommand bic = (BeginInvalidationCommand) o;
return EqualsHelper.equals(sessionTransactionId, bic.sessionTransactionId);
return Objects.equals(lockOwner, bic.lockOwner);
}
else {
return false;
@ -94,12 +69,12 @@ public class BeginInvalidationCommand extends InvalidateCommand {
@Override
public int hashCode() {
return super.hashCode() + (sessionTransactionId == null ? 0 : sessionTransactionId.hashCode());
return super.hashCode() + (lockOwner == null ? 0 : lockOwner.hashCode());
}
@Override
public String toString() {
return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) +
", sessionTransactionId=" + sessionTransactionId + '}';
", sessionTransactionId=" + lockOwner + '}';
}
}

View File

@ -66,12 +66,12 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
return new EvictAllCommand( regionName );
}
public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object sessionTransactionId) {
return new BeginInvalidationCommand(notifier, flags, CommandInvocationId.generateId(clusteringDependentLogic.getAddress()), keys, sessionTransactionId);
public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object lockOwner) {
return new BeginInvalidationCommand(notifier, flags, CommandInvocationId.generateId(clusteringDependentLogic.getAddress()), keys, lockOwner);
}
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) {
return new EndInvalidationCommand( cacheName, keys, sessionTransactionId );
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
return new EndInvalidationCommand( cacheName, keys, lockOwner );
}
@Override

View File

@ -6,11 +6,15 @@
*/
package org.hibernate.cache.infinispan.util;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.infinispan.commands.remote.BaseRpcCommand;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.context.InvocationContext;
/**
@ -21,7 +25,7 @@ import org.infinispan.context.InvocationContext;
*/
public class EndInvalidationCommand extends BaseRpcCommand {
private Object[] keys;
private Object sessionTransactionId;
private Object lockOwner;
private PutFromLoadValidator putFromLoadValidator;
public EndInvalidationCommand(String cacheName) {
@ -31,16 +35,16 @@ public class EndInvalidationCommand extends BaseRpcCommand {
/**
* @param cacheName name of the cache to evict
*/
public EndInvalidationCommand(String cacheName, Object[] keys, Object sessionTransactionId) {
public EndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
super(cacheName);
this.keys = keys;
this.sessionTransactionId = sessionTransactionId;
this.lockOwner = lockOwner;
}
@Override
public Object perform(InvocationContext ctx) throws Throwable {
for (Object key : keys) {
putFromLoadValidator.endInvalidatingKey(sessionTransactionId, key);
putFromLoadValidator.endInvalidatingKey(lockOwner, key);
}
return null;
}
@ -51,14 +55,15 @@ public class EndInvalidationCommand extends BaseRpcCommand {
}
@Override
public Object[] getParameters() {
return new Object[] { keys, sessionTransactionId};
public void writeTo(ObjectOutput output) throws IOException {
MarshallUtil.marshallArray(keys, output);
output.writeObject(lockOwner);
}
@Override
public void setParameters(int commandId, Object[] parameters) {
keys = (Object[]) parameters[0];
sessionTransactionId = parameters[1];
public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException {
keys = MarshallUtil.unmarshallArray(input, Object[]::new);
lockOwner = input.readObject();
}
@Override
@ -92,7 +97,7 @@ public class EndInvalidationCommand extends BaseRpcCommand {
if (!Arrays.equals(keys, that.keys)) {
return false;
}
return !(sessionTransactionId != null ? !sessionTransactionId.equals(that.sessionTransactionId) : that.sessionTransactionId != null);
return !(lockOwner != null ? !lockOwner.equals(that.lockOwner) : that.lockOwner != null);
}
@ -100,7 +105,7 @@ public class EndInvalidationCommand extends BaseRpcCommand {
public int hashCode() {
int result = cacheName != null ? cacheName.hashCode() : 0;
result = 31 * result + (keys != null ? Arrays.hashCode(keys) : 0);
result = 31 * result + (sessionTransactionId != null ? sessionTransactionId.hashCode() : 0);
result = 31 * result + (lockOwner != null ? lockOwner.hashCode() : 0);
return result;
}
@ -109,7 +114,7 @@ public class EndInvalidationCommand extends BaseRpcCommand {
final StringBuilder sb = new StringBuilder("EndInvalidationCommand{");
sb.append("cacheName=").append(cacheName);
sb.append(", keys=").append(Arrays.toString(keys));
sb.append(", sessionTransactionId=").append(sessionTransactionId);
sb.append(", sessionTransactionId=").append(lockOwner);
sb.append('}');
return sb.toString();
}

View File

@ -1,6 +1,8 @@
package org.hibernate.test.cache.infinispan.functional;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Phaser;
@ -12,11 +14,11 @@ import org.hibernate.PessimisticLockException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.hibernate.cache.spi.Region;
import org.hibernate.testing.TestForIssue;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.junit.Ignore;
import org.junit.Test;
import org.infinispan.AdvancedCache;
@ -24,9 +26,11 @@ import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Tests specific to invalidation mode caches
@ -50,8 +54,6 @@ public class InvalidationTest extends SingleNodeTest {
@Test
@TestForIssue(jiraKey = "HHH-9868")
public void testConcurrentRemoveAndPutFromLoad() throws Exception {
Region region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
AdvancedCache entityCache = ((EntityRegionImpl) region).getCache();
final Item item = new Item( "chris", "Chris's Item" );
withTxSession(s -> {
@ -62,8 +64,7 @@ public class InvalidationTest extends SingleNodeTest {
Phaser getPhaser = new Phaser(2);
HookInterceptor hook = new HookInterceptor();
AdvancedCache pendingPutsCache = entityCache.getCacheManager().getCache(
entityCache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE).getAdvancedCache();
AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
pendingPutsCache.addInterceptor(hook, 0);
AtomicBoolean getThreadBlockedInDB = new AtomicBoolean(false);
@ -142,16 +143,106 @@ public class InvalidationTest extends SingleNodeTest {
// get thread puts the entry into cache
getThread.join();
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> {
Item loadedItem = s.get(Item.class, item.getId());
assertNull(loadedItem);
});
}
protected AdvancedCache getPendingPutsCache(Class<Item> entityClazz) {
EntityRegionImpl region = (EntityRegionImpl) sessionFactory().getCache()
.getEntityRegionAccess(entityClazz.getName()).getRegion();
AdvancedCache entityCache = region.getCache();
return (AdvancedCache) entityCache.getCacheManager().getCache(
entityCache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE).getAdvancedCache();
}
protected static void arriveAndAwait(Phaser phaser, int timeout) throws TimeoutException, InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), timeout, TimeUnit.MILLISECONDS);
}
@TestForIssue(jiraKey = "HHH-11304")
@Test
public void testFailedInsert() throws Exception {
AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> {
Item i = new Item("inserted", "bar");
s.persist(i);
s.flush();
s.getTransaction().setRollbackOnly();
});
assertNoInvalidators(pendingPutsCache);
}
@TestForIssue(jiraKey = "HHH-11304")
@Test
public void testFailedUpdate() throws Exception {
AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
assertNoInvalidators(pendingPutsCache);
final Item item = new Item("before-update", "bar");
withTxSession(s -> s.persist(item));
withTxSession(s -> {
Item item2 = s.load(Item.class, item.getId());
assertEquals("before-update", item2.getName());
item2.setName("after-update");
s.persist(item2);
s.flush();
s.flush(); // workaround for HHH-11312
s.getTransaction().setRollbackOnly();
});
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> {
Item item3 = s.load(Item.class, item.getId());
assertEquals("before-update", item3.getName());
s.remove(item3);
});
assertNoInvalidators(pendingPutsCache);
}
@TestForIssue(jiraKey = "HHH-11304")
@Test
public void testFailedRemove() throws Exception {
AdvancedCache pendingPutsCache = getPendingPutsCache(Item.class);
assertNoInvalidators(pendingPutsCache);
final Item item = new Item("before-remove", "bar");
withTxSession(s -> s.persist(item));
withTxSession(s -> {
Item item2 = s.load(Item.class, item.getId());
assertEquals("before-remove", item2.getName());
s.remove(item2);
s.flush();
s.getTransaction().setRollbackOnly();
});
assertNoInvalidators(pendingPutsCache);
withTxSession(s -> {
Item item3 = s.load(Item.class, item.getId());
assertEquals("before-remove", item3.getName());
s.remove(item3);
});
assertNoInvalidators(pendingPutsCache);
}
protected void assertNoInvalidators(AdvancedCache<Object, Object> pendingPutsCache) throws Exception {
Method getInvalidators = null;
for (Map.Entry<Object, Object> entry : pendingPutsCache.entrySet()) {
if (getInvalidators == null) {
getInvalidators = entry.getValue().getClass().getMethod("getInvalidators");
getInvalidators.setAccessible(true);
}
Collection invalidators = (Collection) getInvalidators.invoke(entry.getValue());
if (invalidators != null) {
assertTrue("Invalidators on key " + entry.getKey() + ": " + invalidators, invalidators.isEmpty());
}
}
}
private static class HookInterceptor extends BaseCustomInterceptor {
Phaser phaser;
Thread thread;

View File

@ -1,65 +0,0 @@
package org.hibernate.test.cache.infinispan.util;
import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.distribution.TestAddress;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Supplier;
import static org.jgroups.util.Util.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class CacheCommandsInitializerTest {
private static CacheCommandInitializer initializer = new CacheCommandInitializer();
@BeforeClass
public static void setUp() {
ClusteringDependentLogic cdl = mock(ClusteringDependentLogic.class);
when(cdl.getAddress()).thenReturn(new TestAddress(0));
initializer.injectDependencies(null, null, cdl);
}
@Test
public void testBeginInvalidationCommand1() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{}, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testBeginInvalidationCommand2() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 1 }, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testBeginInvalidationCommand3() {
BeginInvalidationCommand command = initializer.buildBeginInvalidationCommand(Collections.EMPTY_SET, new Object[]{ 2, 3 }, UUID.randomUUID());
checkParameters(command, () -> new BeginInvalidationCommand());
}
@Test
public void testEndInvalidationCommmand() {
EndInvalidationCommand command = initializer.buildEndInvalidationCommand("foo", new Object[] { 2, 3 }, UUID.randomUUID());
checkParameters(command, () -> new EndInvalidationCommand("foo"));
}
protected <T extends ReplicableCommand> void checkParameters(T command, Supplier<T> commandSupplier) {
Object[] parameters = command.getParameters();
ReplicableCommand newCommand = commandSupplier.get();
newCommand.setParameters(command.getCommandId(), parameters);
assertEquals(command, newCommand);
assertArrayEquals(parameters, newCommand.getParameters());
}
}