HHH-9868, HHH-9881 Implementation for non-transactional caches and non-JTA transactions.

This commit is contained in:
Radim Vansa 2015-07-27 18:00:30 +02:00 committed by Galder Zamarreño
parent fa7265ff0e
commit 1f24fa6354
21 changed files with 1007 additions and 185 deletions

View File

@ -36,6 +36,18 @@ def osgiDescription() {
return mavenPom.description return mavenPom.description
} }
classes.doLast {
javaexec {
classpath = project.sourceSets.main.runtimeClasspath
main = "org.infinispan.factories.components.ComponentMetadataPersister"
args = [
project.sourceSets.main.output.classesDir,
project.sourceSets.main.output.resourcesDir.toPath().resolve("hibernate-infinispan-component-metadata.dat").toString()
].toList()
standardOutput = { def f = File.createTempFile('metadata-log', null ); f.deleteOnExit(); f.newOutputStream() }()
}
}
test { test {
systemProperties['java.net.preferIPv4Stack'] = true systemProperties['java.net.preferIPv4Stack'] = true
systemProperties['jgroups.ping.timeout'] = 500 systemProperties['jgroups.ping.timeout'] = 500

View File

@ -34,6 +34,7 @@ import org.hibernate.cache.infinispan.timestamp.TimestampsRegionImpl;
import org.hibernate.cache.infinispan.tm.HibernateTransactionManagerLookup; import org.hibernate.cache.infinispan.tm.HibernateTransactionManagerLookup;
import org.hibernate.cache.infinispan.util.CacheCommandFactory; import org.hibernate.cache.infinispan.util.CacheCommandFactory;
import org.hibernate.cache.infinispan.util.Caches; import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.Externalizers;
import org.hibernate.cache.internal.DefaultCacheKeysFactory; import org.hibernate.cache.internal.DefaultCacheKeysFactory;
import org.hibernate.cache.internal.SimpleCacheKeysFactory; import org.hibernate.cache.internal.SimpleCacheKeysFactory;
import org.hibernate.cache.spi.CacheDataDescription; import org.hibernate.cache.spi.CacheDataDescription;
@ -475,6 +476,7 @@ public class InfinispanRegionFactory implements RegionFactory {
.globalJmxStatistics() .globalJmxStatistics()
.enabled( Boolean.parseBoolean( globalStats ) ); .enabled( Boolean.parseBoolean( globalStats ) );
} }
holder.getGlobalConfigurationBuilder().serialization().addAdvancedExternalizer(Externalizers.ALL_EXTERNALIZERS);
return createCacheManager( holder ); return createCacheManager( holder );
} }

View File

@ -0,0 +1,190 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.InfinispanCollections;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.InvalidationInterceptor;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
import org.infinispan.jmx.JmxStatisticsExposer;
import org.infinispan.jmx.annotations.DataType;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.jmx.annotations.MeasurementType;
import org.infinispan.jmx.annotations.Parameter;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.concurrent.atomic.AtomicLong;
/**
* This interceptor should completely replace default InvalidationInterceptor.
* We need to send custom invalidation commands with transaction identifier (as the invalidation)
* since we have to do a two-phase invalidation (releasing the locks as JTA synchronization),
* although the cache itself is non-transactional.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
* @author Mircea.Markus@jboss.com
* @author Galder Zamarreño
*/
@MBean(objectName = "Invalidation", description = "Component responsible for invalidating entries on remote caches when entries are written to locally.")
public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements JmxStatisticsExposer {
private final AtomicLong invalidations = new AtomicLong(0);
private final PutFromLoadValidator putFromLoadValidator;
private CommandsFactory commandsFactory;
private CacheCommandInitializer commandInitializer;
private boolean statisticsEnabled;
private static final Log log = LogFactory.getLog(InvalidationInterceptor.class);
public NonTxInvalidationInterceptor(PutFromLoadValidator putFromLoadValidator) {
this.putFromLoadValidator = putFromLoadValidator;
}
@Override
protected Log getLog() {
return log;
}
@Inject
public void injectDependencies(CommandsFactory commandsFactory, CacheCommandInitializer commandInitializer) {
this.commandsFactory = commandsFactory;
this.commandInitializer = commandInitializer;
}
@Start
private void start() {
this.setStatisticsEnabled(cacheConfiguration.jmxStatistics().enabled());
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!isPutForExternalRead(command)) {
return handleInvalidate(ctx, command, command.getKey());
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
return handleInvalidate(ctx, command, command.getKey());
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return handleInvalidate(ctx, command, command.getKey());
}
@Override
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
if (!isLocalModeForced(command)) {
// just broadcast the clear command - this is simplest!
if (ctx.isOriginLocal()) {
rpcManager.invokeRemotely(null, command, rpcManager.getDefaultRpcOptions(defaultSynchronous));
}
}
return retval;
}
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray();
return handleInvalidate(ctx, command, keys);
}
private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object... keys) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful() && !ctx.isInTxScope()) {
if (keys != null && keys.length != 0) {
if (!isLocalModeForced(command)) {
invalidateAcrossCluster(isSynchronous(command), keys, ctx);
}
}
}
return retval;
}
private void invalidateAcrossCluster(boolean synchronous, Object[] keys, InvocationContext ctx) throws Throwable {
// increment invalidations counter if statistics maintained
incrementInvalidations();
InvalidateCommand invalidateCommand;
Object lockOwner = putFromLoadValidator.registerRemoteInvalidations(keys);
if (lockOwner == null) {
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys);
}
else {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
InfinispanCollections.<Flag>emptySet(), keys, lockOwner);
}
if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
}
rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(synchronous));
}
private void incrementInvalidations() {
if (statisticsEnabled) {
invalidations.incrementAndGet();
}
}
private boolean isPutForExternalRead(FlagAffectedCommand command) {
if (command.hasFlag(Flag.PUT_FOR_EXTERNAL_READ)) {
log.trace("Put for external read called. Suppressing clustered invalidation.");
return true;
}
return false;
}
@ManagedOperation(
description = "Resets statistics gathered by this component",
displayName = "Reset statistics"
)
public void resetStatistics() {
invalidations.set(0);
}
@ManagedAttribute(
displayName = "Statistics enabled",
description = "Enables or disables the gathering of statistics by this component",
dataType = DataType.TRAIT,
writable = true
)
public boolean getStatisticsEnabled() {
return this.statisticsEnabled;
}
public void setStatisticsEnabled(@Parameter(name = "enabled", description = "Whether statistics should be enabled or disabled (true/false)") boolean enabled) {
this.statisticsEnabled = enabled;
}
@ManagedAttribute(
description = "Number of invalidations",
displayName = "Number of invalidations",
measurementType = MeasurementType.TRENDSUP
)
public long getInvalidations() {
return invalidations.get();
}
}

