mirror of https://github.com/apache/openjpa.git
OPENJPA-2646: Sporadic NullPointerException and ClassCastException caused by query cache misses in multithreaded environments.
git-svn-id: https://svn.apache.org/repos/asf/openjpa/trunk@1831596 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4d577c0fd2
commit
8be86a6903
|
@ -362,7 +362,15 @@ public class PreparedQueryCacheImpl implements PreparedQueryCache {
|
|||
public boolean getEnableStatistics(){
|
||||
return _statsEnabled;
|
||||
}
|
||||
|
||||
|
||||
public void setMaxCacheSize(int size) {
|
||||
((CacheMap)_delegate).setCacheSize(size);
|
||||
}
|
||||
|
||||
public int getCacheSize() {
|
||||
return _delegate.size();
|
||||
}
|
||||
|
||||
//-------------------------------------------------------
|
||||
// Configurable contract
|
||||
//-------------------------------------------------------
|
||||
|
|
|
@ -35,7 +35,8 @@ import org.apache.openjpa.lib.util.SizedMap;
|
|||
import org.apache.openjpa.lib.util.concurrent.ConcurrentHashMap;
|
||||
import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashMap;
|
||||
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* Fixed-size map that has ability to pin/unpin entries and move overflow to
|
||||
|
@ -65,8 +66,9 @@ public class CacheMap
|
|||
// number of pinned values (not including keys not mapped to values)
|
||||
private int _pinnedSize = 0;
|
||||
|
||||
private final ReentrantLock _writeLock = new ReentrantLock();
|
||||
private final ReentrantLock _readLock;
|
||||
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(true);
|
||||
private final Lock _readLock = rwl.readLock();
|
||||
private final Lock _writeLock = rwl.writeLock();
|
||||
|
||||
/**
|
||||
* Create a non-LRU (and therefore highly concurrent) cache map with a
|
||||
|
@ -128,14 +130,12 @@ public class CacheMap
|
|||
cacheMapOverflowRemoved(key, value);
|
||||
}
|
||||
};
|
||||
_readLock = null;
|
||||
} else {
|
||||
cacheMap = new LRUMap(size, load) {
|
||||
public void overflowRemoved(Object key, Object value) {
|
||||
cacheMapOverflowRemoved(key, value);
|
||||
}
|
||||
};
|
||||
_readLock = _writeLock;
|
||||
}
|
||||
if (max < 0)
|
||||
max = Integer.MAX_VALUE;
|
||||
|
@ -186,16 +186,14 @@ public class CacheMap
|
|||
* Acquire read lock.
|
||||
*/
|
||||
public void readLock() {
|
||||
if (_readLock != null)
|
||||
_readLock.lock();
|
||||
_readLock.lock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Release read lock.
|
||||
*/
|
||||
public void readUnlock() {
|
||||
if (_readLock != null)
|
||||
_readLock.unlock();
|
||||
_readLock.unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -216,7 +214,7 @@ public class CacheMap
|
|||
* Whether this cache map uses LRU eviction.
|
||||
*/
|
||||
public boolean isLRU() {
|
||||
return _readLock != null;
|
||||
return cacheMap instanceof LRUMap;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -348,24 +346,26 @@ public class CacheMap
|
|||
}
|
||||
|
||||
public Object get(Object key) {
|
||||
boolean putcache = false;
|
||||
Object val = null;
|
||||
readLock();
|
||||
try {
|
||||
// Check the main map first
|
||||
Object val = cacheMap.get(key);
|
||||
val = softMap.get(key);
|
||||
if (val == null) {
|
||||
// if we find the key in the soft map, move it back into
|
||||
// the primary map
|
||||
val = softMap.get(key);
|
||||
if (val != null){
|
||||
put(key, val);
|
||||
}else{
|
||||
val = cacheMap.get(key);
|
||||
if (val == null) {
|
||||
val = pinnedMap.get(key);
|
||||
} else {
|
||||
putcache = true;
|
||||
}
|
||||
}
|
||||
|
||||
return val;
|
||||
} finally {
|
||||
readUnlock();
|
||||
//cannot obtain a write lock while holding a read lock
|
||||
//doing it this way prevents a deadlock
|
||||
if (putcache)
|
||||
put(key, val);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,10 +19,13 @@
|
|||
|
||||
package org.apache.openjpa.persistence.jdbc.sqlcache;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.Query;
|
||||
import javax.persistence.TypedQuery;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
@ -41,19 +44,19 @@ public class TestMultithreadedReparameterization extends TestCase {
|
|||
private static String RESOURCE = "META-INF/persistence.xml";
|
||||
private static String UNIT_NAME = "PreparedQuery";
|
||||
protected static OpenJPAEntityManagerFactory emf;
|
||||
|
||||
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
if (emf == null) {
|
||||
Properties config = new Properties();
|
||||
config.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true,SchemaAction='drop,add')");
|
||||
config.put("openjpa.Log", "SQL=WARN");
|
||||
config.put("openjpa.jdbc.QuerySQLCache", "true(EnableStatistics=true)");
|
||||
config.put("openjpa.jdbc.QuerySQLCache", "true(EnableStatistics=true, MaxCacheSize=2)");
|
||||
config.put("openjpa.ConnectionFactoryProperties", "PrintParameters=true");
|
||||
emf = OpenJPAPersistence.createEntityManagerFactory(UNIT_NAME, RESOURCE, config);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testReparameterizationUnderHeavyLoad() throws Exception {
|
||||
long baseId = System.currentTimeMillis();
|
||||
EntityManager em = emf.createEntityManager();
|
||||
|
@ -68,14 +71,16 @@ public class TestMultithreadedReparameterization extends TestCase {
|
|||
em.persist(p);
|
||||
}
|
||||
em.getTransaction().commit();
|
||||
|
||||
|
||||
String jpql = "select p from Person p "
|
||||
+ "where p.id=:id and p.firstName=:first and p.lastName=:last and p.age=:age";
|
||||
int nRepeats = 20;
|
||||
Thread[] threads = new Thread[nThreads];
|
||||
final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
|
||||
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
Object[] args = {"id", baseId+i, "first", "First"+i, "last", "Last"+i, "age", (short)(20+i)};
|
||||
QueryThread thread = new QueryThread(emf.createEntityManager(), jpql, args, nRepeats);
|
||||
QueryThread thread = new QueryThread(emf.createEntityManager(), jpql, args, nRepeats, exceptions);
|
||||
threads[i] = new Thread(thread);
|
||||
}
|
||||
for (Thread thread : threads) {
|
||||
|
@ -84,12 +89,90 @@ public class TestMultithreadedReparameterization extends TestCase {
|
|||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
QueryStatistics<String> stats = emf.getConfiguration().getQuerySQLCacheInstance().getStatistics();
|
||||
assertEquals(nThreads*nRepeats,stats.getExecutionCount(), stats.getExecutionCount(jpql));
|
||||
assertEquals(nThreads*nRepeats-1,stats.getExecutionCount(), stats.getHitCount(jpql));
|
||||
|
||||
try {
|
||||
QueryStatistics<String> stats = emf.getConfiguration().getQuerySQLCacheInstance().getStatistics();
|
||||
for(Throwable t : exceptions) {
|
||||
fail((t.getCause() != null ? t.getCause().toString() : t.toString()));
|
||||
}
|
||||
assertEquals(nThreads*nRepeats,stats.getExecutionCount(), stats.getExecutionCount(jpql));
|
||||
assertEquals(nThreads*nRepeats-1,stats.getExecutionCount(), stats.getHitCount(jpql));
|
||||
} finally {
|
||||
//clear statistics for other tests
|
||||
emf.getConfiguration().getQuerySQLCacheInstance().clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This is a test to verify that the PreparedQueryCache correctly swaps queries between
|
||||
* the hard and the soft cache maps. It is important for this test that the max cache size
|
||||
* is set to a number much smaller than the default (1000) to force swapping between hard
|
||||
* and soft maps. During this swapping interval, it is possible that another thread will
|
||||
* attempt to read from the maps and cause either NPE or CCE.
|
||||
*
|
||||
* @see OPENJPA-2646
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testCacheSwappingUnderHeavyLoad() throws Exception {
|
||||
final int nRuns = 10;
|
||||
final int nThreads = 20;
|
||||
//This value needs to be more than the max cache size to reliably cause cache
|
||||
//overflow to start swapping between hard -> soft cache
|
||||
// ("openjpa.jdbc.QuerySQLCache", "true(MaxCacheSize=2")
|
||||
final int nQueries = 10;
|
||||
|
||||
final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
|
||||
|
||||
for (int y = 0; y < nRuns; y++) {
|
||||
Thread[] threads = new Thread[nThreads];
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
threads[i] = new Thread(new Runnable() {
|
||||
@Override public void run() {
|
||||
try {
|
||||
EntityManager em = emf.createEntityManager();
|
||||
// Since the cache (CacheMap) is set to a size of '2' all threads will
|
||||
// fill up the cache and constantly cause query strings to move
|
||||
// to/from the main cache and soft cache, eventually causing a
|
||||
// "cache miss" by a thread.
|
||||
String qStr = "select p from Person p where p.firstName=:first and p.id = ";
|
||||
for (int j = 0; j < nQueries; j++) {
|
||||
Query q = em.createQuery(qStr + j);
|
||||
q.setParameter("first", "test");
|
||||
q.getResultList();
|
||||
}
|
||||
em.close();
|
||||
} catch (Throwable t) {
|
||||
System.err.println("\nThread (" + Thread.currentThread().getName()
|
||||
+ "): Caught the following exception: " + t
|
||||
+ "\n With cause: " + t.getCause());
|
||||
//catch the AssertionError so that we can fail the main Thread
|
||||
exceptions.add(t);
|
||||
}
|
||||
}
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
synchronized (thread) {
|
||||
try {
|
||||
thread.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
for(Throwable t : exceptions) {
|
||||
fail((t.getCause() != null ? t.getCause().toString() : t.toString()));
|
||||
}
|
||||
} finally {
|
||||
//clear statistics for other tests
|
||||
emf.getConfiguration().getQuerySQLCacheInstance().clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Each thread executes same query with same parameters repeatedly.
|
||||
*
|
||||
|
@ -101,34 +184,34 @@ public class TestMultithreadedReparameterization extends TestCase {
|
|||
public final String jpql;
|
||||
public final Object[] args;
|
||||
public final int nTimes;
|
||||
public QueryThread(EntityManager em, String jpql, Object[] args, int r) {
|
||||
public final List<Throwable> exceptions;
|
||||
public QueryThread(EntityManager em, String jpql, Object[] args, int r, List<Throwable> exceptions) {
|
||||
this.em = em;
|
||||
this.jpql = jpql;
|
||||
this.args = args;
|
||||
this.nTimes = r;
|
||||
this.exceptions = exceptions;
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
for (int i = 0; i < nTimes; i++) {
|
||||
TypedQuery<Person> q = em.createQuery(jpql, Person.class);
|
||||
for (int j = 0; j < args.length; j += 2) {
|
||||
q.setParameter(args[j].toString(), args[j+1]);
|
||||
for (int i = 0; i < nTimes; i++) {
|
||||
TypedQuery<Person> q = em.createQuery(jpql, Person.class);
|
||||
for (int j = 0; j < args.length; j += 2) {
|
||||
q.setParameter(args[j].toString(), args[j+1]);
|
||||
}
|
||||
List<Person> result = q.getResultList();
|
||||
assertEquals(Thread.currentThread() + " failed", 1, result.size());
|
||||
Person p = result.get(0);
|
||||
assertEquals(args[1], p.getId());
|
||||
assertEquals(args[3], p.getFirstName());
|
||||
assertEquals(args[5], p.getLastName());
|
||||
assertEquals(args[7], p.getAge());
|
||||
}
|
||||
List<Person> result = q.getResultList();
|
||||
assertEquals(Thread.currentThread() + " failed", 1, result.size());
|
||||
Person p = result.get(0);
|
||||
assertEquals(args[1], p.getId());
|
||||
assertEquals(args[3], p.getFirstName());
|
||||
assertEquals(args[5], p.getLastName());
|
||||
assertEquals(args[7], p.getAge());
|
||||
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
fail();
|
||||
} catch (Throwable t) {
|
||||
//catch the AssertionError so that we can fail the main Thread
|
||||
exceptions.add(t);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -521,7 +521,7 @@ public class QueryImpl<X> extends AbstractQuery<X> implements Serializable {
|
|||
* cache.
|
||||
*/
|
||||
private boolean preExecute(Map params) {
|
||||
|
||||
|
||||
PreparedQueryCache cache = _em.getPreparedQueryCache();
|
||||
if (cache == null) {
|
||||
return false;
|
||||
|
@ -533,7 +533,7 @@ public class QueryImpl<X> extends AbstractQuery<X> implements Serializable {
|
|||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
// Determine if the query has NULL parameters. If so, then do not use a PreparedQuery from the cache
|
||||
for (Object val : params.values()) {
|
||||
if (val == null) {
|
||||
|
@ -541,12 +541,13 @@ public class QueryImpl<X> extends AbstractQuery<X> implements Serializable {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Boolean registered = cache.register(_id, _query, fetch);
|
||||
boolean alreadyCached = (registered == null);
|
||||
String lang = _query.getLanguage();
|
||||
QueryStatistics<String> stats = cache.getStatistics();
|
||||
if (alreadyCached && LANG_PREPARED_SQL.equals(lang)) {
|
||||
//This value is expected to be non-null as it was just registered
|
||||
PreparedQuery pq = _em.getPreparedQuery(_id);
|
||||
if (pq.isInitialized()) {
|
||||
try {
|
||||
|
@ -567,7 +568,7 @@ public class QueryImpl<X> extends AbstractQuery<X> implements Serializable {
|
|||
}
|
||||
return registered == Boolean.TRUE;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initialize the registered Prepared Query from the given opaque object.
|
||||
*
|
||||
|
|
Loading…
Reference in New Issue