HHH-9868, HHH-9881 Replaced access to TransactionManager with Session

This commit is contained in:
Radim Vansa 2015-08-04 11:24:26 +02:00 committed by Galder Zamarreño
parent 1f24fa6354
commit 984125e87e
11 changed files with 199 additions and 202 deletions

View File

@ -24,6 +24,7 @@ dependencies {
testCompile( libraries.jnp_client )
testCompile( libraries.jnp_server )
testCompile( libraries.rhq )
testCompile( libraries.mockito )
testCompile ('mysql:mysql-connector-java:5.1.17')
}

View File

@ -6,7 +6,6 @@
*/
package org.hibernate.cache.infinispan.access;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
@ -24,7 +23,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.spi.RegionFactory;
@ -53,9 +51,9 @@ import org.infinispan.util.logging.LogFactory;
* not find data is:
* <p/>
* <ol>
* <li> Call {@link #registerPendingPut(Object, long)}</li>
* <li> Call {@link #registerPendingPut(SessionImplementor, Object, long)}</li>
* <li> Read the database</li>
* <li> Call {@link #acquirePutFromLoadLock(Object, long)}
* <li> Call {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
* <li> if above returns <code>null</code>, the thread should not cache the data;
* only if above returns instance of <code>AcquiredLock</code>, put data in the cache and...</li>
* <li> then call {@link #releasePutFromLoadLock(Object, Lock)}</li>
@ -68,18 +66,18 @@ import org.infinispan.util.logging.LogFactory;
* call
* <p/>
* <ul>
* <li> {@link #beginInvalidatingKey(Object)} (for a single key invalidation)</li>
* <li> {@link #beginInvalidatingKey(SessionImplementor, Object)} (for a single key invalidation)</li>
* <li>or {@link #beginInvalidatingRegion()} followed by {@link #endInvalidatingRegion()}
* (for a general invalidation all pending puts)</li>
* </ul>
* After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object)} should
* After transaction commit (when the DB is updated) {@link #endInvalidatingKey(SessionImplementor, Object)} should
* be called in order to allow further attempts to cache entry.
* </p>
* <p/>
* <p>
* This class also supports the concept of "naked puts", which are calls to
* {@link #acquirePutFromLoadLock(Object, long)} without a preceding {@link #registerPendingPut(Object, long)}.
* Besides not acquiring lock in {@link #registerPendingPut(Object, long)} this can happen when collection
* {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)} without a preceding {@link #registerPendingPut(SessionImplementor, Object, long)}.
* Besides not acquiring lock in {@link #registerPendingPut(SessionImplementor, Object, long)} this can happen when collection
* elements are loaded after the collection has not been found in the cache, where the elements
* don't have their own table but can be listed as 'select ... from Element where collection_id = ...'.
* Naked puts are handled according to txTimestamp obtained by calling {@link RegionFactory#nextTimestamp()}
@ -141,8 +139,8 @@ public class PutFromLoadValidator {
/**
* Creates a new put from load validator instance.
*
* @param cache Cache instance on which to store pending put information.
*
* @param cache Cache instance on which to store pending put information.
* @param transactionManager Transaction manager
*/
public PutFromLoadValidator(AdvancedCache cache, TransactionManager transactionManager) {
@ -247,20 +245,15 @@ public class PutFromLoadValidator {
}
public void setCurrentSession(SessionImplementor session) {
// we register synchronizations directly on JTA transactions, let's make this noop with TM
if (transactionManager == null) {
currentSession.set(session);
}
currentSession.set(session);
}
public void resetCurrentSession() {
if (transactionManager == null) {
currentSession.remove();
}
currentSession.remove();
}
/**
* Marker for lock acquired in {@link #acquirePutFromLoadLock(Object, long)}
* Marker for lock acquired in {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
*/
public static abstract class Lock {
private Lock() {}
@ -274,13 +267,14 @@ public class PutFromLoadValidator {
* should always be matched with a call to {@link #releasePutFromLoadLock(Object, Lock)}.
* </p>
*
* @param session
* @param key the key
*
* @param txTimestamp
* @return <code>AcquiredLock</code> if the lock is acquired and the cache put
* can proceed; <code>null</code> if the data should not be cached
*/
public Lock acquirePutFromLoadLock(Object key, long txTimestamp) {
public Lock acquirePutFromLoadLock(SessionImplementor session, Object key, long txTimestamp) {
if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d)", cache.getName(), key, txTimestamp);
}
@ -305,7 +299,7 @@ public class PutFromLoadValidator {
}
continue;
}
final PendingPut toCancel = pending.remove(getLocalLockOwner());
final PendingPut toCancel = pending.remove(session);
if (toCancel != null) {
valid = !toCancel.completed;
toCancel.completed = true;
@ -359,7 +353,7 @@ public class PutFromLoadValidator {
}
}
PendingPut pendingPut = new PendingPut(getLocalLockOwner());
PendingPut pendingPut = new PendingPut(session);
pending = new PendingPutMap(pendingPut);
PendingPutMap existing = pendingPuts.putIfAbsent(key, pending);
if (existing != null) {
@ -388,7 +382,7 @@ public class PutFromLoadValidator {
/**
* Releases the lock previously obtained by a call to
* {@link #acquirePutFromLoadLock(Object, long)}.
* {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}.
*
* @param key the key
*/
@ -407,9 +401,9 @@ public class PutFromLoadValidator {
}
/**
* Invalidates all {@link #registerPendingPut(Object, long) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object, long)} will return <code>false</code>. <p> This method will block until any
* concurrent thread that has {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the any key has
* Invalidates all {@link #registerPendingPut(SessionImplementor, Object, long) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)} will return <code>false</code>. <p> This method will block until any
* concurrent thread that has {@link #acquirePutFromLoadLock(SessionImplementor, Object, long) acquired the putFromLoad lock} for the any key has
* released the lock. This allows the caller to be certain the putFromLoad will not execute after this method returns,
* possibly caching stale data. </p>
*
@ -506,16 +500,17 @@ public class PutFromLoadValidator {
/**
* Notifies this validator that it is expected that a database read followed by a subsequent {@link
* #acquirePutFromLoadLock(Object, long)} call will occur. The intent is this method would be called following a cache miss
* #acquirePutFromLoadLock(SessionImplementor, Object, long)} call will occur. The intent is this method would be called following a cache miss
* wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to
* treat the subsequent <code>acquirePutFromLoadLock</code> as if the database read occurred when this method was
* invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal
* notifications.
*
* @param session
* @param key key that will be used for subsequent cache put
* @param txTimestamp
*/
public void registerPendingPut(Object key, long txTimestamp) {
public void registerPendingPut(SessionImplementor session, Object key, long txTimestamp) {
long invalidationTimestamp = this.regionInvalidationTimestamp;
if (txTimestamp <= invalidationTimestamp) {
boolean skip;
@ -543,7 +538,7 @@ public class PutFromLoadValidator {
}
}
final PendingPut pendingPut = new PendingPut( getLocalLockOwner() );
final PendingPut pendingPut = new PendingPut( session );
final PendingPutMap pendingForKey = new PendingPutMap( pendingPut );
for (;;) {
@ -586,21 +581,23 @@ public class PutFromLoadValidator {
/**
* Calls {@link #beginInvalidatingKey(Object, Object)} with current transaction or thread.
*
* @param session
* @param key
* @return
*/
public boolean beginInvalidatingKey(Object key) {
return beginInvalidatingKey(key, getLocalLockOwner());
public boolean beginInvalidatingKey(SessionImplementor session, Object key) {
return beginInvalidatingKey(key, session);
}
/**
* Invalidates any {@link #registerPendingPut(Object, long) previously registered pending puts}
* and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object, long)}
* Invalidates any {@link #registerPendingPut(SessionImplementor, Object, long) previously registered pending puts}
* and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(SessionImplementor, Object, long)}
* will return <code>false</code>. <p> This method will block until any concurrent thread that has
* {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the given key
* {@link #acquirePutFromLoadLock(SessionImplementor, Object, long) acquired the putFromLoad lock} for the given key
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
* returns, possibly caching stale data. </p>
* After this transaction completes, {@link #endInvalidatingKey(Object)} needs to be called }
* After this transaction completes, {@link #endInvalidatingKey(SessionImplementor, Object)} needs to be called }
*
* @param key key identifying data whose pending puts should be invalidated
*
@ -643,16 +640,18 @@ public class PutFromLoadValidator {
/**
* Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread.
*
* @param session
* @param key
* @return
*/
public boolean endInvalidatingKey(Object key) {
return endInvalidatingKey(key, getLocalLockOwner());
public boolean endInvalidatingKey(SessionImplementor session, Object key) {
return endInvalidatingKey(key, session);
}
/**
* Called after the transaction completes, allowing caching of entries. It is possible that this method
* is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be a no-op.
* is called without previous invocation of {@link #beginInvalidatingKey(SessionImplementor, Object)}, then it should be a no-op.
*
* @param key
* @param lockOwner owner of the invalidation - transaction or thread
@ -690,31 +689,6 @@ public class PutFromLoadValidator {
}
public Object registerRemoteInvalidations(Object[] keys) {
Transaction tx = null;
try {
if ( transactionManager != null ) {
tx = transactionManager.getTransaction();
}
}
catch (SystemException se) {
throw new CacheException( "Could not obtain transaction", se );
}
if (tx != null) {
if (trace) {
log.tracef("Registering lock owner %s for %s: %s", tx, cache.getName(), Arrays.toString(keys));
}
try {
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys);
tx.registerSynchronization(sync);
return sync.uuid;
}
catch (SystemException se) {
throw new CacheException("Cannot register synchronization", se);
}
catch (RollbackException e) {
return null;
}
}
SessionImplementor session = currentSession.get();
TransactionCoordinator transactionCoordinator = session == null ? null : session.getTransactionCoordinator();
if (transactionCoordinator != null) {
@ -731,20 +705,6 @@ public class PutFromLoadValidator {
// ---------------------------------------------------------------- Private
private Object getLocalLockOwner() {
Transaction tx = null;
try {
if ( transactionManager != null ) {
tx = transactionManager.getTransaction();
}
}
catch (SystemException se) {
throw new CacheException( "Could not obtain transaction", se );
}
return tx == null ? Thread.currentThread() : tx;
}
/**
* Lazy-initialization map for PendingPut. Optimized for the expected usual case where only a
* single put is pending for a given key.
@ -787,7 +747,7 @@ public class PutFromLoadValidator {
sb.append("[]");
}
else {
sb.append(invalidators);
sb.append(invalidators.values());
}
}
else {
@ -968,7 +928,8 @@ public class PutFromLoadValidator {
}
public String toString() {
return (completed ? "C@" : "R@") + owner;
// we can't use SessionImpl.toString() concurrently
return (completed ? "C@" : "R@") + (owner instanceof SessionImplementor ? "Session#" + owner.hashCode() : owner.toString());
}
public boolean invalidate(long now, long expirationPeriod) {
@ -995,7 +956,8 @@ public class PutFromLoadValidator {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("Owner=").append(owner);
// we can't use SessionImpl.toString() concurrently
sb.append("Owner=").append(owner instanceof SessionImplementor ? "Session#" + owner.hashCode() : owner.toString());
sb.append(", Timestamp=").append(registeredTimestamp);
sb.append('}');
return sb.toString();

View File

@ -51,19 +51,21 @@ public class TransactionalAccessDelegate {
/**
* Attempt to retrieve an object from the cache.
*
* @param key The key of the item to be retrieved
*
* @param session
* @param key The key of the item to be retrieved
* @param txTimestamp a timestamp prior to the transaction start time
* @return the cached object or <tt>null</tt>
* @throws CacheException if the cache retrieval failed
*/
@SuppressWarnings("UnusedParameters")
public Object get(Object key, long txTimestamp) throws CacheException {
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
if ( !region.checkValid() ) {
return null;
}
final Object val = cache.get( key );
if ( val == null ) {
putValidator.registerPendingPut( key, txTimestamp );
putValidator.registerPendingPut(session, key, txTimestamp );
}
return val;
}
@ -114,7 +116,7 @@ public class TransactionalAccessDelegate {
return false;
}
PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key, txTimestamp);
PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(session, key, txTimestamp);
if ( lock == null) {
if ( TRACE_ENABLED ) {
log.tracef( "Put from load lock not acquired for key %s", key );
@ -163,7 +165,7 @@ public class TransactionalAccessDelegate {
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this insert
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(key)) {
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
@ -200,7 +202,7 @@ public class TransactionalAccessDelegate {
// We need to be invalidating even for regular writes; if we were not and the write was followed by eviction
// (or any other invalidation), naked put that was started after the eviction ended but before this update
// ended could insert the stale entry into the cache (since the entry was removed by eviction).
if ( !putValidator.beginInvalidatingKey(key)) {
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
@ -223,7 +225,7 @@ public class TransactionalAccessDelegate {
* @throws CacheException if removing the cached item fails
*/
public void remove(SessionImplementor session, Object key) throws CacheException {
if ( !putValidator.beginInvalidatingKey(key)) {
if ( !putValidator.beginInvalidatingKey(session, key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
@ -294,11 +296,13 @@ public class TransactionalAccessDelegate {
* may not have been successful), after transaction completion. This method
* is used by "asynchronous" concurrency strategies.
*
*
* @param session
* @param key The item key
* @throws org.hibernate.cache.CacheException Propogated from underlying {@link org.hibernate.cache.spi.Region}
*/
public void unlockItem(Object key) throws CacheException {
if ( !putValidator.endInvalidatingKey(key) ) {
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
if ( !putValidator.endInvalidatingKey(session, key) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached until invalidation expires.");
@ -310,14 +314,16 @@ public class TransactionalAccessDelegate {
* instead of calling release().
* This method is used by "asynchronous" concurrency strategies.
*
*
* @param session
* @param key The item key
* @param value The item
* @param version The item's version value
* @return Were the contents of the cache actual changed by this operation?
* @throws CacheException Propagated from underlying {@link org.hibernate.cache.spi.Region}
*/
public boolean afterInsert(Object key, Object value, Object version) {
if ( !putValidator.endInvalidatingKey(key) ) {
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
if ( !putValidator.endInvalidatingKey(session, key) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached until invalidation expires.");
@ -330,6 +336,8 @@ public class TransactionalAccessDelegate {
* instead of calling release(). This method is used by "asynchronous"
* concurrency strategies.
*
*
* @param session
* @param key The item key
* @param value The item
* @param currentVersion The item's current version value
@ -338,8 +346,8 @@ public class TransactionalAccessDelegate {
* @return Were the contents of the cache actual changed by this operation?
* @throws CacheException Propagated from underlying {@link org.hibernate.cache.spi.Region}
*/
public boolean afterUpdate(Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
if ( !putValidator.endInvalidatingKey(key) ) {
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
if ( !putValidator.endInvalidatingKey(session, key) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached until invalidation expires.");

View File

@ -42,7 +42,7 @@ class TransactionalAccess implements CollectionRegionAccessStrategy {
}
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
return delegate.get( key, txTimestamp );
return delegate.get( session, key, txTimestamp );
}
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) throws CacheException {
@ -75,7 +75,7 @@ class TransactionalAccess implements CollectionRegionAccessStrategy {
}
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
delegate.unlockItem(key);
delegate.unlockItem( session, key);
}
public void unlockRegion(SoftLock lock) throws CacheException {

View File

@ -42,7 +42,7 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
}
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
return delegate.get( key, txTimestamp );
return delegate.get( session, key, txTimestamp );
}
public EntityRegion getRegion() {
@ -84,19 +84,19 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
}
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
delegate.unlockItem(key);
delegate.unlockItem( session, key );
}
public void unlockRegion(SoftLock lock) throws CacheException {
}
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
return delegate.afterInsert(key, value, version);
return delegate.afterInsert( session, key, value, version );
}
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock)
throws CacheException {
return delegate.afterUpdate(key, value, currentVersion, previousVersion, lock);
return delegate.afterUpdate( session, key, value, currentVersion, previousVersion, lock );
}
@Override

View File

@ -53,7 +53,7 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
return delegate.get( key, txTimestamp );
return delegate.get( session, key, txTimestamp );
}
@Override
@ -89,7 +89,7 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
delegate.unlockItem(key);
delegate.unlockItem( session, key );
}
@Override
@ -98,12 +98,12 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override
public boolean afterInsert(SessionImplementor session, Object key, Object value) throws CacheException {
return delegate.afterInsert(key, value, null);
return delegate.afterInsert( session, key, value, null );
}
@Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, SoftLock lock) throws CacheException {
return delegate.afterUpdate(key, value, null, null, lock);
return delegate.afterUpdate( session, key, value, null, null, lock );
}
@Override

View File

@ -7,6 +7,7 @@
package org.hibernate.test.cache.infinispan.access;
import javax.transaction.TransactionManager;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -19,6 +20,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.infinispan.manager.EmbeddedCacheManager;
@ -38,6 +40,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
/**
* Tests of {@link PutFromLoadValidator}.
@ -215,14 +218,16 @@ public class PutFromLoadValidatorUnitTestCase {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1, txTimestamp);
SessionImplementor session1 = mock(SessionImplementor.class);
SessionImplementor session2 = mock(SessionImplementor.class);
testee.registerPendingPut(session1, KEY1, txTimestamp);
if (removeRegion) {
testee.beginInvalidatingRegion();
} else {
testee.beginInvalidatingKey(KEY1);
testee.beginInvalidatingKey(session2, KEY1);
}
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session1, KEY1, txTimestamp);
try {
assertNull(lock);
}
@ -233,7 +238,7 @@ public class PutFromLoadValidatorUnitTestCase {
if (removeRegion) {
testee.endInvalidatingRegion();
} else {
testee.endInvalidatingKey(KEY1);
testee.endInvalidatingKey(session2, KEY1);
}
}
} catch (Exception e) {
@ -271,10 +276,11 @@ public class PutFromLoadValidatorUnitTestCase {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1, txTimestamp);
SessionImplementor session = mock (SessionImplementor.class);
testee.registerPendingPut(session, KEY1, txTimestamp);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
if (lock != null) {
try {
log.trace("Put from load lock acquired for key = " + KEY1);
@ -344,8 +350,9 @@ public class PutFromLoadValidatorUnitTestCase {
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
long txTimestamp = System.currentTimeMillis();
testee.registerPendingPut(KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
SessionImplementor session = mock (SessionImplementor.class);
testee.registerPendingPut(session, KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
if (lock != null) {
try {
removeLatch.countDown();
@ -365,7 +372,8 @@ public class PutFromLoadValidatorUnitTestCase {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
testee.beginInvalidatingKey(KEY1);
SessionImplementor session = mock (SessionImplementor.class);
testee.beginInvalidatingKey(session, KEY1);
} else {
testee.beginInvalidatingRegion();
}
@ -432,9 +440,10 @@ public class PutFromLoadValidatorUnitTestCase {
assertTrue(success);
putFromLoadValidator.endInvalidatingRegion();;
} else {
boolean success = putFromLoadValidator.beginInvalidatingKey(KEY1);
SessionImplementor session = mock (SessionImplementor.class);
boolean success = putFromLoadValidator.beginInvalidatingKey(session, KEY1);
assertTrue(success);
success = putFromLoadValidator.endInvalidatingKey(KEY1);
success = putFromLoadValidator.endInvalidatingKey(session, KEY1);
assertTrue(success);
}
// if we go for the timestamp-based approach, invalidation in the same millisecond
@ -455,9 +464,10 @@ public class PutFromLoadValidatorUnitTestCase {
public Void call() throws Exception {
try {
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
putFromLoadValidator.registerPendingPut(KEY1, txTimestamp);
SessionImplementor session = mock (SessionImplementor.class);
putFromLoadValidator.registerPendingPut(session, KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(session, KEY1, txTimestamp);
try {
assertNotNull(lock);
} finally {
@ -485,7 +495,8 @@ public class PutFromLoadValidatorUnitTestCase {
public Void call() throws Exception {
try {
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
SessionImplementor session = mock (SessionImplementor.class);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
try {
if (expectSuccess) {
assertNotNull(lock);

View File

@ -26,6 +26,7 @@ import org.hibernate.cache.internal.CacheDataDescriptionImpl;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.CollectionRegionAccessStrategy;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.internal.util.compare.ComparableComparator;
import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
import org.hibernate.test.cache.infinispan.NodeEnvironment;
@ -48,6 +49,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* Base class for tests of CollectionRegionAccessStrategy impls.
@ -71,10 +73,12 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
protected NodeEnvironment localEnvironment;
protected CollectionRegionImpl localCollectionRegion;
protected CollectionRegionAccessStrategy localAccessStrategy;
protected SessionImplementor localSession;
protected NodeEnvironment remoteEnvironment;
protected CollectionRegionImpl remoteCollectionRegion;
protected CollectionRegionAccessStrategy remoteAccessStrategy;
protected SessionImplementor remoteSession;
protected boolean invalidation;
protected boolean synchronous;
@ -96,6 +100,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
localCollectionRegion = localEnvironment.getCollectionRegion( REGION_NAME, getCacheDataDescription() );
localAccessStrategy = localCollectionRegion.buildAccessStrategy( getAccessType() );
localSession = mock(SessionImplementor.class);
invalidation = Caches.isInvalidationCache(localCollectionRegion.getCache());
synchronous = Caches.isSynchronousCache(localCollectionRegion.getCache());
@ -108,6 +113,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
remoteCollectionRegion = remoteEnvironment.getCollectionRegion( REGION_NAME, getCacheDataDescription() );
remoteAccessStrategy = remoteCollectionRegion.buildAccessStrategy( getAccessType() );
remoteSession = mock(SessionImplementor.class);
}
protected abstract String getConfigurationName();
@ -169,7 +175,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception {
delegate.putFromLoad(null, "k1", "v1", 0, null );
SessionImplementor session = mock(SessionImplementor.class);
delegate.putFromLoad(session, "k1", "v1", 0, null );
return null;
}
};
@ -180,7 +187,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
Caches.withinTx(localTm, new Callable<Void>() {
@Override
public Void call() throws Exception {
delegate.remove(null, "k1");
SessionImplementor session = mock(SessionImplementor.class);
delegate.remove(session, "k1");
return null;
}
});
@ -219,8 +227,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
PutFromLoadValidator.removeFromCache(cache);
return new PutFromLoadValidator(cache, cm, tm) {
@Override
public Lock acquirePutFromLoadLock(Object key, long txTimestamp) {
Lock lock = super.acquirePutFromLoadLock( key, txTimestamp);
public Lock acquirePutFromLoadLock(SessionImplementor session, Object key, long txTimestamp) {
Lock lock = super.acquirePutFromLoadLock(session, key, txTimestamp);
try {
removeLatch.countDown();
pferLatch.await( 2, TimeUnit.SECONDS );
@ -264,15 +272,15 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
assertEquals( "node1 starts clean", null, localAccessStrategy.get(null, KEY, txTimestamp ) );
assertEquals( "node1 starts clean", null, localAccessStrategy.get(localSession, KEY, txTimestamp ) );
writeLatch1.await();
if ( useMinimalAPI ) {
localAccessStrategy.putFromLoad(null, KEY, VALUE2, txTimestamp, new Integer( 2 ), true );
localAccessStrategy.putFromLoad(localSession, KEY, VALUE2, txTimestamp, new Integer( 2 ), true );
}
else {
localAccessStrategy.putFromLoad(null, KEY, VALUE2, txTimestamp, new Integer( 2 ) );
localAccessStrategy.putFromLoad(localSession, KEY, VALUE2, txTimestamp, new Integer( 2 ) );
}
BatchModeTransactionManager.getInstance().commit();
@ -302,7 +310,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
assertNull( "node2 starts clean", remoteAccessStrategy.get(null, KEY, txTimestamp ) );
assertNull( "node2 starts clean", remoteAccessStrategy.get(remoteSession, KEY, txTimestamp ) );
// Let node1 write
writeLatch1.countDown();
@ -313,10 +321,10 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
sleep( 200 );
if ( useMinimalAPI ) {
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, new Integer( 1 ), true );
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, txTimestamp, new Integer( 1 ), true );
}
else {
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, new Integer( 1 ) );
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, txTimestamp, new Integer( 1 ) );
}
BatchModeTransactionManager.getInstance().commit();
@ -376,8 +384,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
expected2 = VALUE2;
}
assertEquals( msg1, expected1, localAccessStrategy.get(null, KEY, txTimestamp ) );
assertEquals( msg2, expected2, remoteAccessStrategy.get(null, KEY, txTimestamp ) );
assertEquals( msg1, expected1, localAccessStrategy.get(localSession, KEY, txTimestamp ) );
assertEquals( msg2, expected2, remoteAccessStrategy.get(remoteSession, KEY, txTimestamp ) );
}
@Test
@ -404,13 +412,13 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
final Object KEY = TestingKeyFactory.generateCollectionCacheKey( KEY_BASE + testCount++ );
assertNull( "local is clean", localAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertNull( "remote is clean", remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertNull( "local is clean", localAccessStrategy.get(localSession, KEY, System.currentTimeMillis() ) );
assertNull( "remote is clean", remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
localAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, localAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, localAccessStrategy.get(localSession, KEY, System.currentTimeMillis() ) );
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
// Wait for async propagation
sleep( 250 );
@ -421,14 +429,14 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
if (evict)
localAccessStrategy.evict(KEY);
else
localAccessStrategy.remove(null, KEY);
localAccessStrategy.remove(localSession, KEY);
return null;
}
});
assertEquals( null, localAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertEquals( null, localAccessStrategy.get(localSession, KEY, System.currentTimeMillis() ) );
assertEquals( null, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertEquals( null, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
}
private void evictOrRemoveAllTest(final boolean evict) throws Exception {
@ -439,13 +447,13 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
assertEquals( 0, remoteCollectionRegion.getCache().size() );
assertNull( "local is clean", localAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertNull( "remote is clean", remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertNull( "local is clean", localAccessStrategy.get(localSession, KEY, System.currentTimeMillis() ) );
assertNull( "remote is clean", remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
localAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, localAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, localAccessStrategy.get(localSession, KEY, System.currentTimeMillis() ) );
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
// Wait for async propagation
sleep( 250 );
@ -462,13 +470,13 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
});
// This should re-establish the region root node
assertNull( localAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertNull( localAccessStrategy.get(localSession, KEY, System.currentTimeMillis() ) );
assertEquals( 0, localCollectionRegion.getCache().size() );
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
assertEquals( null, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertEquals( null, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
assertEquals( 0, remoteCollectionRegion.getCache().size() );
@ -476,8 +484,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
sleep( 250 );
// Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
assertEquals( 1, remoteCollectionRegion.getCache().size() );
@ -486,11 +494,11 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
assertEquals(
"local is correct", (isUsingInvalidation() ? null : VALUE1), localAccessStrategy.get(
null, KEY, System
localSession, KEY, System
.currentTimeMillis()
)
);
assertEquals( "remote is correct", VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );
assertEquals( "remote is correct", VALUE1, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis() ) );
}
private void rollback() {

View File

@ -20,6 +20,7 @@ import org.hibernate.cache.internal.CacheDataDescriptionImpl;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.EntityRegionAccessStrategy;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.internal.util.compare.ComparableComparator;
import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
import org.hibernate.test.cache.infinispan.NodeEnvironment;
@ -38,6 +39,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* Base class for tests of EntityRegionAccessStrategy impls.
@ -62,10 +64,12 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
protected NodeEnvironment localEnvironment;
protected EntityRegionImpl localEntityRegion;
protected EntityRegionAccessStrategy localAccessStrategy;
protected SessionImplementor localSession;
protected NodeEnvironment remoteEnvironment;
protected EntityRegionImpl remoteEntityRegion;
protected EntityRegionAccessStrategy remoteAccessStrategy;
protected SessionImplementor remoteSession;
protected boolean invalidation;
protected boolean synchronous;
@ -85,6 +89,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
localEntityRegion = localEnvironment.getEntityRegion(REGION_NAME, getCacheDataDescription());
localAccessStrategy = localEntityRegion.buildAccessStrategy(getAccessType());
localSession = mock(SessionImplementor.class);
remoteSession = mock(SessionImplementor.class);
invalidation = Caches.isInvalidationCache(localEntityRegion.getCache());
synchronous = Caches.isSynchronousCache(localEntityRegion.getCache());
@ -211,17 +218,17 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
assertNull("node1 starts clean", localAccessStrategy.get(null, KEY, txTimestamp));
assertNull("node1 starts clean", localAccessStrategy.get(localSession, KEY, txTimestamp));
writeLatch1.await();
if (useMinimalAPI) {
localAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, new Integer(1), true);
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, txTimestamp, new Integer(1), true);
} else {
localAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, new Integer(1));
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, txTimestamp, new Integer(1));
}
localAccessStrategy.update(null, KEY, VALUE2, new Integer(2), new Integer(1));
localAccessStrategy.update(localSession, KEY, VALUE2, new Integer(2), new Integer(1));
BatchModeTransactionManager.getInstance().commit();
} catch (Exception e) {
@ -248,7 +255,7 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
assertNull("node1 starts clean", remoteAccessStrategy.get(null, KEY, txTimestamp));
assertNull("node1 starts clean", remoteAccessStrategy.get(remoteSession, KEY, txTimestamp));
// Let node1 write
writeLatch1.countDown();
@ -256,9 +263,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
writeLatch2.await();
if (useMinimalAPI) {
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, new Integer(1), true);
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, txTimestamp, new Integer(1), true);
} else {
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, new Integer(1));
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, txTimestamp, new Integer(1));
}
BatchModeTransactionManager.getInstance().commit();
@ -286,14 +293,14 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis();
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(localSession, KEY, txTimestamp));
if (isUsingInvalidation()) {
// invalidation command invalidates pending put
assertEquals("Expected node2 value", null, remoteAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Expected node2 value", null, remoteAccessStrategy.get(remoteSession, KEY, txTimestamp));
} else {
// The node1 update is replicated, preventing the node2 PFER
assertEquals("Correct node2 value", VALUE2, remoteAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct node2 value", VALUE2, remoteAccessStrategy.get(remoteSession, KEY, txTimestamp));
}
}
@ -315,9 +322,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
assertNull("Correct initial value", localAccessStrategy.get(null, KEY, txTimestamp));
assertNull("Correct initial value", localAccessStrategy.get(localSession, KEY, txTimestamp));
localAccessStrategy.insert(null, KEY, VALUE1, new Integer(1));
localAccessStrategy.insert(localSession, KEY, VALUE1, new Integer(1));
readLatch.countDown();
commitLatch.await();
@ -351,7 +358,7 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertEquals(
"Correct initial value", expected, localAccessStrategy.get(
null, KEY,
localSession, KEY,
txTimestamp
)
);
@ -381,9 +388,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis();
assertEquals("Correct node1 value", VALUE1, localAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct node1 value", VALUE1, localAccessStrategy.get(localSession, KEY, txTimestamp));
Object expected = isUsingInvalidation() ? null : VALUE1;
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(remoteSession, KEY, txTimestamp));
}
@Test
@ -392,8 +399,8 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
// Set up initial state
localAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
// Let the async put propagate
sleep(250);
@ -411,9 +418,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
log.debug("Transaction began, get initial value");
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(localSession, KEY, txTimestamp));
log.debug("Now update value");
localAccessStrategy.update(null, KEY, VALUE2, new Integer(2), new Integer(1));
localAccessStrategy.update(localSession, KEY, VALUE2, new Integer(2), new Integer(1));
log.debug("Notify the read latch");
readLatch.countDown();
readerUnlocked = true;
@ -450,7 +457,7 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
// This won't block w/ mvc and will read the old value
Object expected = VALUE1;
assertEquals("Correct value", expected, localAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct value", expected, localAccessStrategy.get(localSession, KEY, txTimestamp));
BatchModeTransactionManager.getInstance().commit();
} catch (Exception e) {
@ -479,9 +486,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis();
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(localSession, KEY, txTimestamp));
Object expected = isUsingInvalidation() ? null : VALUE2;
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(remoteSession, KEY, txTimestamp));
}
@Test
@ -509,13 +516,13 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertEquals(0, localEntityRegion.getCache().size());
assertEquals(0, remoteEntityRegion.getCache().size());
assertNull("local is clean", localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull("local is clean", localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
localAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
Caches.withinTx(localEntityRegion.getTransactionManager(), new Callable<Void>() {
@Override
@ -523,13 +530,13 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
if (evict)
localAccessStrategy.evict(KEY);
else
localAccessStrategy.remove(null, KEY);
localAccessStrategy.remove(localSession, KEY);
return null;
}
});
assertEquals(null, localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertEquals(null, localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
assertEquals(0, localEntityRegion.getCache().size());
assertEquals(null, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertEquals(null, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
assertEquals(0, remoteEntityRegion.getCache().size());
}
@ -537,17 +544,17 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
assertEquals(0, localEntityRegion.getCache().size());
assertEquals(0, remoteEntityRegion.getCache().size());
assertNull("local is clean", localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull("local is clean", localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
localAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
// Wait for async propagation
sleep(250);
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
// Wait for async propagation
sleep(250);
@ -566,20 +573,20 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
});
// This should re-establish the region root node in the optimistic case
assertNull(localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull(localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
assertEquals(0, localEntityRegion.getCache().size());
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
assertNull(remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull(remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
assertEquals(0, remoteEntityRegion.getCache().size());
// Wait for async propagation of EndInvalidationCommand before executing naked put
sleep(250);
// Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
remoteAccessStrategy.putFromLoad(remoteSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
assertEquals(1, remoteEntityRegion.getCache().size());
// Wait for async propagation
@ -587,11 +594,11 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertEquals(
"local is correct", (isUsingInvalidation() ? null : VALUE1), localAccessStrategy
.get(null, KEY, System.currentTimeMillis())
.get(localSession, KEY, System.currentTimeMillis())
);
assertEquals(
"remote is correct", VALUE1, remoteAccessStrategy.get(
null, KEY, System
remoteSession, KEY, System
.currentTimeMillis()
)
);

View File

@ -45,26 +45,26 @@ public abstract class AbstractReadOnlyAccessTestCase extends AbstractEntityRegio
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
assertNull(localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull(localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
if (minimal)
localAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, 1, true);
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, txTimestamp, 1, true);
else
localAccessStrategy.putFromLoad(null, KEY, VALUE1, txTimestamp, 1);
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, txTimestamp, 1);
sleep(250);
Object expected = isUsingInvalidation() ? null : VALUE1;
assertEquals(expected, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertEquals(expected, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
BatchModeTransactionManager.getInstance().commit();
assertEquals(VALUE1, localAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertEquals(expected, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertEquals(VALUE1, localAccessStrategy.get(localSession, KEY, System.currentTimeMillis()));
assertEquals(expected, remoteAccessStrategy.get(remoteSession, KEY, System.currentTimeMillis()));
}
@Test(expected = UnsupportedOperationException.class)
@Override
public void testUpdate() throws Exception {
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
localAccessStrategy.update(null, KEY, VALUE2, 2, 1);
localAccessStrategy.update(localSession, KEY, VALUE2, 2, 1);
}
}

View File

@ -36,7 +36,7 @@ public abstract class AbstractTransactionalAccessTestCase extends AbstractEntity
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
localAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
localAccessStrategy.putFromLoad(localSession, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
final CountDownLatch pferLatch = new CountDownLatch(1);
final CountDownLatch pferCompletionLatch = new CountDownLatch(1);
@ -52,9 +52,9 @@ public abstract class AbstractTransactionalAccessTestCase extends AbstractEntity
long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin();
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(null, KEY, txTimestamp));
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(localSession, KEY, txTimestamp));
localAccessStrategy.update(null, KEY, VALUE2, new Integer(2), new Integer(1));
localAccessStrategy.update(localSession, KEY, VALUE2, new Integer(2), new Integer(1));
pferLatch.countDown();
commitLatch.await();