HHH-11323 Update CorrectnessTestCase

(cherry picked from commit ba3677b690)
This commit is contained in:
Radim Vansa 2016-12-14 13:21:53 +01:00 committed by Gail Badner
parent 40e62aac7b
commit 7b355c8037
1 changed files with 141 additions and 128 deletions

View File

@ -7,6 +7,49 @@
package org.hibernate.test.cache.infinispan.stress;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.persistence.OptimisticLockException;
import javax.persistence.PersistenceException;
import javax.transaction.RollbackException;
import javax.transaction.TransactionManager;
import org.hibernate.LockMode;
import org.hibernate.ObjectNotFoundException;
import org.hibernate.PessimisticLockException;
@ -15,6 +58,7 @@ import org.hibernate.SessionFactory;
import org.hibernate.StaleObjectStateException;
import org.hibernate.StaleStateException;
import org.hibernate.Transaction;
import org.hibernate.TransactionException;
import org.hibernate.boot.Metadata;
import org.hibernate.boot.MetadataSources;
import org.hibernate.boot.registry.StandardServiceRegistry;
@ -38,47 +82,32 @@ import org.hibernate.mapping.RootClass;
import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl;
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
import org.hibernate.resource.transaction.spi.TransactionStatus;
import org.hibernate.testing.AfterClassOnce;
import org.hibernate.testing.BeforeClassOnce;
import org.hibernate.testing.jta.JtaAwareConnectionProviderImpl;
import org.hibernate.testing.jta.TestingJtaPlatformImpl;
import org.hibernate.testing.junit4.CustomParameterized;
import org.hibernate.test.cache.infinispan.stress.entities.Address;
import org.hibernate.test.cache.infinispan.stress.entities.Family;
import org.hibernate.test.cache.infinispan.stress.entities.Person;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.testing.jta.JtaAwareConnectionProviderImpl;
import org.hibernate.testing.jta.TestingJtaPlatformImpl;
import org.hibernate.testing.junit4.CustomParameterized;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commons.util.ByRef;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.InterceptorConfiguration;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.remoting.RemoteException;
import org.junit.After;
import org.junit.Before;
import org.infinispan.test.fwk.TestResourceTracker;
import org.infinispan.util.concurrent.TimeoutException;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.TransactionManager;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.remoting.RemoteException;
/**
* Tries to execute random operations for {@link #EXECUTION_TIME} and then verify the log for correctness.
@ -90,7 +119,7 @@ import java.util.stream.Collectors;
@RunWith(CustomParameterized.class)
public abstract class CorrectnessTestCase {
static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(CorrectnessTestCase.class);
static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(10);
static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(2);
static final int NUM_NODES = 4;
static final int NUM_THREADS_PER_NODE = 4;
static final int NUM_THREADS = NUM_NODES * NUM_THREADS_PER_NODE;
@ -99,6 +128,9 @@ public abstract class CorrectnessTestCase {
static final int MAX_MEMBERS = 10;
private final static Comparator<Log<?>> WALL_CLOCK_TIME_COMPARATOR = (o1, o2) -> Long.compare(o1.wallClockTime, o2.wallClockTime);
private final static boolean INVALIDATE_REGION = Boolean.getBoolean("testInfinispan.correctness.invalidateRegion");
private final static boolean INJECT_FAILURES = Boolean.getBoolean("testInfinispan.correctness.injectFailures");
@Parameterized.Parameter(0)
public String name;
@ -157,36 +189,13 @@ public abstract class CorrectnessTestCase {
ssrb.applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() );
}
@Override
protected void withTx(Runnable runnable, boolean rolledBack) throws Exception {
TransactionManager tm = transactionManager;
tm.begin();
try {
runnable.run();
} catch (RuntimeException e) {
tm.setRollbackOnly();
throw e;
} finally {
if (!rolledBack && tm.getStatus() == Status.STATUS_ACTIVE) {
log.trace("TM commit begin");
tm.commit();
log.trace("TM commit end");
} else {
log.trace("TM rollback begin");
//log.warn(Util.threadDump());
tm.rollback();
log.trace("TM rollback end");
}
}
}
@Override
protected Operation getOperation() {
if (accessType == AccessType.READ_ONLY) {
ThreadLocalRandom random = ThreadLocalRandom.current();
Operation operation;
int r = random.nextInt(30);
if (r == 0) operation = new InvalidateCache();
if (r == 0 && INVALIDATE_REGION) operation = new InvalidateCache();
else if (r < 5) operation = new QueryFamilies();
else if (r < 10) operation = new RemoveFamily(r < 12);
else operation = new ReadFamily(r < 20);
@ -205,16 +214,10 @@ public abstract class CorrectnessTestCase {
new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC, AccessType.READ_WRITE },
new Object[] { "read-write, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE },
new Object[] { "read-write, distributed", CacheMode.DIST_SYNC, AccessType.READ_WRITE },
new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE }
new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.NONSTRICT_READ_WRITE }
);
}
@Override
protected void withTx(Runnable runnable, boolean rolledBack) throws Exception {
// no transaction on JTA TM
runnable.run();
}
@Override
protected void applySettings(StandardServiceRegistryBuilder ssrb) {
super.applySettings(ssrb);
@ -223,8 +226,9 @@ public abstract class CorrectnessTestCase {
}
}
@Before
@BeforeClassOnce
public void beforeClass() {
TestResourceTracker.testStarted(getClass().getSimpleName());
Arrays.asList(new File(System.getProperty("java.io.tmpdir"))
.listFiles((dir, name) -> name.startsWith("family_") || name.startsWith("invalidations-")))
.stream().forEach(f -> f.delete());
@ -237,6 +241,7 @@ public abstract class CorrectnessTestCase {
.applySetting( Environment.HBM2DDL_AUTO, "create-drop" )
.applySetting( Environment.CACHE_REGION_FACTORY, FailingInfinispanRegionFactory.class.getName())
.applySetting( TestInfinispanRegionFactory.CACHE_MODE, cacheMode )
.applySetting( Environment.USE_MINIMAL_PUTS, "false" )
.applySetting( Environment.GENERATE_STATISTICS, "false" );
applySettings(ssrb);
@ -253,11 +258,12 @@ public abstract class CorrectnessTestCase {
ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, accessType == AccessType.TRANSACTIONAL);
}
@After
@AfterClassOnce
public void afterClass() {
for (SessionFactory sf : sessionFactories) {
if (sf != null) sf.close();
}
TestResourceTracker.testFinished(getClass().getSimpleName());
}
public static Class[] getAnnotatedClasses() {
@ -315,18 +321,40 @@ public abstract class CorrectnessTestCase {
@Override
protected void amendCacheConfiguration(String cacheName, ConfigurationBuilder configurationBuilder) {
super.amendCacheConfiguration(cacheName, configurationBuilder);
// failure to write into timestamps would cause failure even though both DB and cache has been updated
if (!cacheName.equals("timestamps") && !cacheName.endsWith(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE)) {
configurationBuilder.customInterceptors().addInterceptor()
configurationBuilder.transaction().cacheStopTimeout(1, TimeUnit.SECONDS);
if (INJECT_FAILURES) {
// failure to write into timestamps would cause failure even though both DB and cache has been updated
if (!cacheName.equals("timestamps") && !cacheName.endsWith(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE)) {
configurationBuilder.customInterceptors().addInterceptor()
.interceptorClass(FailureInducingInterceptor.class)
.position(InterceptorConfiguration.Position.FIRST);
log.trace("Injecting FailureInducingInterceptor into " + cacheName);
} else {
log.trace("Not injecting into " + cacheName);
log.trace("Injecting FailureInducingInterceptor into " + cacheName);
} else {
log.trace("Not injecting into " + cacheName);
}
}
}
}
private final static Class[][] EXPECTED = {
{ TransactionException.class, RollbackException.class, StaleObjectStateException.class },
{ TransactionException.class, RollbackException.class, PessimisticLockException.class },
{ TransactionException.class, RollbackException.class, LockAcquisitionException.class },
{ RemoteException.class, TimeoutException.class },
{ StaleStateException.class, PessimisticLockException.class},
{ StaleStateException.class, ObjectNotFoundException.class},
{ StaleStateException.class, ConstraintViolationException.class},
{ StaleStateException.class, LockAcquisitionException.class},
{ PersistenceException.class, ConstraintViolationException.class },
{ PersistenceException.class, LockAcquisitionException.class },
{ javax.persistence.PessimisticLockException.class, PessimisticLockException.class },
{ OptimisticLockException.class, StaleStateException.class },
{ PessimisticLockException.class },
{ StaleObjectStateException.class },
{ ObjectNotFoundException.class },
{ LockAcquisitionException.class }
};
@Test
public void test() throws Exception {
ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
@ -356,23 +384,7 @@ public abstract class CorrectnessTestCase {
// ignore exceptions from optimistic failures and induced exceptions
if (hasCause(e, InducedException.class)) {
continue;
} else if (e instanceof RollbackException) {
Throwable cause = e.getCause();
if (cause instanceof StaleObjectStateException
|| cause instanceof PessimisticLockException
|| cause instanceof LockAcquisitionException) { // use MVCC database to prevent this!
continue;
}
} else if (e instanceof RemoteException) {
Throwable cause = e.getCause();
if (cause instanceof org.infinispan.util.concurrent.TimeoutException) {
continue;
}
} else if (e instanceof StaleStateException
|| e instanceof PessimisticLockException
|| e instanceof ObjectNotFoundException
|| e instanceof ConstraintViolationException
|| e instanceof LockAcquisitionException) { // use MVCC database to prevent this!
} else if (Stream.of(EXPECTED).anyMatch(exceptions -> matches(e, exceptions))) {
continue;
}
exceptions.add(e);
@ -435,7 +447,8 @@ public abstract class CorrectnessTestCase {
}));
}
for (ForkJoinTask<?> task : tasks) {
task.get();
// with heavy logging this may have trouble to complete
task.get(30, TimeUnit.SECONDS);
}
if (!exceptions.isEmpty()) {
for (Exception e : exceptions) {
@ -484,33 +497,38 @@ public abstract class CorrectnessTestCase {
if (invalidators != null && !invalidators.isEmpty()) {
delayed.add(new DelayedInvalidators(map, entry.getKey()));
}
iterator.remove();
}
}
}
// poll until all invalidations come
long deadline = System.currentTimeMillis() + 30000;
while (System.currentTimeMillis() < deadline) {
for (Iterator<DelayedInvalidators> iterator = delayed.iterator(); iterator.hasNext(); ) {
DelayedInvalidators entry = iterator.next();
Object pendingPutMap = entry.getPendingPutMap();
if (pendingPutMap == null) {
iterator.remove();
}
else {
java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(pendingPutMap);
if (invalidators == null || invalidators.isEmpty()) {
iterator.remove();
}
}
}
iterateInvalidators(delayed, getInvalidators, (k, i) -> {});
if (delayed.isEmpty()) {
break;
}
Thread.sleep(1000);
}
if (!delayed.isEmpty()) {
throw new IllegalStateException("Invalidators were not cleared: " + delayed);
iterateInvalidators(delayed, getInvalidators, (k, i) -> log.warnf("Left invalidators on key %s: %s", k, i));
throw new IllegalStateException("Invalidators were not cleared: " + delayed.size());
}
}
private void iterateInvalidators(List<DelayedInvalidators> delayed, Method getInvalidators, BiConsumer<Object, java.util.Collection> invalidatorConsumer) throws IllegalAccessException, InvocationTargetException {
for (Iterator<DelayedInvalidators> iterator = delayed.iterator(); iterator.hasNext(); ) {
DelayedInvalidators entry = iterator.next();
Object pendingPutMap = entry.getPendingPutMap();
if (pendingPutMap == null) {
iterator.remove();
}
else {
java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(pendingPutMap);
if (invalidators == null || invalidators.isEmpty()) {
iterator.remove();
}
invalidatorConsumer.accept(entry.key, invalidators);
}
}
}
@ -522,11 +540,19 @@ public abstract class CorrectnessTestCase {
return hasCause(cause, clazz);
}
private boolean matches(Throwable throwable, Class[] classes) {
return matches(throwable, classes, 0);
}
private boolean matches(Throwable throwable, Class[] classes, int index) {
return index >= classes.length || (classes[index].isInstance(throwable) && matches(throwable.getCause(), classes, index + 1));
}
protected Operation getOperation() {
ThreadLocalRandom random = ThreadLocalRandom.current();
Operation operation;
int r = random.nextInt(100);
if (r == 0) operation = new InvalidateCache();
if (r == 0 && INVALIDATE_REGION) operation = new InvalidateCache();
else if (r < 5) operation = new QueryFamilies();
else if (r < 10) operation = new RemoveFamily(r < 6);
else if (r < 20) operation = new UpdateFamily(r < 12, random.nextInt(1, 3));
@ -615,8 +641,6 @@ public abstract class CorrectnessTestCase {
return read.getValue() instanceof java.util.Collection && ((java.util.Collection) read.getValue()).isEmpty();
}
protected abstract void withTx(Runnable runnable, boolean rolledBack) throws Exception;
private abstract class Operation {
protected final boolean rolledBack;
@ -627,21 +651,8 @@ public abstract class CorrectnessTestCase {
public abstract void run() throws Exception;
protected void withSession(Consumer<Session> consumer) throws Exception {
ByRef<Session> sessionRef = new ByRef<>(null);
try {
withTx(() -> openSessionAndExecute(sessionRef, consumer), rolledBack);
} finally {
Session s = sessionRef.get();
if (s != null) {
s.close();
}
}
}
protected void openSessionAndExecute(ByRef<Session> sessionRef, Consumer<Session> consumer) {
int node = threadNode.get();
Session s = sessionFactory(node).openSession();
sessionRef.set(s);
Transaction tx = s.getTransaction();
tx.begin();
try {
@ -661,12 +672,16 @@ public abstract class CorrectnessTestCase {
log.trace("Hibernate rollback end");
}
} catch (Exception e) {
log.trace("Hibernate commit or rollback failed", e);
log.trace("Hibernate commit or rollback failed, status is " + tx.getStatus(), e);
if (tx.getStatus() == TransactionStatus.MARKED_ROLLBACK) {
tx.rollback();
}
throw e;
} finally {
// cannot close before XA commit since force increment requires open connection
s.close();
}
}
// cannot close before XA commit since force increment requires open connection
// s.close();
}
protected void withRandomFamily(BiConsumer<Session, Family> consumer, Ref<String> familyNameUpdate, Ref<Set<String>> familyMembersUpdate, LockMode lockMode) throws Exception {
@ -791,7 +806,7 @@ public abstract class CorrectnessTestCase {
throw e;
} finally {
int after = timestampGenerator.getAndIncrement();
log.trace("Finished InsertFamily at " + after);
log.trace("Finished InsertFamily at " + after + ", " + (failure ? "failed" : "success"));
familyIds.put(family.getId(), new AtomicInteger(NUM_ACCESS_AFTER_REMOVAL));
LogType type = failure || rolledBack ? LogType.WRITE_FAILURE : LogType.WRITE;
getRecordList(familyNames, family.getId()).add(new Log<>(before, after, family.getName(), type));
@ -970,10 +985,8 @@ public abstract class CorrectnessTestCase {
@Override
public void run() throws Exception {
log.trace("Invalidating all caches");
withTx(() -> {
int node = threadNode.get();
sessionFactory(node).getCache().evictAllRegions();
}, rolledBack);
int node = threadNode.get();
sessionFactory(node).getCache().evictAllRegions();
}
}