HHH-5793 - Query and timestamp caches to use cluster cache loader

* Query and timestamp caches now use cluster cache loader instead of
state transfer in order to remain asynchronous.
* To maintain previous semantics, after query or timestamp regions have
been completely evicted, a check has been added to avoid going remote if
the region was invalid.
This commit is contained in:
Galder Zamarreño 2010-12-15 17:37:34 +01:00
parent 4c7983151c
commit b3aa9e0c28
6 changed files with 65 additions and 15 deletions

View File

@ -1,5 +1,5 @@
dependencies { dependencies {
infinispanVersion = '4.2.0.ALPHA3' infinispanVersion = '4.2.0.CR4'
jnpVersion = '5.0.3.GA' jnpVersion = '5.0.3.GA'
compile(project(':hibernate-core')) compile(project(':hibernate-core'))

View File

@ -169,7 +169,7 @@ public abstract class BaseRegion implements Region {
} }
public boolean checkValid() { public boolean checkValid() {
boolean valid = invalidateState.get() == InvalidateState.VALID; boolean valid = isValid();
if (!valid) { if (!valid) {
synchronized (invalidationMutex) { synchronized (invalidationMutex) {
if (invalidateState.compareAndSet(InvalidateState.INVALID, InvalidateState.CLEARING)) { if (invalidateState.compareAndSet(InvalidateState.INVALID, InvalidateState.CLEARING)) {
@ -188,26 +188,34 @@ public abstract class BaseRegion implements Region {
} }
} }
} }
valid = invalidateState.get() == InvalidateState.VALID; valid = isValid();
} }
return valid; return valid;
} }
protected boolean isValid() {
return invalidateState.get() == InvalidateState.VALID;
}
/** /**
* Performs a Infinispan <code>get(Fqn, Object)</code> * Performs a Infinispan <code>get(Fqn, Object)</code>
* *
* @param key The key of the item to get * @param key The key of the item to get
* @param opt any option to add to the get invocation. May be <code>null</code>
* @param suppressTimeout should any TimeoutException be suppressed? * @param suppressTimeout should any TimeoutException be suppressed?
* @param flagAdapters flags to add to the get invocation
* @return The retrieved object * @return The retrieved object
* @throws CacheException issue managing transaction or talking to cache * @throws CacheException issue managing transaction or talking to cache
*/ */
protected Object get(Object key, FlagAdapter opt, boolean suppressTimeout) throws CacheException { protected Object get(Object key, boolean suppressTimeout, FlagAdapter... flagAdapters) throws CacheException {
CacheAdapter localCacheAdapter = cacheAdapter;
if (flagAdapters != null && flagAdapters.length > 0)
localCacheAdapter = cacheAdapter.withFlags(flagAdapters);
if (suppressTimeout) if (suppressTimeout)
return cacheAdapter.getAllowingTimeout(key); return localCacheAdapter.getAllowingTimeout(key);
else else
return cacheAdapter.get(key); return localCacheAdapter.get(key);
} }
public Object getOwnerForPut() { public Object getOwnerForPut() {
@ -295,4 +303,4 @@ public abstract class BaseRegion implements Region {
} }
} }
} }

View File

@ -47,6 +47,13 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
} }
public Object get(Object key) throws CacheException { public Object get(Object key) throws CacheException {
// If the region is not valid, skip cache store to avoid going remote to retrieve the query.
// The aim of this is to maintain same logic/semantics as when state transfer was configured.
// TODO: Once https://issues.jboss.org/browse/ISPN-835 has been resolved, revert to state transfer and remove workaround
boolean skipCacheStore = false;
if (!isValid())
skipCacheStore = true;
if (!checkValid()) if (!checkValid())
return null; return null;
@ -55,7 +62,10 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
// to avoid holding locks that would prevent updates. // to avoid holding locks that would prevent updates.
// Add a zero (or low) timeout option so we don't block // Add a zero (or low) timeout option so we don't block
// waiting for tx's that did a put to commit // waiting for tx's that did a put to commit
return get(key, FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT, true); if (skipCacheStore)
return get(key, true, FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT, FlagAdapter.SKIP_CACHE_STORE);
else
return get(key, true, FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT);
} }
public void put(Object key, Object value) throws CacheException { public void put(Object key, Object value) throws CacheException {
@ -83,5 +93,4 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
.putAllowingTimeout(key, value); .putAllowingTimeout(key, value);
} }
} }
} }

View File

