HHH-10083 Support replicated and distributed caches

This commit is contained in:
Radim Vansa 2015-08-26 16:21:03 +02:00 committed by Galder Zamarreño
parent 5c36abf508
commit b9a2128709
59 changed files with 2393 additions and 708 deletions

View File

@ -59,6 +59,13 @@ test {
systemProperties['hibernate.cache.infinispan.jgroups_cfg'] = '2lc-test-tcp.xml'
// systemProperties['log4j.configuration'] = 'file:/log4j/log4j-infinispan.xml'
enabled = true
// Without this I have trouble running specific test using --tests switch
doFirst {
filter.includePatterns.each {
include "${it.replaceAll('\\.', "\\${File.separator}")}.class"
}
filter.setIncludePatterns('*')
}
}
task packageTests(type: Jar) {

View File

@ -0,0 +1,52 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.UUID;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class FutureUpdateSynchronization extends InvocationAfterCompletion {
private static final Log log = LogFactory.getLog( FutureUpdateSynchronization.class );
private final UUID uuid = UUID.randomUUID();
private final Object key;
private final Object value;
public FutureUpdateSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, Object key, Object value) {
super(tc, cache, requiresTransaction);
this.key = key;
this.value = value;
}
public UUID getUuid() {
return uuid;
}
@Override
protected void invoke(boolean success, AdvancedCache cache) {
// Exceptions in #afterCompletion() are silently ignored, since the transaction
// is already committed in DB. However we must not return until we update the cache.
for (;;) {
try {
cache.put(key, new FutureUpdate(uuid, success ? value : null));
return;
}
catch (Exception e) {
log.error("Failure updating cache in afterCompletion, will retry", e);
}
}
}
}

View File