View File

@ -0,0 +1,60 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.BeginInvalidationCommand;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
/**
* Non-transactional counterpart of {@link TxPutFromLoadInterceptor}.
* Invokes {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} for each invalidation from
* remote node ({@link BeginInvalidationCommand} and sends {@link EndInvalidationCommand} after the transaction
* is complete, with help of {@link Synchronization};
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
private final String cacheName;
private final PutFromLoadValidator putFromLoadValidator;
private CacheCommandInitializer commandInitializer;
private RpcManager rpcManager;
public NonTxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) {
this.putFromLoadValidator = putFromLoadValidator;
this.cacheName = cacheName;
}
@Inject
public void injectDependencies(CacheCommandInitializer commandInitializer, RpcManager rpcManager) {
this.commandInitializer = commandInitializer;
this.rpcManager = rpcManager;
}
@Override
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) {
putFromLoadValidator.beginInvalidatingKey(key, ((BeginInvalidationCommand) command).getLockOwner());
}
}
return invokeNextInterceptor(ctx, command);
}
public void broadcastEndInvalidationCommand(Object[] keys, Object lockOwner) {
assert lockOwner != null;
EndInvalidationCommand endInvalidationCommand = commandInitializer.buildEndInvalidationCommand(
cacheName, keys, lockOwner);
rpcManager.invokeRemotely(null, endInvalidationCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE));
}
}

View File

@ -6,15 +6,20 @@
*/ */
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;
import javax.transaction.RollbackException;
import javax.transaction.Status; import javax.transaction.Status;
import javax.transaction.SystemException; import javax.transaction.SystemException;
import javax.transaction.Transaction; import javax.transaction.Transaction;
import javax.transaction.TransactionManager; import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -22,30 +27,23 @@ import java.util.concurrent.locks.ReentrantLock;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer; import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.hibernate.cache.spi.RegionFactory; import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache; import org.infinispan.AdvancedCache;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.EntryWrappingInterceptor; import org.infinispan.interceptors.EntryWrappingInterceptor;
import org.infinispan.interceptors.base.BaseRpcInterceptor; import org.infinispan.interceptors.InvalidationInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor; import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.concurrent.ConcurrentHashSet; import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;
/** /**
* Encapsulates logic to allow a {@link TransactionalAccessDelegate} to determine * Encapsulates logic to allow a {@link TransactionalAccessDelegate} to determine
* whether a {@link TransactionalAccessDelegate#putFromLoad(Object, Object, long, Object, boolean)} * whether a {@link TransactionalAccessDelegate#putFromLoad(org.hibernate.engine.spi.SessionImplementor, Object, Object, long, Object, boolean)}
* call should be allowed to update the cache. A <code>putFromLoad</code> has * call should be allowed to update the cache. A <code>putFromLoad</code> has
* the potential to store stale data, since the data may have been removed from the * the potential to store stale data, since the data may have been removed from the
* database and the cache between the time when the data was read from the database * database and the cache between the time when the data was read from the database
@ -117,6 +115,11 @@ public class PutFromLoadValidator {
*/ */
private final AdvancedCache cache; private final AdvancedCache cache;
/**
* Injected interceptor
*/
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
/** /**
* The time of the last call to {@link #endInvalidatingRegion()}. Puts from transactions started after * The time of the last call to {@link #endInvalidatingRegion()}. Puts from transactions started after
* this timestamp are denied. * this timestamp are denied.
@ -133,6 +136,8 @@ public class PutFromLoadValidator {
*/ */
private final ConcurrentHashSet<Transaction> regionInvalidators = new ConcurrentHashSet<Transaction>(); private final ConcurrentHashSet<Transaction> regionInvalidators = new ConcurrentHashSet<Transaction>();
private final ThreadLocal<SessionImplementor> currentSession = new ThreadLocal<SessionImplementor>();
/** /**
* Creates a new put from load validator instance. * Creates a new put from load validator instance.
@ -171,19 +176,52 @@ public class PutFromLoadValidator {
// Since we need to intercept both invalidations of entries that are in the cache and those // Since we need to intercept both invalidations of entries that are in the cache and those
// that are not, we need to use custom interceptor, not listeners (which fire only for present entries). // that are not, we need to use custom interceptor, not listeners (which fire only for present entries).
NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor = null;
if (cacheConfiguration.clustering().cacheMode().isClustered()) { if (cacheConfiguration.clustering().cacheMode().isClustered()) {
RpcManager rpcManager = cache.getComponentRegistry().getComponent(RpcManager.class); List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class); log.debug("Interceptor chain was: " + interceptorChain);
int position = 0;
// add interceptor before uses exact match, not instanceof match
int invalidationPosition = 0;
int entryWrappingPosition = 0;
for (CommandInterceptor ci : interceptorChain) {
if (ci instanceof InvalidationInterceptor) {
invalidationPosition = position;
}
if (ci instanceof EntryWrappingInterceptor) {
entryWrappingPosition = position;
}
position++;
}
boolean transactional = cache.getCacheConfiguration().transaction().transactionMode().isTransactional();
if (transactional) {
// Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before // Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before
// wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation // wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation
// would not commit the entry removal (as during wrap the entry was not in cache) // would not commit the entry removal (as during wrap the entry was not in cache)
cache.addInterceptorBefore(new PutFromLoadInterceptor(cache.getName(), rpcManager, cacheCommandInitializer), EntryWrappingInterceptor.class); TxPutFromLoadInterceptor txPutFromLoadInterceptor = new TxPutFromLoadInterceptor(this, cache.getName());
cache.getComponentRegistry().registerComponent(txPutFromLoadInterceptor, TxPutFromLoadInterceptor.class);
cache.addInterceptor(txPutFromLoadInterceptor, entryWrappingPosition);
}
else {
cache.removeInterceptor(invalidationPosition);
NonTxInvalidationInterceptor nonTxInvalidationInterceptor = new NonTxInvalidationInterceptor(this);
cache.getComponentRegistry().registerComponent(nonTxInvalidationInterceptor, NonTxInvalidationInterceptor.class);
cache.addInterceptor(nonTxInvalidationInterceptor, invalidationPosition);
nonTxPutFromLoadInterceptor = new NonTxPutFromLoadInterceptor(this, cache.getName());
cache.getComponentRegistry().registerComponent(nonTxPutFromLoadInterceptor, NonTxPutFromLoadInterceptor.class);
cache.addInterceptor(nonTxPutFromLoadInterceptor, entryWrappingPosition);
}
log.debug("New interceptor chain is: " + cache.getInterceptorChain());
CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), this); cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), this);
} }
this.cache = cache; this.cache = cache;
this.pendingPuts = cacheManager.getCache(pendingPutsName); this.pendingPuts = cacheManager.getCache(pendingPutsName);
this.transactionManager = tm; this.transactionManager = tm;
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
} }
/** /**
@ -193,11 +231,14 @@ public class PutFromLoadValidator {
* @param cache * @param cache
*/ */
public static void removeFromCache(AdvancedCache cache) { public static void removeFromCache(AdvancedCache cache) {
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain(); cache.removeInterceptor(TxPutFromLoadInterceptor.class);
int index = 0; cache.removeInterceptor(NonTxPutFromLoadInterceptor.class);
for (; index < interceptorChain.size(); ++index) { for (Object i : cache.getInterceptorChain()) {
if (interceptorChain.get(index).getClass().getName().startsWith(PutFromLoadValidator.class.getName())) { if (i instanceof NonTxInvalidationInterceptor) {
cache.removeInterceptor(index); InvalidationInterceptor invalidationInterceptor = new InvalidationInterceptor();
cache.getComponentRegistry().registerComponent(invalidationInterceptor, InvalidationInterceptor.class);
cache.addInterceptorBefore(invalidationInterceptor, NonTxInvalidationInterceptor.class);
cache.removeInterceptor(NonTxInvalidationInterceptor.class);
break; break;
} }
} }
@ -205,7 +246,18 @@ public class PutFromLoadValidator {
cci.removePutFromLoadValidator(cache.getName()); cci.removePutFromLoadValidator(cache.getName());
} }
// ----------------------------------------------------------------- Public public void setCurrentSession(SessionImplementor session) {
// we register synchronizations directly on JTA transactions, let's make this noop with TM
if (transactionManager == null) {
currentSession.set(session);
}
}
public void resetCurrentSession() {
if (transactionManager == null) {
currentSession.remove();
}
}
/** /**
* Marker for lock acquired in {@link #acquirePutFromLoadLock(Object, long)} * Marker for lock acquired in {@link #acquirePutFromLoadLock(Object, long)}
@ -232,7 +284,6 @@ public class PutFromLoadValidator {
if (trace) { if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d)", cache.getName(), key, txTimestamp); log.tracef("acquirePutFromLoadLock(%s#%s, %d)", cache.getName(), key, txTimestamp);
} }
boolean valid = false;
boolean locked = false; boolean locked = false;
PendingPutMap pending = pendingPuts.get( key ); PendingPutMap pending = pendingPuts.get( key );
@ -241,8 +292,20 @@ public class PutFromLoadValidator {
if (pending != null) { if (pending != null) {
locked = pending.acquireLock(100, TimeUnit.MILLISECONDS); locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
if (locked) { if (locked) {
boolean valid = false;
try { try {
final PendingPut toCancel = pending.remove(getOwnerForPut()); if (pending.isRemoved()) {
// this deals with a race between retrieving the map from cache vs. removing that
// and locking the map
pending.releaseLock();
locked = false;
pending = null;
if (trace) {
log.tracef("Record removed when waiting for the lock.");
}
continue;
}
final PendingPut toCancel = pending.remove(getLocalLockOwner());
if (toCancel != null) { if (toCancel != null) {
valid = !toCancel.completed; valid = !toCancel.completed;
toCancel.completed = true; toCancel.completed = true;
@ -252,20 +315,25 @@ public class PutFromLoadValidator {
if (pending.hasInvalidator()) { if (pending.hasInvalidator()) {
valid = false; valid = false;
} }
else { // we need this check since registerPendingPut (creating new pp) can get between invalidation
// and naked put caused by the invalidation
else if (pending.lastInvalidationEnd != Long.MIN_VALUE) {
// if this transaction started after last invalidation we can continue // if this transaction started after last invalidation we can continue
valid = txTimestamp > pending.lastInvalidationEnd; valid = txTimestamp > pending.lastInvalidationEnd;
} }
else {
valid = txTimestamp > regionInvalidationTimestamp;
}
} }
return valid ? pending : null; return valid ? pending : null;
} }
finally { finally {
if (!valid) { if (!valid && pending != null) {
pending.releaseLock(); pending.releaseLock();
locked = false; locked = false;
} }
if (trace) { if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d) ended with %s", cache.getName(), key, txTimestamp, pending); log.tracef("acquirePutFromLoadLock(%s#%s, %d) ended with %s, valid: %s", cache.getName(), key, txTimestamp, pending, valid);
} }
} }
} }
@ -278,14 +346,20 @@ public class PutFromLoadValidator {
} }
} }
else { else {
long regionInvalidationTimestamp = this.regionInvalidationTimestamp;
if (txTimestamp <= regionInvalidationTimestamp) { if (txTimestamp <= regionInvalidationTimestamp) {
if (trace) { if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d) failed due to invalidated region", cache.getName(), key, txTimestamp); log.tracef("acquirePutFromLoadLock(%s#%s, %d) failed due to region invalidated at %d", cache.getName(), key, txTimestamp, regionInvalidationTimestamp);
} }
return null; return null;
} }
else {
if (trace) {
log.tracef("Region invalidated at %d, this transaction started at %d", regionInvalidationTimestamp, txTimestamp);
}
}
PendingPut pendingPut = new PendingPut(getOwnerForPut()); PendingPut pendingPut = new PendingPut(getLocalLockOwner());
pending = new PendingPutMap(pendingPut); pending = new PendingPutMap(pendingPut);
PendingPutMap existing = pendingPuts.putIfAbsent(key, pending); PendingPutMap existing = pendingPuts.putIfAbsent(key, pending);
if (existing != null) { if (existing != null) {
@ -325,6 +399,7 @@ public class PutFromLoadValidator {
final PendingPutMap pending = (PendingPutMap) lock; final PendingPutMap pending = (PendingPutMap) lock;
if ( pending != null ) { if ( pending != null ) {
if ( pending.canRemove() ) { if ( pending.canRemove() ) {
pending.setRemoved();
pendingPuts.remove( key, pending ); pendingPuts.remove( key, pending );
} }
pending.releaseLock(); pending.releaseLock();
@ -386,6 +461,9 @@ public class PutFromLoadValidator {
try { try {
// Acquire the lock for each entry to ensure any ongoing // Acquire the lock for each entry to ensure any ongoing
// work associated with it is completed before we return // work associated with it is completed before we return
// We cannot erase the map: if there was ongoing invalidation and we removed it, registerPendingPut
// started after that would have no way of finding out that the entity *is* invalidated (it was
// removed from the cache and now the DB is about to be updated).
for (Iterator<PendingPutMap> it = pendingPuts.values().iterator(); it.hasNext(); ) { for (Iterator<PendingPutMap> it = pendingPuts.values().iterator(); it.hasNext(); ) {
PendingPutMap entry = it.next(); PendingPutMap entry = it.next();
if (entry.acquireLock(60, TimeUnit.SECONDS)) { if (entry.acquireLock(60, TimeUnit.SECONDS)) {
@ -395,7 +473,6 @@ public class PutFromLoadValidator {
finally { finally {
entry.releaseLock(); entry.releaseLock();
} }
it.remove();
} }
else { else {
ok = false; ok = false;
@ -415,10 +492,15 @@ public class PutFromLoadValidator {
synchronized (this) { synchronized (this) {
if (--regionInvalidations == 0) { if (--regionInvalidations == 0) {
regionInvalidationTimestamp = System.currentTimeMillis(); regionInvalidationTimestamp = System.currentTimeMillis();
}
}
if (trace) { if (trace) {
log.trace("Finished invalidating region " + cache.getName()); log.tracef("Finished invalidating region %s at %d", cache.getName(), regionInvalidationTimestamp);
}
}
else {
if (trace) {
log.tracef("Finished invalidating region %s, but there are %d ongoing invalidations", cache.getName(), regionInvalidations);
}
}
} }
} }
@ -461,14 +543,21 @@ public class PutFromLoadValidator {
} }
} }
final PendingPut pendingPut = new PendingPut( getOwnerForPut() ); final PendingPut pendingPut = new PendingPut( getLocalLockOwner() );
final PendingPutMap pendingForKey = new PendingPutMap( pendingPut ); final PendingPutMap pendingForKey = new PendingPutMap( pendingPut );
final PendingPutMap existing = pendingPuts.putIfAbsent( key, pendingForKey ); for (;;) {
if ( existing != null ) { final PendingPutMap existing = pendingPuts.putIfAbsent(key, pendingForKey);
if ( existing.acquireLock( 10, TimeUnit.SECONDS ) ) { if (existing != null) {
if (existing.acquireLock(10, TimeUnit.SECONDS)) {
try { try {
if ( !existing.hasInvalidator() ) { if (existing.isRemoved()) {
if (trace) {
log.tracef("Record removed when waiting for the lock.");
}
continue;
}
if (!existing.hasInvalidator()) {
existing.put(pendingPut); existing.put(pendingPut);
} }
} }
@ -491,6 +580,8 @@ public class PutFromLoadValidator {
log.tracef("registerPendingPut(%s#%s, %d) registered using putIfAbsent: %s", cache.getName(), key, txTimestamp, pendingForKey); log.tracef("registerPendingPut(%s#%s, %d) registered using putIfAbsent: %s", cache.getName(), key, txTimestamp, pendingForKey);
} }
} }
return;
}
} }
/** /**
@ -499,7 +590,7 @@ public class PutFromLoadValidator {
* @return * @return
*/ */
public boolean beginInvalidatingKey(Object key) { public boolean beginInvalidatingKey(Object key) {
return beginInvalidatingKey(key, getOwnerForPut()); return beginInvalidatingKey(key, getLocalLockOwner());
} }
/** /**
@ -517,6 +608,7 @@ public class PutFromLoadValidator {
* caller should treat as an exception condition) * caller should treat as an exception condition)
*/ */
public boolean beginInvalidatingKey(Object key, Object lockOwner) { public boolean beginInvalidatingKey(Object key, Object lockOwner) {
for (;;) {
PendingPutMap pending = new PendingPutMap(null); PendingPutMap pending = new PendingPutMap(null);
PendingPutMap prev = pendingPuts.putIfAbsent(key, pending); PendingPutMap prev = pendingPuts.putIfAbsent(key, pending);
if (prev != null) { if (prev != null) {
@ -524,6 +616,12 @@ public class PutFromLoadValidator {
} }
if (pending.acquireLock(60, TimeUnit.SECONDS)) { if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try { try {
if (pending.isRemoved()) {
if (trace) {
log.tracef("Record removed when waiting for the lock.");
}
continue;
}
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
pending.invalidate(now, expirationPeriod); pending.invalidate(now, expirationPeriod);
pending.addInvalidator(lockOwner, now, expirationPeriod); pending.addInvalidator(lockOwner, now, expirationPeriod);
@ -541,6 +639,7 @@ public class PutFromLoadValidator {
return false; return false;
} }
} }
}
/** /**
* Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread. * Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread.
@ -548,7 +647,7 @@ public class PutFromLoadValidator {
* @return * @return
*/ */
public boolean endInvalidatingKey(Object key) { public boolean endInvalidatingKey(Object key) {
return endInvalidatingKey(key, getOwnerForPut()); return endInvalidatingKey(key, getLocalLockOwner());
} }
/** /**
@ -590,10 +689,49 @@ public class PutFromLoadValidator {
} }
} }
public Object registerRemoteInvalidations(Object[] keys) {
Transaction tx = null;
try {
if ( transactionManager != null ) {
tx = transactionManager.getTransaction();
}
}
catch (SystemException se) {
throw new CacheException( "Could not obtain transaction", se );
}
if (tx != null) {
if (trace) {
log.tracef("Registering lock owner %s for %s: %s", tx, cache.getName(), Arrays.toString(keys));
}
try {
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys);
tx.registerSynchronization(sync);
return sync.uuid;
}
catch (SystemException se) {
throw new CacheException("Cannot register synchronization", se);
}
catch (RollbackException e) {
return null;
}
}
SessionImplementor session = currentSession.get();
TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator();
if (transactionCoordinator != null) {
if (trace) {
log.tracef("Registering lock owner %s for %s: %s", session, cache.getName(), Arrays.toString(keys));
}
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
return sync.uuid;
}
// evict() command is not executed in session context
return null;
}
// ---------------------------------------------------------------- Private // ---------------------------------------------------------------- Private
private Object getOwnerForPut() { private Object getLocalLockOwner() {
Transaction tx = null; Transaction tx = null;
try { try {
if ( transactionManager != null ) { if ( transactionManager != null ) {
@ -620,6 +758,7 @@ public class PutFromLoadValidator {
private Invalidator singleInvalidator; private Invalidator singleInvalidator;
private Map<Object, Invalidator> invalidators; private Map<Object, Invalidator> invalidators;
private long lastInvalidationEnd = Long.MIN_VALUE; private long lastInvalidationEnd = Long.MIN_VALUE;
private boolean removed = false;
PendingPutMap(PendingPut singleItem) { PendingPutMap(PendingPut singleItem) {
this.singlePendingPut = singleItem; this.singlePendingPut = singleItem;
@ -661,7 +800,7 @@ public class PutFromLoadValidator {
else { else {
sb.append(lastInvalidationEnd); sb.append(lastInvalidationEnd);
} }
return sb.append("}").toString(); return sb.append(", Removed=").append(removed).append("}").toString();
} }
finally { finally {
lock.unlock(); lock.unlock();
@ -774,6 +913,25 @@ public class PutFromLoadValidator {
return singleInvalidator != null || (invalidators != null && !invalidators.isEmpty()); return singleInvalidator != null || (invalidators != null && !invalidators.isEmpty());
} }
// Debug introspection method, do not use in production code!
public Collection<Invalidator> getInvalidators() {
lock.lock();
try {
if (singleInvalidator != null) {
return Collections.singleton(singleInvalidator);
}
else if (invalidators != null) {
return new ArrayList<Invalidator>(invalidators.values());
}
else {
return Collections.EMPTY_LIST;
}
}
finally {
lock.unlock();
}
}
public void removeInvalidator(Object owner, long now) { public void removeInvalidator(Object owner, long now) {
if (invalidators == null) { if (invalidators == null) {
if (singleInvalidator != null && singleInvalidator.owner.equals(owner)) { if (singleInvalidator != null && singleInvalidator.owner.equals(owner)) {
@ -789,6 +947,14 @@ public class PutFromLoadValidator {
public boolean canRemove() { public boolean canRemove() {
return size() == 0 && !hasInvalidator() && lastInvalidationEnd == Long.MIN_VALUE; return size() == 0 && !hasInvalidator() && lastInvalidationEnd == Long.MIN_VALUE;
} }
public void setRemoved() {
removed = true;
}
public boolean isRemoved() {
return removed;
}
} }
private static class PendingPut { private static class PendingPut {
@ -835,57 +1001,4 @@ public class PutFromLoadValidator {
return sb.toString(); return sb.toString();
} }
} }
private class PutFromLoadInterceptor extends BaseRpcInterceptor {
private final String cacheName;
private final RpcManager rpcManager;
private final CacheCommandInitializer cacheCommandInitializer;
public PutFromLoadInterceptor(String cacheName, RpcManager rpcManager, CacheCommandInitializer cacheCommandInitializer) {
this.cacheName = cacheName;
this.rpcManager = rpcManager;
this.cacheCommandInitializer = cacheCommandInitializer;
}
// We need to intercept PrepareCommand, not InvalidateCommand since the interception takes
// place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands
// as part of EntryWrappingInterceptor
@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
if (!ctx.isOriginLocal()) {
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
beginInvalidatingKey(key, ctx.getLockOwner());
}
}
else {
for (Object key : wc.getAffectedKeys()) {
beginInvalidatingKey(key, ctx.getLockOwner());
}
}
}
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
try {
if (ctx.isOriginLocal()) {
// send async Commit
Set<Object> affectedKeys = ctx.getAffectedKeys();
if (!affectedKeys.isEmpty()) {
EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand(
cacheName, affectedKeys.toArray(), ctx.getGlobalTransaction());
rpcManager.invokeRemotely(null, commitCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE));
}
}
}
finally {
return invokeNextInterceptor(ctx, command);
}
}
}
} }

View File

@ -0,0 +1,33 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import java.util.UUID;
/**
* Synchronization that should release the locks after invalidation is complete.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class Synchronization implements javax.transaction.Synchronization {
public final UUID uuid = UUID.randomUUID();
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
private final Object[] keys;
public Synchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
this.keys = keys;
}
@Override
public void beforeCompletion() {}
@Override
public void afterCompletion(int status) {
nonTxPutFromLoadInterceptor.broadcastEndInvalidationCommand(keys, uuid);
}
}

View File

@ -9,6 +9,8 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseRegion; import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.Caches; import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
import org.infinispan.AdvancedCache; import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;
@ -69,20 +71,22 @@ public class TransactionalAccessDelegate {
/** /**
* Attempt to cache an object, after loading from the database. * Attempt to cache an object, after loading from the database.
* *
* @param session Current session
* @param key The item key * @param key The item key
* @param value The item * @param value The item
* @param txTimestamp a timestamp prior to the transaction start time * @param txTimestamp a timestamp prior to the transaction start time
* @param version the item version number * @param version the item version number
* @return <tt>true</tt> if the object was successfully cached * @return <tt>true</tt> if the object was successfully cached
*/ */
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) { public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) {
return putFromLoad( key, value, txTimestamp, version, false ); return putFromLoad(session, key, value, txTimestamp, version, false );
} }
/** /**
* Attempt to cache an object, after loading from the database, explicitly * Attempt to cache an object, after loading from the database, explicitly
* specifying the minimalPut behavior. * specifying the minimalPut behavior.
* *
* @param session Current session
* @param key The item key * @param key The item key
* @param value The item * @param value The item
* @param txTimestamp a timestamp prior to the transaction start time * @param txTimestamp a timestamp prior to the transaction start time
@ -92,7 +96,7 @@ public class TransactionalAccessDelegate {
* @throws CacheException if storing the object failed * @throws CacheException if storing the object failed
*/ */
@SuppressWarnings("UnusedParameters") @SuppressWarnings("UnusedParameters")
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
throws CacheException { throws CacheException {
if ( !region.checkValid() ) { if ( !region.checkValid() ) {
if ( TRACE_ENABLED ) { if ( TRACE_ENABLED ) {
@ -118,6 +122,7 @@ public class TransactionalAccessDelegate {
return false; return false;
} }
putValidator.setCurrentSession(session);
try { try {
// Conditional put/putForExternalRead. If the region has been // Conditional put/putForExternalRead. If the region has been
// evicted in the current transaction, do a put instead of a // evicted in the current transaction, do a put instead of a
@ -131,6 +136,7 @@ public class TransactionalAccessDelegate {
} }
} }
finally { finally {
putValidator.resetCurrentSession();
putValidator.releasePutFromLoadLock( key, lock); putValidator.releasePutFromLoadLock( key, lock);
} }
@ -141,6 +147,7 @@ public class TransactionalAccessDelegate {
* Called after an item has been inserted (before the transaction completes), * Called after an item has been inserted (before the transaction completes),
* instead of calling evict(). * instead of calling evict().
* *
* @param session Current session
* @param key The item key * @param key The item key
* @param value The item * @param value The item
* @param version The item's version value * @param version The item's version value
@ -148,12 +155,26 @@ public class TransactionalAccessDelegate {
* @throws CacheException if the insert fails * @throws CacheException if the insert fails
*/ */
@SuppressWarnings("UnusedParameters") @SuppressWarnings("UnusedParameters")
public boolean insert(Object key, Object value, Object version) throws CacheException { public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
if ( !region.checkValid() ) { if ( !region.checkValid() ) {
return false; return false;
} }
writeCache.put( key, value ); // We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true; return true;
} }
@ -161,6 +182,7 @@ public class TransactionalAccessDelegate {
* Called after an item has been updated (before the transaction completes), * Called after an item has been updated (before the transaction completes),
* instead of calling evict(). * instead of calling evict().
* *
* @param session Current session
* @param key The item key * @param key The item key
* @param value The item * @param value The item
* @param currentVersion The item's current version value * @param currentVersion The item's current version value
@ -169,31 +191,53 @@ public class TransactionalAccessDelegate {
* @throws CacheException if the update fails * @throws CacheException if the update fails
*/ */
@SuppressWarnings("UnusedParameters") @SuppressWarnings("UnusedParameters")
public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException { throws CacheException {
// We update whether or not the region is valid. Other nodes // We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to // may have already restored the region so they need to
// be informed of the change. // be informed of the change.
writeCache.put( key, value );
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true; return true;
} }
/** /**
* Called after an item has become stale (before the transaction completes). * Called after an item has become stale (before the transaction completes).
* *
* @param session Current session
* @param key The key of the item to remove * @param key The key of the item to remove
* @throws CacheException if removing the cached item fails * @throws CacheException if removing the cached item fails
*/ */
public void remove(Object key) throws CacheException { public void remove(SessionImplementor session, Object key) throws CacheException {
if ( !putValidator.beginInvalidatingKey(key)) { if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException( throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName() "Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
); );
} }
putValidator.setCurrentSession(session);
try {
// We update whether or not the region is valid. Other nodes // We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to // may have already restored the region so they need to
// be informed of the change. // be informed of the change.
writeCache.remove( key ); writeCache.remove(key);
}
finally {
putValidator.resetCurrentSession();
}
} }
/** /**
@ -221,11 +265,6 @@ public class TransactionalAccessDelegate {
* @throws CacheException if evicting the item fails * @throws CacheException if evicting the item fails
*/ */
public void evict(Object key) throws CacheException { public void evict(Object key) throws CacheException {
if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
writeCache.remove( key ); writeCache.remove( key );
} }
@ -262,7 +301,49 @@ public class TransactionalAccessDelegate {
if ( !putValidator.endInvalidatingKey(key) ) { if ( !putValidator.endInvalidatingKey(key) ) {
// TODO: localization // TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region " log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached in the future."); + region.getName() + "; the key won't be cached until invalidation expires.");
} }
} }
/**
* Called after an item has been inserted (after the transaction completes),
* instead of calling release().
* This method is used by "asynchronous" concurrency strategies.
*
* @param key The item key
* @param value The item
* @param version The item's version value
* @return Were the contents of the cache actual changed by this operation?
* @throws CacheException Propagated from underlying {@link org.hibernate.cache.spi.Region}
*/
public boolean afterInsert(Object key, Object value, Object version) {
if ( !putValidator.endInvalidatingKey(key) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached until invalidation expires.");
}
return false;
}
/**
* Called after an item has been updated (after the transaction completes),
* instead of calling release(). This method is used by "asynchronous"
* concurrency strategies.
*
* @param key The item key
* @param value The item
* @param currentVersion The item's current version value
* @param previousVersion The item's previous version value
* @param lock The lock previously obtained from {@link #lockItem}
* @return Were the contents of the cache actual changed by this operation?
* @throws CacheException Propagated from underlying {@link org.hibernate.cache.spi.Region}
*/
public boolean afterUpdate(Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
if ( !putValidator.endInvalidatingKey(key) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached until invalidation expires.");
}
return false;
}
} }

View File

@ -0,0 +1,126 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.DataContainer;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Set;
/**
* Intercepts transactions in Infinispan, calling {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)}
* before locks are acquired (and the entry is invalidated) and sends {@link EndInvalidationCommand} to release
* invalidation throught {@link PutFromLoadValidator#endInvalidatingKey(Object, Object)} after the transaction
* is committed.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
private final static Log log = LogFactory.getLog(TxPutFromLoadInterceptor.class);
private PutFromLoadValidator putFromLoadValidator;
private final String cacheName;
private RpcManager rpcManager;
private CacheCommandInitializer cacheCommandInitializer;
private DataContainer dataContainer;
public TxPutFromLoadInterceptor(PutFromLoadValidator putFromLoadValidator, String cacheName) {
this.putFromLoadValidator = putFromLoadValidator;
this.cacheName = cacheName;
}
@Inject
public void injectDependencies(RpcManager rpcManager, CacheCommandInitializer cacheCommandInitializer, DataContainer dataContainer) {
this.rpcManager = rpcManager;
this.cacheCommandInitializer = cacheCommandInitializer;
this.dataContainer = dataContainer;
}
// We need to intercept PrepareCommand, not InvalidateCommand since the interception takes
// place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands
// as part of EntryWrappingInterceptor
@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
if (ctx.isOriginLocal()) {
// We can't wait to commit phase to remove the entry locally (invalidations are processed in 1pc
// on remote nodes, so only local case matters here). The problem is that while the entry is locked
// reads still can take place and we can read outdated collection after reading updated entity
// owning this collection from DB; when this happens, the version lock on entity cannot protect
// us against concurrent modification of the collection. Therefore, we need to remove the entry
// here (even without lock!) and let possible update happen in commit phase.
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
dataContainer.remove(key);
}
}
else {
for (Object key : wc.getAffectedKeys()) {
dataContainer.remove(key);
}
}
}
}
else {
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
putFromLoadValidator.beginInvalidatingKey(key, ctx.getLockOwner());
}
}
else {
for (Object key : wc.getAffectedKeys()) {
putFromLoadValidator.beginInvalidatingKey(key, ctx.getLockOwner());
}
}
}
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
return endInvalidationAndInvokeNextInterceptor(ctx, command);
}
@Override
public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
return endInvalidationAndInvokeNextInterceptor(ctx, command);
}
protected Object endInvalidationAndInvokeNextInterceptor(TxInvocationContext ctx, VisitableCommand command) throws Throwable {
try {
if (ctx.isOriginLocal()) {
// send async Commit
Set<Object> affectedKeys = ctx.getAffectedKeys();
if (!affectedKeys.isEmpty()) {
EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand(
cacheName, affectedKeys.toArray(), ctx.getGlobalTransaction());
rpcManager.invokeRemotely(null, commitCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE));
}
}
}
finally {
return invokeNextInterceptor(ctx, command);
}
}
}

View File

@ -46,16 +46,16 @@ class TransactionalAccess implements CollectionRegionAccessStrategy {
} }
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) throws CacheException { public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) throws CacheException {
return delegate.putFromLoad( key, value, txTimestamp, version ); return delegate.putFromLoad( session, key, value, txTimestamp, version );
} }
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
throws CacheException { throws CacheException {
return delegate.putFromLoad( key, value, txTimestamp, version, minimalPutOverride ); return delegate.putFromLoad( session, key, value, txTimestamp, version, minimalPutOverride );
} }
public void remove(SessionImplementor session, Object key) throws CacheException { public void remove(SessionImplementor session, Object key) throws CacheException {
delegate.remove( key ); delegate.remove( session, key );
} }
public void removeAll() throws CacheException { public void removeAll() throws CacheException {

View File

@ -50,20 +50,20 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
} }
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException { public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
return delegate.insert( key, value, version ); return delegate.insert( session, key, value, version );
} }
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) throws CacheException { public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) throws CacheException {
return delegate.putFromLoad( key, value, txTimestamp, version ); return delegate.putFromLoad( session, key, value, txTimestamp, version );
} }
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
throws CacheException { throws CacheException {
return delegate.putFromLoad( key, value, txTimestamp, version, minimalPutOverride ); return delegate.putFromLoad( session, key, value, txTimestamp, version, minimalPutOverride );
} }
public void remove(SessionImplementor session, Object key) throws CacheException { public void remove(SessionImplementor session, Object key) throws CacheException {
delegate.remove( key ); delegate.remove ( session, key );
} }
public void removeAll() throws CacheException { public void removeAll() throws CacheException {
@ -72,7 +72,7 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion) public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException { throws CacheException {
return delegate.update( key, value, currentVersion, previousVersion ); return delegate.update( session, key, value, currentVersion, previousVersion );
} }
public SoftLock lockItem(SessionImplementor session, Object key, Object version) throws CacheException { public SoftLock lockItem(SessionImplementor session, Object key, Object version) throws CacheException {
@ -91,12 +91,12 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
} }
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) throws CacheException { public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
return false; return delegate.afterInsert(key, value, version);
} }
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock)
throws CacheException { throws CacheException {
return false; return delegate.afterUpdate(key, value, currentVersion, previousVersion, lock);
} }
@Override @Override

View File

@ -149,9 +149,11 @@ public abstract class BaseRegion implements Region {
synchronized (invalidationMutex) { synchronized (invalidationMutex) {
if ( invalidateState.compareAndSet( InvalidateState.INVALID, InvalidateState.CLEARING ) ) { if ( invalidateState.compareAndSet( InvalidateState.INVALID, InvalidateState.CLEARING ) ) {
try { try {
// Even if no transactions are running, a new transaction // If we're running inside a transaction, we need to remove elements one-by-one
// needs to be started to do clear the region // to clean the context as well (cache.clear() does not do that).
// (without forcing autoCommit cache configuration). // When we don't have transaction, we can do a clear operation (since we don't
// case about context) and can't do the one-by-one remove: remove() on tx cache
// requires transactional context.
Transaction tx = getCurrentTransaction(); Transaction tx = getCurrentTransaction();
if ( tx != null ) { if ( tx != null ) {
log.tracef( "Transaction, clearing one element at the time" ); log.tracef( "Transaction, clearing one element at the time" );

View File

@ -28,12 +28,12 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override @Override
public boolean insert(SessionImplementor session, Object key, Object value) throws CacheException { public boolean insert(SessionImplementor session, Object key, Object value) throws CacheException {
return delegate.insert( key, value, null ); return delegate.insert( session, key, value, null );
} }
@Override @Override
public boolean update(SessionImplementor session, Object key, Object value) throws CacheException { public boolean update(SessionImplementor session, Object key, Object value) throws CacheException {
return delegate.update( key, value, null, null ); return delegate.update( session, key, value, null, null );
} }
@Override @Override
@ -58,18 +58,18 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override @Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) throws CacheException { public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) throws CacheException {
return delegate.putFromLoad( key, value, txTimestamp, version ); return delegate.putFromLoad( session, key, value, txTimestamp, version );
} }
@Override @Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
throws CacheException { throws CacheException {
return delegate.putFromLoad( key, value, txTimestamp, version, minimalPutOverride ); return delegate.putFromLoad( session, key, value, txTimestamp, version, minimalPutOverride );
} }
@Override @Override
public void remove(SessionImplementor session, Object key) throws CacheException { public void remove(SessionImplementor session, Object key) throws CacheException {
delegate.remove( key ); delegate.remove( session, key );
} }
@Override @Override
@ -98,12 +98,12 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override @Override
public boolean afterInsert(SessionImplementor session, Object key, Object value) throws CacheException { public boolean afterInsert(SessionImplementor session, Object key, Object value) throws CacheException {
return false; return delegate.afterInsert(key, value, null);
} }
@Override @Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, SoftLock lock) throws CacheException { public boolean afterUpdate(SessionImplementor session, Object key, Object value, SoftLock lock) throws CacheException {
return false; return delegate.afterUpdate(key, value, null, null, lock);
} }
@Override @Override

View File

@ -0,0 +1,95 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.hibernate.internal.util.compare.EqualsHelper;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.context.Flag;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import java.util.Arrays;
import java.util.Set;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class BeginInvalidationCommand extends InvalidateCommand {
private Object lockOwner;
public BeginInvalidationCommand() {
}
public BeginInvalidationCommand(CacheNotifier notifier, Set<Flag> flags, Object[] keys, Object lockOwner) {
super(notifier, flags, keys);
this.lockOwner = lockOwner;
}
public Object getLockOwner() {
return lockOwner;
}
@Override
public Object[] getParameters() {
if (keys == null || keys.length == 0) {
return new Object[]{0, lockOwner};
}
if (keys.length == 1) {
return new Object[]{1, keys[0], lockOwner};
}
Object[] retval = new Object[keys.length + 2];
retval[0] = keys.length;
System.arraycopy(keys, 0, retval, 1, keys.length);
return retval;
}
@Override
public void setParameters(int commandId, Object[] args) {
if (commandId != CacheCommandIds.BEGIN_INVALIDATION) {
throw new IllegalStateException("Invalid method id");
}
int size = (Integer) args[0];
keys = new Object[size];
if (size == 1) {
keys[0] = args[1];
}
else if (size > 0) {
System.arraycopy(args, 1, keys, 0, size);
}
lockOwner = args[args.length - 1];
}
@Override
public byte getCommandId() {
return CacheCommandIds.BEGIN_INVALIDATION;
}
@Override
public boolean equals(Object o) {
if (!super.equals(o)) {
return false;
}
if (o instanceof BeginInvalidationCommand) {
BeginInvalidationCommand bic = (BeginInvalidationCommand) o;
return EqualsHelper.equals(lockOwner, bic.lockOwner);
}
else {
return false;
}
}
@Override
public int hashCode() {
return super.hashCode() + (lockOwner == null ? 0 : lockOwner.hashCode());
}
@Override
public String toString() {
return "BeginInvalidateCommand{keys=" + Arrays.toString(keys) +
", lockOwner=" + lockOwner + '}';
}
}

View File

@ -59,6 +59,7 @@ public class CacheCommandFactory implements ExtendedModuleCommandFactory {
final Map<Byte, Class<? extends ReplicableCommand>> map = new HashMap<Byte, Class<? extends ReplicableCommand>>( 3 ); final Map<Byte, Class<? extends ReplicableCommand>> map = new HashMap<Byte, Class<? extends ReplicableCommand>>( 3 );
map.put( CacheCommandIds.EVICT_ALL, EvictAllCommand.class ); map.put( CacheCommandIds.EVICT_ALL, EvictAllCommand.class );
map.put( CacheCommandIds.END_INVALIDATION, EndInvalidationCommand.class ); map.put( CacheCommandIds.END_INVALIDATION, EndInvalidationCommand.class );
map.put( CacheCommandIds.BEGIN_INVALIDATION, BeginInvalidationCommand.class );
return map; return map;
} }
@ -81,9 +82,16 @@ public class CacheCommandFactory implements ExtendedModuleCommandFactory {
@Override @Override
public ReplicableCommand fromStream(byte commandId, Object[] args) { public ReplicableCommand fromStream(byte commandId, Object[] args) {
// Should not be called while this factory only ReplicableCommand c;
// provides cache specific replicable commands. switch ( commandId ) {
return null; case CacheCommandIds.BEGIN_INVALIDATION:
c = new BeginInvalidationCommand();
break;
default:
throw new IllegalArgumentException( "Not registered to handle command id " + commandId );
}
c.setParameters( commandId, args );
return c;
} }
} }

View File

@ -16,10 +16,15 @@ public interface CacheCommandIds {
/** /**
* {@link EvictAllCommand} id * {@link EvictAllCommand} id
*/ */
public static final byte EVICT_ALL = 120; byte EVICT_ALL = 120;
/** /**
* {@link EndInvalidationCommand} id * {@link EndInvalidationCommand} id
*/ */
public static final byte END_INVALIDATION = 121; byte END_INVALIDATION = 121;
/**
* {@link BeginInvalidationCommand} id
*/
byte BEGIN_INVALIDATION = 122;
} }

View File

@ -9,7 +9,12 @@ package org.hibernate.cache.infinispan.util;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.module.ModuleCommandInitializer; import org.infinispan.commands.module.ModuleCommandInitializer;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.Flag;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -22,6 +27,14 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
private final ConcurrentHashMap<String, PutFromLoadValidator> putFromLoadValidators private final ConcurrentHashMap<String, PutFromLoadValidator> putFromLoadValidators
= new ConcurrentHashMap<String, PutFromLoadValidator>(); = new ConcurrentHashMap<String, PutFromLoadValidator>();
private CacheNotifier notifier;
private Configuration configuration;
@Inject
public void injectDependencies(CacheNotifier notifier, Configuration configuration) {
this.notifier = notifier;
this.configuration = configuration;
}
public void addPutFromLoadValidator(String cacheName, PutFromLoadValidator putFromLoadValidator) { public void addPutFromLoadValidator(String cacheName, PutFromLoadValidator putFromLoadValidator) {
// there could be two instances of PutFromLoadValidator bound to the same cache when // there could be two instances of PutFromLoadValidator bound to the same cache when
@ -49,6 +62,10 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
return new EvictAllCommand( regionName ); return new EvictAllCommand( regionName );
} }
public BeginInvalidationCommand buildBeginInvalidationCommand(Set<Flag> flags, Object[] keys, Object lockOwner) {
return new BeginInvalidationCommand(notifier, flags, keys, lockOwner);
}
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) { public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
return new EndInvalidationCommand( cacheName, keys, lockOwner ); return new EndInvalidationCommand( cacheName, keys, lockOwner );
} }
@ -60,6 +77,10 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
EndInvalidationCommand endInvalidationCommand = (EndInvalidationCommand) c; EndInvalidationCommand endInvalidationCommand = (EndInvalidationCommand) c;
endInvalidationCommand.setPutFromLoadValidator(putFromLoadValidators.get(endInvalidationCommand.getCacheName())); endInvalidationCommand.setPutFromLoadValidator(putFromLoadValidators.get(endInvalidationCommand.getCacheName()));
break; break;
case CacheCommandIds.BEGIN_INVALIDATION:
BeginInvalidationCommand beginInvalidationCommand = (BeginInvalidationCommand) c;
beginInvalidationCommand.init(notifier, configuration);
break;
} }
} }
} }

