diff --git a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedQueryCacheImpl.java b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedQueryCacheImpl.java index 1ada1153c..fe1ec2a73 100644 --- a/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedQueryCacheImpl.java +++ b/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/PreparedQueryCacheImpl.java @@ -362,7 +362,15 @@ public class PreparedQueryCacheImpl implements PreparedQueryCache { public boolean getEnableStatistics(){ return _statsEnabled; } - + + public void setMaxCacheSize(int size) { + ((CacheMap)_delegate).setCacheSize(size); + } + + public int getCacheSize() { + return _delegate.size(); + } + //------------------------------------------------------- // Configurable contract //------------------------------------------------------- diff --git a/openjpa-kernel/src/main/java/org/apache/openjpa/util/CacheMap.java b/openjpa-kernel/src/main/java/org/apache/openjpa/util/CacheMap.java index b25e7dc8f..8385914e7 100644 --- a/openjpa-kernel/src/main/java/org/apache/openjpa/util/CacheMap.java +++ b/openjpa-kernel/src/main/java/org/apache/openjpa/util/CacheMap.java @@ -35,7 +35,8 @@ import org.apache.openjpa.lib.util.SizedMap; import org.apache.openjpa.lib.util.concurrent.ConcurrentHashMap; import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashMap; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Fixed-size map that has ability to pin/unpin entries and move overflow to @@ -65,8 +66,9 @@ public class CacheMap // number of pinned values (not including keys not mapped to values) private int _pinnedSize = 0; - private final ReentrantLock _writeLock = new ReentrantLock(); - private final ReentrantLock _readLock; + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(true); + private final Lock _readLock = rwl.readLock(); + private final Lock _writeLock = rwl.writeLock(); /** * Create a non-LRU (and therefore highly concurrent) cache map with a @@ -128,14 +130,12 @@ public class CacheMap cacheMapOverflowRemoved(key, value); } }; - _readLock = null; } else { cacheMap = new LRUMap(size, load) { public void overflowRemoved(Object key, Object value) { cacheMapOverflowRemoved(key, value); } }; - _readLock = _writeLock; } if (max < 0) max = Integer.MAX_VALUE; @@ -186,16 +186,14 @@ public class CacheMap * Acquire read lock. */ public void readLock() { - if (_readLock != null) - _readLock.lock(); + _readLock.lock(); } /** * Release read lock. */ public void readUnlock() { - if (_readLock != null) - _readLock.unlock(); + _readLock.unlock(); } /** @@ -216,7 +214,7 @@ public class CacheMap * Whether this cache map uses LRU eviction. */ public boolean isLRU() { - return _readLock != null; + return cacheMap instanceof LRUMap; } /** @@ -348,24 +346,26 @@ public class CacheMap } public Object get(Object key) { + boolean putcache = false; + Object val = null; readLock(); try { - // Check the main map first - Object val = cacheMap.get(key); + val = softMap.get(key); if (val == null) { - // if we find the key in the soft map, move it back into - // the primary map - val = softMap.get(key); - if (val != null){ - put(key, val); - }else{ + val = cacheMap.get(key); + if (val == null) { val = pinnedMap.get(key); + } else { + putcache = true; } } - return val; } finally { readUnlock(); + //cannot obtain a write lock while holding a read lock + //doing it this way prevents a deadlock + if (putcache) + put(key, val); } } diff --git a/openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/persistence/jdbc/sqlcache/TestMultithreadedReparameterization.java b/openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/persistence/jdbc/sqlcache/TestMultithreadedReparameterization.java index e0bba900f..64f73717b 100644 --- a/openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/persistence/jdbc/sqlcache/TestMultithreadedReparameterization.java +++ b/openjpa-persistence-jdbc/src/test/java/org/apache/openjpa/persistence/jdbc/sqlcache/TestMultithreadedReparameterization.java @@ -19,10 +19,13 @@ package org.apache.openjpa.persistence.jdbc.sqlcache; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; import javax.persistence.EntityManager; +import javax.persistence.Query; import javax.persistence.TypedQuery; import junit.framework.TestCase; @@ -41,19 +44,19 @@ public class TestMultithreadedReparameterization extends TestCase { private static String RESOURCE = "META-INF/persistence.xml"; private static String UNIT_NAME = "PreparedQuery"; protected static OpenJPAEntityManagerFactory emf; - + public void setUp() throws Exception { super.setUp(); if (emf == null) { Properties config = new Properties(); config.put("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true,SchemaAction='drop,add')"); config.put("openjpa.Log", "SQL=WARN"); - config.put("openjpa.jdbc.QuerySQLCache", "true(EnableStatistics=true)"); + config.put("openjpa.jdbc.QuerySQLCache", "true(EnableStatistics=true, MaxCacheSize=2)"); config.put("openjpa.ConnectionFactoryProperties", "PrintParameters=true"); emf = OpenJPAPersistence.createEntityManagerFactory(UNIT_NAME, RESOURCE, config); } } - + public void testReparameterizationUnderHeavyLoad() throws Exception { long baseId = System.currentTimeMillis(); EntityManager em = emf.createEntityManager(); @@ -68,14 +71,16 @@ public class TestMultithreadedReparameterization extends TestCase { em.persist(p); } em.getTransaction().commit(); - + String jpql = "select p from Person p " + "where p.id=:id and p.firstName=:first and p.lastName=:last and p.age=:age"; int nRepeats = 20; Thread[] threads = new Thread[nThreads]; + final List exceptions = Collections.synchronizedList(new ArrayList()); + for (int i = 0; i < nThreads; i++) { Object[] args = {"id", baseId+i, "first", "First"+i, "last", "Last"+i, "age", (short)(20+i)}; - QueryThread thread = new QueryThread(emf.createEntityManager(), jpql, args, nRepeats); + QueryThread thread = new QueryThread(emf.createEntityManager(), jpql, args, nRepeats, exceptions); threads[i] = new Thread(thread); } for (Thread thread : threads) { @@ -84,12 +89,90 @@ public class TestMultithreadedReparameterization extends TestCase { for (Thread thread : threads) { thread.join(); } - QueryStatistics stats = emf.getConfiguration().getQuerySQLCacheInstance().getStatistics(); - assertEquals(nThreads*nRepeats,stats.getExecutionCount(), stats.getExecutionCount(jpql)); - assertEquals(nThreads*nRepeats-1,stats.getExecutionCount(), stats.getHitCount(jpql)); - + try { + QueryStatistics stats = emf.getConfiguration().getQuerySQLCacheInstance().getStatistics(); + for(Throwable t : exceptions) { + fail((t.getCause() != null ? t.getCause().toString() : t.toString())); + } + assertEquals(nThreads*nRepeats,stats.getExecutionCount(), stats.getExecutionCount(jpql)); + assertEquals(nThreads*nRepeats-1,stats.getExecutionCount(), stats.getHitCount(jpql)); + } finally { + //clear statistics for other tests + emf.getConfiguration().getQuerySQLCacheInstance().clear(); + } } - + + /** + * This is a test to verify that the PreparedQueryCache correctly swaps queries between + * the hard and the soft cache maps. It is important for this test that the max cache size + * is set to a number much smaller than the default (1000) to force swapping between hard + * and soft maps. During this swapping interval, it is possible that another thread will + * attempt to read from the maps and cause either NPE or CCE. + * + * @see OPENJPA-2646 + * @throws Exception + */ + public void testCacheSwappingUnderHeavyLoad() throws Exception { + final int nRuns = 10; + final int nThreads = 20; + //This value needs to be more than the max cache size to reliably cause cache + //overflow to start swapping between hard -> soft cache + // ("openjpa.jdbc.QuerySQLCache", "true(MaxCacheSize=2") + final int nQueries = 10; + + final List exceptions = Collections.synchronizedList(new ArrayList()); + + for (int y = 0; y < nRuns; y++) { + Thread[] threads = new Thread[nThreads]; + for (int i = 0; i < nThreads; i++) { + threads[i] = new Thread(new Runnable() { + @Override public void run() { + try { + EntityManager em = emf.createEntityManager(); + // Since the cache (CacheMap) is set to a size of '2' all threads will + // fill up the cache and constantly cause query strings to move + // to/from the main cache and soft cache, eventually causing a + // "cache miss" by a thread. + String qStr = "select p from Person p where p.firstName=:first and p.id = "; + for (int j = 0; j < nQueries; j++) { + Query q = em.createQuery(qStr + j); + q.setParameter("first", "test"); + q.getResultList(); + } + em.close(); + } catch (Throwable t) { + System.err.println("\nThread (" + Thread.currentThread().getName() + + "): Caught the following exception: " + t + + "\n With cause: " + t.getCause()); + //catch the AssertionError so that we can fail the main Thread + exceptions.add(t); + } + } + }); + threads[i].start(); + } + + for (Thread thread : threads) { + synchronized (thread) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + try { + for(Throwable t : exceptions) { + fail((t.getCause() != null ? t.getCause().toString() : t.toString())); + } + } finally { + //clear statistics for other tests + emf.getConfiguration().getQuerySQLCacheInstance().clear(); + } + } + } + /** * Each thread executes same query with same parameters repeatedly. * @@ -101,34 +184,34 @@ public class TestMultithreadedReparameterization extends TestCase { public final String jpql; public final Object[] args; public final int nTimes; - public QueryThread(EntityManager em, String jpql, Object[] args, int r) { + public final List exceptions; + public QueryThread(EntityManager em, String jpql, Object[] args, int r, List exceptions) { this.em = em; this.jpql = jpql; this.args = args; this.nTimes = r; + this.exceptions = exceptions; } - + public void run() { try { - for (int i = 0; i < nTimes; i++) { - TypedQuery q = em.createQuery(jpql, Person.class); - for (int j = 0; j < args.length; j += 2) { - q.setParameter(args[j].toString(), args[j+1]); + for (int i = 0; i < nTimes; i++) { + TypedQuery q = em.createQuery(jpql, Person.class); + for (int j = 0; j < args.length; j += 2) { + q.setParameter(args[j].toString(), args[j+1]); + } + List result = q.getResultList(); + assertEquals(Thread.currentThread() + " failed", 1, result.size()); + Person p = result.get(0); + assertEquals(args[1], p.getId()); + assertEquals(args[3], p.getFirstName()); + assertEquals(args[5], p.getLastName()); + assertEquals(args[7], p.getAge()); } - List result = q.getResultList(); - assertEquals(Thread.currentThread() + " failed", 1, result.size()); - Person p = result.get(0); - assertEquals(args[1], p.getId()); - assertEquals(args[3], p.getFirstName()); - assertEquals(args[5], p.getLastName()); - assertEquals(args[7], p.getAge()); - - } - } catch (Exception ex) { - ex.printStackTrace(); - fail(); + } catch (Throwable t) { + //catch the AssertionError so that we can fail the main Thread + exceptions.add(t); } } - } } diff --git a/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java b/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java index 11e42fbb8..8eaab807f 100644 --- a/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java +++ b/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java @@ -521,7 +521,7 @@ public class QueryImpl extends AbstractQuery implements Serializable { * cache. */ private boolean preExecute(Map params) { - + PreparedQueryCache cache = _em.getPreparedQueryCache(); if (cache == null) { return false; @@ -533,7 +533,7 @@ public class QueryImpl extends AbstractQuery implements Serializable { } return false; } - + // Determine if the query has NULL parameters. If so, then do not use a PreparedQuery from the cache for (Object val : params.values()) { if (val == null) { @@ -541,12 +541,13 @@ public class QueryImpl extends AbstractQuery implements Serializable { return false; } } - + Boolean registered = cache.register(_id, _query, fetch); boolean alreadyCached = (registered == null); String lang = _query.getLanguage(); QueryStatistics stats = cache.getStatistics(); if (alreadyCached && LANG_PREPARED_SQL.equals(lang)) { + //This value is expected to be non-null as it was just registered PreparedQuery pq = _em.getPreparedQuery(_id); if (pq.isInitialized()) { try { @@ -567,7 +568,7 @@ public class QueryImpl extends AbstractQuery implements Serializable { } return registered == Boolean.TRUE; } - + /** * Initialize the registered Prepared Query from the given opaque object. *