[HHH-3817] Don't cache stale data via putFromLoad
[HHH-3818] Handle evictAll "without regard for transactions" git-svn-id: https://svn.jboss.org/repos/hibernate/core/trunk@16190 1b8cb986-b30d-0410-93ca-fae66ebed9b2
This commit is contained in:
parent
eb60160109
commit
fd8f6fbbaa
|
@ -171,8 +171,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
|
|||
|
||||
Node regionRoot = localCache.getRoot().getChild(regionFqn);
|
||||
assertFalse(regionRoot == null);
|
||||
Set children = regionRoot.getChildrenNames();
|
||||
assertEquals("No children in " + children, 0, children.size());
|
||||
assertEquals("No children in " + regionRoot, 0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isResident());
|
||||
|
||||
if (optimistic) {
|
||||
|
@ -181,7 +180,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
|
|||
|
||||
regionRoot = remoteCache.getRoot().getChild(regionFqn);
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, regionRoot.getChildrenNames().size());
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isResident());
|
||||
|
||||
if (optimistic) {
|
||||
|
@ -212,35 +211,24 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
|
|||
|
||||
localRegion.evictAll();
|
||||
|
||||
// This should re-establish the region root node in the optimistic case
|
||||
// This should re-establish the region root node
|
||||
assertNull(localRegion.get(KEY));
|
||||
|
||||
regionRoot = localCache.getRoot().getChild(regionFqn);
|
||||
if (optimistic) {
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, regionRoot.getChildrenNames().size());
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
}
|
||||
else {
|
||||
assertTrue("region root is removed", regionRoot == null || !regionRoot.isValid());
|
||||
}
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
|
||||
// Re-establishing the region root on the local node doesn't
|
||||
// propagate it to other nodes. Do a get on the remote node to re-establish
|
||||
// This only adds a node in the case of optimistic locking
|
||||
assertEquals(null, remoteRegion.get(KEY));
|
||||
|
||||
regionRoot = remoteCache.getRoot().getChild(regionFqn);
|
||||
if (optimistic) {
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, regionRoot.getChildrenNames().size());
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
}
|
||||
else {
|
||||
assertTrue("region root is removed", regionRoot == null || !regionRoot.isValid());
|
||||
}
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
|
||||
assertEquals("local is clean", null, localRegion.get(KEY));
|
||||
assertEquals("remote is clean", null, remoteRegion.get(KEY));
|
||||
|
|
|
@ -25,10 +25,14 @@ package org.hibernate.test.cache.jbc2;
|
|||
|
||||
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
import org.hibernate.cache.RegionFactory;
|
||||
import org.hibernate.cache.jbc2.util.CacheHelper;
|
||||
import org.hibernate.junit.UnitTestCase;
|
||||
import org.hibernate.test.util.CacheTestSupport;
|
||||
import org.jboss.cache.Cache;
|
||||
import org.jboss.cache.Node;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -96,4 +100,15 @@ public abstract class AbstractJBossCacheTestCase extends UnitTestCase {
|
|||
protected void avoidConcurrentFlush() {
|
||||
testSupport.avoidConcurrentFlush();
|
||||
}
|
||||
|
||||
protected int getValidChildrenCount(Node node) {
|
||||
int result = 0;
|
||||
Set<Node> children = node.getChildren();
|
||||
for (Node child : children) {
|
||||
if (node.isValid() && CacheHelper.Internal.NODE.equals(child.getFqn().getLastElement()) == false) {
|
||||
result++;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@
|
|||
package org.hibernate.test.cache.jbc2.collection;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -480,36 +481,25 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
|
|||
else
|
||||
localAccessStrategy.removeAll();
|
||||
|
||||
// This should re-establish the region root node in the optimistic case
|
||||
// This should re-establish the region root node
|
||||
assertNull(localAccessStrategy.get(KEY, System.currentTimeMillis()));
|
||||
|
||||
regionRoot = localCache.getRoot().getChild(regionFqn);
|
||||
if (isUsingOptimisticLocking()) {
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
}
|
||||
else {
|
||||
assertTrue("region root is removed", regionRoot == null || !regionRoot.isValid());
|
||||
}
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
|
||||
// Re-establishing the region root on the local node doesn't
|
||||
// propagate it to other nodes. Do a get on the remote node to re-establish
|
||||
// This only adds a node in the case of optimistic locking
|
||||
assertEquals(null, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
|
||||
|
||||
regionRoot = remoteCache.getRoot().getChild(regionFqn);
|
||||
if (isUsingOptimisticLocking()) {
|
||||
assertFalse(regionRoot == null);
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
// Not invalidation, so we didn't insert a child above
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
}
|
||||
else {
|
||||
assertTrue("region root is removed", regionRoot == null || !regionRoot.isValid());
|
||||
}
|
||||
assertFalse(regionRoot == null);
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
// Not invalidation, so we didn't insert a child above
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
|
||||
// Test whether the get above messes up the optimistic version
|
||||
remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
|
||||
|
@ -530,16 +520,6 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
|
|||
assertEquals("remote is correct", VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
|
||||
}
|
||||
|
||||
private int getValidChildrenCount(Node node) {
|
||||
int result = 0;
|
||||
for (Iterator it = node.getChildren().iterator(); it.hasNext(); ) {
|
||||
if (((Node) it.next()).isValid()) {
|
||||
result++;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void rollback() {
|
||||
try {
|
||||
BatchModeTransactionManager.getInstance().rollback();
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.hibernate.cache.CollectionRegion;
|
|||
import org.hibernate.cache.Region;
|
||||
import org.hibernate.cache.RegionFactory;
|
||||
import org.hibernate.cache.access.AccessType;
|
||||
import org.hibernate.cache.access.CollectionRegionAccessStrategy;
|
||||
import org.hibernate.cache.jbc2.BasicRegionAdapter;
|
||||
import org.hibernate.cache.jbc2.CacheInstanceManager;
|
||||
import org.hibernate.cache.jbc2.JBossCacheRegionFactory;
|
||||
|
@ -103,7 +104,10 @@ public class CollectionRegionImplTestCase extends AbstractEntityCollectionRegion
|
|||
|
||||
@Override
|
||||
protected void putInRegion(Region region, Object key, Object value) {
|
||||
((CollectionRegion) region).buildAccessStrategy(AccessType.TRANSACTIONAL).putFromLoad(key, value, System.currentTimeMillis(), new Integer(1));
|
||||
CollectionRegionAccessStrategy strategy = ((CollectionRegion) region).buildAccessStrategy(AccessType.TRANSACTIONAL);
|
||||
// putFromLoad is ignored if not preceded by a get, so do a get
|
||||
strategy.get(key, System.currentTimeMillis());
|
||||
strategy.putFromLoad(key, value, System.currentTimeMillis(), new Integer(1));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
*/
|
||||
package org.hibernate.test.cache.jbc2.entity;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -487,7 +486,9 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
|
|||
final String KEY = KEY_BASE + testCount++;
|
||||
|
||||
// Set up initial state
|
||||
localAccessStrategy.get(KEY, System.currentTimeMillis());
|
||||
localAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
|
||||
remoteAccessStrategy.get(KEY, System.currentTimeMillis());
|
||||
remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
|
||||
|
||||
// Let the async put propagate
|
||||
|
@ -702,31 +703,21 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
|
|||
assertNull(localAccessStrategy.get(KEY, System.currentTimeMillis()));
|
||||
|
||||
regionRoot = localCache.getRoot().getChild(regionFqn);
|
||||
if (isUsingOptimisticLocking()) {
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
}
|
||||
else {
|
||||
assertTrue("region root is removed", regionRoot == null || !regionRoot.isValid());
|
||||
}
|
||||
assertFalse(regionRoot == null);
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
|
||||
// Re-establishing the region root on the local node doesn't
|
||||
// propagate it to other nodes. Do a get on the remote node to re-establish
|
||||
assertEquals(null, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
|
||||
|
||||
regionRoot = remoteCache.getRoot().getChild(regionFqn);
|
||||
if (isUsingOptimisticLocking()) {
|
||||
assertFalse(regionRoot == null);
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
// Not invalidation, so we didn't insert a child above
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
}
|
||||
else {
|
||||
assertTrue("region root is removed", regionRoot == null || !regionRoot.isValid());
|
||||
}
|
||||
assertFalse(regionRoot == null);
|
||||
assertTrue(regionRoot.isValid());
|
||||
assertTrue(regionRoot.isResident());
|
||||
// Not invalidation, so we didn't insert a child above
|
||||
assertEquals(0, getValidChildrenCount(regionRoot));
|
||||
|
||||
// Test whether the get above messes up the optimistic version
|
||||
remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
|
||||
|
@ -747,16 +738,6 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
|
|||
assertEquals("remote is correct", VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
|
||||
}
|
||||
|
||||
private int getValidChildrenCount(Node node) {
|
||||
int result = 0;
|
||||
for (Iterator it = node.getChildren().iterator(); it.hasNext(); ) {
|
||||
if (((Node) it.next()).isValid()) {
|
||||
result++;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
protected void rollback() {
|
||||
try {
|
||||
BatchModeTransactionManager.getInstance().rollback();
|
||||
|
@ -764,7 +745,6 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
|
|||
catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class AccessStrategyTestSetup extends TestSetup {
|
||||
|
|
|
@ -56,6 +56,7 @@ public abstract class AbstractTransactionalAccessTestCase extends AbstractEntity
|
|||
|
||||
final String KEY = KEY_BASE + testCount++;
|
||||
|
||||
localAccessStrategy.get(KEY, System.currentTimeMillis());
|
||||
localAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
|
||||
|
||||
final CountDownLatch pferLatch = new CountDownLatch(1);
|
||||
|
@ -63,7 +64,7 @@ public abstract class AbstractTransactionalAccessTestCase extends AbstractEntity
|
|||
final CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
|
||||
Thread blocker = new Thread() {
|
||||
Thread blocker = new Thread("Blocker") {
|
||||
|
||||
public void run() {
|
||||
|
||||
|
@ -95,7 +96,7 @@ public abstract class AbstractTransactionalAccessTestCase extends AbstractEntity
|
|||
}
|
||||
};
|
||||
|
||||
Thread putter = new Thread() {
|
||||
Thread putter = new Thread("Putter") {
|
||||
|
||||
public void run() {
|
||||
|
||||
|
|
|
@ -0,0 +1,607 @@
|
|||
/*
|
||||
* Hibernate, Relational Persistence for Idiomatic Java
|
||||
*
|
||||
* Copyright (c) 2009, Red Hat Middleware LLC or third-party contributors as
|
||||
* indicated by the @author tags or express copyright attribution
|
||||
* statements applied by the authors. All third-party contributions are
|
||||
* distributed under license by Red Hat Middleware LLC.
|
||||
*
|
||||
* This copyrighted material is made available to anyone wishing to use, modify,
|
||||
* copy, or redistribute it subject to the terms and conditions of the GNU
|
||||
* Lesser General Public License, as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
|
||||
* for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with this distribution; if not, write to:
|
||||
* Free Software Foundation, Inc.
|
||||
* 51 Franklin Street, Fifth Floor
|
||||
* Boston, MA 02110-1301 USA
|
||||
*/
|
||||
package org.hibernate.test.cache.jbc2.functional;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.transaction.SystemException;
|
||||
import javax.transaction.Transaction;
|
||||
|
||||
import junit.framework.Test;
|
||||
|
||||
import org.hibernate.FlushMode;
|
||||
import org.hibernate.Session;
|
||||
import org.hibernate.cache.RegionFactory;
|
||||
import org.hibernate.cache.jbc2.JBossCacheRegionFactory;
|
||||
import org.hibernate.cache.jbc2.builder.SharedCacheInstanceManager;
|
||||
import org.hibernate.cfg.Configuration;
|
||||
import org.hibernate.exception.ExceptionUtils;
|
||||
import org.hibernate.junit.functional.FunctionalTestClassTestSuite;
|
||||
import org.hibernate.stat.SecondLevelCacheStatistics;
|
||||
import org.hibernate.test.cache.jbc2.functional.util.DualNodeConnectionProviderImpl;
|
||||
import org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionManagerImpl;
|
||||
import org.hibernate.test.cache.jbc2.functional.util.DualNodeTestUtil;
|
||||
import org.hibernate.test.cache.jbc2.functional.util.DualNodeTransactionManagerLookup;
|
||||
import org.hibernate.transaction.CMTTransactionFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author nikita_tovstoles@mba.berkeley.edu
|
||||
*/
|
||||
public class MVCCConcurrentWriteTest extends CacheTestCaseBase {
|
||||
|
||||
private static final String JBC_CONFIG = "org/hibernate/test/cache/jbc2/functional/mvcc-treecache.xml";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MVCCConcurrentWriteTest.class);
|
||||
/**
|
||||
* when USER_COUNT==1, tests pass, when >4 tests fail
|
||||
*/
|
||||
final private int USER_COUNT = 5;
|
||||
final private int ITERATION_COUNT = 150;
|
||||
final private int THINK_TIME_MILLIS = 10;
|
||||
final private long LAUNCH_INTERVAL_MILLIS = 10;
|
||||
final private Random random = new Random();
|
||||
/**
|
||||
* kill switch used to stop all users when one fails
|
||||
*/
|
||||
private static volatile boolean TERMINATE_ALL_USERS = false;
|
||||
/**
|
||||
* collection of IDs of all customers participating in this test
|
||||
*/
|
||||
private Set<Integer> customerIDs = new HashSet<Integer>();
|
||||
|
||||
public MVCCConcurrentWriteTest(String x) {
|
||||
super(x);
|
||||
}
|
||||
|
||||
protected Class<? extends RegionFactory> getCacheRegionFactory() {
|
||||
return JBossCacheRegionFactory.class;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply any region-factory specific configurations.
|
||||
*
|
||||
* @param the Configuration to update.
|
||||
*/
|
||||
protected void configureCacheFactory(Configuration cfg) {
|
||||
cfg.setProperty(SharedCacheInstanceManager.CACHE_RESOURCE_PROP, JBC_CONFIG);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* test that DB can be queried
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
public void testPingDb() throws Exception {
|
||||
try {
|
||||
beginTx();
|
||||
getEnvironment().getSessionFactory().getCurrentSession().createQuery("from " + Customer.class.getName()).list();
|
||||
} catch (Exception e) {
|
||||
setRollbackOnly();
|
||||
fail("failed to query DB; exception=" + e);
|
||||
}
|
||||
finally {
|
||||
commitTx();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void prepareTest() throws Exception {
|
||||
super.prepareTest();
|
||||
TERMINATE_ALL_USERS = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanupTest() throws Exception {
|
||||
try {
|
||||
super.cleanupTest();
|
||||
|
||||
} finally {
|
||||
cleanup();
|
||||
//DualNodeJtaTransactionManagerImpl.cleanupTransactions();
|
||||
//DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Configuration cfg) {
|
||||
super.configure(cfg);
|
||||
cfg.setProperty(DualNodeTestUtil.NODE_ID_PROP, DualNodeTestUtil.LOCAL);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean getUseQueryCache() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> getConnectionProviderClass() {
|
||||
return DualNodeConnectionProviderImpl.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> getTransactionManagerLookupClass() {
|
||||
return DualNodeTransactionManagerLookup.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<?> getTransactionFactoryClass() {
|
||||
return CMTTransactionFactory.class;
|
||||
}
|
||||
|
||||
public void testSingleUser() throws Exception {
|
||||
//setup
|
||||
Customer customer = createCustomer(0);
|
||||
final Integer customerId = customer.getId();
|
||||
getCustomerIDs().add(customerId);
|
||||
|
||||
assertNull("contact exists despite not being added", getFirstContact(customerId));
|
||||
|
||||
//check that cache was hit
|
||||
SecondLevelCacheStatistics customerSlcs = getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
|
||||
getPrefixedRegionName(Customer.class.getName()));
|
||||
assertEquals(customerSlcs.getPutCount(), 1);
|
||||
assertEquals(customerSlcs.getElementCountInMemory(), 1);
|
||||
assertEquals(customerSlcs.getEntries().size(), 1);
|
||||
|
||||
SecondLevelCacheStatistics contactsCollectionSlcs = getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
|
||||
getPrefixedRegionName(Customer.class.getName() + ".contacts"));
|
||||
assertEquals(1, contactsCollectionSlcs.getPutCount());
|
||||
assertEquals(1, contactsCollectionSlcs.getElementCountInMemory());
|
||||
assertEquals(1, contactsCollectionSlcs.getEntries().size());
|
||||
|
||||
final Contact contact = addContact(customerId);
|
||||
assertNotNull("contact returned by addContact is null", contact);
|
||||
assertEquals("Customer.contacts cache was not invalidated after addContact",
|
||||
0, contactsCollectionSlcs.getElementCountInMemory());
|
||||
|
||||
assertNotNull("Contact missing after successful add call", getFirstContact(customerId));
|
||||
|
||||
|
||||
//read everyone's contacts
|
||||
readEveryonesFirstContact();
|
||||
|
||||
removeContact(customerId);
|
||||
assertNull("contact still exists after successful remove call", getFirstContact(customerId));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This will fail until JBCACHE-1494 is done and integrated. Note that
|
||||
* having getUseQueryCache() return true will allows this to pass.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testManyUsersFailureExpected() throws Exception {
|
||||
|
||||
//setup - create users
|
||||
for (int i = 0; i < USER_COUNT; i++) {
|
||||
Customer customer = createCustomer(0);
|
||||
getCustomerIDs().add(customer.getId());
|
||||
}
|
||||
|
||||
assertEquals("failed to create enough Customers", USER_COUNT, getCustomerIDs().size());
|
||||
|
||||
final ExecutorService pool = Executors.newFixedThreadPool(USER_COUNT);
|
||||
CountDownLatch completionLatch = new CountDownLatch(USER_COUNT);
|
||||
|
||||
Set<UserRunner> runners = new HashSet<UserRunner>();
|
||||
for (Integer customerId : getCustomerIDs()) {
|
||||
UserRunner r = new UserRunner(customerId, completionLatch);
|
||||
runners.add(r);
|
||||
pool.execute(r);
|
||||
LOG.info("launched " + r);
|
||||
Thread.sleep(LAUNCH_INTERVAL_MILLIS); //rampup
|
||||
}
|
||||
|
||||
assertEquals("not all user threads launched", USER_COUNT, runners.size());
|
||||
|
||||
boolean finishedInTime = completionLatch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
TERMINATE_ALL_USERS = true;
|
||||
|
||||
if (!finishedInTime) { //timed out waiting for users to finish
|
||||
pool.shutdown();
|
||||
fail("Timed out waiting for user threads to finish. Their state at the time of forced shutdown: " + statusOfRunnersToString(runners));
|
||||
} else {
|
||||
//if here -> pool finished before timing out
|
||||
//check whether all runners suceeded
|
||||
boolean success = true;
|
||||
for (UserRunner r : runners) {
|
||||
if (!r.isSuccess()) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue("at least one UserRunner failed: " + statusOfRunnersToString(runners), success);
|
||||
}
|
||||
}
|
||||
|
||||
public void cleanup() throws Exception {
|
||||
|
||||
getCustomerIDs().clear();
|
||||
|
||||
String deleteContactHQL = "delete from Contact";
|
||||
String deleteCustomerHQL = "delete from Customer";
|
||||
|
||||
beginTx();
|
||||
try {
|
||||
Session session = getEnvironment().getSessionFactory().getCurrentSession();
|
||||
session.createQuery(deleteContactHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
|
||||
session.createQuery(deleteCustomerHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Caught exception in cleanup", e);
|
||||
setRollbackOnly();
|
||||
}
|
||||
finally {
|
||||
commitTx();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Customer createCustomer(int nameSuffix) throws Exception {
|
||||
Customer customer = null;
|
||||
beginTx();
|
||||
try {
|
||||
customer = new Customer();
|
||||
customer.setName("customer_" + nameSuffix);
|
||||
customer.setContacts(new HashSet<Contact>());
|
||||
|
||||
getEnvironment().getSessionFactory().getCurrentSession().persist(customer);
|
||||
} catch (Exception e) {
|
||||
setRollbackOnly();
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
commitTx();
|
||||
}
|
||||
return customer;
|
||||
}
|
||||
|
||||
/**
|
||||
* delegate method since I'm trying to figure out which txManager to use
|
||||
* given that this test runs multiple threads (SimpleJtaTxMgrImpl isn't suited for that).
|
||||
*
|
||||
* What is needed is a thread-safe JTATransactionManager impl that can handle concurrent TXs
|
||||
*
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
private void beginTx() throws Exception {
|
||||
DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).begin();
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #beginTx()
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
private void commitTx() throws Exception {
|
||||
DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).commit();
|
||||
}
|
||||
|
||||
// /**
|
||||
// * @see #beginTx()
|
||||
// * @throws java.lang.Exception
|
||||
// */
|
||||
// private void rollbackTx() throws Exception {
|
||||
// DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
|
||||
// }
|
||||
|
||||
private void setRollbackOnly() throws Exception {
|
||||
Transaction tx = DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).getCurrentTransaction();
|
||||
if (tx != null) {
|
||||
tx.setRollbackOnly();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* read first contact of every Customer participating in this test.
|
||||
* this forces concurrent cache writes of Customer.contacts Collection cache node
|
||||
*
|
||||
* @return who cares
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
private void readEveryonesFirstContact() throws Exception {
|
||||
beginTx();
|
||||
try {
|
||||
for (Integer customerId : getCustomerIDs()) {
|
||||
|
||||
if (TERMINATE_ALL_USERS) {
|
||||
setRollbackOnly();
|
||||
return;
|
||||
}
|
||||
|
||||
final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|
||||
Set<Contact> contacts = customer.getContacts();
|
||||
if (!contacts.isEmpty()) {
|
||||
contacts.iterator().next();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
setRollbackOnly();
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
commitTx();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* -load existing Customer
|
||||
* -get customer's contacts; return 1st one
|
||||
*
|
||||
* @param customerId
|
||||
* @return first Contact or null if customer has none
|
||||
*/
|
||||
private Contact getFirstContact(Integer customerId) throws Exception {
|
||||
assert customerId != null;
|
||||
|
||||
Contact firstContact = null;
|
||||
beginTx();
|
||||
try {
|
||||
final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|
||||
Set<Contact> contacts = customer.getContacts();
|
||||
firstContact = contacts.isEmpty() ? null : contacts.iterator().next();
|
||||
|
||||
if (TERMINATE_ALL_USERS)
|
||||
setRollbackOnly();
|
||||
|
||||
} catch (Exception e) {
|
||||
setRollbackOnly();
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
commitTx();
|
||||
}
|
||||
return firstContact;
|
||||
}
|
||||
|
||||
/**
|
||||
* -load existing Customer
|
||||
* -create a new Contact and add to customer's contacts
|
||||
*
|
||||
* @param customerId
|
||||
* @return added Contact
|
||||
*/
|
||||
private Contact addContact(Integer customerId) throws Exception {
|
||||
assert customerId != null;
|
||||
|
||||
Contact contact = null;
|
||||
beginTx();
|
||||
try {
|
||||
final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|
||||
|
||||
contact = new Contact();
|
||||
contact.setName("contact name");
|
||||
contact.setTlf("wtf is tlf?");
|
||||
|
||||
contact.setCustomer(customer);
|
||||
customer.getContacts().add(contact);
|
||||
|
||||
//assuming contact is persisted via cascade from customer
|
||||
|
||||
if (TERMINATE_ALL_USERS)
|
||||
setRollbackOnly();
|
||||
|
||||
} catch (Exception e) {
|
||||
setRollbackOnly();
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
commitTx();
|
||||
}
|
||||
return contact;
|
||||
}
|
||||
|
||||
/**
|
||||
* remove existing 'contact' from customer's list of contacts
|
||||
*
|
||||
* @param contact contact to remove from customer's contacts
|
||||
* @param customerId
|
||||
* @throws IllegalStateException if customer does not own a contact
|
||||
*/
|
||||
private void removeContact(Integer customerId) throws Exception {
|
||||
assert customerId != null;
|
||||
|
||||
beginTx();
|
||||
try {
|
||||
Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
|
||||
Set<Contact> contacts = customer.getContacts();
|
||||
if (contacts.size() != 1) {
|
||||
throw new IllegalStateException("can't remove contact: customer id=" + customerId + " expected exactly 1 contact, " +
|
||||
"actual count=" + contacts.size());
|
||||
}
|
||||
|
||||
Contact contact = contacts.iterator().next();
|
||||
contacts.remove(contact);
|
||||
contact.setCustomer(null);
|
||||
|
||||
//explicitly delete Contact because hbm has no 'DELETE_ORPHAN' cascade?
|
||||
//getEnvironment().getSessionFactory().getCurrentSession().delete(contact); //appears to not be needed
|
||||
|
||||
//assuming contact is persisted via cascade from customer
|
||||
|
||||
if (TERMINATE_ALL_USERS)
|
||||
setRollbackOnly();
|
||||
|
||||
} catch (Exception e) {
|
||||
setRollbackOnly();
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
commitTx();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the customerIDs
|
||||
*/
|
||||
public Set<Integer> getCustomerIDs() {
|
||||
return customerIDs;
|
||||
}
|
||||
|
||||
private String statusOfRunnersToString(Set<UserRunner> runners) {
|
||||
assert runners != null;
|
||||
|
||||
StringBuilder sb = new StringBuilder("TEST CONFIG [userCount=" + USER_COUNT +
|
||||
", iterationsPerUser=" + ITERATION_COUNT +
|
||||
", thinkTimeMillis=" + THINK_TIME_MILLIS + "] " +
|
||||
" STATE of UserRunners: ");
|
||||
|
||||
for (UserRunner r : runners) {
|
||||
sb.append(r.toString() + System.getProperty("line.separator"));
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
class UserRunner implements Runnable {
|
||||
final private CountDownLatch completionLatch;
|
||||
final private Integer customerId;
|
||||
private int completedIterations = 0;
|
||||
private Throwable causeOfFailure;
|
||||
|
||||
public UserRunner(final Integer cId, CountDownLatch completionLatch) {
|
||||
assert cId != null;
|
||||
assert completionLatch != null;
|
||||
this.customerId = cId;
|
||||
this.completionLatch = completionLatch;
|
||||
}
|
||||
|
||||
private boolean contactExists() throws Exception {
|
||||
return getFirstContact(customerId) != null;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
|
||||
//name this thread for easier log tracing
|
||||
Thread.currentThread().setName("UserRunnerThread-" + getCustomerId());
|
||||
try {
|
||||
for (int i = 0; i < ITERATION_COUNT && !TERMINATE_ALL_USERS; i++) {
|
||||
|
||||
if (contactExists()) {
|
||||
throw new IllegalStateException("contact already exists before add, customerId=" + customerId);
|
||||
}
|
||||
|
||||
addContact(customerId);
|
||||
|
||||
thinkRandomTime();
|
||||
|
||||
if (!contactExists()) {
|
||||
throw new IllegalStateException("contact missing after successful add, customerId=" + customerId);
|
||||
}
|
||||
|
||||
thinkRandomTime();
|
||||
|
||||
//read everyone's contacts
|
||||
readEveryonesFirstContact();
|
||||
|
||||
thinkRandomTime();
|
||||
|
||||
removeContact(customerId);
|
||||
|
||||
if (contactExists()) {
|
||||
throw new IllegalStateException("contact still exists after successful remove call, customerId=" + customerId);
|
||||
}
|
||||
|
||||
thinkRandomTime();
|
||||
|
||||
++completedIterations;
|
||||
}
|
||||
|
||||
} catch (Throwable t) {
|
||||
|
||||
this.causeOfFailure = t;
|
||||
TERMINATE_ALL_USERS = true;
|
||||
|
||||
//rollback current transaction if any
|
||||
//really should not happen since above methods all follow begin-commit-rollback pattern
|
||||
// try {
|
||||
// if (DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).getTransaction() != null) {
|
||||
// DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
|
||||
// }
|
||||
// } catch (SystemException ex) {
|
||||
// throw new RuntimeException("failed to rollback tx", ex);
|
||||
// }
|
||||
}
|
||||
finally {
|
||||
this.completionLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSuccess() {
|
||||
return ITERATION_COUNT == getCompletedIterations();
|
||||
}
|
||||
|
||||
public int getCompletedIterations() {
|
||||
return completedIterations;
|
||||
}
|
||||
|
||||
public Throwable getCauseOfFailure() {
|
||||
return causeOfFailure;
|
||||
}
|
||||
|
||||
public Integer getCustomerId() {
|
||||
return customerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString() +
|
||||
"[customerId=" + getCustomerId() +
|
||||
" iterationsCompleted=" + getCompletedIterations() +
|
||||
" completedAll=" + isSuccess() +
|
||||
" causeOfFailure=" + (this.causeOfFailure != null ? ExceptionUtils.getStackTrace(causeOfFailure) : "") + "] ";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* sleep between 0 and THINK_TIME_MILLIS.
|
||||
* @throws RuntimeException if sleep is interruped or TERMINATE_ALL_USERS flag was set to true i
|
||||
n the meantime
|
||||
*/
|
||||
private void thinkRandomTime() {
|
||||
try {
|
||||
Thread.sleep(random.nextInt(THINK_TIME_MILLIS));
|
||||
} catch (InterruptedException ex) {
|
||||
throw new RuntimeException("sleep interrupted", ex);
|
||||
}
|
||||
|
||||
if (TERMINATE_ALL_USERS) {
|
||||
throw new RuntimeException("told to terminate (because a UserRunner had failed)");
|
||||
}
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return new FunctionalTestClassTestSuite(MVCCConcurrentWriteTest.class);
|
||||
}
|
||||
}
|
||||
|
|
@ -31,6 +31,7 @@ import org.hibernate.cache.jbc2.MultiplexedJBossCacheRegionFactory;
|
|||
import org.hibernate.cache.jbc2.builder.MultiplexingCacheInstanceManager;
|
||||
import org.hibernate.cfg.Configuration;
|
||||
import org.hibernate.classic.Session;
|
||||
import org.hibernate.stat.SecondLevelCacheStatistics;
|
||||
import org.hibernate.test.cache.jbc2.functional.CacheTestCaseBase;
|
||||
import org.hibernate.test.cache.jbc2.functional.Contact;
|
||||
import org.hibernate.test.cache.jbc2.functional.Customer;
|
||||
|
@ -90,7 +91,12 @@ extends CacheTestCaseBase
|
|||
assertNotNull("Red Hat contacts exist", rhContacts);
|
||||
assertEquals("Created expected number of Red Hat contacts", 10, rhContacts.size());
|
||||
|
||||
SecondLevelCacheStatistics contactSlcs = getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
|
||||
getPrefixedRegionName(Contact.class.getName()));
|
||||
assertEquals(contactSlcs.getElementCountInMemory(), 20);
|
||||
|
||||
assertEquals("Deleted all Red Hat contacts", 10, deleteContacts());
|
||||
assertEquals(0, contactSlcs.getElementCountInMemory());
|
||||
|
||||
List<Integer> jbContacts = getContactsByCustomer("JBoss");
|
||||
assertNotNull("JBoss contacts exist", jbContacts);
|
||||
|
@ -108,6 +114,7 @@ extends CacheTestCaseBase
|
|||
}
|
||||
|
||||
updateContacts("Kabir", "Updated");
|
||||
assertEquals(contactSlcs.getElementCountInMemory(), 0);
|
||||
for (Integer id : jbContacts)
|
||||
{
|
||||
Contact contact = getContact(id);
|
||||
|
@ -120,6 +127,21 @@ extends CacheTestCaseBase
|
|||
List<Integer> updated = getContactsByTLF("Updated");
|
||||
assertNotNull("Got updated contacts", updated);
|
||||
assertEquals("Updated contacts", 5, updated.size());
|
||||
|
||||
updateContactsWithOneManual("Kabir", "UpdatedAgain");
|
||||
assertEquals(contactSlcs.getElementCountInMemory(), 0);
|
||||
for (Integer id : jbContacts)
|
||||
{
|
||||
Contact contact = getContact(id);
|
||||
assertNotNull("JBoss contact " + id + " exists", contact);
|
||||
String expected = ("Kabir".equals(contact.getName())) ? "UpdatedAgain" : "2222";
|
||||
assertEquals("JBoss contact " + id + " has correct TLF",
|
||||
expected, contact.getTlf());
|
||||
}
|
||||
|
||||
updated = getContactsByTLF("UpdatedAgain");
|
||||
assertNotNull("Got updated contacts", updated);
|
||||
assertEquals("Updated contacts", 5, updated.size());
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@ -231,6 +253,34 @@ extends CacheTestCaseBase
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public int updateContactsWithOneManual(String name, String newTLF) throws Exception
|
||||
{
|
||||
String queryHQL = "from Contact c where c.name = :cName";
|
||||
String updateHQL = "update Contact set tlf = :cNewTLF where name = :cName";
|
||||
|
||||
SimpleJtaTransactionManagerImpl.getInstance().begin();
|
||||
try {
|
||||
|
||||
Session session = getSessions().getCurrentSession();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
List<Contact> list = session.createQuery(queryHQL).setParameter("cName", name).list();
|
||||
list.get(0).setTlf(newTLF);
|
||||
|
||||
int rowsAffected = session.createQuery(updateHQL)
|
||||
.setFlushMode(FlushMode.AUTO)
|
||||
.setParameter("cNewTLF", newTLF)
|
||||
.setParameter("cName", name)
|
||||
.executeUpdate();
|
||||
SimpleJtaTransactionManagerImpl.getInstance().commit();
|
||||
return rowsAffected;
|
||||
}
|
||||
catch (Exception e) {
|
||||
SimpleJtaTransactionManagerImpl.getInstance().rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public Contact getContact(Integer id) throws Exception
|
||||
{
|
||||
|
|
|
@ -113,13 +113,15 @@
|
|||
Number of milliseconds to wait until all responses for a
|
||||
synchronous call have been received.
|
||||
-->
|
||||
<attribute name="SyncReplTimeout">20000</attribute>
|
||||
<attribute name="SyncReplTimeout">10000</attribute>
|
||||
|
||||
<!-- Max number of milliseconds to wait for a lock acquisition -->
|
||||
<attribute name="LockAcquisitionTimeout">15000</attribute>
|
||||
<attribute name="LockAcquisitionTimeout">5000</attribute>
|
||||
|
||||
<!-- For now. disable asynchronous RPC marshalling/sending -->
|
||||
<attribute name="SerializationExecutorPoolSize">0</attribute>
|
||||
|
||||
<attribute name="UseLockStriping">false</attribute>
|
||||
|
||||
<!-- Specific eviction policy configurations. This is LRU -->
|
||||
<attribute name="EvictionPolicyConfig">
|
||||
|
|
|
@ -125,10 +125,10 @@
|
|||
Number of milliseconds to wait until all responses for a
|
||||
synchronous call have been received.
|
||||
-->
|
||||
<attribute name="SyncReplTimeout">20000</attribute>
|
||||
<attribute name="SyncReplTimeout">10000</attribute>
|
||||
|
||||
<!-- Max number of milliseconds to wait for a lock acquisition -->
|
||||
<attribute name="LockAcquisitionTimeout">15000</attribute>
|
||||
<attribute name="LockAcquisitionTimeout">5000</attribute>
|
||||
|
||||
<!--
|
||||
Indicate whether to use marshalling or not. Set this to true if you are running under a scoped
|
||||
|
|
|
@ -114,10 +114,10 @@
|
|||
Number of milliseconds to wait until all responses for a
|
||||
synchronous call have been received.
|
||||
-->
|
||||
<attribute name="SyncReplTimeout">20000</attribute>
|
||||
<attribute name="SyncReplTimeout">10000</attribute>
|
||||
|
||||
<!-- Max number of milliseconds to wait for a lock acquisition -->
|
||||
<attribute name="LockAcquisitionTimeout">15000</attribute>
|
||||
<attribute name="LockAcquisitionTimeout">5000</attribute>
|
||||
|
||||
<!-- For now. disable asynchronous RPC marshalling/sending -->
|
||||
<attribute name="SerializationExecutorPoolSize">0</attribute>
|
||||
|
|
Loading…
Reference in New Issue