@ -57,8 +57,20 @@ public class TimestampsRegionImpl extends BaseGeneralDataRegion implements Times
public Object get(Object key) throws CacheException { public Object get(Object key) throws CacheException {
Object value = localCache.get(key); Object value = localCache.get(key);
// If the region is not valid, skip cache store to avoid going remote to retrieve the query.
// The aim of this is to maintain same logic/semantics as when state transfer was configured.
// TODO: Once https://issues.jboss.org/browse/ISPN-835 has been resolved, revert to state transfer and remove workaround
boolean skipCacheStore = false;
if (!isValid())
skipCacheStore = true;
if (value == null && checkValid()) { if (value == null && checkValid()) {
value = get(key, null, false); if (skipCacheStore)
value = get(key, false, FlagAdapter.SKIP_CACHE_STORE);
else
value = get(key, false);
if (value != null) if (value != null)
localCache.put(key, value); localCache.put(key, value);
} }

View File

@ -35,7 +35,8 @@ public enum FlagAdapter {
ZERO_LOCK_ACQUISITION_TIMEOUT, ZERO_LOCK_ACQUISITION_TIMEOUT,
CACHE_MODE_LOCAL, CACHE_MODE_LOCAL,
FORCE_ASYNCHRONOUS, FORCE_ASYNCHRONOUS,
FORCE_SYNCHRONOUS; FORCE_SYNCHRONOUS,
SKIP_CACHE_STORE;
Flag toFlag() { Flag toFlag() {
switch(this) { switch(this) {
@ -47,6 +48,8 @@ public enum FlagAdapter {
return Flag.FORCE_ASYNCHRONOUS; return Flag.FORCE_ASYNCHRONOUS;
case FORCE_SYNCHRONOUS: case FORCE_SYNCHRONOUS:
return Flag.FORCE_SYNCHRONOUS; return Flag.FORCE_SYNCHRONOUS;
case SKIP_CACHE_STORE:
return Flag.SKIP_CACHE_STORE;
default: default:
throw new CacheException("Unmatched Infinispan flag " + this); throw new CacheException("Unmatched Infinispan flag " + this);
} }

View File

@ -82,7 +82,6 @@
<!-- A query cache that replicates queries. Replication is asynchronous. --> <!-- A query cache that replicates queries. Replication is asynchronous. -->
<namedCache name="replicated-query"> <namedCache name="replicated-query">
<clustering mode="replication"> <clustering mode="replication">
<stateRetrieval fetchInMemoryState="false"/>
<async/> <async/>
</clustering> </clustering>
<locking isolationLevel="READ_COMMITTED" concurrencyLevel="1000" <locking isolationLevel="READ_COMMITTED" concurrencyLevel="1000"
@ -91,6 +90,16 @@
the eviction thread will never run. A separate executor is used for eviction in each cache. --> the eviction thread will never run. A separate executor is used for eviction in each cache. -->
<eviction wakeUpInterval="5000" maxEntries="10000" strategy="LRU"/> <eviction wakeUpInterval="5000" maxEntries="10000" strategy="LRU"/>
<expiration maxIdle="100000"/> <expiration maxIdle="100000"/>
<!-- State transfer forces all replication calls to be synchronous,
so for calls to remain async, use a cluster cache loader instead -->
<loaders passivation="false" shared="false" preload="false">
<loader class="org.infinispan.loaders.cluster.ClusterCacheLoader" fetchPersistentState="false"
ignoreModifications="false" purgeOnStartup="false">
<properties>
<property name="remoteCallTimeout" value="20000"/>
</properties>
</loader>
</loaders>
</namedCache> </namedCache>
<!-- Optimized for timestamp caching. A clustered timestamp cache <!-- Optimized for timestamp caching. A clustered timestamp cache
@ -98,7 +107,6 @@
itself is configured with CacheMode=LOCAL. --> itself is configured with CacheMode=LOCAL. -->
<namedCache name="timestamps"> <namedCache name="timestamps">
<clustering mode="replication"> <clustering mode="replication">
<stateRetrieval fetchInMemoryState="true" timeout="20000"/>
<async/> <async/>
</clustering> </clustering>
<locking isolationLevel="READ_COMMITTED" concurrencyLevel="1000" <locking isolationLevel="READ_COMMITTED" concurrencyLevel="1000"
@ -106,6 +114,16 @@
<lazyDeserialization enabled="true"/> <lazyDeserialization enabled="true"/>
<!-- Don't ever evict modification timestamps --> <!-- Don't ever evict modification timestamps -->
<eviction wakeUpInterval="0" strategy="NONE"/> <eviction wakeUpInterval="0" strategy="NONE"/>
<!-- State transfer forces all replication calls to be synchronous,
so for calls to remain async, use a cluster cache loader instead -->
<loaders passivation="false" shared="false" preload="false">
<loader class="org.infinispan.loaders.cluster.ClusterCacheLoader" fetchPersistentState="false"
ignoreModifications="false" purgeOnStartup="false">
<properties>
<property name="remoteCallTimeout" value="20000"/>
</properties>
</loader>
</loaders>
</namedCache> </namedCache>
</infinispan> </infinispan>