HHH-10101 Implement nonstrict-read-write mode in Infinispan 2LC

* requires non-transactional cache in repl/dist/local mode and versioned entities
This commit is contained in:
Radim Vansa 2015-09-14 11:17:09 +02:00 committed by Galder Zamarreño
parent b2c9724905
commit 282893605c
49 changed files with 1548 additions and 764 deletions

View File

@ -0,0 +1,188 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.VersionedEntry;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.cache.spi.entry.CacheEntry;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.context.Flag;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;
/**
* Access delegate that relaxes the consistency a bit: stale reads are prohibited only after the transaction
* commits. This should also be able to work with async caches, and that would allow the replication delay
* even after the commit.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class NonStrictAccessDelegate implements AccessDelegate {
private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class );
private final BaseTransactionalDataRegion region;
private final AdvancedCache cache;
private final AdvancedCache writeCache;
private final AdvancedCache putFromLoadCache;
private final Comparator versionComparator;
public NonStrictAccessDelegate(BaseTransactionalDataRegion region) {
this.region = region;
this.cache = region.getCache();
this.writeCache = Caches.ignoreReturnValuesCache(cache);
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
Configuration configuration = cache.getCacheConfiguration();
if (configuration.clustering().cacheMode().isInvalidation()) {
throw new IllegalArgumentException("Nonstrict-read-write mode cannot use invalidation.");
}
if (configuration.transaction().transactionMode().isTransactional()) {
throw new IllegalArgumentException("Currently transactional caches are not supported.");
}
this.versionComparator = region.getCacheDataDescription().getVersionComparator();
if (versionComparator == null) {
throw new IllegalArgumentException("This strategy requires versioned entities/collections but region " + region.getName() + " contains non-versioned data!");
}
}
@Override
public Object get(SessionImplementor session, Object key, long txTimestamp) throws CacheException {
if (txTimestamp < region.getLastRegionInvalidation() ) {
return null;
}
Object value = cache.get(key);
if (value instanceof VersionedEntry) {
return ((VersionedEntry) value).getValue();
}
return value;
}
@Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version) {
return putFromLoad(session, key, value, txTimestamp, version, false);
}
@Override
public boolean putFromLoad(SessionImplementor session, Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride) throws CacheException {
long lastRegionInvalidation = region.getLastRegionInvalidation();
if (txTimestamp < lastRegionInvalidation) {
log.tracef("putFromLoad not executed since tx started at %d, before last region invalidation finished = %d", txTimestamp, lastRegionInvalidation);
return false;
}
assert version != null;
if (minimalPutOverride) {
Object prev = cache.get(key);
if (prev != null) {
Object oldVersion = getVersion(prev);
if (oldVersion != null) {
if (versionComparator.compare(version, oldVersion) <= 0) {
return false;
}
}
else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) {
return false;
}
}
}
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
// when it is present in the container. TombstoneCallInterceptor will deal with this.
if (!(value instanceof CacheEntry)) {
value = new VersionedEntry(value, version, txTimestamp);
}
putFromLoadCache.put(key, value);
return true;
}
@Override
public boolean insert(SessionImplementor session, Object key, Object value, Object version) throws CacheException {
return false;
}
@Override
public boolean update(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion) throws CacheException {
return false;
}
@Override
public void remove(SessionImplementor session, Object key) throws CacheException {
Object value = cache.get(key);
Object version = getVersion(value);
// there's no 'afterRemove', so we have to use our own synchronization
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key, version);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
}
@Override
public void removeAll() throws CacheException {
region.beginInvalidation();
try {
Caches.broadcastEvictAll(cache);
}
finally {
region.endInvalidation();
}
}
@Override
public void evict(Object key) throws CacheException {
writeCache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
@Override
public void evictAll() throws CacheException {
region.beginInvalidation();
try {
Caches.broadcastEvictAll(cache);
}
finally {
region.endInvalidation();
}
}
@Override
public void unlockItem(SessionImplementor session, Object key) throws CacheException {
}
@Override
public boolean afterInsert(SessionImplementor session, Object key, Object value, Object version) {
writeCache.put(key, getVersioned(value, version, session.getTimestamp()));
return true;
}
@Override
public boolean afterUpdate(SessionImplementor session, Object key, Object value, Object currentVersion, Object previousVersion, SoftLock lock) {
writeCache.put(key, getVersioned(value, currentVersion, session.getTimestamp()));
return true;
}
protected Object getVersion(Object value) {
if (value instanceof CacheEntry) {
return ((CacheEntry) value).getVersion();
}
else if (value instanceof VersionedEntry) {
return ((VersionedEntry) value).getVersion();
}
return null;
}
protected Object getVersioned(Object value, Object version, long timestamp) {
assert value != null;
assert version != null;
return new VersionedEntry(value, version, timestamp);
}
}

View File

@ -0,0 +1,43 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.InvocationAfterCompletion;
import org.hibernate.cache.infinispan.util.VersionedEntry;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import java.util.concurrent.TimeUnit;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class RemovalSynchronization extends InvocationAfterCompletion {
private final BaseTransactionalDataRegion region;
private final Object key;
private final Object version;
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key, Object version) {
super(tc, cache, requiresTransaction);
this.region = region;
this.key = key;
this.version = version;
}
@Override
protected void invoke(boolean success, AdvancedCache cache) {
if (success) {
if (version == null) {
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
else {
cache.put(key, new VersionedEntry(null, version, Long.MIN_VALUE), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
}
}
}

View File

@ -0,0 +1,160 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.infinispan.util.VersionedEntry;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.read.SizeCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.util.CloseableIterable;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.filter.NullValueConverter;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Comparator;
import java.util.Set;
import java.util.UUID;
/**
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate}
* and {@link org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion}
*
* The behaviour here also breaks notifications, which are not used for 2LC caches.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class VersionedCallInterceptor extends CallInterceptor {
private final Comparator<Object> versionComparator;
private AdvancedCache cache;
public VersionedCallInterceptor(Comparator<Object> versionComparator) {
this.versionComparator = versionComparator;
}
@Inject
public void injectDependencies(AdvancedCache cache) {
this.cache = cache;
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
MVCCEntry e = (MVCCEntry) ctx.lookupEntry(command.getKey());
if (e == null) {
return null;
}
Object oldValue = e.getValue();
Object oldVersion = null;
long oldTimestamp = Long.MIN_VALUE;
if (oldValue instanceof VersionedEntry) {
oldVersion = ((VersionedEntry) oldValue).getVersion();
oldTimestamp = ((VersionedEntry) oldValue).getTimestamp();
oldValue = ((VersionedEntry) oldValue).getValue();
}
else if (oldValue instanceof org.hibernate.cache.spi.entry.CacheEntry) {
oldVersion = ((org.hibernate.cache.spi.entry.CacheEntry) oldValue).getVersion();
}
Object newValue = command.getValue();
Object newVersion = null;
long newTimestamp = Long.MIN_VALUE;
Object actualNewValue = newValue;
boolean isRemoval = false;
if (newValue instanceof VersionedEntry) {
VersionedEntry ve = (VersionedEntry) newValue;
newVersion = ve.getVersion();
newTimestamp = ve.getTimestamp();
if (ve.getValue() == null) {
isRemoval = true;
}
else if (ve.getValue() instanceof org.hibernate.cache.spi.entry.CacheEntry) {
actualNewValue = ve.getValue();
}
}
else if (newValue instanceof org.hibernate.cache.spi.entry.CacheEntry) {
newVersion = ((org.hibernate.cache.spi.entry.CacheEntry) newValue).getVersion();
}
if (newVersion == null) {
// eviction or post-commit removal: we'll store it with given timestamp
setValue(e, newValue);
return null;
}
if (oldVersion == null) {
assert oldValue == null || oldTimestamp != Long.MIN_VALUE;
if (newTimestamp == Long.MIN_VALUE) {
// remove, knowing the version
setValue(e, newValue);
}
else if (newTimestamp <= oldTimestamp) {
// either putFromLoad or regular update/insert - in either case this update might come
// when it was evicted/region-invalidated. In both cases, with old timestamp we'll leave
// the invalid value
assert oldValue == null;
}
else {
setValue(e, newValue);
}
return null;
}
int compareResult = versionComparator.compare(newVersion, oldVersion);
if (isRemoval && compareResult >= 0) {
setValue(e, newValue);
}
else if (compareResult > 0) {
setValue(e, actualNewValue);
}
return null;
}
private Object setValue(MVCCEntry e, Object value) {
if (e.isRemoved()) {
e.setRemoved(false);
e.setCreated(true);
e.setValid(true);
}
else {
e.setChanged(true);
}
return e.setValue(value);
}
private void removeValue(MVCCEntry e) {
e.setRemoved(true);
e.setChanged(true);
e.setCreated(false);
e.setValid(false);
e.setValue(null);
}
@Override
public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throws Throwable {
Set<Flag> flags = command.getFlags();
int size = 0;
AdvancedCache decoratedCache = cache.getAdvancedCache().withFlags(flags != null ? flags.toArray(new Flag[flags.size()]) : null);
// In non-transactional caches we don't care about context
CloseableIterable<CacheEntry<Object, Void>> iterable = decoratedCache
.filterEntries(VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE).converter(NullValueConverter.getInstance());
try {
for (CacheEntry<Object, Void> entry : iterable) {
if (size++ == Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
}
}
finally {
iterable.close();
}
return size;
}
}

View File

@ -8,7 +8,6 @@ package org.hibernate.cache.infinispan.collection;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.infinispan.access.InvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.CacheKeysFactory;
@ -47,14 +46,7 @@ public class CollectionRegionImpl extends BaseTransactionalDataRegion implements
@Override
public CollectionRegionAccessStrategy buildAccessStrategy(AccessType accessType) throws CacheException {
checkAccessType( accessType );
AccessDelegate accessDelegate = createAccessDelegate();
switch ( accessType ) {
case READ_ONLY:
case READ_WRITE:
case TRANSACTIONAL:
return new CollectionAccess( this, accessDelegate );
default:
throw new CacheException( "Unsupported access type [" + accessType.getExternalName() + "]" );
}
AccessDelegate accessDelegate = createAccessDelegate(accessType);
return new CollectionAccess( this, accessDelegate );
}
}

View File

@ -47,18 +47,12 @@ public class EntityRegionImpl extends BaseTransactionalDataRegion implements Ent
@Override
public EntityRegionAccessStrategy buildAccessStrategy(AccessType accessType) throws CacheException {
checkAccessType(accessType);
if ( !getCacheDataDescription().isMutable() ) {
accessType = AccessType.READ_ONLY;
AccessDelegate accessDelegate = createAccessDelegate(accessType);
if ( accessType == AccessType.READ_ONLY || !getCacheDataDescription().isMutable() ) {
return new ReadOnlyAccess( this, accessDelegate );
}
AccessDelegate accessDelegate = createAccessDelegate();
switch ( accessType ) {
case READ_ONLY:
return new ReadOnlyAccess( this, accessDelegate);
case READ_WRITE:
case TRANSACTIONAL:
return new ReadWriteAccess( this, accessDelegate);
default:
throw new CacheException( "Unsupported access type [" + accessType.getExternalName() + "]" );
else {
return new ReadWriteAccess( this, accessDelegate );
}
}
}

View File

@ -47,8 +47,6 @@ public abstract class BaseRegion implements Region {
protected volatile long lastRegionInvalidation = Long.MIN_VALUE;
protected int invalidations = 0;
private PutFromLoadValidator validator;
/**
* Base region constructor.
*
@ -241,13 +239,6 @@ public abstract class BaseRegion implements Region {
}
}
protected synchronized PutFromLoadValidator getValidator() {
if (validator == null) {
validator = new PutFromLoadValidator(cache);
}
return validator;
}
protected void runInvalidation(boolean inTransaction) {
// If we're running inside a transaction, we need to remove elements one-by-one
// to clean the context as well (cache.clear() does not do that).

View File

@ -8,23 +8,29 @@ package org.hibernate.cache.infinispan.impl;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.AccessDelegate;
import org.hibernate.cache.infinispan.access.NonStrictAccessDelegate;
import org.hibernate.cache.infinispan.access.NonTxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.infinispan.access.TombstoneAccessDelegate;
import org.hibernate.cache.infinispan.access.TombstoneCallInterceptor;
import org.hibernate.cache.infinispan.access.TxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.VersionedCallInterceptor;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.hibernate.cache.infinispan.util.VersionedEntry;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.CacheKeysFactory;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.TransactionalDataRegion;
import org.hibernate.cache.spi.access.AccessType;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.util.logging.Log;
@ -32,8 +38,10 @@ import org.infinispan.util.logging.LogFactory;
import javax.transaction.TransactionManager;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Support for Inifinispan {@link org.hibernate.cache.spi.TransactionalDataRegion} implementors.
@ -47,10 +55,17 @@ public abstract class BaseTransactionalDataRegion
private static final Log log = LogFactory.getLog( BaseTransactionalDataRegion.class );
private final CacheDataDescription metadata;
private final CacheKeysFactory cacheKeysFactory;
private final boolean requiresTransaction;
protected final boolean useTombstones;
protected final long tombstoneExpiration;
protected final boolean requiresTransaction;
private long tombstoneExpiration;
private PutFromLoadValidator validator;
private AccessType accessType;
private Strategy strategy;
protected enum Strategy {
VALIDATION, TOMBSTONES, VERSIONED_ENTRIES
}
/**
* Base transactional region constructor
@ -70,25 +85,10 @@ public abstract class BaseTransactionalDataRegion
this.cacheKeysFactory = cacheKeysFactory;
Configuration configuration = cache.getCacheConfiguration();
CacheMode cacheMode = configuration.clustering().cacheMode();
useTombstones = cacheMode.isDistributed() || cacheMode.isReplicated();
// TODO: make these timeouts configurable
tombstoneExpiration = InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION.expiration().maxIdle();
requiresTransaction = configuration.transaction().transactionMode().isTransactional()
&& !configuration.transaction().autoCommit();
if (useTombstones) {
if (configuration.eviction().maxEntries() >= 0) {
log.warn("Setting eviction on cache using tombstones can introduce inconsistencies!");
}
cache.removeInterceptor(CallInterceptor.class);
TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(tombstoneExpiration);
cache.getComponentRegistry().registerComponent(tombstoneCallInterceptor, TombstoneCallInterceptor.class);
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
cache.addInterceptor(tombstoneCallInterceptor, interceptorChain.size());
}
// TODO: make these timeouts configurable
tombstoneExpiration = InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION.expiration().maxIdle();
}
@Override
@ -100,21 +100,77 @@ public abstract class BaseTransactionalDataRegion
return cacheKeysFactory;
}
protected AccessDelegate createAccessDelegate() {
protected synchronized AccessDelegate createAccessDelegate(AccessType accessType) {
if (accessType == null) {
throw new IllegalArgumentException();
}
if (this.accessType != null && !this.accessType.equals(accessType)) {
throw new IllegalStateException("This region was already set up for " + this.accessType + ", cannot use using " + accessType);
}
this.accessType = accessType;
CacheMode cacheMode = cache.getCacheConfiguration().clustering().cacheMode();
if (accessType == AccessType.NONSTRICT_READ_WRITE) {
prepareForVersionedEntries();
return new NonStrictAccessDelegate(this);
}
if (cacheMode.isDistributed() || cacheMode.isReplicated()) {
prepareForTombstones();
return new TombstoneAccessDelegate(this);
}
else {
prepareForValidation();
if (cache.getCacheConfiguration().transaction().transactionMode().isTransactional()) {
return new TxInvalidationCacheAccessDelegate(this, getValidator());
return new TxInvalidationCacheAccessDelegate(this, validator);
}
else {
return new NonTxInvalidationCacheAccessDelegate(this, getValidator());
return new NonTxInvalidationCacheAccessDelegate(this, validator);
}
}
}
protected void prepareForValidation() {
if (strategy != null) {
assert strategy == Strategy.VALIDATION;
return;
}
validator = new PutFromLoadValidator(cache);
strategy = Strategy.VALIDATION;
}
protected void prepareForVersionedEntries() {
if (strategy != null) {
assert strategy == Strategy.VERSIONED_ENTRIES;
return;
}
cache.removeInterceptor(CallInterceptor.class);
VersionedCallInterceptor tombstoneCallInterceptor = new VersionedCallInterceptor(metadata.getVersionComparator());
cache.getComponentRegistry().registerComponent(tombstoneCallInterceptor, VersionedCallInterceptor.class);
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
cache.addInterceptor(tombstoneCallInterceptor, interceptorChain.size());
strategy = Strategy.VERSIONED_ENTRIES;
}
private void prepareForTombstones() {
if (strategy != null) {
assert strategy == Strategy.TOMBSTONES;
return;
}
Configuration configuration = cache.getCacheConfiguration();
if (configuration.eviction().maxEntries() >= 0) {
log.warn("Setting eviction on cache using tombstones can introduce inconsistencies!");
}
cache.removeInterceptor(CallInterceptor.class);
TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(tombstoneExpiration);
cache.getComponentRegistry().registerComponent(tombstoneCallInterceptor, TombstoneCallInterceptor.class);
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
cache.addInterceptor(tombstoneCallInterceptor, interceptorChain.size());
strategy = Strategy.TOMBSTONES;
}
public long getTombstoneExpiration() {
return tombstoneExpiration;
}
@ -125,10 +181,23 @@ public abstract class BaseTransactionalDataRegion
@Override
protected void runInvalidation(boolean inTransaction) {
if (!useTombstones) {
super.runInvalidation(inTransaction);
if (strategy == null) {
return;
}
switch (strategy) {
case VALIDATION:
super.runInvalidation(inTransaction);
return;
case TOMBSTONES:
removeEntries(inTransaction, Tombstone.EXCLUDE_TOMBSTONES);
return;
case VERSIONED_ENTRIES:
removeEntries(inTransaction, VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE);
return;
}
}
private void removeEntries(boolean inTransaction, KeyValueFilter filter) {
// If the transaction is required, we simply need it -> will create our own
boolean startedTx = false;
if ( !inTransaction && requiresTransaction) {
@ -144,11 +213,19 @@ public abstract class BaseTransactionalDataRegion
try {
AdvancedCache localCache = Caches.localCache(cache);
CloseableIterator<CacheEntry> it = Caches.entrySet(localCache, Tombstone.EXCLUDE_TOMBSTONES).iterator();
long now = nextTimestamp();
try {
while (it.hasNext()) {
// Cannot use it.next(); it.remove() due to ISPN-5653
CacheEntry entry = it.next();
localCache.remove(entry.getKey(), entry.getValue());
switch (strategy) {
case TOMBSTONES:
localCache.remove(entry.getKey(), entry.getValue());
break;
case VERSIONED_ENTRIES:
localCache.put(entry.getKey(), new VersionedEntry(null, null, now), tombstoneExpiration, TimeUnit.MILLISECONDS);
break;
}
}
}
finally {
@ -169,12 +246,36 @@ public abstract class BaseTransactionalDataRegion
@Override
public Map toMap() {
if (useTombstones) {
AdvancedCache localCache = Caches.localCache(cache);
return Caches.entrySet(localCache, Tombstone.EXCLUDE_TOMBSTONES, FutureUpdate.VALUE_EXTRACTOR).toMap();
if (strategy == null) {
return Collections.EMPTY_MAP;
}
else {
return super.toMap();
switch (strategy) {
case VALIDATION:
return super.toMap();
case TOMBSTONES:
return Caches.entrySet(Caches.localCache(cache), Tombstone.EXCLUDE_TOMBSTONES, FutureUpdate.VALUE_EXTRACTOR).toMap();
case VERSIONED_ENTRIES:
return Caches.entrySet(Caches.localCache(cache), VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE, VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE).toMap();
default:
throw new IllegalStateException(strategy.toString());
}
}
@Override
public boolean contains(Object key) {
if (!checkValid()) {
return false;
}
Object value = cache.get(key);
if (value instanceof Tombstone) {
return false;
}
if (value instanceof FutureUpdate) {
return ((FutureUpdate) value).getValue() != null;
}
if (value instanceof VersionedEntry) {
return ((VersionedEntry) value).getValue() != null;
}
return value != null;
}
}

View File

@ -28,16 +28,16 @@ import javax.transaction.TransactionManager;
public class NaturalIdRegionImpl extends BaseTransactionalDataRegion
implements NaturalIdRegion {
/**
* Constructor for the natural id region.
*
* @param cache instance to store natural ids
* @param name of natural id region
/**
* Constructor for the natural id region.
*
* @param cache instance to store natural ids
* @param name of natural id region
* @param transactionManager
* @param metadata for the natural id region
* @param factory for the natural id region
* @param metadata for the natural id region
* @param factory for the natural id region
* @param cacheKeysFactory factory for cache keys
*/
*/
public NaturalIdRegionImpl(
AdvancedCache cache, String name, TransactionManager transactionManager,
CacheDataDescription metadata, RegionFactory factory, CacheKeysFactory cacheKeysFactory) {
@ -47,18 +47,12 @@ public class NaturalIdRegionImpl extends BaseTransactionalDataRegion
@Override
public NaturalIdRegionAccessStrategy buildAccessStrategy(AccessType accessType) throws CacheException {
checkAccessType( accessType );
if (!getCacheDataDescription().isMutable()) {
accessType = AccessType.READ_ONLY;
AccessDelegate accessDelegate = createAccessDelegate(accessType);
if ( accessType == AccessType.READ_ONLY || !getCacheDataDescription().isMutable() ) {
return new ReadOnlyAccess( this, accessDelegate );
}
AccessDelegate accessDelegate = createAccessDelegate();
switch ( accessType ) {
case READ_ONLY:
return new ReadOnlyAccess( this, accessDelegate );
case READ_WRITE:
case TRANSACTIONAL:
return new ReadWriteAccess( this, accessDelegate );
default:
throw new CacheException( "Unsupported access type [" + accessType.getExternalName() + "]" );
else {
return new ReadWriteAccess( this, accessDelegate );
}
}
}

View File

@ -26,6 +26,8 @@ public class Externalizers {
public final static int TOMBSTONE_UPDATE = 1203;
public final static int FUTURE_UPDATE = 1204;
public final static int VALUE_EXTRACTOR = 1205;
public final static int VERSIONED_ENTRY = 1206;
public final static int EXCLUDE_EMPTY_EXTRACT_VALUE = 1207;
public final static AdvancedExternalizer[] ALL_EXTERNALIZERS = new AdvancedExternalizer[] {
new UUIDExternalizer(),
@ -33,7 +35,9 @@ public class Externalizers {
new Tombstone.ExcludeTombstonesFilterExternalizer(),
new TombstoneUpdate.Externalizer(),
new FutureUpdate.Externalizer(),
new FutureUpdate.ValueExtractorExternalizer()
new FutureUpdate.ValueExtractorExternalizer(),
new VersionedEntry.Externalizer(),
new VersionedEntry.ExcludeEmptyExtractValueExternalizer()
};
public static class UUIDExternalizer implements AdvancedExternalizer<UUID> {

View File

@ -0,0 +1,122 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.infinispan.commons.marshall.AdvancedExternalizer;
import org.infinispan.filter.Converter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.metadata.Metadata;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collections;
import java.util.Set;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class VersionedEntry {
public static final ExcludeEmptyFilter EXCLUDE_EMPTY_EXTRACT_VALUE = new ExcludeEmptyFilter();
private final Object value;
private final Object version;
private final long timestamp;
public VersionedEntry(Object value, Object version, long timestamp) {
this.value = value;
this.version = version;
this.timestamp = timestamp;
}
public Object getValue() {
return value;
}
public Object getVersion() {
return version;
}
public long getTimestamp() {
return timestamp;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("VersionedEntry{");
sb.append("value=").append(value);
sb.append(", version=").append(version);
sb.append(", timestamp=").append(timestamp);
sb.append('}');
return sb.toString();
}
private static class ExcludeEmptyFilter implements KeyValueFilter<Object, Object>, Converter<Object, Object, Object> {
@Override
public boolean accept(Object key, Object value, Metadata metadata) {
if (value instanceof VersionedEntry) {
return ((VersionedEntry) value).getValue() != null;
}
return true;
}
@Override
public Object convert(Object key, Object value, Metadata metadata) {
if (value instanceof VersionedEntry) {
return ((VersionedEntry) value).getValue();
}
return value;
}
}
public static class Externalizer implements AdvancedExternalizer<VersionedEntry> {
@Override
public Set<Class<? extends VersionedEntry>> getTypeClasses() {
return Collections.<Class<? extends VersionedEntry>>singleton(VersionedEntry.class);
}
@Override
public Integer getId() {
return Externalizers.VERSIONED_ENTRY;
}
@Override
public void writeObject(ObjectOutput output, VersionedEntry object) throws IOException {
output.writeObject(object.value);
output.writeObject(object.version);
output.writeLong(object.timestamp);
}
@Override
public VersionedEntry readObject(ObjectInput input) throws IOException, ClassNotFoundException {
Object value = input.readObject();
Object version = input.readObject();
long timestamp = input.readLong();
return new VersionedEntry(value, version, timestamp);
}
}
public static class ExcludeEmptyExtractValueExternalizer implements AdvancedExternalizer<ExcludeEmptyFilter> {
@Override
public Set<Class<? extends ExcludeEmptyFilter>> getTypeClasses() {
return Collections.<Class<? extends ExcludeEmptyFilter>>singleton(ExcludeEmptyFilter.class);
}
@Override
public Integer getId() {
return Externalizers.EXCLUDE_EMPTY_EXTRACT_VALUE;
}
@Override
public void writeObject(ObjectOutput output, ExcludeEmptyFilter object) throws IOException {
}
@Override
public ExcludeEmptyFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return EXCLUDE_EMPTY_EXTRACT_VALUE;
}
}
}

View File

@ -46,8 +46,6 @@ public abstract class AbstractExtraAPITest<S extends RegionAccessStrategy> exten
protected abstract S getAccessStrategy();
protected abstract AccessType getAccessType();
@After
public final void releaseLocalAccessStrategy() throws Exception {
if ( environment != null ) {

View File

@ -15,6 +15,7 @@ import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.transaction.jta.platform.spi.JtaPlatform;
import org.hibernate.resource.transaction.spi.TransactionStatus;
@ -59,13 +60,29 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
}
@CustomParameterized.Order(1)
@Parameterized.Parameters(name = "{2}")
@Parameterized.Parameters(name = "{2},{3}")
public List<Object[]> getCacheModeParameters() {
ArrayList<Object[]> modes = new ArrayList<>();
modes.add(new Object[] { CacheMode.INVALIDATION_SYNC });
if (!useTransactionalCache()) {
modes.add(new Object[]{CacheMode.REPL_SYNC});
modes.add(new Object[]{CacheMode.DIST_SYNC});
for (AccessType accessType : new AccessType[] {
AccessType.TRANSACTIONAL,
AccessType.READ_ONLY,
AccessType.READ_WRITE
}) {
modes.add(new Object[]{CacheMode.INVALIDATION_SYNC, accessType});
}
for (AccessType accessType : new AccessType[] {
AccessType.READ_ONLY,
AccessType.READ_WRITE,
AccessType.NONSTRICT_READ_WRITE
}) {
modes.add(new Object[]{CacheMode.REPL_SYNC, accessType});
modes.add(new Object[]{CacheMode.DIST_SYNC, accessType});
if (canUseLocalMode()) {
modes.add(new Object[]{CacheMode.LOCAL, accessType});
}
}
if (canUseLocalMode()) {
modes.add(new Object[]{CacheMode.LOCAL, AccessType.TRANSACTIONAL});
}
return modes;
}
@ -79,6 +96,10 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
@Parameterized.Parameter(2)
public CacheMode cacheMode;
@Parameterized.Parameter(3)
public AccessType accessType;
public static final String REGION_PREFIX = "test";
private static final String PREFER_IPV4STACK = "java.net.preferIPv4Stack";
@ -114,6 +135,10 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
System.setProperty(JGROUPS_CFG_FILE, jgroupsCfgFile);
}
protected boolean canUseLocalMode() {
return true;
}
protected <T> T withTx(NodeEnvironment environment, SessionImplementor session, Callable<T> callable) throws Exception {
if (jtaPlatform != null) {
TransactionManager tm = environment.getServiceRegistry().getService(JtaPlatform.class).retrieveTransactionManager();
@ -178,7 +203,7 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
final StandardServiceRegistryBuilder ssrb = CacheTestUtil.buildBaselineStandardServiceRegistryBuilder(
REGION_PREFIX, getRegionFactoryClass(), true, false, jtaPlatform);
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, useTransactionalCache());
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, accessType == AccessType.TRANSACTIONAL);
ssrb.applySetting(TestInfinispanRegionFactory.CACHE_MODE, cacheMode);
return ssrb;
}
@ -186,8 +211,4 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
protected Class<? extends RegionFactory> getRegionFactoryClass() {
return TestInfinispanRegionFactory.class;
}
protected boolean useTransactionalCache() {
return false;
}
}

View File

@ -8,7 +8,6 @@ import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.internal.CacheDataDescriptionImpl;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.RegionAccessStrategy;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.jdbc.connections.spi.JdbcConnectionAccess;
@ -84,7 +83,10 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
protected AssertionFailedError node1Failure;
protected AssertionFailedError node2Failure;
protected abstract AccessType getAccessType();
@Override
protected boolean canUseLocalMode() {
return false;
}
@Before
public void prepareResources() throws Exception {
@ -121,6 +123,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
protected SessionImplementor mockedSession() {
SessionMock session = mock(SessionMock.class);
when(session.isClosed()).thenReturn(false);
when(session.getTimestamp()).thenReturn(System.currentTimeMillis());
if (jtaPlatform == BatchModeJtaPlatform.class) {
BatchModeTransactionCoordinator txCoord = new BatchModeTransactionCoordinator();
when(session.getTransactionCoordinator()).thenReturn(txCoord);
@ -213,13 +216,19 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
assertEquals(0, localRegion.getCache().size());
assertEquals(0, remoteRegion.getCache().size());
assertNull("local is clean", localAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s1 = mockedSession();
assertNull("local is clean", localAccessStrategy.get(s1, KEY, s1.getTimestamp()));
SessionImplementor s2 = mockedSession();
assertNull("remote is clean", remoteAccessStrategy.get(s2, KEY, s2.getTimestamp()));
localAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, localAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
remoteAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s3 = mockedSession();
localAccessStrategy.putFromLoad(s3, KEY, VALUE1, s3.getTimestamp(), 1);
SessionImplementor s4 = mockedSession();
assertEquals(VALUE1, localAccessStrategy.get(s4, KEY, s4.getTimestamp()));
SessionImplementor s5 = mockedSession();
remoteAccessStrategy.putFromLoad(s5, KEY, VALUE1, s5.getTimestamp(), new Integer(1));
SessionImplementor s6 = mockedSession();
assertEquals(VALUE1, remoteAccessStrategy.get(s6, KEY, s6.getTimestamp()));
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
@ -232,9 +241,11 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
return null;
});
assertNull(localAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s7 = mockedSession();
assertNull(localAccessStrategy.get(s7, KEY, s7.getTimestamp()));
assertEquals(0, localRegion.getCache().size());
assertNull(remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s8 = mockedSession();
assertNull(remoteAccessStrategy.get(s8, KEY, s8.getTimestamp()));
assertEquals(0, remoteRegion.getCache().size());
}
@ -274,31 +285,25 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
}
}
@Test
public void testPutFromLoad() throws Exception {
putFromLoadTest( false );
}
@Test
public void testPutFromLoadMinimal() throws Exception {
putFromLoadTest( true );
}
protected abstract void putFromLoadTest(boolean useMinimalAPI) throws Exception;
protected abstract Object generateNextKey();
protected void evictOrRemoveAllTest(final boolean evict) throws Exception {
final Object KEY = generateNextKey();
assertEquals(0, localRegion.getCache().size());
assertEquals(0, remoteRegion.getCache().size());
assertNull("local is clean", localAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
assertNull("remote is clean", remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s1 = mockedSession();
assertNull("local is clean", localAccessStrategy.get(s1, KEY, s1.getTimestamp()));
SessionImplementor s2 = mockedSession();
assertNull("remote is clean", remoteAccessStrategy.get(s2, KEY, s2.getTimestamp()));
localAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, localAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
remoteAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s3 = mockedSession();
localAccessStrategy.putFromLoad(s3, KEY, VALUE1, s3.getTimestamp(), 1);
SessionImplementor s4 = mockedSession();
assertEquals(VALUE1, localAccessStrategy.get(s4, KEY, s4.getTimestamp()));
SessionImplementor s5 = mockedSession();
remoteAccessStrategy.putFromLoad(s5, KEY, VALUE1, s5.getTimestamp(), 1);
SessionImplementor s6 = mockedSession();
assertEquals(VALUE1, remoteAccessStrategy.get(s6, KEY, s6.getTimestamp()));
// Wait for async propagation
sleep(250);
@ -314,27 +319,33 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
return null;
});
// This should re-establish the region root node in the optimistic case
assertNull(localAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s7 = mockedSession();
assertNull(localAccessStrategy.get(s7, KEY, s7.getTimestamp()));
assertEquals(0, localRegion.getCache().size());
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
assertNull(remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s8 = mockedSession();
assertNull(remoteAccessStrategy.get(s8, KEY, s8.getTimestamp()));
assertEquals(0, remoteRegion.getCache().size());
// Wait for async propagation of EndInvalidationCommand before executing naked put
sleep(250);
// Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s9 = mockedSession();
remoteAccessStrategy.putFromLoad(s9, KEY, VALUE1, s9.getTimestamp(), 1);
SessionImplementor s10 = mockedSession();
assertEquals(VALUE1, remoteAccessStrategy.get(s10, KEY, s10.getTimestamp()));
assertEquals(1, remoteRegion.getCache().size());
// Wait for async propagation
sleep(250);
assertEquals((isUsingInvalidation() ? null : VALUE1), localAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
assertEquals(VALUE1, remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
SessionImplementor s11 = mockedSession();
assertEquals((isUsingInvalidation() ? null : VALUE1), localAccessStrategy.get(s11, KEY, s11.getTimestamp()));
SessionImplementor s12 = mockedSession();
assertEquals(VALUE1, remoteAccessStrategy.get(s12, KEY, s12.getTimestamp()));
}
protected class PutFromLoadNode2 extends Thread {
@ -355,11 +366,10 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(remoteEnvironment, session, () -> {
assertNull(remoteAccessStrategy.get(session, KEY, txTimestamp));
assertNull(remoteAccessStrategy.get(session, KEY, session.getTimestamp()));
// Let node1 write
writeLatch1.countDown();
@ -367,9 +377,9 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
writeLatch2.await();
if (useMinimalAPI) {
remoteAccessStrategy.putFromLoad(session, KEY, VALUE1, txTimestamp, new Integer(1), true);
remoteAccessStrategy.putFromLoad(session, KEY, VALUE1, session.getTimestamp(), 1, true);
} else {
remoteAccessStrategy.putFromLoad(session, KEY, VALUE1, txTimestamp, new Integer(1));
remoteAccessStrategy.putFromLoad(session, KEY, VALUE1, session.getTimestamp(), 1);
}
return null;
});

View File

@ -18,35 +18,9 @@ import static org.mockito.Mockito.mock;
* @author Galder Zamarreño
* @since 3.5
*/
public abstract class CollectionRegionAccessExtraAPITest extends AbstractExtraAPITest<CollectionRegionAccessStrategy> {
public class CollectionRegionAccessExtraAPITest extends AbstractExtraAPITest<CollectionRegionAccessStrategy> {
@Override
protected CollectionRegionAccessStrategy getAccessStrategy() {
return environment.getCollectionRegion( REGION_NAME, CACHE_DATA_DESCRIPTION).buildAccessStrategy( getAccessType() );
}
public static class Transactional extends CollectionRegionAccessExtraAPITest {
@Override
protected AccessType getAccessType() {
return AccessType.TRANSACTIONAL;
}
@Override
protected boolean useTransactionalCache() {
return true;
}
}
public static class ReadWrite extends CollectionRegionAccessExtraAPITest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_WRITE;
}
}
public static class ReadOnly extends CollectionRegionAccessExtraAPITest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_ONLY;
}
return environment.getCollectionRegion( REGION_NAME, CACHE_DATA_DESCRIPTION).buildAccessStrategy( accessType );
}
}

View File

@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
* @author Galder Zamarreño
* @since 3.5
*/
public abstract class AbstractCollectionRegionAccessStrategyTest extends
public class CollectionRegionAccessStrategyTest extends
AbstractRegionAccessStrategyTest<CollectionRegionImpl, CollectionRegionAccessStrategy> {
protected static int testCount;
@ -59,12 +59,9 @@ public abstract class AbstractCollectionRegionAccessStrategyTest extends
@Override
protected CollectionRegionAccessStrategy getAccessStrategy(CollectionRegionImpl region) {
return region.buildAccessStrategy( getAccessType() );
return region.buildAccessStrategy( accessType );
}
@Test
public abstract void testCacheConfiguration();
@Test
public void testGetRegion() {
assertEquals( "Correct region", localRegion, localAccessStrategy.getRegion() );
@ -92,7 +89,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTest extends
Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception {
SessionImplementor session = mockedSession();
delegate.putFromLoad(session, "k1", "v1", 0, null );
delegate.putFromLoad(session, "k1", "v1", session.getTimestamp(), null );
return null;
}
};
@ -163,7 +160,16 @@ public abstract class AbstractCollectionRegionAccessStrategyTest extends
};
}
@Override
@Test
public void testPutFromLoad() throws Exception {
putFromLoadTest(false);
}
@Test
public void testPutFromLoadMinimal() throws Exception {
putFromLoadTest(true);
}
protected void putFromLoadTest(final boolean useMinimalAPI) throws Exception {
final Object KEY = generateNextKey();
@ -176,17 +182,16 @@ public abstract class AbstractCollectionRegionAccessStrategyTest extends
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
assertNull(localAccessStrategy.get(session, KEY, txTimestamp));
assertNull(localAccessStrategy.get(session, KEY, session.getTimestamp()));
writeLatch1.await();
if (useMinimalAPI) {
localAccessStrategy.putFromLoad(session, KEY, VALUE2, txTimestamp, new Integer(2), true);
localAccessStrategy.putFromLoad(session, KEY, VALUE2, session.getTimestamp(), 2, true);
} else {
localAccessStrategy.putFromLoad(session, KEY, VALUE2, txTimestamp, new Integer(2));
localAccessStrategy.putFromLoad(session, KEY, VALUE2, session.getTimestamp(), 2);
}
return null;
});
@ -220,8 +225,10 @@ public abstract class AbstractCollectionRegionAccessStrategyTest extends
long txTimestamp = System.currentTimeMillis();
assertEquals( VALUE2, localAccessStrategy.get(mockedSession(), KEY, txTimestamp ) );
Object remoteValue = remoteAccessStrategy.get(mockedSession(), KEY, txTimestamp);
SessionImplementor s1 = mockedSession();
assertEquals( VALUE2, localAccessStrategy.get(s1, KEY, s1.getTimestamp() ) );
SessionImplementor s2 = mockedSession();
Object remoteValue = remoteAccessStrategy.get(s2, KEY, s2.getTimestamp());
if (isUsingInvalidation()) {
assertEquals( VALUE1, remoteValue);
}

View File

@ -8,7 +8,6 @@ package org.hibernate.test.cache.infinispan.collection;
import java.util.Properties;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.CollectionRegion;
@ -19,24 +18,20 @@ import org.hibernate.cache.spi.access.CollectionRegionAccessStrategy;
import org.hibernate.test.cache.infinispan.AbstractEntityCollectionRegionTest;
import org.infinispan.AdvancedCache;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
/**
* @author Galder Zamarreño
*/
public class CollectionRegionImplTest extends AbstractEntityCollectionRegionTest {
protected static final String CACHE_NAME = "test";
@Override
protected void supportedAccessTypeTest(RegionFactory regionFactory, Properties properties) {
CollectionRegion region = regionFactory.buildCollectionRegion("test", properties, MUTABLE_NON_VERSIONED);
assertNotNull(region.buildAccessStrategy(AccessType.READ_ONLY));
assertNotNull(region.buildAccessStrategy(AccessType.READ_WRITE));
assertNotNull(region.buildAccessStrategy(AccessType.TRANSACTIONAL));
try {
region.buildAccessStrategy(AccessType.NONSTRICT_READ_WRITE);
fail("Incorrectly got NONSTRICT_READ_WRITE");
} catch (CacheException good) {
for (AccessType accessType : AccessType.values()) {
CollectionRegion region = regionFactory.buildCollectionRegion(CACHE_NAME, properties, MUTABLE_NON_VERSIONED);
assertNotNull(region.buildAccessStrategy(accessType));
((InfinispanRegionFactory) regionFactory).getCacheManager().removeCache(CACHE_NAME);
}
}

View File

@ -1,30 +0,0 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.test.cache.infinispan.collection;
import org.hibernate.cache.spi.access.AccessType;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Base class for tests of READ_ONLY access.
*
* @author <a href="brian.stansberry@jboss.com">Brian Stansberry</a>
*/
public class CollectionRegionReadOnlyAccessTest extends AbstractCollectionRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_ONLY;
}
@Override
public void testCacheConfiguration() {
assertFalse(isTransactional());
assertTrue( isSynchronous() );
}
}

View File

@ -1,24 +0,0 @@
package org.hibernate.test.cache.infinispan.collection;
import org.hibernate.cache.spi.access.AccessType;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests read-write access
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class CollectionRegionReadWriteAccessTest extends AbstractCollectionRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_WRITE;
}
@Override
public void testCacheConfiguration() {
assertFalse(isTransactional());
assertTrue(isSynchronous());
}
}

View File

@ -1,34 +0,0 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.test.cache.infinispan.collection;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import static org.junit.Assert.assertTrue;
/**
* Base class for tests of TRANSACTIONAL access.
*
* @author <a href="brian.stansberry@jboss.com">Brian Stansberry</a>
*/
public class CollectionRegionTransactionalAccessTest extends AbstractCollectionRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
return AccessType.TRANSACTIONAL;
}
@Override
protected boolean useTransactionalCache() {
return true;
}
@Override
public void testCacheConfiguration() {
assertTrue("Transactions", isTransactional());
assertTrue("Synchronous mode", isSynchronous());
}
}

