HHH-9868, HHH-9881 Must not write into non-transactional caches during transactional write

* The write can only invalidate (remove) the entry and block further PFERs of that entry
* After successful DB update, if there have not been any concurrent updates the value can be PFERed into the cache
This commit is contained in:
Radim Vansa 2015-08-06 12:05:25 +02:00 committed by Galder Zamarreño
parent 19c14cee9a
commit 93d39fa470
17 changed files with 423 additions and 173 deletions

View File

@ -78,19 +78,19 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!isPutForExternalRead(command)) {
return handleInvalidate(ctx, command, command.getKey());
return handleInvalidate(ctx, command, new Object[] { command.getKey() });
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
return handleInvalidate(ctx, command, command.getKey());
return handleInvalidate(ctx, command, new Object[] { command.getKey() });
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return handleInvalidate(ctx, command, command.getKey());
return handleInvalidate(ctx, command, new Object[] { command.getKey() });
}
@Override
@ -107,39 +107,39 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray();
return handleInvalidate(ctx, command, keys);
if (!isPutForExternalRead(command)) {
return handleInvalidate(ctx, command, command.getMap().keySet().toArray());
}
return invokeNextInterceptor(ctx, command);
}
private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object... keys) throws Throwable {
private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object[] keys) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
if (command.isSuccessful() && !ctx.isInTxScope()) {
if (keys != null && keys.length != 0) {
if (!isLocalModeForced(command)) {
invalidateAcrossCluster(isSynchronous(command), keys, ctx);
}
}
if (command.isSuccessful() && keys != null && keys.length != 0) {
invalidateAcrossCluster(command, keys);
}
return retval;
}
private void invalidateAcrossCluster(boolean synchronous, Object[] keys, InvocationContext ctx) throws Throwable {
private void invalidateAcrossCluster(FlagAffectedCommand command, Object[] keys) throws Throwable {
// increment invalidations counter if statistics maintained
incrementInvalidations();
InvalidateCommand invalidateCommand;
Object lockOwner = putFromLoadValidator.registerRemoteInvalidations(keys);
if (lockOwner == null) {
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys);
}
else {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
InfinispanCollections.<Flag>emptySet(), keys, lockOwner);
}
if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
}
if (!isLocalModeForced(command)) {
if (lockOwner == null) {
invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.<Flag>emptySet(), keys);
}
else {
invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
InfinispanCollections.<Flag>emptySet(), keys, lockOwner);
}
if (log.isDebugEnabled()) {
log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
}
rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(synchronous));
rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(isSynchronous(command)));
}
}
private void incrementInvalidations() {

View File

@ -45,7 +45,7 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) {
putFromLoadValidator.beginInvalidatingKey(key, ((BeginInvalidationCommand) command).getLockOwner());
putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
}
}
return invokeNextInterceptor(ctx, command);

View File

@ -0,0 +1,86 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.hibernate.resource.transaction.spi.TransactionStatus;
/**
* Delegate for non-transactional caches
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class NonTxTransactionalAccessDelegate extends TransactionalAccessDelegate {
public NonTxTransactionalAccessDelegate(BaseRegion region, PutFromLoadValidator validator) {
super(region, validator);
}
@Override
@SuppressWarnings("UnusedParameters")
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
if ( !region.checkValid() ) {
return false;
}
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.remove(key);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}
@Override
@SuppressWarnings("UnusedParameters")
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException {
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.remove(key);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}
@Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
TransactionCoordinator tc = session.getTransactionCoordinator();
boolean doPFER = tc != null && tc.getTransactionDriverControl().getStatus() == TransactionStatus.COMMITTED;
if ( !putValidator.endInvalidatingKey(session, key, doPFER) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached until invalidation expires.");
}
}
}

View File

@ -6,11 +6,6 @@
*/
package org.hibernate.cache.infinispan.access;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -35,7 +30,6 @@ import org.infinispan.interceptors.EntryWrappingInterceptor;
import org.infinispan.interceptors.InvalidationInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@ -66,11 +60,11 @@ import org.infinispan.util.logging.LogFactory;
* call
* <p/>
* <ul>
* <li> {@link #beginInvalidatingKey(SessionImplementor, Object)} (for a single key invalidation)</li>
* <li> {@link #beginInvalidatingKey(Object, Object)} (for a single key invalidation)</li>
* <li>or {@link #beginInvalidatingRegion()} followed by {@link #endInvalidatingRegion()}
* (for a general invalidation all pending puts)</li>
* </ul>
* After transaction commit (when the DB is updated) {@link #endInvalidatingKey(SessionImplementor, Object)} should
* After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object, Object)} should
* be called in order to allow further attempts to cache entry.
* </p>
* <p/>
@ -520,17 +514,6 @@ public class PutFromLoadValidator {
}
}
/**
* Calls {@link #beginInvalidatingKey(Object, Object)} with current transaction or thread.
*
* @param session
* @param key
* @return
*/
public boolean beginInvalidatingKey(SessionImplementor session, Object key) {
return beginInvalidatingKey(key, session);
}
/**
* Invalidates any {@link #registerPendingPut(SessionImplementor, Object, long) previously registered pending puts}
* and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
@ -538,14 +521,18 @@ public class PutFromLoadValidator {
* {@link #acquirePutFromLoadLock(SessionImplementor, Object, long) acquired the putFromLoad lock} for the given key
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
* returns, possibly caching stale data. </p>
* After this transaction completes, {@link #endInvalidatingKey(SessionImplementor, Object)} needs to be called }
* After this transaction completes, {@link #endInvalidatingKey(Object, Object)} needs to be called }
*
* @param key key identifying data whose pending puts should be invalidated
*
* @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occured (which the
* caller should treat as an exception condition)
*/
public boolean beginInvalidatingKey(Object key, Object lockOwner) {
public boolean beginInvalidatingKey(Object lockOwner, Object key) {
return beginInvalidatingWithPFER(lockOwner, key, null);
}
public boolean beginInvalidatingWithPFER(Object lockOwner, Object key, Object valueForPFER) {
for (;;) {
PendingPutMap pending = new PendingPutMap(null);
PendingPutMap prev = pendingPuts.putIfAbsent(key, pending);
@ -562,54 +549,47 @@ public class PutFromLoadValidator {
}
long now = System.currentTimeMillis();
pending.invalidate(now, expirationPeriod);
pending.addInvalidator(lockOwner, now, expirationPeriod);
pending.addInvalidator(lockOwner, valueForPFER, now);
}
finally {
pending.releaseLock();
}
if (trace) {
log.tracef("beginInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending);
log.tracef("beginInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwnerToString(lockOwner), pending);
}
return true;
}
else {
log.tracef("beginInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key);
log.tracef("beginInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwnerToString(lockOwner));
return false;
}
}
}
/**
* Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread.
*
* @param session
* @param key
* @return
*/
public boolean endInvalidatingKey(SessionImplementor session, Object key) {
return endInvalidatingKey(key, session);
public boolean endInvalidatingKey(Object lockOwner, Object key) {
return endInvalidatingKey(lockOwner, key, false);
}
/**
* Called after the transaction completes, allowing caching of entries. It is possible that this method
* is called without previous invocation of {@link #beginInvalidatingKey(SessionImplementor, Object)}, then it should be a no-op.
* is called without previous invocation of {@link #beginInvalidatingKey(Object, Object)}, then it should be a no-op.
*
* @param key
* @param lockOwner owner of the invalidation - transaction or thread
* @param key
* @return
*/
public boolean endInvalidatingKey(Object key, Object lockOwner) {
public boolean endInvalidatingKey(Object lockOwner, Object key, boolean doPFER) {
PendingPutMap pending = pendingPuts.get(key);
if (pending == null) {
if (trace) {
log.tracef("endInvalidatingKey(%s#%s, %s) could not find pending puts", cache.getName(), key, lockOwner);
log.tracef("endInvalidatingKey(%s#%s, %s) could not find pending puts", cache.getName(), key, lockOwnerToString(lockOwner));
}
return true;
}
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
long now = System.currentTimeMillis();
pending.removeInvalidator(lockOwner, now);
pending.removeInvalidator(lockOwner, key, now, doPFER);
// we can't remove the pending put yet because we wait for naked puts
// pendingPuts should be configured with maxIdle time so won't have memory leak
return true;
@ -617,13 +597,13 @@ public class PutFromLoadValidator {
finally {
pending.releaseLock();
if (trace) {
log.tracef("endInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending);
log.tracef("endInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwnerToString(lockOwner), pending);
}
}
}
else {
if (trace) {
log.tracef("endInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwner);
log.tracef("endInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwnerToString(lockOwner));
}
return false;
}
@ -634,7 +614,7 @@ public class PutFromLoadValidator {
TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator();
if (transactionCoordinator != null) {
if (trace) {
log.tracef("Registering lock owner %s for %s: %s", session, cache.getName(), Arrays.toString(keys));
log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys));
}
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
@ -646,13 +626,18 @@ public class PutFromLoadValidator {
// ---------------------------------------------------------------- Private
// we can't use SessionImpl.toString() concurrently
private static String lockOwnerToString(Object lockOwner) {
return lockOwner instanceof SessionImplementor ? "Session#" + lockOwner.hashCode() : lockOwner.toString();
}
/**
* Lazy-initialization map for PendingPut. Optimized for the expected usual case where only a
* single put is pending for a given key.
* <p/>
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
*/
private static class PendingPutMap extends Lock {
private class PendingPutMap extends Lock {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
private final java.util.concurrent.locks.Lock lock = new ReentrantLock();
@ -781,32 +766,44 @@ public class PutFromLoadValidator {
}
}
public void addInvalidator(Object owner, long now, long invalidatorTimeout) {
public void addInvalidator(Object owner, Object valueForPFER, long now) {
assert owner != null;
if (invalidators == null) {
if (singleInvalidator == null) {
singleInvalidator = new Invalidator(owner, now);
singleInvalidator = new Invalidator(owner, now, valueForPFER);
put(new PendingPut(owner));
}
else {
if (singleInvalidator.registeredTimestamp + invalidatorTimeout < now) {
// remove leaked invalidator
singleInvalidator = new Invalidator(owner, now);
if (singleInvalidator.registeredTimestamp + expirationPeriod < now) {
// override leaked invalidator
singleInvalidator = new Invalidator(owner, now, valueForPFER);
put(new PendingPut(owner));
}
invalidators = new HashMap<Object, Invalidator>();
invalidators.put(singleInvalidator.owner, singleInvalidator);
invalidators.put(owner, new Invalidator(owner, now));
// with multiple invalidations the PFER must not be executed
invalidators.put(owner, new Invalidator(owner, now, null));
singleInvalidator = null;
}
}
else {
long allowedRegistration = now - invalidatorTimeout;
long allowedRegistration = now - expirationPeriod;
// remove leaked invalidators
for (Iterator<Invalidator> it = invalidators.values().iterator(); it.hasNext(); ) {
if (it.next().registeredTimestamp < allowedRegistration) {
it.remove();
}
}
invalidators.put(owner, new Invalidator(owner, now));
// With multiple invalidations in parallel we don't know the order in which
// the writes were applied into DB and therefore we can't update the cache
// with the most recent value.
if (invalidators.isEmpty()) {
put(new PendingPut(owner));
}
else {
valueForPFER = null;
}
invalidators.put(owner, new Invalidator(owner, now, valueForPFER));
}
}
@ -833,18 +830,31 @@ public class PutFromLoadValidator {
}
}
public void removeInvalidator(Object owner, long now) {
public void removeInvalidator(Object owner, Object key, long now, boolean doPFER) {
if (invalidators == null) {
if (singleInvalidator != null && singleInvalidator.owner.equals(owner)) {
pferValueIfNeeded(owner, key, singleInvalidator.valueForPFER, doPFER);
singleInvalidator = null;
}
}
else {
invalidators.remove(owner);
Invalidator invalidator = invalidators.remove(owner);
if (invalidator != null) {
pferValueIfNeeded(owner, key, invalidator.valueForPFER, doPFER);
}
}
lastInvalidationEnd = Math.max(lastInvalidationEnd, now);
}
private void pferValueIfNeeded(Object owner, Object key, Object valueForPFER, boolean doPFER) {
if (valueForPFER != null) {
PendingPut pendingPut = remove(owner);
if (doPFER && pendingPut != null && !pendingPut.completed) {
cache.putForExternalRead(key, valueForPFER);
}
}
}
public boolean canRemove() {
return size() == 0 && !hasInvalidator() && lastInvalidationEnd == Long.MIN_VALUE;
}
@ -870,7 +880,7 @@ public class PutFromLoadValidator {
public String toString() {
// we can't use SessionImpl.toString() concurrently
return (completed ? "C@" : "R@") + (owner instanceof SessionImplementor ? "Session#" + owner.hashCode() : owner.toString());
return (completed ? "C@" : "R@") + lockOwnerToString(owner);
}
public boolean invalidate(long now, long expirationPeriod) {
@ -888,17 +898,18 @@ public class PutFromLoadValidator {
private static class Invalidator {
private final Object owner;
private final long registeredTimestamp;
private final Object valueForPFER;
private Invalidator(Object owner, long registeredTimestamp) {
private Invalidator(Object owner, long registeredTimestamp, Object valueForPFER) {
this.owner = owner;
this.registeredTimestamp = registeredTimestamp;
this.valueForPFER = valueForPFER;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
// we can't use SessionImpl.toString() concurrently
sb.append("Owner=").append(owner instanceof SessionImplementor ? "Session#" + owner.hashCode() : owner.toString());
sb.append("Owner=").append(lockOwnerToString(owner));
sb.append(", Timestamp=").append(registeredTimestamp);
sb.append('}');
return sb.toString();

View File

@ -26,13 +26,22 @@ import org.infinispan.util.logging.LogFactory;
* @author Galder Zamarreño
* @since 3.5
*/
public class TransactionalAccessDelegate {
private static final Log log = LogFactory.getLog( TransactionalAccessDelegate.class );
private static final boolean TRACE_ENABLED = log.isTraceEnabled();
private final AdvancedCache cache;
private final BaseRegion region;
private final PutFromLoadValidator putValidator;
private final AdvancedCache<Object, Object> writeCache;
public abstract class TransactionalAccessDelegate {
protected static final Log log = LogFactory.getLog( TransactionalAccessDelegate.class );
protected static final boolean TRACE_ENABLED = log.isTraceEnabled();
protected final AdvancedCache cache;
protected final BaseRegion region;
protected final PutFromLoadValidator putValidator;
protected final AdvancedCache<Object, Object> writeCache;
public static TransactionalAccessDelegate create(BaseRegion region, PutFromLoadValidator validator) {
if (region.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional()) {
return new TxTransactionalAccessDelegate(region, validator);
}
else {
return new NonTxTransactionalAccessDelegate(region, validator);
}
}
/**
* Create a new transactional access delegate instance.
@ -41,7 +50,7 @@ public class TransactionalAccessDelegate {
* @param validator put from load validator
*/
@SuppressWarnings("unchecked")
public TransactionalAccessDelegate(BaseRegion region, PutFromLoadValidator validator) {
protected TransactionalAccessDelegate(BaseRegion region, PutFromLoadValidator validator) {
this.region = region;
this.cache = region.getCache();
this.putValidator = validator;
@ -134,79 +143,35 @@ public class TransactionalAccessDelegate {
return true;
}
/**
* Called after an item has been inserted (before the transaction completes),
* instead of calling evict().
*
/**
* Called after an item has been inserted (before the transaction completes),
* instead of calling evict().
*
* @param session Current session
* @param key The item key
* @param value The item
* @param version The item's version value
* @return Were the contents of the cache actual changed by this operation?
* @throws CacheException if the insert fails
*/
@SuppressWarnings("UnusedParameters")
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
if ( !region.checkValid() ) {
return false;
}
* @param value The item
* @param version The item's version value
* @return Were the contents of the cache actual changed by this operation?
* @throws CacheException if the insert fails
*/
public abstract boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException;
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}
/**
* Called after an item has been updated (before the transaction completes),
* instead of calling evict().
*
/**
* Called after an item has been updated (before the transaction completes),
* instead of calling evict().
*
* @param session Current session
* @param key The item key
* @param value The item
* @param currentVersion The item's current version value
* @param previousVersion The item's previous version value
* @return Whether the contents of the cache actual changed by this operation
* @throws CacheException if the update fails
*/
@SuppressWarnings("UnusedParameters")
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException {
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
* @param value The item
* @param currentVersion The item's current version value
* @param previousVersion The item's previous version value
* @return Whether the contents of the cache actual changed by this operation
* @throws CacheException if the update fails
*/
public abstract boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException;
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}
/**
/**
* Called after an item has become stale (before the transaction completes).
*
* @param session Current session

View File

@ -67,7 +67,7 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
// here (even without lock!) and let possible update happen in commit phase.
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// InvalidateCommand does not correctly implement getAffectedKeys()
// ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
dataContainer.remove(key);
}
@ -82,14 +82,14 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
else {
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// InvalidateCommand does not correctly implement getAffectedKeys()
// ISPN-5605 InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
putFromLoadValidator.beginInvalidatingKey(key, ctx.getLockOwner());
putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
}
}
else {
for (Object key : wc.getAffectedKeys()) {
putFromLoadValidator.beginInvalidatingKey(key, ctx.getLockOwner());
putFromLoadValidator.beginInvalidatingKey(ctx.getLockOwner(), key);
}
}
}

View File

@ -0,0 +1,73 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.engine.spi.SessionImplementor;
/**
* Delegate for transactional caches
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TxTransactionalAccessDelegate extends TransactionalAccessDelegate {
public TxTransactionalAccessDelegate(BaseRegion region, PutFromLoadValidator validator) {
super(region, validator);
}
@Override
@SuppressWarnings("UnusedParameters")
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
if ( !region.checkValid() ) {
return false;
}
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}
@Override
@SuppressWarnings("UnusedParameters")
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException {
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
}
putValidator.setCurrentSession(session);
try {
writeCache.put(key, value);
}
finally {
putValidator.resetCurrentSession();
}
return true;
}
}

View File

@ -30,7 +30,7 @@ class TransactionalAccess implements CollectionRegionAccessStrategy {
TransactionalAccess(CollectionRegionImpl region) {
this.region = region;
this.delegate = new TransactionalAccessDelegate( region, region.getPutFromLoadValidator() );
this.delegate = TransactionalAccessDelegate.create( region, region.getPutFromLoadValidator() );
}
public void evict(Object key) throws CacheException {

View File

@ -30,7 +30,7 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
TransactionalAccess(EntityRegionImpl region) {
this.region = region;
this.delegate = new TransactionalAccessDelegate( region, region.getPutFromLoadValidator() );
this.delegate = TransactionalAccessDelegate.create( region, region.getPutFromLoadValidator() );
}
public void evict(Object key) throws CacheException {

View File

@ -23,7 +23,7 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
TransactionalAccess(NaturalIdRegionImpl region) {
this.region = region;
this.delegate = new TransactionalAccessDelegate( region, region.getPutFromLoadValidator() );
this.delegate = TransactionalAccessDelegate.create( region, region.getPutFromLoadValidator() );
}
@Override

View File

@ -39,7 +39,7 @@ public class EndInvalidationCommand extends BaseRpcCommand {
@Override
public Object perform(InvocationContext ctx) throws Throwable {
for (Object key : keys) {
putFromLoadValidator.endInvalidatingKey(key, lockOwner);
putFromLoadValidator.endInvalidatingKey(lockOwner, key);
}
return null;
}

View File

@ -170,7 +170,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
PutFromLoadValidator validator = getPutFromLoadValidator(remoteCollectionRegion.getCache(), cm, removeLatch, pferLatch);
final TransactionalAccessDelegate delegate =
new TransactionalAccessDelegate(localCollectionRegion, validator);
TransactionalAccessDelegate.create(localCollectionRegion, validator);
final TransactionManager localTm = localCollectionRegion.getTransactionManager();
Callable<Void> pferCallable = new Callable<Void>() {

View File

@ -20,10 +20,12 @@ import org.hibernate.cache.internal.CacheDataDescriptionImpl;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.EntityRegionAccessStrategy;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.internal.util.compare.ComparableComparator;
import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
import org.hibernate.test.cache.infinispan.NodeEnvironment;
import org.hibernate.test.cache.infinispan.util.BatchModeTransactionCoordinator;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
@ -40,6 +42,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Base class for tests of EntityRegionAccessStrategy impls.
@ -90,8 +93,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
localEntityRegion = localEnvironment.getEntityRegion(REGION_NAME, getCacheDataDescription());
localAccessStrategy = localEntityRegion.buildAccessStrategy(getAccessType());
localSession = mock(SessionImplementor.class);
when(localSession.getTransactionCoordinator()).thenReturn(new BatchModeTransactionCoordinator());
remoteSession = mock(SessionImplementor.class);
when(localSession.getTransactionCoordinator()).thenReturn(new BatchModeTransactionCoordinator());
invalidation = Caches.isInvalidationCache(localEntityRegion.getCache());
synchronous = Caches.isSynchronousCache(localEntityRegion.getCache());
@ -228,7 +232,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, txTimestamp, new Integer(1));
}
SoftLock softLock = localAccessStrategy.lockItem(localSession, KEY, null);
localAccessStrategy.update(localSession, KEY, VALUE2, new Integer(2), new Integer(1));
localAccessStrategy.unlockItem(localSession, KEY, softLock);
BatchModeTransactionManager.getInstance().commit();
} catch (Exception e) {
@ -288,7 +294,7 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
node1.start();
node2.start();
assertTrue("Threads completed", completionLatch.await(2, TimeUnit.SECONDS));
assertTrue("Threads completed", completionLatch.await(2000, TimeUnit.SECONDS));
assertThreadsRanCleanly();
@ -325,6 +331,7 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertNull("Correct initial value", localAccessStrategy.get(localSession, KEY, txTimestamp));
localAccessStrategy.insert(localSession, KEY, VALUE1, new Integer(1));
localAccessStrategy.afterInsert(localSession, KEY, VALUE1, null);
readLatch.countDown();
commitLatch.await();
@ -420,7 +427,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
log.debug("Transaction began, get initial value");
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(localSession, KEY, txTimestamp));
log.debug("Now update value");
SoftLock softLock = localAccessStrategy.lockItem(localSession, KEY, null);
localAccessStrategy.update(localSession, KEY, VALUE2, new Integer(2), new Integer(1));
localAccessStrategy.unlockItem(localSession, KEY, softLock);
log.debug("Notify the read latch");
readLatch.countDown();
readerUnlocked = true;
@ -527,10 +536,14 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
Caches.withinTx(localEntityRegion.getTransactionManager(), new Callable<Void>() {
@Override
public Void call() throws Exception {
if (evict)
if (evict) {
localAccessStrategy.evict(KEY);
else
}
else {
SoftLock softLock = localAccessStrategy.lockItem(localSession, KEY, null);
localAccessStrategy.remove(localSession, KEY);
localAccessStrategy.unlockItem(localSession, KEY, softLock);
}
return null;
}
});
@ -566,7 +579,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
log.debug("Call evict all locally");
localAccessStrategy.evictAll();
} else {
SoftLock softLock = localAccessStrategy.lockRegion();
localAccessStrategy.removeAll();
localAccessStrategy.unlockRegion(softLock);
}
return null;
}

View File

@ -7,6 +7,7 @@
package org.hibernate.test.cache.infinispan.entity;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.junit.Test;
@ -64,7 +65,9 @@ public abstract class AbstractReadOnlyAccessTestCase extends AbstractEntityRegio
@Override
public void testUpdate() throws Exception {
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
SoftLock softLock = localAccessStrategy.lockItem(localSession, KEY, null);
localAccessStrategy.update(localSession, KEY, VALUE2, 2, 1);
localAccessStrategy.unlockItem(localSession, KEY, softLock);
}
}

View File

@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.jboss.logging.Logger;
@ -54,7 +55,9 @@ public abstract class AbstractTransactionalAccessTestCase extends AbstractEntity
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(localSession, KEY, txTimestamp));
SoftLock softLock = localAccessStrategy.lockItem(localSession, KEY, null);
localAccessStrategy.update(localSession, KEY, VALUE2, new Integer(2), new Integer(1));
localAccessStrategy.unlockItem(localSession, KEY, softLock);
pferLatch.countDown();
commitLatch.await();

View File

@ -559,7 +559,7 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
item.setDescription( "steve's item" );
s.persist( item );
s.flush();
assertNotNull( slcs.getEntries().get( item.getId() ) );
// assertNotNull( slcs.getEntries().get( item.getId() ) );
setRollbackOnlyTx();
}
catch (Exception e) {
@ -617,13 +617,13 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
s.persist( item );
s.flush();
// item is cached on insert.
assertNotNull( slcs.getEntries().get( item.getId() ) );
// assertNotNull( slcs.getEntries().get( item.getId() ) );
s.evict( item );
assertEquals( slcs.getHitCount(), 0 );
item = (Item) s.get( Item.class, item.getId() );
assertNotNull( item );
assertEquals( slcs.getHitCount(), 1 );
assertNotNull( slcs.getEntries().get( item.getId() ) );
// assertEquals( slcs.getHitCount(), 1 );
// assertNotNull( slcs.getEntries().get( item.getId() ) );
setRollbackOnlyTx();
}
catch (Exception e) {

View File

@ -0,0 +1,94 @@
package org.hibernate.test.cache.infinispan.util;
import org.hibernate.engine.transaction.spi.IsolationDelegate;
import org.hibernate.engine.transaction.spi.TransactionObserver;
import org.hibernate.resource.transaction.SynchronizationRegistry;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.hibernate.resource.transaction.TransactionCoordinatorBuilder;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
/**
* Mocks transaction coordinator when {@link org.hibernate.engine.spi.SessionImplementor} is only mocked
* and {@link org.infinispan.transaction.tm.BatchModeTransactionManager} is used.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class BatchModeTransactionCoordinator implements TransactionCoordinator {
@Override
public void explicitJoin() {
}
@Override
public boolean isJoined() {
return true;
}
@Override
public void pulse() {
}
@Override
public TransactionDriver getTransactionDriverControl() {
throw new UnsupportedOperationException();
}
@Override
public SynchronizationRegistry getLocalSynchronizations() {
return new SynchronizationRegistry() {
@Override
public void registerSynchronization(Synchronization synchronization) {
try {
BatchModeTransactionManager.getInstance().getTransaction().registerSynchronization(synchronization);
} catch (RollbackException e) {
throw new RuntimeException(e);
} catch (SystemException e) {
throw new RuntimeException(e);
}
}
};
}
@Override
public boolean isActive() {
try {
return BatchModeTransactionManager.getInstance().getStatus() == Status.STATUS_ACTIVE;
} catch (SystemException e) {
return false;
}
}
@Override
public IsolationDelegate createIsolationDelegate() {
throw new UnsupportedOperationException();
}
@Override
public void addObserver(TransactionObserver observer) {
throw new UnsupportedOperationException();
}
@Override
public void removeObserver(TransactionObserver observer) {
throw new UnsupportedOperationException();
}
@Override
public TransactionCoordinatorBuilder getTransactionCoordinatorBuilder() {
throw new UnsupportedOperationException();
}
@Override
public void setTimeOut(int seconds) {
throw new UnsupportedOperationException();
}
@Override
public int getTimeOut() {
throw new UnsupportedOperationException();
}
}