diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/BasicRegionAdapter.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/BasicRegionAdapter.java index 0657c604da..161a099880 100644 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/BasicRegionAdapter.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/BasicRegionAdapter.java @@ -23,9 +23,13 @@ */ package org.hibernate.cache.jbc2; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import javax.transaction.SystemException; import javax.transaction.Transaction; @@ -42,7 +46,12 @@ import org.jboss.cache.NodeSPI; import org.jboss.cache.config.Configuration; import org.jboss.cache.config.Option; import org.jboss.cache.config.Configuration.NodeLockingScheme; -import org.jboss.cache.notifications.annotation.CacheListener; +import org.jboss.cache.notifications.annotation.NodeInvalidated; +import org.jboss.cache.notifications.annotation.NodeModified; +import org.jboss.cache.notifications.annotation.ViewChanged; +import org.jboss.cache.notifications.event.NodeInvalidatedEvent; +import org.jboss.cache.notifications.event.NodeModifiedEvent; +import org.jboss.cache.notifications.event.ViewChangedEvent; import org.jboss.cache.optimistic.DataVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,30 +62,52 @@ import org.slf4j.LoggerFactory; * * @author Steve Ebersole */ -@CacheListener public abstract class BasicRegionAdapter implements Region { + private enum InvalidateState { INVALID, CLEARING, VALID }; public static final String ITEM = CacheHelper.ITEM; protected final Cache jbcCache; protected final String regionName; protected final Fqn regionFqn; + protected final Fqn internalFqn; protected Node regionRoot; protected final boolean optimistic; protected final TransactionManager transactionManager; protected final Logger log; protected final Object regionRootMutex = new Object(); + protected final Object memberId; + protected final boolean replication; + protected final Object invalidationMutex = new Object(); + protected final AtomicReference invalidateState = + new AtomicReference(InvalidateState.VALID); + protected final Set currentView = new HashSet(); // protected RegionRootListener listener; public BasicRegionAdapter(Cache jbcCache, String regionName, String regionPrefix) { + + this.log = LoggerFactory.getLogger(getClass()); + this.jbcCache = jbcCache; this.transactionManager = jbcCache.getConfiguration().getRuntimeConfig().getTransactionManager(); this.regionName = regionName; this.regionFqn = createRegionFqn(regionName, regionPrefix); - optimistic = jbcCache.getConfiguration().getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC; - log = LoggerFactory.getLogger(getClass()); + this.internalFqn = CacheHelper.getInternalFqn(regionFqn); + this.optimistic = jbcCache.getConfiguration().getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC; + this.memberId = jbcCache.getLocalAddress(); + this.replication = CacheHelper.isClusteredReplication(jbcCache); + + this.jbcCache.addCacheListener(this); + + synchronized (currentView) { + List view = jbcCache.getMembers(); + if (view != null) { + currentView.addAll(view); + } + } + activateLocalClusterNode(); log.debug("Created Region for " + regionName + " -- regionPrefix is " + regionPrefix); @@ -129,13 +160,13 @@ public abstract class BasicRegionAdapter implements Region { if (!regionRoot.isResident()) { regionRoot.setResident(true); } + establishInternalNodes(); } catch (Exception e) { throw new CacheException(e.getMessage(), e); } finally { - if (tx != null) - resume(tx); + resume(tx); } } @@ -154,6 +185,7 @@ public abstract class BasicRegionAdapter implements Region { // For pessimistic locking, we just want to toss out our ref // to any old invalid root node and get the latest (may be null) if (!optimistic) { + establishInternalNodes(); regionRoot = jbcCache.getRoot().getChild( regionFqn ); return; } @@ -181,6 +213,7 @@ public abstract class BasicRegionAdapter implements Region { } // Never evict this node newRoot.setResident(true); + establishInternalNodes(); } finally { resume(tx); @@ -189,6 +222,24 @@ public abstract class BasicRegionAdapter implements Region { } } + private void establishInternalNodes() + { + synchronized (currentView) { + Transaction tx = suspend(); + try { + for (Object member : currentView) { + DataVersion version = optimistic ? NonLockingDataVersion.INSTANCE : null; + Fqn f = Fqn.fromRelativeElements(internalFqn, member); + CacheHelper.addNode(jbcCache, f, true, false, version); + } + } + finally { + resume(tx); + } + } + + } + public String getName() { return regionName; } @@ -201,6 +252,11 @@ public abstract class BasicRegionAdapter implements Region { return regionFqn; } + public Object getMemberId() + { + return this.memberId; + } + /** * Checks for the validity of the root cache node for this region, * creating a new one if it does not exist or is invalid, and also @@ -219,6 +275,37 @@ public abstract class BasicRegionAdapter implements Region { if (regionRoot != null && regionRoot.isValid() && !regionRoot.isResident()) regionRoot.setResident(true); } + + public boolean checkValid() + { + boolean valid = invalidateState.get() == InvalidateState.VALID; + + if (!valid) { + synchronized (invalidationMutex) { + if (invalidateState.compareAndSet(InvalidateState.INVALID, InvalidateState.CLEARING)) { + Transaction tx = suspend(); + try { + Option opt = new Option(); + opt.setLockAcquisitionTimeout(1); + opt.setCacheModeLocal(true); + CacheHelper.removeAll(jbcCache, regionFqn, opt); + invalidateState.compareAndSet(InvalidateState.CLEARING, InvalidateState.VALID); + } + catch (Exception e) { + if (log.isTraceEnabled()) { + log.trace("Could not invalidate region: " + e.getLocalizedMessage()); + } + } + finally { + resume(tx); + } + } + } + valid = invalidateState.get() == InvalidateState.VALID; + } + + return valid; + } public void destroy() throws CacheException { try { @@ -242,10 +329,9 @@ public abstract class BasicRegionAdapter implements Region { } catch (Exception e) { throw new CacheException(e); } -// finally { -// if (listener != null) -// jbcCache.removeCacheListener(listener); -// } + finally { + jbcCache.removeCacheListener(this); + } } protected void deactivateLocalNode() { @@ -262,12 +348,21 @@ public abstract class BasicRegionAdapter implements Region { } public long getElementCountInMemory() { - try { - Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn); - return childrenNames.size(); - } catch (Exception e) { - throw new CacheException(e); + if (checkValid()) { + try { + Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn); + int size = childrenNames.size(); + if (childrenNames.contains(CacheHelper.Internal.NODE)) { + size--; + } + return size; + } catch (Exception e) { + throw new CacheException(e); + } } + else { + return 0; + } } public long getElementCountOnDisk() { @@ -275,17 +370,24 @@ public abstract class BasicRegionAdapter implements Region { } public Map toMap() { - try { - Map result = new HashMap(); - Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn); - for (Object childName : childrenNames) { - result.put(childName, CacheHelper.get(jbcCache,regionFqn, childName)); - } - return result; - } catch (CacheException e) { - throw e; - } catch (Exception e) { - throw new CacheException(e); + if (checkValid()) { + try { + Map result = new HashMap(); + Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn); + for (Object childName : childrenNames) { + if (CacheHelper.Internal.NODE != childName) { + result.put(childName, CacheHelper.get(jbcCache,regionFqn, childName)); + } + } + return result; + } catch (CacheException e) { + throw e; + } catch (Exception e) { + throw new CacheException(e); + } + } + else { + return Collections.emptyMap(); } } @@ -320,6 +422,20 @@ public abstract class BasicRegionAdapter implements Region { resume(tx); } } + + public Object getOwnerForPut() + { + Transaction tx = null; + try { + if (transactionManager != null) { + tx = transactionManager.getTransaction(); + } + } catch (SystemException se) { + throw new CacheException("Could not obtain transaction", se); + } + return tx == null ? Thread.currentThread() : tx; + + } /** * Tell the TransactionManager to suspend any ongoing transaction. @@ -327,7 +443,7 @@ public abstract class BasicRegionAdapter implements Region { * @return the transaction that was suspended, or null if * there wasn't one */ - protected Transaction suspend() { + public Transaction suspend() { Transaction tx = null; try { if (transactionManager != null) { @@ -345,7 +461,7 @@ public abstract class BasicRegionAdapter implements Region { * @param tx * the transaction to suspend. May be null. */ - protected void resume(Transaction tx) { + public void resume(Transaction tx) { try { if (tx != null) transactionManager.resume(tx); @@ -404,17 +520,52 @@ public abstract class BasicRegionAdapter implements Region { return escaped; } -// @CacheListener -// public class RegionRootListener { -// -// @NodeCreated -// public void nodeCreated(NodeCreatedEvent event) { -// if (!event.isPre() && event.getFqn().equals(getRegionFqn())) { -// log.debug("Node created for " + getRegionFqn()); -// Node regionRoot = jbcCache.getRoot().getChild(getRegionFqn()); -// regionRoot.setResident(true); -// } -// } -// -// } + @NodeModified + public void nodeModified(NodeModifiedEvent event) + { + handleEvictAllModification(event); + } + + protected boolean handleEvictAllModification(NodeModifiedEvent event) { + + if (!event.isPre() && (replication || event.isOriginLocal()) && event.getData().containsKey(ITEM)) + { + if (event.getFqn().isChildOf(internalFqn)) + { + invalidateState.set(InvalidateState.INVALID); + return true; + } + } + return false; + } + + @NodeInvalidated + public void nodeInvalidated(NodeInvalidatedEvent event) + { + handleEvictAllInvalidation(event); + } + + protected boolean handleEvictAllInvalidation(NodeInvalidatedEvent event) + { + if (!event.isPre() && event.getFqn().isChildOf(internalFqn)) + { + invalidateState.set(InvalidateState.INVALID); + return true; + } + return false; + } + + @ViewChanged + public void viewChanged(ViewChangedEvent event) { + + synchronized (currentView) { + List view = event.getNewView().getMembers(); + if (view != null) { + currentView.addAll(view); + establishInternalNodes(); + } + } + + } + } diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/OptimisticTransactionalAccessDelegate.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/OptimisticTransactionalAccessDelegate.java index 3892850d72..2bb8b94fe7 100755 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/OptimisticTransactionalAccessDelegate.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/OptimisticTransactionalAccessDelegate.java @@ -23,11 +23,12 @@ */ package org.hibernate.cache.jbc2.access; +import javax.transaction.Transaction; + import org.hibernate.cache.CacheDataDescription; import org.hibernate.cache.CacheException; import org.hibernate.cache.access.CollectionRegionAccessStrategy; import org.hibernate.cache.access.EntityRegionAccessStrategy; -import org.hibernate.cache.jbc2.BasicRegionAdapter; import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter; import org.hibernate.cache.jbc2.util.CacheHelper; import org.hibernate.cache.jbc2.util.DataVersionAdapter; @@ -63,43 +64,42 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe */ @Override public void evict(Object key) throws CacheException { - + pendingPuts.remove(key); region.ensureRegionRootExists(); Option opt = NonLockingDataVersion.getInvocationOption(); CacheHelper.remove(cache, regionFqn, key, opt); - } - - /** - * Overrides the {@link TransactionalAccessDelegate#evictAll() superclass} - * by adding a {@link NonLockingDataVersion} to the invocation. - */ - @Override - public void evictAll() throws CacheException { - - evictOrRemoveAll(); - } + } - /** - * Overrides the {@link TransactionalAccessDelegate#get(Object, long) superclass} - * by {@link BasicRegionAdapter#ensureRegionRootExists() ensuring the root - * node for the region exists} before making the call. - */ + + @Override - public Object get(Object key, long txTimestamp) throws CacheException + public void evictAll() throws CacheException { - region.ensureRegionRootExists(); - - return CacheHelper.get(cache, regionFqn, key); + pendingPuts.clear(); + Transaction tx = region.suspend(); + try { + region.ensureRegionRootExists(); + Option opt = NonLockingDataVersion.getInvocationOption(); + CacheHelper.sendEvictAllNotification(cache, regionFqn, region.getMemberId(), opt); + } + finally { + region.resume(tx); + } } - /** + /** * Overrides the * {@link TransactionalAccessDelegate#insert(Object, Object, Object) superclass} * by adding a {@link DataVersion} to the invocation. */ @Override public boolean insert(Object key, Object value, Object version) throws CacheException { + + pendingPuts.remove(key); + + if (!region.checkValid()) + return false; region.ensureRegionRootExists(); @@ -111,6 +111,12 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe @Override public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException { + + if (!region.checkValid()) + return false; + + if (!isPutValid(key)) + return false; region.ensureRegionRootExists(); @@ -123,6 +129,12 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe @Override public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException { + + if (!region.checkValid()) + return false; + + if (!isPutValid(key)) + return false; region.ensureRegionRootExists(); @@ -132,6 +144,12 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe @Override public void remove(Object key) throws CacheException { + + pendingPuts.remove(key); + + // We remove whether or not the region is valid. Other nodes + // may have already restored the region so they need to + // be informed of the change. region.ensureRegionRootExists(); @@ -141,13 +159,20 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe @Override public void removeAll() throws CacheException { - - evictOrRemoveAll(); + pendingPuts.clear(); + Option opt = NonLockingDataVersion.getInvocationOption(); + CacheHelper.removeAll(cache, regionFqn, opt); } @Override public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException { + + pendingPuts.remove(key); + + // We update whether or not the region is valid. Other nodes + // may have already restored the region so they need to + // be informed of the change. region.ensureRegionRootExists(); @@ -166,10 +191,4 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe return opt; } - private void evictOrRemoveAll() { - - Option opt = NonLockingDataVersion.getInvocationOption(); - CacheHelper.removeAll(cache, regionFqn, opt); - } - } diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/TransactionalAccessDelegate.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/TransactionalAccessDelegate.java index 61e9e317ed..46e139000e 100755 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/TransactionalAccessDelegate.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/access/TransactionalAccessDelegate.java @@ -23,6 +23,13 @@ */ package org.hibernate.cache.jbc2.access; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.transaction.Transaction; + import org.hibernate.cache.CacheException; import org.hibernate.cache.access.CollectionRegionAccessStrategy; import org.hibernate.cache.access.EntityRegionAccessStrategy; @@ -44,10 +51,12 @@ import org.jboss.cache.Fqn; * @author Brian Stansberry */ public class TransactionalAccessDelegate { - + protected final Cache cache; protected final Fqn regionFqn; protected final BasicRegionAdapter region; + protected final ConcurrentMap> pendingPuts = + new ConcurrentHashMap>(); public TransactionalAccessDelegate(BasicRegionAdapter adapter) { this.region = adapter; @@ -56,22 +65,43 @@ public class TransactionalAccessDelegate { } public Object get(Object key, long txTimestamp) throws CacheException { - + + if (!region.checkValid()) + return null; + region.ensureRegionRootExists(); - return CacheHelper.get(cache, regionFqn, key); + Object val = CacheHelper.get(cache, regionFqn, key); + + if (val == null) { + registerPendingPut(key); + } + + return val; } - public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException { + public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException { + + if (!region.checkValid()) + return false; + + if (!isPutValid(key)) + return false; region.ensureRegionRootExists(); return CacheHelper.putForExternalRead(cache, regionFqn, key, value); } - public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) + public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException { + if (!region.checkValid()) + return false; + + if (!isPutValid(key)) + return false; + region.ensureRegionRootExists(); // We ignore minimalPutOverride. JBossCache putForExternalRead is @@ -96,6 +126,11 @@ public class TransactionalAccessDelegate { public boolean insert(Object key, Object value, Object version) throws CacheException { + pendingPuts.remove(key); + + if (!region.checkValid()) + return false; + region.ensureRegionRootExists(); CacheHelper.put(cache, regionFqn, key, value); @@ -109,6 +144,12 @@ public class TransactionalAccessDelegate { public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException { + pendingPuts.remove(key); + + // We update whether or not the region is valid. Other nodes + // may have already restored the region so they need to + // be informed of the change. + region.ensureRegionRootExists(); CacheHelper.put(cache, regionFqn, key, value); @@ -122,27 +163,74 @@ public class TransactionalAccessDelegate { public void remove(Object key) throws CacheException { + pendingPuts.remove(key); + + // We remove whether or not the region is valid. Other nodes + // may have already restored the region so they need to + // be informed of the change. + region.ensureRegionRootExists(); CacheHelper.remove(cache, regionFqn, key); } public void removeAll() throws CacheException { - evictOrRemoveAll(); + pendingPuts.clear(); + CacheHelper.removeAll(cache, regionFqn); } public void evict(Object key) throws CacheException { + pendingPuts.remove(key); + region.ensureRegionRootExists(); CacheHelper.remove(cache, regionFqn, key); } public void evictAll() throws CacheException { - evictOrRemoveAll(); + pendingPuts.clear(); + Transaction tx = region.suspend(); + try { + region.ensureRegionRootExists(); + + CacheHelper.sendEvictAllNotification(cache, regionFqn, region.getMemberId(), null); + } + finally { + region.resume(tx); + } } - - private void evictOrRemoveAll() throws CacheException { - CacheHelper.removeAll(cache, regionFqn); + + protected void registerPendingPut(Object key) + { + Set pending = pendingPuts.get(key); + if (pending == null) { + pending = new HashSet(); + } + + synchronized (pending) { + Object owner = region.getOwnerForPut(); + pending.add(owner); + Set existing = pendingPuts.putIfAbsent(key, pending); + if (existing != pending) { + // try again + registerPendingPut(key); + } + } + } + + protected boolean isPutValid(Object key) + { + boolean valid = false; + Set pending = pendingPuts.get(key); + if (pending != null) { + synchronized (pending) { + valid = pending.remove(region.getOwnerForPut()); + if (valid && pending.size() == 0) { + pendingPuts.remove(key); + } + } + } + return valid; } } diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/collection/CollectionRegionImpl.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/collection/CollectionRegionImpl.java index b5377ea12e..1278aa318b 100644 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/collection/CollectionRegionImpl.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/collection/CollectionRegionImpl.java @@ -26,6 +26,7 @@ package org.hibernate.cache.jbc2.collection; import org.jboss.cache.Cache; import org.jboss.cache.Fqn; import org.jboss.cache.config.Configuration.NodeLockingScheme; +import org.jboss.cache.notifications.annotation.CacheListener; import org.hibernate.cache.CacheDataDescription; import org.hibernate.cache.CacheException; @@ -39,6 +40,7 @@ import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter; * * @author Steve Ebersole */ +@CacheListener public class CollectionRegionImpl extends TransactionalDataRegionAdapter implements CollectionRegion { public static final String TYPE = "COLL"; diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/entity/EntityRegionImpl.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/entity/EntityRegionImpl.java index f9a603e772..89ba67ee14 100644 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/entity/EntityRegionImpl.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/entity/EntityRegionImpl.java @@ -26,6 +26,7 @@ package org.hibernate.cache.jbc2.entity; import org.jboss.cache.Cache; import org.jboss.cache.Fqn; import org.jboss.cache.config.Configuration.NodeLockingScheme; +import org.jboss.cache.notifications.annotation.CacheListener; import org.hibernate.cache.CacheDataDescription; import org.hibernate.cache.CacheException; @@ -39,6 +40,7 @@ import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter; * * @author Steve Ebersole */ +@CacheListener public class EntityRegionImpl extends TransactionalDataRegionAdapter implements EntityRegion { public static final String TYPE = "ENTITY"; diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/query/QueryResultsRegionImpl.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/query/QueryResultsRegionImpl.java index e69e89f0af..d57709137c 100644 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/query/QueryResultsRegionImpl.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/query/QueryResultsRegionImpl.java @@ -25,6 +25,8 @@ package org.hibernate.cache.jbc2.query; import java.util.Properties; +import javax.transaction.Transaction; + import org.hibernate.cache.CacheException; import org.hibernate.cache.QueryResultsRegion; import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter; @@ -33,6 +35,7 @@ import org.hibernate.util.PropertiesHelper; import org.jboss.cache.Cache; import org.jboss.cache.Fqn; import org.jboss.cache.config.Option; +import org.jboss.cache.notifications.annotation.CacheListener; /** * Defines the behavior of the query cache regions for JBossCache 2.x. @@ -40,6 +43,7 @@ import org.jboss.cache.config.Option; * @author Brian Stansberry * @version $Revision$ */ +@CacheListener public class QueryResultsRegionImpl extends TransactionalDataRegionAdapter implements QueryResultsRegion { public static final String QUERY_CACHE_LOCAL_ONLY_PROP = "hibernate.cache.region.jbc2.query.localonly"; @@ -85,14 +89,22 @@ public class QueryResultsRegionImpl extends TransactionalDataRegionAdapter imple } public void evictAll() throws CacheException { - Option opt = getNonLockingDataVersionOption(false); - if (localOnly) - opt.setCacheModeLocal(true); - CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt); + Transaction tx = suspend(); + try { + ensureRegionRootExists(); + Option opt = getNonLockingDataVersionOption(true); + CacheHelper.sendEvictAllNotification(jbcCache, regionFqn, getMemberId(), opt); + } + finally { + resume(tx); + } } public Object get(Object key) throws CacheException { + if (!checkValid()) + return null; + ensureRegionRootExists(); // Don't hold the JBC node lock throughout the tx, as that @@ -106,28 +118,30 @@ public class QueryResultsRegionImpl extends TransactionalDataRegionAdapter imple public void put(Object key, Object value) throws CacheException { - ensureRegionRootExists(); - - // Here we don't want to suspend the tx. If we do: - // 1) We might be caching query results that reflect uncommitted - // changes. No tx == no WL on cache node, so other threads - // can prematurely see those query results - // 2) No tx == immediate replication. More overhead, plus we - // spread issue #1 above around the cluster - - // Add a zero (or quite low) timeout option so we don't block. - // Ignore any TimeoutException. Basically we forego caching the - // query result in order to avoid blocking. - // Reads are done with suspended tx, so they should not hold the - // lock for long. Not caching the query result is OK, since - // any subsequent read will just see the old result with its - // out-of-date timestamp; that result will be discarded and the - // db query performed again. - Option opt = getNonLockingDataVersionOption(false); - opt.setLockAcquisitionTimeout(2); - if (localOnly) - opt.setCacheModeLocal(true); - CacheHelper.putAllowingTimeout(getCacheInstance(), getRegionFqn(), key, value, opt); + if (checkValid()) { + ensureRegionRootExists(); + + // Here we don't want to suspend the tx. If we do: + // 1) We might be caching query results that reflect uncommitted + // changes. No tx == no WL on cache node, so other threads + // can prematurely see those query results + // 2) No tx == immediate replication. More overhead, plus we + // spread issue #1 above around the cluster + + // Add a zero (or quite low) timeout option so we don't block. + // Ignore any TimeoutException. Basically we forego caching the + // query result in order to avoid blocking. + // Reads are done with suspended tx, so they should not hold the + // lock for long. Not caching the query result is OK, since + // any subsequent read will just see the old result with its + // out-of-date timestamp; that result will be discarded and the + // db query performed again. + Option opt = getNonLockingDataVersionOption(false); + opt.setLockAcquisitionTimeout(2); + if (localOnly) + opt.setCacheModeLocal(true); + CacheHelper.putAllowingTimeout(getCacheInstance(), getRegionFqn(), key, value, opt); + } } @Override diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/TimestampsRegionImpl.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/TimestampsRegionImpl.java index 30aa10638c..de900aae72 100644 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/TimestampsRegionImpl.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/TimestampsRegionImpl.java @@ -41,6 +41,7 @@ import org.jboss.cache.config.Option; import org.jboss.cache.notifications.annotation.CacheListener; import org.jboss.cache.notifications.annotation.NodeModified; import org.jboss.cache.notifications.annotation.NodeRemoved; +import org.jboss.cache.notifications.event.NodeInvalidatedEvent; import org.jboss.cache.notifications.event.NodeModifiedEvent; import org.jboss.cache.notifications.event.NodeRemovedEvent; @@ -95,14 +96,21 @@ public class TimestampsRegionImpl extends TransactionalDataRegionAdapter impleme public void evictAll() throws CacheException { // TODO Is this a valid operation on a timestamps cache? - Option opt = getNonLockingDataVersionOption(true); - CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt); + Transaction tx = suspend(); + try { + ensureRegionRootExists(); + Option opt = getNonLockingDataVersionOption(true); + CacheHelper.sendEvictAllNotification(jbcCache, regionFqn, getMemberId(), opt); + } + finally { + resume(tx); + } } public Object get(Object key) throws CacheException { Object value = localCache.get(key); - if (value == null) { + if (value == null && checkValid()) { ensureRegionRootExists(); @@ -147,14 +155,15 @@ public class TimestampsRegionImpl extends TransactionalDataRegionAdapter impleme */ @NodeModified public void nodeModified(NodeModifiedEvent event) { - if (event.isPre()) - return; - - Fqn fqn = event.getFqn(); - Fqn regFqn = getRegionFqn(); - if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) { - Object key = fqn.get(regFqn.size()); - localCache.put(key, event.getData().get(ITEM)); + + if (!handleEvictAllModification(event) && !event.isPre()) { + + Fqn fqn = event.getFqn(); + Fqn regFqn = getRegionFqn(); + if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) { + Object key = fqn.get(regFqn.size()); + localCache.put(key, event.getData().get(ITEM)); + } } } @@ -178,8 +187,30 @@ public class TimestampsRegionImpl extends TransactionalDataRegionAdapter impleme localCache.clear(); } } + + - /** + @Override + protected boolean handleEvictAllInvalidation(NodeInvalidatedEvent event) + { + boolean result = super.handleEvictAllInvalidation(event); + if (result) { + localCache.clear(); + } + return result; + } + + @Override + protected boolean handleEvictAllModification(NodeModifiedEvent event) + { + boolean result = super.handleEvictAllModification(event); + if (result) { + localCache.clear(); + } + return result; + } + + /** * Brings all data from the distributed cache into our local cache. */ private void populateLocalCache() { diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/util/CacheHelper.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/util/CacheHelper.java index fa8d79df52..b091eb0626 100644 --- a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/util/CacheHelper.java +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/util/CacheHelper.java @@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory; */ public class CacheHelper { + public static enum Internal { NODE, LOCAL }; + /** Key under which items are cached */ public static final String ITEM = "item"; /** Key and value used in a hack to create region root nodes */ @@ -467,4 +469,23 @@ public class CacheHelper { option.setDataVersion(version); setInvocationOption(cache, option); } + + public static Fqn getInternalFqn(Fqn region) + { + return Fqn.fromRelativeElements(region, Internal.NODE); + } + + public static void sendEvictNotification(Cache cache, Fqn region, Object member, Object key, Option option) + { + setInvocationOption(cache, option); + Fqn f = Fqn.fromRelativeElements(region, Internal.NODE, member == null ? Internal.LOCAL : member, key); + cache.put(f, ITEM, DUMMY); + } + + public static void sendEvictAllNotification(Cache cache, Fqn region, Object member, Option option) + { + setInvocationOption(cache, option); + Fqn f = Fqn.fromRelativeElements(region, Internal.NODE, member == null ? Internal.LOCAL : member); + cache.put(f, ITEM, DUMMY); + } }