HHH-10008 SessionImplementor.getTimestamp() does not return transaction start time;
HHH-9962 Second level query cache returns stale data if query and update statements are executed concurrently
This commit is contained in:
parent
1376b12ca9
commit
1d62197b9d
|
@ -111,15 +111,13 @@ public class StandardQueryCache implements QueryCache {
|
||||||
if ( isNaturalKeyLookup && result.isEmpty() ) {
|
if ( isNaturalKeyLookup && result.isEmpty() ) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
final long ts = cacheRegion.nextTimestamp();
|
|
||||||
|
|
||||||
if ( DEBUGGING ) {
|
if ( DEBUGGING ) {
|
||||||
LOG.debugf( "Caching query results in region: %s; timestamp=%s", cacheRegion.getName(), ts );
|
LOG.debugf( "Caching query results in region: %s; timestamp=%s", cacheRegion.getName(), session.getTimestamp() );
|
||||||
}
|
}
|
||||||
|
|
||||||
final List cacheable = new ArrayList( result.size() + 1 );
|
final List cacheable = new ArrayList( result.size() + 1 );
|
||||||
logCachedResultDetails( key, null, returnTypes, cacheable );
|
logCachedResultDetails( key, null, returnTypes, cacheable );
|
||||||
cacheable.add( ts );
|
cacheable.add( session.getTimestamp() );
|
||||||
|
|
||||||
final boolean isSingleResult = returnTypes.length == 1;
|
final boolean isSingleResult = returnTypes.length == 1;
|
||||||
for ( Object aResult : result ) {
|
for ( Object aResult : result ) {
|
||||||
|
|
|
@ -1436,6 +1436,10 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
|
||||||
public Transaction beginTransaction() throws HibernateException {
|
public Transaction beginTransaction() throws HibernateException {
|
||||||
errorIfClosed();
|
errorIfClosed();
|
||||||
Transaction result = getTransaction();
|
Transaction result = getTransaction();
|
||||||
|
// begin on already started transaction is noop, therefore, don't update the timestamp
|
||||||
|
if (result.getStatus() != TransactionStatus.ACTIVE) {
|
||||||
|
timestamp = factory.getSettings().getRegionFactory().nextTimestamp();
|
||||||
|
}
|
||||||
result.begin();
|
result.begin();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -528,6 +528,10 @@ public class StatelessSessionImpl extends AbstractSessionImpl implements Statele
|
||||||
public Transaction beginTransaction() throws HibernateException {
|
public Transaction beginTransaction() throws HibernateException {
|
||||||
errorIfClosed();
|
errorIfClosed();
|
||||||
Transaction result = getTransaction();
|
Transaction result = getTransaction();
|
||||||
|
// begin on already started transaction is noop, therefore, don't update the timestamp
|
||||||
|
if (result.getStatus() != TransactionStatus.ACTIVE) {
|
||||||
|
timestamp = factory.getSettings().getRegionFactory().nextTimestamp();
|
||||||
|
}
|
||||||
result.begin();
|
result.begin();
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,15 +6,24 @@
|
||||||
*/
|
*/
|
||||||
package org.hibernate.test.querycache;
|
package org.hibernate.test.querycache;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.hibernate.Criteria;
|
import org.hibernate.Criteria;
|
||||||
|
import org.hibernate.EmptyInterceptor;
|
||||||
import org.hibernate.Hibernate;
|
import org.hibernate.Hibernate;
|
||||||
import org.hibernate.Query;
|
import org.hibernate.Query;
|
||||||
import org.hibernate.SQLQuery;
|
import org.hibernate.SQLQuery;
|
||||||
import org.hibernate.Session;
|
import org.hibernate.Session;
|
||||||
|
import org.hibernate.SessionBuilder;
|
||||||
import org.hibernate.Transaction;
|
import org.hibernate.Transaction;
|
||||||
import org.hibernate.cfg.AvailableSettings;
|
import org.hibernate.cfg.AvailableSettings;
|
||||||
import org.hibernate.criterion.Restrictions;
|
import org.hibernate.criterion.Restrictions;
|
||||||
|
@ -26,6 +35,7 @@ import org.hibernate.testing.DialectChecks;
|
||||||
import org.hibernate.testing.RequiresDialectFeature;
|
import org.hibernate.testing.RequiresDialectFeature;
|
||||||
import org.hibernate.testing.TestForIssue;
|
import org.hibernate.testing.TestForIssue;
|
||||||
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
|
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
|
||||||
|
import org.hibernate.type.Type;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -39,6 +49,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase {
|
public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase {
|
||||||
|
|
||||||
private static final CompositeKey PK = new CompositeKey(1, 2);
|
private static final CompositeKey PK = new CompositeKey(1, 2);
|
||||||
|
private static final ExecutorService executor = Executors.newFixedThreadPool(4);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String[] getMappings() {
|
public String[] getMappings() {
|
||||||
|
@ -63,6 +74,17 @@ public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase {
|
||||||
settings.put( AvailableSettings.GENERATE_STATISTICS, "true" );
|
settings.put( AvailableSettings.GENERATE_STATISTICS, "true" );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void shutDown() {
|
||||||
|
super.shutDown();
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean isCleanupTestDataRequired() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected String getCacheConcurrencyStrategy() {
|
protected String getCacheConcurrencyStrategy() {
|
||||||
return "nonstrict-read-write";
|
return "nonstrict-read-write";
|
||||||
|
@ -529,5 +551,114 @@ public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase {
|
||||||
// assertEquals(1, query.getResultList().size());
|
// assertEquals(1, query.getResultList().size());
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@TestForIssue(jiraKey = "HHH-9962")
|
||||||
|
/* Test courtesy of Giambattista Bloisi */
|
||||||
|
public void testDelayedLoad() throws InterruptedException, ExecutionException {
|
||||||
|
DelayLoadOperations interceptor = new DelayLoadOperations();
|
||||||
|
final SessionBuilder sessionBuilder = sessionFactory().withOptions().interceptor(interceptor);
|
||||||
|
Item item1 = new Item();
|
||||||
|
item1.setName("Item1");
|
||||||
|
item1.setDescription("Washington");
|
||||||
|
Session s1 = sessionBuilder.openSession();
|
||||||
|
Transaction tx1 = s1.beginTransaction();
|
||||||
|
s1.persist(item1);
|
||||||
|
tx1.commit();
|
||||||
|
s1.close();
|
||||||
|
|
||||||
|
Item item2 = new Item();
|
||||||
|
item2.setName("Item2");
|
||||||
|
item2.setDescription("Chicago");
|
||||||
|
Session s2 = sessionBuilder.openSession();
|
||||||
|
Transaction tx2 = s2.beginTransaction();
|
||||||
|
s2.persist(item2);
|
||||||
|
tx2.commit();
|
||||||
|
s2.close();
|
||||||
|
|
||||||
|
interceptor.blockOnLoad();
|
||||||
|
|
||||||
|
Future<Item> fetchedItem = executor.submit(new Callable<Item>() {
|
||||||
|
public Item call() throws Exception {
|
||||||
|
return findByDescription(sessionBuilder, "Washington");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait for the onLoad listener to be called
|
||||||
|
interceptor.waitOnLoad();
|
||||||
|
|
||||||
|
Session s3 = sessionBuilder.openSession();
|
||||||
|
Transaction tx3 = s3.beginTransaction();
|
||||||
|
item1.setDescription("New York");
|
||||||
|
item2.setDescription("Washington");
|
||||||
|
s3.update(item1);
|
||||||
|
s3.update(item2);
|
||||||
|
tx3.commit();
|
||||||
|
s3.close();
|
||||||
|
|
||||||
|
interceptor.unblockOnLoad();
|
||||||
|
|
||||||
|
// the concurrent query was executed before the data was amended so
|
||||||
|
// let's expect "Item1" to be returned as living in Washington
|
||||||
|
Item fetched = fetchedItem.get();
|
||||||
|
assertEquals("Item1", fetched.getName());
|
||||||
|
|
||||||
|
// Query again: now "Item2" is expected to live in Washington
|
||||||
|
fetched = findByDescription(sessionBuilder, "Washington");
|
||||||
|
assertEquals("Item2", fetched.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Item findByDescription(SessionBuilder sessionBuilder, final String description) {
|
||||||
|
Session s = sessionBuilder.openSession();
|
||||||
|
try {
|
||||||
|
return (Item) s.createCriteria(Item.class)
|
||||||
|
.setCacheable(true)
|
||||||
|
.setReadOnly(true)
|
||||||
|
.add(Restrictions.eq("description", description))
|
||||||
|
.uniqueResult();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
s.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class DelayLoadOperations extends EmptyInterceptor {
|
||||||
|
|
||||||
|
private volatile CountDownLatch blockLatch;
|
||||||
|
private volatile CountDownLatch waitLatch;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean onLoad(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) {
|
||||||
|
// Synchronize load and update activities
|
||||||
|
try {
|
||||||
|
if (waitLatch != null) {
|
||||||
|
waitLatch.countDown();
|
||||||
|
waitLatch = null;
|
||||||
|
}
|
||||||
|
if (blockLatch != null) {
|
||||||
|
blockLatch.await();
|
||||||
|
blockLatch = null;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void blockOnLoad() {
|
||||||
|
blockLatch = new CountDownLatch(1);
|
||||||
|
waitLatch = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitOnLoad() throws InterruptedException {
|
||||||
|
waitLatch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unblockOnLoad() {
|
||||||
|
if (blockLatch != null) {
|
||||||
|
blockLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue