diff --git a/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java new file mode 100644 index 0000000000..a7e613c175 --- /dev/null +++ b/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java @@ -0,0 +1,363 @@ +/* + * Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved. + * + * This copyrighted material is made available to anyone wishing to use, modify, + * copy, or redistribute it subject to the terms and conditions of the GNU + * Lesser General Public License, v. 2.1. This program is distributed in the + * hope that it will be useful, but WITHOUT A WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. You should have received a + * copy of the GNU Lesser General Public License, v.2.1 along with this + * distribution; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * Red Hat Author(s): Brian Stansberry + */ + +package org.hibernate.cache.jbc2.timestamp; + +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +import javax.transaction.Transaction; + +import org.hibernate.cache.CacheException; +import org.hibernate.cache.TimestampsRegion; +import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter; +import org.hibernate.cache.jbc2.util.CacheHelper; +import org.jboss.cache.Cache; +import org.jboss.cache.Fqn; +import org.jboss.cache.config.Option; +import org.jboss.cache.notifications.annotation.CacheListener; +import org.jboss.cache.notifications.annotation.NodeModified; +import org.jboss.cache.notifications.annotation.NodeRemoved; +import org.jboss.cache.notifications.event.NodeModifiedEvent; +import org.jboss.cache.notifications.event.NodeRemovedEvent; + +/** + * Prototype of a clustered timestamps cache region impl usable if the + * TimestampsRegion API is changed. + *

+ * Maintains a local (authoritative) cache of timestamps along with the + * distributed cache held in JBoss Cache. Listens for changes in the distributed + * cache and updates the local cache accordingly. Ensures that any changes in + * the local cache represent either 1) an increase in the timestamp or + * 2) a stepback in the timestamp by the caller that initially increased + * it as part of a pre-invalidate call. This approach allows + * timestamp changes to be replicated asynchronously by JBoss Cache while still + * preventing invalid backward changes in timestamps. + *

+ * + * NOTE: This is just a prototype!!! Only useful if we change the + * TimestampsRegion API. + * + * @author Brian Stansberry + * @version $Revision: 14106 $ + */ +@CacheListener +public class ClusteredConcurrentTimestampsRegionImpl extends TransactionalDataRegionAdapter implements TimestampsRegion { + + public static final String TYPE = "TS"; + + private final ConcurrentHashMap localCache = new ConcurrentHashMap(); + + /** + * Create a new ClusteredConccurentTimestampsRegionImpl. + * + * @param jbcCache + * @param regionName + * @param regionPrefix + * TODO + * @param metadata + */ + public ClusteredConcurrentTimestampsRegionImpl(Cache jbcCache, String regionName, String regionPrefix, Properties properties) { + super(jbcCache, regionName, regionPrefix, null); + + jbcCache.addCacheListener(this); + + populateLocalCache(); + } + + @Override + protected Fqn createRegionFqn(String regionName, String regionPrefix) { + return getTypeFirstRegionFqn(regionName, regionPrefix, TYPE); + } + + public void evict(Object key) throws CacheException { + Option opt = getNonLockingDataVersionOption(true); + CacheHelper.removeNode(getCacheInstance(), getRegionFqn(), key, opt); + } + + public void evictAll() throws CacheException { + Option opt = getNonLockingDataVersionOption(true); + CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt); + // Restore the region root node + CacheHelper.addNode(getCacheInstance(), getRegionFqn(), false, true, null); + } + + public Object get(Object key) throws CacheException { + Entry entry = getLocalEntry(key); + Object timestamp = entry.getCurrent(); + if (timestamp == null) { + // Double check the distributed cache + Object[] vals = (Object[]) suspendAndGet(key, null, false); + if (vals != null) { + storeDataFromJBC(key, vals); + timestamp = entry.getCurrent(); + } + } + return timestamp; + } + + public void put(Object key, Object value) throws CacheException { + + throw new UnsupportedOperationException("Prototype only; Hibernate core must change the API before really using"); + } + + public void preInvalidate(Object key, Object value) throws CacheException { + + Entry entry = getLocalEntry(key); + if (entry.preInvalidate(value)) { + putInJBossCache(key, entry); + } + } + + public void invalidate(Object key, Object value, Object preInvalidateValue) throws CacheException { + + Entry entry = getLocalEntry(key); + if (entry.invalidate(value, preInvalidateValue)) { + putInJBossCache(key, entry); + } + } + + private void putInJBossCache(Object key, Entry entry) { + + // Get an exclusive right to update JBC for this key from this node. + boolean locked = false; + try { + entry.acquireJBCWriteMutex(); + locked = true; + // We have the JBCWriteMutex, so no other *local* thread will + // be trying to write this key. + // It's possible here some remote thread has come in and + // changed the values again, but since we are reading the + // values to write to JBC right now, we know we are writing + // the latest values; i.e. we don't assume that what we cached + // in entry.update() above is what we should write to JBC *now*. + // Our write could be redundant, i.e. we are writing what + // some remote thread just came in an wrote. There is a chance + // that yet another remote thread will update us, and we'll then + // overwrite that later data in JBC. But, all remote nodes will + // ignore that change in their localCache; the only place it + // will live will be in JBC, where it can only effect the + // initial state transfer values on newly joined nodes + // (i.e. populateLocalCache()). + + // Don't hold the JBC node lock throughout the tx, as that + // prevents reads and other updates + Transaction tx = suspend(); + try { + Option opt = getNonLockingDataVersionOption(false); + // We ensure ASYNC semantics (JBCACHE-1175) + opt.setForceAsynchronous(true); + CacheHelper.put(getCacheInstance(), getRegionFqn(), key, entry.getJBCUpdateValues(), opt); + } + finally { + resume(tx); + } + } + catch (InterruptedException e) { + throw new CacheException("Interrupted while acquiring right to update " + key, e); + } + finally { + if (locked) { + entry.releaseJBCWriteMutex(); + } + } + } + + @Override + public void destroy() throws CacheException { + + getCacheInstance().removeCacheListener(this); + super.destroy(); + localCache.clear(); + } + + /** + * Monitors cache events and updates the local cache + * + * @param event + */ + @NodeModified + public void nodeModified(NodeModifiedEvent event) { + if (event.isOriginLocal() || event.isPre()) + return; + + Fqn fqn = event.getFqn(); + Fqn regFqn = getRegionFqn(); + if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) { + Object key = fqn.get(regFqn.size()); + Object[] vals = (Object[]) event.getData().get(ITEM); + storeDataFromJBC(key, vals); + // TODO consider this hack instead of the simple entry.update above: +// if (!entry.update(vals[0], vals[1])) { +// // Hack! Use the fact that the Object[] stored in JBC is +// // mutable to correct our local JBC state in this callback +// Object[] correct = entry.getJBCUpdateValues(); +// vals[0] = correct[0]; +// vals[1] = correct[1]; +// } + } + } + + private void storeDataFromJBC(Object key, Object[] vals) { + Entry entry = getLocalEntry(key); + if (vals[0].equals(vals[1])) { + entry.preInvalidate(vals[0]); + } + else { + entry.invalidate(vals[0], vals[1]); + } + } + + /** + * Monitors cache events and updates the local cache + * + * @param event + */ + @NodeRemoved + public void nodeRemoved(NodeRemovedEvent event) { + if (event.isOriginLocal() || event.isPre()) + return; + + Fqn fqn = event.getFqn(); + Fqn regFqn = getRegionFqn(); + if (fqn.isChildOrEquals(regFqn)) { + if (fqn.size() == regFqn.size()) { + localCache.clear(); + } + else { + Object key = fqn.get(regFqn.size()); + localCache.remove(key); + } + } + } + + /** + * Brings all data from the distributed cache into our local cache. + */ + private void populateLocalCache() { + Set children = CacheHelper.getChildrenNames(getCacheInstance(), getRegionFqn()); + for (Object key : children) { + Object[] vals = (Object[]) suspendAndGet(key, null, false); + if (vals != null) { + storeDataFromJBC(key, vals); + } + } + } + + private Entry getLocalEntry(Object key) { + + Entry entry = new Entry(); + Entry oldEntry = (Entry) localCache.putIfAbsent(key, entry); + return (oldEntry == null ? entry : oldEntry); + } + + private class Entry { + + private Semaphore writeMutex = new Semaphore(1); + private boolean preInvalidated = false; + private Object preInval = null; + private Object current = null; + + void acquireJBCWriteMutex() throws InterruptedException { + writeMutex.acquire(); + } + + void releaseJBCWriteMutex() { + writeMutex.release(); + } + + synchronized boolean preInvalidate(Object newVal) { + + boolean result = false; + if (newVal instanceof Comparable) { + if (current == null || ((Comparable) newVal).compareTo(current) > 0) { + preInval = current = newVal; + preInvalidated = true; + result = true; + } + } + else { + preInval = current = newVal; + result = true; + } + + return result; + } + + synchronized boolean invalidate(Object newVal, Object preInvalidateValue) { + + boolean result = false; + + if (current == null) { + // Initial load from JBC + current = newVal; + preInval = preInvalidateValue; + preInvalidated = false; + result = true; + } + else if (preInvalidated) { + if (newVal instanceof Comparable) { + if (safeEquals(preInvalidateValue, this.preInval) + || ((Comparable) newVal).compareTo(preInval) > 0) { + current = newVal; + preInval = preInvalidateValue; + preInvalidated = false; + result = true; + } + } + else { + current = newVal; + preInval = preInvalidateValue; + result = true; + } + } + else if (newVal instanceof Comparable) { + // See if we had a 2nd invalidation from the same initial + // preinvalidation timestamp. If so, only increment + // if the new current value is an increase + if (safeEquals(preInvalidateValue, this.preInval) + && ((Comparable) newVal).compareTo(current) > 0) { + current = newVal; + preInval = preInvalidateValue; + result = true; + } + } + + return result; + } + + synchronized Object getCurrent() { + return current; + } + + synchronized Object getPreInval() { + return preInval; + } + + synchronized Object[] getJBCUpdateValues() { + return new Object[] {current, preInval}; + } + + private boolean safeEquals(Object a, Object b) { + return (a == b || (a != null && a.equals(b))); + } + } + + + +} diff --git a/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java b/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java new file mode 100644 index 0000000000..78bdb704e1 --- /dev/null +++ b/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved. + * + * This copyrighted material is made available to anyone wishing to use, modify, + * copy, or redistribute it subject to the terms and conditions of the GNU + * Lesser General Public License, v. 2.1. This program is distributed in the + * hope that it will be useful, but WITHOUT A WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. You should have received a + * copy of the GNU Lesser General Public License, v.2.1 along with this + * distribution; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * + * Red Hat Author(s): Brian Stansberry + */ + +package org.hibernate.test.cache.jbc2.timestamp; + +import java.util.Properties; +import java.util.Random; + +import junit.framework.AssertionFailedError; + +import org.hibernate.cache.UpdateTimestampsCache; +import org.hibernate.cache.jbc2.CacheInstanceManager; +import org.hibernate.cache.jbc2.JBossCacheRegionFactory; +import org.hibernate.cache.jbc2.MultiplexedJBossCacheRegionFactory; +import org.hibernate.cache.jbc2.timestamp.ClusteredConcurrentTimestampsRegionImpl; +import org.hibernate.cfg.Configuration; +import org.hibernate.test.cache.jbc2.AbstractJBossCacheTestCase; +import org.hibernate.test.util.CacheTestUtil; +import org.jboss.cache.Cache; + +/** + * A ClusteredConcurrentTimestampCacheTestCase. + * + * @author Brian Stansberry + * @version $Revision: 1 $ + */ +public class ClusteredConcurrentTimestampRegionTestCase extends AbstractJBossCacheTestCase { + + private static final String KEY1 = "com.foo.test.Entity1"; + private static final String KEY2 = "com.foo.test.Entity2"; + + private static final Long ONE = new Long(1); + private static final Long TWO = new Long(2); + private static final Long THREE = new Long(3); + private static final Long TEN = new Long(10); + private static final Long ELEVEN = new Long(11); + + private static Cache cache; + private static Properties properties; + private ClusteredConcurrentTimestampsRegionImpl region; + + /** + * Create a new ClusteredConcurrentTimestampCacheTestCase. + * + * @param name + */ + public ClusteredConcurrentTimestampRegionTestCase(String name) { + super(name); + } + + + + @Override + protected void setUp() throws Exception { + super.setUp(); + + if (cache == null) { + Configuration cfg = CacheTestUtil.buildConfiguration("test", MultiplexedJBossCacheRegionFactory.class, false, true); + properties = cfg.getProperties(); + cache = createCache(); + + // Sleep a bit to avoid concurrent FLUSH problem + avoidConcurrentFlush(); + } + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + + if (region != null) { + region.destroy(); + } + } + + private Cache createCache() throws Exception { + Configuration cfg = CacheTestUtil.buildConfiguration("test", MultiplexedJBossCacheRegionFactory.class, false, true); + JBossCacheRegionFactory regionFactory = CacheTestUtil.startRegionFactory(cfg); + CacheInstanceManager mgr = regionFactory.getCacheInstanceManager(); + return mgr.getTimestampsCacheInstance(); + } + + protected ClusteredConcurrentTimestampsRegionImpl getTimestampRegion(Cache cache) throws Exception { + + return new ClusteredConcurrentTimestampsRegionImpl(cache, "test/" + UpdateTimestampsCache.class.getName(), "test", properties); + } + + public void testSimplePreinvalidate() throws Exception { + + region = getTimestampRegion(cache); + + assertEquals(null, region.get(KEY1)); + region.preInvalidate(KEY1, TWO); + assertEquals(TWO, region.get(KEY1)); + region.preInvalidate(KEY1, ONE); + assertEquals(TWO, region.get(KEY1)); + region.preInvalidate(KEY1, TWO); + assertEquals(TWO, region.get(KEY1)); + region.preInvalidate(KEY1, THREE); + assertEquals(THREE, region.get(KEY1)); + } + + public void testInitialState() throws Exception { + + region = getTimestampRegion(cache); + region.preInvalidate(KEY1, TEN); + region.preInvalidate(KEY2, ELEVEN); + region.invalidate(KEY1, ONE, TEN); + + Cache cache2 = createCache(); + registerCache(cache2); + + // Sleep a bit to avoid concurrent FLUSH problem + avoidConcurrentFlush(); + + ClusteredConcurrentTimestampsRegionImpl region2 = getTimestampRegion(cache2); + assertEquals(ONE, region2.get(KEY1)); + assertEquals(ELEVEN, region2.get(KEY2)); + } + + public void testSimpleInvalidate() throws Exception { + + region = getTimestampRegion(cache); + + assertEquals(null, region.get(KEY1)); + region.preInvalidate(KEY1, TWO); + assertEquals(TWO, region.get(KEY1)); + region.invalidate(KEY1, ONE, TWO); + assertEquals(ONE, region.get(KEY1)); + region.preInvalidate(KEY1, TEN); + region.preInvalidate(KEY1, ELEVEN); + assertEquals(ELEVEN, region.get(KEY1)); + region.invalidate(KEY1, TWO, TEN); + assertEquals(ELEVEN, region.get(KEY1)); + region.invalidate(KEY1, TWO, ELEVEN); + assertEquals(TWO, region.get(KEY1)); + region.preInvalidate(KEY1, TEN); + assertEquals(TEN, region.get(KEY1)); + region.invalidate(KEY1, THREE, TEN); + assertEquals(THREE, region.get(KEY1)); + } + + public void testConcurrentActivityClustered() throws Exception { + concurrentActivityTest(true); + } + + public void testConcurrentActivityNonClustered() throws Exception { + concurrentActivityTest(false); + } + + private void concurrentActivityTest(boolean clustered) throws Exception { + + region = getTimestampRegion(cache); + ClusteredConcurrentTimestampsRegionImpl region2 = region; + + if (clustered) { + Cache cache2 = createCache(); + registerCache(cache2); + + // Sleep a bit to avoid concurrent FLUSH problem + avoidConcurrentFlush(); + + region2 = getTimestampRegion(cache2); + } + + Tester[] testers = new Tester[20]; + for (int i = 0; i < testers.length; i++) { + testers[i] = new Tester((i % 2 == 0) ? region : region2); + testers[i].start(); + } + + for (int j = 0; j < 10; j++) { + sleep(2000); + + log.info("Running for " + ((j + 1) * 2) + " seconds"); + + for (int i = 0; i < testers.length; i++) { + if (testers[i].assertionFailure != null) + throw testers[i].assertionFailure; + } + + for (int i = 0; i < testers.length; i++) { + if (testers[i].exception != null) + throw testers[i].exception; + } + } + + for (int i = 0; i < testers.length; i++) { + testers[i].stop(); + } + + for (int i = 0; i < testers.length; i++) { + if (testers[i].assertionFailure != null) + throw testers[i].assertionFailure; + } + + for (int i = 0; i < testers.length; i++) { + if (testers[i].exception != null) + throw testers[i].exception; + } + } + + + + private class Tester implements Runnable { + + ClusteredConcurrentTimestampsRegionImpl region; + Exception exception; + AssertionFailedError assertionFailure; + boolean stopped = true; + Thread thread; + Random random = new Random(); + + Tester(ClusteredConcurrentTimestampsRegionImpl region) { + this.region = region; + } + + public void run() { + stopped = false; + + while (!stopped) { + try { + Long pre = new Long(region.nextTimestamp() + region.getTimeout()); + region.preInvalidate(KEY1, pre); + sleep(random.nextInt(1)); + Long post = new Long(region.nextTimestamp()); + region.invalidate(KEY1, post, pre); + Long ts = (Long) region.get(KEY1); + assertTrue(ts + " >= " + post, ts.longValue() >= post.longValue()); + sleep(random.nextInt(1)); + } + catch (AssertionFailedError e) { + assertionFailure = e; + } + catch (Exception e) { + if (!stopped) + exception = e; + } + finally { + stopped = true; + } + } + } + + void start() { + if (stopped) { + if (thread == null) { + thread = new Thread(this); + thread.setDaemon(true); + } + thread.start(); + } + } + + void stop() { + if (!stopped) { + stopped = true; + try { + thread.join(100); + } + catch (InterruptedException ignored) {} + + if (thread.isAlive()) + thread.interrupt(); + } + } + } +}