View File

@ -0,0 +1,53 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.util.Util;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class Externalizers {
public final static int UUID = 1200;
public final static AdvancedExternalizer[] ALL_EXTERNALIZERS = new AdvancedExternalizer[] {
new UUIDExternalizer()
};
public static class UUIDExternalizer implements AdvancedExternalizer<UUID> {
@Override
public Set<Class<? extends UUID>> getTypeClasses() {
return Collections.<Class<? extends UUID>>singleton(UUID.class);
}
@Override
public Integer getId() {
return UUID;
}
@Override
public void writeObject(ObjectOutput output, UUID uuid) throws IOException {
output.writeLong(uuid.getMostSignificantBits());
output.writeLong(uuid.getLeastSignificantBits());
}
@Override
public UUID readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return new UUID(input.readLong(), input.readLong());
}
}
}

View File

@ -0,0 +1,19 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.infinispan.factories.components.ModuleMetadataFileFinder;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class SecondLevelCacheMetadataModuleFinder implements ModuleMetadataFileFinder {
@Override
public String getMetadataFilename() {
return "hibernate-infinispan-component-metadata.dat";
}
}

View File

@ -0,0 +1 @@
org.hibernate.cache.infinispan.util.SecondLevelCacheMetadataModuleFinder

View File

@ -169,7 +169,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
Callable<Void> pferCallable = new Callable<Void>() { Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception { public Void call() throws Exception {
delegate.putFromLoad( "k1", "v1", 0, null ); delegate.putFromLoad(null, "k1", "v1", 0, null );
return null; return null;
} }
}; };
@ -180,7 +180,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
Caches.withinTx(localTm, new Callable<Void>() { Caches.withinTx(localTm, new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
delegate.remove("k1"); delegate.remove(null, "k1");
return null; return null;
} }
}); });

View File

@ -24,6 +24,7 @@ import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.stat.Statistics; import org.hibernate.stat.Statistics;
import org.hibernate.testing.TestForIssue; import org.hibernate.testing.TestForIssue;
import org.junit.After; import org.junit.After;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertEquals;