[HHH-3817] Don't cache stale data via putFromLoad

[HHH-3818] Handle evictAll "without regard for transactions"

git-svn-id: https://svn.jboss.org/repos/hibernate/core/trunk@16189 1b8cb986-b30d-0410-93ca-fae66ebed9b2
This commit is contained in:
Brian Stansberry 2009-03-19 19:51:15 +00:00
parent 4910a0c3dd
commit eb60160109
8 changed files with 448 additions and 120 deletions

View File

@ -23,9 +23,13 @@
*/ */
package org.hibernate.cache.jbc2; package org.hibernate.cache.jbc2;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.SystemException; import javax.transaction.SystemException;
import javax.transaction.Transaction; import javax.transaction.Transaction;
@ -42,7 +46,12 @@ import org.jboss.cache.NodeSPI;
import org.jboss.cache.config.Configuration; import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option; import org.jboss.cache.config.Option;
import org.jboss.cache.config.Configuration.NodeLockingScheme; import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.notifications.annotation.CacheListener; import org.jboss.cache.notifications.annotation.NodeInvalidated;
import org.jboss.cache.notifications.annotation.NodeModified;
import org.jboss.cache.notifications.annotation.ViewChanged;
import org.jboss.cache.notifications.event.NodeInvalidatedEvent;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.notifications.event.ViewChangedEvent;
import org.jboss.cache.optimistic.DataVersion; import org.jboss.cache.optimistic.DataVersion;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -53,30 +62,52 @@ import org.slf4j.LoggerFactory;
* *
* @author Steve Ebersole * @author Steve Ebersole
*/ */
@CacheListener
public abstract class BasicRegionAdapter implements Region { public abstract class BasicRegionAdapter implements Region {
private enum InvalidateState { INVALID, CLEARING, VALID };
public static final String ITEM = CacheHelper.ITEM; public static final String ITEM = CacheHelper.ITEM;
protected final Cache jbcCache; protected final Cache jbcCache;
protected final String regionName; protected final String regionName;
protected final Fqn regionFqn; protected final Fqn regionFqn;
protected final Fqn internalFqn;
protected Node regionRoot; protected Node regionRoot;
protected final boolean optimistic; protected final boolean optimistic;
protected final TransactionManager transactionManager; protected final TransactionManager transactionManager;
protected final Logger log; protected final Logger log;
protected final Object regionRootMutex = new Object(); protected final Object regionRootMutex = new Object();
protected final Object memberId;
protected final boolean replication;
protected final Object invalidationMutex = new Object();
protected final AtomicReference<InvalidateState> invalidateState =
new AtomicReference<InvalidateState>(InvalidateState.VALID);
protected final Set<Object> currentView = new HashSet<Object>();
// protected RegionRootListener listener; // protected RegionRootListener listener;
public BasicRegionAdapter(Cache jbcCache, String regionName, String regionPrefix) { public BasicRegionAdapter(Cache jbcCache, String regionName, String regionPrefix) {
this.log = LoggerFactory.getLogger(getClass());
this.jbcCache = jbcCache; this.jbcCache = jbcCache;
this.transactionManager = jbcCache.getConfiguration().getRuntimeConfig().getTransactionManager(); this.transactionManager = jbcCache.getConfiguration().getRuntimeConfig().getTransactionManager();
this.regionName = regionName; this.regionName = regionName;
this.regionFqn = createRegionFqn(regionName, regionPrefix); this.regionFqn = createRegionFqn(regionName, regionPrefix);
optimistic = jbcCache.getConfiguration().getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC; this.internalFqn = CacheHelper.getInternalFqn(regionFqn);
log = LoggerFactory.getLogger(getClass()); this.optimistic = jbcCache.getConfiguration().getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC;
this.memberId = jbcCache.getLocalAddress();
this.replication = CacheHelper.isClusteredReplication(jbcCache);
this.jbcCache.addCacheListener(this);
synchronized (currentView) {
List view = jbcCache.getMembers();
if (view != null) {
currentView.addAll(view);
}
}
activateLocalClusterNode(); activateLocalClusterNode();
log.debug("Created Region for " + regionName + " -- regionPrefix is " + regionPrefix); log.debug("Created Region for " + regionName + " -- regionPrefix is " + regionPrefix);
@ -129,13 +160,13 @@ public abstract class BasicRegionAdapter implements Region {
if (!regionRoot.isResident()) { if (!regionRoot.isResident()) {
regionRoot.setResident(true); regionRoot.setResident(true);
} }
establishInternalNodes();
} }
catch (Exception e) { catch (Exception e) {
throw new CacheException(e.getMessage(), e); throw new CacheException(e.getMessage(), e);
} }
finally { finally {
if (tx != null) resume(tx);
resume(tx);
} }
} }
@ -154,6 +185,7 @@ public abstract class BasicRegionAdapter implements Region {
// For pessimistic locking, we just want to toss out our ref // For pessimistic locking, we just want to toss out our ref
// to any old invalid root node and get the latest (may be null) // to any old invalid root node and get the latest (may be null)
if (!optimistic) { if (!optimistic) {
establishInternalNodes();
regionRoot = jbcCache.getRoot().getChild( regionFqn ); regionRoot = jbcCache.getRoot().getChild( regionFqn );
return; return;
} }
@ -181,6 +213,7 @@ public abstract class BasicRegionAdapter implements Region {
} }
// Never evict this node // Never evict this node
newRoot.setResident(true); newRoot.setResident(true);
establishInternalNodes();
} }
finally { finally {
resume(tx); resume(tx);
@ -189,6 +222,24 @@ public abstract class BasicRegionAdapter implements Region {
} }
} }
private void establishInternalNodes()
{
synchronized (currentView) {
Transaction tx = suspend();
try {
for (Object member : currentView) {
DataVersion version = optimistic ? NonLockingDataVersion.INSTANCE : null;
Fqn f = Fqn.fromRelativeElements(internalFqn, member);
CacheHelper.addNode(jbcCache, f, true, false, version);
}
}
finally {
resume(tx);
}
}
}
public String getName() { public String getName() {
return regionName; return regionName;
} }
@ -201,6 +252,11 @@ public abstract class BasicRegionAdapter implements Region {
return regionFqn; return regionFqn;
} }
public Object getMemberId()
{
return this.memberId;
}
/** /**
* Checks for the validity of the root cache node for this region, * Checks for the validity of the root cache node for this region,
* creating a new one if it does not exist or is invalid, and also * creating a new one if it does not exist or is invalid, and also
@ -220,6 +276,37 @@ public abstract class BasicRegionAdapter implements Region {
regionRoot.setResident(true); regionRoot.setResident(true);
} }
public boolean checkValid()
{
boolean valid = invalidateState.get() == InvalidateState.VALID;
if (!valid) {
synchronized (invalidationMutex) {
if (invalidateState.compareAndSet(InvalidateState.INVALID, InvalidateState.CLEARING)) {
Transaction tx = suspend();
try {
Option opt = new Option();
opt.setLockAcquisitionTimeout(1);
opt.setCacheModeLocal(true);
CacheHelper.removeAll(jbcCache, regionFqn, opt);
invalidateState.compareAndSet(InvalidateState.CLEARING, InvalidateState.VALID);
}
catch (Exception e) {
if (log.isTraceEnabled()) {
log.trace("Could not invalidate region: " + e.getLocalizedMessage());
}
}
finally {
resume(tx);
}
}
}
valid = invalidateState.get() == InvalidateState.VALID;
}
return valid;
}
public void destroy() throws CacheException { public void destroy() throws CacheException {
try { try {
// NOTE : this is being used from the process of shutting down a // NOTE : this is being used from the process of shutting down a
@ -242,10 +329,9 @@ public abstract class BasicRegionAdapter implements Region {
} catch (Exception e) { } catch (Exception e) {
throw new CacheException(e); throw new CacheException(e);
} }
// finally { finally {
// if (listener != null) jbcCache.removeCacheListener(this);
// jbcCache.removeCacheListener(listener); }
// }
} }
protected void deactivateLocalNode() { protected void deactivateLocalNode() {
@ -262,11 +348,20 @@ public abstract class BasicRegionAdapter implements Region {
} }
public long getElementCountInMemory() { public long getElementCountInMemory() {
try { if (checkValid()) {
Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn); try {
return childrenNames.size(); Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn);
} catch (Exception e) { int size = childrenNames.size();
throw new CacheException(e); if (childrenNames.contains(CacheHelper.Internal.NODE)) {
size--;
}
return size;
} catch (Exception e) {
throw new CacheException(e);
}
}
else {
return 0;
} }
} }
@ -275,17 +370,24 @@ public abstract class BasicRegionAdapter implements Region {
} }
public Map toMap() { public Map toMap() {
try { if (checkValid()) {
Map result = new HashMap(); try {
Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn); Map result = new HashMap();
for (Object childName : childrenNames) { Set childrenNames = CacheHelper.getChildrenNames(jbcCache, regionFqn);
result.put(childName, CacheHelper.get(jbcCache,regionFqn, childName)); for (Object childName : childrenNames) {
} if (CacheHelper.Internal.NODE != childName) {
return result; result.put(childName, CacheHelper.get(jbcCache,regionFqn, childName));
} catch (CacheException e) { }
throw e; }
} catch (Exception e) { return result;
throw new CacheException(e); } catch (CacheException e) {
throw e;
} catch (Exception e) {
throw new CacheException(e);
}
}
else {
return Collections.emptyMap();
} }
} }
@ -321,13 +423,27 @@ public abstract class BasicRegionAdapter implements Region {
} }
} }
public Object getOwnerForPut()
{
Transaction tx = null;
try {
if (transactionManager != null) {
tx = transactionManager.getTransaction();
}
} catch (SystemException se) {
throw new CacheException("Could not obtain transaction", se);
}
return tx == null ? Thread.currentThread() : tx;
}
/** /**
* Tell the TransactionManager to suspend any ongoing transaction. * Tell the TransactionManager to suspend any ongoing transaction.
* *
* @return the transaction that was suspended, or <code>null</code> if * @return the transaction that was suspended, or <code>null</code> if
* there wasn't one * there wasn't one
*/ */
protected Transaction suspend() { public Transaction suspend() {
Transaction tx = null; Transaction tx = null;
try { try {
if (transactionManager != null) { if (transactionManager != null) {
@ -345,7 +461,7 @@ public abstract class BasicRegionAdapter implements Region {
* @param tx * @param tx
* the transaction to suspend. May be <code>null</code>. * the transaction to suspend. May be <code>null</code>.
*/ */
protected void resume(Transaction tx) { public void resume(Transaction tx) {
try { try {
if (tx != null) if (tx != null)
transactionManager.resume(tx); transactionManager.resume(tx);
@ -404,17 +520,52 @@ public abstract class BasicRegionAdapter implements Region {
return escaped; return escaped;
} }
// @CacheListener @NodeModified
// public class RegionRootListener { public void nodeModified(NodeModifiedEvent event)
// {
// @NodeCreated handleEvictAllModification(event);
// public void nodeCreated(NodeCreatedEvent event) { }
// if (!event.isPre() && event.getFqn().equals(getRegionFqn())) {
// log.debug("Node created for " + getRegionFqn()); protected boolean handleEvictAllModification(NodeModifiedEvent event) {
// Node regionRoot = jbcCache.getRoot().getChild(getRegionFqn());
// regionRoot.setResident(true); if (!event.isPre() && (replication || event.isOriginLocal()) && event.getData().containsKey(ITEM))
// } {
// } if (event.getFqn().isChildOf(internalFqn))
// {
// } invalidateState.set(InvalidateState.INVALID);
return true;
}
}
return false;
}
@NodeInvalidated
public void nodeInvalidated(NodeInvalidatedEvent event)
{
handleEvictAllInvalidation(event);
}
protected boolean handleEvictAllInvalidation(NodeInvalidatedEvent event)
{
if (!event.isPre() && event.getFqn().isChildOf(internalFqn))
{
invalidateState.set(InvalidateState.INVALID);
return true;
}
return false;
}
@ViewChanged
public void viewChanged(ViewChangedEvent event) {
synchronized (currentView) {
List view = event.getNewView().getMembers();
if (view != null) {
currentView.addAll(view);
establishInternalNodes();
}
}
}
} }

View File

@ -23,11 +23,12 @@
*/ */
package org.hibernate.cache.jbc2.access; package org.hibernate.cache.jbc2.access;
import javax.transaction.Transaction;
import org.hibernate.cache.CacheDataDescription; import org.hibernate.cache.CacheDataDescription;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
import org.hibernate.cache.access.CollectionRegionAccessStrategy; import org.hibernate.cache.access.CollectionRegionAccessStrategy;
import org.hibernate.cache.access.EntityRegionAccessStrategy; import org.hibernate.cache.access.EntityRegionAccessStrategy;
import org.hibernate.cache.jbc2.BasicRegionAdapter;
import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter; import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter;
import org.hibernate.cache.jbc2.util.CacheHelper; import org.hibernate.cache.jbc2.util.CacheHelper;
import org.hibernate.cache.jbc2.util.DataVersionAdapter; import org.hibernate.cache.jbc2.util.DataVersionAdapter;
@ -63,37 +64,31 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
*/ */
@Override @Override
public void evict(Object key) throws CacheException { public void evict(Object key) throws CacheException {
pendingPuts.remove(key);
region.ensureRegionRootExists(); region.ensureRegionRootExists();
Option opt = NonLockingDataVersion.getInvocationOption(); Option opt = NonLockingDataVersion.getInvocationOption();
CacheHelper.remove(cache, regionFqn, key, opt); CacheHelper.remove(cache, regionFqn, key, opt);
} }
/**
* Overrides the {@link TransactionalAccessDelegate#evictAll() superclass}
* by adding a {@link NonLockingDataVersion} to the invocation.
*/
@Override
public void evictAll() throws CacheException {
evictOrRemoveAll();
}
/**
* Overrides the {@link TransactionalAccessDelegate#get(Object, long) superclass}
* by {@link BasicRegionAdapter#ensureRegionRootExists() ensuring the root
* node for the region exists} before making the call.
*/
@Override @Override
public Object get(Object key, long txTimestamp) throws CacheException public void evictAll() throws CacheException
{ {
region.ensureRegionRootExists(); pendingPuts.clear();
Transaction tx = region.suspend();
return CacheHelper.get(cache, regionFqn, key); try {
region.ensureRegionRootExists();
Option opt = NonLockingDataVersion.getInvocationOption();
CacheHelper.sendEvictAllNotification(cache, regionFqn, region.getMemberId(), opt);
}
finally {
region.resume(tx);
}
} }
/** /**
* Overrides the * Overrides the
* {@link TransactionalAccessDelegate#insert(Object, Object, Object) superclass} * {@link TransactionalAccessDelegate#insert(Object, Object, Object) superclass}
* by adding a {@link DataVersion} to the invocation. * by adding a {@link DataVersion} to the invocation.
@ -101,6 +96,11 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public boolean insert(Object key, Object value, Object version) throws CacheException { public boolean insert(Object key, Object value, Object version) throws CacheException {
pendingPuts.remove(key);
if (!region.checkValid())
return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
Option opt = getDataVersionOption(version, null); Option opt = getDataVersionOption(version, null);
@ -112,6 +112,12 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
throws CacheException { throws CacheException {
if (!region.checkValid())
return false;
if (!isPutValid(key))
return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
// We ignore minimalPutOverride. JBossCache putForExternalRead is // We ignore minimalPutOverride. JBossCache putForExternalRead is
@ -124,6 +130,12 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException { public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException {
if (!region.checkValid())
return false;
if (!isPutValid(key))
return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
Option opt = getDataVersionOption(version, version); Option opt = getDataVersionOption(version, version);
@ -133,6 +145,12 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public void remove(Object key) throws CacheException { public void remove(Object key) throws CacheException {
pendingPuts.remove(key);
// We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
region.ensureRegionRootExists(); region.ensureRegionRootExists();
Option opt = NonLockingDataVersion.getInvocationOption(); Option opt = NonLockingDataVersion.getInvocationOption();
@ -141,14 +159,21 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public void removeAll() throws CacheException { public void removeAll() throws CacheException {
pendingPuts.clear();
evictOrRemoveAll(); Option opt = NonLockingDataVersion.getInvocationOption();
CacheHelper.removeAll(cache, regionFqn, opt);
} }
@Override @Override
public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) public boolean update(Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException { throws CacheException {
pendingPuts.remove(key);
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
region.ensureRegionRootExists(); region.ensureRegionRootExists();
Option opt = getDataVersionOption(currentVersion, previousVersion); Option opt = getDataVersionOption(currentVersion, previousVersion);
@ -166,10 +191,4 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
return opt; return opt;
} }
private void evictOrRemoveAll() {
Option opt = NonLockingDataVersion.getInvocationOption();
CacheHelper.removeAll(cache, regionFqn, opt);
}
} }

View File

@ -23,6 +23,13 @@
*/ */
package org.hibernate.cache.jbc2.access; package org.hibernate.cache.jbc2.access;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.transaction.Transaction;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
import org.hibernate.cache.access.CollectionRegionAccessStrategy; import org.hibernate.cache.access.CollectionRegionAccessStrategy;
import org.hibernate.cache.access.EntityRegionAccessStrategy; import org.hibernate.cache.access.EntityRegionAccessStrategy;
@ -48,6 +55,8 @@ public class TransactionalAccessDelegate {
protected final Cache cache; protected final Cache cache;
protected final Fqn regionFqn; protected final Fqn regionFqn;
protected final BasicRegionAdapter region; protected final BasicRegionAdapter region;
protected final ConcurrentMap<Object, Set<Object>> pendingPuts =
new ConcurrentHashMap<Object, Set<Object>>();
public TransactionalAccessDelegate(BasicRegionAdapter adapter) { public TransactionalAccessDelegate(BasicRegionAdapter adapter) {
this.region = adapter; this.region = adapter;
@ -57,21 +66,42 @@ public class TransactionalAccessDelegate {
public Object get(Object key, long txTimestamp) throws CacheException { public Object get(Object key, long txTimestamp) throws CacheException {
if (!region.checkValid())
return null;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
return CacheHelper.get(cache, regionFqn, key); Object val = CacheHelper.get(cache, regionFqn, key);
if (val == null) {
registerPendingPut(key);
}
return val;
} }
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException { public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException {
if (!region.checkValid())
return false;
if (!isPutValid(key))
return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
return CacheHelper.putForExternalRead(cache, regionFqn, key, value); return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
} }
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
throws CacheException { throws CacheException {
if (!region.checkValid())
return false;
if (!isPutValid(key))
return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
// We ignore minimalPutOverride. JBossCache putForExternalRead is // We ignore minimalPutOverride. JBossCache putForExternalRead is
@ -96,6 +126,11 @@ public class TransactionalAccessDelegate {
public boolean insert(Object key, Object value, Object version) throws CacheException { public boolean insert(Object key, Object value, Object version) throws CacheException {
pendingPuts.remove(key);
if (!region.checkValid())
return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
CacheHelper.put(cache, regionFqn, key, value); CacheHelper.put(cache, regionFqn, key, value);
@ -109,6 +144,12 @@ public class TransactionalAccessDelegate {
public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) public boolean update(Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException { throws CacheException {
pendingPuts.remove(key);
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
region.ensureRegionRootExists(); region.ensureRegionRootExists();
CacheHelper.put(cache, regionFqn, key, value); CacheHelper.put(cache, regionFqn, key, value);
@ -122,27 +163,74 @@ public class TransactionalAccessDelegate {
public void remove(Object key) throws CacheException { public void remove(Object key) throws CacheException {
pendingPuts.remove(key);
// We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
region.ensureRegionRootExists(); region.ensureRegionRootExists();
CacheHelper.remove(cache, regionFqn, key); CacheHelper.remove(cache, regionFqn, key);
} }
public void removeAll() throws CacheException { public void removeAll() throws CacheException {
evictOrRemoveAll(); pendingPuts.clear();
CacheHelper.removeAll(cache, regionFqn);
} }
public void evict(Object key) throws CacheException { public void evict(Object key) throws CacheException {
pendingPuts.remove(key);
region.ensureRegionRootExists(); region.ensureRegionRootExists();
CacheHelper.remove(cache, regionFqn, key); CacheHelper.remove(cache, regionFqn, key);
} }
public void evictAll() throws CacheException { public void evictAll() throws CacheException {
evictOrRemoveAll(); pendingPuts.clear();
Transaction tx = region.suspend();
try {
region.ensureRegionRootExists();
CacheHelper.sendEvictAllNotification(cache, regionFqn, region.getMemberId(), null);
}
finally {
region.resume(tx);
}
} }
private void evictOrRemoveAll() throws CacheException { protected void registerPendingPut(Object key)
CacheHelper.removeAll(cache, regionFqn); {
Set<Object> pending = pendingPuts.get(key);
if (pending == null) {
pending = new HashSet<Object>();
}
synchronized (pending) {
Object owner = region.getOwnerForPut();
pending.add(owner);
Set<Object> existing = pendingPuts.putIfAbsent(key, pending);
if (existing != pending) {
// try again
registerPendingPut(key);
}
}
}
protected boolean isPutValid(Object key)
{
boolean valid = false;
Set<Object> pending = pendingPuts.get(key);
if (pending != null) {
synchronized (pending) {
valid = pending.remove(region.getOwnerForPut());
if (valid && pending.size() == 0) {
pendingPuts.remove(key);
}
}
}
return valid;
} }
} }

View File

@ -26,6 +26,7 @@ package org.hibernate.cache.jbc2.collection;
import org.jboss.cache.Cache; import org.jboss.cache.Cache;
import org.jboss.cache.Fqn; import org.jboss.cache.Fqn;
import org.jboss.cache.config.Configuration.NodeLockingScheme; import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.hibernate.cache.CacheDataDescription; import org.hibernate.cache.CacheDataDescription;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
@ -39,6 +40,7 @@ import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter;
* *
* @author Steve Ebersole * @author Steve Ebersole
*/ */
@CacheListener
public class CollectionRegionImpl extends TransactionalDataRegionAdapter implements CollectionRegion { public class CollectionRegionImpl extends TransactionalDataRegionAdapter implements CollectionRegion {
public static final String TYPE = "COLL"; public static final String TYPE = "COLL";

View File

@ -26,6 +26,7 @@ package org.hibernate.cache.jbc2.entity;
import org.jboss.cache.Cache; import org.jboss.cache.Cache;
import org.jboss.cache.Fqn; import org.jboss.cache.Fqn;
import org.jboss.cache.config.Configuration.NodeLockingScheme; import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.hibernate.cache.CacheDataDescription; import org.hibernate.cache.CacheDataDescription;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
@ -39,6 +40,7 @@ import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter;
* *
* @author Steve Ebersole * @author Steve Ebersole
*/ */
@CacheListener
public class EntityRegionImpl extends TransactionalDataRegionAdapter implements EntityRegion { public class EntityRegionImpl extends TransactionalDataRegionAdapter implements EntityRegion {
public static final String TYPE = "ENTITY"; public static final String TYPE = "ENTITY";

View File

@ -25,6 +25,8 @@ package org.hibernate.cache.jbc2.query;
import java.util.Properties; import java.util.Properties;
import javax.transaction.Transaction;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
import org.hibernate.cache.QueryResultsRegion; import org.hibernate.cache.QueryResultsRegion;
import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter; import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter;
@ -33,6 +35,7 @@ import org.hibernate.util.PropertiesHelper;
import org.jboss.cache.Cache; import org.jboss.cache.Cache;
import org.jboss.cache.Fqn; import org.jboss.cache.Fqn;
import org.jboss.cache.config.Option; import org.jboss.cache.config.Option;
import org.jboss.cache.notifications.annotation.CacheListener;
/** /**
* Defines the behavior of the query cache regions for JBossCache 2.x. * Defines the behavior of the query cache regions for JBossCache 2.x.
@ -40,6 +43,7 @@ import org.jboss.cache.config.Option;
* @author Brian Stansberry * @author Brian Stansberry
* @version $Revision$ * @version $Revision$
*/ */
@CacheListener
public class QueryResultsRegionImpl extends TransactionalDataRegionAdapter implements QueryResultsRegion { public class QueryResultsRegionImpl extends TransactionalDataRegionAdapter implements QueryResultsRegion {
public static final String QUERY_CACHE_LOCAL_ONLY_PROP = "hibernate.cache.region.jbc2.query.localonly"; public static final String QUERY_CACHE_LOCAL_ONLY_PROP = "hibernate.cache.region.jbc2.query.localonly";
@ -85,14 +89,22 @@ public class QueryResultsRegionImpl extends TransactionalDataRegionAdapter imple
} }
public void evictAll() throws CacheException { public void evictAll() throws CacheException {
Option opt = getNonLockingDataVersionOption(false); Transaction tx = suspend();
if (localOnly) try {
opt.setCacheModeLocal(true); ensureRegionRootExists();
CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt); Option opt = getNonLockingDataVersionOption(true);
CacheHelper.sendEvictAllNotification(jbcCache, regionFqn, getMemberId(), opt);
}
finally {
resume(tx);
}
} }
public Object get(Object key) throws CacheException { public Object get(Object key) throws CacheException {
if (!checkValid())
return null;
ensureRegionRootExists(); ensureRegionRootExists();
// Don't hold the JBC node lock throughout the tx, as that // Don't hold the JBC node lock throughout the tx, as that
@ -106,28 +118,30 @@ public class QueryResultsRegionImpl extends TransactionalDataRegionAdapter imple
public void put(Object key, Object value) throws CacheException { public void put(Object key, Object value) throws CacheException {
ensureRegionRootExists(); if (checkValid()) {
ensureRegionRootExists();
// Here we don't want to suspend the tx. If we do: // Here we don't want to suspend the tx. If we do:
// 1) We might be caching query results that reflect uncommitted // 1) We might be caching query results that reflect uncommitted
// changes. No tx == no WL on cache node, so other threads // changes. No tx == no WL on cache node, so other threads
// can prematurely see those query results // can prematurely see those query results
// 2) No tx == immediate replication. More overhead, plus we // 2) No tx == immediate replication. More overhead, plus we
// spread issue #1 above around the cluster // spread issue #1 above around the cluster
// Add a zero (or quite low) timeout option so we don't block. // Add a zero (or quite low) timeout option so we don't block.
// Ignore any TimeoutException. Basically we forego caching the // Ignore any TimeoutException. Basically we forego caching the
// query result in order to avoid blocking. // query result in order to avoid blocking.
// Reads are done with suspended tx, so they should not hold the // Reads are done with suspended tx, so they should not hold the
// lock for long. Not caching the query result is OK, since // lock for long. Not caching the query result is OK, since
// any subsequent read will just see the old result with its // any subsequent read will just see the old result with its
// out-of-date timestamp; that result will be discarded and the // out-of-date timestamp; that result will be discarded and the
// db query performed again. // db query performed again.
Option opt = getNonLockingDataVersionOption(false); Option opt = getNonLockingDataVersionOption(false);
opt.setLockAcquisitionTimeout(2); opt.setLockAcquisitionTimeout(2);
if (localOnly) if (localOnly)
opt.setCacheModeLocal(true); opt.setCacheModeLocal(true);
CacheHelper.putAllowingTimeout(getCacheInstance(), getRegionFqn(), key, value, opt); CacheHelper.putAllowingTimeout(getCacheInstance(), getRegionFqn(), key, value, opt);
}
} }
@Override @Override

View File

@ -41,6 +41,7 @@ import org.jboss.cache.config.Option;
import org.jboss.cache.notifications.annotation.CacheListener; import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.NodeModified; import org.jboss.cache.notifications.annotation.NodeModified;
import org.jboss.cache.notifications.annotation.NodeRemoved; import org.jboss.cache.notifications.annotation.NodeRemoved;
import org.jboss.cache.notifications.event.NodeInvalidatedEvent;
import org.jboss.cache.notifications.event.NodeModifiedEvent; import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.notifications.event.NodeRemovedEvent; import org.jboss.cache.notifications.event.NodeRemovedEvent;
@ -95,14 +96,21 @@ public class TimestampsRegionImpl extends TransactionalDataRegionAdapter impleme
public void evictAll() throws CacheException { public void evictAll() throws CacheException {
// TODO Is this a valid operation on a timestamps cache? // TODO Is this a valid operation on a timestamps cache?
Option opt = getNonLockingDataVersionOption(true); Transaction tx = suspend();
CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt); try {
ensureRegionRootExists();
Option opt = getNonLockingDataVersionOption(true);
CacheHelper.sendEvictAllNotification(jbcCache, regionFqn, getMemberId(), opt);
}
finally {
resume(tx);
}
} }
public Object get(Object key) throws CacheException { public Object get(Object key) throws CacheException {
Object value = localCache.get(key); Object value = localCache.get(key);
if (value == null) { if (value == null && checkValid()) {
ensureRegionRootExists(); ensureRegionRootExists();
@ -147,14 +155,15 @@ public class TimestampsRegionImpl extends TransactionalDataRegionAdapter impleme
*/ */
@NodeModified @NodeModified
public void nodeModified(NodeModifiedEvent event) { public void nodeModified(NodeModifiedEvent event) {
if (event.isPre())
return;
Fqn fqn = event.getFqn(); if (!handleEvictAllModification(event) && !event.isPre()) {
Fqn regFqn = getRegionFqn();
if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) { Fqn fqn = event.getFqn();
Object key = fqn.get(regFqn.size()); Fqn regFqn = getRegionFqn();
localCache.put(key, event.getData().get(ITEM)); if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) {
Object key = fqn.get(regFqn.size());
localCache.put(key, event.getData().get(ITEM));
}
} }
} }
@ -179,7 +188,29 @@ public class TimestampsRegionImpl extends TransactionalDataRegionAdapter impleme
} }
} }
/**
@Override
protected boolean handleEvictAllInvalidation(NodeInvalidatedEvent event)
{
boolean result = super.handleEvictAllInvalidation(event);
if (result) {
localCache.clear();
}
return result;
}
@Override
protected boolean handleEvictAllModification(NodeModifiedEvent event)
{
boolean result = super.handleEvictAllModification(event);
if (result) {
localCache.clear();
}
return result;
}
/**
* Brings all data from the distributed cache into our local cache. * Brings all data from the distributed cache into our local cache.
*/ */
private void populateLocalCache() { private void populateLocalCache() {

View File

@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory;
*/ */
public class CacheHelper { public class CacheHelper {
public static enum Internal { NODE, LOCAL };
/** Key under which items are cached */ /** Key under which items are cached */
public static final String ITEM = "item"; public static final String ITEM = "item";
/** Key and value used in a hack to create region root nodes */ /** Key and value used in a hack to create region root nodes */
@ -467,4 +469,23 @@ public class CacheHelper {
option.setDataVersion(version); option.setDataVersion(version);
setInvocationOption(cache, option); setInvocationOption(cache, option);
} }
public static Fqn getInternalFqn(Fqn region)
{
return Fqn.fromRelativeElements(region, Internal.NODE);
}
public static void sendEvictNotification(Cache cache, Fqn region, Object member, Object key, Option option)
{
setInvocationOption(cache, option);
Fqn f = Fqn.fromRelativeElements(region, Internal.NODE, member == null ? Internal.LOCAL : member, key);
cache.put(f, ITEM, DUMMY);
}
public static void sendEvictAllNotification(Cache cache, Fqn region, Object member, Option option)
{
setInvocationOption(cache, option);
Fqn f = Fqn.fromRelativeElements(region, Internal.NODE, member == null ? Internal.LOCAL : member);
cache.put(f, ITEM, DUMMY);
}
} }