[HHH-4519] (Hibernate/Infinispan integration doesn't property handle Entity/CollectionRegionAccessStrategy evictAll) Fixed and got provider to work with forthcoming Infinispan 4.0.0.CR2 which has been just tagged but the maven repo has not been updated yet.

git-svn-id: https://svn.jboss.org/repos/hibernate/core/trunk@17976 1b8cb986-b30d-0410-93ca-fae66ebed9b2
This commit is contained in:
Galder Zamarreno 2009-11-13 18:12:53 +00:00
parent 8da29593d2
commit ec31e277ec
35 changed files with 1151 additions and 526 deletions

View File

@ -17,7 +17,7 @@
<description>Integration of Hibernate with Infinispan</description>
<properties>
<version.infinispan>4.0.0.BETA2</version.infinispan>
<version.infinispan>4.0.0-SNAPSHOT</version.infinispan>
<version.hsqldb>1.8.0.2</version.hsqldb>
<version.cglib>2.2</version.cglib>
<version.javassist>3.4.GA</version.javassist>

View File

@ -24,6 +24,8 @@
import org.hibernate.cache.infinispan.timestamp.TimestampsRegionImpl;
import org.hibernate.cache.infinispan.timestamp.TimestampTypeOverrides;
import org.hibernate.cache.infinispan.tm.HibernateTransactionManagerLookup;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheAdapterImpl;
import org.hibernate.cfg.Settings;
import org.hibernate.util.PropertiesHelper;
import org.infinispan.Cache;
@ -154,14 +156,20 @@ public InfinispanRegionFactory(Properties props) {
public CollectionRegion buildCollectionRegion(String regionName, Properties properties, CacheDataDescription metadata) throws CacheException {
log.debug("Building collection cache region [" + regionName + "]");
Cache cache = getCache(regionName, COLLECTION_KEY, properties);
return new CollectionRegionImpl(cache, regionName, metadata, transactionManager);
CacheAdapter cacheAdapter = CacheAdapterImpl.newInstance(cache);
CollectionRegionImpl region = new CollectionRegionImpl(cacheAdapter, regionName, metadata, transactionManager);
region.start();
return region;
}
/** {@inheritDoc} */
public EntityRegion buildEntityRegion(String regionName, Properties properties, CacheDataDescription metadata) throws CacheException {
if (log.isDebugEnabled()) log.debug("Building entity cache region [" + regionName + "]");
Cache cache = getCache(regionName, ENTITY_KEY, properties);
return new EntityRegionImpl(cache, regionName, metadata, transactionManager);
CacheAdapter cacheAdapter = CacheAdapterImpl.newInstance(cache);
EntityRegionImpl region = new EntityRegionImpl(cacheAdapter, regionName, metadata, transactionManager);
region.start();
return region;
}
/**
@ -171,7 +179,10 @@ public QueryResultsRegion buildQueryResultsRegion(String regionName, Properties
throws CacheException {
log.debug("Building query results cache region [" + regionName + "]");
String cacheName = typeOverrides.get(QUERY_KEY).getCacheName();
return new QueryResultsRegionImpl(manager.getCache(cacheName), regionName, properties, transactionManager);
CacheAdapter cacheAdapter = CacheAdapterImpl.newInstance(manager.getCache(cacheName));
QueryResultsRegionImpl region = new QueryResultsRegionImpl(cacheAdapter, regionName, properties, transactionManager);
region.start();
return region;
}
/**
@ -181,7 +192,10 @@ public TimestampsRegion buildTimestampsRegion(String regionName, Properties prop
throws CacheException {
log.debug("Building timestamps cache region [" + regionName + "]");
String cacheName = typeOverrides.get(TIMESTAMPS_KEY).getCacheName();
return new TimestampsRegionImpl(manager.getCache(cacheName), regionName, transactionManager);
CacheAdapter cacheAdapter = CacheAdapterImpl.newInstance(manager.getCache(cacheName));
TimestampsRegionImpl region = new TimestampsRegionImpl(cacheAdapter, regionName, transactionManager);
region.start();
return region;
}
/**

View File

@ -23,11 +23,15 @@
*/
package org.hibernate.cache.infinispan.access;
import javax.transaction.Transaction;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.access.CollectionRegionAccessStrategy;
import org.hibernate.cache.access.EntityRegionAccessStrategy;
import org.hibernate.cache.access.SoftLock;
import org.infinispan.Cache;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheHelper;
/**
* Defines the strategy for transactional access to entity or collection data in a Infinispan instance.
@ -41,18 +45,24 @@
*/
public class TransactionalAccessDelegate {
protected final Cache cache;
protected final CacheAdapter cacheAdapter;
protected final BaseRegion region;
public TransactionalAccessDelegate(Cache cache) {
this.cache = cache;
public TransactionalAccessDelegate(BaseRegion region) {
this.region = region;
this.cacheAdapter = region.getCacheAdapter();
}
public Object get(Object key, long txTimestamp) throws CacheException {
return cache.get(key);
if (!region.checkValid())
return null;
return cacheAdapter.get(key);
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException {
cache.putForExternalRead(key, value);
if (!region.checkValid())
return false;
cacheAdapter.putForExternalRead(key, value);
return true;
}
@ -76,7 +86,9 @@ public void unlockRegion(SoftLock lock) throws CacheException {
}
public boolean insert(Object key, Object value, Object version) throws CacheException {
cache.put(key, value);
if (!region.checkValid())
return false;
cacheAdapter.put(key, value);
return true;
}
@ -85,7 +97,10 @@ public boolean afterInsert(Object key, Object value, Object version) throws Cach
}
public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException {
cache.put(key, value);
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
cacheAdapter.put(key, value);
return true;
}
@ -95,18 +110,26 @@ public boolean afterUpdate(Object key, Object value, Object currentVersion, Obje
}
public void remove(Object key) throws CacheException {
cache.remove(key);
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
cacheAdapter.remove(key);
}
public void removeAll() throws CacheException {
cache.clear();
cacheAdapter.clear();
}
public void evict(Object key) throws CacheException {
cacheAdapter.remove(key);
}
public void evictAll() throws CacheException {
evictOrRemoveAll();
}
private void evictOrRemoveAll() throws CacheException {
cache.clear();
Transaction tx = region.suspend();
try {
CacheHelper.sendEvictAllNotification(cacheAdapter, region.getAddress());
} finally {
region.resume(tx);
}
}
}

View File

@ -8,17 +8,19 @@
import org.hibernate.cache.access.AccessType;
import org.hibernate.cache.access.CollectionRegionAccessStrategy;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.infinispan.Cache;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.infinispan.notifications.Listener;
/**
* @author Chris Bredesen
* @author Galder Zamarreño
* @since 3.5
*/
@Listener
public class CollectionRegionImpl extends BaseTransactionalDataRegion implements CollectionRegion {
public CollectionRegionImpl(Cache cache, String name, CacheDataDescription metadata, TransactionManager transactionManager) {
super(cache, name, metadata, transactionManager);
public CollectionRegionImpl(CacheAdapter cacheAdapter, String name, CacheDataDescription metadata, TransactionManager transactionManager) {
super(cacheAdapter, name, metadata, transactionManager);
}
public CollectionRegionAccessStrategy buildAccessStrategy(AccessType accessType) throws CacheException {

View File

@ -21,11 +21,11 @@ class TransactionalAccess implements CollectionRegionAccessStrategy {
TransactionalAccess(CollectionRegionImpl region) {
this.region = region;
this.delegate = new TransactionalAccessDelegate(region.getCache());
this.delegate = new TransactionalAccessDelegate(region);
}
public void evict(Object key) throws CacheException {
delegate.remove(key);
delegate.evict(key);
}
public void evictAll() throws CacheException {

View File

@ -8,17 +8,19 @@
import org.hibernate.cache.access.AccessType;
import org.hibernate.cache.access.EntityRegionAccessStrategy;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.infinispan.Cache;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.infinispan.notifications.Listener;
/**
* @author Chris Bredesen
* @author Galder Zamarreño
* @since 3.5
*/
@Listener
public class EntityRegionImpl extends BaseTransactionalDataRegion implements EntityRegion {
public EntityRegionImpl(Cache cache, String name, CacheDataDescription metadata, TransactionManager transactionManager) {
super(cache, name, metadata, transactionManager);
public EntityRegionImpl(CacheAdapter cacheAdapter, String name, CacheDataDescription metadata, TransactionManager transactionManager) {
super(cacheAdapter, name, metadata, transactionManager);
}
public EntityRegionAccessStrategy buildAccessStrategy(AccessType accessType) throws CacheException {

View File

@ -21,11 +21,11 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
TransactionalAccess(EntityRegionImpl region) {
this.region = region;
this.delegate = new TransactionalAccessDelegate(region.getCache());
this.delegate = new TransactionalAccessDelegate(region);
}
public void evict(Object key) throws CacheException {
delegate.remove(key);
delegate.evict(key);
}
public void evictAll() throws CacheException {
@ -41,8 +41,7 @@ public EntityRegion getRegion() {
}
public boolean insert(Object key, Object value, Object version) throws CacheException {
region.getCache().put(key, value);
return true; // TODO this is suspect
return delegate.insert(key, value, version);
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException {

View File

@ -4,7 +4,7 @@
import org.hibernate.cache.CacheException;
import org.hibernate.cache.GeneralDataRegion;
import org.infinispan.Cache;
import org.hibernate.cache.infinispan.util.CacheAdapter;
/**
* Support for Infinispan {@link GeneralDataRegion} implementors.
@ -15,24 +15,24 @@
*/
public abstract class BaseGeneralDataRegion extends BaseRegion implements GeneralDataRegion {
public BaseGeneralDataRegion(Cache cache, String name, TransactionManager transactionManager) {
super(cache, name, transactionManager);
public BaseGeneralDataRegion(CacheAdapter cacheAdapter, String name, TransactionManager transactionManager) {
super(cacheAdapter, name, transactionManager);
}
public void evict(Object key) throws CacheException {
getCache().evict(key);
cacheAdapter.evict(key);
}
public void evictAll() throws CacheException {
getCache().clear();
cacheAdapter.clear();
}
public Object get(Object key) throws CacheException {
return getCache().get(key);
return cacheAdapter.get(key);
}
public void put(Object key, Object value) throws CacheException {
getCache().put(key, value);
cacheAdapter.put(key, value);
}
}

View File

@ -1,6 +1,11 @@
package org.hibernate.cache.infinispan.impl;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
@ -8,9 +13,19 @@
import org.hibernate.cache.CacheException;
import org.hibernate.cache.Region;
import org.hibernate.cache.infinispan.util.AddressAdapter;
import org.hibernate.cache.infinispan.util.AddressAdapterImpl;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.hibernate.cache.infinispan.util.FlagAdapter;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryInvalidated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
/**
* Support for Infinispan {@link Region}s. Handles common "utility" methods for an underlying named
@ -22,26 +37,69 @@
* @since 3.5
*/
public abstract class BaseRegion implements Region {
private final Cache cache;
private enum InvalidateState { INVALID, CLEARING, VALID };
private static final Log log = LogFactory.getLog(BaseRegion.class);
private final String name;
protected final CacheAdapter cacheAdapter;
protected final AddressAdapter address;
protected final Set<AddressAdapter> currentView = new HashSet<AddressAdapter>();
protected final TransactionManager transactionManager;
protected final boolean replication;
protected final Object invalidationMutex = new Object();
protected final AtomicReference<InvalidateState> invalidateState = new AtomicReference<InvalidateState>(InvalidateState.VALID);
public BaseRegion(Cache cache, String name, TransactionManager transactionManager) {
this.cache = cache;
public BaseRegion(CacheAdapter cacheAdapter, String name, TransactionManager transactionManager) {
this.cacheAdapter = cacheAdapter;
this.name = name;
this.transactionManager = transactionManager;
this.replication = cacheAdapter.isClusteredReplication();
this.address = this.cacheAdapter.getAddress();
this.cacheAdapter.addListener(this);
}
public Cache getCache() {
return cache;
public void start() {
if (address != null) {
synchronized (currentView) {
List<AddressAdapter> view = cacheAdapter.getMembers();
if (view != null) {
currentView.addAll(view);
establishInternalNodes();
}
}
}
}
/**
* Calls to this method must be done from synchronized (currentView) blocks only!!
*/
private void establishInternalNodes() {
Transaction tx = suspend();
try {
for (AddressAdapter member : currentView) {
CacheHelper.initInternalEvict(cacheAdapter, member);
}
} finally {
resume(tx);
}
}
public String getName() {
return name;
}
public CacheAdapter getCacheAdapter() {
return cacheAdapter;
}
public long getElementCountInMemory() {
return cache.size();
if (checkValid()) {
Set keySet = cacheAdapter.keySet();
int size = cacheAdapter.size();
if (CacheHelper.containsEvictAllNotification(keySet, address))
size--;
return size;
}
return 0;
}
/**
@ -71,17 +129,64 @@ public long nextTimestamp() {
}
public Map toMap() {
return cache;
if (checkValid()) {
Map map = cacheAdapter.toMap();
Set keys = map.keySet();
for (Object key : keys) {
if (CacheHelper.isEvictAllNotification(key)) {
map.remove(key);
}
}
return map;
}
return Collections.EMPTY_MAP;
}
public void destroy() throws CacheException {
cache.clear();
try {
cacheAdapter.clear();
} finally {
cacheAdapter.removeListener(this);
}
}
public boolean contains(Object key) {
return CacheHelper.containsKey(cache, key, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT);
if (!checkValid())
return false;
// Reads are non-blocking in Infinispan, so not sure of the necessity of passing ZERO_LOCK_ACQUISITION_TIMEOUT
return cacheAdapter.withFlags(FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT).containsKey(key);
}
public AddressAdapter getAddress() {
return address;
}
public boolean checkValid() {
boolean valid = invalidateState.get() == InvalidateState.VALID;
if (!valid) {
synchronized (invalidationMutex) {
if (invalidateState.compareAndSet(InvalidateState.INVALID, InvalidateState.CLEARING)) {
Transaction tx = suspend();
try {
cacheAdapter.withFlags(FlagAdapter.CACHE_MODE_LOCAL, FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT).clear();
invalidateState.compareAndSet(InvalidateState.CLEARING, InvalidateState.VALID);
}
catch (Exception e) {
if (log.isTraceEnabled()) {
log.trace("Could not invalidate region: " + e.getLocalizedMessage());
}
}
finally {
resume(tx);
}
}
}
valid = invalidateState.get() == InvalidateState.VALID;
}
return valid;
}
/**
* Performs a JBoss Cache <code>get(Fqn, Object)</code> after first
* {@link #suspend suspending any ongoing transaction}. Wraps any exception
@ -93,13 +198,13 @@ public boolean contains(Object key) {
* @return The retrieved object
* @throws CacheException issue managing transaction or talking to cache
*/
protected Object suspendAndGet(Object key, Flag opt, boolean suppressTimeout) throws CacheException {
protected Object suspendAndGet(Object key, FlagAdapter opt, boolean suppressTimeout) throws CacheException {
Transaction tx = suspend();
try {
if (suppressTimeout)
return CacheHelper.getAllowingTimeout(cache, key);
return cacheAdapter.getAllowingTimeout(key);
else
return CacheHelper.get(cache, key);
return cacheAdapter.get(key);
} finally {
resume(tx);
}
@ -111,7 +216,7 @@ protected Object suspendAndGet(Object key, Flag opt, boolean suppressTimeout) th
* @return the transaction that was suspended, or <code>null</code> if
* there wasn't one
*/
protected Transaction suspend() {
public Transaction suspend() {
Transaction tx = null;
try {
if (transactionManager != null) {
@ -122,14 +227,14 @@ protected Transaction suspend() {
}
return tx;
}
/**
* Tell the TransactionManager to resume the given transaction
*
* @param tx
* the transaction to suspend. May be <code>null</code>.
*/
protected void resume(Transaction tx) {
public void resume(Transaction tx) {
try {
if (tx != null)
transactionManager.resume(tx);
@ -138,4 +243,44 @@ protected void resume(Transaction tx) {
}
}
@CacheEntryModified
public void entryModified(CacheEntryModifiedEvent event) {
handleEvictAllModification(event);
}
protected boolean handleEvictAllModification(CacheEntryModifiedEvent event) {
if (!event.isPre() && (replication || event.isOriginLocal()) && CacheHelper.isEvictAllNotification(event.getKey(), event.getValue())) {
if (log.isTraceEnabled()) log.trace("Set invalid state because marker cache entry was put: {0}", event);
invalidateState.set(InvalidateState.INVALID);
return true;
}
return false;
}
@CacheEntryInvalidated
public void entryInvalidated(CacheEntryInvalidatedEvent event) {
if (log.isTraceEnabled()) log.trace("Cache entry invalidated: {0}", event);
handleEvictAllInvalidation(event);
}
protected boolean handleEvictAllInvalidation(CacheEntryInvalidatedEvent event) {
if (!event.isPre() && CacheHelper.isEvictAllNotification(event.getKey())) {
if (log.isTraceEnabled()) log.trace("Set invalid state because marker cache entry was invalidated: {0}", event);
invalidateState.set(InvalidateState.INVALID);
return true;
}
return false;
}
@ViewChanged
public void viewChanged(ViewChangedEvent event) {
synchronized (currentView) {
List<AddressAdapter> view = AddressAdapterImpl.toAddressAdapter(event.getNewMembers());
if (view != null) {
currentView.addAll(view);
establishInternalNodes();
}
}
}
}

View File

@ -4,7 +4,7 @@
import org.hibernate.cache.CacheDataDescription;
import org.hibernate.cache.TransactionalDataRegion;
import org.infinispan.Cache;
import org.hibernate.cache.infinispan.util.CacheAdapter;
/**
* Support for Inifinispan {@link TransactionalDataRegion} implementors.
@ -17,8 +17,8 @@ public abstract class BaseTransactionalDataRegion extends BaseRegion implements
private final CacheDataDescription metadata;
public BaseTransactionalDataRegion(Cache cache, String name, CacheDataDescription metadata, TransactionManager transactionManager) {
super(cache, name, transactionManager);
public BaseTransactionalDataRegion(CacheAdapter cacheAdapter, String name, CacheDataDescription metadata, TransactionManager transactionManager) {
super(cacheAdapter, name, transactionManager);
this.metadata = metadata;
}

View File

@ -2,74 +2,85 @@
import java.util.Properties;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.QueryResultsRegion;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.hibernate.cache.infinispan.util.FlagAdapter;
import org.infinispan.notifications.Listener;
/**
* @author Chris Bredesen
* @author Galder Zamarreño
* @since 3.5
*/
@Listener
public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implements QueryResultsRegion {
private boolean localOnly;
public QueryResultsRegionImpl(Cache cache, String name, Properties properties, TransactionManager transactionManager) {
super(cache, name, null, transactionManager);
public QueryResultsRegionImpl(CacheAdapter cacheAdapter, String name, Properties properties, TransactionManager transactionManager) {
super(cacheAdapter, name, null, transactionManager);
// If Infinispan is using INVALIDATION for query cache, we don't want to propagate changes.
// We use the Timestamps cache to manage invalidation
localOnly = CacheHelper.isClusteredInvalidation(cache);
localOnly = cacheAdapter.isClusteredInvalidation();
}
public void evict(Object key) throws CacheException {
if (localOnly)
CacheHelper.removeKey(getCache(), key, Flag.CACHE_MODE_LOCAL);
cacheAdapter.withFlags(FlagAdapter.CACHE_MODE_LOCAL).remove(key);
else
CacheHelper.removeKey(getCache(), key);
cacheAdapter.remove(key);
}
public void evictAll() throws CacheException {
if (localOnly)
CacheHelper.removeAll(getCache(), Flag.CACHE_MODE_LOCAL);
else
CacheHelper.removeAll(getCache());
Transaction tx = suspend();
try {
CacheHelper.sendEvictAllNotification(cacheAdapter, getAddress());
} finally {
resume(tx);
}
}
public Object get(Object key) throws CacheException {
if (!checkValid())
return null;
// Don't hold the JBC node lock throughout the tx, as that
// prevents updates
// Add a zero (or low) timeout option so we don't block
// waiting for tx's that did a put to commit
return suspendAndGet(key, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, true);
return suspendAndGet(key, FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT, true);
}
public void put(Object key, Object value) throws CacheException {
// Here we don't want to suspend the tx. If we do:
// 1) We might be caching query results that reflect uncommitted
// changes. No tx == no WL on cache node, so other threads
// can prematurely see those query results
// 2) No tx == immediate replication. More overhead, plus we
// spread issue #1 above around the cluster
if (checkValid()) {
// Here we don't want to suspend the tx. If we do:
// 1) We might be caching query results that reflect uncommitted
// changes. No tx == no WL on cache node, so other threads
// can prematurely see those query results
// 2) No tx == immediate replication. More overhead, plus we
// spread issue #1 above around the cluster
// Add a zero (or quite low) timeout option so we don't block.
// Ignore any TimeoutException. Basically we forego caching the
// query result in order to avoid blocking.
// Reads are done with suspended tx, so they should not hold the
// lock for long. Not caching the query result is OK, since
// any subsequent read will just see the old result with its
// out-of-date timestamp; that result will be discarded and the
// db query performed again.
if (localOnly)
CacheHelper.putAllowingTimeout(getCache(), key, value, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.CACHE_MODE_LOCAL);
else
CacheHelper.putAllowingTimeout(getCache(), key, value, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT);
// Add a zero (or quite low) timeout option so we don't block.
// Ignore any TimeoutException. Basically we forego caching the
// query result in order to avoid blocking.
// Reads are done with suspended tx, so they should not hold the
// lock for long. Not caching the query result is OK, since
// any subsequent read will just see the old result with its
// out-of-date timestamp; that result will be discarded and the
// db query performed again.
if (localOnly)
cacheAdapter.withFlags(FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT, FlagAdapter.CACHE_MODE_LOCAL)
.putAllowingTimeout(key, value);
else
cacheAdapter.withFlags(FlagAdapter.ZERO_LOCK_ACQUISITION_TIMEOUT)
.putAllowingTimeout(key, value);
}
}
}

View File

@ -10,12 +10,13 @@
import org.hibernate.cache.CacheException;
import org.hibernate.cache.TimestampsRegion;
import org.hibernate.cache.infinispan.impl.BaseGeneralDataRegion;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.hibernate.cache.infinispan.util.FlagAdapter;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryInvalidatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
@ -31,26 +32,31 @@ public class TimestampsRegionImpl extends BaseGeneralDataRegion implements Times
private Map localCache = new ConcurrentHashMap();
public TimestampsRegionImpl(Cache cache, String name, TransactionManager transactionManager) {
super(cache, name, transactionManager);
cache.addListener(this);
public TimestampsRegionImpl(CacheAdapter cacheAdapter, String name, TransactionManager transactionManager) {
super(cacheAdapter, name, transactionManager);
cacheAdapter.addListener(this);
populateLocalCache();
}
@Override
public void evict(Object key) throws CacheException {
// TODO Is this a valid operation on a timestamps cache?
CacheHelper.removeKey(getCache(), key);
cacheAdapter.remove(key);
}
public void evictAll() throws CacheException {
// TODO Is this a valid operation on a timestamps cache?
CacheHelper.removeAll(getCache());
Transaction tx = suspend();
try {
CacheHelper.sendEvictAllNotification(cacheAdapter, getAddress());
} finally {
resume(tx);
}
}
public Object get(Object key) throws CacheException {
Object value = localCache.get(key);
if (value == null) {
if (value == null && checkValid()) {
value = suspendAndGet(key, null, false);
if (value != null)
localCache.put(key, value);
@ -64,7 +70,7 @@ public void put(Object key, Object value) throws CacheException {
Transaction tx = suspend();
try {
// We ensure ASYNC semantics (JBCACHE-1175)
CacheHelper.put(getCache(), key, value, Flag.FORCE_ASYNCHRONOUS);
cacheAdapter.withFlags(FlagAdapter.FORCE_ASYNCHRONOUS).put(key, value);
} catch (Exception e) {
throw new CacheException(e);
} finally {
@ -75,7 +81,7 @@ public void put(Object key, Object value) throws CacheException {
@Override
public void destroy() throws CacheException {
localCache.clear();
getCache().removeListener(this);
cacheAdapter.removeListener(this);
super.destroy();
}
@ -86,8 +92,9 @@ public void destroy() throws CacheException {
*/
@CacheEntryModified
public void nodeModified(CacheEntryModifiedEvent event) {
if (event.isPre()) return;
localCache.put(event.getKey(), event.getValue());
if (!handleEvictAllModification(event) && !event.isPre()) {
localCache.put(event.getKey(), event.getValue());
}
}
/**
@ -101,11 +108,29 @@ public void nodeRemoved(CacheEntryRemovedEvent event) {
localCache.remove(event.getKey());
}
@Override
protected boolean handleEvictAllModification(CacheEntryModifiedEvent event) {
boolean result = super.handleEvictAllModification(event);
if (result) {
localCache.clear();
}
return result;
}
@Override
protected boolean handleEvictAllInvalidation(CacheEntryInvalidatedEvent event) {
boolean result = super.handleEvictAllInvalidation(event);
if (result) {
localCache.clear();
}
return result;
}
/**
* Brings all data from the distributed cache into our local cache.
*/
private void populateLocalCache() {
Set children = CacheHelper.getKeySet(getCache());
Set children = cacheAdapter.keySet();
for (Object key : children)
get(key);
}

View File

@ -0,0 +1,32 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2009, Red Hat, Inc. and/or its affiliates, and
* individual contributors as indicated by the @author tags. See the
* copyright.txt file in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.hibernate.cache.infinispan.util;
/**
* AddressAdapter.
*
* @author Galder Zamarreño
* @since 3.5
*/
public interface AddressAdapter {
}

View File

@ -0,0 +1,84 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2009, Red Hat, Inc. and/or its affiliates, and
* individual contributors as indicated by the @author tags. See the
* copyright.txt file in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.hibernate.cache.infinispan.util;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
import org.infinispan.remoting.transport.Address;
/**
* AddressAdapterImpl.
*
* @author Galder Zamarreño
* @since 3.5
*/
public class AddressAdapterImpl implements AddressAdapter, Externalizable {
private Address address;
private AddressAdapterImpl(Address address) {
this.address = address;
}
static AddressAdapter newInstance(Address address) {
return new AddressAdapterImpl(address);
}
public static List<AddressAdapter> toAddressAdapter(List<Address> ispnAddresses) {
List<AddressAdapter> addresses = new ArrayList<AddressAdapter>(ispnAddresses.size());
for (Address address : ispnAddresses) {
addresses.add(AddressAdapterImpl.newInstance(address));
}
return addresses;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
address = (Address) in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(address);
}
@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (!(obj instanceof AddressAdapterImpl))
return false;
AddressAdapterImpl other = (AddressAdapterImpl) obj;
return other.address.equals(address);
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + address.hashCode();
return result;
}
}

View File

@ -0,0 +1,207 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2009, Red Hat, Inc. and/or its affiliates, and
* individual contributors as indicated by the @author tags. See the
* copyright.txt file in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.hibernate.cache.infinispan.util;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.hibernate.cache.CacheException;
import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.util.concurrent.TimeoutException;
/**
* Infinispan cache abstraction.
*
* @author Galder Zamarreño
* @since 3.5
*/
public interface CacheAdapter {
/**
* Is this cache participating in a cluster with invalidation?
*
* @return true if the cache is configured for synchronous/asynchronous invalidation; false otherwise.
*/
boolean isClusteredInvalidation();
/**
* Is this cache participating in a cluster with replication?
*
* @return true if the cache is configured for synchronous/asynchronous invalidation; false otherwise.
*/
boolean isClusteredReplication();
/**
* Is this cache configured for synchronous communication?
*
* @return true if the cache is configured for synchronous communication; false otherwise.
*/
boolean isSynchronous();
/**
* Set of keys of this cache.
*
* @return Set containing keys stored in this cache.
*/
Set keySet();
/**
* A builder-style method that adds flags to any cache API call.
*
* @param flagAdapters a set of flags to apply. See the {@link FlagAdapter} documentation.
* @return a cache on which a real operation is to be invoked.
*/
CacheAdapter withFlags(FlagAdapter... flagAdapters);
/**
* Method to check whether a certain key exists in this cache.
*
* @param key key to look up.
* @return true if key is present, false otherwise.
*/
boolean containsKey(Object key);
/**
* Performs an <code>get(Object)</code> on the cache, wrapping any exception in a {@link CacheException}.
*
* @param key key to retrieve
* @throws CacheException
*/
Object get(Object key) throws CacheException;
/**
* Performs an <code>get(Object)</code> on the cache ignoring any {@link TimeoutException}
* and wrapping any other exception in a {@link CacheException}.
*
* @param key key to retrieve
* @throws CacheException
*/
Object getAllowingTimeout(Object key) throws CacheException;
/**
* Performs a <code>put(Object, Object)</code> on the cache, wrapping any exception in a {@link CacheException}.
*
* @param key key whose value will be modified
* @param value data to store in the cache entry
* @return the previous value associated with <tt>key</tt>, or <tt>null</tt>
* if there was no mapping for <tt>key</tt>.
* @throws CacheException
*/
Object put(Object key, Object value) throws CacheException;
/**
* Performs a <code>put(Object, Object)</code> on the cache ignoring any {@link TimeoutException}
* and wrapping any exception in a {@link CacheException}.
*
* @param key key whose value will be modified
* @param value data to store in the cache entry
* @return the previous value associated with <tt>key</tt>, or <tt>null</tt>
* if there was no mapping for <tt>key</tt>.
* @throws CacheException
*/
Object putAllowingTimeout(Object key, Object value) throws CacheException;
/**
* See {@link Cache#putForExternalRead(Object, Object)} for detailed documentation.
*
* @param key key with which the specified value is to be associated.
* @param value value to be associated with the specified key.
* @throws CacheException
*/
void putForExternalRead(Object key, Object value) throws CacheException;
/**
* Performs a <code>remove(Object)</code>, wrapping any exception in a {@link CacheException}.
*
* @param key key to be removed
* @return the previous value associated with <tt>key</tt>, or
* <tt>null</tt> if there was no mapping for <tt>key</tt>.
* @throws CacheException
*/
Object remove(Object key) throws CacheException;
/**
* Evict the given key from memory.
*
* @param key to evict.
*/
void evict(Object key) throws CacheException;
/**
* Clear the cache.
*
* @throws CacheException
*/
void clear() throws CacheException;
/**
* Add listener to this cache.
*
* @param listener to be added to cache.
*/
void addListener(Object listener);
/**
* Get local cluster address.
*
* @return Address representing local address.
*/
AddressAdapter getAddress();
/**
* Get cluster members.
*
* @return List of cluster member Address instances
*/
List<AddressAdapter> getMembers();
/**
* Size of cache.
*
* @return number of cache entries.
*/
int size();
/**
* This method returns a Map view of the cache.
*
* @return Map view of cache.
*/
Map toMap();
/**
* Remove listener from cache instance.
*
* @param listener to be removed.
*/
void removeListener(Object listener);
/**
* Get cache configuration.
*
* @return Configuration instance associated with this cache.
*/
Configuration getConfiguration();
}

View File

@ -0,0 +1,205 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2009, Red Hat, Inc. and/or its affiliates, and
* individual contributors as indicated by the @author tags. See the
* copyright.txt file in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.hibernate.cache.infinispan.util;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.hibernate.cache.CacheException;
import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.context.Flag;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.util.concurrent.TimeoutException;
/**
* CacheAdapterImpl.
*
* @author Galder Zamarreño
* @since 3.5
*/
public class CacheAdapterImpl implements CacheAdapter {
private final Cache cache;
private CacheAdapterImpl(Cache cache) {
this.cache = cache;
}
public static CacheAdapter newInstance(Cache cache) {
return new CacheAdapterImpl(cache);
}
public boolean isClusteredInvalidation() {
return isClusteredInvalidation(cache.getConfiguration().getCacheMode());
}
public boolean isClusteredReplication() {
return isClusteredReplication(cache.getConfiguration().getCacheMode());
}
public boolean isSynchronous() {
return isSynchronous(cache.getConfiguration().getCacheMode());
}
public Set keySet() {
return cache.keySet();
}
public CacheAdapter withFlags(FlagAdapter... flagAdapters) {
Flag[] flags = FlagAdapter.toFlags(flagAdapters);
return newInstance(cache.getAdvancedCache().withFlags(flags));
}
public Object get(Object key) throws CacheException {
try {
return cache.get(key);
} catch (Exception e) {
throw new CacheException(e);
}
}
public Object getAllowingTimeout(Object key) throws CacheException {
try {
return cache.get(key);
} catch (TimeoutException ignored) {
// ignore it
return null;
} catch (Exception e) {
throw new CacheException(e);
}
}
public Object put(Object key, Object value) throws CacheException {
try {
return cache.put(key, value);
} catch (Exception e) {
throw new CacheException(e);
}
}
public Object putAllowingTimeout(Object key, Object value) throws CacheException {
try {
return cache.put(key, value);
} catch (TimeoutException allowed) {
// ignore it
return null;
} catch (Exception e) {
throw new CacheException(e);
}
}
public void putForExternalRead(Object key, Object value) throws CacheException {
try {
cache.putForExternalRead(key, value);
} catch (Exception e) {
throw new CacheException(e);
}
}
public Object remove(Object key) throws CacheException {
try {
return cache.remove(key);
} catch (Exception e) {
throw new CacheException(e);
}
}
public void evict(Object key) throws CacheException {
try {
cache.evict(key);
} catch (Exception e) {
throw new CacheException(e);
}
}
public void clear() throws CacheException {
try {
cache.clear();
} catch (Exception e) {
throw new CacheException(e);
}
}
private static boolean isClusteredInvalidation(Configuration.CacheMode cacheMode) {
return cacheMode == Configuration.CacheMode.INVALIDATION_ASYNC
|| cacheMode == Configuration.CacheMode.INVALIDATION_SYNC;
}
private static boolean isClusteredReplication(Configuration.CacheMode cacheMode) {
return cacheMode == Configuration.CacheMode.REPL_ASYNC
|| cacheMode == Configuration.CacheMode.REPL_SYNC;
}
private static boolean isSynchronous(Configuration.CacheMode cacheMode) {
return cacheMode == Configuration.CacheMode.REPL_SYNC
|| cacheMode == Configuration.CacheMode.INVALIDATION_SYNC
|| cacheMode == Configuration.CacheMode.DIST_SYNC;
}
public void addListener(Object listener) {
cache.addListener(listener);
}
public AddressAdapter getAddress() {
RpcManager rpc = cache.getAdvancedCache().getRpcManager();
if (rpc != null) {
return AddressAdapterImpl.newInstance(rpc.getTransport().getAddress());
}
return null;
}
public List<AddressAdapter> getMembers() {
RpcManager rpc = cache.getAdvancedCache().getRpcManager();
if (rpc != null) {
return AddressAdapterImpl.toAddressAdapter(rpc.getTransport().getMembers());
}
return null;
}
public RpcManager getRpcManager() {
return cache.getAdvancedCache().getRpcManager();
}
public int size() {
return cache.size();
}
public Map toMap() {
return cache;
}
public void removeListener(Object listener) {
cache.removeListener(listener);
}
public boolean containsKey(Object key) {
return cache.containsKey(key);
}
public Configuration getConfiguration() {
return cache.getConfiguration();
}
}

View File

@ -23,13 +23,12 @@
*/
package org.hibernate.cache.infinispan.util;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Set;
import org.hibernate.cache.CacheException;
import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.context.Flag;
import org.infinispan.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,324 +48,68 @@ public class CacheHelper {
private CacheHelper() {
}
/**
* Is this cache participating in a cluster with invalidation?
*
* @param cache
* The cache to check.
* @return True if the cache is configured for synchronous/asynchronous invalidation; false
* otherwise.
*/
public static boolean isClusteredInvalidation(Cache cache) {
return isClusteredInvalidation(cache.getConfiguration().getCacheMode());
public static void initInternalEvict(CacheAdapter cacheAdapter, AddressAdapter member) {
EvictAll eKey = new EvictAll(member == null ? NoAddress.INSTANCE : member);
cacheAdapter.withFlags(FlagAdapter.CACHE_MODE_LOCAL).put(eKey, Internal.INIT);
}
/**
* Does this cache mode indicate clustered invalidation?
*
* @param cacheMode
* The cache to check
* @return True if the cache mode is confiogured for synchronous/asynchronous invalidation; false
* otherwise.
*/
public static boolean isClusteredInvalidation(Configuration.CacheMode cacheMode) {
return cacheMode == Configuration.CacheMode.INVALIDATION_ASYNC
|| cacheMode == Configuration.CacheMode.INVALIDATION_SYNC;
public static void sendEvictAllNotification(CacheAdapter cacheAdapter, AddressAdapter member) {
EvictAll eKey = new EvictAll(member == null ? NoAddress.INSTANCE : member);
cacheAdapter.put(eKey, Internal.EVICT);
}
/**
* Is this cache participating in a cluster with replication?
*
* @param cache
* The cache to check.
* @return True if the cache is configured for synchronous/asynchronous invalidation; false
* otherwise.
*/
public static boolean isClusteredReplication(Cache cache) {
return isClusteredReplication(cache.getConfiguration().getCacheMode());
public static boolean isEvictAllNotification(Object key) {
return key instanceof EvictAll;
}
/**
* Does this cache mode indicate clustered replication?
*
* @param cacheMode
* The cache to check
* @return True if the cache mode is confiogured for synchronous/asynchronous invalidation; false
* otherwise.
*/
public static boolean isClusteredReplication(Configuration.CacheMode cacheMode) {
return cacheMode == Configuration.CacheMode.REPL_ASYNC || cacheMode == Configuration.CacheMode.REPL_SYNC;
public static boolean containsEvictAllNotification(Set keySet, AddressAdapter member) {
EvictAll eKey = new EvictAll(member == null ? NoAddress.INSTANCE : member);
return keySet.contains(eKey);
}
public static boolean isSynchronous(Cache cache) {
return isSynchronous(cache.getConfiguration().getCacheMode());
public static boolean isEvictAllNotification(Object key, Object value) {
return key instanceof EvictAll && value == Internal.EVICT;
}
public static boolean isSynchronous(Configuration.CacheMode cacheMode) {
return cacheMode == Configuration.CacheMode.REPL_SYNC || cacheMode == Configuration.CacheMode.INVALIDATION_SYNC;
}
private static class EvictAll implements Externalizable {
AddressAdapter member;
public static Set getKeySet(Cache cache) {
return cache.keySet();
}
EvictAll(AddressAdapter member) {
this.member = member;
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>get(Fqn, Object)</code>, wrapping any exception in a {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
*/
public static Object get(Cache cache, Object key) throws CacheException {
try {
return cache.get(key);
} catch (Exception e) {
throw new CacheException(e);
@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (!(obj instanceof EvictAll))
return false;
EvictAll ek = (EvictAll) obj;
return ek.member.equals(member);
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + member.hashCode();
return result;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
member = (AddressAdapter) in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(member);
}
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>get(Fqn, Object)</code>, wrapping any exception in a {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
*/
public static Object getAllowingTimeout(Cache cache, Object key) throws CacheException {
try {
return cache.get(key);
} catch (TimeoutException ignored) {
// ignore it
return null;
} catch (Exception e) {
throw new CacheException(e);
}
private enum NoAddress implements AddressAdapter {
INSTANCE;
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>put(Object, Object)</code>, wrapping any exception in a {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
* @param value
* data to store in the cache node
*/
public static void put(Cache cache, Object key, Object value) throws CacheException {
put(cache, key, value, null);
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>put(Object, Object)</code>, wrapping any exception in a {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
* @param value
* data to store in the cache node
* @param option
* invocation Option to set for this invocation. May be <code>null</code>.
*/
public static void put(Cache cache, Object key, Object value, Flag option) throws CacheException {
try {
cache.getAdvancedCache().put(key, value, option);
} catch (Exception e) {
throw new CacheException(e);
}
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>put(Object, Object)</code>, ignoring any {@link TimeoutException} and wrapping any other
* exception in a {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
* @param value
* data to store in the cache node
* @param option
* invocation Option to set for this invocation. May be <code>null</code>.
*/
public static void putAllowingTimeout(Cache cache, Object key, Object value, Flag... option) throws CacheException {
try {
cache.getAdvancedCache().put(key, value, option);
} catch (TimeoutException allowed) {
// ignore it
} catch (Exception e) {
throw new CacheException(e);
}
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>putForExternalRead(Object, Object)</code>, wrapping any exception in a
* {@link CacheException}. Ignores any JBoss Cache {@link TimeoutException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
* @param value
* data to store in the cache node
*/
public static boolean putForExternalRead(Cache cache, Object key, Object value) throws CacheException {
return putForExternalRead(cache, key, value, (Flag[])null);
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>putForExternalRead(Object, Object)</code>, wrapping any exception in a
* {@link CacheException}. Ignores any JBoss Cache {@link TimeoutException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
* @param value
* data to store in the cache node
* @param option
* invocation Option to set for this invocation. May be <code>null</code>.
*/
public static boolean putForExternalRead(Cache cache, Object key, Object value, Flag... option) throws CacheException {
try {
cache.getAdvancedCache().putForExternalRead(key, value, option);
return true;
} catch (TimeoutException te) {
// ignore!
log.debug("ignoring write lock acquisition failure");
return false;
} catch (Throwable t) {
throw new CacheException(t);
}
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>removeNode(Fqn)</code>, wrapping any exception in a {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
*/
public static void remove(Cache cache, Object key) throws CacheException {
remove(cache, key, null);
}
/**
* Builds an {@link Fqn} from <code>region</code> and <code>key</code> and performs a JBoss Cache
* <code>removeNode(Fqn)</code>, wrapping any exception in a {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param key
* specific key to append to the <code>region</code> to form the full Fqn
* @param option
* invocation Option to set for this invocation. May be <code>null</code>.
*/
public static void remove(Cache cache, Object key, Flag option) throws CacheException {
try {
cache.getAdvancedCache().remove(key, option);
} catch (Exception e) {
throw new CacheException(e);
}
}
/**
* Performs a JBoss Cache <code>removeNode(Fqn)</code>, wrapping any exception in a
* {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
*/
public static void removeAll(Cache cache) throws CacheException {
try {
cache.clear();
} catch (Exception e) {
throw new CacheException(e);
}
}
/**
* Performs a JBoss Cache <code>removeNode(Fqn)</code>, wrapping any exception in a
* {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param option
* invocation Option to set for this invocation. May be <code>null</code>.
*/
public static void removeAll(Cache cache, Flag option) throws CacheException {
try {
cache.getAdvancedCache().clear(option);
} catch (Exception e) {
throw new CacheException(e);
}
}
/**
* Performs a JBoss Cache <code>removeNode(Fqn)</code>, wrapping any exception in a
* {@link CacheException}.
*
* @param cache
* the cache to invoke on
* @param region
* base Fqn for the cache region
* @param option
* invocation Option to set for this invocation. May be <code>null</code>.
*/
public static void removeKey(Cache cache, Object key, Flag option) throws CacheException {
try {
cache.getAdvancedCache().remove(key, option);
} catch (Exception e) {
throw new CacheException(e);
}
}
public static void removeKey(Cache cache, Object key) throws CacheException {
try {
cache.remove(key);
} catch (Exception e) {
throw new CacheException(e);
}
}
public static boolean containsKey(Cache cache, Object key, Flag... flags) {
try {
return cache.getAdvancedCache().containsKey(key, flags);
} catch (Exception e) {
throw new CacheException(e);
}
private enum Internal {
INIT, EVICT;
}
}

View File

@ -0,0 +1,59 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2009, Red Hat, Inc. and/or its affiliates, and
* individual contributors as indicated by the @author tags. See the
* copyright.txt file in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.hibernate.cache.infinispan.util;
import org.hibernate.cache.CacheException;
import org.infinispan.context.Flag;
/**
* FlagAdapter.
*
* @author Galder Zamarreño
* @since 3.5
*/
public enum FlagAdapter {
ZERO_LOCK_ACQUISITION_TIMEOUT,
CACHE_MODE_LOCAL,
FORCE_ASYNCHRONOUS;
Flag toFlag() {
switch(this) {
case ZERO_LOCK_ACQUISITION_TIMEOUT:
return Flag.ZERO_LOCK_ACQUISITION_TIMEOUT;
case CACHE_MODE_LOCAL:
return Flag.CACHE_MODE_LOCAL;
case FORCE_ASYNCHRONOUS:
return Flag.FORCE_ASYNCHRONOUS;
default:
throw new CacheException("Unmatched Infinispan flag " + this);
}
}
static Flag[] toFlags(FlagAdapter[] adapters) {
Flag[] flags = new Flag[adapters.length];
for (int i = 0; i < adapters.length; i++) {
flags[i] = adapters[i].toFlag();
}
return flags;
}
}

View File

@ -29,10 +29,9 @@
import org.hibernate.cache.QueryResultsRegion;
import org.hibernate.cache.Region;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.infinispan.Cache;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
/**
@ -74,8 +73,8 @@ public void testEvict() throws Exception {
private void evictOrRemoveTest() throws Exception {
Configuration cfg = createConfiguration();
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(cfg, getCacheTestSupport());
Cache localCache = getInfinispanCache(regionFactory);
boolean invalidation = CacheHelper.isClusteredInvalidation(localCache);
CacheAdapter localCache = getInfinispanCache(regionFactory);
boolean invalidation = localCache.isClusteredInvalidation();
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
@ -123,7 +122,7 @@ public void testEvictAll() throws Exception {
private void evictOrRemoveAllTest(String configName) throws Exception {
Configuration cfg = createConfiguration();
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(cfg, getCacheTestSupport());
Cache localCache = getInfinispanCache(regionFactory);
CacheAdapter localCache = getInfinispanCache(regionFactory);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
@ -133,7 +132,7 @@ private void evictOrRemoveAllTest(String configName) throws Exception {
cfg = createConfiguration();
regionFactory = CacheTestUtil.startRegionFactory(cfg, getCacheTestSupport());
Cache remoteCache = getInfinispanCache(regionFactory);
CacheAdapter remoteCache = getInfinispanCache(regionFactory);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
@ -141,11 +140,11 @@ private void evictOrRemoveAllTest(String configName) throws Exception {
GeneralDataRegion remoteRegion = (GeneralDataRegion) createRegion(regionFactory,
getStandardRegionName(REGION_PREFIX), cfg.getProperties(), null);
Set children = CacheHelper.getKeySet(localCache);
assertEquals("No children in " + children, 0, children.size());
Set keys = localCache.keySet();
assertEquals("No valid children in " + keys, 0, getValidKeyCount(keys));
children = CacheHelper.getKeySet(remoteCache);
assertEquals("No children in " + children, 0, children.size());
keys = remoteCache.keySet();
assertEquals("No valid children in " + keys, 0, getValidKeyCount(keys));
assertNull("local is clean", localRegion.get(KEY));
assertNull("remote is clean", remoteRegion.get(KEY));
@ -168,11 +167,13 @@ private void evictOrRemoveAllTest(String configName) throws Exception {
sleep(250);
// This should re-establish the region root node in the optimistic case
assertNull(localRegion.get(KEY));
assertEquals("No valid children in " + keys, 0, getValidKeyCount(localCache.keySet()));
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
// This only adds a node in the case of optimistic locking
assertEquals(null, remoteRegion.get(KEY));
assertEquals("No valid children in " + keys, 0, getValidKeyCount(remoteCache.keySet()));
assertEquals("local is clean", null, localRegion.get(KEY));
assertEquals("remote is clean", null, remoteRegion.get(KEY));

View File

@ -23,7 +23,10 @@
*/
package org.hibernate.test.cache.infinispan;
import java.util.Set;
import org.hibernate.cache.RegionFactory;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.hibernate.junit.UnitTestCase;
import org.hibernate.test.cache.infinispan.util.CacheTestSupport;
import org.infinispan.Cache;
@ -81,7 +84,7 @@ protected void unregisterFactory(RegionFactory factory) {
protected CacheTestSupport getCacheTestSupport() {
return testSupport;
}
protected void sleep(long ms) {
try {
Thread.sleep(ms);
@ -94,4 +97,15 @@ protected void sleep(long ms) {
protected void avoidConcurrentFlush() {
testSupport.avoidConcurrentFlush();
}
protected int getValidKeyCount(Set keys) {
int result = 0;
for (Object key : keys) {
if (!(CacheHelper.isEvictAllNotification(key))) {
result++;
}
}
return result;
}
}

View File

@ -29,8 +29,8 @@
import org.hibernate.cache.Region;
import org.hibernate.cache.impl.CacheDataDescriptionImpl;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.util.ComparableComparator;
import org.infinispan.Cache;
/**
* Base class for tests of Region implementations.
@ -44,7 +44,7 @@ public AbstractRegionImplTestCase(String name) {
super(name);
}
protected abstract Cache getInfinispanCache(InfinispanRegionFactory regionFactory);
protected abstract CacheAdapter getInfinispanCache(InfinispanRegionFactory regionFactory);
protected abstract Region createRegion(InfinispanRegionFactory regionFactory, String regionName, Properties properties, CacheDataDescription cdd);

View File

@ -29,7 +29,7 @@
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.query.QueryResultsRegionImpl;
import org.hibernate.cache.infinispan.timestamp.TimestampsRegionImpl;
import org.infinispan.Cache;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.infinispan.config.Configuration;
import org.infinispan.config.Configuration.CacheMode;
import org.infinispan.eviction.EvictionStrategy;
@ -125,13 +125,13 @@ public void testBuildEntityCollectionRegionsPersonPlusEntityCollectionOverrides(
assertFalse(factory.getDefinedConfigurations().contains(person));
assertNotNull(factory.getTypeOverrides().get(addresses));
assertFalse(factory.getDefinedConfigurations().contains(addresses));
Cache cache = null;
CacheAdapter cache = null;
EntityRegionImpl region = (EntityRegionImpl) factory.buildEntityRegion(person, p, null);
assertNotNull(factory.getTypeOverrides().get(person));
assertTrue(factory.getDefinedConfigurations().contains(person));
assertNull(factory.getTypeOverrides().get(address));
cache = region.getCache();
cache = region.getCacheAdapter();
Configuration cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.LRU, cacheCfg.getEvictionStrategy());
assertEquals(2000, cacheCfg.getEvictionWakeUpInterval());
@ -143,7 +143,7 @@ public void testBuildEntityCollectionRegionsPersonPlusEntityCollectionOverrides(
assertNotNull(factory.getTypeOverrides().get(person));
assertTrue(factory.getDefinedConfigurations().contains(person));
assertNull(factory.getTypeOverrides().get(address));
cache = region.getCache();
cache = region.getCacheAdapter();
cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.FIFO, cacheCfg.getEvictionStrategy());
assertEquals(3000, cacheCfg.getEvictionWakeUpInterval());
@ -153,7 +153,7 @@ public void testBuildEntityCollectionRegionsPersonPlusEntityCollectionOverrides(
assertNotNull(factory.getTypeOverrides().get(person));
assertTrue(factory.getDefinedConfigurations().contains(person));
assertNull(factory.getTypeOverrides().get(address));
cache = region.getCache();
cache = region.getCacheAdapter();
cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.FIFO, cacheCfg.getEvictionStrategy());
assertEquals(3000, cacheCfg.getEvictionWakeUpInterval());
@ -163,7 +163,7 @@ public void testBuildEntityCollectionRegionsPersonPlusEntityCollectionOverrides(
assertNotNull(factory.getTypeOverrides().get(addresses));
assertTrue(factory.getDefinedConfigurations().contains(person));
assertNull(factory.getTypeOverrides().get(parts));
cache = collectionRegion .getCache();
cache = collectionRegion .getCacheAdapter();
cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.FIFO, cacheCfg.getEvictionStrategy());
assertEquals(2500, cacheCfg.getEvictionWakeUpInterval());
@ -175,7 +175,7 @@ public void testBuildEntityCollectionRegionsPersonPlusEntityCollectionOverrides(
assertNotNull(factory.getTypeOverrides().get(addresses));
assertTrue(factory.getDefinedConfigurations().contains(addresses));
assertNull(factory.getTypeOverrides().get(parts));
cache = collectionRegion.getCache();
cache = collectionRegion.getCacheAdapter();
cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.LRU, cacheCfg.getEvictionStrategy());
assertEquals(3500, cacheCfg.getEvictionWakeUpInterval());
@ -185,7 +185,7 @@ public void testBuildEntityCollectionRegionsPersonPlusEntityCollectionOverrides(
assertNotNull(factory.getTypeOverrides().get(addresses));
assertTrue(factory.getDefinedConfigurations().contains(addresses));
assertNull(factory.getTypeOverrides().get(parts));
cache = collectionRegion.getCache();
cache = collectionRegion.getCacheAdapter();
cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.LRU, cacheCfg.getEvictionStrategy());
assertEquals(3500, cacheCfg.getEvictionWakeUpInterval());
@ -196,7 +196,7 @@ public void testBuildEntityCollectionRegionsPersonPlusEntityCollectionOverrides(
}
public void testBuildEntityCollectionRegionOverridesOnly() {
Cache cache = null;
CacheAdapter cache = null;
Properties p = new Properties();
p.setProperty("hibernate.cache.infinispan.entity.eviction.strategy", "FIFO");
p.setProperty("hibernate.cache.infinispan.entity.eviction.wake_up_interval", "3000");
@ -211,7 +211,7 @@ public void testBuildEntityCollectionRegionOverridesOnly() {
try {
EntityRegionImpl region = (EntityRegionImpl) factory.buildEntityRegion("com.acme.Address", p, null);
assertNull(factory.getTypeOverrides().get("com.acme.Address"));
cache = region.getCache();
cache = region.getCacheAdapter();
Configuration cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.FIFO, cacheCfg.getEvictionStrategy());
assertEquals(3000, cacheCfg.getEvictionWakeUpInterval());
@ -220,7 +220,7 @@ public void testBuildEntityCollectionRegionOverridesOnly() {
CollectionRegionImpl collectionRegion = (CollectionRegionImpl) factory.buildCollectionRegion("com.acme.Person.addresses", p, null);
assertNull(factory.getTypeOverrides().get("com.acme.Person.addresses"));
cache = collectionRegion.getCache();
cache = collectionRegion.getCacheAdapter();
cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.LRU, cacheCfg.getEvictionStrategy());
assertEquals(3500, cacheCfg.getEvictionWakeUpInterval());
@ -252,7 +252,7 @@ public void testBuildEntityRegionPersonPlusEntityOverridesWithoutCfg() {
EntityRegionImpl region = (EntityRegionImpl) factory.buildEntityRegion(person, p, null);
assertNotNull(factory.getTypeOverrides().get(person));
assertTrue(factory.getDefinedConfigurations().contains(person));
Cache cache = region.getCache();
CacheAdapter cache = region.getCacheAdapter();
Configuration cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.LRU, cacheCfg.getEvictionStrategy());
assertEquals(3000, cacheCfg.getEvictionWakeUpInterval());
@ -297,7 +297,7 @@ public void testBuildDefaultTimestampsRegion() {
config.setFetchInMemoryState(false);
manager.defineConfiguration("timestamps", config);
TimestampsRegionImpl region = (TimestampsRegionImpl) factory.buildTimestampsRegion(timestamps, p);
Cache cache = region.getCache();
CacheAdapter cache = region.getCacheAdapter();
Configuration cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.NONE, cacheCfg.getEvictionStrategy());
assertEquals(CacheMode.REPL_ASYNC, cacheCfg.getCacheMode());
@ -324,7 +324,7 @@ public void testBuildDiffCacheNameTimestampsRegion() {
config.setCacheMode(CacheMode.REPL_SYNC);
manager.defineConfiguration("unrecommended-timestamps", config);
TimestampsRegionImpl region = (TimestampsRegionImpl) factory.buildTimestampsRegion(timestamps, p);
Cache cache = region.getCache();
CacheAdapter cache = region.getCacheAdapter();
Configuration cacheCfg = cache.getConfiguration();
assertEquals(EvictionStrategy.NONE, cacheCfg.getEvictionStrategy());
assertEquals(CacheMode.REPL_SYNC, cacheCfg.getCacheMode());
@ -400,7 +400,7 @@ public void testBuildQueryRegion() {
try {
assertTrue(factory.getDefinedConfigurations().contains("local-query"));
QueryResultsRegionImpl region = (QueryResultsRegionImpl) factory.buildQueryResultsRegion(query, p);
Cache cache = region.getCache();
CacheAdapter cache = region.getCacheAdapter();
Configuration cacheCfg = cache.getConfiguration();
assertEquals(CacheMode.LOCAL, cacheCfg.getCacheMode());
} finally {

View File

@ -38,13 +38,12 @@
import org.hibernate.cache.impl.CacheDataDescriptionImpl;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.FlagAdapter;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.util.ComparableComparator;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
/**
@ -64,10 +63,10 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
protected static Configuration localCfg;
protected static InfinispanRegionFactory localRegionFactory;
protected Cache localCache;
protected CacheAdapter localCache;
protected static Configuration remoteCfg;
protected static InfinispanRegionFactory remoteRegionFactory;
protected Cache remoteCache;
protected CacheAdapter remoteCache;
protected CollectionRegion localCollectionRegion;
protected CollectionRegionAccessStrategy localAccessStrategy;
@ -112,17 +111,17 @@ protected void setUp() throws Exception {
localCollectionRegion = localRegionFactory.buildCollectionRegion(REGION_NAME, localCfg.getProperties(),
getCacheDataDescription());
localCache = ((BaseRegion) localCollectionRegion).getCache();
localCache = ((BaseRegion) localCollectionRegion).getCacheAdapter();
localAccessStrategy = localCollectionRegion.buildAccessStrategy(getAccessType());
invalidation = CacheHelper.isClusteredInvalidation(localCache);
synchronous = CacheHelper.isSynchronous(localCache);
invalidation = localCache.isClusteredInvalidation();
synchronous = localCache.isSynchronous();
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
remoteCollectionRegion = remoteRegionFactory.buildCollectionRegion(REGION_NAME, remoteCfg.getProperties(),
getCacheDataDescription());
remoteCache = ((BaseRegion) remoteCollectionRegion).getCache();
remoteCache = ((BaseRegion) remoteCollectionRegion).getCacheAdapter();
remoteAccessStrategy = remoteCollectionRegion.buildAccessStrategy(getAccessType());
node1Exception = null;
@ -142,13 +141,13 @@ protected void tearDown() throws Exception {
remoteCollectionRegion.destroy();
try {
localCache.getAdvancedCache().clear(Flag.CACHE_MODE_LOCAL);
localCache.withFlags(FlagAdapter.CACHE_MODE_LOCAL).clear();
} catch (Exception e) {
log.error("Problem purging local cache", e);
}
try {
remoteCache.getAdvancedCache().clear(Flag.CACHE_MODE_LOCAL);
remoteCache.withFlags(FlagAdapter.CACHE_MODE_LOCAL).clear();
} catch (Exception e) {
log.error("Problem purging remote cache", e);
}
@ -402,9 +401,9 @@ private void evictOrRemoveAllTest(boolean evict) {
final String KEY = KEY_BASE + testCount++;
assertEquals(0, localCache.keySet().size());
assertEquals(0, getValidKeyCount(localCache.keySet()));
assertEquals(0, remoteCache.keySet().size());
assertEquals(0, getValidKeyCount(remoteCache.keySet()));
assertNull("local is clean", localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
@ -425,19 +424,19 @@ private void evictOrRemoveAllTest(boolean evict) {
// This should re-establish the region root node
assertNull(localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(0, localCache.keySet().size());
assertEquals(0, getValidKeyCount(localCache.keySet()));
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
assertEquals(null, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(0, remoteCache.keySet().size());
assertEquals(0, getValidKeyCount(remoteCache.keySet()));
// Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(1, remoteCache.keySet().size());
assertEquals(1, getValidKeyCount(remoteCache.keySet()));
// Wait for async propagation of the putFromLoad
sleep(250);

View File

@ -38,13 +38,12 @@
import org.hibernate.cache.impl.CacheDataDescriptionImpl;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.FlagAdapter;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.util.ComparableComparator;
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
/**
@ -64,10 +63,10 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
protected static Configuration localCfg;
protected static InfinispanRegionFactory localRegionFactory;
protected Cache localCache;
protected CacheAdapter localCache;
protected static Configuration remoteCfg;
protected static InfinispanRegionFactory remoteRegionFactory;
protected Cache remoteCache;
protected CacheAdapter remoteCache;
protected boolean invalidation;
protected boolean synchronous;
@ -114,10 +113,10 @@ protected void setUp() throws Exception {
.getProperties(), getCacheDataDescription());
localAccessStrategy = localEntityRegion.buildAccessStrategy(getAccessType());
localCache = ((BaseRegion) localEntityRegion).getCache();
localCache = ((BaseRegion) localEntityRegion).getCacheAdapter();
invalidation = CacheHelper.isClusteredInvalidation(localCache);
synchronous = CacheHelper.isSynchronous(localCache);
invalidation = localCache.isClusteredInvalidation();
synchronous = localCache.isSynchronous();
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
@ -126,7 +125,7 @@ protected void setUp() throws Exception {
.getProperties(), getCacheDataDescription());
remoteAccessStrategy = remoteEntityRegion.buildAccessStrategy(getAccessType());
remoteCache = ((BaseRegion) remoteEntityRegion).getCache();
remoteCache = ((BaseRegion) remoteEntityRegion).getCacheAdapter();
node1Exception = null;
node2Exception = null;
@ -145,13 +144,13 @@ protected void tearDown() throws Exception {
remoteEntityRegion.destroy();
try {
localCache.getAdvancedCache().clear(Flag.CACHE_MODE_LOCAL);
localCache.withFlags(FlagAdapter.CACHE_MODE_LOCAL).clear();
} catch (Exception e) {
log.error("Problem purging local cache", e);
}
try {
remoteCache.getAdvancedCache().clear(Flag.CACHE_MODE_LOCAL);
remoteCache.withFlags(FlagAdapter.CACHE_MODE_LOCAL).clear();
} catch (Exception e) {
log.error("Problem purging remote cache", e);
}
@ -560,8 +559,9 @@ public void testEvictAll() {
}
private void evictOrRemoveTest(boolean evict) {
final String KEY = KEY_BASE + testCount++;
assertEquals(0, getValidKeyCount(localCache.keySet()));
assertEquals(0, getValidKeyCount(remoteCache.keySet()));
assertNull("local is clean", localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
@ -571,26 +571,21 @@ private void evictOrRemoveTest(boolean evict) {
remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
// Wait for async propagation
sleep(250);
if (evict)
localAccessStrategy.evict(KEY);
else
localAccessStrategy.remove(KEY);
assertEquals(null, localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(0, getValidKeyCount(localCache.keySet()));
assertEquals(null, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(0, getValidKeyCount(remoteCache.keySet()));
}
private void evictOrRemoveAllTest(boolean evict) {
final String KEY = KEY_BASE + testCount++;
assertEquals(0, localCache.keySet().size());
assertEquals(0, remoteCache.keySet().size());
assertEquals(0, getValidKeyCount(localCache.keySet()));
assertEquals(0, getValidKeyCount(remoteCache.keySet()));
assertNull("local is clean", localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
@ -606,27 +601,26 @@ private void evictOrRemoveAllTest(boolean evict) {
// Wait for async propagation
sleep(250);
if (evict)
if (evict) {
log.debug("Call evict all locally");
localAccessStrategy.evictAll();
else
} else {
localAccessStrategy.removeAll();
}
// This should re-establish the region root node in the optimistic case
assertNull(localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(0, localCache.keySet().size());
assertEquals(0, getValidKeyCount(localCache.keySet()));
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
assertEquals(null, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(0, remoteCache.keySet().size());
assertEquals(0, getValidKeyCount(remoteCache.keySet()));
// Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals(1, remoteCache.keySet().size());
assertEquals(1, getValidKeyCount(remoteCache.keySet()));
// Wait for async propagation
sleep(250);

View File

@ -32,8 +32,9 @@
import org.hibernate.cache.RegionFactory;
import org.hibernate.cache.access.AccessType;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheAdapterImpl;
import org.hibernate.test.cache.infinispan.AbstractEntityCollectionRegionTestCase;
import org.infinispan.Cache;
/**
* Tests of EntityRegionImpl.
@ -94,8 +95,8 @@ protected Region createRegion(InfinispanRegionFactory regionFactory, String regi
}
@Override
protected Cache getInfinispanCache(InfinispanRegionFactory regionFactory) {
return regionFactory.getCacheManager().getCache("entity");
protected CacheAdapter getInfinispanCache(InfinispanRegionFactory regionFactory) {
return CacheAdapterImpl.newInstance(regionFactory.getCacheManager().getCache("entity"));
}
}

View File

@ -26,7 +26,7 @@ public String[] getMappings() {
public String getCacheConcurrencyStrategy() {
return cacheConcurrencyStrategy;
}
public void testEmptySecondLevelCacheEntry() throws Exception {
getSessions().getCache().evictEntityRegion(Item.class.getName());
Statistics stats = getSessions().getStatistics();

View File

@ -1,6 +1,7 @@
package org.hibernate.test.cache.infinispan.functional;
import java.io.Serializable;
import java.util.Map;
import org.hibernate.Session;
import org.hibernate.Transaction;
@ -63,6 +64,8 @@ public void testCollectionCache() {
assertEquals(item.getName(), loadedWithCachedCollection.getName());
assertEquals(item.getItems().size(), loadedWithCachedCollection.getItems().size());
assertEquals(1, cStats.getHitCount());
Map cacheEntries = cStats.getEntries();
assertEquals(1, cacheEntries.size());
s.close();
}

View File

@ -32,6 +32,7 @@
import org.hibernate.cfg.Environment;
import org.hibernate.classic.Session;
import org.hibernate.junit.functional.FunctionalTestCase;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.test.cache.infinispan.functional.Contact;
import org.hibernate.test.cache.infinispan.functional.Customer;
import org.hibernate.transaction.CMTTransactionFactory;
@ -48,30 +49,31 @@
public class BulkOperationsTestCase extends FunctionalTestCase {
private static final Logger log = LoggerFactory.getLogger(BulkOperationsTestCase.class);
private TransactionManager tm;
public BulkOperationsTestCase(String string) {
super(string);
}
public String[] getMappings() {
return new String[] { "cache/infinispan/functional/Contact.hbm.xml", "cache/infinispan/functional/Customer.hbm.xml" };
return new String[] { "cache/infinispan/functional/Contact.hbm.xml",
"cache/infinispan/functional/Customer.hbm.xml" };
}
@Override
public String getCacheConcurrencyStrategy() {
return "transactional";
}
protected Class getTransactionFactoryClass() {
return CMTTransactionFactory.class;
return CMTTransactionFactory.class;
}
protected Class getConnectionProviderClass() {
return org.hibernate.test.cache.infinispan.tm.XaConnectionProvider.class;
}
protected Class<? extends TransactionManagerLookup> getTransactionManagerLookupClass() {
return org.hibernate.test.cache.infinispan.tm.XaTransactionManagerLookup.class;
}
@ -81,11 +83,13 @@ public void configure(Configuration cfg) {
cfg.setProperty(Environment.USE_SECOND_LEVEL_CACHE, "true");
cfg.setProperty(Environment.GENERATE_STATISTICS, "true");
cfg.setProperty(Environment.USE_QUERY_CACHE, "false");
cfg.setProperty(Environment.CONNECTION_PROVIDER, getConnectionProviderClass().getName());
cfg.setProperty(Environment.TRANSACTION_MANAGER_STRATEGY, getTransactionManagerLookupClass().getName());
cfg.setProperty(Environment.TRANSACTION_MANAGER_STRATEGY, getTransactionManagerLookupClass()
.getName());
Class transactionFactory = getTransactionFactoryClass();
cfg.setProperty( Environment.TRANSACTION_STRATEGY, transactionFactory.getName());
cfg.setProperty(Environment.TRANSACTION_STRATEGY, transactionFactory.getName());
}
public void testBulkOperations() throws Throwable {
@ -93,14 +97,19 @@ public void testBulkOperations() throws Throwable {
boolean cleanedUp = false;
try {
tm = getTransactionManagerLookupClass().newInstance().getTransactionManager(null);
createContacts();
List<Integer> rhContacts = getContactsByCustomer("Red Hat");
assertNotNull("Red Hat contacts exist", rhContacts);
assertEquals("Created expected number of Red Hat contacts", 10, rhContacts.size());
SecondLevelCacheStatistics contactSlcs = getEnvironment().getSessionFactory()
.getStatistics().getSecondLevelCacheStatistics(Contact.class.getName());
assertEquals(20, contactSlcs.getElementCountInMemory());
assertEquals("Deleted all Red Hat contacts", 10, deleteContacts());
assertEquals(0, contactSlcs.getElementCountInMemory());
List<Integer> jbContacts = getContactsByCustomer("JBoss");
assertNotNull("JBoss contacts exist", jbContacts);
@ -115,6 +124,7 @@ public void testBulkOperations() throws Throwable {
}
updateContacts("Kabir", "Updated");
assertEquals(0, contactSlcs.getElementCountInMemory());
for (Integer id : jbContacts) {
Contact contact = getContact(id);
assertNotNull("JBoss contact " + id + " exists", contact);
@ -125,7 +135,20 @@ public void testBulkOperations() throws Throwable {
List<Integer> updated = getContactsByTLF("Updated");
assertNotNull("Got updated contacts", updated);
assertEquals("Updated contacts", 5, updated.size());
} catch(Throwable t) {
updateContactsWithOneManual("Kabir", "UpdatedAgain");
assertEquals(contactSlcs.getElementCountInMemory(), 0);
for (Integer id : jbContacts) {
Contact contact = getContact(id);
assertNotNull("JBoss contact " + id + " exists", contact);
String expected = ("Kabir".equals(contact.getName())) ? "UpdatedAgain" : "2222";
assertEquals("JBoss contact " + id + " has correct TLF", expected, contact.getTlf());
}
updated = getContactsByTLF("UpdatedAgain");
assertNotNull("Got updated contacts", updated);
assertEquals("Updated contacts", 5, updated.size());
} catch (Throwable t) {
cleanedUp = true;
log.debug("Exceptional cleanup");
cleanup(true);
@ -185,8 +208,8 @@ public List<Integer> getContactsByCustomer(String customerName) throws Exception
try {
Session session = getSessions().getCurrentSession();
List results = session.createQuery(selectHQL).setFlushMode(FlushMode.AUTO).setParameter("cName", customerName)
.list();
List results = session.createQuery(selectHQL).setFlushMode(FlushMode.AUTO).setParameter(
"cName", customerName).list();
tm.commit();
return results;
} catch (Exception e) {
@ -203,7 +226,8 @@ public List<Integer> getContactsByTLF(String tlf) throws Exception {
try {
Session session = getSessions().getCurrentSession();
List results = session.createQuery(selectHQL).setFlushMode(FlushMode.AUTO).setParameter("cTLF", tlf).list();
List results = session.createQuery(selectHQL).setFlushMode(FlushMode.AUTO).setParameter(
"cTLF", tlf).list();
tm.commit();
return results;
} catch (Exception e) {
@ -214,13 +238,30 @@ public List<Integer> getContactsByTLF(String tlf) throws Exception {
public int updateContacts(String name, String newTLF) throws Exception {
String updateHQL = "update Contact set tlf = :cNewTLF where name = :cName";
tm.begin();
try {
Session session = getSessions().getCurrentSession();
int rowsAffected = session.createQuery(updateHQL).setFlushMode(FlushMode.AUTO).setParameter("cNewTLF", newTLF)
.setParameter("cName", name).executeUpdate();
int rowsAffected = session.createQuery(updateHQL).setFlushMode(FlushMode.AUTO)
.setParameter("cNewTLF", newTLF).setParameter("cName", name).executeUpdate();
tm.commit();
return rowsAffected;
} catch (Exception e) {
tm.rollback();
throw e;
}
}
public int updateContactsWithOneManual(String name, String newTLF) throws Exception {
String queryHQL = "from Contact c where c.name = :cName";
String updateHQL = "update Contact set tlf = :cNewTLF where name = :cName";
tm.begin();
try {
Session session = getSessions().getCurrentSession();
@SuppressWarnings("unchecked")
List<Contact> list = session.createQuery(queryHQL).setParameter("cName", name).list();
list.get(0).setTlf(newTLF);
int rowsAffected = session.createQuery(updateHQL).setFlushMode(FlushMode.AUTO)
.setParameter("cNewTLF", newTLF).setParameter("cName", name).executeUpdate();
tm.commit();
return rowsAffected;
} catch (Exception e) {
@ -290,7 +331,7 @@ private Customer createCustomer(int id) throws Exception {
s.persist(customer);
s.getTransaction().commit();
s.close();
return customer;
} finally {
System.out.println("CREATE CUSTOMER " + id + " - END");

View File

@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.Set;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
@ -50,7 +51,7 @@ public void clear() {
@CacheEntryModified
public void nodeModified(CacheEntryModifiedEvent event) {
if (!event.isPre()) {
if (!event.isPre() && !CacheHelper.isEvictAllNotification(event.getKey())) {
Object key = event.getKey();
log.info("Modified node " + key);
modified.add(key.toString());
@ -59,7 +60,7 @@ public void nodeModified(CacheEntryModifiedEvent event) {
@CacheEntryCreated
public void nodeCreated(CacheEntryCreatedEvent event) {
if (!event.isPre()) {
if (!event.isPre() && !CacheHelper.isEvictAllNotification(event.getKey())) {
Object key = event.getKey();
log.info("Created node " + key);
modified.add(key.toString());
@ -68,7 +69,7 @@ public void nodeCreated(CacheEntryCreatedEvent event) {
@CacheEntryVisited
public void nodeVisited(CacheEntryVisitedEvent event) {
if (!event.isPre()) {
if (!event.isPre() && !CacheHelper.isEvictAllNotification(event.getKey())) {
Object key = event.getKey();
log.info("Visited node " + key);
accessed.add(key.toString());

View File

@ -198,7 +198,7 @@ protected void queryTest(boolean useNamedRegion) throws Exception {
// Sleep a bit to allow async repl to happen
sleep(SLEEP_TIME);
assertEquals("Query cache used", 1, remoteQueryListener.getSawRegionModificationCount());
remoteQueryListener.clearSawRegionModification();
@ -207,12 +207,12 @@ protected void queryTest(boolean useNamedRegion) throws Exception {
assertEquals("63088 has correct # of accounts", 6, dao1.getCountForBranch(branch, useNamedRegion));
assertEquals("Query cache used", 1, remoteQueryListener.getSawRegionModificationCount());
remoteQueryListener.clearSawRegionModification();
sleep(SLEEP_TIME);
assertEquals("Query cache used", 1, localQueryListener.getSawRegionModificationCount());
localQueryListener.clearSawRegionModification();
log.info("First query on node 1 done");
// Sleep a bit to allow async repl to happen

View File

@ -21,7 +21,10 @@
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
import java.util.Set;
import org.hibernate.Session;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.hibernate.cfg.Configuration;
import org.hibernate.cfg.Environment;
import org.hibernate.cfg.Mappings;

View File

@ -30,11 +30,11 @@
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.cache.CacheKey;
import org.hibernate.cache.infinispan.util.CacheHelper;
import org.hibernate.test.cache.infinispan.functional.Contact;
import org.hibernate.test.cache.infinispan.functional.Customer;
import org.infinispan.Cache;
import org.infinispan.manager.CacheManager;
import org.infinispan.marshall.MarshalledValue;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent;
@ -143,8 +143,8 @@ public void testAll() throws Exception {
assertLoadedFromCache(remoteListener, ids.customerId, ids.contactIds);
// After modification, local cache should have been invalidated and hence should be empty
assertTrue(localCollectionCache.isEmpty());
assertTrue(localCustomerCache.isEmpty());
assertEquals(0, getValidKeyCount(localCollectionCache.keySet()));
assertEquals(0, getValidKeyCount(localCustomerCache.keySet()));
} catch (Exception e) {
log.error("Error", e);
throw e;
@ -307,6 +307,16 @@ private void assertLoadedFromCache(MyListener listener, Integer custId, Set cont
.contains("Customer.contacts#" + custId));
}
protected int getValidKeyCount(Set keys) {
int result = 0;
for (Object key : keys) {
if (!(CacheHelper.isEvictAllNotification(key))) {
result++;
}
}
return result;
}
@Listener
public static class MyListener {
private static final Logger log = LoggerFactory.getLogger(MyListener.class);
@ -329,8 +339,7 @@ public boolean isEmpty() {
public void nodeVisited(CacheEntryVisitedEvent event) {
log.debug(event.toString());
if (!event.isPre()) {
MarshalledValue mv = (MarshalledValue) event.getKey();
CacheKey cacheKey = (CacheKey) mv.get();
CacheKey cacheKey = (CacheKey) event.getKey();
Integer primKey = (Integer) cacheKey.getKey();
String key = (String) cacheKey.getEntityOrRoleName() + '#' + primKey;
log.debug("MyListener[" + name +"] - Visiting key " + key);

View File

@ -34,10 +34,11 @@
import org.hibernate.cache.Region;
import org.hibernate.cache.StandardQueryCache;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheAdapterImpl;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.AbstractGeneralDataRegionTestCase;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.infinispan.Cache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent;
@ -74,8 +75,8 @@ protected String getStandardRegionName(String regionPrefix) {
}
@Override
protected Cache getInfinispanCache(InfinispanRegionFactory regionFactory) {
return regionFactory.getCacheManager().getCache("local-query");
protected CacheAdapter getInfinispanCache(InfinispanRegionFactory regionFactory) {
return CacheAdapterImpl.newInstance(regionFactory.getCacheManager().getCache("local-query"));
}
@Override
@ -186,7 +187,7 @@ private void getDoesNotBlockPutTest() throws Exception {
assertEquals(VALUE1, region.get(KEY));
// final Fqn rootFqn = getRegionFqn(getStandardRegionName(REGION_PREFIX), REGION_PREFIX);
final Cache jbc = getInfinispanCache(regionFactory);
final CacheAdapter jbc = getInfinispanCache(regionFactory);
final CountDownLatch blockerLatch = new CountDownLatch(1);
final CountDownLatch writerLatch = new CountDownLatch(1);

View File

@ -29,8 +29,9 @@
import org.hibernate.cache.Region;
import org.hibernate.cache.UpdateTimestampsCache;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheAdapterImpl;
import org.hibernate.test.cache.infinispan.AbstractGeneralDataRegionTestCase;
import org.infinispan.Cache;
/**
* Tests of TimestampsRegionImpl.
@ -55,8 +56,8 @@ protected Region createRegion(InfinispanRegionFactory regionFactory, String regi
}
@Override
protected Cache getInfinispanCache(InfinispanRegionFactory regionFactory) {
return regionFactory.getCacheManager().getCache("timestamps");
protected CacheAdapter getInfinispanCache(InfinispanRegionFactory regionFactory) {
return CacheAdapterImpl.newInstance(regionFactory.getCacheManager().getCache("timestamps"));
}
}

View File

@ -80,6 +80,8 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi
Synchronization s = (Synchronization) synchronizations.get(i);
s.beforeCompletion();
}
runXaResourcePrepare();
status = Status.STATUS_COMMITTING;
@ -92,6 +94,8 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi
throw new SystemException();
}
}
runXaResourceCommitTx();
status = Status.STATUS_COMMITTED;
@ -117,6 +121,8 @@ public void rollback() throws IllegalStateException, SystemException {
throw new SystemException();
}
}
runXaResourceRollback();
for (int i = 0; i < synchronizations.size(); i++) {
Synchronization s = (Synchronization) synchronizations.get(i);