@ -28,15 +28,6 @@ public abstract class InvalidationCacheAccessDelegate implements AccessDelegate
protected final PutFromLoadValidator putValidator;
protected final AdvancedCache<Object, Object> writeCache;
public static InvalidationCacheAccessDelegate create(BaseRegion region, PutFromLoadValidator validator) {
if (region.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional()) {
return new TxInvalidationCacheAccessDelegate(region, validator);
}
else {
return new NonTxInvalidationCacheAccessDelegate(region, validator);
}
}
/**
* Create a new transactional access delegate instance.
*

View File

@ -13,12 +13,12 @@ import java.util.UUID;
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class Synchronization implements javax.transaction.Synchronization {
public class InvalidationSynchronization implements javax.transaction.Synchronization {
public final UUID uuid = UUID.randomUUID();
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
private final Object[] keys;
public Synchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
public InvalidationSynchronization(NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor, Object[] keys) {
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
this.keys = keys;
}

View File

@ -20,7 +20,7 @@ import org.infinispan.remoting.rpc.RpcManager;
* Non-transactional counterpart of {@link TxPutFromLoadInterceptor}.
* Invokes {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} for each invalidation from
* remote node ({@link BeginInvalidationCommand} and sends {@link EndInvalidationCommand} after the transaction
* is complete, with help of {@link Synchronization};
* is complete, with help of {@link InvalidationSynchronization};
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/

View File

@ -617,7 +617,7 @@ public class PutFromLoadValidator {
if (trace) {
log.tracef("Registering lock owner %s for %s: %s", lockOwnerToString(session), cache.getName(), Arrays.toString(keys));
}
Synchronization sync = new Synchronization(nonTxPutFromLoadInterceptor, keys);
InvalidationSynchronization sync = new InvalidationSynchronization(nonTxPutFromLoadInterceptor, keys);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
return sync.uuid;
}

View File

@ -0,0 +1,175 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.TombstoneUpdate;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.Flag;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.concurrent.TimeUnit;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TombstoneAccessDelegate implements AccessDelegate {
private static final Log log = LogFactory.getLog( TombstoneAccessDelegate.class );
protected final BaseTransactionalDataRegion region;
protected final AdvancedCache cache;
protected final AdvancedCache writeCache;
protected final AdvancedCache asyncWriteCache;
protected final AdvancedCache putFromLoadCache;
protected final boolean requiresTransaction;
public TombstoneAccessDelegate(BaseTransactionalDataRegion region) {
this.region = region;
this.cache = region.getCache();
this.writeCache = Caches.ignoreReturnValuesCache(cache);
this.asyncWriteCache = Caches.asyncWriteCache(cache, Flag.IGNORE_RETURN_VALUES);
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
Configuration configuration = cache.getCacheConfiguration();
if (configuration.clustering().cacheMode().isInvalidation()) {
throw new IllegalArgumentException("For tombstone-based caching, invalidation cache is not allowed.");
}
if (configuration.transaction().transactionMode().isTransactional()) {
throw new IllegalArgumentException("Currently transactional caches are not supported.");
}
requiresTransaction = configuration.transaction().transactionMode().isTransactional()
&& !configuration.transaction().autoCommit();
}
@Override
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
if (txTimestamp < region.getLastRegionInvalidation() ) {
return null;
}
Object value = cache.get(key);
if (value instanceof Tombstone) {
return null;
}
else if (value instanceof FutureUpdate) {
return ((FutureUpdate) value).getValue();
}
else {
return value;
}
}
@Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) {
return putFromLoad(session, key, value, txTimestamp, version, false);
}
@Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException {
long lastRegionInvalidation = region.getLastRegionInvalidation();
if (txTimestamp < lastRegionInvalidation) {
log.tracef("putFromLoad not executed since tx started at %d, before last region invalidation finished = %d", txTimestamp, lastRegionInvalidation);
return false;
}
if (minimalPutOverride) {
Object prev = cache.get(key);
if (prev instanceof Tombstone) {
Tombstone tombstone = (Tombstone) prev;
long lastTimestamp = tombstone.getLastTimestamp();
if (txTimestamp <= lastTimestamp) {
log.tracef("putFromLoad not executed since tx started at %d, before last invalidation finished = %d", txTimestamp, lastTimestamp);
return false;
}
}
else if (prev != null) {
log.tracef("putFromLoad not executed since cache contains %s", prev);
return false;
}
}
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
// when it is present in the container. TombstoneCallInterceptor will deal with this.
putFromLoadCache.put(key, new TombstoneUpdate(session.getTimestamp(), value));
return true;
}
@Override
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
write(session, key, value);
return true;
}
@Override
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException {
write(session, key, value);
return true;
}
protected void write(SessionImplementor session, Object key, Object value) {
TransactionCoordinator tc = session.getTransactionCoordinator();
FutureUpdateSynchronization sync = new FutureUpdateSynchronization(tc, writeCache, requiresTransaction, key, value);
// FutureUpdate is handled in TombstoneCallInterceptor
writeCache.put(key, new FutureUpdate(sync.getUuid(), null), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
tc.getLocalSynchronizations().registerSynchronization(sync);
}
@Override
public void remove(SessionImplementor session, Object key) throws CacheException {
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
TombstoneSynchronization sync = new TombstoneSynchronization(transactionCoordinator, asyncWriteCache, requiresTransaction, region, key);
Tombstone tombstone = new Tombstone(sync.getUuid(), session.getTimestamp() + region.getTombstoneExpiration(), false);
writeCache.put(key, tombstone, region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
}
@Override
public void removeAll() throws CacheException {
region.beginInvalidation();
try {
Caches.broadcastEvictAll(cache);
}
finally {
region.endInvalidation();
}
}
@Override
public void evict(Object key) throws CacheException {
writeCache.put(key, TombstoneUpdate.EVICT);
}
@Override
public void evictAll() throws CacheException {
region.beginInvalidation();
try {
Caches.broadcastEvictAll(cache);
}
finally {
region.endInvalidation();
}
}
@Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
}
@Override
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
return false;
}
@Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
return false;
}
}

View File

@ -0,0 +1,197 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.TombstoneUpdate;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.read.SizeCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.util.CloseableIterable;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate}
* and {@link org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion}
*
* The behaviour here also breaks notifications, which are not used for 2LC caches.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TombstoneCallInterceptor extends CallInterceptor {
private static final Log log = LogFactory.getLog( TombstoneCallInterceptor.class );
private static final UUID ZERO = new UUID(0, 0);
private AdvancedCache cache;
private final Metadata expiringMetadata;
public TombstoneCallInterceptor(long tombstoneExpiration) {
expiringMetadata = new EmbeddedMetadata.Builder().lifespan(tombstoneExpiration, TimeUnit.MILLISECONDS).build();
}
@Inject
public void injectDependencies(AdvancedCache cache) {
this.cache = cache;
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
MVCCEntry e = (MVCCEntry) ctx.lookupEntry(command.getKey());
if (e == null) {
return null;
}
Object value = command.getValue();
if (value instanceof TombstoneUpdate) {
return handleTombstoneUpdate(e, (TombstoneUpdate) value);
}
else if (value instanceof Tombstone) {
return handleTombstone(e, (Tombstone) value);
}
else if (value instanceof FutureUpdate) {
return handleFutureUpdate(e, (FutureUpdate) value, command);
}
else {
return super.visitPutKeyValueCommand(ctx, command);
}
}
private Object handleFutureUpdate(MVCCEntry e, FutureUpdate futureUpdate, PutKeyValueCommand command) {
Object storedValue = e.getValue();
if (storedValue instanceof FutureUpdate) {
FutureUpdate storedFutureUpdate = (FutureUpdate) storedValue;
if (futureUpdate.getUuid().equals(storedFutureUpdate.getUuid())) {
if (futureUpdate.getValue() != null) {
// transaction succeeded
setValue(e, futureUpdate.getValue());
}
else {
// transaction failed
setValue(e, storedFutureUpdate.getValue());
}
}
else {
// two conflicting updates
setValue(e, new FutureUpdate(ZERO, null));
e.setMetadata(expiringMetadata);
// Infinispan always commits the entry with data with the metadata provided to the command,
// However, in non-conflicting case we want to keep the value not expired
command.setMetadata(expiringMetadata);
}
}
else if (storedValue instanceof Tombstone){
return null;
}
else {
if (futureUpdate.getValue() != null) {
// The future update has disappeared (probably due to region invalidation) and
// the currently stored value was putFromLoaded (or is null).
// We cannot keep the possibly outdated value here but we cannot know that
// this command's value is the most up-to-date. Therefore, we'll remove
// the value and let future putFromLoad update it.
removeValue(e);
}
else {
// this is the pre-update
// change in logic: we don't keep the old value around anymore (for read-write strategy)
setValue(e, new FutureUpdate(futureUpdate.getUuid(), null));
}
}
return null;
}
private Object handleTombstone(MVCCEntry e, Tombstone tombstone) {
Object storedValue = e.getValue();
if (storedValue instanceof Tombstone) {
e.setChanged(true);
e.setValue(tombstone.merge((Tombstone) storedValue));
}
else {
setValue(e, tombstone);
}
return null;
}
protected Object handleTombstoneUpdate(MVCCEntry e, TombstoneUpdate tombstoneUpdate) {
Object storedValue = e.getValue();
Object value = tombstoneUpdate.getValue();
if (storedValue instanceof Tombstone) {
Tombstone tombstone = (Tombstone) storedValue;
if (tombstone.getLastTimestamp() < tombstoneUpdate.getTimestamp()) {
e.setChanged(true);
e.setValue(value);
}
}
else if (storedValue == null) {
// putFromLoad (putIfAbsent)
setValue(e, value);
}
else if (value == null) {
// evict
removeValue(e);
}
return null;
}
private Object setValue(MVCCEntry e, Object value) {
if (e.isRemoved()) {
e.setRemoved(false);
e.setCreated(true);
e.setValid(true);
}
else {
e.setChanged(true);
}
return e.setValue(value);
}
private void removeValue(MVCCEntry e) {
e.setRemoved(true);
e.setChanged(true);
e.setCreated(false);
e.setValid(false);
e.setValue(null);
}
@Override
public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throws Throwable {
Set<Flag> flags = command.getFlags();
int size = 0;
Map<Object, CacheEntry> contextEntries = ctx.getLookedUpEntries();
AdvancedCache decoratedCache = cache.getAdvancedCache().withFlags(flags != null ? flags.toArray(new Flag[flags.size()]) : null);
// In non-transactional caches we don't care about context
CloseableIterable<CacheEntry<Object, Object>> iterable = decoratedCache
.filterEntries(Tombstone.EXCLUDE_TOMBSTONES)
.converter(FutureUpdate.VALUE_EXTRACTOR);
try {
for (CacheEntry<Object, Object> entry : iterable) {
if (entry.getValue() != null && size++ == Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
}
}
finally {
iterable.close();
}
return size;
}
}

View File

@ -0,0 +1,49 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TombstoneSynchronization<K> extends InvocationAfterCompletion {
private final UUID uuid = UUID.randomUUID();
private final BaseTransactionalDataRegion region;
private final K key;
public TombstoneSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, K key) {
super(tc, cache, requiresTransaction);
this.key = key;
this.region = region;
}
public UUID getUuid() {
return uuid;
}
public K getKey() {
return key;
}
@Override
public void beforeCompletion() {
}
@Override
public void invoke(boolean success, AdvancedCache cache) {
Tombstone tombstone = new Tombstone(uuid, region.nextTimestamp(), true);
cache.put(key, tombstone, region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
}

View File

@ -47,13 +47,12 @@ public class CollectionRegionImpl extends BaseTransactionalDataRegion implements
@Override
public CollectionRegionAccessStrategy buildAccessStrategy(AccessType accessType) throws CacheException {
checkAccessType( accessType );
getValidator();
AccessDelegate delegate = InvalidationCacheAccessDelegate.create(this, getValidator());
AccessDelegate accessDelegate = createAccessDelegate();
switch ( accessType ) {
case READ_ONLY:
case READ_WRITE:
case TRANSACTIONAL:
return new CollectionAccess( this, delegate );
return new CollectionAccess( this, accessDelegate );
default:
throw new CacheException( "Unsupported access type [" + accessType.getExternalName() + "]" );
}

View File

@ -7,7 +7,7 @@
package org.hibernate.cache.infinispan.entity;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.access.InvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.CacheKeysFactory;
@ -50,7 +50,7 @@ public class EntityRegionImpl extends BaseTransactionalDataRegion implements Ent
if ( !getCacheDataDescription().isMutable() ) {
accessType = AccessType.READ_ONLY;
}
InvalidationCacheAccessDelegate accessDelegate = InvalidationCacheAccessDelegate.create(this, getValidator());
AccessDelegate accessDelegate = createAccessDelegate();
switch ( accessType ) {
case READ_ONLY:
return new ReadOnlyAccess( this, accessDelegate);

View File

@ -7,7 +7,7 @@
package org.hibernate.cache.infinispan.entity;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.access.InvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.spi.EntityRegion;
import org.hibernate.cache.spi.access.EntityRegionAccessStrategy;
import org.hibernate.cache.spi.access.SoftLock;
@ -25,9 +25,9 @@ import org.hibernate.persister.entity.EntityPersister;
class ReadOnlyAccess implements EntityRegionAccessStrategy {
protected final EntityRegionImpl region;
protected final InvalidationCacheAccessDelegate delegate;
protected final AccessDelegate delegate;
ReadOnlyAccess(EntityRegionImpl region, InvalidationCacheAccessDelegate delegate) {
ReadOnlyAccess(EntityRegionImpl region, AccessDelegate delegate) {
this.region = region;
this.delegate = delegate;
}

View File

@ -7,7 +7,7 @@
package org.hibernate.cache.infinispan.entity;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.access.InvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
@ -20,7 +20,7 @@ import org.hibernate.engine.spi.SessionImplementor;
*/
class ReadWriteAccess extends ReadOnlyAccess {
ReadWriteAccess(EntityRegionImpl region, InvalidationCacheAccessDelegate delegate) {
ReadWriteAccess(EntityRegionImpl region, AccessDelegate delegate) {
super(region, delegate);
}

View File

@ -38,25 +38,18 @@ public abstract class BaseRegion implements Region {
private static final Log log = LogFactory.getLog( BaseRegion.class );
private enum InvalidateState {
INVALID, CLEARING, VALID
}
private final String name;
private final AdvancedCache localAndSkipLoadCache;
private final TransactionManager tm;
private final Object invalidationMutex = new Object();
private final AtomicReference<InvalidateState> invalidateState =
new AtomicReference<InvalidateState>( InvalidateState.VALID );
protected final String name;
protected final AdvancedCache cache;
protected final AdvancedCache localAndSkipLoadCache;
protected final TransactionManager tm;
private final RegionFactory factory;
protected final AdvancedCache cache;
protected volatile long lastRegionInvalidation = Long.MIN_VALUE;
protected int invalidations = 0;
private PutFromLoadValidator validator;
/**
/**
* Base region constructor.
*
* @param cache instance for the region
@ -146,32 +139,7 @@ public abstract class BaseRegion implements Region {
* @return true if the region is valid, false otherwise
*/
public boolean checkValid() {
boolean valid = isValid();
if ( !valid ) {
synchronized (invalidationMutex) {
if ( invalidateState.compareAndSet( InvalidateState.INVALID, InvalidateState.CLEARING ) ) {
try {
runInvalidation( getCurrentTransaction() != null );
log.tracef( "Transition state from CLEARING to VALID" );
invalidateState.compareAndSet(
InvalidateState.CLEARING, InvalidateState.VALID
);
}
catch ( Exception e ) {
if ( log.isTraceEnabled() ) {
log.trace( "Could not invalidate region: ", e );
}
}
}
}
valid = isValid();
}
return valid;
}
protected boolean isValid() {
return invalidateState.get() == InvalidateState.VALID;
return lastRegionInvalidation != Long.MAX_VALUE;
}
/**
@ -213,10 +181,31 @@ public abstract class BaseRegion implements Region {
* Invalidates the region.
*/
public void invalidateRegion() {
// this is called only from EvictAllCommand, we don't have any ongoing transaction
beginInvalidation();
endInvalidation();
}
public void beginInvalidation() {
if (log.isTraceEnabled()) {
log.trace( "Invalidate region: " + name );
log.trace( "Begin invalidating region: " + name );
}
synchronized (this) {
lastRegionInvalidation = Long.MAX_VALUE;
++invalidations;
}
runInvalidation(getCurrentTransaction() != null);
}
public void endInvalidation() {
synchronized (this) {
if (--invalidations == 0) {
lastRegionInvalidation = nextTimestamp();
}
}
if (log.isTraceEnabled()) {
log.trace( "End invalidating region: " + name );
}
invalidateState.set(InvalidateState.INVALID);
}
public TransactionManager getTransactionManager() {
@ -233,7 +222,7 @@ public abstract class BaseRegion implements Region {
return cache;
}
private Transaction getCurrentTransaction() {
protected Transaction getCurrentTransaction() {
try {
// Transaction manager could be null
return tm != null ? tm.getTransaction() : null;

View File

@ -6,15 +6,35 @@
*/
package org.hibernate.cache.infinispan.impl;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.infinispan.access.NonTxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.TombstoneAccessDelegate;
import org.hibernate.cache.infinispan.access.TombstoneCallInterceptor;
import org.hibernate.cache.infinispan.access.TxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.CacheKeysFactory;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.TransactionalDataRegion;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import javax.transaction.TransactionManager;
import java.util.List;
import java.util.Map;
/**
* Support for Inifinispan {@link org.hibernate.cache.spi.TransactionalDataRegion} implementors.
*
@ -24,26 +44,51 @@ import javax.transaction.TransactionManager;
*/
public abstract class BaseTransactionalDataRegion
extends BaseRegion implements TransactionalDataRegion {
private static final Log log = LogFactory.getLog( BaseTransactionalDataRegion.class );
private final CacheDataDescription metadata;
private final CacheKeysFactory cacheKeysFactory;
/**
* Base transactional region constructor
*
* @param cache instance to store transactional data
* @param name of the transactional region
protected final boolean useTombstones;
protected final long tombstoneExpiration;
protected final boolean requiresTransaction;
/**
* Base transactional region constructor
*
* @param cache instance to store transactional data
* @param name of the transactional region
* @param transactionManager
* @param metadata for the transactional region
* @param factory for the transactional region
* @param cacheKeysFactory factory for cache keys
*/
* @param metadata for the transactional region
* @param factory for the transactional region
* @param cacheKeysFactory factory for cache keys
*/
public BaseTransactionalDataRegion(
AdvancedCache cache, String name, TransactionManager transactionManager,
CacheDataDescription metadata, RegionFactory factory, CacheKeysFactory cacheKeysFactory) {
super( cache, name, transactionManager, factory);
this.metadata = metadata;
this.cacheKeysFactory = cacheKeysFactory;
Configuration configuration = cache.getCacheConfiguration();
CacheMode cacheMode = configuration.clustering().cacheMode();
useTombstones = cacheMode.isDistributed() || cacheMode.isReplicated();
// TODO: make these timeouts configurable
tombstoneExpiration = InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION.expiration().maxIdle();
requiresTransaction = configuration.transaction().transactionMode().isTransactional()
&& !configuration.transaction().autoCommit();
if (useTombstones) {
if (configuration.eviction().maxEntries() >= 0) {
log.warn("Setting eviction on cache using tombstones can introduce inconsistencies!");
}
cache.removeInterceptor(CallInterceptor.class);
TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(tombstoneExpiration);
cache.getComponentRegistry().registerComponent(tombstoneCallInterceptor, TombstoneCallInterceptor.class);
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
cache.addInterceptor(tombstoneCallInterceptor, interceptorChain.size());
}
}
@Override
@ -54,4 +99,82 @@ public abstract class BaseTransactionalDataRegion
public CacheKeysFactory getCacheKeysFactory() {
return cacheKeysFactory;
}
protected AccessDelegate createAccessDelegate() {
CacheMode cacheMode = cache.getCacheConfiguration().clustering().cacheMode();
if (cacheMode.isDistributed() || cacheMode.isReplicated()) {
return new TombstoneAccessDelegate(this);
}
else {
if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) {
return new TxInvalidationCacheAccessDelegate(this, getValidator());
}
else {
return new NonTxInvalidationCacheAccessDelegate(this, getValidator());
}
}
}
public long getTombstoneExpiration() {
return tombstoneExpiration;
}
public long getLastRegionInvalidation() {
return lastRegionInvalidation;
}
@Override
protected void runInvalidation(boolean inTransaction) {
if (!useTombstones) {
super.runInvalidation(inTransaction);
return;
}
// If the transaction is required, we simply need it -> will create our own
boolean startedTx = false;
if ( !inTransaction && requiresTransaction) {
try {
tm.begin();
startedTx = true;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
// We can never use cache.clear() since tombstones must be kept.
try {
AdvancedCache localCache = Caches.localCache(cache);
CloseableIterator<CacheEntry> it = Caches.entrySet(localCache, Tombstone.EXCLUDE_TOMBSTONES).iterator();
try {
while (it.hasNext()) {
// Cannot use it.next(); it.remove() due to ISPN-5653
CacheEntry entry = it.next();
localCache.remove(entry.getKey(), entry.getValue());
}
}
finally {
it.close();
}
}
finally {
if (startedTx) {
try {
tm.commit();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
@Override
public Map toMap() {
if (useTombstones) {
AdvancedCache localCache = Caches.localCache(cache);
return Caches.entrySet(localCache, Tombstone.EXCLUDE_TOMBSTONES, FutureUpdate.VALUE_EXTRACTOR).toMap();
}
else {
return super.toMap();
}
}
}

View File

@ -8,8 +8,6 @@ package org.hibernate.cache.infinispan.naturalid;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.infinispan.access.InvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.CacheKeysFactory;
@ -52,13 +50,13 @@ public class NaturalIdRegionImpl extends BaseTransactionalDataRegion
if (!getCacheDataDescription().isMutable()) {
accessType = AccessType.READ_ONLY;
}
AccessDelegate delegate = InvalidationCacheAccessDelegate.create( this, getValidator());
AccessDelegate accessDelegate = createAccessDelegate();
switch ( accessType ) {
case READ_ONLY:
return new ReadOnlyAccess( this, delegate );
return new ReadOnlyAccess( this, accessDelegate );
case READ_WRITE:
case TRANSACTIONAL:
return new ReadWriteAccess( this, delegate );
return new ReadWriteAccess( this, accessDelegate );
default:
throw new CacheException( "Unsupported access type [" + accessType.getExternalName() + "]" );
}

View File

@ -8,18 +8,14 @@ package org.hibernate.cache.infinispan.query;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import org.hibernate.HibernateException;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion;
import org.hibernate.cache.spi.QueryResultsRegion;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.jdbc.WorkExecutor;
import org.hibernate.jdbc.WorkExecutorVisitable;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.TransactionConfiguration;
@ -28,8 +24,6 @@ import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -106,14 +100,6 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
@Override
public Object get(SessionImplementor session, Object key) throws CacheException {
// If the region is not valid, skip cache store to avoid going remote to retrieve the query.
// The aim of this is to maintain same logic/semantics as when state transfer was configured.
// TODO: Once https://issues.jboss.org/browse/ISPN-835 has been resolved, revert to state transfer and remove workaround
boolean skipCacheStore = false;
if ( !isValid() ) {
skipCacheStore = true;
}
if ( !checkValid() ) {
return null;
}
@ -129,12 +115,7 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
result = map.get(key);
}
if (result == null) {
if ( skipCacheStore ) {
result = getCache.withFlags( Flag.SKIP_CACHE_STORE ).get( key );
}
else {
result = getCache.get( key );
}
result = getCache.get( key );
}
return result;
}
@ -178,51 +159,28 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
}
}
private class PostTransactionQueryUpdate implements Synchronization {
private final TransactionCoordinator tc;
private class PostTransactionQueryUpdate extends InvocationAfterCompletion {
private final SessionImplementor session;
private final Object key;
private final Object value;
public PostTransactionQueryUpdate(TransactionCoordinator tc, SessionImplementor session, Object key, Object value) {
this.tc = tc;
super(tc, putCache, putCacheRequiresTransaction);
this.session = session;
this.key = key;
this.value = value;
}
@Override
public void beforeCompletion() {
public void afterCompletion(int status) {
transactionContext.remove(session);
super.afterCompletion(status);
}
@Override
public void afterCompletion(int status) {
transactionContext.remove(session);
switch (status) {
case Status.STATUS_COMMITTING:
case Status.STATUS_COMMITTED:
try {
// TODO: isolation without obtaining Connection
tc.createIsolationDelegate().delegateWork(new WorkExecutorVisitable<Void>() {
@Override
public Void accept(WorkExecutor<Void> executor, Connection connection) throws SQLException {
putCache.put(key, value);
return null;
}
}
, putCacheRequiresTransaction);
}
catch (HibernateException e) {
// silently fail any exceptions
if (log.isTraceEnabled()) {
log.trace("Exception during query cache update", e);
}
}
break;
default:
// it would be nicer to react only on ROLLING_BACK and ROLLED_BACK statuses
// but TransactionCoordinator gives us UNKNOWN on rollback
break;
protected void invoke(boolean success, AdvancedCache cache) {
if (success) {
cache.put(key, value);
}
}
}

View File

@ -67,21 +67,8 @@ public class ClusteredTimestampsRegionImpl extends TimestampsRegionImpl {
public Object get(SessionImplementor session, Object key) throws CacheException {
Object value = localCache.get( key );
// If the region is not valid, skip cache store to avoid going remote to retrieve the query.
// The aim of this is to maintain same logic/semantics as when state transfer was configured.
// TODO: Once https://issues.jboss.org/browse/ISPN-835 has been resolved, revert to state transfer and remove workaround
boolean skipCacheStore = false;
if ( !isValid() ) {
skipCacheStore = true;
}
if ( value == null && checkValid() ) {
if ( skipCacheStore ) {
value = cache.withFlags( Flag.SKIP_CACHE_STORE ).get( key );
}
else {
value = cache.get( key );
}
value = cache.get( key );
if ( value != null ) {
localCache.put( key, value );

View File

@ -6,9 +6,9 @@
*/
package org.hibernate.cache.infinispan.util;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.transaction.Status;
@ -20,6 +20,8 @@ import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.filter.AcceptAllKeyValueFilter;
import org.infinispan.filter.Converter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.filter.NullValueConverter;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
@ -36,17 +38,17 @@ public class Caches {
// Suppresses default constructor, ensuring non-instantiability.
}
/**
* Call an operation within a transaction. This method guarantees that the
* right pattern is used to make sure that the transaction is always either
* committed or rollback.
*
* @param cache instance whose transaction manager to use
* @param c callable instance to run within a transaction
* @param <T> type of callable return
* @return returns whatever the callable returns
* @throws Exception if any operation within the transaction fails
*/
/**
* Call an operation within a transaction. This method guarantees that the
* right pattern is used to make sure that the transaction is always either
* committed or rollback.
*
* @param cache instance whose transaction manager to use
* @param c callable instance to run within a transaction
* @param <T> type of callable return
* @return returns whatever the callable returns
* @throws Exception if any operation within the transaction fails
*/
public static <T> T withinTx(
AdvancedCache cache,
Callable<T> c) throws Exception {
@ -54,17 +56,17 @@ public class Caches {
return withinTx( cache.getTransactionManager(), c );
}
/**
* Call an operation within a transaction. This method guarantees that the
* right pattern is used to make sure that the transaction is always either
* committed or rollbacked.
*
* @param tm transaction manager
* @param c callable instance to run within a transaction
* @param <T> type of callable return
* @return returns whatever the callable returns
* @throws Exception if any operation within the transaction fails
*/
/**
* Call an operation within a transaction. This method guarantees that the
* right pattern is used to make sure that the transaction is always either
* committed or rollbacked.
*
* @param tm transaction manager
* @param c callable instance to run within a transaction
* @param <T> type of callable return
* @return returns whatever the callable returns
* @throws Exception if any operation within the transaction fails
*/
public static <T> T withinTx(
TransactionManager tm,
Callable<T> c) throws Exception {
@ -106,36 +108,36 @@ public class Caches {
});
}
/**
* Transform a given cache into a local cache
*
* @param cache to be transformed
* @return a cache that operates only in local-mode
*/
/**
* Transform a given cache into a local cache
*
* @param cache to be transformed
* @return a cache that operates only in local-mode
*/
public static AdvancedCache localCache(AdvancedCache cache) {
return cache.withFlags( Flag.CACHE_MODE_LOCAL );
}
/**
* Transform a given cache into a cache that ignores return values for
* operations returning previous values, i.e. {@link AdvancedCache#put(Object, Object)}
*
* @param cache to be transformed
* @return a cache that ignores return values
*/
/**
* Transform a given cache into a cache that ignores return values for
* operations returning previous values, i.e. {@link AdvancedCache#put(Object, Object)}
*
* @param cache to be transformed
* @return a cache that ignores return values
*/
public static AdvancedCache ignoreReturnValuesCache(AdvancedCache cache) {
return cache.withFlags( Flag.SKIP_CACHE_LOAD, Flag.SKIP_REMOTE_LOOKUP, Flag.IGNORE_RETURN_VALUES );
}
/**
* Transform a given cache into a cache that ignores return values for
* operations returning previous values, i.e. {@link AdvancedCache#put(Object, Object)},
* adding an extra flag.
*
* @param cache to be transformed
* @param extraFlag to add to the returned cache
* @return a cache that ignores return values
*/
/**
* Transform a given cache into a cache that ignores return values for
* operations returning previous values, i.e. {@link AdvancedCache#put(Object, Object)},
* adding an extra flag.
*
* @param cache to be transformed
* @param extraFlag to add to the returned cache
* @return a cache that ignores return values
*/
public static AdvancedCache ignoreReturnValuesCache(
AdvancedCache cache, Flag extraFlag) {
return cache.withFlags(
@ -143,14 +145,14 @@ public class Caches {
);
}
/**
* Transform a given cache into a cache that writes cache entries without
* waiting for them to complete, adding an extra flag.
*
* @param cache to be transformed
* @param extraFlag to add to the returned cache
* @return a cache that writes asynchronously
*/
/**
* Transform a given cache into a cache that writes cache entries without
* waiting for them to complete, adding an extra flag.
*
* @param cache to be transformed
* @param extraFlag to add to the returned cache
* @return a cache that writes asynchronously
*/
public static AdvancedCache asyncWriteCache(
AdvancedCache cache,
Flag extraFlag) {
@ -162,12 +164,12 @@ public class Caches {
);
}
/**
* Transform a given cache into a cache that fails silently if cache writes fail.
*
* @param cache to be transformed
* @return a cache that fails silently if cache writes fail
*/
/**
* Transform a given cache into a cache that fails silently if cache writes fail.
*
* @param cache to be transformed
* @return a cache that fails silently if cache writes fail
*/
public static AdvancedCache failSilentWriteCache(AdvancedCache cache) {
return cache.withFlags(
Flag.FAIL_SILENTLY,
@ -177,14 +179,14 @@ public class Caches {
);
}
/**
* Transform a given cache into a cache that fails silently if
* cache writes fail, adding an extra flag.
*
* @param cache to be transformed
* @param extraFlag to be added to returned cache
* @return a cache that fails silently if cache writes fail
*/
/**
* Transform a given cache into a cache that fails silently if
* cache writes fail, adding an extra flag.
*
* @param cache to be transformed
* @param extraFlag to be added to returned cache
* @return a cache that fails silently if cache writes fail
*/
public static AdvancedCache failSilentWriteCache(
AdvancedCache cache,
Flag extraFlag) {
@ -197,13 +199,13 @@ public class Caches {
);
}
/**
* Transform a given cache into a cache that fails silently if
* cache reads fail.
*
* @param cache to be transformed
* @return a cache that fails silently if cache reads fail
*/
/**
* Transform a given cache into a cache that fails silently if
* cache reads fail.
*
* @param cache to be transformed
* @return a cache that fails silently if cache reads fail
*/
public static AdvancedCache failSilentReadCache(AdvancedCache cache) {
return cache.withFlags(
Flag.FAIL_SILENTLY,
@ -211,11 +213,11 @@ public class Caches {
);
}
/**
* Broadcast an evict-all command with the given cache instance.
*
* @param cache instance used to broadcast command
*/
/**
* Broadcast an evict-all command with the given cache instance.
*
* @param cache instance used to broadcast command
*/
public static void broadcastEvictAll(AdvancedCache cache) {
final RpcManager rpcManager = cache.getRpcManager();
if ( rpcManager != null ) {
@ -230,41 +232,41 @@ public class Caches {
}
}
/**
* Indicates whether the given cache is configured with
* {@link org.infinispan.configuration.cache.CacheMode#INVALIDATION_ASYNC} or
* {@link org.infinispan.configuration.cache.CacheMode#INVALIDATION_SYNC}.
*
* @param cache to check for invalidation configuration
* @return true if the cache is configured with invalidation, false otherwise
*/
/**
* Indicates whether the given cache is configured with
* {@link org.infinispan.configuration.cache.CacheMode#INVALIDATION_ASYNC} or
* {@link org.infinispan.configuration.cache.CacheMode#INVALIDATION_SYNC}.
*
* @param cache to check for invalidation configuration
* @return true if the cache is configured with invalidation, false otherwise
*/
public static boolean isInvalidationCache(AdvancedCache cache) {
return cache.getCacheConfiguration()
.clustering().cacheMode().isInvalidation();
}
/**
* Indicates whether the given cache is configured with
* {@link org.infinispan.configuration.cache.CacheMode#REPL_SYNC},
* {@link org.infinispan.configuration.cache.CacheMode#INVALIDATION_SYNC}, or
* {@link org.infinispan.configuration.cache.CacheMode#DIST_SYNC}.
*
* @param cache to check for synchronous configuration
* @return true if the cache is configured with synchronous mode, false otherwise
*/
/**
* Indicates whether the given cache is configured with
* {@link org.infinispan.configuration.cache.CacheMode#REPL_SYNC},
* {@link org.infinispan.configuration.cache.CacheMode#INVALIDATION_SYNC}, or
* {@link org.infinispan.configuration.cache.CacheMode#DIST_SYNC}.
*
* @param cache to check for synchronous configuration
* @return true if the cache is configured with synchronous mode, false otherwise
*/
public static boolean isSynchronousCache(AdvancedCache cache) {
return cache.getCacheConfiguration()
.clustering().cacheMode().isSynchronous();
}
/**
* Indicates whether the given cache is configured to cluster its contents.
* A cache is considered to clustered if it's configured with any cache mode
* except {@link org.infinispan.configuration.cache.CacheMode#LOCAL}
*
* @param cache to check whether it clusters its contents
* @return true if the cache is configured with clustering, false otherwise
*/
/**
* Indicates whether the given cache is configured to cluster its contents.
* A cache is considered to clustered if it's configured with any cache mode
* except {@link org.infinispan.configuration.cache.CacheMode#LOCAL}
*
* @param cache to check whether it clusters its contents
* @return true if the cache is configured with clustering, false otherwise
*/
public static boolean isClustered(AdvancedCache cache) {
return cache.getCacheConfiguration()
.clustering().cacheMode().isClustered();
@ -291,89 +293,236 @@ public class Caches {
/**
* This interface is provided for convenient fluent use of CloseableIterable
*/
public interface CollectableCloseableIterable extends CloseableIterable {
Set toSet();
public interface CollectableCloseableIterable<T> extends CloseableIterable<T> {
Set<T> toSet();
}
public static CollectableCloseableIterable keys(AdvancedCache cache) {
public interface MapCollectableCloseableIterable<K, V> extends CloseableIterable<CacheEntry<K, V>> {
Map<K, V> toMap();
}
public static <K, V> CollectableCloseableIterable<K> keys(AdvancedCache<K, V> cache) {
return keys(cache, (KeyValueFilter<K, V>) AcceptAllKeyValueFilter.getInstance());
}
public static <K, V> CollectableCloseableIterable<K> keys(AdvancedCache<K, V> cache, KeyValueFilter<K, V> filter) {
if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) {
// Dummy read to enlist the LocalTransaction as workaround for ISPN-5676
cache.containsKey(false);
}
// HHH-10023: we can't use keySet()
final CloseableIterable<CacheEntry<Object, Void>> entryIterable = cache
.filterEntries( AcceptAllKeyValueFilter.getInstance() )
final CloseableIterable<CacheEntry<K, Void>> entryIterable = cache
.filterEntries( filter )
.converter( NullValueConverter.getInstance() );
return new CollectableCloseableIterable() {
return new CollectableCloseableIterableImpl<K, Void, K>(entryIterable, Selector.KEY);
}
public static <K, V> CollectableCloseableIterable<V> values(AdvancedCache<K, V> cache) {
return values(cache, (KeyValueFilter<K, V>) AcceptAllKeyValueFilter.getInstance());
}
public static <K, V> CollectableCloseableIterable<V> values(AdvancedCache<K, V> cache, KeyValueFilter<K, V> filter) {
if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) {
// Dummy read to enlist the LocalTransaction as workaround for ISPN-5676
cache.containsKey(false);
}
// HHH-10023: we can't use values()
final CloseableIterable<CacheEntry<K, V>> entryIterable = cache.filterEntries(filter);
return new CollectableCloseableIterableImpl<K, V, V>(entryIterable, Selector.VALUE);
}
public static <K, V, T> CollectableCloseableIterable<T> values(AdvancedCache<K, V> cache, KeyValueFilter<K, V> filter, Converter<K, V, T> converter) {
if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) {
// Dummy read to enlist the LocalTransaction as workaround for ISPN-5676
cache.containsKey(false);
}
// HHH-10023: we can't use values()
final CloseableIterable<CacheEntry<K, T>> entryIterable = cache.filterEntries(filter).converter(converter);
return new CollectableCloseableIterableImpl<K, T, T>(entryIterable, Selector.VALUE);
}
public static <K, V> MapCollectableCloseableIterable<K, V> entrySet(AdvancedCache<K, V> cache) {
return entrySet(cache, (KeyValueFilter<K, V>) AcceptAllKeyValueFilter.getInstance());
}
public static <K, V> MapCollectableCloseableIterable<K, V> entrySet(AdvancedCache<K, V> cache, KeyValueFilter<K, V> filter) {
if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) {
// Dummy read to enlist the LocalTransaction as workaround for ISPN-5676
cache.containsKey(false);
}
// HHH-10023: we can't use values()
final CloseableIterable<CacheEntry<K, V>> entryIterable = cache.filterEntries(filter);
return new MapCollectableCloseableIterableImpl<K, V>(entryIterable);
}
public static <K, V, T> MapCollectableCloseableIterable<K, T> entrySet(AdvancedCache<K, V> cache, KeyValueFilter<K, V> filter, Converter<K, V, T> converter) {
if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) {
// Dummy read to enlist the LocalTransaction as workaround for ISPN-5676
cache.containsKey(false);
}
// HHH-10023: we can't use values()
final CloseableIterable<CacheEntry<K, T>> entryIterable = cache.filterEntries(filter).converter(converter);
return new MapCollectableCloseableIterableImpl<K, T>(entryIterable);
}
/* Function<CacheEntry<K, V>, T> */
private interface Selector<K, V, T> {
Selector KEY = new Selector<Object, Void, Object>() {
@Override
public void close() {
entryIterable.close();
}
@Override
public CloseableIterator iterator() {
final CloseableIterator<CacheEntry<Object, Void>> entryIterator = entryIterable.iterator();
return new CloseableIterator() {
@Override
public void close() {
entryIterator.close();
}
@Override
public boolean hasNext() {
return entryIterator.hasNext();
}
@Override
public Object next() {
return entryIterator.next().getKey();
}
@Override
public void remove() {
throw new UnsupportedOperationException( "remove() not supported" );
}
};
}
@Override
public String toString() {
CloseableIterator<CacheEntry<Object, Void>> it = entryIterable.iterator();
try {
if (!it.hasNext()) {
return "[]";
}
StringBuilder sb = new StringBuilder();
sb.append('[');
for (; ; ) {
CacheEntry<Object, Void> entry = it.next();
sb.append(entry.getKey());
if (!it.hasNext()) {
return sb.append(']').toString();
}
sb.append(',').append(' ');
}
}
finally {
it.close();
}
}
@Override
public Set toSet() {
HashSet set = new HashSet();
CloseableIterator it = iterator();
try {
while (it.hasNext()) {
set.add(it.next());
}
}
finally {
it.close();
}
return set;
public Object apply(CacheEntry<Object, Void> entry) {
return entry.getKey();
}
};
Selector VALUE = new Selector<Object, Object, Object>() {
@Override
public Object apply(CacheEntry<Object, Object> entry) {
return entry.getValue();
}
};
T apply(CacheEntry<K, V> entry);
}
private static class CollectableCloseableIterableImpl<K, V, T> implements CollectableCloseableIterable<T> {
private final CloseableIterable<CacheEntry<K, V>> entryIterable;
private final Selector<K, V, T> selector;
public CollectableCloseableIterableImpl(CloseableIterable<CacheEntry<K, V>> entryIterable, Selector<K, V, T> selector) {
this.entryIterable = entryIterable;
this.selector = selector;
}
@Override
public void close() {
entryIterable.close();
}
@Override
public CloseableIterator<T> iterator() {
final CloseableIterator<CacheEntry<K, V>> entryIterator = entryIterable.iterator();
return new CloseableIterator<T>() {
@Override
public void close() {
entryIterator.close();
}
@Override
public boolean hasNext() {
return entryIterator.hasNext();
}
@Override
public T next() {
return selector.apply(entryIterator.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException( "remove() not supported" );
}
};
}
@Override
public String toString() {
CloseableIterator<CacheEntry<K, V>> it = entryIterable.iterator();
try {
if (!it.hasNext()) {
return "[]";
}
StringBuilder sb = new StringBuilder();
sb.append('[');
for (; ; ) {
CacheEntry<K, V> entry = it.next();
sb.append(selector.apply(entry));
if (!it.hasNext()) {
return sb.append(']').toString();
}
sb.append(',').append(' ');
}
}
finally {
it.close();
}
}
@Override
public Set toSet() {
HashSet set = new HashSet();
CloseableIterator it = iterator();
try {
while (it.hasNext()) {
set.add(it.next());
}
}
finally {
it.close();
}
return set;
}
}
private static class MapCollectableCloseableIterableImpl<K, V> implements MapCollectableCloseableIterable<K, V> {
private final CloseableIterable<CacheEntry<K, V>> entryIterable;
public MapCollectableCloseableIterableImpl(CloseableIterable<CacheEntry<K, V>> entryIterable) {
this.entryIterable = entryIterable;
}
@Override
public Map<K, V> toMap() {
Map<K, V> map = new HashMap<K, V>();
CloseableIterator<CacheEntry<K, V>> it = entryIterable.iterator();
try {
while (it.hasNext()) {
CacheEntry<K, V> entry = it.next();
V value = entry.getValue();
if (value != null) {
map.put(entry.getKey(), value);
}
}
return map;
}
finally {
it.close();
}
}
@Override
public String toString() {
CloseableIterator<CacheEntry<K, V>> it = entryIterable.iterator();
try {
if (!it.hasNext()) {
return "{}";
}
StringBuilder sb = new StringBuilder();
sb.append('{');
for (; ; ) {
CacheEntry<K, V> entry = it.next();
sb.append(entry.getKey()).append('=').append(entry.getValue());
if (!it.hasNext()) {
return sb.append('}').toString();
}
sb.append(',').append(' ');
}
}
finally {
it.close();
}
}
@Override
public void close() {
entryIterable.close();
}
@Override
public CloseableIterator<CacheEntry<K, V>> iterator() {
return entryIterable.iterator();
}
}
}

View File

@ -7,7 +7,6 @@
package org.hibernate.cache.infinispan.util;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.commons.util.Util;
import java.io.IOException;
import java.io.ObjectInput;
@ -22,9 +21,19 @@ import java.util.UUID;
public class Externalizers {
public final static int UUID = 1200;
public final static int TOMBSTONE = 1201;
public final static int EXCLUDE_TOMBSTONES_FILTER = 1202;
public final static int TOMBSTONE_UPDATE = 1203;
public final static int FUTURE_UPDATE = 1204;
public final static int VALUE_EXTRACTOR = 1205;
public final static AdvancedExternalizer[] ALL_EXTERNALIZERS = new AdvancedExternalizer[] {
new UUIDExternalizer()
new UUIDExternalizer(),
new Tombstone.Externalizer(),
new Tombstone.ExcludeTombstonesFilterExternalizer(),
new TombstoneUpdate.Externalizer(),
new FutureUpdate.Externalizer(),
new FutureUpdate.ValueExtractorExternalizer()
};
public static class UUIDExternalizer implements AdvancedExternalizer<UUID> {

View File

@ -0,0 +1,109 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.filter.Converter;
import org.infinispan.metadata.Metadata;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
/**
* This value can be overwritten only by an entity with the same uuid
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class FutureUpdate {
public static final ValueExtractor VALUE_EXTRACTOR = new ValueExtractor();
private final UUID uuid;
private final Object value;
public FutureUpdate(UUID uuid, Object value) {
this.uuid = uuid;
this.value = value;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("FutureUpdate{");
sb.append("uuid=").append(uuid);
sb.append(", value=").append(value);
sb.append('}');
return sb.toString();
}
public UUID getUuid() {
return uuid;
}
public Object getValue() {
return value;
}
public static class Externalizer implements AdvancedExternalizer<FutureUpdate> {
@Override
public void writeObject(ObjectOutput output, FutureUpdate object) throws IOException {
output.writeLong(object.uuid.getMostSignificantBits());
output.writeLong(object.uuid.getLeastSignificantBits());
output.writeObject(object.value);
}
@Override
public FutureUpdate readObject(ObjectInput input) throws IOException, ClassNotFoundException {
long msb = input.readLong();
long lsb = input.readLong();
return new FutureUpdate(new UUID(msb, lsb), input.readObject());
}
@Override
public Set<Class<? extends FutureUpdate>> getTypeClasses() {
return Collections.<Class<? extends FutureUpdate>>singleton(FutureUpdate.class);
}
@Override
public Integer getId() {
return Externalizers.FUTURE_UPDATE;
}
}
public static class ValueExtractor implements Converter {
private ValueExtractor() {}
@Override
public Object convert(Object key, Object value, Metadata metadata) {
return value instanceof FutureUpdate ? ((FutureUpdate) value).getValue() : value;
}
}
public static class ValueExtractorExternalizer implements AdvancedExternalizer<ValueExtractor> {
@Override
public Set<Class<? extends ValueExtractor>> getTypeClasses() {
return Collections.<Class<? extends ValueExtractor>>singleton(ValueExtractor.class);
}
@Override
public Integer getId() {
return Externalizers.VALUE_EXTRACTOR;
}
@Override
public void writeObject(ObjectOutput output, ValueExtractor object) throws IOException {
}
@Override
public ValueExtractor readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return VALUE_EXTRACTOR;
}
}
}

View File

@ -0,0 +1,78 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.hibernate.HibernateException;
import org.hibernate.jdbc.WorkExecutor;
import org.hibernate.jdbc.WorkExecutorVisitable;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import java.sql.Connection;
import java.sql.SQLException;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public abstract class InvocationAfterCompletion implements Synchronization {
protected static final Log log = LogFactory.getLog( InvocationAfterCompletion.class );
protected final TransactionCoordinator tc;
protected final AdvancedCache cache;
protected final boolean requiresTransaction;
public InvocationAfterCompletion(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction) {
this.tc = tc;
this.cache = cache;
this.requiresTransaction = requiresTransaction;
}
@Override
public void beforeCompletion() {
}
@Override
public void afterCompletion(int status) {
switch (status) {
case Status.STATUS_COMMITTING:
case Status.STATUS_COMMITTED:
invokeIsolated(true);
break;
default:
// it would be nicer to react only on ROLLING_BACK and ROLLED_BACK statuses
// but TransactionCoordinator gives us UNKNOWN on rollback
invokeIsolated(false);
break;
}
}
protected void invokeIsolated(final boolean success) {
try {
// TODO: isolation without obtaining Connection -> needs HHH-9993
tc.createIsolationDelegate().delegateWork(new WorkExecutorVisitable<Void>() {
@Override
public Void accept(WorkExecutor<Void> executor, Connection connection) throws SQLException {
invoke(success, cache);
return null;
}
}, requiresTransaction);
}
catch (HibernateException e) {
// silently fail any exceptions
if (log.isTraceEnabled()) {
log.trace("Exception during query cache update", e);
}
}
}
protected abstract void invoke(boolean success, AdvancedCache cache);
}

View File

@ -0,0 +1,162 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.metadata.Metadata;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class Tombstone {
public static final ExcludeTombstonesFilter EXCLUDE_TOMBSTONES = new ExcludeTombstonesFilter();
// when release == true and UUID is not found, don't insert anything, because this is a release delta
private final boolean release;
// the format of data is repeated (timestamp, UUID.LSB, UUID.MSB)
private final long[] data;
public Tombstone(UUID uuid, long timestamp, boolean release) {
this.data = new long[] { timestamp, uuid.getLeastSignificantBits(), uuid.getMostSignificantBits() };
this.release = release;
}
private Tombstone(long[] data, boolean release) {
this.data = data;
this.release = release;
}
public long getLastTimestamp() {
long max = data[0];
for (int i = 3; i < data.length; i += 3) {
max = Math.max(max, data[i]);
}
return max;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Tombstone{");
sb.append("release=").append(release);
sb.append(", data={");
for (int i = 0; i < data.length; i += 3) {
if (i != 0) {
sb.append(", ");
}
sb.append(new UUID(data[i + 2], data[i + 1])).append('=').append(data[i]);
}
sb.append("} }");
return sb.toString();
}
public Tombstone merge(Tombstone old) {
assert old != null;
assert data.length == 3;
if (release) {
int toRemove = 0;
for (int i = 0; i < old.data.length; i += 3) {
if (old.data[i] < data[0] || (data[1] == old.data[i + 1] && data[2] == old.data[i + 2])) {
toRemove += 3;
}
}
if (old.data.length == toRemove) {
// we want to remove all, but we need to keep at least ourselves
return this;
}
else {
long[] newData = new long[old.data.length - toRemove];
int j = 0;
for (int i = 0; i < old.data.length; i += 3) {
if (old.data[i] >= data[0] && (data[1] != old.data[i + 1] || data[2] != old.data[i + 2])) {
newData[j] = old.data[i];
newData[j + 1] = old.data[i + 1];
newData[j + 2] = old.data[i + 2];
j += 3;
}
}
return new Tombstone(newData, false);
}
}
else {
long[] newData = Arrays.copyOf(old.data, old.data.length + 3);
System.arraycopy(data, 0, newData, old.data.length, 3);
return new Tombstone(newData, false);
}
}
public static class Externalizer implements AdvancedExternalizer<Tombstone> {
@Override
public Set<Class<? extends Tombstone>> getTypeClasses() {
return Collections.<Class<? extends Tombstone>>singleton(Tombstone.class);
}
@Override
public Integer getId() {
return Externalizers.TOMBSTONE;
}
@Override
public void writeObject(ObjectOutput output, Tombstone tombstone) throws IOException {
output.writeBoolean(tombstone.release);
output.writeInt(tombstone.data.length);
for (int i = 0; i < tombstone.data.length; ++i) {
output.writeLong(tombstone.data[i]);
}
}
@Override
public Tombstone readObject(ObjectInput input) throws IOException, ClassNotFoundException {
boolean release = input.readBoolean();
int length = input.readInt();
long[] data = new long[length];
for (int i = 0; i < data.length; ++i) {
data[i] = input.readLong();
}
return new Tombstone(data, release);
// return INSTANCE;
}
}
public static class ExcludeTombstonesFilter implements KeyValueFilter {
private ExcludeTombstonesFilter() {}
@Override
public boolean accept(Object key, Object value, Metadata metadata) {
return !(value instanceof Tombstone);
}
}
public static class ExcludeTombstonesFilterExternalizer implements AdvancedExternalizer<ExcludeTombstonesFilter> {
@Override
public Set<Class<? extends ExcludeTombstonesFilter>> getTypeClasses() {
return Collections.<Class<? extends ExcludeTombstonesFilter>>singleton(ExcludeTombstonesFilter.class);
}
@Override
public Integer getId() {
return Externalizers.EXCLUDE_TOMBSTONES_FILTER;
}
@Override
public void writeObject(ObjectOutput output, ExcludeTombstonesFilter object) throws IOException {
}
@Override
public ExcludeTombstonesFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return EXCLUDE_TOMBSTONES;
}
}
}

View File

@ -0,0 +1,78 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collections;
import java.util.Set;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TombstoneUpdate<T> {
public static TombstoneUpdate EVICT = new TombstoneUpdate(Long.MIN_VALUE, null);
private long timestamp;
private T value;
public TombstoneUpdate(long timestamp, T value) {
this.timestamp = timestamp;
this.value = value;
}
public long getTimestamp() {
return timestamp;
}
public T getValue() {
return value;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("TombstoneUpdate{");
sb.append("timestamp=").append(timestamp);
sb.append(", value=").append(value);
sb.append('}');
return sb.toString();
}
public static class Externalizer implements AdvancedExternalizer<TombstoneUpdate> {
@Override
public Set<Class<? extends TombstoneUpdate>> getTypeClasses() {
return Collections.<Class<? extends TombstoneUpdate>>singleton(TombstoneUpdate.class);
}
@Override
public Integer getId() {
return Externalizers.TOMBSTONE_UPDATE;
}
@Override
public void writeObject(ObjectOutput output, TombstoneUpdate object) throws IOException {
output.writeObject(object.getValue());
if (object.getValue() != null) {
output.writeLong(object.getTimestamp());
}
}
@Override
public TombstoneUpdate readObject(ObjectInput input) throws IOException, ClassNotFoundException {
Object value = input.readObject();
if (value != null) {
long timestamp = input.readLong();
return new TombstoneUpdate(timestamp, value);
}
else {
return EVICT;
}
}
}
}

View File

@ -6,6 +6,7 @@
*/
package org.hibernate.test.cache.infinispan;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
@ -23,6 +24,7 @@ import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.testing.junit4.CustomParameterized;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Before;
@ -48,19 +50,35 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
@Rule
public InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
@CustomParameterized.Order(0)
@Parameterized.Parameters(name = "{0}")
public static List<Object[]> getJtaParameters() {
public List<Object[]> getJtaParameters() {
return Arrays.asList(
new Object[] { "JTA", BatchModeJtaPlatform.class },
new Object[] { "non-JTA", null });
}
@Parameterized.Parameter(value = 0)
@CustomParameterized.Order(1)
@Parameterized.Parameters(name = "{2}")
public List<Object[]> getCacheModeParameters() {
ArrayList<Object[]> modes = new ArrayList<>();
modes.add(new Object[] { CacheMode.INVALIDATION_SYNC });
if (!useTransactionalCache()) {
modes.add(new Object[]{CacheMode.REPL_SYNC});
modes.add(new Object[]{CacheMode.DIST_SYNC});
}
return modes;
}
@Parameterized.Parameter(0)
public String mode;
@Parameterized.Parameter(value = 1)
@Parameterized.Parameter(1)
public Class<? extends JtaPlatform> jtaPlatform;
@Parameterized.Parameter(2)
public CacheMode cacheMode;
public static final String REGION_PREFIX = "test";
private static final String PREFER_IPV4STACK = "java.net.preferIPv4Stack";
@ -160,10 +178,16 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
final StandardServiceRegistryBuilder ssrb = CacheTestUtil.buildBaselineStandardServiceRegistryBuilder(
REGION_PREFIX, getRegionFactoryClass(), true, false, jtaPlatform);
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, useTransactionalCache());
ssrb.applySetting(TestInfinispanRegionFactory.CACHE_MODE, cacheMode);
return ssrb;
}
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return TestInfinispanRegionFactory.class;
}
protected boolean useTransactionalCache() {
return false;
}
}

View File

@ -11,13 +11,19 @@ import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.RegionAccessStrategy;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.jdbc.connections.spi.JdbcConnectionAccess;
import org.hibernate.engine.jdbc.spi.JdbcServices;
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.transaction.internal.TransactionImpl;
import org.hibernate.internal.util.compare.ComparableComparator;
import org.hibernate.resource.jdbc.spi.JdbcSessionContext;
import org.hibernate.resource.jdbc.spi.JdbcSessionOwner;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl;
import org.hibernate.resource.transaction.backend.jdbc.spi.JdbcResourceTransactionAccess;
import org.hibernate.resource.transaction.spi.TransactionCoordinatorOwner;
import org.hibernate.service.ServiceRegistry;
import org.hibernate.test.cache.infinispan.util.BatchModeJtaPlatform;
import org.hibernate.test.cache.infinispan.util.BatchModeTransactionCoordinator;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
@ -35,6 +41,8 @@ import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
@ -122,8 +130,26 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
return tx;
});
} else if (jtaPlatform == null) {
Connection connection = mock(Connection.class);
JdbcConnectionAccess jdbcConnectionAccess = mock(JdbcConnectionAccess.class);
try {
when(jdbcConnectionAccess.obtainConnection()).thenReturn(connection);
} catch (SQLException e) {
// never thrown from mock
}
JdbcSessionOwner jdbcSessionOwner = mock(JdbcSessionOwner.class);
when(jdbcSessionOwner.getJdbcConnectionAccess()).thenReturn(jdbcConnectionAccess);
SqlExceptionHelper sqlExceptionHelper = mock(SqlExceptionHelper.class);
JdbcServices jdbcServices = mock(JdbcServices.class);
when(jdbcServices.getSqlExceptionHelper()).thenReturn(sqlExceptionHelper);
ServiceRegistry serviceRegistry = mock(ServiceRegistry.class);
when(serviceRegistry.getService(JdbcServices.class)).thenReturn(jdbcServices);
JdbcSessionContext jdbcSessionContext = mock(JdbcSessionContext.class);
when(jdbcSessionContext.getServiceRegistry()).thenReturn(serviceRegistry);
when(jdbcSessionOwner.getJdbcSessionContext()).thenReturn(jdbcSessionContext);
NonJtaTransactionCoordinator txOwner = mock(NonJtaTransactionCoordinator.class);
when(txOwner.getResourceLocalTransaction()).thenReturn(new JdbcResourceTransactionMock());
when(txOwner.getJdbcSessionOwner()).thenReturn(jdbcSessionOwner);
TransactionCoordinator txCoord = JdbcResourceLocalTransactionCoordinatorBuilderImpl.INSTANCE
.buildTransactionCoordinator(txOwner, null);
when(session.getTransactionCoordinator()).thenReturn(txCoord);

View File

@ -569,7 +569,7 @@ public class InfinispanRegionFactoryTestCase {
}
private InfinispanRegionFactory createRegionFactory(final EmbeddedCacheManager manager, Properties p) {
final InfinispanRegionFactory factory = new TestInfinispanRegionFactory() {
final InfinispanRegionFactory factory = new TestInfinispanRegionFactory(new Properties()) {
@Override
protected org.infinispan.transaction.lookup.TransactionManagerLookup createTransactionManagerLookup(SessionFactoryOptions settings, Properties properties) {

View File

@ -6,8 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.collection;
import javax.transaction.TransactionManager;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -17,10 +15,11 @@ import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.infinispan.access.NonTxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.infinispan.access.InvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.TxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.collection.CollectionRegionImpl;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.spi.access.CollectionRegionAccessStrategy;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.test.cache.infinispan.AbstractRegionAccessStrategyTest;
@ -37,7 +36,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* Base class for tests of CollectionRegionAccessStrategy impls.
@ -74,16 +72,22 @@ public abstract class AbstractCollectionRegionAccessStrategyTest extends
@Test
public void testPutFromLoadRemoveDoesNotProduceStaleData() throws Exception {
if (cacheMode.isInvalidation()) {
doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation();
}
}
public void doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation() {
final CountDownLatch pferLatch = new CountDownLatch( 1 );
final CountDownLatch removeLatch = new CountDownLatch( 1 );
final TransactionManager remoteTm = remoteRegion.getTransactionManager();
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator validator = getPutFromLoadValidator(remoteRegion.getCache(), cm, removeLatch, pferLatch);
final InvalidationCacheAccessDelegate delegate =
InvalidationCacheAccessDelegate.create(localRegion, validator);
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
new TxInvalidationCacheAccessDelegate(localRegion, validator) :
new NonTxInvalidationCacheAccessDelegate(localRegion, validator);
Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception {
@ -142,7 +146,9 @@ public abstract class AbstractCollectionRegionAccessStrategyTest extends
Lock lock = super.acquirePutFromLoadLock(session, key, txTimestamp);
try {
removeLatch.countDown();
pferLatch.await( 2, TimeUnit.SECONDS );
// the remove should be blocked because the putFromLoad has been acquired
// and the remove can continue only after we've inserted the entry
assertFalse(pferLatch.await( 2, TimeUnit.SECONDS ) );
}
catch (InterruptedException e) {
log.debug( "Interrupted" );

View File

@ -6,7 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.collection;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.CollectionRegionAccessStrategy;
import org.hibernate.test.cache.infinispan.AbstractExtraAPITest;
@ -32,8 +31,8 @@ public abstract class CollectionRegionAccessExtraAPITest extends AbstractExtraAP
}
@Override
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return TestInfinispanRegionFactory.Transactional.class;
protected boolean useTransactionalCache() {
return true;
}
}

View File

@ -16,24 +16,15 @@ import static org.junit.Assert.assertTrue;
*
* @author <a href="brian.stansberry@jboss.com">Brian Stansberry</a>
*/
public abstract class CollectionRegionReadOnlyAccessTest extends AbstractCollectionRegionAccessStrategyTest {
public class CollectionRegionReadOnlyAccessTest extends AbstractCollectionRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_ONLY;
}
/**
* Tests READ_ONLY access when invalidation is used.
*
* @author Galder Zamarreño
* @since 3.5
*/
public static class Invalidation extends CollectionRegionReadOnlyAccessTest {
@Override
public void testCacheConfiguration() {
assertFalse(isTransactional());
assertTrue( "Using Invalidation", isUsingInvalidation() );
assertTrue( isSynchronous() );
}
}
@Override
public void testCacheConfiguration() {
assertFalse(isTransactional());
assertTrue( isSynchronous() );
}
}

View File

@ -10,18 +10,15 @@ import static org.junit.Assert.assertTrue;
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public abstract class CollectionRegionReadWriteAccessTest extends AbstractCollectionRegionAccessStrategyTest {
public class CollectionRegionReadWriteAccessTest extends AbstractCollectionRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_WRITE;
}
public static class Invalidation extends CollectionRegionReadWriteAccessTest {
@Override
public void testCacheConfiguration() {
assertFalse(isTransactional());
assertTrue(isUsingInvalidation());
assertTrue(isSynchronous());
}
@Override
public void testCacheConfiguration() {
assertFalse(isTransactional());
assertTrue(isSynchronous());
}
}

View File

@ -5,7 +5,6 @@
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.test.cache.infinispan.collection;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
@ -16,30 +15,20 @@ import static org.junit.Assert.assertTrue;
*
* @author <a href="brian.stansberry@jboss.com">Brian Stansberry</a>
*/
public abstract class CollectionRegionTransactionalAccessTest extends AbstractCollectionRegionAccessStrategyTest {
public class CollectionRegionTransactionalAccessTest extends AbstractCollectionRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
return AccessType.TRANSACTIONAL;
}
@Override
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return TestInfinispanRegionFactory.Transactional.class;
protected boolean useTransactionalCache() {
return true;
}
/**
* InvalidatedTransactionalTestCase.
*
* @author Galder Zamarreño
* @since 3.5
*/
public static class Invalidation extends CollectionRegionTransactionalAccessTest {
@Override
public void testCacheConfiguration() {
assertTrue("Transactions", isTransactional());
assertTrue("Using Invalidation", isUsingInvalidation());
assertTrue("Synchronous mode", isSynchronous());
}
@Override
public void testCacheConfiguration() {
assertTrue("Transactions", isTransactional());
assertTrue("Synchronous mode", isSynchronous());
}
}

View File

@ -52,7 +52,8 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
}
@Test
public abstract void testCacheConfiguration();
public void testCacheConfiguration() {
}
@Test
public void testGetRegion() {
@ -396,5 +397,4 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
long txTimestamp = System.currentTimeMillis();
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(null, KEY, txTimestamp));
}
}

View File

@ -6,7 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.entity;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.EntityRegionAccessStrategy;
import org.hibernate.test.cache.infinispan.AbstractExtraAPITest;
@ -60,8 +59,8 @@ public class EntityRegionExtraAPITest extends AbstractExtraAPITest<EntityRegionA
}
@Override
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return TestInfinispanRegionFactory.Transactional.class;
protected boolean useTransactionalCache() {
return true;
}
}

View File

@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
* @author Galder Zamarreño
* @since 3.5
*/
public abstract class EntityRegionReadOnlyAccessTest extends AbstractEntityRegionAccessStrategyTest {
public class EntityRegionReadOnlyAccessTest extends AbstractEntityRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
@ -66,12 +66,4 @@ public abstract class EntityRegionReadOnlyAccessTest extends AbstractEntityRegio
@Override
public void testContestedPutFromLoad() throws Exception {
}
public static class Invalidation extends EntityRegionReadOnlyAccessTest {
@Test
@Override
public void testCacheConfiguration() {
assertTrue("Using Invalidation", isUsingInvalidation());
}
}
}

View File

@ -6,21 +6,10 @@
*/
package org.hibernate.test.cache.infinispan.entity;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.jboss.logging.Logger;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@ -29,7 +18,7 @@ import static org.junit.Assert.assertTrue;
* @author Galder Zamarreño
* @since 3.5
*/
public abstract class EntityRegionTransactionalAccessTest extends AbstractEntityRegionAccessStrategyTest {
public class EntityRegionTransactionalAccessTest extends AbstractEntityRegionAccessStrategyTest {
private static final Logger log = Logger.getLogger( EntityRegionTransactionalAccessTest.class );
@Override
@ -38,20 +27,14 @@ public abstract class EntityRegionTransactionalAccessTest extends AbstractEntity
}
@Override
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return TestInfinispanRegionFactory.Transactional.class;
protected boolean useTransactionalCache() {
return true;
}
/**
* @author Galder Zamarreño
*/
public static class Invalidation extends EntityRegionTransactionalAccessTest {
@Test
@Override
public void testCacheConfiguration() {
assertTrue(isTransactional());
assertTrue("Using Invalidation", isUsingInvalidation());
assertTrue("Synchronous mode", isSynchronous());
}
@Test
@Override
public void testCacheConfiguration() {
assertTrue(isTransactional());
assertTrue("Synchronous mode", isSynchronous());
}
}

View File

@ -6,7 +6,7 @@
*/
package org.hibernate.test.cache.infinispan.functional;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -30,8 +30,7 @@ import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
import org.hibernate.test.cache.infinispan.tm.JtaPlatformImpl;
import org.hibernate.testing.junit4.CustomParameterized;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.configuration.cache.CacheMode;
import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -42,12 +41,16 @@ import org.junit.runners.Parameterized;
*/
@RunWith(CustomParameterized.class)
public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctionalTestCase {
private static final Log log = LogFactory.getLog( AbstractFunctionalTest.class );
protected static final Object[] TRANSACTIONAL = new Object[]{"transactional", JtaPlatformImpl.class, JtaTransactionCoordinatorBuilderImpl.class, XaConnectionProvider.class, AccessType.TRANSACTIONAL, TestInfinispanRegionFactory.Transactional.class};
protected static final Object[] READ_WRITE = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, TestInfinispanRegionFactory.class};
protected static final Object[] READ_ONLY = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, TestInfinispanRegionFactory.class};
protected static final Object[] TRANSACTIONAL = new Object[]{"transactional", JtaPlatformImpl.class, JtaTransactionCoordinatorBuilderImpl.class, XaConnectionProvider.class, AccessType.TRANSACTIONAL, true, CacheMode.INVALIDATION_SYNC };
protected static final Object[] READ_WRITE_INVALIDATION = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.INVALIDATION_SYNC };
protected static final Object[] READ_ONLY_INVALIDATION = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.INVALIDATION_SYNC };
protected static final Object[] READ_WRITE_REPLICATED = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.REPL_SYNC };
protected static final Object[] READ_ONLY_REPLICATED = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.REPL_SYNC };
protected static final Object[] READ_WRITE_DISTRIBUTED = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.DIST_SYNC };
protected static final Object[] READ_ONLY_DISTRIBUTED = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.DIST_SYNC };
// We need to use @ClassRule here since in @BeforeClassOnce startUp we're preparing the session factory,
// constructing CacheManager along - and there we check that the test has the name already set
@ClassRule
public static final InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
@ -67,13 +70,35 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
public AccessType accessType;
@Parameterized.Parameter(value = 5)
public Class<? extends RegionFactory> regionFactoryClass;
public boolean useTransactionalCache;
@Parameterized.Parameter(value = 6)
public CacheMode cacheMode;
protected boolean useJta;
@Parameterized.Parameters(name = "{0}")
@CustomParameterized.Order(0)
@Parameterized.Parameters(name = "{0}, {6}")
public abstract List<Object[]> getParameters();
public List<Object[]> getParameters(boolean tx, boolean rw, boolean ro) {
ArrayList<Object[]> parameters = new ArrayList<>();
if (tx) {
parameters.add(TRANSACTIONAL);
}
if (rw) {
parameters.add(READ_WRITE_INVALIDATION);
parameters.add(READ_WRITE_REPLICATED);
parameters.add(READ_WRITE_DISTRIBUTED);
}
if (ro) {
parameters.add(READ_ONLY_INVALIDATION);
parameters.add(READ_ONLY_REPLICATED);
parameters.add(READ_ONLY_DISTRIBUTED);
}
return parameters;
}
@BeforeClassOnce
public void setUseJta() {
useJta = jtaPlatformClass != null;
@ -93,6 +118,10 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
return accessType.getExternalName();
}
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return TestInfinispanRegionFactory.class;
}
protected boolean getUseQueryCache() {
return true;
}
@ -105,7 +134,9 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
settings.put( Environment.USE_SECOND_LEVEL_CACHE, "true" );
settings.put( Environment.GENERATE_STATISTICS, "true" );
settings.put( Environment.USE_QUERY_CACHE, String.valueOf( getUseQueryCache() ) );
settings.put( Environment.CACHE_REGION_FACTORY, regionFactoryClass.getName() );
settings.put( Environment.CACHE_REGION_FACTORY, getRegionFactoryClass().getName() );
settings.put( TestInfinispanRegionFactory.TRANSACTIONAL, useTransactionalCache );
settings.put( TestInfinispanRegionFactory.CACHE_MODE, cacheMode);
if ( jtaPlatformClass != null ) {
settings.put( AvailableSettings.JTA_PLATFORM, jtaPlatformClass.getName() );

View File

@ -12,21 +12,13 @@ import java.util.List;
import java.util.Set;
import org.hibernate.FlushMode;
import org.hibernate.Session;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl;
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.test.cache.infinispan.tm.JtaPlatformImpl;
import org.hibernate.test.cache.infinispan.tm.XaConnectionProvider;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.functional.entities.Contact;
import org.hibernate.test.cache.infinispan.functional.entities.Customer;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -41,7 +33,7 @@ import static org.junit.Assert.assertNull;
public class BulkOperationsTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE);
return getParameters(true, true, false);
}
@ClassRule

View File

@ -63,7 +63,7 @@ public class ConcurrentWriteTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE);
return getParameters(true, true, false);
}
@Override
@ -91,19 +91,25 @@ public class ConcurrentWriteTest extends SingleNodeTest {
public void testSingleUser() throws Exception {
// setup
sessionFactory().getStatistics().clear();
// wait a while to make sure that timestamp comparison works after invalidateRegion
Thread.sleep(1);
Customer customer = createCustomer( 0 );
final Integer customerId = customer.getId();
getCustomerIDs().add( customerId );
// wait a while to make sure that timestamp comparison works after collection remove (during insert)
Thread.sleep(1);
assertNull( "contact exists despite not being added", getFirstContact( customerId ) );
// check that cache was hit
SecondLevelCacheStatistics customerSlcs = sessionFactory()
.getStatistics()
.getSecondLevelCacheStatistics( Customer.class.getName() );
assertEquals( customerSlcs.getPutCount(), 1 );
assertEquals( customerSlcs.getElementCountInMemory(), 1 );
assertEquals( customerSlcs.getEntries().size(), 1 );
assertEquals( 1, customerSlcs.getPutCount() );
assertEquals( 1, customerSlcs.getElementCountInMemory() );
assertEquals( 1, customerSlcs.getEntries().size() );
log.infof( "Add contact to customer {0}", customerId );
SecondLevelCacheStatistics contactsCollectionSlcs = sessionFactory()
@ -155,6 +161,7 @@ public class ConcurrentWriteTest extends SingleNodeTest {
for ( Future<Void> future : futures ) {
future.get();
}
executor.shutdown();
log.info( "All future gets checked" );
}
catch (Throwable t) {

View File

@ -21,7 +21,7 @@ import static org.junit.Assert.assertTrue;
public class EqualityTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE, READ_ONLY);
return getParameters(true, true, true);
}
@Override

View File

@ -0,0 +1,175 @@
package org.hibernate.test.cache.infinispan.functional;
import org.hibernate.PessimisticLockException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.spi.Region;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.testing.TestForIssue;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Tests specific to invalidation mode caches
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class InvalidationTest extends SingleNodeTest {
static final Log log = LogFactory.getLog(ReadOnlyTest.class);
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE_INVALIDATION);
}
@Test
@TestForIssue(jiraKey = "HHH-9868")
public void testConcurrentRemoveAndPutFromLoad() throws Exception {
Region region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
AdvancedCache entityCache = ((EntityRegionImpl) region).getCache();
final Item item = new Item( "chris", "Chris's Item" );
withTxSession(s -> {
s.persist(item);
});
Phaser deletePhaser = new Phaser(2);
Phaser getPhaser = new Phaser(2);
HookInterceptor hook = new HookInterceptor();
AdvancedCache pendingPutsCache = entityCache.getCacheManager().getCache(
entityCache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME).getAdvancedCache();
pendingPutsCache.addInterceptor(hook, 0);
AtomicBoolean getThreadBlockedInDB = new AtomicBoolean(false);
Thread deleteThread = new Thread(() -> {
try {
withTxSession(s -> {
Item loadedItem = s.get(Item.class, item.getId());
assertNotNull(loadedItem);
arriveAndAwait(deletePhaser, 2000);
arriveAndAwait(deletePhaser, 2000);
log.trace("Item loaded");
s.delete(loadedItem);
s.flush();
log.trace("Item deleted");
// start get-thread here
arriveAndAwait(deletePhaser, 2000);
// we need longer timeout since in non-MVCC DBs the get thread
// can be blocked
arriveAndAwait(deletePhaser, 4000);
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "delete-thread");
Thread getThread = new Thread(() -> {
try {
withTxSession(s -> {
// DB load should happen before the record is deleted,
// putFromLoad should happen after deleteThread ends
Item loadedItem = s.get(Item.class, item.getId());
if (getThreadBlockedInDB.get()) {
assertNull(loadedItem);
} else {
assertNotNull(loadedItem);
}
});
} catch (PessimisticLockException e) {
// If we end up here, database locks guard us against situation tested
// in this case and HHH-9868 cannot happen.
// (delete-thread has ITEMS table write-locked and we try to acquire read-lock)
try {
arriveAndAwait(getPhaser, 2000);
arriveAndAwait(getPhaser, 2000);
} catch (Exception e1) {
throw new RuntimeException(e1);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "get-thread");
deleteThread.start();
// deleteThread loads the entity
arriveAndAwait(deletePhaser, 2000);
withTx(() -> {
sessionFactory().getCache().evictEntity(Item.class, item.getId());
assertFalse(sessionFactory().getCache().containsEntity(Item.class, item.getId()));
return null;
});
arriveAndAwait(deletePhaser, 2000);
// delete thread invalidates PFER
arriveAndAwait(deletePhaser, 2000);
// get thread gets the entity from DB
hook.block(getPhaser, getThread);
getThread.start();
try {
arriveAndAwait(getPhaser, 2000);
} catch (TimeoutException e) {
getThreadBlockedInDB.set(true);
}
arriveAndAwait(deletePhaser, 2000);
// delete thread finishes the remove from DB and cache
deleteThread.join();
hook.unblock();
arriveAndAwait(getPhaser, 2000);
// get thread puts the entry into cache
getThread.join();
withTxSession(s -> {
Item loadedItem = s.get(Item.class, item.getId());
assertNull(loadedItem);
});
}
protected static void arriveAndAwait(Phaser phaser, int timeout) throws TimeoutException, InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), timeout, TimeUnit.MILLISECONDS);
}
private static class HookInterceptor extends BaseCustomInterceptor {
Phaser phaser;
Thread thread;
public synchronized void block(Phaser phaser, Thread thread) {
this.phaser = phaser;
this.thread = thread;
}
public synchronized void unblock() {
phaser = null;
thread = null;
}
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
Phaser phaser;
Thread thread;
synchronized (this) {
phaser = this.phaser;
thread = this.thread;
}
if (phaser != null && Thread.currentThread() == thread) {
arriveAndAwait(phaser, 2000);
arriveAndAwait(phaser, 2000);
}
return super.visitGetKeyValueCommand(ctx, command);
}
}
}

View File

@ -20,11 +20,10 @@ import javax.naming.StringRefAddr;
import org.hibernate.boot.registry.StandardServiceRegistry;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.JndiInfinispanRegionFactory;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cfg.Environment;
import org.hibernate.engine.config.spi.ConfigurationService;
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl;
import org.hibernate.stat.Statistics;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
@ -41,7 +40,6 @@ import org.jboss.util.naming.NonSerializableFactory;
import org.jnp.server.Main;
import org.jnp.server.SingletonNamingServer;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertEquals;
@ -59,7 +57,12 @@ public class JndiRegionFactoryTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return Collections.singletonList(new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, JndiInfinispanRegionFactory.class});
return Collections.singletonList(READ_WRITE_INVALIDATION);
}
@Override
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return JndiInfinispanRegionFactory.class;
}
@Override

View File

@ -36,7 +36,7 @@ public class MultiTenancyTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return Collections.singletonList(READ_ONLY);
return Collections.singletonList(READ_ONLY_INVALIDATION);
}
@Override

View File

@ -20,7 +20,7 @@ import static org.junit.Assert.assertNotNull;
public class NoTenancyTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return Collections.singletonList(READ_ONLY);
return Collections.singletonList(READ_ONLY_INVALIDATION);
}
@Test
@ -42,5 +42,4 @@ public class NoTenancyTest extends SingleNodeTest {
assertEquals(1, localCache.size());
assertEquals(sessionFactory().getClassMetadata(Item.class).getIdentifierType().getReturnedClass(), keys.iterator().next().getClass());
}
}

View File

@ -6,34 +6,16 @@
*/
package org.hibernate.test.cache.infinispan.functional;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.hibernate.PessimisticLockException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.spi.Region;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.stat.Statistics;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.testing.TestForIssue;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.Test;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Parent tests for both transactional and
@ -47,7 +29,7 @@ public class ReadOnlyTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return Collections.singletonList(READ_ONLY);
return getParameters(false, false, true);
}
@Test
@ -106,127 +88,4 @@ public class ReadOnlyTest extends SingleNodeTest {
s.delete(found);
});
}
@Test
@TestForIssue(jiraKey = "HHH-9868")
public void testConcurrentRemoveAndPutFromLoad() throws Exception {
final Item item = new Item( "chris", "Chris's Item" );
withTxSession(s -> {
s.persist(item);
});
Region region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
Phaser deletePhaser = new Phaser(2);
Phaser getPhaser = new Phaser(2);
HookInterceptor hook = new HookInterceptor();
AdvancedCache entityCache = ((EntityRegionImpl) region).getCache();
AdvancedCache pendingPutsCache = entityCache.getCacheManager().getCache(
entityCache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME).getAdvancedCache();
pendingPutsCache.addInterceptor(hook, 0);
Thread deleteThread = new Thread(() -> {
try {
withTxSession(s -> {
Item loadedItem = s.get(Item.class, item.getId());
assertNotNull(loadedItem);
arriveAndAwait(deletePhaser);
arriveAndAwait(deletePhaser);
log.trace("Item loaded");
s.delete(loadedItem);
s.flush();
log.trace("Item deleted");
// start get-thread here
arriveAndAwait(deletePhaser);
arriveAndAwait(deletePhaser);
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "delete-thread");
Thread getThread = new Thread(() -> {
try {
withTxSession(s -> {
// DB load should happen before the record is deleted,
// putFromLoad should happen after deleteThread ends
Item loadedItem = s.get(Item.class, item.getId());
assertNotNull(loadedItem);
});
} catch (PessimisticLockException e) {
// If we end up here, database locks guard us against situation tested
// in this case and HHH-9868 cannot happen.
// (delete-thread has ITEMS table write-locked and we try to acquire read-lock)
try {
arriveAndAwait(getPhaser);
arriveAndAwait(getPhaser);
} catch (Exception e1) {
throw new RuntimeException(e1);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "get-thread");
deleteThread.start();
// deleteThread loads the entity
arriveAndAwait(deletePhaser);
withTx(() -> {
sessionFactory().getCache().evictEntity(Item.class, item.getId());
assertFalse(sessionFactory().getCache().containsEntity(Item.class, item.getId()));
return null;
});
arriveAndAwait(deletePhaser);
// delete thread invalidates PFER
arriveAndAwait(deletePhaser);
// get thread gets the entity from DB
hook.block(getPhaser, getThread);
getThread.start();
arriveAndAwait(getPhaser);
arriveAndAwait(deletePhaser);
// delete thread finishes the remove from DB and cache
deleteThread.join();
hook.unblock();
arriveAndAwait(getPhaser);
// get thread puts the entry into cache
getThread.join();
withTxSession(s -> {
Item loadedItem = s.get(Item.class, item.getId());
assertNull(loadedItem);
});
}
protected static void arriveAndAwait(Phaser phaser) throws TimeoutException, InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), 1000, TimeUnit.SECONDS);
}
private static class HookInterceptor extends BaseCustomInterceptor {
Phaser phaser;
Thread thread;
public synchronized void block(Phaser phaser, Thread thread) {
this.phaser = phaser;
this.thread = thread;
}
public synchronized void unblock() {
phaser = null;
thread = null;
}
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
Phaser phaser;
Thread thread;
synchronized (this) {
phaser = this.phaser;
thread = this.thread;
}
if (phaser != null && Thread.currentThread() == thread) {
arriveAndAwait(phaser);
arriveAndAwait(phaser);
}
return super.visitGetKeyValueCommand(ctx, command);
}
}
}

View File

@ -32,11 +32,9 @@ import org.hibernate.testing.TestForIssue;
import org.infinispan.commons.util.ByRef;
import org.junit.After;
import org.junit.Test;
import org.junit.runners.Parameterized;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertNotNull;
import static org.hibernate.test.cache.infinispan.util.TxUtil.markRollbackOnly;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -51,7 +49,7 @@ import static org.junit.Assert.fail;
public class ReadWriteTest extends ReadOnlyTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE);
return getParameters(true, true, false);
}
@Override
@ -85,14 +83,18 @@ public class ReadWriteTest extends ReadOnlyTest {
s.persist( item );
s.persist( another );
});
// The collection has been removed, but we can't add it again immediately using putFromLoad
Thread.sleep(1);
withTxSession(s -> {
Item loaded = s.load( Item.class, item.getId() );
assertEquals( 1, loaded.getItems().size() );
});
SecondLevelCacheStatistics cStats = stats.getSecondLevelCacheStatistics( Item.class.getName() + ".items" );
assertEquals( 1, cStats.getElementCountInMemory() );
withTxSession(s -> {
SecondLevelCacheStatistics cStats = stats.getSecondLevelCacheStatistics( Item.class.getName() + ".items" );
Item loadedWithCachedCollection = (Item) s.load( Item.class, item.getId() );
stats.logSummary();
assertEquals( item.getName(), loadedWithCachedCollection.getName() );
@ -385,6 +387,8 @@ public class ReadWriteTest extends ReadOnlyTest {
SecondLevelCacheStatistics slcs = stats.getSecondLevelCacheStatistics( Item.class.getName() );
sessionFactory().getCache().evictEntityRegion( Item.class.getName() );
Thread.sleep(1);
assertEquals(0, slcs.getPutCount());
assertEquals( 0, slcs.getElementCountInMemory() );
assertEquals( 0, slcs.getEntries().size() );

View File

@ -0,0 +1,375 @@
package org.hibernate.test.cache.infinispan.functional;
import org.hibernate.PessimisticLockException;
import org.hibernate.StaleStateException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.hibernate.cache.spi.Region;
import org.hibernate.cache.spi.entry.StandardCacheEntryImpl;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
/**
* Tests specific to tombstone-based caches
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TombstoneTest extends SingleNodeTest {
private static Log log = LogFactory.getLog(TombstoneTest.class);
private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new ThreadFactory() {
AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, TombstoneTest.class.getSimpleName() + "-executor-" + counter.incrementAndGet());
}
});
private static final long TOMBSTONE_TIMEOUT = InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION.expiration().maxIdle();
private static final int WAIT_TIMEOUT = 2000;
private static final TestTimeService TIME_SERVICE = new TestTimeService();
private Region region;
private AdvancedCache entityCache;
private long itemId;
@Override
public List<Object[]> getParameters() {
return Arrays.asList(READ_WRITE_REPLICATED, READ_WRITE_DISTRIBUTED);
}
@Override
protected void startUp() {
super.startUp();
region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
entityCache = ((EntityRegionImpl) region).getCache();
}
@Before
public void insertAndClearCache() throws Exception {
Item item = new Item("my item", "item that belongs to me");
withTxSession(s -> s.persist(item));
entityCache.clear();
assertEquals("Cache is not empty", Collections.EMPTY_SET, Caches.keys(entityCache).toSet());
itemId = item.getId();
}
@After
public void cleanup() throws Exception {
withTxSession(s -> {
s.createQuery("delete from Item").executeUpdate();
});
}
@AfterClass
public static void shutdown() {
EXECUTOR.shutdown();
}
@Override
protected void addSettings(Map settings) {
super.addSettings(settings);
settings.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
}
@Test
public void testTombstoneExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch flushLatch = new CountDownLatch(2);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = removeFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = removeFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
awaitOrThrow(flushLatch);
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(Tombstone.class, contents.get(itemId).getClass());
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
// after commit, the tombstone should still be in memory for some time (though, updatable)
contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(Tombstone.class, contents.get(itemId).getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
assertNull(entityCache.get(itemId)); // force expiration
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
}
@Test
public void testFutureUpdateExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch flushLatch = new CountDownLatch(2);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = updateFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = updateFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
awaitOrThrow(flushLatch);
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(FutureUpdate.class, contents.get(itemId).getClass());
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
// since we had two concurrent updates, the result should be invalid
contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
Object value = contents.get(itemId);
if (value instanceof FutureUpdate) {
// DB did not blocked two concurrent updates
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
assertNull(entityCache.get(itemId));
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
} else {
// DB left only one update to proceed, and the entry should not be expired
assertNotNull(value);
assertEquals(StandardCacheEntryImpl.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
assertEquals(value, entityCache.get(itemId));
}
}
@Test
public void testRemoveUpdateExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch preFlushLatch = new CountDownLatch(1);
CountDownLatch flushLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = removeFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = updateFlushWait(itemId, loadBarrier, preFlushLatch, null, commitLatch);
awaitOrThrow(flushLatch);
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(Tombstone.class, contents.get(itemId).getClass());
preFlushLatch.countDown();
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(Tombstone.class, contents.get(itemId).getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
assertNull(entityCache.get(itemId)); // force expiration
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
}
@Test
public void testUpdateRemoveExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch preFlushLatch = new CountDownLatch(1);
CountDownLatch flushLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = updateFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = removeFlushWait(itemId, loadBarrier, preFlushLatch, null, commitLatch);
awaitOrThrow(flushLatch);
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(FutureUpdate.class, contents.get(itemId).getClass());
preFlushLatch.countDown();
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
boolean removeSucceeded = second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
Object value = contents.get(itemId);
if (removeSucceeded) {
assertEquals(Tombstone.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
assertNull(entityCache.get(itemId)); // force expiration
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
} else {
assertNotNull(value);
assertEquals(StandardCacheEntryImpl.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
assertEquals(value, entityCache.get(itemId));
}
}
@Test
public void testUpdateEvictExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch preEvictLatch = new CountDownLatch(1);
CountDownLatch flushLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = updateFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = evictWait(itemId, loadBarrier, preEvictLatch, null);
awaitOrThrow(flushLatch);
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(FutureUpdate.class, contents.get(itemId).getClass());
preEvictLatch.countDown();
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
contents = Caches.entrySet(entityCache).toMap();
assertEquals(0, contents.size());
assertNull(contents.get(itemId));
}
@Test
public void testEvictUpdateExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch preFlushLatch = new CountDownLatch(1);
CountDownLatch postEvictLatch = new CountDownLatch(1);
CountDownLatch flushLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = evictWait(itemId, loadBarrier, null, postEvictLatch);
Future<Boolean> second = updateFlushWait(itemId, loadBarrier, preFlushLatch, flushLatch, commitLatch);
awaitOrThrow(postEvictLatch);
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
assertNull(contents.get(itemId));
preFlushLatch.countDown();
awaitOrThrow(flushLatch);
contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(FutureUpdate.class, contents.get(itemId).getClass());
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
Object value = contents.get(itemId);
assertNotNull(value);
assertEquals(StandardCacheEntryImpl.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
assertEquals(value, entityCache.get(itemId));
}
protected Future<Boolean> removeFlushWait(long id, CyclicBarrier loadBarrier, CountDownLatch preFlushLatch, CountDownLatch flushLatch, CountDownLatch commitLatch) throws Exception {
return EXECUTOR.submit(() -> withTxSessionApply(s -> {
try {
Item item = s.load(Item.class, id);
item.getName(); // force load & putFromLoad before the barrier
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
s.delete(item);
if (preFlushLatch != null) {
awaitOrThrow(preFlushLatch);
}
s.flush();
} catch (StaleStateException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} catch (PessimisticLockException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} finally {
if (flushLatch != null) {
flushLatch.countDown();
}
}
awaitOrThrow(commitLatch);
return true;
}));
}
protected Future<Boolean> updateFlushWait(long id, CyclicBarrier loadBarrier, CountDownLatch preFlushLatch, CountDownLatch flushLatch, CountDownLatch commitLatch) throws Exception {
return EXECUTOR.submit(() -> withTxSessionApply(s -> {
try {
Item item = s.load(Item.class, id);
item.getName(); // force load & putFromLoad before the barrier
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
item.setDescription("Updated item");
s.update(item);
if (preFlushLatch != null) {
awaitOrThrow(preFlushLatch);
}
s.flush();
} catch (StaleStateException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} catch (PessimisticLockException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} finally {
if (flushLatch != null) {
flushLatch.countDown();
}
}
awaitOrThrow(commitLatch);
return true;
}));
}
protected Future<Boolean> evictWait(long id, CyclicBarrier loadBarrier, CountDownLatch preEvictLatch, CountDownLatch postEvictLatch) throws Exception {
return EXECUTOR.submit(() -> {
try {
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
if (preEvictLatch != null) {
awaitOrThrow(preEvictLatch);
}
sessionFactory().getCache().evictEntity(Item.class, id);
} finally {
if (postEvictLatch != null) {
postEvictLatch.countDown();
}
}
return true;
});
}
protected void awaitOrThrow(CountDownLatch latch) throws InterruptedException, TimeoutException {
if (!latch.await(WAIT_TIMEOUT, TimeUnit.SECONDS)) {
throw new TimeoutException();
}
}
}

View File

@ -6,6 +6,7 @@
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
import java.lang.reflect.Constructor;
import java.util.Hashtable;
import java.util.Properties;
@ -22,6 +23,7 @@ import org.hibernate.cache.spi.TimestampsRegion;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.internal.util.ReflectHelper;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@ -33,7 +35,6 @@ import org.infinispan.util.logging.LogFactory;
* @since 3.5
*/
public class ClusterAwareRegionFactory implements RegionFactory {
private static final Log log = LogFactory.getLog(ClusterAwareRegionFactory.class);
private static final Hashtable<String, EmbeddedCacheManager> cacheManagers = new Hashtable<String, EmbeddedCacheManager>();
@ -42,11 +43,9 @@ public class ClusterAwareRegionFactory implements RegionFactory {
private boolean locallyAdded;
public ClusterAwareRegionFactory(Properties props) {
try {
delegate = (InfinispanRegionFactory) ReflectHelper.classForName(props.getProperty(DualNodeTest.REGION_FACTORY_DELEGATE)).newInstance();
} catch (Exception e) {
throw new IllegalStateException(e);
}
Class<? extends InfinispanRegionFactory> regionFactoryClass =
(Class<InfinispanRegionFactory>) props.get(DualNodeTest.REGION_FACTORY_DELEGATE);
delegate = CacheTestUtil.createRegionFactory(regionFactoryClass, props);
}
public static EmbeddedCacheManager getCacheManager(String name) {
@ -97,7 +96,6 @@ public class ClusterAwareRegionFactory implements RegionFactory {
return delegate.buildEntityRegion(regionName, properties, metadata);
}
@Override
public NaturalIdRegion buildNaturalIdRegion(String regionName, Properties properties, CacheDataDescription metadata)
throws CacheException {
return delegate.buildNaturalIdRegion( regionName, properties, metadata );

View File

@ -74,7 +74,7 @@ public abstract class DualNodeTest extends AbstractFunctionalTest {
settings.put( NODE_ID_PROP, LOCAL );
settings.put( NODE_ID_FIELD, LOCAL );
settings.put( REGION_FACTORY_DELEGATE, regionFactoryClass.getName() );
settings.put( REGION_FACTORY_DELEGATE, getRegionFactoryClass() );
}
@Override

View File

@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.test.cache.infinispan.functional.entities.Contact;
import org.hibernate.test.cache.infinispan.functional.entities.Customer;
import org.hibernate.testing.TestForIssue;
@ -62,7 +63,7 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE);
return getParameters(true, true, false);
}
@Override
@ -159,14 +160,23 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
sleep( 250 );
assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds );
// After modification, local cache should have been invalidated and hence should be empty
assertEquals( 0, localCollectionCache.size() );
assertEquals( 0, localCustomerCache.size() );
if (localCustomerCache.getCacheConfiguration().clustering().cacheMode().isInvalidation()) {
// After modification, local cache should have been invalidated and hence should be empty
assertEquals(0, localCustomerCache.size());
} else {
// Replicated cache is updated, not invalidated
assertEquals(1, localCustomerCache.size());
}
}
@TestForIssue(jiraKey = "HHH-9881")
@Test
public void testConcurrentLoadAndRemoval() throws Exception {
if (!remoteCustomerCache.getCacheConfiguration().clustering().cacheMode().isInvalidation()) {
// This test is tailored for invalidation-based strategies, using pending puts cache
return;
}
AtomicReference<Exception> getException = new AtomicReference<>();
AtomicReference<Exception> deleteException = new AtomicReference<>();

View File

@ -45,7 +45,7 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE, READ_ONLY);
return getParameters(true, true, true);
}
@Override

View File

@ -40,7 +40,7 @@ public class SessionRefreshTest extends DualNodeTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(TRANSACTIONAL, READ_WRITE);
return getParameters(true, true, false);
}
@Override

View File

@ -138,7 +138,7 @@ public class QueryRegionImplTest extends AbstractGeneralDataRegionTest {
// Start the reader
reader.start();
assertTrue("Reader finished promptly", readerLatch.await(1000000000, TimeUnit.MILLISECONDS));
assertTrue("Reader finished promptly", readerLatch.await(100, TimeUnit.MILLISECONDS));
writerLatch.countDown();

View File

@ -43,22 +43,24 @@ import org.hibernate.test.cache.infinispan.stress.entities.Person;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.testing.jta.JtaAwareConnectionProviderImpl;
import org.hibernate.testing.jta.TestingJtaPlatformImpl;
import org.hibernate.testing.junit4.CustomParameterized;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commons.util.ByRef;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.InterceptorConfiguration;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.remoting.RemoteException;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.transaction.RollbackException;
import javax.transaction.Status;
@ -85,6 +87,7 @@ import java.util.stream.Collectors;
*
* @author Radim Vansa
*/
@RunWith(CustomParameterized.class)
public abstract class CorrectnessTestCase {
static final org.infinispan.util.logging.Log log = LogFactory.getLog(CorrectnessTestCase.class);
static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(10);
@ -96,6 +99,12 @@ public abstract class CorrectnessTestCase {
static final int MAX_MEMBERS = 10;
private final static Comparator<Log<?>> WALL_CLOCK_TIME_COMPARATOR = (o1, o2) -> Long.compare(o1.wallClockTime, o2.wallClockTime);
@Parameterized.Parameter(0)
public String name;
@Parameterized.Parameter(1)
public CacheMode cacheMode;
static ThreadLocal<Integer> threadNode = new ThreadLocal<>();
final AtomicInteger timestampGenerator = new AtomicInteger();
@ -121,20 +130,41 @@ public abstract class CorrectnessTestCase {
return getClass().getName().replaceAll("\\W", "_");
}
public abstract static class Jta extends CorrectnessTestCase {
@Ignore
public static class Jta extends CorrectnessTestCase {
private final TransactionManager transactionManager = TestingJtaPlatformImpl.transactionManager();
@Parameterized.Parameter(2)
public boolean transactional;
@Parameterized.Parameter(3)
public boolean readOnly;
@Parameterized.Parameters(name = "{0}")
public List<Object[]> getParameters() {
return Arrays.<Object[]>asList(
new Object[] { "transactional, invalidation", CacheMode.INVALIDATION_SYNC, true, false },
new Object[] { "read-only, invalidation", CacheMode.INVALIDATION_SYNC, false, true }, // maybe not needed
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC, false, false },
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC, false, false },
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC, false, false }
);
}
@Override
protected void applySettings(StandardServiceRegistryBuilder ssrb) {
ssrb
.applySetting( Environment.JTA_PLATFORM, TestingJtaPlatformImpl.class.getName() )
.applySetting( Environment.CONNECTION_PROVIDER, JtaAwareConnectionProviderImpl.class.getName() )
.applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() );
.applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() )
.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, transactional);
if (readOnly) {
ssrb.applySetting(Environment.DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_ONLY.toAccessType().getExternalName());
}
}
@Override
protected void withTx(Runnable runnable, boolean rolledBack) throws Exception {
int node = threadNode.get();
TransactionManager tm = transactionManager;
tm.begin();
try {
@ -155,45 +185,35 @@ public abstract class CorrectnessTestCase {
}
}
}
}
@Ignore // as long-running test, we'll execute it only by hand
public static class JtaTransactional extends Jta {
}
@Ignore // as long-running test, we'll execute it only by hand
public static class JtaReadOnly extends Jta {
@Override
protected Operation getOperation() {
ThreadLocalRandom random = ThreadLocalRandom.current();
Operation operation;
int r = random.nextInt(30);
if (r == 0) operation = new InvalidateCache();
else if (r < 5) operation = new QueryFamilies();
else if (r < 10) operation = new RemoveFamily(r < 12);
else operation = new ReadFamily(r < 20);
return operation;
}
@Override
protected void applySettings(StandardServiceRegistryBuilder ssrb) {
super.applySettings(ssrb);
ssrb.applySetting(Environment.DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_ONLY.toAccessType().getExternalName());
ssrb.applySetting(Environment.CACHE_REGION_FACTORY, ForceNonTxInfinispanRegionFactory.class.getName());
if (readOnly) {
ThreadLocalRandom random = ThreadLocalRandom.current();
Operation operation;
int r = random.nextInt(30);
if (r == 0) operation = new InvalidateCache();
else if (r < 5) operation = new QueryFamilies();
else if (r < 10) operation = new RemoveFamily(r < 12);
else operation = new ReadFamily(r < 20);
return operation;
} else {
return super.getOperation();
}
}
}
@Ignore // as long-running test, we'll execute it only by hand
public static class JtaNonTransactional extends Jta {
@Override
protected void applySettings(StandardServiceRegistryBuilder ssrb) {
super.applySettings(ssrb);
ssrb.applySetting(Environment.CACHE_REGION_FACTORY, ForceNonTxInfinispanRegionFactory.class.getName());
}
}
@Ignore // as long-running test, we'll execute it only by hand
@Ignore
public static class NonJta extends CorrectnessTestCase {
@Parameterized.Parameters(name = "{0}")
public List<Object[]> getParameters() {
return Arrays.<Object[]>asList(
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC },
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC },
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC }
);
}
@Override
protected void withTx(Runnable runnable, boolean rolledBack) throws Exception {
// no transaction on JTA TM
@ -205,7 +225,7 @@ public abstract class CorrectnessTestCase {
super.applySettings(ssrb);
ssrb.applySetting(Environment.JTA_PLATFORM, NoJtaPlatform.class.getName());
ssrb.applySetting(Environment.TRANSACTION_COORDINATOR_STRATEGY, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class.getName());
ssrb.applySetting(Environment.CACHE_REGION_FACTORY, ForceNonTxInfinispanRegionFactory.class.getName());
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, false);
}
}
@ -222,6 +242,7 @@ public abstract class CorrectnessTestCase {
.applySetting( Environment.DIALECT, H2Dialect.class.getName() )
.applySetting( Environment.HBM2DDL_AUTO, "create-drop" )
.applySetting( Environment.CACHE_REGION_FACTORY, FailingInfinispanRegionFactory.class.getName())
.applySetting( TestInfinispanRegionFactory.CACHE_MODE, cacheMode )
.applySetting( Environment.GENERATE_STATISTICS, "false" );
applySettings(ssrb);
@ -291,30 +312,21 @@ public abstract class CorrectnessTestCase {
}
public static class FailingInfinispanRegionFactory extends TestInfinispanRegionFactory {
@Override
protected void amendConfiguration(ConfigurationBuilderHolder holder) {
for (Map.Entry<String, ConfigurationBuilder> entry : holder.getNamedConfigurationBuilders().entrySet()) {
// failure to write into timestamps would cause failure even though both DB and cache has been updated
if (!entry.getKey().equals("timestamps") && !entry.getKey().endsWith(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME)) {
entry.getValue().customInterceptors().addInterceptor()
.interceptorClass(FailureInducingInterceptor.class)
.position(InterceptorConfiguration.Position.FIRST);
log.trace("Injecting FailureInducingInterceptor into " + entry.getKey());
}
else {
log.trace("Not injecting into " + entry.getKey());
}
}
public FailingInfinispanRegionFactory(Properties properties) {
super(properties);
}
}
public static class ForceNonTxInfinispanRegionFactory extends FailingInfinispanRegionFactory {
@Override
protected void amendConfiguration(ConfigurationBuilderHolder holder) {
super.amendConfiguration(holder);
holder.getDefaultConfigurationBuilder().transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
for (ConfigurationBuilder cb : holder.getNamedConfigurationBuilders().values()) {
cb.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
protected void amendCacheConfiguration(String cacheName, ConfigurationBuilder configurationBuilder) {
super.amendCacheConfiguration(cacheName, configurationBuilder);
// failure to write into timestamps would cause failure even though both DB and cache has been updated
if (!cacheName.equals("timestamps") && !cacheName.endsWith(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME)) {
configurationBuilder.customInterceptors().addInterceptor()
.interceptorClass(FailureInducingInterceptor.class)
.position(InterceptorConfiguration.Position.FIRST);
log.trace("Injecting FailureInducingInterceptor into " + cacheName);
} else {
log.trace("Not injecting into " + cacheName);
}
}
}
@ -974,12 +986,34 @@ public abstract class CorrectnessTestCase {
if (strategy == null) {
return null;
}
Field delegateField = strategy.getClass().getDeclaredField("delegate");
delegateField.setAccessible(true);
Field delegateField = getField(strategy.getClass(), "delegate");
Object delegate = delegateField.get(strategy);
Field validatorField = InvalidationCacheAccessDelegate.class.getDeclaredField("putValidator");
validatorField.setAccessible(true);
return (PutFromLoadValidator) validatorField.get(delegate);
if (delegate == null) {
return null;
}
if (InvalidationCacheAccessDelegate.class.isInstance(delegate)) {
Field validatorField = InvalidationCacheAccessDelegate.class.getDeclaredField("putValidator");
validatorField.setAccessible(true);
return (PutFromLoadValidator) validatorField.get(delegate);
} else {
return null;
}
}
private Field getField(Class<?> clazz, String fieldName) {
Field f = null;
while (clazz != null && clazz != Object.class) {
try {
f = clazz.getDeclaredField(fieldName);
break;
} catch (NoSuchFieldException e) {
clazz = clazz.getSuperclass();
}
}
if (f != null) {
f.setAccessible(true);
}
return f;
}
protected SessionFactory sessionFactory(int node) {

View File

@ -47,7 +47,7 @@ import org.infinispan.notifications.cachelistener.event.Event;
*/
public class TimestampsRegionImplTest extends AbstractGeneralDataRegionTest {
@Override
@Override
protected String getStandardRegionName(String regionPrefix) {
return regionPrefix + "/" + UpdateTimestampsCache.class.getName();
}
@ -110,7 +110,8 @@ public class TimestampsRegionImplTest extends AbstractGeneralDataRegionTest {
public static class MockInfinispanRegionFactory extends TestInfinispanRegionFactory {
public MockInfinispanRegionFactory() {
public MockInfinispanRegionFactory(Properties properties) {
super(properties);
}
@Override
@ -149,5 +150,4 @@ public class TimestampsRegionImplTest extends AbstractGeneralDataRegionTest {
}
}
}
}

View File

@ -2,24 +2,30 @@ package org.hibernate.test.cache.infinispan.util;
import org.hibernate.HibernateException;
import org.hibernate.Transaction;
import org.hibernate.engine.jdbc.connections.spi.JdbcConnectionAccess;
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
import org.hibernate.engine.transaction.spi.IsolationDelegate;
import org.hibernate.engine.transaction.spi.TransactionObserver;
import org.hibernate.resource.transaction.SynchronizationRegistry;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.hibernate.resource.transaction.TransactionCoordinatorBuilder;
import org.hibernate.resource.transaction.backend.jta.internal.JtaIsolationDelegate;
import org.hibernate.resource.transaction.backend.jta.internal.StatusTranslator;
import org.hibernate.resource.transaction.spi.TransactionStatus;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.infinispan.transaction.tm.DummyTransaction;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import java.sql.Connection;
import java.sql.SQLException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Mocks transaction coordinator when {@link org.hibernate.engine.spi.SessionImplementor} is only mocked
* and {@link org.infinispan.transaction.tm.BatchModeTransactionManager} is used.
@ -117,7 +123,13 @@ public class BatchModeTransactionCoordinator implements TransactionCoordinator {
@Override
public IsolationDelegate createIsolationDelegate() {
throw new UnsupportedOperationException();
Connection connection = mock(Connection.class);
JdbcConnectionAccess jdbcConnectionAccess = mock(JdbcConnectionAccess.class);
try {
when(jdbcConnectionAccess.obtainConnection()).thenReturn(connection);
} catch (SQLException e) {
}
return new JtaIsolationDelegate(jdbcConnectionAccess, mock(SqlExceptionHelper.class), tm);
}
@Override

View File

@ -6,6 +6,7 @@
*/
package org.hibernate.test.cache.infinispan.util;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -85,18 +86,41 @@ public class CacheTestUtil {
return ssrb;
}
public static InfinispanRegionFactory createRegionFactory(Class<? extends InfinispanRegionFactory> clazz, Properties properties) {
try {
try {
Constructor<? extends InfinispanRegionFactory> constructor = clazz.getConstructor(Properties.class);
return constructor.newInstance(properties);
}
catch (NoSuchMethodException e) {
return clazz.newInstance();
}
}
catch (RuntimeException e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public static InfinispanRegionFactory startRegionFactory(ServiceRegistry serviceRegistry) {
try {
final ConfigurationService cfgService = serviceRegistry.getService( ConfigurationService.class );
final Properties properties = toProperties( cfgService.getSettings() );
String factoryType = cfgService.getSetting( AvailableSettings.CACHE_REGION_FACTORY, StandardConverters.STRING );
Class clazz = Thread.currentThread().getContextClassLoader().loadClass( factoryType );
InfinispanRegionFactory regionFactory;
if (clazz == InfinispanRegionFactory.class) {
regionFactory = new TestInfinispanRegionFactory();
regionFactory = new TestInfinispanRegionFactory(properties);
}
else {
regionFactory = (InfinispanRegionFactory) clazz.newInstance();
if (InfinispanRegionFactory.class.isAssignableFrom(clazz)) {
regionFactory = createRegionFactory(clazz, properties);
} else {
throw new IllegalArgumentException(clazz + " is not InfinispanRegionFactory");
}
}
final SessionFactoryOptionsImpl sessionFactoryOptions = new SessionFactoryOptionsImpl(
@ -104,7 +128,6 @@ public class CacheTestUtil {
(StandardServiceRegistry) serviceRegistry
)
);
final Properties properties = toProperties( cfgService.getSettings() );
regionFactory.start( sessionFactoryOptions, properties );

View File

@ -2,16 +2,18 @@ package org.hibernate.test.cache.infinispan.util;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.infinispan.commons.executors.CachedThreadPoolExecutorFactory;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.TransportConfigurationBuilder;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.test.fwk.TestResourceTracker;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.TimeService;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -20,12 +22,30 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TestInfinispanRegionFactory extends InfinispanRegionFactory {
private static AtomicInteger counter = new AtomicInteger();
protected static final String PREFIX = TestInfinispanRegionFactory.class.getName() + ".";
public static final String TRANSACTIONAL = PREFIX + "transactional";
public static final String CACHE_MODE = PREFIX + "cacheMode";
public static final String TIME_SERVICE = PREFIX + "timeService";
private final boolean transactional;
private final CacheMode cacheMode;
private final TimeService timeService;
public TestInfinispanRegionFactory(Properties properties) {
transactional = (boolean) properties.getOrDefault(TRANSACTIONAL, false);
cacheMode = (CacheMode) properties.getOrDefault(CACHE_MODE, null);
timeService = (TimeService) properties.getOrDefault(TIME_SERVICE, null);
}
@Override
protected EmbeddedCacheManager createCacheManager(ConfigurationBuilderHolder holder) {
amendConfiguration(holder);
return new DefaultCacheManager(holder, true);
DefaultCacheManager cacheManager = new DefaultCacheManager(holder, true);
if (timeService != null) {
cacheManager.getGlobalComponentRegistry().registerComponent(timeService, TimeService.class);
cacheManager.getGlobalComponentRegistry().rewire();
}
return cacheManager;
}
protected void amendConfiguration(ConfigurationBuilderHolder holder) {
@ -41,26 +61,27 @@ public class TestInfinispanRegionFactory extends InfinispanRegionFactory {
}
}
private String buildNodeName() {
StringBuilder sb = new StringBuilder("Node");
int id = counter.getAndIncrement();
int alphabet = 'Z' - 'A';
do {
sb.append((char) (id % alphabet + 'A'));
id /= alphabet;
} while (id > alphabet);
return sb.toString();
}
protected void amendCacheConfiguration(String cacheName, ConfigurationBuilder configurationBuilder) {
}
public static class Transactional extends TestInfinispanRegionFactory {
@Override
protected void amendCacheConfiguration(String cacheName, ConfigurationBuilder configurationBuilder) {
if (transactional) {
if (!cacheName.endsWith("query") && !cacheName.equals(DEF_TIMESTAMPS_RESOURCE)) {
configurationBuilder.transaction().transactionMode(TransactionMode.TRANSACTIONAL).useSynchronization(true);
}
} else {
configurationBuilder.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
}
if (cacheMode != null) {
if (configurationBuilder.clustering().cacheMode().isInvalidation()) {
configurationBuilder.clustering().cacheMode(cacheMode);
}
}
}
@Override
public long nextTimestamp() {
if (timeService == null) {
return super.nextTimestamp();
} else {
return timeService.wallClockTime();
}
}
}

View File

@ -0,0 +1,26 @@
package org.hibernate.test.cache.infinispan.util;
import org.infinispan.util.DefaultTimeService;
import java.util.concurrent.TimeUnit;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TestTimeService extends DefaultTimeService {
private long time = super.wallClockTime();
@Override
public long wallClockTime() {
return time;
}
@Override
public long time() {
return TimeUnit.MILLISECONDS.toNanos(time);
}
public void advance(long millis) {
time += millis;
}
}