View File

@ -11,6 +11,7 @@ import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.EntityRegionAccessStrategy;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
@ -18,7 +19,6 @@ import org.hibernate.test.cache.infinispan.AbstractRegionAccessStrategyTest;
import org.hibernate.test.cache.infinispan.NodeEnvironment;
import org.hibernate.test.cache.infinispan.util.TestSynchronization;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@ -32,7 +32,7 @@ import static org.mockito.Mockito.mock;
* @author Galder Zamarreño
* @since 3.5
*/
public abstract class AbstractEntityRegionAccessStrategyTest extends
public class EntityRegionAccessStrategyTest extends
AbstractRegionAccessStrategyTest<EntityRegionImpl, EntityRegionAccessStrategy> {
protected static int testCount;
@ -48,11 +48,7 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
@Override
protected EntityRegionAccessStrategy getAccessStrategy(EntityRegionImpl region) {
return region.buildAccessStrategy(getAccessType());
}
@Test
public void testCacheConfiguration() {
return region.buildAccessStrategy( accessType );
}
@Test
@ -60,6 +56,24 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
assertEquals("Correct region", localRegion, localAccessStrategy.getRegion());
}
@Test
public void testPutFromLoad() throws Exception {
if (accessType == AccessType.READ_ONLY) {
putFromLoadTestReadOnly(false);
} else {
putFromLoadTest(false);
}
}
@Test
public void testPutFromLoadMinimal() throws Exception {
if (accessType == AccessType.READ_ONLY) {
putFromLoadTestReadOnly(true);
} else {
putFromLoadTest(true);
}
}
/**
* Simulate 2 nodes, both start, tx do a get, experience a cache miss, then
* 'read from db.' First does a putFromLoad, then an update. Second tries to
@ -83,20 +97,19 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
assertNull(localAccessStrategy.get(session, KEY, txTimestamp));
assertNull(localAccessStrategy.get(session, KEY, session.getTimestamp()));
writeLatch1.await();
if (useMinimalAPI) {
localAccessStrategy.putFromLoad(session, KEY, VALUE1, txTimestamp, null, true);
localAccessStrategy.putFromLoad(session, KEY, VALUE1, session.getTimestamp(), 1, true);
} else {
localAccessStrategy.putFromLoad(session, KEY, VALUE1, txTimestamp, null);
localAccessStrategy.putFromLoad(session, KEY, VALUE1, session.getTimestamp(), 1);
}
doUpdate(localAccessStrategy, session, KEY, VALUE2);
doUpdate(localAccessStrategy, session, KEY, VALUE2, 2);
return null;
});
} catch (Exception e) {
@ -124,9 +137,10 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis();
assertEquals( VALUE2, localAccessStrategy.get(mockedSession(), KEY, txTimestamp));
Object remoteValue = remoteAccessStrategy.get(mockedSession(), KEY, txTimestamp);
SessionImplementor s1 = mockedSession();
assertEquals( VALUE2, localAccessStrategy.get(s1, KEY, s1.getTimestamp()));
SessionImplementor s2 = mockedSession();
Object remoteValue = remoteAccessStrategy.get(s2, KEY, s2.getTimestamp());
if (isUsingInvalidation()) {
// invalidation command invalidates pending put
assertNull(remoteValue);
@ -151,12 +165,11 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
assertNull("Correct initial value", localAccessStrategy.get(session, KEY, txTimestamp));
assertNull("Correct initial value", localAccessStrategy.get(session, KEY, session.getTimestamp()));
doInsert(localAccessStrategy, session, KEY, VALUE1);
doInsert(localAccessStrategy, session, KEY, VALUE1, 1);
readLatch.countDown();
commitLatch.await();
@ -178,12 +191,11 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
readLatch.await();
assertNull("Correct initial value", localAccessStrategy.get(session, KEY, txTimestamp));
assertNull("Correct initial value", localAccessStrategy.get(session, KEY, session.getTimestamp()));
return null;
});
} catch (Exception e) {
@ -207,26 +219,53 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis();
assertEquals("Correct node1 value", VALUE1, localAccessStrategy.get(mockedSession(), KEY, txTimestamp));
SessionImplementor s1 = mockedSession();
assertEquals("Correct node1 value", VALUE1, localAccessStrategy.get(s1, KEY, s1.getTimestamp()));
Object expected = isUsingInvalidation() ? null : VALUE1;
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(mockedSession(), KEY, txTimestamp));
SessionImplementor s2 = mockedSession();
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(s2, KEY, s2.getTimestamp()));
}
protected void doInsert(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, String value) {
protected void doInsert(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, String value, Object version) {
strategy.insert(session, key, value, null);
session.getTransactionCoordinator().getLocalSynchronizations().registerSynchronization(
new TestSynchronization.AfterInsert(strategy, session, key, value));
new TestSynchronization.AfterInsert(strategy, session, key, value, version));
}
protected void putFromLoadTestReadOnly(boolean minimal) throws Exception {
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
Object expected = isUsingInvalidation() ? null : VALUE1;
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
assertNull(localAccessStrategy.get(session, KEY, session.getTimestamp()));
if (minimal)
localAccessStrategy.putFromLoad(session, KEY, VALUE1, session.getTimestamp(), 1, true);
else
localAccessStrategy.putFromLoad(session, KEY, VALUE1, session.getTimestamp(), 1);
return null;
});
SessionImplementor s2 = mockedSession();
assertEquals(VALUE1, localAccessStrategy.get(s2, KEY, s2.getTimestamp()));
SessionImplementor s3 = mockedSession();
assertEquals(expected, remoteAccessStrategy.get(s3, KEY, s3.getTimestamp()));
}
@Test
public void testUpdate() throws Exception {
if (accessType == AccessType.READ_ONLY) {
return;
}
final Object KEY = generateNextKey();
// Set up initial state
localAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
remoteAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
SessionImplementor s1 = mockedSession();
localAccessStrategy.putFromLoad(s1, KEY, VALUE1, s1.getTimestamp(), 1);
SessionImplementor s2 = mockedSession();
remoteAccessStrategy.putFromLoad(s2, KEY, VALUE1, s2.getTimestamp(), 1);
// Let the async put propagate
sleep(250);
@ -236,16 +275,15 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
final CountDownLatch completionLatch = new CountDownLatch(2);
Thread updater = new Thread("testUpdate-updater") {
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
withTx(localEnvironment, mockedSession(), () -> {
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
log.debug("Transaction began, get initial value");
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(mockedSession(), KEY, txTimestamp));
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(session, KEY, session.getTimestamp()));
log.debug("Now update value");
doUpdate(AbstractEntityRegionAccessStrategyTest.this.localAccessStrategy, mockedSession(), KEY, VALUE2);
doUpdate(localAccessStrategy, session, KEY, VALUE2, 2);
log.debug("Notify the read latch");
readLatch.countDown();
log.debug("Await commit");
@ -268,20 +306,20 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
};
Thread reader = new Thread("testUpdate-reader") {
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
log.debug("Transaction began, await read latch");
readLatch.await();
log.debug("Read latch acquired, verify local access strategy");
// This won't block w/ mvc and will read the old value
Object expected = VALUE1;
assertEquals("Correct value", expected, localAccessStrategy.get(session, KEY, txTimestamp));
// This won't block w/ mvc and will read the old value (if transactional as the transaction
// is not being committed yet, or if non-strict as we do the actual update only after transaction)
// or null if non-transactional
Object expected = isTransactional() || accessType == AccessType.NONSTRICT_READ_WRITE ? VALUE1 : null;
assertEquals("Correct value", expected, localAccessStrategy.get(session, KEY, session.getTimestamp()));
return null;
});
} catch (Exception e) {
@ -307,25 +345,30 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis();
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(mockedSession(), KEY, txTimestamp));
SessionImplementor s3 = mockedSession();
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(s3, KEY, s3.getTimestamp()));
Object expected = isUsingInvalidation() ? null : VALUE2;
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(mockedSession(), KEY, txTimestamp));
SessionImplementor s4 = mockedSession();
assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(s4, KEY, s4.getTimestamp()));
}
protected void doUpdate(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, Object value) throws javax.transaction.RollbackException, javax.transaction.SystemException {
protected void doUpdate(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, Object value, Object version) throws javax.transaction.RollbackException, javax.transaction.SystemException {
SoftLock softLock = strategy.lockItem(session, key, null);
strategy.update(session, key, value, null, null);
session.getTransactionCoordinator().getLocalSynchronizations().registerSynchronization(
new TestSynchronization.AfterUpdate(strategy, session, key, value, softLock));
new TestSynchronization.AfterUpdate(strategy, session, key, value, version, softLock));
}
@Test
public void testContestedPutFromLoad() throws Exception {
if (accessType == AccessType.READ_ONLY) {
return;
}
final Object KEY = TestingKeyFactory.generateEntityCacheKey(KEY_BASE + testCount++);
localAccessStrategy.putFromLoad(mockedSession(), KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
SessionImplementor s1 = mockedSession();
localAccessStrategy.putFromLoad(s1, KEY, VALUE1, s1.getTimestamp(), 1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final CountDownLatch pferCompletionLatch = new CountDownLatch(1);
@ -333,17 +376,14 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
final CountDownLatch completionLatch = new CountDownLatch(1);
Thread blocker = new Thread("Blocker") {
@Override
public void run() {
try {
SessionImplementor session = mockedSession();
long txTimestamp = System.currentTimeMillis();
withTx(localEnvironment, session, () -> {
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(session, KEY, txTimestamp));
assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(session, KEY, session.getTimestamp()));
doUpdate(localAccessStrategy, session, KEY, VALUE2);
doUpdate(localAccessStrategy, session, KEY, VALUE2, 2);
pferLatch.countDown();
commitLatch.await();
@ -361,15 +401,12 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
};
Thread putter = new Thread("Putter") {
@Override
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
localAccessStrategy.putFromLoad(session, KEY, VALUE1, txTimestamp, new Integer(1));
localAccessStrategy.putFromLoad(session, KEY, VALUE1, session.getTimestamp(), 1);
return null;
});
} catch (Exception e) {
@ -394,7 +431,7 @@ public abstract class AbstractEntityRegionAccessStrategyTest extends
assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis();
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(null, KEY, txTimestamp));
SessionImplementor session = mockedSession();
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(session, KEY, session.getTimestamp()));
}
}

View File

@ -9,12 +9,9 @@ package org.hibernate.test.cache.infinispan.entity;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.EntityRegionAccessStrategy;
import org.hibernate.test.cache.infinispan.AbstractExtraAPITest;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.assertEquals;
/**
* Tests for the "extra API" in EntityRegionAccessStrategy;.
@ -32,56 +29,23 @@ public class EntityRegionExtraAPITest extends AbstractExtraAPITest<EntityRegionA
@Override
protected EntityRegionAccessStrategy getAccessStrategy() {
return environment.getEntityRegion( REGION_NAME, CACHE_DATA_DESCRIPTION).buildAccessStrategy( getAccessType() );
}
protected AccessType getAccessType() {
return AccessType.TRANSACTIONAL;
return environment.getEntityRegion( REGION_NAME, CACHE_DATA_DESCRIPTION).buildAccessStrategy( accessType );
}
@Test
@SuppressWarnings( {"UnnecessaryBoxing"})
public void testAfterInsert() {
assertFalse("afterInsert always returns false", accessStrategy.afterInsert(SESSION, KEY, VALUE1, Integer.valueOf( 1 )));
boolean retval = accessStrategy.afterInsert(SESSION, KEY, VALUE1, Integer.valueOf( 1 ));
assertEquals(accessType == AccessType.NONSTRICT_READ_WRITE, retval);
}
@Test
@SuppressWarnings( {"UnnecessaryBoxing"})
public void testAfterUpdate() {
assertFalse("afterInsert always returns false", accessStrategy.afterUpdate(
SESSION, KEY, VALUE2, Integer.valueOf( 1 ), Integer.valueOf( 2 ), new MockSoftLock()));
}
public static class Transactional extends EntityRegionExtraAPITest {
@Override
protected AccessType getAccessType() {
return AccessType.TRANSACTIONAL;
}
@Override
protected boolean useTransactionalCache() {
return true;
}
}
public static class ReadWrite extends EntityRegionExtraAPITest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_WRITE;
}
}
public static class ReadOnly extends EntityRegionExtraAPITest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_ONLY;
}
@Test(expected = UnsupportedOperationException.class)
@Override
public void testAfterUpdate() {
accessStrategy.afterUpdate(null, KEY, VALUE2, 1, 2, new MockSoftLock());
if (accessType == AccessType.READ_ONLY) {
return;
}
boolean retval = accessStrategy.afterUpdate(SESSION, KEY, VALUE2, Integer.valueOf( 1 ), Integer.valueOf( 2 ), new MockSoftLock());
assertEquals(accessType == AccessType.NONSTRICT_READ_WRITE, retval);
}
}

View File

@ -28,17 +28,14 @@ import static org.junit.Assert.fail;
* @since 3.5
*/
public class EntityRegionImplTest extends AbstractEntityCollectionRegionTest {
protected static final String CACHE_NAME = "test";
@Override
protected void supportedAccessTypeTest(RegionFactory regionFactory, Properties properties) {
EntityRegion region = regionFactory.buildEntityRegion("test", properties, MUTABLE_NON_VERSIONED);
assertNotNull(region.buildAccessStrategy(AccessType.READ_ONLY));
assertNotNull(region.buildAccessStrategy(AccessType.READ_WRITE));
assertNotNull(region.buildAccessStrategy(AccessType.TRANSACTIONAL));
try {
region.buildAccessStrategy(AccessType.NONSTRICT_READ_WRITE);
fail("Incorrectly got NONSTRICT_READ_WRITE");
} catch (CacheException good) {
for (AccessType accessType : AccessType.values()) {
EntityRegion region = regionFactory.buildEntityRegion("test", properties, MUTABLE_NON_VERSIONED);
assertNotNull(region.buildAccessStrategy(accessType));
((InfinispanRegionFactory) regionFactory).getCacheManager().removeCache(CACHE_NAME);
}
}

View File

@ -1,69 +0,0 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.test.cache.infinispan.entity;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Base class for tests of TRANSACTIONAL access.
*
* @author Galder Zamarreño
* @since 3.5
*/
public class EntityRegionReadOnlyAccessTest extends AbstractEntityRegionAccessStrategyTest {
@Override
protected AccessType getAccessType() {
return AccessType.READ_ONLY;
}
protected void putFromLoadTest(boolean minimal) throws Exception {
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
Object expected = isUsingInvalidation() ? null : VALUE1;
long txTimestamp = System.currentTimeMillis();
SessionImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
assertNull(localAccessStrategy.get(session, KEY, System.currentTimeMillis()));
if (minimal)
localAccessStrategy.putFromLoad(session, KEY, VALUE1, txTimestamp, 1, true);
else
localAccessStrategy.putFromLoad(session, KEY, VALUE1, txTimestamp, 1);
return null;
});
assertEquals(VALUE1, localAccessStrategy.get(session, KEY, System.currentTimeMillis()));
assertEquals(expected, remoteAccessStrategy.get(mockedSession(), KEY, System.currentTimeMillis()));
}
@Test(expected = UnsupportedOperationException.class)
@Override
public void testUpdate() throws Exception {
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
SessionImplementor session = mockedSession();
SoftLock softLock = localAccessStrategy.lockItem(session, KEY, null);
localAccessStrategy.update(session, KEY, VALUE2, 2, 1);
localAccessStrategy.unlockItem(session, KEY, softLock);
}
@Ignore
@Override
public void testContestedPutFromLoad() throws Exception {
}
}

