HHH-10008 SessionImplementor.getTimestamp() does not return transaction start time;

HHH-9962 Second level query cache returns stale data if query and update statements are executed concurrently

(cherry picked from commit 1d62197b9d)
This commit is contained in:
Radim Vansa 2015-08-12 13:09:53 +02:00 committed by Steve Ebersole
parent 9d071183a4
commit e048b36e21
4 changed files with 141 additions and 4 deletions

View File

@ -111,15 +111,13 @@ public class StandardQueryCache implements QueryCache {
if ( isNaturalKeyLookup && result.isEmpty() ) { if ( isNaturalKeyLookup && result.isEmpty() ) {
return false; return false;
} }
final long ts = cacheRegion.nextTimestamp();
if ( DEBUGGING ) { if ( DEBUGGING ) {
LOG.debugf( "Caching query results in region: %s; timestamp=%s", cacheRegion.getName(), ts ); LOG.debugf( "Caching query results in region: %s; timestamp=%s", cacheRegion.getName(), session.getTimestamp() );
} }
final List cacheable = new ArrayList( result.size() + 1 ); final List cacheable = new ArrayList( result.size() + 1 );
logCachedResultDetails( key, null, returnTypes, cacheable ); logCachedResultDetails( key, null, returnTypes, cacheable );
cacheable.add( ts ); cacheable.add( session.getTimestamp() );
final boolean isSingleResult = returnTypes.length == 1; final boolean isSingleResult = returnTypes.length == 1;
for ( Object aResult : result ) { for ( Object aResult : result ) {

View File

@ -1436,6 +1436,10 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
public Transaction beginTransaction() throws HibernateException { public Transaction beginTransaction() throws HibernateException {
errorIfClosed(); errorIfClosed();
Transaction result = getTransaction(); Transaction result = getTransaction();
// begin on already started transaction is noop, therefore, don't update the timestamp
if (result.getStatus() != TransactionStatus.ACTIVE) {
timestamp = factory.getSettings().getRegionFactory().nextTimestamp();
}
result.begin(); result.begin();
return result; return result;
} }

View File

@ -528,6 +528,10 @@ public class StatelessSessionImpl extends AbstractSessionImpl implements Statele
public Transaction beginTransaction() throws HibernateException { public Transaction beginTransaction() throws HibernateException {
errorIfClosed(); errorIfClosed();
Transaction result = getTransaction(); Transaction result = getTransaction();
// begin on already started transaction is noop, therefore, don't update the timestamp
if (result.getStatus() != TransactionStatus.ACTIVE) {
timestamp = factory.getSettings().getRegionFactory().nextTimestamp();
}
result.begin(); result.begin();
return result; return result;
} }

View File

@ -6,15 +6,24 @@
*/ */
package org.hibernate.test.querycache; package org.hibernate.test.querycache;
import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.hibernate.Criteria; import org.hibernate.Criteria;
import org.hibernate.EmptyInterceptor;
import org.hibernate.Hibernate; import org.hibernate.Hibernate;
import org.hibernate.Query; import org.hibernate.Query;
import org.hibernate.SQLQuery; import org.hibernate.SQLQuery;
import org.hibernate.Session; import org.hibernate.Session;
import org.hibernate.SessionBuilder;
import org.hibernate.Transaction; import org.hibernate.Transaction;
import org.hibernate.cfg.AvailableSettings; import org.hibernate.cfg.AvailableSettings;
import org.hibernate.criterion.Restrictions; import org.hibernate.criterion.Restrictions;
@ -26,6 +35,7 @@ import org.hibernate.testing.DialectChecks;
import org.hibernate.testing.RequiresDialectFeature; import org.hibernate.testing.RequiresDialectFeature;
import org.hibernate.testing.TestForIssue; import org.hibernate.testing.TestForIssue;
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase; import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
import org.hibernate.type.Type;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -39,6 +49,7 @@ import static org.junit.Assert.assertTrue;
public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase { public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase {
private static final CompositeKey PK = new CompositeKey(1, 2); private static final CompositeKey PK = new CompositeKey(1, 2);
private static final ExecutorService executor = Executors.newFixedThreadPool(4);
@Override @Override
public String[] getMappings() { public String[] getMappings() {
@ -63,6 +74,17 @@ public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase {
settings.put( AvailableSettings.GENERATE_STATISTICS, "true" ); settings.put( AvailableSettings.GENERATE_STATISTICS, "true" );
} }
@Override
protected void shutDown() {
super.shutDown();
executor.shutdown();
}
@Override
protected boolean isCleanupTestDataRequired() {
return true;
}
@Override @Override
protected String getCacheConcurrencyStrategy() { protected String getCacheConcurrencyStrategy() {
return "nonstrict-read-write"; return "nonstrict-read-write";
@ -529,5 +551,114 @@ public class QueryCacheTest extends BaseNonConfigCoreFunctionalTestCase {
// assertEquals(1, query.getResultList().size()); // assertEquals(1, query.getResultList().size());
// } // }
@Test
@TestForIssue(jiraKey = "HHH-9962")
/* Test courtesy of Giambattista Bloisi */
public void testDelayedLoad() throws InterruptedException, ExecutionException {
DelayLoadOperations interceptor = new DelayLoadOperations();
final SessionBuilder sessionBuilder = sessionFactory().withOptions().interceptor(interceptor);
Item item1 = new Item();
item1.setName("Item1");
item1.setDescription("Washington");
Session s1 = sessionBuilder.openSession();
Transaction tx1 = s1.beginTransaction();
s1.persist(item1);
tx1.commit();
s1.close();
Item item2 = new Item();
item2.setName("Item2");
item2.setDescription("Chicago");
Session s2 = sessionBuilder.openSession();
Transaction tx2 = s2.beginTransaction();
s2.persist(item2);
tx2.commit();
s2.close();
interceptor.blockOnLoad();
Future<Item> fetchedItem = executor.submit(new Callable<Item>() {
public Item call() throws Exception {
return findByDescription(sessionBuilder, "Washington");
}
});
// wait for the onLoad listener to be called
interceptor.waitOnLoad();
Session s3 = sessionBuilder.openSession();
Transaction tx3 = s3.beginTransaction();
item1.setDescription("New York");
item2.setDescription("Washington");
s3.update(item1);
s3.update(item2);
tx3.commit();
s3.close();
interceptor.unblockOnLoad();
// the concurrent query was executed before the data was amended so
// let's expect "Item1" to be returned as living in Washington
Item fetched = fetchedItem.get();
assertEquals("Item1", fetched.getName());
// Query again: now "Item2" is expected to live in Washington
fetched = findByDescription(sessionBuilder, "Washington");
assertEquals("Item2", fetched.getName());
}
protected Item findByDescription(SessionBuilder sessionBuilder, final String description) {
Session s = sessionBuilder.openSession();
try {
return (Item) s.createCriteria(Item.class)
.setCacheable(true)
.setReadOnly(true)
.add(Restrictions.eq("description", description))
.uniqueResult();
} finally {
s.close();
}
}
public class DelayLoadOperations extends EmptyInterceptor {
private volatile CountDownLatch blockLatch;
private volatile CountDownLatch waitLatch;
@Override
public boolean onLoad(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) {
// Synchronize load and update activities
try {
if (waitLatch != null) {
waitLatch.countDown();
waitLatch = null;
}
if (blockLatch != null) {
blockLatch.await();
blockLatch = null;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return true;
}
public void blockOnLoad() {
blockLatch = new CountDownLatch(1);
waitLatch = new CountDownLatch(1);
}
public void waitOnLoad() throws InterruptedException {
waitLatch.await();
}
public void unblockOnLoad() {
if (blockLatch != null) {
blockLatch.countDown();
}
}
}
} }