From c8ed5e1bef6a3a254b490d9fa5a6ae93465c7b66 Mon Sep 17 00:00:00 2001
From: Radim Vansa
Date: Tue, 4 Aug 2015 11:21:04 +0200
Subject: [PATCH] HHH-9898 Test for correct behaviour of 2LC

* Test is marked as ignored since it is expected to be run only manually
---
 .../stress/CorrectnessTestCase.java           | 1158 +++++++++++++++++
 .../infinispan/stress/entities/Address.java   |    4 +-
 .../infinispan/stress/entities/Family.java    |    9 +-
 .../src/test/resources/2lc-test-tcp.xml       |    8 +-
 4 files changed, 1167 insertions(+), 12 deletions(-)
 create mode 100644 hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java

diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java
new file mode 100644
index 0000000000..06ba623a3f
--- /dev/null
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/CorrectnessTestCase.java
@@ -0,0 +1,1158 @@
+/*
+ * Hibernate, Relational Persistence for Idiomatic Java
+ *
+ * License: GNU Lesser General Public License (LGPL), version 2.1 or later.
+ * See the lgpl.txt file in the root directory or .
+ */
+
+package org.hibernate.test.cache.infinispan.stress;
+
+import org.hibernate.LockMode;
+import org.hibernate.ObjectNotFoundException;
+import org.hibernate.PessimisticLockException;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.StaleObjectStateException;
+import org.hibernate.StaleStateException;
+import org.hibernate.Transaction;
+import org.hibernate.annotations.CacheConcurrencyStrategy;
+import org.hibernate.boot.Metadata;
+import org.hibernate.boot.MetadataSources;
+import org.hibernate.boot.registry.StandardServiceRegistry;
+import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
+import org.hibernate.cache.infinispan.InfinispanRegionFactory;
+import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
+import org.hibernate.cache.infinispan.access.TransactionalAccessDelegate;
+import org.hibernate.cache.spi.access.RegionAccessStrategy;
+import org.hibernate.cfg.Environment;
+import org.hibernate.criterion.Restrictions;
+import org.hibernate.dialect.H2Dialect;
+import org.hibernate.engine.spi.SessionFactoryImplementor;
+import org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform;
+import org.hibernate.exception.ConstraintViolationException;
+import org.hibernate.exception.LockAcquisitionException;
+import org.hibernate.mapping.Collection;
+import org.hibernate.mapping.PersistentClass;
+import org.hibernate.mapping.RootClass;
+import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl;
+import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
+import org.hibernate.resource.transaction.spi.TransactionStatus;
+import org.hibernate.test.cache.infinispan.stress.entities.Address;
+import org.hibernate.test.cache.infinispan.stress.entities.Family;
+import org.hibernate.test.cache.infinispan.stress.entities.Person;
+import org.hibernate.testing.jta.JtaAwareConnectionProviderImpl;
+import org.hibernate.testing.jta.TestingJtaPlatformImpl;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.commands.tx.RollbackCommand;
+import org.infinispan.commons.util.ByRef;
+import
org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.cache.InterceptorConfiguration; +import org.infinispan.configuration.parsing.ConfigurationBuilderHolder; +import org.infinispan.context.InvocationContext; +import org.infinispan.interceptors.base.BaseCustomInterceptor; +import org.infinispan.manager.DefaultCacheManager; +import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.remoting.RemoteException; +import org.infinispan.transaction.TransactionMode; +import org.infinispan.util.logging.LogFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.TransactionManager; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Tries to execute random operations for {@link #EXECUTION_TIME} and then verify the log for correctness. + * + * Assumes serializable consistency. + * + * @author Radim Vansa + */ +public abstract class CorrectnessTestCase { + static final org.infinispan.util.logging.Log log = LogFactory.getLog(CorrectnessTestCase.class); + static final long EXECUTION_TIME = TimeUnit.MINUTES.toMillis(10); + static final int NUM_NODES = 4; + static final int NUM_THREADS_PER_NODE = 4; + static final int NUM_THREADS = NUM_NODES * NUM_THREADS_PER_NODE; + static final int NUM_FAMILIES = 1; + static final int NUM_ACCESS_AFTER_REMOVAL = NUM_THREADS * 2; + static final int MAX_MEMBERS = 10; + private final static Comparator> WALL_CLOCK_TIME_COMPARATOR = (o1, o2) -> Long.compare(o1.wallClockTime, o2.wallClockTime); + + static ThreadLocal threadNode = new ThreadLocal<>(); + + final AtomicInteger timestampGenerator = new AtomicInteger(); + final ConcurrentSkipListMap familyIds = new ConcurrentSkipListMap<>(); + SessionFactory[] sessionFactories; + volatile boolean running = true; + + final ThreadLocal>>> familyNames = new ThreadLocal>>>() { + @Override + protected Map>> initialValue() { + return new HashMap<>(); + } + }; + final ThreadLocal>>>> familyMembers = new ThreadLocal>>>>() { + @Override + protected Map>>> initialValue() { + return new HashMap<>(); + } + }; + private List exceptions = Collections.synchronizedList(new ArrayList<>()); + + public String getDbName() { + return getClass().getName().replaceAll("\\W", "_"); + } + + public abstract static class Jta extends CorrectnessTestCase { + private final TransactionManager transactionManager = TestingJtaPlatformImpl.transactionManager(); + + @Override + protected void applySettings(StandardServiceRegistryBuilder ssrb) { + ssrb + .applySetting( Environment.JTA_PLATFORM, TestingJtaPlatformImpl.class.getName() ) + .applySetting( Environment.CONNECTION_PROVIDER, JtaAwareConnectionProviderImpl.class.getName() ) + .applySetting( Environment.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName() ); + } + + @Override + protected void withTx(Runnable runnable, boolean rolledBack) throws Exception { + int node = threadNode.get(); + TransactionManager tm = transactionManager; + tm.begin(); + try { + runnable.run(); + } catch 
(RuntimeException e) { + tm.setRollbackOnly(); + throw e; + } finally { + if (!rolledBack && tm.getStatus() == Status.STATUS_ACTIVE) { + log.trace("TM commit begin"); + tm.commit(); + log.trace("TM commit end"); + } else { + log.trace("TM rollback begin"); + //log.warn(Util.threadDump()); + tm.rollback(); + log.trace("TM rollback end"); + } + } + } + } + + @Ignore // as long-running test, we'll execute it only by hand + public static class JtaTransactional extends Jta { + } + + @Ignore // as long-running test, we'll execute it only by hand + public static class JtaReadOnly extends Jta { + @Override + protected Operation getOperation() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + Operation operation; + int r = random.nextInt(30); + if (r == 0) operation = new InvalidateCache(); + else if (r < 5) operation = new QueryFamilies(); + else if (r < 10) operation = new RemoveFamily(r < 12); + else operation = new ReadFamily(r < 20); + return operation; + } + + @Override + protected void applySettings(StandardServiceRegistryBuilder ssrb) { + super.applySettings(ssrb); + ssrb.applySetting(Environment.DEFAULT_CACHE_CONCURRENCY_STRATEGY, CacheConcurrencyStrategy.READ_ONLY.toAccessType().getExternalName()); + ssrb.applySetting(Environment.CACHE_REGION_FACTORY, ForceNonTxInfinispanRegionFactory.class.getName()); + } + } + + @Ignore // as long-running test, we'll execute it only by hand + public static class JtaNonTransactional extends Jta { + @Override + protected void applySettings(StandardServiceRegistryBuilder ssrb) { + super.applySettings(ssrb); + ssrb.applySetting(Environment.CACHE_REGION_FACTORY, ForceNonTxInfinispanRegionFactory.class.getName()); + } + } + + @Ignore // as long-running test, we'll execute it only by hand + public static class NonJta extends CorrectnessTestCase { + @Override + protected void withTx(Runnable runnable, boolean rolledBack) throws Exception { + // no transaction on JTA TM + runnable.run(); + } + + @Override + protected void applySettings(StandardServiceRegistryBuilder ssrb) { + super.applySettings(ssrb); + ssrb.applySetting(Environment.JTA_PLATFORM, NoJtaPlatform.class.getName()); + ssrb.applySetting(Environment.TRANSACTION_COORDINATOR_STRATEGY, JdbcResourceLocalTransactionCoordinatorBuilderImpl.class.getName()); + ssrb.applySetting(Environment.CACHE_REGION_FACTORY, ForceNonTxInfinispanRegionFactory.class.getName()); + } + } + + @Before + public void beforeClass() { + Arrays.asList(new File(System.getProperty("java.io.tmpdir")) + .listFiles((dir, name) -> name.startsWith("family_") || name.startsWith("invalidations-"))) + .stream().forEach(f -> f.delete()); + StandardServiceRegistryBuilder ssrb = new StandardServiceRegistryBuilder().enableAutoClose() + .applySetting( Environment.USE_SECOND_LEVEL_CACHE, "true" ) + .applySetting( Environment.USE_QUERY_CACHE, "true" ) + .applySetting( Environment.DRIVER, "org.h2.Driver" ) + .applySetting( Environment.URL, "jdbc:h2:mem:" + getDbName() + ";TRACE_LEVEL_FILE=4") + .applySetting( Environment.DIALECT, H2Dialect.class.getName() ) + .applySetting( Environment.HBM2DDL_AUTO, "create-drop" ) + .applySetting( Environment.CACHE_REGION_FACTORY, TestInfinispanRegionFactory.class.getName()) + .applySetting( Environment.GENERATE_STATISTICS, "false" ); + applySettings(ssrb); + + sessionFactories = new SessionFactory[NUM_NODES]; + for (int i = 0; i < NUM_NODES; ++i) { + StandardServiceRegistry registry = ssrb.build(); + Metadata metadata = buildMetadata( registry ); + sessionFactories[i] = metadata.buildSessionFactory(); + 
} + } + + protected void applySettings(StandardServiceRegistryBuilder ssrb) { + } + + @After + public void afterClass() { + for (SessionFactory sf : sessionFactories) { + if (sf != null) sf.close(); + } + } + + public static Class[] getAnnotatedClasses() { + return new Class[] {Family.class, Person.class, Address.class}; + } + + private static Metadata buildMetadata(StandardServiceRegistry registry) { + final String cacheStrategy = "transactional"; + + MetadataSources metadataSources = new MetadataSources( registry ); + for ( Class entityClass : getAnnotatedClasses() ) { + metadataSources.addAnnotatedClass( entityClass ); + } + + Metadata metadata = metadataSources.buildMetadata(); + + for ( PersistentClass entityBinding : metadata.getEntityBindings() ) { + if (!entityBinding.isInherited()) { + ( (RootClass) entityBinding ).setCacheConcurrencyStrategy( cacheStrategy); + } + } + + for ( Collection collectionBinding : metadata.getCollectionBindings() ) { + collectionBinding.setCacheConcurrencyStrategy( cacheStrategy ); + } + + return metadata; + } + + public static class InducedException extends Exception { + public InducedException(String message) { + super(message); + } + } + + public static class FailureInducingInterceptor extends BaseCustomInterceptor { + @Override + protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable { + // Failure in CommitCommand/RollbackCommand keeps some locks closed, therefore blocking the test + if (!(command instanceof CommitCommand || command instanceof RollbackCommand)) { + /* Introduce 5 % probability of failure */ + if (ThreadLocalRandom.current().nextInt(100) < 5) { + throw new InducedException("Simulating failure somewhere"); + } + } + return super.handleDefault(ctx, command); + } + } + + public static class TestInfinispanRegionFactory extends InfinispanRegionFactory { + private static AtomicInteger counter = new AtomicInteger(); + + public TestInfinispanRegionFactory() { + super(); // For reflection-based instantiation + } + + @Override + protected EmbeddedCacheManager createCacheManager(ConfigurationBuilderHolder holder) { + amendConfiguration(holder); + return new DefaultCacheManager(holder, true); + } + + protected void amendConfiguration(ConfigurationBuilderHolder holder) { + holder.getGlobalConfigurationBuilder().globalJmxStatistics().allowDuplicateDomains(true); + holder.getGlobalConfigurationBuilder().transport().nodeName("Node" + (char)('A' + counter.getAndIncrement())); + + for (Map.Entry entry : holder.getNamedConfigurationBuilders().entrySet()) { + // failure to write into timestamps would cause failure even though both DB and cache has been updated + if (!entry.getKey().equals("timestamps") && !entry.getKey().endsWith(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME)) { + entry.getValue().customInterceptors().addInterceptor() + .interceptorClass(FailureInducingInterceptor.class) + .position(InterceptorConfiguration.Position.FIRST); + log.trace("Injecting FailureInducingInterceptor into " + entry.getKey()); + } + else { + log.trace("Not injecting into " + entry.getKey()); + } + } + } + } + + public static class ForceNonTxInfinispanRegionFactory extends TestInfinispanRegionFactory { + public ForceNonTxInfinispanRegionFactory() { + super(); // For reflection-based instantiation + } + + @Override + protected void amendConfiguration(ConfigurationBuilderHolder holder) { + super.amendConfiguration(holder); + 
holder.getDefaultConfigurationBuilder().transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL); + for (ConfigurationBuilder cb : holder.getNamedConfigurationBuilders().values()) { + cb.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL); + } + } + } + + @Test + public void test() throws Exception { + ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS); + + Map>> allFamilyNames = new HashMap<>(); + Map>>> allFamilyMembers = new HashMap<>(); + + running = true; + List> futures = new ArrayList<>(); + for (int node = 0; node < NUM_NODES; ++node) { + final int NODE = node; + for (int i = 0; i < NUM_THREADS_PER_NODE; ++i) { + final int I = i; + futures.add(exec.submit(() -> { + Thread.currentThread().setName("Node" + (char)('A' + NODE) + "-thread-" + I); + threadNode.set(NODE); + while (running) { + Operation operation; + if (familyIds.size() < NUM_FAMILIES) { + operation = new InsertFamily(ThreadLocalRandom.current().nextInt(5) == 0); + } else { + operation = getOperation(); + } + try { + operation.run(); + } catch (Exception e) { + // ignore exceptions from optimistic failures and induced exceptions + if (hasCause(e, InducedException.class)) { + continue; + } else if (e instanceof RollbackException) { + Throwable cause = e.getCause(); + if (cause instanceof StaleObjectStateException + || cause instanceof PessimisticLockException + || cause instanceof LockAcquisitionException) { // use MVCC database to prevent this! + continue; + } + } else if (e instanceof RemoteException) { + Throwable cause = e.getCause(); + if (cause instanceof org.infinispan.util.concurrent.TimeoutException) { + continue; + } + } else if (e instanceof StaleStateException + || e instanceof PessimisticLockException + || e instanceof ObjectNotFoundException + || e instanceof ConstraintViolationException + || e instanceof LockAcquisitionException) { // use MVCC database to prevent this! 
+ continue; + } + exceptions.add(e); + log.error("Failed " + operation.getClass().getName(), e); + } + } + synchronized (allFamilyNames) { + for (Map.Entry>> entry : familyNames.get().entrySet()) { + List> list = allFamilyNames.get(entry.getKey()); + if (list == null) allFamilyNames.put(entry.getKey(), list = new ArrayList<>()); + list.addAll(entry.getValue()); + } + for (Map.Entry>>> entry : familyMembers.get().entrySet()) { + List>> list = allFamilyMembers.get(entry.getKey()); + if (list == null) allFamilyMembers.put(entry.getKey(), list = new ArrayList<>()); + list.addAll(entry.getValue()); + } + } + return null; + })); + } + } + long testEnd = System.currentTimeMillis() + EXECUTION_TIME; + while (System.currentTimeMillis() < testEnd) { + if (!exceptions.isEmpty()) { + break; + } + Thread.sleep(1000); + } + running = false; + exec.shutdown(); + if (!exec.awaitTermination(1000, TimeUnit.SECONDS)) throw new IllegalStateException(); + for (Future f : futures) { + f.get(); // check for exceptions + } + checkForEmptyPendingPuts(); + log.infof("Generated %d timestamps%n", timestampGenerator.get()); + AtomicInteger created = new AtomicInteger(); + AtomicInteger removed = new AtomicInteger(); + ForkJoinPool threadPool = ForkJoinPool.commonPool(); + ArrayList> tasks = new ArrayList<>(); + for (Map.Entry>> entry : allFamilyNames.entrySet()) { + tasks.add(threadPool.submit(() -> { + int familyId = entry.getKey(); + List> list = entry.getValue(); + created.incrementAndGet(); + NavigableMap>> logByTime = getWritesAtTime(list); + checkCorrectness("family_name-" + familyId + "-", list, logByTime); + if (list.stream().anyMatch(l -> l.type == LogType.WRITE && l.getValue() == null)) { + removed.incrementAndGet(); + } + })); + } + for (Map.Entry>>> entry : allFamilyMembers.entrySet()) { + tasks.add(threadPool.submit(() -> { + int familyId = entry.getKey(); + List>> list = entry.getValue(); + NavigableMap>>> logByTime = getWritesAtTime(list); + checkCorrectness("family_members-" + familyId + "-", list, logByTime); + })); + } + for (ForkJoinTask task : tasks) { + task.get(); + } + if (!exceptions.isEmpty()) { + for (Exception e : exceptions) { + log.error("Test failure", e); + } + throw new IllegalStateException("There were " + exceptions.size() + " exceptions"); + } + log.infof("Created %d families, removed %d%n", created.get(), removed.get()); + } + + private static class DelayedInvalidators { + final ConcurrentMap map; + final Object key; + + public DelayedInvalidators(ConcurrentMap map, Object key) { + this.map = map; + this.key = key; + } + + public Object getPendingPutMap() { + return map.get(key); + } + } + + protected void checkForEmptyPendingPuts() throws Exception { + Field pp = PutFromLoadValidator.class.getDeclaredField("pendingPuts"); + pp.setAccessible(true); + Method getInvalidators = null; + List delayed = new LinkedList<>(); + for (int i = 0; i < sessionFactories.length; i++) { + SessionFactoryImplementor sfi = (SessionFactoryImplementor) sessionFactories[i]; + for (Object regionName : sfi.getAllSecondLevelCacheRegions().keySet()) { + PutFromLoadValidator validator = getPutFromLoadValidator(sfi, (String) regionName); + if (validator == null) { + log.warn("No validator for " + regionName); + continue; + } + ConcurrentMap map = (ConcurrentMap) pp.get(validator); + for (Iterator> iterator = map.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry entry = iterator.next(); + if (getInvalidators == null) { + getInvalidators = entry.getValue().getClass().getMethod("getInvalidators"); + 
getInvalidators.setAccessible(true); + } + java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(entry.getValue()); + if (invalidators != null && !invalidators.isEmpty()) { + delayed.add(new DelayedInvalidators(map, entry.getKey())); + } + iterator.remove(); + } + } + } + // poll until all invalidations come + long deadline = System.currentTimeMillis() + 30000; + while (System.currentTimeMillis() < deadline) { + for (Iterator iterator = delayed.iterator(); iterator.hasNext(); ) { + DelayedInvalidators entry = iterator.next(); + Object pendingPutMap = entry.getPendingPutMap(); + if (pendingPutMap == null) { + iterator.remove(); + } + else { + java.util.Collection invalidators = (java.util.Collection) getInvalidators.invoke(pendingPutMap); + if (invalidators == null || invalidators.isEmpty()) { + iterator.remove(); + } + } + } + if (delayed.isEmpty()) { + break; + } + Thread.sleep(1000); + } + if (!delayed.isEmpty()) { + throw new IllegalStateException("Invalidators were not cleared: " + delayed); + } + } + + private boolean hasCause(Throwable throwable, Class clazz) { + if (throwable == null) return false; + Throwable cause = throwable.getCause(); + if (throwable == cause) return false; + if (clazz.isInstance(cause)) return true; + return hasCause(cause, clazz); + } + + protected Operation getOperation() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + Operation operation; + int r = random.nextInt(100); + if (r == 0) operation = new InvalidateCache(); + else if (r < 5) operation = new QueryFamilies(); + else if (r < 10) operation = new RemoveFamily(r < 6); + else if (r < 20) operation = new UpdateFamily(r < 12, random.nextInt(1, 3)); + else if (r < 35) operation = new AddMember(r < 25); + else if (r < 50) operation = new RemoveMember(r < 40); + else operation = new ReadFamily(r < 75); + return operation; + } + + private NavigableMap>> getWritesAtTime(List> list) { + NavigableMap>> writes = new TreeMap<>(); + for (Log log : list) { + if (log.type != LogType.WRITE) continue; + for (int time = log.before; time <= log.after; ++time) { + List> onTime = writes.get(time); + if (onTime == null) { + writes.put(time, onTime = new ArrayList<>()); + } + onTime.add(log); + } + } + return writes; + } + + private void checkCorrectness(String dumpPrefix, List> logs, NavigableMap>> writesByTime) { + Collections.sort(logs, WALL_CLOCK_TIME_COMPARATOR); + int nullReads = 0, reads = 0, writes = 0; + for (Log read : logs) { + if (read.type != LogType.READ) { + writes++; + continue; + } + if (read.getValue() == null || isEmptyCollection(read)) nullReads++; + else reads++; + + Map> possibleValues = new HashMap<>(); + for (List> list : writesByTime.subMap(read.before, true, read.after, true).values()) { + for (Log write : list) { + if (read.precedes(write)) continue; + possibleValues.put(write.getValue(), write); + } + } + int startOfLastWriteBeforeRead = 0; + for (Map.Entry>> entry : writesByTime.headMap(read.before, false).descendingMap().entrySet()) { + int time = entry.getKey(); + if (time < startOfLastWriteBeforeRead) break; + for (Log write : entry.getValue()) { + if (write.after < read.before && write.before > startOfLastWriteBeforeRead) { + startOfLastWriteBeforeRead = write.before; + } + possibleValues.put(write.getValue(), write); + } + } + + if (possibleValues.isEmpty()) { + // the entry was not created at all (first write failed) + break; + } + if (!possibleValues.containsKey(read.getValue())) { + dumpLogs(dumpPrefix, logs); + exceptions.add(new 
IllegalStateException(String.format("R %s: %d .. %d (%s, %s) -> %s not in %s (%d+)", dumpPrefix, + read.before, read.after, read.threadName, new SimpleDateFormat("HH:mm:ss,SSS").format(new Date(read.wallClockTime)), + read.getValue(), possibleValues.values(), startOfLastWriteBeforeRead))); + break; + } + } + log.infof("Checked %d null reads, %d reads and %d writes%n", nullReads, reads, writes); + } + + private void dumpLogs(String prefix, List> logs) { + try { + File f = File.createTempFile(prefix, ".log"); + log.info("Dumping logs into " + f.getAbsolutePath()); + try (BufferedWriter writer = Files.newBufferedWriter(f.toPath())) { + for (Log log : logs) { + writer.write(log.toString()); + writer.write('\n'); + } + } + } catch (IOException e) { + log.error("Failed to dump family logs"); + } + } + + private static boolean isEmptyCollection(Log read) { + return read.getValue() instanceof java.util.Collection && ((java.util.Collection) read.getValue()).isEmpty(); + } + + protected abstract void withTx(Runnable runnable, boolean rolledBack) throws Exception; + + private abstract class Operation { + protected final boolean rolledBack; + + public Operation(boolean rolledBack) { + this.rolledBack = rolledBack; + } + + public abstract void run() throws Exception; + + protected void withSession(Consumer consumer) throws Exception { + ByRef sessionRef = new ByRef<>(null); + try { + withTx(() -> openSessionAndExecute(sessionRef, consumer), rolledBack); + } finally { + Session s = sessionRef.get(); + if (s != null) { + s.close(); + } + } + } + + protected void openSessionAndExecute(ByRef sessionRef, Consumer consumer) { + int node = threadNode.get(); + Session s = sessionFactory(node).openSession(); + sessionRef.set(s); + Transaction tx = s.getTransaction(); + tx.begin(); + try { + consumer.accept(s); + } catch (Exception e) { + tx.markRollbackOnly(); + throw e; + } finally { + try { + if (!rolledBack && tx.getStatus() == TransactionStatus.ACTIVE) { + log.trace("Hibernate commit begin"); + tx.commit(); + log.trace("Hibernate commit end"); + } else { + log.trace("Hibernate rollback begin"); + tx.rollback(); + log.trace("Hibernate rollback end"); + } + } catch (Exception e) { + log.trace("Hibernate commit or rollback failed", e); + throw e; + } + } + // cannot close before XA commit since force increment requires open connection + // s.close(); + } + + protected void withRandomFamily(BiConsumer consumer, Ref familyNameUpdate, Ref> familyMembersUpdate, LockMode lockMode) throws Exception { + int id = randomFamilyId(ThreadLocalRandom.current()); + int before = timestampGenerator.getAndIncrement(); + log.tracef("Started %s(%d, %s) at %d", getClass().getSimpleName(), id, rolledBack, before); + Log familyNameLog = new Log<>(); + Log> familyMembersLog = new Log<>(); + + boolean failure = false; + try { + withSession(s -> { + Family f = lockMode != null ? 
s.get(Family.class, id, lockMode) : s.get(Family.class, id); + if (f == null) { + familyNameLog.setValue(null); + familyMembersLog.setValue(Collections.EMPTY_SET); + familyNotFound(id); + } else { + familyNameLog.setValue(f.getName()); + familyMembersLog.setValue(membersToNames(f.getMembers())); + consumer.accept(s, f); + } + }); + } catch (Exception e) { + failure = true; + throw e; + } finally { + int after = timestampGenerator.getAndIncrement(); + recordReadWrite(id, before, after, failure, familyNameUpdate, familyMembersUpdate, familyNameLog, familyMembersLog); + } + } + + protected void withRandomFamilies(int numFamilies, BiConsumer consumer, String[] familyNameUpdates, Set[] familyMembersUpdates, LockMode lockMode) throws Exception { + int ids[] = new int[numFamilies]; + Log[] familyNameLogs = new Log[numFamilies]; + Log>[] familyMembersLogs = new Log[numFamilies]; + for (int i = 0; i < numFamilies; ++i) { + ids[i] = randomFamilyId(ThreadLocalRandom.current()); + familyNameLogs[i] = new Log<>(); + familyMembersLogs[i] = new Log<>(); + } + int before = timestampGenerator.getAndIncrement(); + log.tracef("Started %s(%s) at %d", getClass().getSimpleName(), Arrays.toString(ids), before); + + boolean failure = false; + try { + withSession(s -> { + Family[] families = new Family[numFamilies]; + for (int i = 0; i < numFamilies; ++i) { + Family f = lockMode != null ? s.get(Family.class, ids[i], lockMode) : s.get(Family.class, ids[i]); + families[i] = f; + if (f == null) { + familyNameLogs[i].setValue(null); + familyMembersLogs[i].setValue(Collections.EMPTY_SET); + familyNotFound(ids[i]); + } else { + familyNameLogs[i].setValue(f.getName()); + familyMembersLogs[i].setValue(membersToNames(f.getMembers())); + } + } + consumer.accept(s, families); + }); + } catch (Exception e) { + failure = true; + throw e; + } finally { + int after = timestampGenerator.getAndIncrement(); + for (int i = 0; i < numFamilies; ++i) { + recordReadWrite(ids[i], before, after, failure, + familyNameUpdates != null ? Ref.of(familyNameUpdates[i]) : Ref.empty(), + familyMembersUpdates != null ? 
Ref.of(familyMembersUpdates[i]) : Ref.empty(), + familyNameLogs[i], familyMembersLogs[i]); + } + } + } + + private void recordReadWrite(int id, int before, int after, boolean failure, Ref familyNameUpdate, Ref> familyMembersUpdate, Log familyNameLog, Log> familyMembersLog) { + log.tracef("Finished %s at %d", getClass().getSimpleName(), after); + + LogType readType, writeType; + if (failure || rolledBack) { + writeType = LogType.WRITE_FAILURE; + readType = LogType.READ_FAILURE; + } else { + writeType = LogType.WRITE; + readType = LogType.READ; + } + + familyNameLog.setType(readType).setTimes(before, after); + familyMembersLog.setType(readType).setTimes(before, after); + + getRecordList(familyNames, id).add(familyNameLog); + getRecordList(familyMembers, id).add(familyMembersLog); + + + if (familyNameLog.getValue() != null) { + if (familyNameUpdate.isSet()) { + getRecordList(familyNames, id).add(new Log<>(before, after, familyNameUpdate.get(), writeType, familyNameLog)); + } + if (familyMembersUpdate.isSet()) { + getRecordList(familyMembers, id).add(new Log<>(before, after, familyMembersUpdate.get(), writeType, familyMembersLog)); + } + } + } + } + + private class InsertFamily extends Operation { + public InsertFamily(boolean rolledBack) { + super(rolledBack); + } + + @Override + public void run() throws Exception { + Family family = createFamily(); + int before = timestampGenerator.getAndIncrement(); + log.trace("Started InsertFamily at " + before); + boolean failure = false; + try { + withSession(s -> s.persist(family)); + } catch (Exception e) { + failure = true; + throw e; + } finally { + int after = timestampGenerator.getAndIncrement(); + log.trace("Finished InsertFamily at " + after); + familyIds.put(family.getId(), new AtomicInteger(NUM_ACCESS_AFTER_REMOVAL)); + LogType type = failure || rolledBack ? 
LogType.WRITE_FAILURE : LogType.WRITE; + getRecordList(familyNames, family.getId()).add(new Log<>(before, after, family.getName(), type)); + getRecordList(familyMembers, family.getId()).add(new Log<>(before, after, membersToNames(family.getMembers()), type)); + } + } + } + + private Set membersToNames(Set members) { + return members.stream().map(p -> p.getFirstName()).collect(Collectors.toSet()); + } + + private class ReadFamily extends Operation { + private final boolean evict; + + public ReadFamily(boolean evict) { + super(false); + this.evict = evict; + } + + @Override + public void run() throws Exception { + withRandomFamily((s, f) -> { + if (evict) { + sessionFactory(threadNode.get()).getCache().evictEntity(Family.class, f.getId()); + } + }, Ref.empty(), Ref.empty(), null); + } + } + + private class UpdateFamily extends Operation { + private final int numUpdates; + + public UpdateFamily(boolean rolledBack, int numUpdates) { + super(rolledBack); + this.numUpdates = numUpdates; + } + + @Override + public void run() throws Exception { + String[] newNames = new String[numUpdates]; + for (int i = 0; i < numUpdates; ++i) { + newNames[i] = randomString(ThreadLocalRandom.current()); + } + withRandomFamilies(numUpdates, (s, families) -> { + for (int i = 0; i < numUpdates; ++i) { + Family f = families[i]; + if (f != null) { + f.setName(newNames[i]); + s.persist(f); + } + } + }, newNames, null, LockMode.OPTIMISTIC_FORCE_INCREMENT); + } + } + + private class RemoveFamily extends Operation { + public RemoveFamily(boolean rolledBack) { + super(rolledBack); + } + + @Override + public void run() throws Exception { + withRandomFamily((s, f) -> s.delete(f), Ref.of(null), Ref.of(Collections.EMPTY_SET), LockMode.OPTIMISTIC); + } + } + + private abstract class MemberOperation extends Operation { + public MemberOperation(boolean rolledBack) { + super(rolledBack); + } + + @Override + public void run() throws Exception { + Ref> newMembers = new Ref<>(); + withRandomFamily((s, f) -> { + boolean updated = updateMembers(s, ThreadLocalRandom.current(), f); + if (updated) { + newMembers.set(membersToNames(f.getMembers())); + s.persist(f); + } + }, Ref.empty(), newMembers, LockMode.OPTIMISTIC_FORCE_INCREMENT); + } + + protected abstract boolean updateMembers(Session s, ThreadLocalRandom random, Family f); + } + + private class AddMember extends MemberOperation { + public AddMember(boolean rolledBack) { + super(rolledBack); + } + + protected boolean updateMembers(Session s, ThreadLocalRandom random, Family f) { + Set members = f.getMembers(); + if (members.size() < MAX_MEMBERS) { + members.add(createPerson(random, f)); + return true; + } else { + return false; + } + } + } + + private class RemoveMember extends MemberOperation { + public RemoveMember(boolean rolledBack) { + super(rolledBack); + } + + @Override + protected boolean updateMembers(Session s, ThreadLocalRandom random, Family f) { + int numMembers = f.getMembers().size(); + if (numMembers > 0) { + Iterator it = f.getMembers().iterator(); + Person person = null; + for (int i = random.nextInt(numMembers); i >= 0; --i) { + person = it.next(); + } + it.remove(); + if (person != null) { + s.delete(person); + } + return true; + } else { + return false; + } + } + } + + private class QueryFamilies extends Operation { + final static int MAX_RESULTS = 10; + + public QueryFamilies() { + super(false); + } + + @Override + public void run() throws Exception { + String prefix = new StringBuilder(2) + .append((char) ThreadLocalRandom.current().nextInt('A', 'Z' + 
1)).append('%').toString(); + int[] ids = new int[MAX_RESULTS]; + String[] names = new String[MAX_RESULTS]; + Set[] members = new Set[MAX_RESULTS]; + + int before = timestampGenerator.getAndIncrement(); + log.tracef("Started QueryFamilies at %d", before); + withSession(s -> { + List results = s.createCriteria(Family.class) + .add(Restrictions.like("name", prefix)) + .setMaxResults(MAX_RESULTS) + .setCacheable(true) + .list(); + int index = 0; + for (Family f : results) { + ids[index] = f.getId(); + names[index] = f.getName(); + members[index] = membersToNames(f.getMembers()); + ++index; + } + }); + + int after = timestampGenerator.getAndIncrement(); + log.tracef("Finished QueryFamilies at %d", after); + for (int index = 0; index < MAX_RESULTS; ++index) { + if (names[index] == null) break; + getRecordList(familyNames, ids[index]).add(new Log<>(before, after, names[index], LogType.READ)); + getRecordList(familyMembers, ids[index]).add(new Log<>(before, after, members[index], LogType.READ)); + } + } + } + + private class InvalidateCache extends Operation { + public InvalidateCache() { + super(false); + } + + @Override + public void run() throws Exception { + log.trace("Invalidating all caches"); + withTx(() -> { + int node = threadNode.get(); + sessionFactory(node).getCache().evictAllRegions(); + }, rolledBack); + } + } + + private PutFromLoadValidator getPutFromLoadValidator(SessionFactoryImplementor sfi, String regionName) throws NoSuchFieldException, IllegalAccessException { + RegionAccessStrategy strategy = sfi.getSecondLevelCacheRegionAccessStrategy(regionName); + if (strategy == null) { + return null; + } + Field delegateField = strategy.getClass().getDeclaredField("delegate"); + delegateField.setAccessible(true); + Object delegate = delegateField.get(strategy); + Field validatorField = TransactionalAccessDelegate.class.getDeclaredField("putValidator"); + validatorField.setAccessible(true); + return (PutFromLoadValidator) validatorField.get(delegate); + } + + protected SessionFactory sessionFactory(int node) { + return sessionFactories[node]; + } + + private void familyNotFound(int id) { + AtomicInteger access = familyIds.get(id); + if (access == null) return; + if (access.decrementAndGet() == 0) { + familyIds.remove(id); + } + } + + private List getRecordList(ThreadLocal>> tlListMap, int id) { + Map> map = tlListMap.get(); + List list = map.get(id); + if (list == null) map.put(id, list = new ArrayList<>()); + return list; + } + + private int randomFamilyId(ThreadLocalRandom random) { + Map.Entry first = familyIds.firstEntry(); + Map.Entry last = familyIds.lastEntry(); + if (first == null || last == null) return 0; + Map.Entry ceiling = familyIds.ceilingEntry(random.nextInt(first.getKey(), last.getKey() + 1)); + return ceiling == null ? 
0 : ceiling.getKey(); + } + + private static Family createFamily() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + String familyName = randomString(random); + Family f = new Family(familyName); + HashSet members = new HashSet<>(); + members.add(createPerson(random, f)); + f.setMembers(members); + return f; + } + + private static Person createPerson(ThreadLocalRandom random, Family family) { + return new Person(randomString(random), family); + } + + private static String randomString(ThreadLocalRandom random) { + StringBuilder sb = new StringBuilder(10); + for (int i = 0; i < 10; ++i) { + sb.append((char) random.nextInt('A', 'Z' + 1)); + } + return sb.toString(); + } + + private enum LogType { + READ('R'), WRITE('W'), READ_FAILURE('L'), WRITE_FAILURE('F'); + + private final char shortName; + + LogType(char shortName) { + this.shortName = shortName; + } + } + + private class Log { + int before; + int after; + T value; + LogType type; + Log[] preceding; + String threadName; + long wallClockTime; + + public Log(int time) { + this(); + this.before = time; + this.after = time; + } + + public Log(int before, int after, T value, LogType type, Log... preceding) { + this(); + this.before = before; + this.after = after; + this.value = value; + this.type = type; + this.preceding = preceding; + } + + public Log() { + threadName = Thread.currentThread().getName(); + wallClockTime = System.currentTimeMillis(); + } + + public Log setType(LogType type) { + this.type = type; + return this; + } + + public void setTimes(int before, int after) { + this.before = before; + this.after = after; + } + + public void setValue(T value) { + this.value = value; + } + + public T getValue() { + return value; + } + + public boolean precedes(Log write) { + if (write.preceding == null) return false; + for (Log l : write.preceding) { + if (l == this) return true; + } + return false; + } + + @Override + public String toString() { + return String.format("%c: %5d - %5d\t(%s,\t%s)\t%s", type.shortName, before, after, + new SimpleDateFormat("HH:mm:ss,SSS").format(new Date(wallClockTime)), threadName, value); + } + } + + private static class Ref { + private static Ref EMPTY = new Ref() { + @Override + public void set(Object value) { + throw new UnsupportedOperationException(); + } + }; + private boolean set; + private T value; + + public static Ref empty() { + return EMPTY; + } + + public static Ref of(T value) { + Ref ref = new Ref(); + ref.set(value); + return ref; + } + + public boolean isSet() { + return set; + } + + public T get() { + return value; + } + + public void set(T value) { + this.value = value; + this.set = true; + } + } +} diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Address.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Address.java index d547a85700..0a52e8e9ac 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Address.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Address.java @@ -153,6 +153,7 @@ public final class Address { Address address = (Address) o; + // inhabitants must not be in the comparison since we would end up in infinite recursion if (id != address.id) return false; if (streetNumber != address.streetNumber) return false; if (version != address.version) return false; @@ -160,8 +161,6 @@ public final class Address { return false; if (countryName != null ? 
!countryName.equals(address.countryName) : address.countryName != null)
         return false;
