OPENJPA-70. Added new RemoteCommitEvent payload type and logic to fire RemoteCommitEvents when stale records are detected. This logic still gets fired via afterCommit(), even though these checks can happen either after a commit or a flush is issued. Additionally, these events are only fired against local listeners, so actually represent local analysis that is detecting remote events, rather than remote events themselves.

git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@552059 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Patrick Linskey 2007-06-29 23:08:41 +00:00
parent 7db9a7fae5
commit 5c4a8c7e73
4 changed files with 74 additions and 16 deletions

View File

@ -513,7 +513,7 @@ public class DataCacheStoreManager
for (Iterator iter = exceps.iterator(); iter.hasNext(); ) { for (Iterator iter = exceps.iterator(); iter.hasNext(); ) {
Exception e = (Exception) iter.next(); Exception e = (Exception) iter.next();
if (e instanceof OptimisticException) if (e instanceof OptimisticException)
evictOptimisticLockFailure((OptimisticException) e); notifyOptimisticLockFailure((OptimisticException) e);
} }
return exceps; return exceps;
} }
@ -552,22 +552,25 @@ public class DataCacheStoreManager
} }
/** /**
* Evict from the cache the OID (if available) that resulted in an * Fire local staleness detection events from the cache the OID (if
* optimistic lock exception iff the version information in the cache * available) that resulted in an optimistic lock exception iff the
* matches the version information in the state manager for the failed * version information in the cache matches the version information
* instance. This means that we will evict data from the cache for records * in the state manager for the failed instance. This means that we
* that should have successfully committed according to the data cache but * will evict data from the cache for records that should have
* successfully committed according to the data cache but
* did not. The only predictable reason that could cause this behavior * did not. The only predictable reason that could cause this behavior
* is a concurrent out-of-band modification to the database that was not * is a concurrent out-of-band modification to the database that was not
* communicated to the cache. This logic makes OpenJPA's data cache * communicated to the cache. This logic makes OpenJPA's data cache
* somewhat tolerant of such behavior, in that the cache will be cleaned * somewhat tolerant of such behavior, in that the cache will be cleaned
* up as failures occur. * up as failures occur.
*/ */
private void evictOptimisticLockFailure(OptimisticException e) { private void notifyOptimisticLockFailure(OptimisticException e) {
Object o = ((OptimisticException) e).getFailedObject(); Object o = e.getFailedObject();
OpenJPAStateManager sm = _ctx.getStateManager(o); OpenJPAStateManager sm = _ctx.getStateManager(o);
if (sm == null) if (sm == null)
return; return;
Object oid = sm.getId();
boolean remove;
// this logic could be more efficient -- we could aggregate // this logic could be more efficient -- we could aggregate
// all the cache->oid changes, and then use DataCache.removeAll() // all the cache->oid changes, and then use DataCache.removeAll()
@ -579,11 +582,10 @@ public class DataCacheStoreManager
cache.writeLock(); cache.writeLock();
try { try {
DataCachePCData data = cache.get(sm.getId()); DataCachePCData data = cache.get(oid);
if (data == null) if (data == null)
return; return;
boolean remove;
switch (compareVersion(sm, sm.getVersion(), data.getVersion())) { switch (compareVersion(sm, sm.getVersion(), data.getVersion())) {
case StoreManager.VERSION_LATER: case StoreManager.VERSION_LATER:
case StoreManager.VERSION_SAME: case StoreManager.VERSION_SAME:
@ -614,10 +616,17 @@ public class DataCacheStoreManager
break; break;
} }
if (remove) if (remove)
// remove directly instead of via the RemoteCommitListener
// since we have a write lock here already, so this is more
// efficient than read-locking and then write-locking later.
cache.remove(sm.getId()); cache.remove(sm.getId());
} finally { } finally {
cache.writeUnlock(); cache.writeUnlock();
} }
// fire off a remote commit stalenesss detection event.
_ctx.getConfiguration().getRemoteCommitEventManager()
.fireLocalStaleNotification(oid);
} }
public StoreQuery newQuery(String language) { public StoreQuery newQuery(String language) {

View File

@ -40,12 +40,12 @@ public class RemoteCommitEvent
implements Externalizable { implements Externalizable {
/** /**
* Names of added classes, upated and deleted Object IDs. * Names of added classes, updated and deleted Object IDs.
*/ */
public static final int PAYLOAD_OIDS = 0; public static final int PAYLOAD_OIDS = 0;
/** /**
* Names of added classes, added, upated and deleted Object IDs. * Names of added classes, added, updated and deleted Object IDs.
*/ */
public static final int PAYLOAD_OIDS_WITH_ADDS = 1; public static final int PAYLOAD_OIDS_WITH_ADDS = 1;
@ -54,6 +54,16 @@ public class RemoteCommitEvent
*/ */
public static final int PAYLOAD_EXTENTS = 2; public static final int PAYLOAD_EXTENTS = 2;
/**
* The local {@link BrokerFactory} detected that local data is out of date
* with the data store. Stale object IDs will be in t he updated set,
* although it is possible that records were actually deleted, rather than
* updated.
*
* @since 1.0.0
*/
public static final int PAYLOAD_LOCAL_STALE_DETECTION = 3;
private static final Localizer s_loc = Localizer.forPackage private static final Localizer s_loc = Localizer.forPackage
(RemoteCommitEvent.class); (RemoteCommitEvent.class);
@ -114,7 +124,9 @@ public class RemoteCommitEvent
/** /**
* When the event type is not PAYLOAD_EXTENTS, return the set of * When the event type is not PAYLOAD_EXTENTS, return the set of
* object IDs for updated objects. * object IDs for updated objects. When the event type is
* PAYLOAD_LOCAL_STALE_DETECTION, items in this list may actually have
* been deleted from the database.
*/ */
public Collection getUpdatedObjectIds() { public Collection getUpdatedObjectIds() {
if (_payload == PAYLOAD_EXTENTS) if (_payload == PAYLOAD_EXTENTS)

View File

@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Collections;
import org.apache.openjpa.conf.OpenJPAConfiguration; import org.apache.openjpa.conf.OpenJPAConfiguration;
import org.apache.openjpa.kernel.Broker; import org.apache.openjpa.kernel.Broker;
@ -116,6 +117,17 @@ public class RemoteCommitEventManager
listen.afterCommit(ev); listen.afterCommit(ev);
} }
/**
* Fire an event to local listeners only notifying them of a detected
* stale record.
*/
public void fireLocalStaleNotification(Object oid) {
RemoteCommitEvent ev = new RemoteCommitEvent(
RemoteCommitEvent.PAYLOAD_LOCAL_STALE_DETECTION,
null, null, Collections.singleton(oid), null);
fireEvent(ev);
}
////////////////////////////////////// //////////////////////////////////////
// TransactionListener implementation // TransactionListener implementation
////////////////////////////////////// //////////////////////////////////////

View File

@ -28,17 +28,36 @@ import javax.sql.DataSource;
import org.apache.openjpa.persistence.OpenJPAPersistence; import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.test.SingleEMFTestCase; import org.apache.openjpa.persistence.test.SingleEMFTestCase;
import org.apache.openjpa.event.RemoteCommitListener;
import org.apache.openjpa.event.RemoteCommitEvent;
public class TestDataCacheOptimisticLockRecovery public class TestDataCacheOptimisticLockRecovery
extends SingleEMFTestCase { extends SingleEMFTestCase {
private int pk; private int pk;
private int remoteCommitEventStaleCount = 0;
private Object staleOid;
public void setUp() { public void setUp() {
setUp("openjpa.DataCache", "true", setUp("openjpa.DataCache", "true",
"openjpa.RemoteCommitProvider", "sjvm", "openjpa.RemoteCommitProvider", "sjvm",
OptimisticLockInstance.class); OptimisticLockInstance.class);
emf.getConfiguration().getRemoteCommitEventManager().addListener(
new RemoteCommitListener() {
public void afterCommit(RemoteCommitEvent e) {
if (e.getPayloadType() ==
RemoteCommitEvent.PAYLOAD_LOCAL_STALE_DETECTION) {
remoteCommitEventStaleCount++;
staleOid = e.getUpdatedObjectIds().iterator().next();
}
}
public void close() {
}
}
);
EntityManager em = emf.createEntityManager(); EntityManager em = emf.createEntityManager();
em.getTransaction().begin(); em.getTransaction().begin();
OptimisticLockInstance oli = new OptimisticLockInstance("foo"); OptimisticLockInstance oli = new OptimisticLockInstance("foo");
@ -59,6 +78,9 @@ public class TestDataCacheOptimisticLockRecovery
em = emf.createEntityManager(); em = emf.createEntityManager();
em.getTransaction().begin(); em.getTransaction().begin();
OptimisticLockInstance oli = em.find(OptimisticLockInstance.class, pk); OptimisticLockInstance oli = em.find(OptimisticLockInstance.class, pk);
Object oid = OpenJPAPersistence.toOpenJPAObjectId(
OpenJPAPersistence.getMetaData(oli),
OpenJPAPersistence.cast(em).getObjectId(oli));
int firstOpLockValue = oli.getOpLock(); int firstOpLockValue = oli.getOpLock();
em.lock(oli, LockModeType.READ); em.lock(oli, LockModeType.READ);
@ -90,7 +112,11 @@ public class TestDataCacheOptimisticLockRecovery
em.getTransaction().rollback(); em.getTransaction().rollback();
} }
// 4. obtain the object in a new persistence context and // 4. check that the corresponding remote commit event was fired
assertEquals(1, remoteCommitEventStaleCount);
assertEquals(oid, staleOid);
// 5. obtain the object in a new persistence context and
// assert that the oplock column is set to the one that // assert that the oplock column is set to the one that
// happened in the out-of-band transaction // happened in the out-of-band transaction
em.close(); em.close();
@ -102,14 +128,13 @@ public class TestDataCacheOptimisticLockRecovery
assertEquals("data cache is not being cleared when oplock " assertEquals("data cache is not being cleared when oplock "
+ "violations occur", secondOpLockValue, oli.getOpLock()); + "violations occur", secondOpLockValue, oli.getOpLock());
// 5. get a read lock on the instance and commit the tx; this // 6. get a read lock on the instance and commit the tx; this
// time it should go through // time it should go through
em.getTransaction().begin(); em.getTransaction().begin();
em.lock(oli, LockModeType.READ); em.lock(oli, LockModeType.READ);
try { try {
em.getTransaction().commit(); em.getTransaction().commit();
} catch (RollbackException e) { } catch (RollbackException e) {
e.printStackTrace();
throw e; throw e;
} finally { } finally {
if (em.getTransaction().isActive()) if (em.getTransaction().isActive())