diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java index 7c2c760d80..c8d7706cc4 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java @@ -7,6 +7,49 @@ package org.hibernate.test.cache.infinispan.stress; +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.persistence.OptimisticLockException; +import javax.persistence.PersistenceException; +import javax.transaction.RollbackException; +import javax.transaction.TransactionManager; + import org.hibernate.LockMode; import org.hibernate.ObjectNotFoundException; import org.hibernate.PessimisticLockException; @@ -15,6 +58,7 @@ import org.hibernate.SessionFactory; import org.hibernate.StaleObjectStateException; import org.hibernate.StaleStateException; import org.hibernate.Transaction; +import org.hibernate.TransactionException; import org.hibernate.boot.Metadata; import org.hibernate.boot.MetadataSources; import org.hibernate.boot.registry.StandardServiceRegistry; @@ -38,47 +82,32 @@ import org.hibernate.mapping.RootClass; import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl; import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl; import org.hibernate.resource.transaction.spi.TransactionStatus; + +import org.hibernate.testing.AfterClassOnce; +import org.hibernate.testing.BeforeClassOnce; +import org.hibernate.testing.jta.JtaAwareConnectionProviderImpl; +import org.hibernate.testing.jta.TestingJtaPlatformImpl; +import org.hibernate.testing.junit4.CustomParameterized; import org.hibernate.test.cache.infinispan.stress.entities.Address; import org.hibernate.test.cache.infinispan.stress.entities.Family; import org.hibernate.test.cache.infinispan.stress.entities.Person; import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory; -import org.hibernate.testing.jta.JtaAwareConnectionProviderImpl; -import org.hibernate.testing.jta.TestingJtaPlatformImpl; -import org.hibernate.testing.junit4.CustomParameterized; -import org.infinispan.commands.VisitableCommand; -import org.infinispan.commands.tx.CommitCommand; -import org.infinispan.commands.tx.RollbackCommand; -import org.infinispan.commons.util.ByRef; -import org.infinispan.configuration.cache.CacheMode; -import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.InterceptorConfiguration; -import org.infinispan.context.InvocationContext; -import org.infinispan.interceptors.base.BaseCustomInterceptor; -import org.infinispan.remoting.RemoteException; -import org.junit.After; -import org.junit.Before; +import org.infinispan.test.fwk.TestResourceTracker; +import org.infinispan.util.concurrent.TimeoutException; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import javax.transaction.RollbackException; -import javax.transaction.Status; -import javax.transaction.TransactionManager; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.nio.file.Files; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; +import org.infinispan.commands.VisitableCommand; +import org.infinispan.commands.tx.CommitCommand; +import org.infinispan.commands.tx.RollbackCommand; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.context.InvocationContext; +import org.infinispan.interceptors.base.BaseCustomInterceptor; +import org.infinispan.remoting.RemoteException; /** * Tries to execute random operations for {@link #EXECUTION_TIME} and then verify the log for correctness. @@ -90,7 +119,7 @@ import java.util.stream.Collectors; @RunWith(CustomParameterized.class) public abstract class CorrectnessTestCase { static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(CorrectnessTestCase.class); - static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(10); + static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(2); static final int NUM_NODES = 4; static final int NUM_THREADS_PER_NODE = 4; static final int NUM_THREADS = NUM_NODES * NUM_THREADS_PER_NODE; @@ -99,6 +128,9 @@ public abstract class CorrectnessTestCase { static final int MAX_MEMBERS = 10; private final static Comparator> WALL_CLOCK_TIME_COMPARATOR = (o1, o2) -> Long.compare(o1.wallClockTime, o2.wallClockTime); + private final static boolean INVALIDATE_REGION = Boolean.getBoolean("testInfinispan.correctness.invalidateRegion"); + private final static boolean INJECT_FAILURES = Boolean.getBoolean("testInfinispan.correctness.injectFailures"); + @Parameterized.Parameter(0) public String name; @@ -157,36 +189,13 @@ public abstract class CorrectnessTestCase { ssrb.applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() ); } - @Override - protected void withTx(Runnable runnable, boolean rolledBack) throws Exception { - TransactionManager tm = transactionManager; - tm.begin(); - try { - runnable.run(); - } catch (RuntimeException e) { - tm.setRollbackOnly(); - throw e; - } finally { - if (!rolledBack && tm.getStatus() == Status.STATUS_ACTIVE) { - log.trace("TM commit begin"); - tm.commit(); - log.trace("TM commit end"); - } else { - log.trace("TM rollback begin"); - //log.warn(Util.threadDump()); - tm.rollback(); - log.trace("TM rollback end"); - } - } - } - @Override protected Operation getOperation() { if (accessType == AccessType.READ_ONLY) { ThreadLocalRandom random = ThreadLocalRandom.current(); Operation operation; int r = random.nextInt(30); - if (r == 0) operation = new InvalidateCache(); + if (r == 0 && INVALIDATE_REGION) operation = new InvalidateCache(); else if (r < 5) operation = new QueryFamilies(); else if (r < 10) operation = new RemoveFamily(r < 12); else operation = new ReadFamily(r < 20); @@ -205,16 +214,10 @@ public abstract class CorrectnessTestCase { new Object[] { "read-write, invalidation", CacheMode.INVALIDATION_SYNC, AccessType.READ_WRITE }, new Object[] { "read-write, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE }, new Object[] { "read-write, distributed", CacheMode.DIST_SYNC, AccessType.READ_WRITE }, - new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.READ_WRITE } + new Object[] { "non-strict, replicated", CacheMode.REPL_SYNC, AccessType.NONSTRICT_READ_WRITE } ); } - @Override - protected void withTx(Runnable runnable, boolean rolledBack) throws Exception { - // no transaction on JTA TM - runnable.run(); - } - @Override protected void applySettings(StandardServiceRegistryBuilder ssrb) { super.applySettings(ssrb); @@ -223,8 +226,9 @@ public abstract class CorrectnessTestCase { } } - @Before + @BeforeClassOnce public void beforeClass() { + TestResourceTracker.testStarted(getClass().getSimpleName()); Arrays.asList(new File(System.getProperty("java.io.tmpdir")) .listFiles((dir, name) -> name.startsWith("family_") || name.startsWith("invalidations-"))) .stream().forEach(f -> f.delete()); @@ -237,6 +241,7 @@ public abstract class CorrectnessTestCase { .applySetting( Environment.HBM2DDL_AUTO, "create-drop" ) .applySetting( Environment.CACHE_REGION_FACTORY, FailingInfinispanRegionFactory.class.getName()) .applySetting( TestInfinispanRegionFactory.CACHE_MODE, cacheMode ) + .applySetting( Environment.USE_MINIMAL_PUTS, "false" ) .applySetting( Environment.GENERATE_STATISTICS, "false" ); applySettings(ssrb); @@ -253,11 +258,12 @@ public abstract class CorrectnessTestCase { ssrb.applySetting(TestInfinispanRegionFactory.TRANSACTIONAL, accessType == AccessType.TRANSACTIONAL); } - @After + @AfterClassOnce public void afterClass() { for (SessionFactory sf : sessionFactories) { if (sf != null) sf.close(); } + TestResourceTracker.testFinished(getClass().getSimpleName()); } public static Class[] getAnnotatedClasses() { @@ -315,18 +321,40 @@ public abstract class CorrectnessTestCase { @Override protected void amendCacheConfiguration(String cacheName, ConfigurationBuilder configurationBuilder) { super.amendCacheConfiguration(cacheName, configurationBuilder); - // failure to write into timestamps would cause failure even though both DB and cache has been updated - if (!cacheName.equals("timestamps") && !cacheName.endsWith(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE)) { - configurationBuilder.customInterceptors().addInterceptor() + configurationBuilder.transaction().cacheStopTimeout(1, TimeUnit.SECONDS); + if (INJECT_FAILURES) { + // failure to write into timestamps would cause failure even though both DB and cache has been updated + if (!cacheName.equals("timestamps") && !cacheName.endsWith(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE)) { + configurationBuilder.customInterceptors().addInterceptor() .interceptorClass(FailureInducingInterceptor.class) .position(InterceptorConfiguration.Position.FIRST); - log.trace("Injecting FailureInducingInterceptor into " + cacheName); - } else { - log.trace("Not injecting into " + cacheName); + log.trace("Injecting FailureInducingInterceptor into " + cacheName); + } else { + log.trace("Not injecting into " + cacheName); + } } } } + private final static Class[][] EXPECTED = { + { TransactionException.class, RollbackException.class, StaleObjectStateException.class }, + { TransactionException.class, RollbackException.class, PessimisticLockException.class }, + { TransactionException.class, RollbackException.class, LockAcquisitionException.class }, + { RemoteException.class, TimeoutException.class }, + { StaleStateException.class, PessimisticLockException.class}, + { StaleStateException.class, ObjectNotFoundException.class}, + { StaleStateException.class, ConstraintViolationException.class}, + { StaleStateException.class, LockAcquisitionException.class}, + { PersistenceException.class, ConstraintViolationException.class }, + { PersistenceException.class, LockAcquisitionException.class }, + { javax.persistence.PessimisticLockException.class, PessimisticLockException.class }, + { OptimisticLockException.class, StaleStateException.class }, + { PessimisticLockException.class }, + { StaleObjectStateException.class }, + { ObjectNotFoundException.class }, + { LockAcquisitionException.class } + }; + @Test public void test() throws Exception { ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS); @@ -356,23 +384,7 @@ public abstract class CorrectnessTestCase { // ignore exceptions from optimistic failures and induced exceptions if (hasCause(e, InducedException.class)) { continue; - } else if (e instanceof RollbackException) { - Throwable cause = e.getCause(); - if (cause instanceof StaleObjectStateException - || cause instanceof PessimisticLockException - || cause instanceof LockAcquisitionException) { // use MVCC database to prevent this! - continue; - } - } else if (e instanceof RemoteException) { - Throwable cause = e.getCause(); - if (cause instanceof org.infinispan.util.concurrent.TimeoutException) { - continue; - } - } else if (e instanceof StaleStateException - || e instanceof PessimisticLockException - || e instanceof ObjectNotFoundException - || e instanceof ConstraintViolationException - || e instanceof LockAcquisitionException) { // use MVCC database to prevent this! + } else if (Stream.of(EXPECTED).anyMatch(exceptions -> matches(e, exceptions))) { continue; } exceptions.add(e); @@ -435,7 +447,8 @@ public abstract class CorrectnessTestCase { })); } for (ForkJoinTask task : tasks) { - task.get(); + // with heavy logging this may have trouble to complete + task.get(30, TimeUnit.SECONDS); } if (!exceptions.isEmpty()) { for (Exception e : exceptions) { @@ -484,33 +497,38 @@ public abstract class CorrectnessTestCase { if (invalidators != null && !invalidators.isEmpty()) { delayed.add(new DelayedInvalidators(map, entry.getKey())); } - iterator.remove(); } } } // poll until all invalidations come long deadline = System.currentTimeMillis() + 30000; while (System.currentTimeMillis() < deadline) { - for (Iterator iterator = delayed.iterator(); iterator.hasNext(); ) { - DelayedInvalidators entry = iterator.next(); - Object pendingPutMap = entry.getPendingPutMap(); - if (pendingPutMap == null) { - iterator.remove(); - } - else { - java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(pendingPutMap); - if (invalidators == null || invalidators.isEmpty()) { - iterator.remove(); - } - } - } + iterateInvalidators(delayed, getInvalidators, (k, i) -> {}); if (delayed.isEmpty()) { break; } Thread.sleep(1000); } if (!delayed.isEmpty()) { - throw new IllegalStateException("Invalidators were not cleared: " + delayed); + iterateInvalidators(delayed, getInvalidators, (k, i) -> log.warnf("Left invalidators on key %s: %s", k, i)); + throw new IllegalStateException("Invalidators were not cleared: " + delayed.size()); + } + } + + private void iterateInvalidators(List delayed, Method getInvalidators, BiConsumer invalidatorConsumer) throws IllegalAccessException, InvocationTargetException { + for (Iterator iterator = delayed.iterator(); iterator.hasNext(); ) { + DelayedInvalidators entry = iterator.next(); + Object pendingPutMap = entry.getPendingPutMap(); + if (pendingPutMap == null) { + iterator.remove(); + } + else { + java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(pendingPutMap); + if (invalidators == null || invalidators.isEmpty()) { + iterator.remove(); + } + invalidatorConsumer.accept(entry.key, invalidators); + } } } @@ -522,11 +540,19 @@ public abstract class CorrectnessTestCase { return hasCause(cause, clazz); } + private boolean matches(Throwable throwable, Class[] classes) { + return matches(throwable, classes, 0); + } + + private boolean matches(Throwable throwable, Class[] classes, int index) { + return index >= classes.length || (classes[index].isInstance(throwable) && matches(throwable.getCause(), classes, index + 1)); + } + protected Operation getOperation() { ThreadLocalRandom random = ThreadLocalRandom.current(); Operation operation; int r = random.nextInt(100); - if (r == 0) operation = new InvalidateCache(); + if (r == 0 && INVALIDATE_REGION) operation = new InvalidateCache(); else if (r < 5) operation = new QueryFamilies(); else if (r < 10) operation = new RemoveFamily(r < 6); else if (r < 20) operation = new UpdateFamily(r < 12, random.nextInt(1, 3)); @@ -615,8 +641,6 @@ public abstract class CorrectnessTestCase { return read.getValue() instanceof java.util.Collection && ((java.util.Collection) read.getValue()).isEmpty(); } - protected abstract void withTx(Runnable runnable, boolean rolledBack) throws Exception; - private abstract class Operation { protected final boolean rolledBack; @@ -627,21 +651,8 @@ public abstract class CorrectnessTestCase { public abstract void run() throws Exception; protected void withSession(Consumer consumer) throws Exception { - ByRef sessionRef = new ByRef<>(null); - try { - withTx(() -> openSessionAndExecute(sessionRef, consumer), rolledBack); - } finally { - Session s = sessionRef.get(); - if (s != null) { - s.close(); - } - } - } - - protected void openSessionAndExecute(ByRef sessionRef, Consumer consumer) { int node = threadNode.get(); Session s = sessionFactory(node).openSession(); - sessionRef.set(s); Transaction tx = s.getTransaction(); tx.begin(); try { @@ -661,12 +672,16 @@ public abstract class CorrectnessTestCase { log.trace("Hibernate rollback end"); } } catch (Exception e) { - log.trace("Hibernate commit or rollback failed", e); + log.trace("Hibernate commit or rollback failed, status is " + tx.getStatus(), e); + if (tx.getStatus() == TransactionStatus.MARKED_ROLLBACK) { + tx.rollback(); + } throw e; + } finally { + // cannot close before XA commit since force increment requires open connection + s.close(); } } - // cannot close before XA commit since force increment requires open connection - // s.close(); } protected void withRandomFamily(BiConsumer consumer, Ref familyNameUpdate, Ref> familyMembersUpdate, LockMode lockMode) throws Exception { @@ -791,7 +806,7 @@ public abstract class CorrectnessTestCase { throw e; } finally { int after = timestampGenerator.getAndIncrement(); - log.trace("Finished InsertFamily at " + after); + log.trace("Finished InsertFamily at " + after + ", " + (failure ? "failed" : "success")); familyIds.put(family.getId(), new AtomicInteger(NUM_ACCESS_AFTER_REMOVAL)); LogType type = failure || rolledBack ? LogType.WRITE_FAILURE : LogType.WRITE; getRecordList(familyNames, family.getId()).add(new Log<>(before, after, family.getName(), type)); @@ -970,10 +985,8 @@ public abstract class CorrectnessTestCase { @Override public void run() throws Exception { log.trace("Invalidating all caches"); - withTx(() -> { - int node = threadNode.get(); - sessionFactory(node).getCache().evictAllRegions(); - }, rolledBack); + int node = threadNode.get(); + sessionFactory(node).getCache().evictAllRegions(); } }