HHH-11323 Update CorrectnessTestCase
This commit is contained in:
parent
cc56c9672b
commit
ba3677b690
|
@ -11,6 +11,7 @@ import java.io.BufferedWriter;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.file.Files;
|
||||
import java.text.SimpleDateFormat;
|
||||
|
@ -42,8 +43,11 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import javax.persistence.OptimisticLockException;
|
||||
import javax.persistence.PersistenceException;
|
||||
import javax.transaction.RollbackException;
|
||||
import javax.transaction.Status;
|
||||
import javax.transaction.TransactionManager;
|
||||
|
||||
import org.hibernate.LockMode;
|
||||
|
@ -54,6 +58,7 @@ import org.hibernate.SessionFactory;
|
|||
import org.hibernate.StaleObjectStateException;
|
||||
import org.hibernate.StaleStateException;
|
||||
import org.hibernate.Transaction;
|
||||
import org.hibernate.TransactionException;
|
||||
import org.hibernate.boot.Metadata;
|
||||
import org.hibernate.boot.MetadataSources;
|
||||
import org.hibernate.boot.registry.StandardServiceRegistry;
|
||||
|
@ -78,6 +83,8 @@ import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLoca
|
|||
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
|
||||
import org.hibernate.resource.transaction.spi.TransactionStatus;
|
||||
|
||||
import org.hibernate.testing.AfterClassOnce;
|
||||
import org.hibernate.testing.BeforeClassOnce;
|
||||
import org.hibernate.testing.jta.JtaAwareConnectionProviderImpl;
|
||||
import org.hibernate.testing.jta.TestingJtaPlatformImpl;
|
||||
import org.hibernate.testing.junit4.CustomParameterized;
|
||||
|
@ -85,20 +92,20 @@ import org.hibernate.test.cache.infinispan.stress.entities.Address;
|
|||
import org.hibernate.test.cache.infinispan.stress.entities.Family;
|
||||
import org.hibernate.test.cache.infinispan.stress.entities.Person;
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.infinispan.configuration.cache.InterceptorConfiguration;
|
||||
import org.infinispan.test.fwk.TestResourceTracker;
|
||||
import org.infinispan.util.concurrent.TimeoutException;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import org.infinispan.commands.VisitableCommand;
|
||||
import org.infinispan.commands.tx.CommitCommand;
|
||||
import org.infinispan.commands.tx.RollbackCommand;
|
||||
import org.infinispan.commons.util.ByRef;
|
||||
import org.infinispan.configuration.cache.CacheMode;
|
||||
import org.infinispan.configuration.cache.ConfigurationBuilder;
|
||||
import org.infinispan.configuration.cache.InterceptorConfiguration;
|
||||
import org.infinispan.context.InvocationContext;
|
||||
import org.infinispan.interceptors.base.BaseCustomInterceptor;
|
||||
import org.infinispan.remoting.RemoteException;
|
||||
|
@ -113,7 +120,7 @@ import org.infinispan.remoting.RemoteException;
|
|||
@RunWith(CustomParameterized.class)
|
||||
public abstract class CorrectnessTestCase {
|
||||
static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(CorrectnessTestCase.class);
|
||||
static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(10);
|
||||
static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(2);
|
||||
static final int NUM_NODES = 4;
|
||||
static final int NUM_THREADS_PER_NODE = 4;
|
||||
static final int NUM_THREADS = NUM_NODES * NUM_THREADS_PER_NODE;
|
||||
|
@ -122,6 +129,9 @@ public abstract class CorrectnessTestCase {
|
|||
static final int MAX_MEMBERS = 10;
|
||||
private final static Comparator<Log<?>> WALL_CLOCK_TIME_COMPARATOR = (o1, o2) -> Long.compare(o1.wallClockTime, o2.wallClockTime);
|
||||
|
||||
private final static boolean INVALIDATE_REGION = Boolean.getBoolean("testInfinispan.correctness.invalidateRegion");
|
||||
private final static boolean INJECT_FAILURES = Boolean.getBoolean("testInfinispan.correctness.injectFailures");
|
||||
|
||||
@Parameterized.Parameter(0)
|
||||
public String name;
|
||||
|
||||
|
@ -180,36 +190,13 @@ public abstract class CorrectnessTestCase {
|
|||
ssrb.applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() );
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void withTx(Runnable runnable, boolean rolledBack) throws Exception {
|
||||
TransactionManager tm = transactionManager;
|
||||
tm.begin();
|
||||
try {
|
||||
runnable.run();
|
||||
} catch (RuntimeException e) {
|
||||
tm.setRollbackOnly();
|
||||
throw e;
|
||||
} finally {
|
||||
if (!rolledBack && tm.getStatus() == Status.STATUS_ACTIVE) {
|
||||
log.trace("TM commit begin");
|
||||
tm.commit();
|
||||
log.trace("TM commit end");
|
||||
} else {
|
||||
log.trace("TM rollback begin");
|
||||
//log.warn(Util.threadDump());
|
||||
tm.rollback();
|
||||
log.trace("TM rollback end");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Operation getOperation() {
|
||||
if (accessType == AccessType.READ_ONLY) {
|
||||
ThreadLocalRandom random = ThreadLocalRandom.current();
|
||||
Operation operation;
|
||||
int r = random.nextInt(30);
|
||||
if (r == 0) operation = new InvalidateCache();
|
||||
if (r == 0 && INVALIDATE_REGION) operation = new InvalidateCache();
|
||||
else if (r < 5) operation = new QueryFamilies();
|
||||
else if (r < 10) operation = new RemoveFamily(r < 12);
|
||||
else operation = new ReadFamily(r < 20);
|
||||
|
@ -228,16 +215,10 @@ public abstract class CorrectnessTestCase {
|
|||
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC, AccessType.READ_WRITE },
|
||||
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE },
|
||||
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC, AccessType.READ_WRITE },
|
||||
new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE }
|
||||
new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.NONSTRICT_READ_WRITE }
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void withTx(Runnable runnable, boolean rolledBack) throws Exception {
|
||||
// no transaction on JTA TM
|
||||
runnable.run();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void applySettings(StandardServiceRegistryBuilder ssrb) {
|
||||
super.applySettings(ssrb);
|
||||
|
@ -246,8 +227,9 @@ public abstract class CorrectnessTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeClassOnce
|
||||
public void beforeClass() {
|
||||
TestResourceTracker.testStarted(getClass().getSimpleName());
|
||||
Arrays.asList(new File(System.getProperty("java.io.tmpdir"))
|
||||
.listFiles((dir, name) -> name.startsWith("family_") || name.startsWith("invalidations-")))
|
||||
.stream().forEach(f -> f.delete());
|
||||
|
@ -260,6 +242,7 @@ public abstract class CorrectnessTestCase {
|
|||
.applySetting( Environment.HBM2DDL_AUTO, "create-drop" )
|
||||
.applySetting( Environment.CACHE_REGION_FACTORY, FailingInfinispanRegionFactory.class.getName())
|
||||
.applySetting( TestInfinispanRegionFactory.CACHE_MODE, cacheMode )
|
||||
.applySetting( Environment.USE_MINIMAL_PUTS, "false" )
|
||||
.applySetting( Environment.GENERATE_STATISTICS, "false" );
|
||||
applySettings(ssrb);
|
||||
|
||||
|
@ -276,11 +259,12 @@ public abstract class CorrectnessTestCase {
|
|||
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, accessType == AccessType.TRANSACTIONAL);
|
||||
}
|
||||
|
||||
@After
|
||||
@AfterClassOnce
|
||||
public void afterClass() {
|
||||
for (SessionFactory sf : sessionFactories) {
|
||||
if (sf != null) sf.close();
|
||||
}
|
||||
TestResourceTracker.testFinished(getClass().getSimpleName());
|
||||
}
|
||||
|
||||
public static Class[] getAnnotatedClasses() {
|
||||
|
@ -338,18 +322,40 @@ public abstract class CorrectnessTestCase {
|
|||
@Override
|
||||
protected void amendCacheConfiguration(String cacheName, ConfigurationBuilder configurationBuilder) {
|
||||
super.amendCacheConfiguration(cacheName, configurationBuilder);
|
||||
// failure to write into timestamps would cause failure even though both DB and cache has been updated
|
||||
if (!cacheName.equals("timestamps") && !cacheName.endsWith(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE)) {
|
||||
configurationBuilder.customInterceptors().addInterceptor()
|
||||
configurationBuilder.transaction().cacheStopTimeout(1, TimeUnit.SECONDS);
|
||||
if (INJECT_FAILURES) {
|
||||
// failure to write into timestamps would cause failure even though both DB and cache has been updated
|
||||
if (!cacheName.equals("timestamps") && !cacheName.endsWith(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE)) {
|
||||
configurationBuilder.customInterceptors().addInterceptor()
|
||||
.interceptorClass(FailureInducingInterceptor.class)
|
||||
.position(InterceptorConfiguration.Position.FIRST);
|
||||
log.trace("Injecting FailureInducingInterceptor into " + cacheName);
|
||||
} else {
|
||||
log.trace("Not injecting into " + cacheName);
|
||||
log.trace("Injecting FailureInducingInterceptor into " + cacheName);
|
||||
} else {
|
||||
log.trace("Not injecting into " + cacheName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final static Class[][] EXPECTED = {
|
||||
{ TransactionException.class, RollbackException.class, StaleObjectStateException.class },
|
||||
{ TransactionException.class, RollbackException.class, PessimisticLockException.class },
|
||||
{ TransactionException.class, RollbackException.class, LockAcquisitionException.class },
|
||||
{ RemoteException.class, TimeoutException.class },
|
||||
{ StaleStateException.class, PessimisticLockException.class},
|
||||
{ StaleStateException.class, ObjectNotFoundException.class},
|
||||
{ StaleStateException.class, ConstraintViolationException.class},
|
||||
{ StaleStateException.class, LockAcquisitionException.class},
|
||||
{ PersistenceException.class, ConstraintViolationException.class },
|
||||
{ PersistenceException.class, LockAcquisitionException.class },
|
||||
{ javax.persistence.PessimisticLockException.class, PessimisticLockException.class },
|
||||
{ OptimisticLockException.class, StaleStateException.class },
|
||||
{ PessimisticLockException.class },
|
||||
{ StaleObjectStateException.class },
|
||||
{ ObjectNotFoundException.class },
|
||||
{ LockAcquisitionException.class }
|
||||
};
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
|
||||
|
@ -379,23 +385,7 @@ public abstract class CorrectnessTestCase {
|
|||
// ignore exceptions from optimistic failures and induced exceptions
|
||||
if (hasCause(e, InducedException.class)) {
|
||||
continue;
|
||||
} else if (e instanceof RollbackException) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof StaleObjectStateException
|
||||
|| cause instanceof PessimisticLockException
|
||||
|| cause instanceof LockAcquisitionException) { // use MVCC database to prevent this!
|
||||
continue;
|
||||
}
|
||||
} else if (e instanceof RemoteException) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof org.infinispan.util.concurrent.TimeoutException) {
|
||||
continue;
|
||||
}
|
||||
} else if (e instanceof StaleStateException
|
||||
|| e instanceof PessimisticLockException
|
||||
|| e instanceof ObjectNotFoundException
|
||||
|| e instanceof ConstraintViolationException
|
||||
|| e instanceof LockAcquisitionException) { // use MVCC database to prevent this!
|
||||
} else if (Stream.of(EXPECTED).anyMatch(exceptions -> matches(e, exceptions))) {
|
||||
continue;
|
||||
}
|
||||
exceptions.add(e);
|
||||
|
@ -458,7 +448,8 @@ public abstract class CorrectnessTestCase {
|
|||
}));
|
||||
}
|
||||
for (ForkJoinTask<?> task : tasks) {
|
||||
task.get();
|
||||
// with heavy logging this may have trouble to complete
|
||||
task.get(30, TimeUnit.SECONDS);
|
||||
}
|
||||
if (!exceptions.isEmpty()) {
|
||||
for (Exception e : exceptions) {
|
||||
|
@ -507,33 +498,38 @@ public abstract class CorrectnessTestCase {
|
|||
if (invalidators != null && !invalidators.isEmpty()) {
|
||||
delayed.add(new DelayedInvalidators(map, entry.getKey()));
|
||||
}
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
// poll until all invalidations come
|
||||
long deadline = System.currentTimeMillis() + 30000;
|
||||
while (System.currentTimeMillis() < deadline) {
|
||||
for (Iterator<DelayedInvalidators> iterator = delayed.iterator(); iterator.hasNext(); ) {
|
||||
DelayedInvalidators entry = iterator.next();
|
||||
Object pendingPutMap = entry.getPendingPutMap();
|
||||
if (pendingPutMap == null) {
|
||||
iterator.remove();
|
||||
}
|
||||
else {
|
||||
java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(pendingPutMap);
|
||||
if (invalidators == null || invalidators.isEmpty()) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
iterateInvalidators(delayed, getInvalidators, (k, i) -> {});
|
||||
if (delayed.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
if (!delayed.isEmpty()) {
|
||||
throw new IllegalStateException("Invalidators were not cleared: " + delayed);
|
||||
iterateInvalidators(delayed, getInvalidators, (k, i) -> log.warnf("Left invalidators on key %s: %s", k, i));
|
||||
throw new IllegalStateException("Invalidators were not cleared: " + delayed.size());
|
||||
}
|
||||
}
|
||||
|
||||
private void iterateInvalidators(List<DelayedInvalidators> delayed, Method getInvalidators, BiConsumer<Object, java.util.Collection> invalidatorConsumer) throws IllegalAccessException, InvocationTargetException {
|
||||
for (Iterator<DelayedInvalidators> iterator = delayed.iterator(); iterator.hasNext(); ) {
|
||||
DelayedInvalidators entry = iterator.next();
|
||||
Object pendingPutMap = entry.getPendingPutMap();
|
||||
if (pendingPutMap == null) {
|
||||
iterator.remove();
|
||||
}
|
||||
else {
|
||||
java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(pendingPutMap);
|
||||
if (invalidators == null || invalidators.isEmpty()) {
|
||||
iterator.remove();
|
||||
}
|
||||
invalidatorConsumer.accept(entry.key, invalidators);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -545,11 +541,19 @@ public abstract class CorrectnessTestCase {
|
|||
return hasCause(cause, clazz);
|
||||
}
|
||||
|
||||
private boolean matches(Throwable throwable, Class[] classes) {
|
||||
return matches(throwable, classes, 0);
|
||||
}
|
||||
|
||||
private boolean matches(Throwable throwable, Class[] classes, int index) {
|
||||
return index >= classes.length || (classes[index].isInstance(throwable) && matches(throwable.getCause(), classes, index + 1));
|
||||
}
|
||||
|
||||
protected Operation getOperation() {
|
||||
ThreadLocalRandom random = ThreadLocalRandom.current();
|
||||
Operation operation;
|
||||
int r = random.nextInt(100);
|
||||
if (r == 0) operation = new InvalidateCache();
|
||||
if (r == 0 && INVALIDATE_REGION) operation = new InvalidateCache();
|
||||
else if (r < 5) operation = new QueryFamilies();
|
||||
else if (r < 10) operation = new RemoveFamily(r < 6);
|
||||
else if (r < 20) operation = new UpdateFamily(r < 12, random.nextInt(1, 3));
|
||||
|
@ -638,8 +642,6 @@ public abstract class CorrectnessTestCase {
|
|||
return read.getValue() instanceof java.util.Collection && ((java.util.Collection) read.getValue()).isEmpty();
|
||||
}
|
||||
|
||||
protected abstract void withTx(Runnable runnable, boolean rolledBack) throws Exception;
|
||||
|
||||
private abstract class Operation {
|
||||
protected final boolean rolledBack;
|
||||
|
||||
|
@ -650,21 +652,8 @@ public abstract class CorrectnessTestCase {
|
|||
public abstract void run() throws Exception;
|
||||
|
||||
protected void withSession(Consumer<Session> consumer) throws Exception {
|
||||
ByRef<Session> sessionRef = new ByRef<>(null);
|
||||
try {
|
||||
withTx(() -> openSessionAndExecute(sessionRef, consumer), rolledBack);
|
||||
} finally {
|
||||
Session s = sessionRef.get();
|
||||
if (s != null) {
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void openSessionAndExecute(ByRef<Session> sessionRef, Consumer<Session> consumer) {
|
||||
int node = threadNode.get();
|
||||
Session s = sessionFactory(node).openSession();
|
||||
sessionRef.set(s);
|
||||
Transaction tx = s.getTransaction();
|
||||
tx.begin();
|
||||
try {
|
||||
|
@ -684,12 +673,16 @@ public abstract class CorrectnessTestCase {
|
|||
log.trace("Hibernate rollback end");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.trace("Hibernate commit or rollback failed", e);
|
||||
log.trace("Hibernate commit or rollback failed, status is " + tx.getStatus(), e);
|
||||
if (tx.getStatus() == TransactionStatus.MARKED_ROLLBACK) {
|
||||
tx.rollback();
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
// cannot close before XA commit since force increment requires open connection
|
||||
s.close();
|
||||
}
|
||||
}
|
||||
// cannot close before XA commit since force increment requires open connection
|
||||
// s.close();
|
||||
}
|
||||
|
||||
protected void withRandomFamily(BiConsumer<Session, Family> consumer, Ref<String> familyNameUpdate, Ref<Set<String>> familyMembersUpdate, LockMode lockMode) throws Exception {
|
||||
|
@ -814,7 +807,7 @@ public abstract class CorrectnessTestCase {
|
|||
throw e;
|
||||
} finally {
|
||||
int after = timestampGenerator.getAndIncrement();
|
||||
log.trace("Finished InsertFamily at " + after);
|
||||
log.trace("Finished InsertFamily at " + after + ", " + (failure ? "failed" : "success"));
|
||||
familyIds.put(family.getId(), new AtomicInteger(NUM_ACCESS_AFTER_REMOVAL));
|
||||
LogType type = failure || rolledBack ? LogType.WRITE_FAILURE : LogType.WRITE;
|
||||
getRecordList(familyNames, family.getId()).add(new Log<>(before, after, family.getName(), type));
|
||||
|
@ -993,10 +986,8 @@ public abstract class CorrectnessTestCase {
|
|||
@Override
|
||||
public void run() throws Exception {
|
||||
log.trace("Invalidating all caches");
|
||||
withTx(() -> {
|
||||
int node = threadNode.get();
|
||||
sessionFactory(node).getCache().evictAllRegions();
|
||||
}, rolledBack);
|
||||
int node = threadNode.get();
|
||||
sessionFactory(node).getCache().evictAllRegions();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue