Put prototype of a next-gen TimestampsRegion impl under src control

git-svn-id: https://svn.jboss.org/repos/hibernate/core/trunk@14254 1b8cb986-b30d-0410-93ca-fae66ebed9b2
This commit is contained in:
Brian Stansberry 2007-12-21 19:49:29 +00:00
parent 773e1e46f1
commit 5cf738dabb
2 changed files with 644 additions and 0 deletions

View File

@ -0,0 +1,363 @@
/*
* Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved.
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, v. 2.1. This program is distributed in the
* hope that it will be useful, but WITHOUT A WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details. You should have received a
* copy of the GNU Lesser General Public License, v.2.1 along with this
* distribution; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* Red Hat Author(s): Brian Stansberry
*/
package org.hibernate.cache.jbc2.timestamp;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import javax.transaction.Transaction;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.TimestampsRegion;
import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter;
import org.hibernate.cache.jbc2.util.CacheHelper;
import org.jboss.cache.Cache;
import org.jboss.cache.Fqn;
import org.jboss.cache.config.Option;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.NodeModified;
import org.jboss.cache.notifications.annotation.NodeRemoved;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.notifications.event.NodeRemovedEvent;
/**
* Prototype of a clustered timestamps cache region impl usable if the
* TimestampsRegion API is changed.
* <p>
* Maintains a local (authoritative) cache of timestamps along with the
* distributed cache held in JBoss Cache. Listens for changes in the distributed
* cache and updates the local cache accordingly. Ensures that any changes in
* the local cache represent either 1) an increase in the timestamp or
* 2) a stepback in the timestamp by the caller that initially increased
* it as part of a pre-invalidate call. This approach allows
* timestamp changes to be replicated asynchronously by JBoss Cache while still
* preventing invalid backward changes in timestamps.
* </p>
*
* NOTE: This is just a prototype!!! Only useful if we change the
* TimestampsRegion API.
*
* @author Brian Stansberry
* @version $Revision: 14106 $
*/
@CacheListener
public class ClusteredConcurrentTimestampsRegionImpl extends TransactionalDataRegionAdapter implements TimestampsRegion {
public static final String TYPE = "TS";
private final ConcurrentHashMap localCache = new ConcurrentHashMap();
/**
* Create a new ClusteredConccurentTimestampsRegionImpl.
*
* @param jbcCache
* @param regionName
* @param regionPrefix
* TODO
* @param metadata
*/
public ClusteredConcurrentTimestampsRegionImpl(Cache jbcCache, String regionName, String regionPrefix, Properties properties) {
super(jbcCache, regionName, regionPrefix, null);
jbcCache.addCacheListener(this);
populateLocalCache();
}
@Override
protected Fqn<String> createRegionFqn(String regionName, String regionPrefix) {
return getTypeFirstRegionFqn(regionName, regionPrefix, TYPE);
}
public void evict(Object key) throws CacheException {
Option opt = getNonLockingDataVersionOption(true);
CacheHelper.removeNode(getCacheInstance(), getRegionFqn(), key, opt);
}
public void evictAll() throws CacheException {
Option opt = getNonLockingDataVersionOption(true);
CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt);
// Restore the region root node
CacheHelper.addNode(getCacheInstance(), getRegionFqn(), false, true, null);
}
public Object get(Object key) throws CacheException {
Entry entry = getLocalEntry(key);
Object timestamp = entry.getCurrent();
if (timestamp == null) {
// Double check the distributed cache
Object[] vals = (Object[]) suspendAndGet(key, null, false);
if (vals != null) {
storeDataFromJBC(key, vals);
timestamp = entry.getCurrent();
}
}
return timestamp;
}
public void put(Object key, Object value) throws CacheException {
throw new UnsupportedOperationException("Prototype only; Hibernate core must change the API before really using");
}
public void preInvalidate(Object key, Object value) throws CacheException {
Entry entry = getLocalEntry(key);
if (entry.preInvalidate(value)) {
putInJBossCache(key, entry);
}
}
public void invalidate(Object key, Object value, Object preInvalidateValue) throws CacheException {
Entry entry = getLocalEntry(key);
if (entry.invalidate(value, preInvalidateValue)) {
putInJBossCache(key, entry);
}
}
private void putInJBossCache(Object key, Entry entry) {
// Get an exclusive right to update JBC for this key from this node.
boolean locked = false;
try {
entry.acquireJBCWriteMutex();
locked = true;
// We have the JBCWriteMutex, so no other *local* thread will
// be trying to write this key.
// It's possible here some remote thread has come in and
// changed the values again, but since we are reading the
// values to write to JBC right now, we know we are writing
// the latest values; i.e. we don't assume that what we cached
// in entry.update() above is what we should write to JBC *now*.
// Our write could be redundant, i.e. we are writing what
// some remote thread just came in an wrote. There is a chance
// that yet another remote thread will update us, and we'll then
// overwrite that later data in JBC. But, all remote nodes will
// ignore that change in their localCache; the only place it
// will live will be in JBC, where it can only effect the
// initial state transfer values on newly joined nodes
// (i.e. populateLocalCache()).
// Don't hold the JBC node lock throughout the tx, as that
// prevents reads and other updates
Transaction tx = suspend();
try {
Option opt = getNonLockingDataVersionOption(false);
// We ensure ASYNC semantics (JBCACHE-1175)
opt.setForceAsynchronous(true);
CacheHelper.put(getCacheInstance(), getRegionFqn(), key, entry.getJBCUpdateValues(), opt);
}
finally {
resume(tx);
}
}
catch (InterruptedException e) {
throw new CacheException("Interrupted while acquiring right to update " + key, e);
}
finally {
if (locked) {
entry.releaseJBCWriteMutex();
}
}
}
@Override
public void destroy() throws CacheException {
getCacheInstance().removeCacheListener(this);
super.destroy();
localCache.clear();
}
/**
* Monitors cache events and updates the local cache
*
* @param event
*/
@NodeModified
public void nodeModified(NodeModifiedEvent event) {
if (event.isOriginLocal() || event.isPre())
return;
Fqn fqn = event.getFqn();
Fqn regFqn = getRegionFqn();
if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) {
Object key = fqn.get(regFqn.size());
Object[] vals = (Object[]) event.getData().get(ITEM);
storeDataFromJBC(key, vals);
// TODO consider this hack instead of the simple entry.update above:
// if (!entry.update(vals[0], vals[1])) {
// // Hack! Use the fact that the Object[] stored in JBC is
// // mutable to correct our local JBC state in this callback
// Object[] correct = entry.getJBCUpdateValues();
// vals[0] = correct[0];
// vals[1] = correct[1];
// }
}
}
private void storeDataFromJBC(Object key, Object[] vals) {
Entry entry = getLocalEntry(key);
if (vals[0].equals(vals[1])) {
entry.preInvalidate(vals[0]);
}
else {
entry.invalidate(vals[0], vals[1]);
}
}
/**
* Monitors cache events and updates the local cache
*
* @param event
*/
@NodeRemoved
public void nodeRemoved(NodeRemovedEvent event) {
if (event.isOriginLocal() || event.isPre())
return;
Fqn fqn = event.getFqn();
Fqn regFqn = getRegionFqn();
if (fqn.isChildOrEquals(regFqn)) {
if (fqn.size() == regFqn.size()) {
localCache.clear();
}
else {
Object key = fqn.get(regFqn.size());
localCache.remove(key);
}
}
}
/**
* Brings all data from the distributed cache into our local cache.
*/
private void populateLocalCache() {
Set children = CacheHelper.getChildrenNames(getCacheInstance(), getRegionFqn());
for (Object key : children) {
Object[] vals = (Object[]) suspendAndGet(key, null, false);
if (vals != null) {
storeDataFromJBC(key, vals);
}
}
}
private Entry getLocalEntry(Object key) {
Entry entry = new Entry();
Entry oldEntry = (Entry) localCache.putIfAbsent(key, entry);
return (oldEntry == null ? entry : oldEntry);
}
private class Entry {
private Semaphore writeMutex = new Semaphore(1);
private boolean preInvalidated = false;
private Object preInval = null;
private Object current = null;
void acquireJBCWriteMutex() throws InterruptedException {
writeMutex.acquire();
}
void releaseJBCWriteMutex() {
writeMutex.release();
}
synchronized boolean preInvalidate(Object newVal) {
boolean result = false;
if (newVal instanceof Comparable) {
if (current == null || ((Comparable) newVal).compareTo(current) > 0) {
preInval = current = newVal;
preInvalidated = true;
result = true;
}
}
else {
preInval = current = newVal;
result = true;
}
return result;
}
synchronized boolean invalidate(Object newVal, Object preInvalidateValue) {
boolean result = false;
if (current == null) {
// Initial load from JBC
current = newVal;
preInval = preInvalidateValue;
preInvalidated = false;
result = true;
}
else if (preInvalidated) {
if (newVal instanceof Comparable) {
if (safeEquals(preInvalidateValue, this.preInval)
|| ((Comparable) newVal).compareTo(preInval) > 0) {
current = newVal;
preInval = preInvalidateValue;
preInvalidated = false;
result = true;
}
}
else {
current = newVal;
preInval = preInvalidateValue;
result = true;
}
}
else if (newVal instanceof Comparable) {
// See if we had a 2nd invalidation from the same initial
// preinvalidation timestamp. If so, only increment
// if the new current value is an increase
if (safeEquals(preInvalidateValue, this.preInval)
&& ((Comparable) newVal).compareTo(current) > 0) {
current = newVal;
preInval = preInvalidateValue;
result = true;
}
}
return result;
}
synchronized Object getCurrent() {
return current;
}
synchronized Object getPreInval() {
return preInval;
}
synchronized Object[] getJBCUpdateValues() {
return new Object[] {current, preInval};
}
private boolean safeEquals(Object a, Object b) {
return (a == b || (a != null && a.equals(b)));
}
}
}