-      if (inhabitants != null ? !inhabitants.equals(address.inhabitants) : address.inhabitants != null)
-         return false;
       if (streetName != null ? !streetName.equals(address.streetName) : address.streetName != null)
          return false;
       if (zipCode != null ? !zipCode.equals(address.zipCode) : address.zipCode != null)
          return false;
@@ -177,7 +176,6 @@ public final class Address {
       result = 31 * result + (cityName != null ? cityName.hashCode() : 0);
       result = 31 * result + (countryName != null ? countryName.hashCode() : 0);
       result = 31 * result + (zipCode != null ? zipCode.hashCode() : 0);
-      result = 31 * result + (inhabitants != null ? inhabitants.hashCode() : 0);
       result = 31 * result + id;
       result = 31 * result + version;
       return result;
diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Family.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Family.java
index ebe7f1a8b8..e840d54a70 100644
--- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Family.java
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/stress/entities/Family.java
@@ -7,10 +7,12 @@
 package org.hibernate.test.cache.infinispan.stress.entities;
+import javax.persistence.CascadeType;
 import javax.persistence.Entity;
 import javax.persistence.GeneratedValue;
 import javax.persistence.Id;
 import javax.persistence.OneToMany;
+import javax.persistence.Version;
 import java.util.HashSet;
 import java.util.Set;
@@ -22,8 +24,9 @@ public final class Family {
    private int id;
    private String name;
    private String secondName;
-   @OneToMany
+   @OneToMany(cascade = CascadeType.ALL, mappedBy = "family", orphanRemoval = true)
    private Set<Person> members;
+   @Version
    private int version;
    public Family(String name) {
@@ -97,10 +100,9 @@ public final class Family {
       Family family = (Family) o;
+      // members must not be in the comparison since we would end up in infinite recursion
       if (id != family.id) return false;
       if (version != family.version) return false;
-      if (members != null ? !members.equals(family.members) : family.members != null)
-         return false;
       if (name != null ? !name.equals(family.name) : family.name != null)
          return false;
       if (secondName != null ? !secondName.equals(family.secondName) : family.secondName != null)
@@ -113,7 +115,6 @@ public final class Family {
    public int hashCode() {
       int result = name != null ? name.hashCode() : 0;
       result = 31 * result + (secondName != null ? secondName.hashCode() : 0);
-      result = 31 * result + (members != null ? members.hashCode() : 0);
       result = 31 * result + id;
       result = 31 * result + version;
       return result;
diff --git a/hibernate-infinispan/src/test/resources/2lc-test-tcp.xml b/hibernate-infinispan/src/test/resources/2lc-test-tcp.xml
index c3735e8091..6065c9ea3d 100644
--- a/hibernate-infinispan/src/test/resources/2lc-test-tcp.xml
+++ b/hibernate-infinispan/src/test/resources/2lc-test-tcp.xml
@@ -26,16 +26,14 @@
          thread_pool.max_threads="8"
          thread_pool.keep_alive_time="5000"
          thread_pool.queue_enabled="false"
-         thread_pool.queue_max_size="100"
-         thread_pool.rejection_policy="Run"
+         thread_pool.rejection_policy="Discard"
          oob_thread_pool.enabled="true"
          oob_thread_pool.min_threads="1"
-         oob_thread_pool.max_threads="8"
+         oob_thread_pool.max_threads="200"
          oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
-         oob_thread_pool.queue_max_size="100"
-         oob_thread_pool.rejection_policy="Run"/>
+         oob_thread_pool.rejection_policy="Discard"/>