HHH-9868, HHH-9881 Removed references to TransactionManager from PutFromLoadValidator
* Also removed put() instead of PFER() after region invalidation * Relaxed test that required that session.load() after cache.evictAll() in the same transaction cached the loaded entity
This commit is contained in:
parent
c8ed5e1bef
commit
19c14cee9a
|
@ -92,11 +92,6 @@ public class PutFromLoadValidator {
|
||||||
private static final Log log = LogFactory.getLog(PutFromLoadValidator.class);
|
private static final Log log = LogFactory.getLog(PutFromLoadValidator.class);
|
||||||
private static final boolean trace = log.isTraceEnabled();
|
private static final boolean trace = log.isTraceEnabled();
|
||||||
|
|
||||||
/**
|
|
||||||
* Used to determine whether the owner of a pending put is a thread or a transaction
|
|
||||||
*/
|
|
||||||
private final TransactionManager transactionManager;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Period after which ongoing invalidation is removed. Value is retrieved from cache configuration.
|
* Period after which ongoing invalidation is removed. Value is retrieved from cache configuration.
|
||||||
*/
|
*/
|
||||||
|
@ -130,21 +125,17 @@ public class PutFromLoadValidator {
|
||||||
private int regionInvalidations = 0;
|
private int regionInvalidations = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transactions that invalidate the region. Entries are removed during next invalidation based on transaction status.
|
* Allows propagation of current Session to callbacks invoked from interceptors
|
||||||
*/
|
*/
|
||||||
private final ConcurrentHashSet<Transaction> regionInvalidators = new ConcurrentHashSet<Transaction>();
|
|
||||||
|
|
||||||
private final ThreadLocal<SessionImplementor> currentSession = new ThreadLocal<SessionImplementor>();
|
private final ThreadLocal<SessionImplementor> currentSession = new ThreadLocal<SessionImplementor>();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new put from load validator instance.
|
* Creates a new put from load validator instance.
|
||||||
*
|
*
|
||||||
* @param cache Cache instance on which to store pending put information.
|
* @param cache Cache instance on which to store pending put information.
|
||||||
* @param transactionManager Transaction manager
|
|
||||||
*/
|
*/
|
||||||
public PutFromLoadValidator(AdvancedCache cache, TransactionManager transactionManager) {
|
public PutFromLoadValidator(AdvancedCache cache) {
|
||||||
this( cache, cache.getCacheManager(), transactionManager);
|
this( cache, cache.getCacheManager());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -152,10 +143,9 @@ public class PutFromLoadValidator {
|
||||||
*
|
*
|
||||||
* @param cache Cache instance on which to store pending put information.
|
* @param cache Cache instance on which to store pending put information.
|
||||||
* @param cacheManager where to find a cache to store pending put information
|
* @param cacheManager where to find a cache to store pending put information
|
||||||
* @param tm transaction manager
|
|
||||||
*/
|
*/
|
||||||
public PutFromLoadValidator(AdvancedCache cache,
|
public PutFromLoadValidator(AdvancedCache cache,
|
||||||
EmbeddedCacheManager cacheManager, TransactionManager tm) {
|
EmbeddedCacheManager cacheManager) {
|
||||||
|
|
||||||
Configuration cacheConfiguration = cache.getCacheConfiguration();
|
Configuration cacheConfiguration = cache.getCacheConfiguration();
|
||||||
Configuration pendingPutsConfiguration = cacheManager.getCacheConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME);
|
Configuration pendingPutsConfiguration = cacheManager.getCacheConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME);
|
||||||
|
@ -218,7 +208,6 @@ public class PutFromLoadValidator {
|
||||||
|
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
this.pendingPuts = cacheManager.getCache(pendingPutsName);
|
this.pendingPuts = cacheManager.getCache(pendingPutsName);
|
||||||
this.transactionManager = tm;
|
|
||||||
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
|
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,35 +411,6 @@ public class PutFromLoadValidator {
|
||||||
regionInvalidationTimestamp = Long.MAX_VALUE;
|
regionInvalidationTimestamp = Long.MAX_VALUE;
|
||||||
regionInvalidations++;
|
regionInvalidations++;
|
||||||
}
|
}
|
||||||
if (transactionManager != null) {
|
|
||||||
// cleanup old transactions
|
|
||||||
for (Iterator<Transaction> it = regionInvalidators.iterator(); it.hasNext(); ) {
|
|
||||||
Transaction tx = it.next();
|
|
||||||
try {
|
|
||||||
switch (tx.getStatus()) {
|
|
||||||
case Status.STATUS_COMMITTED:
|
|
||||||
case Status.STATUS_ROLLEDBACK:
|
|
||||||
case Status.STATUS_UNKNOWN:
|
|
||||||
case Status.STATUS_NO_TRANSACTION:
|
|
||||||
it.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (SystemException e) {
|
|
||||||
log.error("Cannot retrieve transaction status", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// add this transaction
|
|
||||||
try {
|
|
||||||
Transaction tx = transactionManager.getTransaction();
|
|
||||||
if (tx != null) {
|
|
||||||
regionInvalidators.add(tx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (SystemException e) {
|
|
||||||
log.error("TransactionManager failed to provide transaction", e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Acquire the lock for each entry to ensure any ongoing
|
// Acquire the lock for each entry to ensure any ongoing
|
||||||
|
@ -513,29 +473,10 @@ public class PutFromLoadValidator {
|
||||||
public void registerPendingPut(SessionImplementor session, Object key, long txTimestamp) {
|
public void registerPendingPut(SessionImplementor session, Object key, long txTimestamp) {
|
||||||
long invalidationTimestamp = this.regionInvalidationTimestamp;
|
long invalidationTimestamp = this.regionInvalidationTimestamp;
|
||||||
if (txTimestamp <= invalidationTimestamp) {
|
if (txTimestamp <= invalidationTimestamp) {
|
||||||
boolean skip;
|
if (trace) {
|
||||||
if (invalidationTimestamp == Long.MAX_VALUE) {
|
log.tracef("registerPendingPut(%s#%s, %d) skipped due to region invalidation (%d)", cache.getName(), key, txTimestamp, invalidationTimestamp);
|
||||||
// there is ongoing invalidation of pending puts
|
|
||||||
skip = true;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
Transaction tx = null;
|
|
||||||
if (transactionManager != null) {
|
|
||||||
try {
|
|
||||||
tx = transactionManager.getTransaction();
|
|
||||||
}
|
|
||||||
catch (SystemException e) {
|
|
||||||
log.error("TransactionManager failed to provide transaction", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
skip = tx == null || !regionInvalidators.contains(tx);
|
|
||||||
}
|
|
||||||
if (skip) {
|
|
||||||
if (trace) {
|
|
||||||
log.tracef("registerPendingPut(%s#%s, %d) skipped due to region invalidation (%d)", cache.getName(), key, txTimestamp, invalidationTimestamp);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final PendingPut pendingPut = new PendingPut( session );
|
final PendingPut pendingPut = new PendingPut( session );
|
||||||
|
|
|
@ -124,21 +124,10 @@ public class TransactionalAccessDelegate {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
putValidator.setCurrentSession(session);
|
|
||||||
try {
|
try {
|
||||||
// Conditional put/putForExternalRead. If the region has been
|
writeCache.putForExternalRead( key, value );
|
||||||
// evicted in the current transaction, do a put instead of a
|
|
||||||
// putForExternalRead to make it stick, otherwise the current
|
|
||||||
// transaction won't see it.
|
|
||||||
if ( region.isRegionInvalidatedInCurrentTx() ) {
|
|
||||||
writeCache.put( key, value );
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
writeCache.putForExternalRead( key, value );
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
putValidator.resetCurrentSession();
|
|
||||||
putValidator.releasePutFromLoadLock( key, lock);
|
putValidator.releasePutFromLoadLock( key, lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ public class CollectionRegionImpl extends BaseTransactionalDataRegion implements
|
||||||
}
|
}
|
||||||
|
|
||||||
public PutFromLoadValidator getPutFromLoadValidator() {
|
public PutFromLoadValidator getPutFromLoadValidator() {
|
||||||
return new PutFromLoadValidator( cache, getTransactionManager() );
|
return new PutFromLoadValidator( cache );
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,6 @@ public class EntityRegionImpl extends BaseTransactionalDataRegion implements Ent
|
||||||
}
|
}
|
||||||
|
|
||||||
public PutFromLoadValidator getPutFromLoadValidator() {
|
public PutFromLoadValidator getPutFromLoadValidator() {
|
||||||
return new PutFromLoadValidator( cache, getTransactionManager() );
|
return new PutFromLoadValidator( cache );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -228,17 +228,7 @@ public abstract class BaseRegion implements Region {
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
log.trace( "Invalidate region: " + name );
|
log.trace( "Invalidate region: " + name );
|
||||||
}
|
}
|
||||||
|
invalidateState.set( InvalidateState.INVALID );
|
||||||
Transaction tx = getCurrentTransaction();
|
|
||||||
if ( tx != null ) {
|
|
||||||
synchronized ( invalidationMutex ) {
|
|
||||||
invalidateTransaction = tx;
|
|
||||||
invalidateState.set( InvalidateState.INVALID );
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
invalidateState.set( InvalidateState.INVALID );
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransactionManager getTransactionManager() {
|
public TransactionManager getTransactionManager() {
|
||||||
|
@ -255,11 +245,6 @@ public abstract class BaseRegion implements Region {
|
||||||
return cache;
|
return cache;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isRegionInvalidatedInCurrentTx() {
|
|
||||||
Transaction tx = getCurrentTransaction();
|
|
||||||
return tx != null && tx.equals( invalidateTransaction );
|
|
||||||
}
|
|
||||||
|
|
||||||
private Transaction getCurrentTransaction() {
|
private Transaction getCurrentTransaction() {
|
||||||
try {
|
try {
|
||||||
// Transaction manager could be null
|
// Transaction manager could be null
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class NaturalIdRegionImpl extends BaseTransactionalDataRegion
|
||||||
}
|
}
|
||||||
|
|
||||||
public PutFromLoadValidator getPutFromLoadValidator() {
|
public PutFromLoadValidator getPutFromLoadValidator() {
|
||||||
return new PutFromLoadValidator( cache, getTransactionManager() );
|
return new PutFromLoadValidator( cache );
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,8 +97,7 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
|
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm);
|
||||||
transactional ? tm : null);
|
|
||||||
exec(transactional, new NakedPut(testee, true));
|
exec(transactional, new NakedPut(testee, true));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -117,8 +116,7 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
|
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm);
|
||||||
transactional ? tm : null);
|
|
||||||
exec(transactional, new RegularPut(testee));
|
exec(transactional, new RegularPut(testee));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -146,8 +144,7 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
|
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache());
|
||||||
transactional ? tm : null);
|
|
||||||
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
||||||
// the naked put can succeed because it has txTimestamp after invalidation
|
// the naked put can succeed because it has txTimestamp after invalidation
|
||||||
NakedPut nakedPut = new NakedPut(testee, true);
|
NakedPut nakedPut = new NakedPut(testee, true);
|
||||||
|
@ -179,8 +176,7 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
|
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm);
|
||||||
transactional ? tm : null);
|
|
||||||
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
||||||
RegularPut regularPut = new RegularPut(testee);
|
RegularPut regularPut = new RegularPut(testee);
|
||||||
exec(transactional, invalidation, regularPut);
|
exec(transactional, invalidation, regularPut);
|
||||||
|
@ -211,8 +207,7 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
|
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm);
|
||||||
transactional ? tm : null);
|
|
||||||
try {
|
try {
|
||||||
long txTimestamp = System.currentTimeMillis();
|
long txTimestamp = System.currentTimeMillis();
|
||||||
if (transactional) {
|
if (transactional) {
|
||||||
|
@ -262,8 +257,7 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
|
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm);
|
||||||
transactional ? tm : null);
|
|
||||||
|
|
||||||
final CountDownLatch registeredLatch = new CountDownLatch(3);
|
final CountDownLatch registeredLatch = new CountDownLatch(3);
|
||||||
final CountDownLatch finishedLatch = new CountDownLatch(3);
|
final CountDownLatch finishedLatch = new CountDownLatch(3);
|
||||||
|
@ -341,8 +335,7 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(),
|
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm);
|
||||||
cm, null);
|
|
||||||
final CountDownLatch removeLatch = new CountDownLatch(1);
|
final CountDownLatch removeLatch = new CountDownLatch(1);
|
||||||
final CountDownLatch pferLatch = new CountDownLatch(1);
|
final CountDownLatch pferLatch = new CountDownLatch(1);
|
||||||
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
|
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
|
||||||
|
|
|
@ -167,7 +167,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
|
||||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||||
@Override
|
@Override
|
||||||
public void call() {
|
public void call() {
|
||||||
PutFromLoadValidator validator = getPutFromLoadValidator(remoteCollectionRegion.getCache(), cm, remoteTm, removeLatch, pferLatch);
|
PutFromLoadValidator validator = getPutFromLoadValidator(remoteCollectionRegion.getCache(), cm, removeLatch, pferLatch);
|
||||||
|
|
||||||
final TransactionalAccessDelegate delegate =
|
final TransactionalAccessDelegate delegate =
|
||||||
new TransactionalAccessDelegate(localCollectionRegion, validator);
|
new TransactionalAccessDelegate(localCollectionRegion, validator);
|
||||||
|
@ -221,11 +221,10 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
|
||||||
}
|
}
|
||||||
|
|
||||||
protected PutFromLoadValidator getPutFromLoadValidator(AdvancedCache cache, EmbeddedCacheManager cm,
|
protected PutFromLoadValidator getPutFromLoadValidator(AdvancedCache cache, EmbeddedCacheManager cm,
|
||||||
TransactionManager tm,
|
|
||||||
CountDownLatch removeLatch, CountDownLatch pferLatch) {
|
CountDownLatch removeLatch, CountDownLatch pferLatch) {
|
||||||
// remove the interceptor inserted by default PutFromLoadValidator, we're using different one
|
// remove the interceptor inserted by default PutFromLoadValidator, we're using different one
|
||||||
PutFromLoadValidator.removeFromCache(cache);
|
PutFromLoadValidator.removeFromCache(cache);
|
||||||
return new PutFromLoadValidator(cache, cm, tm) {
|
return new PutFromLoadValidator(cache, cm) {
|
||||||
@Override
|
@Override
|
||||||
public Lock acquirePutFromLoadLock(SessionImplementor session, Object key, long txTimestamp) {
|
public Lock acquirePutFromLoadLock(SessionImplementor session, Object key, long txTimestamp) {
|
||||||
Lock lock = super.acquirePutFromLoadLock(session, key, txTimestamp);
|
Lock lock = super.acquirePutFromLoadLock(session, key, txTimestamp);
|
||||||
|
|
|
@ -1043,10 +1043,6 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
|
||||||
assertNotNull(citizen);
|
assertNotNull(citizen);
|
||||||
assertNotNull(citizen.getFirstname()); // proxy gets resolved
|
assertNotNull(citizen.getFirstname()); // proxy gets resolved
|
||||||
assertEquals(1, slcStats.getMissCount());
|
assertEquals(1, slcStats.getMissCount());
|
||||||
assertEquals(3, slcStats.getPutCount());
|
|
||||||
assertEquals(1, slcStats.getElementCountInMemory());
|
|
||||||
assertTrue("2lc entity cache is expected to contain Citizen id = " + citizens.get(0).getId(),
|
|
||||||
cache.containsEntity(Citizen.class, citizens.get(0).getId()));
|
|
||||||
|
|
||||||
// cleanup
|
// cleanup
|
||||||
tx.rollback();
|
tx.rollback();
|
||||||
|
|
Loading…
Reference in New Issue