View File

@ -1,40 +0,0 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.test.cache.infinispan.entity;
import org.hibernate.cache.spi.access.AccessType;
import org.jboss.logging.Logger;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
/**
* Base class for tests of TRANSACTIONAL access.
*
* @author Galder Zamarreño
* @since 3.5
*/
public class EntityRegionTransactionalAccessTest extends AbstractEntityRegionAccessStrategyTest {
private static final Logger log = Logger.getLogger( EntityRegionTransactionalAccessTest.class );
@Override
protected AccessType getAccessType() {
return AccessType.TRANSACTIONAL;
}
@Override
protected boolean useTransactionalCache() {
return true;
}
@Test
@Override
public void testCacheConfiguration() {
assertTrue(isTransactional());
assertTrue("Synchronous mode", isSynchronous());
}
}

View File

@ -11,12 +11,19 @@ import java.util.List;
import java.util.Map;
import org.hibernate.Session;
import org.hibernate.boot.Metadata;
import org.hibernate.boot.spi.MetadataImplementor;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.cfg.Environment;
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider;
import org.hibernate.engine.transaction.jta.platform.spi.JtaPlatform;
import org.hibernate.mapping.Column;
import org.hibernate.mapping.PersistentClass;
import org.hibernate.mapping.Property;
import org.hibernate.mapping.RootClass;
import org.hibernate.mapping.SimpleValue;
import org.hibernate.resource.transaction.TransactionCoordinatorBuilder;
import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl;
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
@ -41,13 +48,15 @@ import org.junit.runners.Parameterized;
*/
@RunWith(CustomParameterized.class)
public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctionalTestCase {
protected static final Object[] TRANSACTIONAL = new Object[]{"transactional", JtaPlatformImpl.class, JtaTransactionCoordinatorBuilderImpl.class, XaConnectionProvider.class, AccessType.TRANSACTIONAL, true, CacheMode.INVALIDATION_SYNC };
protected static final Object[] READ_WRITE_INVALIDATION = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.INVALIDATION_SYNC };
protected static final Object[] READ_ONLY_INVALIDATION = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.INVALIDATION_SYNC };
protected static final Object[] READ_WRITE_REPLICATED = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.REPL_SYNC };
protected static final Object[] READ_ONLY_REPLICATED = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.REPL_SYNC };
protected static final Object[] READ_WRITE_DISTRIBUTED = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.DIST_SYNC };
protected static final Object[] READ_ONLY_DISTRIBUTED = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.DIST_SYNC };
protected static final Object[] TRANSACTIONAL = new Object[]{"transactional", JtaPlatformImpl.class, JtaTransactionCoordinatorBuilderImpl.class, XaConnectionProvider.class, AccessType.TRANSACTIONAL, true, CacheMode.INVALIDATION_SYNC, false };
protected static final Object[] READ_WRITE_INVALIDATION = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.INVALIDATION_SYNC, false };
protected static final Object[] READ_ONLY_INVALIDATION = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.INVALIDATION_SYNC, false };
protected static final Object[] READ_WRITE_REPLICATED = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.REPL_SYNC, false };
protected static final Object[] READ_ONLY_REPLICATED = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.REPL_SYNC, false };
protected static final Object[] READ_WRITE_DISTRIBUTED = new Object[]{"read-write", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_WRITE, false, CacheMode.DIST_SYNC, false };
protected static final Object[] READ_ONLY_DISTRIBUTED = new Object[]{"read-only", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.READ_ONLY, false, CacheMode.DIST_SYNC, false };
protected static final Object[] NONSTRICT_REPLICATED = new Object[]{"nonstrict", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.NONSTRICT_READ_WRITE, false, CacheMode.REPL_SYNC, true };
protected static final Object[] NONSTRICT_DISTRIBUTED = new Object[]{"nonstrict", null, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class, null, AccessType.NONSTRICT_READ_WRITE, false, CacheMode.DIST_SYNC, true };
// We need to use @ClassRule here since in @BeforeClassOnce startUp we're preparing the session factory,
// constructing CacheManager along - and there we check that the test has the name already set
@ -75,13 +84,16 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
@Parameterized.Parameter(value = 6)
public CacheMode cacheMode;
@Parameterized.Parameter(value = 7)
public boolean addVersions;
protected boolean useJta;
@CustomParameterized.Order(0)
@Parameterized.Parameters(name = "{0}, {6}")
public abstract List<Object[]> getParameters();
public List<Object[]> getParameters(boolean tx, boolean rw, boolean ro) {
public List<Object[]> getParameters(boolean tx, boolean rw, boolean ro, boolean nonstrict) {
ArrayList<Object[]> parameters = new ArrayList<>();
if (tx) {
parameters.add(TRANSACTIONAL);
@ -96,6 +108,10 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
parameters.add(READ_ONLY_REPLICATED);
parameters.add(READ_ONLY_DISTRIBUTED);
}
if (nonstrict) {
parameters.add(NONSTRICT_REPLICATED);
parameters.add(NONSTRICT_DISTRIBUTED);
}
return parameters;
}
@ -113,6 +129,36 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
};
}
@Override
protected void afterMetadataBuilt(Metadata metadata) {
if (addVersions) {
for (PersistentClass clazz : metadata.getEntityBindings()) {
if (clazz.getVersion() != null) {
continue;
}
try {
clazz.getMappedClass().getMethod("getVersion");
clazz.getMappedClass().getMethod("setVersion", long.class);
} catch (NoSuchMethodException e) {
continue;
}
RootClass rootClazz = clazz.getRootClass();
Property versionProperty = new Property();
versionProperty.setName("version");
SimpleValue value = new SimpleValue((MetadataImplementor) metadata, rootClazz.getTable());
value.setTypeName("long");
Column column = new Column();
column.setValue(value);
column.setName("version");
value.addColumn(column);
rootClazz.getTable().addColumn(column);
versionProperty.setValue(value);
rootClazz.setVersion(versionProperty);
rootClazz.addProperty(versionProperty);
}
}
}
@Override
public String getCacheConcurrencyStrategy() {
return accessType.getExternalName();

View File

@ -0,0 +1,178 @@
package org.hibernate.test.cache.infinispan.functional;
import org.hibernate.PessimisticLockException;
import org.hibernate.StaleStateException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.spi.Region;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.hibernate.testing.AfterClassOnce;
import org.hibernate.testing.BeforeClassOnce;
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
/**
* Common base for TombstoneTest and VersionedTest
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public abstract class AbstractNonInvalidationTest extends SingleNodeTest {
protected static final long TIMEOUT = InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION.expiration().maxIdle();
protected static final int WAIT_TIMEOUT = 2000;
protected static final TestTimeService TIME_SERVICE = new TestTimeService();
protected ExecutorService executor;
protected Log log = LogFactory.getLog(getClass());
protected AdvancedCache entityCache;
protected long itemId;
protected Region region;
@BeforeClassOnce
public void setup() {
executor = Executors.newCachedThreadPool(new ThreadFactory() {
AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Executor-" + counter.incrementAndGet());
}
});
}
@AfterClassOnce
public void shutdown() {
executor.shutdown();
}
@Override
protected void startUp() {
super.startUp();
region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
entityCache = ((EntityRegionImpl) region).getCache();
}
@Before
public void insertAndClearCache() throws Exception {
Item item = new Item("my item", "Original item");
withTxSession(s -> s.persist(item));
entityCache.clear();
assertEquals("Cache is not empty", Collections.EMPTY_SET, Caches.keys(entityCache).toSet());
itemId = item.getId();
log.info("Insert and clear finished");
}
@After
public void cleanup() throws Exception {
withTxSession(s -> {
s.createQuery("delete from Item").executeUpdate();
});
}
protected Future<Boolean> removeFlushWait(long id, CyclicBarrier loadBarrier, CountDownLatch preFlushLatch, CountDownLatch flushLatch, CountDownLatch commitLatch) throws Exception {
return executor.submit(() -> withTxSessionApply(s -> {
try {
Item item = s.load(Item.class, id);
item.getName(); // force load & putFromLoad before the barrier
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
s.delete(item);
if (preFlushLatch != null) {
awaitOrThrow(preFlushLatch);
}
s.flush();
} catch (StaleStateException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} catch (PessimisticLockException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} finally {
if (flushLatch != null) {
flushLatch.countDown();
}
}
awaitOrThrow(commitLatch);
return true;
}));
}
protected Future<Boolean> updateFlushWait(long id, CyclicBarrier loadBarrier, CountDownLatch preFlushLatch, CountDownLatch flushLatch, CountDownLatch commitLatch) throws Exception {
return executor.submit(() -> withTxSessionApply(s -> {
try {
Item item = s.load(Item.class, id);
item.getName(); // force load & putFromLoad before the barrier
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
item.setDescription("Updated item");
s.update(item);
if (preFlushLatch != null) {
awaitOrThrow(preFlushLatch);
}
s.flush();
} catch (StaleStateException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} catch (PessimisticLockException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} finally {
if (flushLatch != null) {
flushLatch.countDown();
}
}
awaitOrThrow(commitLatch);
return true;
}));
}
protected Future<Boolean> evictWait(long id, CyclicBarrier loadBarrier, CountDownLatch preEvictLatch, CountDownLatch postEvictLatch) throws Exception {
return executor.submit(() -> {
try {
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
if (preEvictLatch != null) {
awaitOrThrow(preEvictLatch);
}
sessionFactory().getCache().evictEntity(Item.class, id);
} finally {
if (postEvictLatch != null) {
postEvictLatch.countDown();
}
}
return true;
});
}
protected void awaitOrThrow(CountDownLatch latch) throws InterruptedException, TimeoutException {
if (!latch.await(WAIT_TIMEOUT, TimeUnit.SECONDS)) {
throw new TimeoutException();
}
}
@Override
protected void addSettings(Map settings) {
super.addSettings(settings);
settings.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
}
}

View File

@ -6,7 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.functional;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -33,7 +32,7 @@ import static org.junit.Assert.assertNull;
public class BulkOperationsTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, false);
return getParameters(true, true, false, true);
}
@ClassRule

View File

@ -9,7 +9,6 @@ package org.hibernate.test.cache.infinispan.functional;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
@ -22,6 +21,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.hibernate.FlushMode;
import org.hibernate.LockMode;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.test.cache.infinispan.functional.entities.Contact;
@ -63,7 +63,7 @@ public class ConcurrentWriteTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, false);
return getParameters(true, true, false, true);
}
@Override
@ -276,6 +276,9 @@ public class ConcurrentWriteTest extends SingleNodeTest {
}
Contact contact = contacts.iterator().next();
// H2 version 1.3 (without MVCC fails with deadlock on Contacts/Customers modification, therefore,
// we have to enforce locking Contacts first
s.lock(contact, LockMode.PESSIMISTIC_WRITE);
contacts.remove( contact );
contact.setCustomer( null );

View File

@ -5,7 +5,6 @@ import org.hibernate.test.cache.infinispan.functional.entities.Name;
import org.hibernate.test.cache.infinispan.functional.entities.Person;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
@ -21,7 +20,7 @@ import static org.junit.Assert.assertTrue;
public class EqualityTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, true);
return getParameters(true, true, true, true);
}
@Override

View File

@ -29,7 +29,7 @@ public class ReadOnlyTest extends SingleNodeTest {
@Override
public List<Object[]> getParameters() {
return getParameters(false, false, true);
return getParameters(false, false, true, true);
}
@Test

View File

@ -8,7 +8,6 @@ package org.hibernate.test.cache.infinispan.functional;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -49,7 +48,7 @@ import static org.junit.Assert.fail;
public class ReadWriteTest extends ReadOnlyTest {
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, false);
return getParameters(true, true, false, true);
}
@Override
@ -393,13 +392,13 @@ public class ReadWriteTest extends ReadOnlyTest {
assertEquals( 0, slcs.getElementCountInMemory() );
assertEquals( 0, slcs.getEntries().size() );
ByRef<Item> itemRef = new ByRef<>(null);
ByRef<Long> idRef = new ByRef<>(null);
withTxSession(s -> {
Item item = new Item();
item.setName( "widget" );
item.setDescription( "A really top-quality, full-featured widget." );
s.persist( item );
itemRef.set(item);
idRef.set( item.getId() );
});
assertEquals( 1, slcs.getPutCount() );
@ -407,7 +406,7 @@ public class ReadWriteTest extends ReadOnlyTest {
assertEquals( 1, slcs.getEntries().size() );
withTxSession(s -> {
Item item = s.get( Item.class, itemRef.get().getId() );
Item item = s.get( Item.class, idRef.get() );
assertEquals( slcs.getHitCount(), 1 );
assertEquals( slcs.getMissCount(), 0 );
item.setDescription( "A bog standard item" );
@ -415,12 +414,15 @@ public class ReadWriteTest extends ReadOnlyTest {
assertEquals( slcs.getPutCount(), 2 );
CacheEntry entry = (CacheEntry) slcs.getEntries().get( itemRef.get().getId() );
CacheEntry entry = (CacheEntry) slcs.getEntries().get( idRef.get() );
Serializable[] ser = entry.getDisassembledState();
assertTrue( ser[0].equals( "widget" ) );
assertTrue( ser[1].equals( "A bog standard item" ) );
withTxSession(s -> s.delete(itemRef.get()));
withTxSession(s -> {
Item item = s.load(Item.class, idRef.get());
s.delete(item);
});
}
@Test

View File

@ -1,23 +1,9 @@
package org.hibernate.test.cache.infinispan.functional;
import org.hibernate.PessimisticLockException;
import org.hibernate.StaleStateException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.Tombstone;
import org.hibernate.cache.spi.Region;
import org.hibernate.cache.spi.entry.StandardCacheEntryImpl;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
@ -26,14 +12,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
@ -43,63 +23,13 @@ import static org.junit.Assert.*;
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class TombstoneTest extends SingleNodeTest {
private static Log log = LogFactory.getLog(TombstoneTest.class);
private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new ThreadFactory() {
AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, TombstoneTest.class.getSimpleName() + "-executor-" + counter.incrementAndGet());
}
});
private static final long TOMBSTONE_TIMEOUT = InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION.expiration().maxIdle();
private static final int WAIT_TIMEOUT = 2000;
private static final TestTimeService TIME_SERVICE = new TestTimeService();
private Region region;
private AdvancedCache entityCache;
private long itemId;
public class TombstoneTest extends AbstractNonInvalidationTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(READ_WRITE_REPLICATED, READ_WRITE_DISTRIBUTED);
}
@Override
protected void startUp() {
super.startUp();
region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
entityCache = ((EntityRegionImpl) region).getCache();
}
@Before
public void insertAndClearCache() throws Exception {
Item item = new Item("my item", "item that belongs to me");
withTxSession(s -> s.persist(item));
entityCache.clear();
assertEquals("Cache is not empty", Collections.EMPTY_SET, Caches.keys(entityCache).toSet());
itemId = item.getId();
}
@After
public void cleanup() throws Exception {
withTxSession(s -> {
s.createQuery("delete from Item").executeUpdate();
});
}
@AfterClass
public static void shutdown() {
EXECUTOR.shutdown();
}
@Override
protected void addSettings(Map settings) {
super.addSettings(settings);
settings.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
}
@Test
public void testTombstoneExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
@ -122,7 +52,7 @@ public class TombstoneTest extends SingleNodeTest {
assertEquals(1, contents.size());
assertEquals(Tombstone.class, contents.get(itemId).getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
TIME_SERVICE.advance(TIMEOUT + 1);
assertNull(entityCache.get(itemId)); // force expiration
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
@ -151,7 +81,7 @@ public class TombstoneTest extends SingleNodeTest {
Object value = contents.get(itemId);
if (value instanceof FutureUpdate) {
// DB did not blocked two concurrent updates
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
TIME_SERVICE.advance(TIMEOUT + 1);
assertNull(entityCache.get(itemId));
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
@ -159,7 +89,7 @@ public class TombstoneTest extends SingleNodeTest {
// DB left only one update to proceed, and the entry should not be expired
assertNotNull(value);
assertEquals(StandardCacheEntryImpl.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
TIME_SERVICE.advance(TIMEOUT + 1);
assertEquals(value, entityCache.get(itemId));
}
}
@ -188,7 +118,7 @@ public class TombstoneTest extends SingleNodeTest {
assertEquals(1, contents.size());
assertEquals(Tombstone.class, contents.get(itemId).getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
TIME_SERVICE.advance(TIMEOUT + 1);
assertNull(entityCache.get(itemId)); // force expiration
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
@ -219,14 +149,14 @@ public class TombstoneTest extends SingleNodeTest {
Object value = contents.get(itemId);
if (removeSucceeded) {
assertEquals(Tombstone.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
TIME_SERVICE.advance(TIMEOUT + 1);
assertNull(entityCache.get(itemId)); // force expiration
contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
} else {
assertNotNull(value);
assertEquals(StandardCacheEntryImpl.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
TIME_SERVICE.advance(TIMEOUT + 1);
assertEquals(value, entityCache.get(itemId));
}
}
@ -287,89 +217,8 @@ public class TombstoneTest extends SingleNodeTest {
Object value = contents.get(itemId);
assertNotNull(value);
assertEquals(StandardCacheEntryImpl.class, value.getClass());
TIME_SERVICE.advance(TOMBSTONE_TIMEOUT + 1);
TIME_SERVICE.advance(TIMEOUT + 1);
assertEquals(value, entityCache.get(itemId));
}
protected Future<Boolean> removeFlushWait(long id, CyclicBarrier loadBarrier, CountDownLatch preFlushLatch, CountDownLatch flushLatch, CountDownLatch commitLatch) throws Exception {
return EXECUTOR.submit(() -> withTxSessionApply(s -> {
try {
Item item = s.load(Item.class, id);
item.getName(); // force load & putFromLoad before the barrier
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
s.delete(item);
if (preFlushLatch != null) {
awaitOrThrow(preFlushLatch);
}
s.flush();
} catch (StaleStateException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} catch (PessimisticLockException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} finally {
if (flushLatch != null) {
flushLatch.countDown();
}
}
awaitOrThrow(commitLatch);
return true;
}));
}
protected Future<Boolean> updateFlushWait(long id, CyclicBarrier loadBarrier, CountDownLatch preFlushLatch, CountDownLatch flushLatch, CountDownLatch commitLatch) throws Exception {
return EXECUTOR.submit(() -> withTxSessionApply(s -> {
try {
Item item = s.load(Item.class, id);
item.getName(); // force load & putFromLoad before the barrier
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
item.setDescription("Updated item");
s.update(item);
if (preFlushLatch != null) {
awaitOrThrow(preFlushLatch);
}
s.flush();
} catch (StaleStateException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} catch (PessimisticLockException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} finally {
if (flushLatch != null) {
flushLatch.countDown();
}
}
awaitOrThrow(commitLatch);
return true;
}));
}
protected Future<Boolean> evictWait(long id, CyclicBarrier loadBarrier, CountDownLatch preEvictLatch, CountDownLatch postEvictLatch) throws Exception {
return EXECUTOR.submit(() -> {
try {
loadBarrier.await(WAIT_TIMEOUT, TimeUnit.SECONDS);
if (preEvictLatch != null) {
awaitOrThrow(preEvictLatch);
}
sessionFactory().getCache().evictEntity(Item.class, id);
} finally {
if (postEvictLatch != null) {
postEvictLatch.countDown();
}
}
return true;
});
}
protected void awaitOrThrow(CountDownLatch latch) throws InterruptedException, TimeoutException {
if (!latch.await(WAIT_TIMEOUT, TimeUnit.SECONDS)) {
throw new TimeoutException();
}
}
}

View File

@ -0,0 +1,245 @@
package org.hibernate.test.cache.infinispan.functional;
import org.hibernate.PessimisticLockException;
import org.hibernate.Session;
import org.hibernate.StaleStateException;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.VersionedEntry;
import org.hibernate.cache.spi.entry.CacheEntry;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.infinispan.commons.util.ByRef;
import org.junit.Test;
import javax.transaction.Synchronization;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static org.junit.Assert.*;
/**
* Tests specific to versioned entries -based caches.
* Similar to {@link TombstoneTest} but some cases have been removed since
* we are modifying the cache only once, therefore some sequences of operations
* would fail before touching the cache.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class VersionedTest extends AbstractNonInvalidationTest {
@Override
public List<Object[]> getParameters() {
return Arrays.asList(NONSTRICT_REPLICATED, NONSTRICT_DISTRIBUTED);
}
@Test
public void testTwoRemoves() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch flushLatch = new CountDownLatch(2);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = removeFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = removeFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
awaitOrThrow(flushLatch);
assertSingleCacheEntry();
commitLatch.countDown();
boolean firstResult = first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
boolean secondResult = second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
assertTrue(firstResult != secondResult);
assertSingleEmpty();
TIME_SERVICE.advance(TIMEOUT + 1);
assertEmptyCache();
}
@Test
public void testRemoveRolledBack() throws Exception {
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
s.delete(item);
assertSingleCacheEntry();
s.flush();
assertSingleCacheEntry();
markRollbackOnly(s);
});
assertSingleCacheEntry();
}
@Test
public void testUpdateRolledBack() throws Exception {
ByRef<Object> entryRef = new ByRef<>(null);
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
item.getDescription();
Object prevEntry = assertSingleCacheEntry();
entryRef.set(prevEntry);
item.setDescription("Updated item");
s.update(item);
assertEquals(prevEntry, assertSingleCacheEntry());
s.flush();
assertEquals(prevEntry, assertSingleCacheEntry());
markRollbackOnly(s);
});
assertEquals(entryRef.get(), assertSingleCacheEntry());
}
@Test
public void testStaleReadDuringUpdate() throws Exception {
ByRef<Object> entryRef = testStaleRead((s, item) -> {
item.setDescription("Updated item");
s.update(item);
});
assertNotEquals(entryRef.get(), assertSingleCacheEntry());
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
assertEquals("Updated item", item.getDescription());
});
}
@Test
public void testStaleReadDuringRemove() throws Exception {
testStaleRead((s, item) -> s.delete(item));
assertSingleEmpty();
withTxSession(s -> {
Item item = s.get(Item.class, itemId);
assertNull(item);
});
}
protected ByRef<Object> testStaleRead(BiConsumer<Session, Item> consumer) throws Exception {
AtomicReference<Exception> synchronizationException = new AtomicReference<>();
CountDownLatch syncLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> action = executor.submit(() -> withTxSessionApply(s -> {
try {
((SessionImplementor) s).getTransactionCoordinator().getLocalSynchronizations().registerSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
}
@Override
public void afterCompletion(int i) {
syncLatch.countDown();
try {
awaitOrThrow(commitLatch);
} catch (Exception e) {
synchronizationException.set(e);
}
}
});
Item item = s.load(Item.class, itemId);
consumer.accept(s, item);
s.flush();
} catch (StaleStateException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
} catch (PessimisticLockException e) {
log.info("Exception thrown: ", e);
markRollbackOnly(s);
return false;
}
return true;
}));
awaitOrThrow(syncLatch);
ByRef<Object> entryRef = new ByRef<>(null);
try {
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
assertEquals("Original item", item.getDescription());
entryRef.set(assertSingleCacheEntry());
});
} finally {
commitLatch.countDown();
}
assertTrue(action.get(WAIT_TIMEOUT, TimeUnit.SECONDS));
assertNull(synchronizationException.get());
return entryRef;
}
@Test
public void testUpdateEvictExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch preEvictLatch = new CountDownLatch(1);
CountDownLatch postEvictLatch = new CountDownLatch(1);
CountDownLatch flushLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = updateFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = evictWait(itemId, loadBarrier, preEvictLatch, postEvictLatch);
awaitOrThrow(flushLatch);
assertSingleCacheEntry();
preEvictLatch.countDown();
awaitOrThrow(postEvictLatch);
assertSingleEmpty();
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
assertSingleEmpty();
TIME_SERVICE.advance(TIMEOUT + 1);
assertEmptyCache();
}
@Test
public void testEvictUpdateExpiration() throws Exception {
// since the timestamp for update is based on session open/tx begin time, we have to do this sequentially
sessionFactory().getCache().evictEntity(Item.class, itemId);
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
assertEquals(VersionedEntry.class, contents.get(itemId).getClass());
TIME_SERVICE.advance(1);
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
item.setDescription("Updated item");
s.update(item);
});
assertSingleCacheEntry();
TIME_SERVICE.advance(TIMEOUT + 1);
assertSingleCacheEntry();
}
protected void assertSingleEmpty() {
Map contents = Caches.entrySet(entityCache).toMap();
Object value;
assertEquals(1, contents.size());
value = contents.get(itemId);
assertEquals(VersionedEntry.class, value.getClass());
assertNull(((VersionedEntry) value).getValue());
}
protected void assertEmptyCache() {
assertNull(entityCache.get(itemId)); // force expiration
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(Collections.EMPTY_MAP, contents);
}
protected Object assertSingleCacheEntry() {
Map contents = Caches.entrySet(entityCache).toMap();
assertEquals(1, contents.size());
Object value = contents.get(itemId);
assertTrue(contents.toString(), value instanceof CacheEntry);
return value;
}
}

View File

@ -6,7 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -19,7 +18,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.test.cache.infinispan.functional.entities.Contact;
import org.hibernate.test.cache.infinispan.functional.entities.Customer;
import org.hibernate.testing.TestForIssue;
@ -63,7 +61,7 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, false);
return getParameters(true, true, false, true);
}
@Override

View File

@ -6,7 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@ -45,7 +44,7 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, true);
return getParameters(true, true, true, true);
}
@Override

View File

@ -1,24 +0,0 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
/**
* RepeatableSessionRefreshTest.
*
* @author Galder Zamarreño
* @since 3.5
*/
public class RepeatableSessionRefreshTest extends SessionRefreshTest {
private static final String CACHE_CONFIG = "entity-repeatable";
@Override
protected String getEntityCacheConfigName() {
return CACHE_CONFIG;
}
}

View File

@ -6,7 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -40,7 +39,7 @@ public class SessionRefreshTest extends DualNodeTest {
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, false);
return getParameters(true, true, false, true);
}
@Override

