From 93d39fa47067362c17b762d2b4d710c3207b4131 Mon Sep 17 00:00:00 2001
From: Radim Vansa
Date: Thu, 6 Aug 2015 12:05:25 +0200
Subject: [PATCH] HHH-9868, HHH-9881 Must not write into non-transactional
caches during transactional write
* The write can only invalidate (remove) the entry and block further PFERs of that entry
* After successful DB update, if there have not been any concurrent updates the value can be PFERed into the cache
---
.../access/NonTxInvalidationInterceptor.java | 48 +++----
.../access/NonTxPutFromLoadInterceptor.java | 2 +-
.../NonTxTransactionalAccessDelegate.java | 86 ++++++++++++
.../access/PutFromLoadValidator.java | 123 ++++++++++--------
.../access/TransactionalAccessDelegate.java | 115 ++++++----------
.../access/TxPutFromLoadInterceptor.java | 8 +-
.../access/TxTransactionalAccessDelegate.java | 73 +++++++++++
.../collection/TransactionalAccess.java | 2 +-
.../entity/TransactionalAccess.java | 2 +-
.../naturalid/TransactionalAccess.java | 2 +-
.../util/EndInvalidationCommand.java | 2 +-
...ollectionRegionAccessStrategyTestCase.java | 2 +-
...actEntityRegionAccessStrategyTestCase.java | 23 +++-
.../AbstractReadOnlyAccessTestCase.java | 3 +
.../AbstractTransactionalAccessTestCase.java | 3 +
.../BasicTransactionalTestCase.java | 8 +-
.../util/BatchModeTransactionCoordinator.java | 94 +++++++++++++
17 files changed, 423 insertions(+), 173 deletions(-)
create mode 100644 hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxTransactionalAccessDelegate.java
create mode 100644 hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TxTransactionalAccessDelegate.java
create mode 100644 hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/util/BatchModeTransactionCoordinator.java
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java
index 29ab73052a..2e1a686a50 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxInvalidationInterceptor.java
@@ -78,19 +78,19 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (!isPutForExternalRead(command)) {
- return handleInvalidate(ctx, command, command.getKey());
+ return handleInvalidate(ctx, command, new Object[] { command.getKey() });
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
- return handleInvalidate(ctx, command, command.getKey());
+ return handleInvalidate(ctx, command, new Object[] { command.getKey() });
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
- return handleInvalidate(ctx, command, command.getKey());
+ return handleInvalidate(ctx, command, new Object[] { command.getKey() });
}
@Override
@@ -107,39 +107,39 @@ public class NonTxInvalidationInterceptor extends BaseRpcInterceptor implements
@Override
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
- Object[] keys = command.getMap() == null ? null : command.getMap().keySet().toArray();
- return handleInvalidate(ctx, command, keys);
+ if (!isPutForExternalRead(command)) {
+ return handleInvalidate(ctx, command, command.getMap().keySet().toArray());
+ }
+ return invokeNextInterceptor(ctx, command);
}
- private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object... keys) throws Throwable {
+ private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object[] keys) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
- if (command.isSuccessful() && !ctx.isInTxScope()) {
- if (keys != null && keys.length != 0) {
- if (!isLocalModeForced(command)) {
- invalidateAcrossCluster(isSynchronous(command), keys, ctx);
- }
- }
+ if (command.isSuccessful() && keys != null && keys.length != 0) {
+ invalidateAcrossCluster(command, keys);
}
return retval;
}
- private void invalidateAcrossCluster(boolean synchronous, Object[] keys, InvocationContext ctx) throws Throwable {
+ private void invalidateAcrossCluster(FlagAffectedCommand command, Object[] keys) throws Throwable {
// increment invalidations counter if statistics maintained
incrementInvalidations();
InvalidateCommand invalidateCommand;
Object lockOwner = putFromLoadValidator.registerRemoteInvalidations(keys);
- if (lockOwner == null) {
- invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.emptySet(), keys);
- }
- else {
- invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
- InfinispanCollections.emptySet(), keys, lockOwner);
- }
- if (log.isDebugEnabled()) {
- log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
- }
+ if (!isLocalModeForced(command)) {
+ if (lockOwner == null) {
+ invalidateCommand = commandsFactory.buildInvalidateCommand(InfinispanCollections.emptySet(), keys);
+ }
+ else {
+ invalidateCommand = commandInitializer.buildBeginInvalidationCommand(
+ InfinispanCollections.emptySet(), keys, lockOwner);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + invalidateCommand);
+ }
- rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(synchronous));
+ rpcManager.invokeRemotely(null, invalidateCommand, rpcManager.getDefaultRpcOptions(isSynchronous(command)));
+ }
}
private void incrementInvalidations() {
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java
index d067ec4d95..371fc1d14c 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxPutFromLoadInterceptor.java
@@ -45,7 +45,7 @@ public class NonTxPutFromLoadInterceptor extends BaseCustomInterceptor {
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
if (!ctx.isOriginLocal() && command instanceof BeginInvalidationCommand) {
for (Object key : command.getKeys()) {
- putFromLoadValidator.beginInvalidatingKey(key, ((BeginInvalidationCommand) command).getLockOwner());
+ putFromLoadValidator.beginInvalidatingKey(((BeginInvalidationCommand) command).getLockOwner(), key);
}
}
return invokeNextInterceptor(ctx, command);
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxTransactionalAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxTransactionalAccessDelegate.java
new file mode 100644
index 0000000000..8cd1cb4969
--- /dev/null
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonTxTransactionalAccessDelegate.java
@@ -0,0 +1,86 @@
+/*
+ * Hibernate, Relational Persistence for Idiomatic Java
+ *
+ * License: GNU Lesser General Public License (LGPL), version 2.1 or later.
+ * See the lgpl.txt file in the root directory or .
+ */
+package org.hibernate.cache.infinispan.access;
+
+import org.hibernate.cache.CacheException;
+import org.hibernate.cache.infinispan.impl.BaseRegion;
+import org.hibernate.engine.spi.SessionImplementor;
+import org.hibernate.resource.transaction.TransactionCoordinator;
+import org.hibernate.resource.transaction.spi.TransactionStatus;
+
+/**
+ * Delegate for non-transactional caches
+ *
+ * @author Radim Vansa <rvansa@redhat.com>
+ */
+public class NonTxTransactionalAccessDelegate extends TransactionalAccessDelegate {
+ public NonTxTransactionalAccessDelegate(BaseRegion region, PutFromLoadValidator validator) {
+ super(region, validator);
+ }
+
+ @Override
+ @SuppressWarnings("UnusedParameters")
+ public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
+ if ( !region.checkValid() ) {
+ return false;
+ }
+
+ // We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
+ // (or any other invalidation), naked put that was started after the eviction ended but before this insert
+ // ended could insert the stale entry into the cache (since the entry was removed by eviction).
+ if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
+ throw new CacheException(
+ "Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
+ );
+ }
+ putValidator.setCurrentSession(session);
+ try {
+ writeCache.remove(key);
+ }
+ finally {
+ putValidator.resetCurrentSession();
+ }
+ return true;
+ }
+
+ @Override
+ @SuppressWarnings("UnusedParameters")
+ public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion)
+ throws CacheException {
+ // We update whether or not the region is valid. Other nodes
+ // may have already restored the region so they need to
+ // be informed of the change.
+
+ // We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
+ // (or any other invalidation), naked put that was started after the eviction ended but before this update
+ // ended could insert the stale entry into the cache (since the entry was removed by eviction).
+ if ( !putValidator.beginInvalidatingWithPFER(session, key, value)) {
+ throw new CacheException(
+ "Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
+ );
+ }
+ putValidator.setCurrentSession(session);
+ try {
+ writeCache.remove(key);
+ }
+ finally {
+ putValidator.resetCurrentSession();
+ }
+ return true;
+ }
+
+ @Override
+ public void unlockItem(SessionImplementor session, Object key) throws CacheException {
+ TransactionCoordinator tc = session.getTransactionCoordinator();
+ boolean doPFER = tc != null && tc.getTransactionDriverControl().getStatus() == TransactionStatus.COMMITTED;
+ if ( !putValidator.endInvalidatingKey(session, key, doPFER) ) {
+ // TODO: localization
+ log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ + region.getName() + "; the key won't be cached until invalidation expires.");
+ }
+ }
+}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
index a8bd0e3cfb..4f43650b03 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
@@ -6,11 +6,6 @@
*/
package org.hibernate.cache.infinispan.access;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -35,7 +30,6 @@ import org.infinispan.interceptors.EntryWrappingInterceptor;
import org.infinispan.interceptors.InvalidationInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
-import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -66,11 +60,11 @@ import org.infinispan.util.logging.LogFactory;
* call
*
*
- *
{@link #beginInvalidatingKey(SessionImplementor, Object)} (for a single key invalidation)
+ *
{@link #beginInvalidatingKey(Object, Object)} (for a single key invalidation)
*
or {@link #beginInvalidatingRegion()} followed by {@link #endInvalidatingRegion()}
* (for a general invalidation all pending puts)
*
- * After transaction commit (when the DB is updated) {@link #endInvalidatingKey(SessionImplementor, Object)} should
+ * After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object, Object)} should
* be called in order to allow further attempts to cache entry.
*
*
@@ -520,17 +514,6 @@ public class PutFromLoadValidator {
}
}
- /**
- * Calls {@link #beginInvalidatingKey(Object, Object)} with current transaction or thread.
- *
- * @param session
- * @param key
- * @return
- */
- public boolean beginInvalidatingKey(SessionImplementor session, Object key) {
- return beginInvalidatingKey(key, session);
- }
-
/**
* Invalidates any {@link #registerPendingPut(SessionImplementor, Object, long) previously registered pending puts}
* and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
@@ -538,14 +521,18 @@ public class PutFromLoadValidator {
* {@link #acquirePutFromLoadLock(SessionImplementor, Object, long) acquired the putFromLoad lock} for the given key
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
* returns, possibly caching stale data.
- * After this transaction completes, {@link #endInvalidatingKey(SessionImplementor, Object)} needs to be called }
+ * After this transaction completes, {@link #endInvalidatingKey(Object, Object)} needs to be called }
*
* @param key key identifying data whose pending puts should be invalidated
*
* @return true if the invalidation was successful; false if a problem occured (which the
* caller should treat as an exception condition)
*/
- public boolean beginInvalidatingKey(Object key, Object lockOwner) {
+ public boolean beginInvalidatingKey(Object lockOwner, Object key) {
+ return beginInvalidatingWithPFER(lockOwner, key, null);
+ }
+
+ public boolean beginInvalidatingWithPFER(Object lockOwner, Object key, Object valueForPFER) {
for (;;) {
PendingPutMap pending = new PendingPutMap(null);
PendingPutMap prev = pendingPuts.putIfAbsent(key, pending);
@@ -562,54 +549,47 @@ public class PutFromLoadValidator {
}
long now = System.currentTimeMillis();
pending.invalidate(now, expirationPeriod);
- pending.addInvalidator(lockOwner, now, expirationPeriod);
+ pending.addInvalidator(lockOwner, valueForPFER, now);
}
finally {
pending.releaseLock();
}
if (trace) {
- log.tracef("beginInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending);
+ log.tracef("beginInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwnerToString(lockOwner), pending);
}
return true;
}
else {
- log.tracef("beginInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key);
+ log.tracef("beginInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwnerToString(lockOwner));
return false;
}
}
}
- /**
- * Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread.
- *
- * @param session
- * @param key
- * @return
- */
- public boolean endInvalidatingKey(SessionImplementor session, Object key) {
- return endInvalidatingKey(key, session);
+ public boolean endInvalidatingKey(Object lockOwner, Object key) {
+ return endInvalidatingKey(lockOwner, key, false);
}
/**
* Called after the transaction completes, allowing caching of entries. It is possible that this method
- * is called without previous invocation of {@link #beginInvalidatingKey(SessionImplementor, Object)}, then it should be a no-op.
+ * is called without previous invocation of {@link #beginInvalidatingKey(Object, Object)}, then it should be a no-op.
*
- * @param key
* @param lockOwner owner of the invalidation - transaction or thread
+ * @param key
* @return
*/
- public boolean endInvalidatingKey(Object key, Object lockOwner) {
+ public boolean endInvalidatingKey(Object lockOwner, Object key, boolean doPFER) {
PendingPutMap pending = pendingPuts.get(key);
if (pending == null) {
if (trace) {
- log.tracef("endInvalidatingKey(%s#%s, %s) could not find pending puts", cache.getName(), key, lockOwner);
+ log.tracef("endInvalidatingKey(%s#%s, %s) could not find pending puts", cache.getName(), key, lockOwnerToString(lockOwner));
}
return true;
}
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
long now = System.currentTimeMillis();
- pending.removeInvalidator(lockOwner, now);
+ pending.removeInvalidator(lockOwner, key, now, doPFER);
// we can't remove the pending put yet because we wait for naked puts
// pendingPuts should be configured with maxIdle time so won't have memory leak
return true;
@@ -617,13 +597,13 @@ public class PutFromLoadValidator {
finally {
pending.releaseLock();
if (trace) {
- log.tracef("endInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending);
+ log.tracef("endInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwnerToString(lockOwner), pending);
}
}
}
else {
if (trace) {
- log.tracef("endInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwner);
+ log.tracef("endInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwnerToString(lockOwner));
}
return false;
}
@@ -634,7 +614,7 @@ public class PutFromLoadValidator {
TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator();
if (transactionCoordinator != null) {
if (trace) {
- log.tracef("Registering lock owner %s for %s: %s", session, cache.getName(), Arrays.toString(keys));
+ log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys));
}
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
@@ -646,13 +626,18 @@ public class PutFromLoadValidator {
// ---------------------------------------------------------------- Private
+ // we can't use SessionImpl.toString() concurrently
+ private static String lockOwnerToString(Object lockOwner) {
+ return lockOwner instanceof SessionImplementor ? "Session#" + lockOwner.hashCode() : lockOwner.toString();
+ }
+
/**
* Lazy-initialization map for PendingPut. Optimized for the expected usual case where only a
* single put is pending for a given key.
*
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
*/
- private static class PendingPutMap extends Lock {
+ private class PendingPutMap extends Lock {
private PendingPut singlePendingPut;
private Map