View File

@ -0,0 +1,281 @@
/*
* Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved.
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, v. 2.1. This program is distributed in the
* hope that it will be useful, but WITHOUT A WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details. You should have received a
* copy of the GNU Lesser General Public License, v.2.1 along with this
* distribution; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
* Red Hat Author(s): Brian Stansberry
*/
package org.hibernate.test.cache.jbc2.timestamp;
import java.util.Properties;
import java.util.Random;
import junit.framework.AssertionFailedError;
import org.hibernate.cache.UpdateTimestampsCache;
import org.hibernate.cache.jbc2.CacheInstanceManager;
import org.hibernate.cache.jbc2.JBossCacheRegionFactory;
import org.hibernate.cache.jbc2.MultiplexedJBossCacheRegionFactory;
import org.hibernate.cache.jbc2.timestamp.ClusteredConcurrentTimestampsRegionImpl;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.jbc2.AbstractJBossCacheTestCase;
import org.hibernate.test.util.CacheTestUtil;
import org.jboss.cache.Cache;
/**
* A ClusteredConcurrentTimestampCacheTestCase.
*
* @author <a href="brian.stansberry@jboss.com">Brian Stansberry</a>
* @version $Revision: 1 $
*/
public class ClusteredConcurrentTimestampRegionTestCase extends AbstractJBossCacheTestCase {
private static final String KEY1 = "com.foo.test.Entity1";
private static final String KEY2 = "com.foo.test.Entity2";
private static final Long ONE = new Long(1);
private static final Long TWO = new Long(2);
private static final Long THREE = new Long(3);
private static final Long TEN = new Long(10);
private static final Long ELEVEN = new Long(11);
private static Cache cache;
private static Properties properties;
private ClusteredConcurrentTimestampsRegionImpl region;
/**
* Create a new ClusteredConcurrentTimestampCacheTestCase.
*
* @param name
*/
public ClusteredConcurrentTimestampRegionTestCase(String name) {
super(name);
}
@Override
protected void setUp() throws Exception {
super.setUp();
if (cache == null) {
Configuration cfg = CacheTestUtil.buildConfiguration("test", MultiplexedJBossCacheRegionFactory.class, false, true);
properties = cfg.getProperties();
cache = createCache();
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
}
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
if (region != null) {
region.destroy();
}
}
private Cache createCache() throws Exception {
Configuration cfg = CacheTestUtil.buildConfiguration("test", MultiplexedJBossCacheRegionFactory.class, false, true);
JBossCacheRegionFactory regionFactory = CacheTestUtil.startRegionFactory(cfg);
CacheInstanceManager mgr = regionFactory.getCacheInstanceManager();
return mgr.getTimestampsCacheInstance();
}
protected ClusteredConcurrentTimestampsRegionImpl getTimestampRegion(Cache cache) throws Exception {
return new ClusteredConcurrentTimestampsRegionImpl(cache, "test/" + UpdateTimestampsCache.class.getName(), "test", properties);
}
public void testSimplePreinvalidate() throws Exception {
region = getTimestampRegion(cache);
assertEquals(null, region.get(KEY1));
region.preInvalidate(KEY1, TWO);
assertEquals(TWO, region.get(KEY1));
region.preInvalidate(KEY1, ONE);
assertEquals(TWO, region.get(KEY1));
region.preInvalidate(KEY1, TWO);
assertEquals(TWO, region.get(KEY1));
region.preInvalidate(KEY1, THREE);
assertEquals(THREE, region.get(KEY1));
}
public void testInitialState() throws Exception {
region = getTimestampRegion(cache);
region.preInvalidate(KEY1, TEN);
region.preInvalidate(KEY2, ELEVEN);
region.invalidate(KEY1, ONE, TEN);
Cache cache2 = createCache();
registerCache(cache2);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
ClusteredConcurrentTimestampsRegionImpl region2 = getTimestampRegion(cache2);
assertEquals(ONE, region2.get(KEY1));
assertEquals(ELEVEN, region2.get(KEY2));
}
public void testSimpleInvalidate() throws Exception {
region = getTimestampRegion(cache);
assertEquals(null, region.get(KEY1));
region.preInvalidate(KEY1, TWO);
assertEquals(TWO, region.get(KEY1));
region.invalidate(KEY1, ONE, TWO);
assertEquals(ONE, region.get(KEY1));
region.preInvalidate(KEY1, TEN);
region.preInvalidate(KEY1, ELEVEN);
assertEquals(ELEVEN, region.get(KEY1));
region.invalidate(KEY1, TWO, TEN);
assertEquals(ELEVEN, region.get(KEY1));
region.invalidate(KEY1, TWO, ELEVEN);
assertEquals(TWO, region.get(KEY1));
region.preInvalidate(KEY1, TEN);
assertEquals(TEN, region.get(KEY1));
region.invalidate(KEY1, THREE, TEN);
assertEquals(THREE, region.get(KEY1));
}
public void testConcurrentActivityClustered() throws Exception {
concurrentActivityTest(true);
}
public void testConcurrentActivityNonClustered() throws Exception {
concurrentActivityTest(false);
}
private void concurrentActivityTest(boolean clustered) throws Exception {
region = getTimestampRegion(cache);
ClusteredConcurrentTimestampsRegionImpl region2 = region;
if (clustered) {
Cache cache2 = createCache();
registerCache(cache2);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
region2 = getTimestampRegion(cache2);
}
Tester[] testers = new Tester[20];
for (int i = 0; i < testers.length; i++) {
testers[i] = new Tester((i % 2 == 0) ? region : region2);
testers[i].start();
}
for (int j = 0; j < 10; j++) {
sleep(2000);
log.info("Running for " + ((j + 1) * 2) + " seconds");
for (int i = 0; i < testers.length; i++) {
if (testers[i].assertionFailure != null)
throw testers[i].assertionFailure;
}
for (int i = 0; i < testers.length; i++) {
if (testers[i].exception != null)
throw testers[i].exception;
}
}
for (int i = 0; i < testers.length; i++) {
testers[i].stop();
}
for (int i = 0; i < testers.length; i++) {
if (testers[i].assertionFailure != null)
throw testers[i].assertionFailure;
}
for (int i = 0; i < testers.length; i++) {
if (testers[i].exception != null)
throw testers[i].exception;
}
}
private class Tester implements Runnable {
ClusteredConcurrentTimestampsRegionImpl region;
Exception exception;
AssertionFailedError assertionFailure;
boolean stopped = true;
Thread thread;
Random random = new Random();
Tester(ClusteredConcurrentTimestampsRegionImpl region) {
this.region = region;
}
public void run() {
stopped = false;
while (!stopped) {
try {
Long pre = new Long(region.nextTimestamp() + region.getTimeout());
region.preInvalidate(KEY1, pre);
sleep(random.nextInt(1));
Long post = new Long(region.nextTimestamp());
region.invalidate(KEY1, post, pre);
Long ts = (Long) region.get(KEY1);
assertTrue(ts + " >= " + post, ts.longValue() >= post.longValue());
sleep(random.nextInt(1));
}
catch (AssertionFailedError e) {
assertionFailure = e;
}
catch (Exception e) {
if (!stopped)
exception = e;
}
finally {
stopped = true;
}
}
}
void start() {
if (stopped) {
if (thread == null) {
thread = new Thread(this);
thread.setDaemon(true);
}
thread.start();
}
}
void stop() {
if (!stopped) {
stopped = true;
try {
thread.join(100);
}
catch (InterruptedException ignored) {}
if (thread.isAlive())
thread.interrupt();
}
}
}
}