View File

@ -18,6 +18,7 @@ public class Account implements Serializable {
private static final long serialVersionUID = 1L;
private Integer id;
private long version;
private AccountHolder accountHolder;
private Integer balance;
private String branch;
@ -30,6 +31,14 @@ public class Account implements Serializable {
this.id = id;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public AccountHolder getAccountHolder() {
return accountHolder;
}

View File

@ -12,6 +12,7 @@ import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.ManyToOne;
import javax.persistence.Transient;
import org.hibernate.annotations.NaturalId;
import org.hibernate.annotations.NaturalIdCache;
@ -25,6 +26,8 @@ public class Citizen {
@Id
@GeneratedValue
private Integer id;
@Transient
private long version;
private String firstname;
private String lastname;
@NaturalId
@ -42,6 +45,14 @@ public class Citizen {
this.id = id;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public String getFirstname() {
return firstname;
}

View File

@ -18,6 +18,8 @@ public class Contact implements Serializable {
String name;
String tlf;
Customer customer;
// mapping added programmatically
long version;
public Integer getId() {
return id;
@ -51,6 +53,14 @@ public class Contact implements Serializable {
this.customer = customer;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
@Override
public boolean equals(Object o) {
if (o == this)

View File

@ -17,6 +17,8 @@ import java.util.Set;
public class Customer implements Serializable {
Integer id;
String name;
// mapping added programmatically
long version;
private transient Set<Contact> contacts;
@ -46,4 +48,12 @@ public class Customer implements Serializable {
public void setContacts(Set<Contact> contacts) {
this.contacts = contacts;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
}

View File

@ -14,45 +14,55 @@ import java.util.Set;
* @author Gavin King
*/
public class Item {
private Long id;
private String name;
private String description;
private Item owner;
private Set<Item> items = new HashSet<Item>( );
private Long id;
// mapping for version is added programmatically
private long version;
private String name;
private String description;
private Item owner;
private Set<Item> items = new HashSet<Item>( );
private Item bagOwner;
private List<Item> bagOfItems = new ArrayList<Item>( );
private Set<OtherItem> otherItems = new HashSet<OtherItem>( );
public Item() {}
public Item( String name, String description ) {
public Item() {}
public Item( String name, String description ) {
this.name = name;
this.description = description;
}
public String getDescription() {
return description;
}
return description;
}
public void setDescription(String description) {
this.description = description;
}
public void setDescription(String description) {
this.description = description;
}
public Long getId() {
return id;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public long getVersion() {
return version;
}
public void setName(String name) {
this.name = name;
}
public void setVersion(long version) {
this.version = version;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Item getOwner() {
return owner;

View File

@ -13,6 +13,7 @@ import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.ManyToOne;
import javax.persistence.Transient;
@Entity
@NaturalIdCache
@ -23,14 +24,16 @@ import javax.persistence.ManyToOne;
* @author Hardy Ferentschik
*/
public class NaturalIdOnManyToOne {
@Id
@GeneratedValue
int id;
@Id
@GeneratedValue
int id;
@Transient
long version;
@NaturalId
@ManyToOne
Citizen citizen;
@NaturalId
@ManyToOne
Citizen citizen;
public int getId() {
return id;
@ -40,6 +43,14 @@ public class NaturalIdOnManyToOne {
this.id = id;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public Citizen getCitizen() {
return citizen;
}

View File

@ -14,6 +14,8 @@ import java.util.List;
*/
public class OtherItem {
private Long id;
// mapping added programmatically
private long version;
private String name;
private Item favoriteItem;
private List<Item> bagOfItems = new ArrayList<Item>();
@ -29,6 +31,14 @@ public class OtherItem {
this.id = id;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public String getName() {
return name;
}

View File

@ -2,6 +2,8 @@ package org.hibernate.test.cache.infinispan.functional.entities;
import javax.persistence.EmbeddedId;
import javax.persistence.Entity;
import javax.persistence.Transient;
import java.io.Serializable;
/**
@ -16,6 +18,9 @@ public class Person implements Serializable {
int age;
@Transient
long version;
public Person() {}
public Person(String firstName, String lastName, int age) {
@ -38,4 +43,12 @@ public class Person implements Serializable {
public void setAge(int age) {
this.age = age;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
}

View File

@ -10,6 +10,7 @@ package org.hibernate.test.cache.infinispan.functional.entities;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Transient;
/**
* @author Emmanuel Bernard
@ -19,6 +20,8 @@ public class State {
@Id
@GeneratedValue
private Integer id;
@Transient
private long version;
private String name;
public Integer getId() {
@ -29,6 +32,14 @@ public class State {
this.id = id;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public String getName() {
return name;
}

View File

@ -15,7 +15,6 @@ import org.hibernate.SessionFactory;
import org.hibernate.StaleObjectStateException;
import org.hibernate.StaleStateException;
import org.hibernate.Transaction;
import org.hibernate.annotations.CacheConcurrencyStrategy;
import org.hibernate.boot.Metadata;
import org.hibernate.boot.MetadataSources;
import org.hibernate.boot.registry.StandardServiceRegistry;
@ -23,6 +22,7 @@ import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.infinispan.access.InvalidationCacheAccessDelegate;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.RegionAccessStrategy;
import org.hibernate.cfg.Environment;
import org.hibernate.criterion.Restrictions;
@ -105,6 +105,9 @@ public abstract class CorrectnessTestCase {
@Parameterized.Parameter(1)
public CacheMode cacheMode;
@Parameterized.Parameter(2)
public AccessType accessType;
static ThreadLocal<Integer> threadNode = new ThreadLocal<>();
final AtomicInteger timestampGenerator = new AtomicInteger();
@ -134,33 +137,24 @@ public abstract class CorrectnessTestCase {
public static class Jta extends CorrectnessTestCase {
private final TransactionManager transactionManager = TestingJtaPlatformImpl.transactionManager();
@Parameterized.Parameter(2)
public boolean transactional;
@Parameterized.Parameter(3)
public boolean readOnly;
@Parameterized.Parameters(name = "{0}")
public List<Object[]> getParameters() {
return Arrays.<Object[]>asList(
new Object[] { "transactional, invalidation", CacheMode.INVALIDATION_SYNC, true, false },
new Object[] { "read-only, invalidation", CacheMode.INVALIDATION_SYNC, false, true }, // maybe not needed
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC, false, false },
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC, false, false },
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC, false, false }
new Object[] { "transactional, invalidation", CacheMode.INVALIDATION_SYNC, AccessType.TRANSACTIONAL },
new Object[] { "read-only, invalidation", CacheMode.INVALIDATION_SYNC, AccessType.READ_ONLY }, // maybe not needed
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC, AccessType.READ_WRITE },
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE },
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC, AccessType.READ_WRITE },
new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.NONSTRICT_READ_WRITE }
);
}
@Override
protected void applySettings(StandardServiceRegistryBuilder ssrb) {
ssrb
.applySetting( Environment.JTA_PLATFORM, TestingJtaPlatformImpl.class.getName() )
.applySetting( Environment.CONNECTION_PROVIDER, JtaAwareConnectionProviderImpl.class.getName() )
.applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() )
.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, transactional);
if (readOnly) {
ssrb.applySetting(Environment.DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_ONLY.toAccessType().getExternalName());
}
super.applySettings(ssrb);
ssrb.applySetting( Environment.JTA_PLATFORM, TestingJtaPlatformImpl.class.getName() );
ssrb.applySetting( Environment.CONNECTION_PROVIDER, JtaAwareConnectionProviderImpl.class.getName() );
ssrb.applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() );
}
@Override
@ -188,7 +182,7 @@ public abstract class CorrectnessTestCase {
@Override
protected Operation getOperation() {
if (readOnly) {
if (accessType == AccessType.READ_ONLY) {
ThreadLocalRandom random = ThreadLocalRandom.current();
Operation operation;
int r = random.nextInt(30);
@ -208,9 +202,10 @@ public abstract class CorrectnessTestCase {
@Parameterized.Parameters(name = "{0}")
public List<Object[]> getParameters() {
return Arrays.<Object[]>asList(
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC },
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC },
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC }
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC, AccessType.READ_WRITE },
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE },
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC, AccessType.READ_WRITE },
new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE }
);
}
@ -225,7 +220,6 @@ public abstract class CorrectnessTestCase {
super.applySettings(ssrb);
ssrb.applySetting(Environment.JTA_PLATFORM, NoJtaPlatform.class.getName());
ssrb.applySetting(Environment.TRANSACTION_COORDINATOR_STRATEGY, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class.getName());
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, false);
}
}
@ -255,6 +249,8 @@ public abstract class CorrectnessTestCase {
}
protected void applySettings(StandardServiceRegistryBuilder ssrb) {
ssrb.applySetting( Environment.DEFAULT_CACHE_CONCURRENCY_STRATEGY, accessType.getExternalName());
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, accessType == AccessType.TRANSACTIONAL);
}
@After

View File

@ -5,8 +5,6 @@ import org.hibernate.cache.spi.access.RegionAccessStrategy;
import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SessionImplementor;
import javax.transaction.Synchronization;
/**
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
@ -14,11 +12,13 @@ public abstract class TestSynchronization implements javax.transaction.Synchroni
protected final SessionImplementor session;
protected final Object key;
protected final Object value;
protected final Object version;
public TestSynchronization(SessionImplementor session, Object key, Object value) {
public TestSynchronization(SessionImplementor session, Object key, Object value, Object version) {
this.session = session;
this.key = key;
this.value = value;
this.version = version;
}
@Override
@ -29,14 +29,14 @@ public abstract class TestSynchronization implements javax.transaction.Synchroni
public static class AfterInsert extends TestSynchronization {
private final EntityRegionAccessStrategy strategy;
public AfterInsert(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, Object value) {
super(session, key, value);
public AfterInsert(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, Object value, Object version) {
super(session, key, value, version);
this.strategy = strategy;
}
@Override
public void afterCompletion(int status) {
strategy.afterInsert(session, key, value, null);
strategy.afterInsert(session, key, value, version);
}
}
@ -44,15 +44,15 @@ public abstract class TestSynchronization implements javax.transaction.Synchroni
private final EntityRegionAccessStrategy strategy;
private final SoftLock lock;
public AfterUpdate(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, Object value, SoftLock lock) {
super(session, key, value);
public AfterUpdate(EntityRegionAccessStrategy strategy, SessionImplementor session, Object key, Object value, Object version, SoftLock lock) {
super(session, key, value, version);
this.strategy = strategy;
this.lock = lock;
}
@Override
public void afterCompletion(int status) {
strategy.afterUpdate(session, key, value, null, null, lock);
strategy.afterUpdate(session, key, value, version, null, lock);
}
}
@ -61,7 +61,7 @@ public abstract class TestSynchronization implements javax.transaction.Synchroni
private final SoftLock lock;
public UnlockItem(RegionAccessStrategy strategy, SessionImplementor session, Object key, SoftLock lock) {
super(session, key, null);
super(session, key, null, null);
this.strategy = strategy;
this.lock = lock;
}

View File

@ -31,6 +31,11 @@ public class TestingKeyFactory {
this.id = id;
}
@Override
public String toString() {
return id;
}
@Override
public int hashCode() {
final int prime = 31;