HHH-10381 - Introduce a ThreadLocal-based pooled-lo optimizer to avoid locking

This commit is contained in:
Scott Marlow 2015-12-15 12:31:22 -05:00 committed by Steve Ebersole
parent fe48897692
commit 0c358d80f6
3 changed files with 194 additions and 2 deletions

View File

@ -0,0 +1,147 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.id.enhanced;
import org.hibernate.HibernateException;
import org.hibernate.id.IntegralDataTypeHolder;
import org.hibernate.internal.CoreMessageLogger;
import org.jboss.logging.Logger;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Variation of {@link PooledOptimizer} which interprets the incoming database value as the lo value, rather than
* the hi value, as well as using thread local to cache the generation state.
*
* @author Steve Ebersole
* @author Stuart Douglas
* @author Scott Marlow
*
* @see PooledOptimizer
*/
public class PooledThreadLocalLoOptimizer extends AbstractOptimizer {
private static final CoreMessageLogger LOG = Logger.getMessageLogger(
CoreMessageLogger.class,
PooledThreadLocalLoOptimizer.class.getName()
);
private static class GenerationState {
private GenerationState(final AccessCallback callback, final int incrementSize) {
lastSourceValue = callback.getNextValue();
upperLimitValue = lastSourceValue.copy().add(incrementSize);
value = lastSourceValue.copy();
}
// last value read from db source
private IntegralDataTypeHolder lastSourceValue;
// the current generator value
private IntegralDataTypeHolder value;
// the value at which we'll hit the db again
private IntegralDataTypeHolder upperLimitValue;
}
/**
* Constructs a PooledThreadLocalLoOptimizer.
*
* @param returnClass The Java type of the values to be generated
* @param incrementSize The increment size.
*/
public PooledThreadLocalLoOptimizer(Class returnClass, int incrementSize) {
super( returnClass, incrementSize );
if ( incrementSize < 1 ) {
throw new HibernateException( "increment size cannot be less than 1" );
}
LOG.creatingPooledLoOptimizer( incrementSize, returnClass.getName() );
}
@Override
public Serializable generate(AccessCallback callback) {
GenerationState local = null;
if ( callback.getTenantIdentifier() == null ) { // for non-multi-tenancy, using a pool per thread
local = localAssignedIds.get();
} else if (tenantSpecificState != null) { // for multi-tenancy, using a pool per unique tenant
local = tenantSpecificState.get( callback.getTenantIdentifier());
}
if ( local != null && local.value.lt( local.upperLimitValue ) ) {
return local.value.makeValueThenIncrement();
}
synchronized (this) {
final GenerationState generationState = locateGenerationState(callback);
if(callback.getTenantIdentifier() != null) {
return generationState.value.makeValueThenIncrement();
} else {
if ( local == null ) {
localAssignedIds.set( generationState );
}
// if we reached the upper limit value, increment to next block of sequences
if (!generationState.value.lt(generationState.upperLimitValue)) {
generationState.lastSourceValue = callback.getNextValue();
generationState.upperLimitValue = generationState.lastSourceValue.copy().add(incrementSize);
generationState.value = generationState.lastSourceValue.copy();
// handle cases where initial-value is less that one (hsqldb for instance).
while (generationState.value.lt(1)) {
generationState.value.increment();
}
}
return generationState.value.makeValueThenIncrement();
}
}
}
private GenerationState noTenantState;
private Map<String,GenerationState> tenantSpecificState;
private final ThreadLocal<GenerationState> localAssignedIds = new ThreadLocal<GenerationState>();
private GenerationState locateGenerationState(final AccessCallback callback) {
if ( callback.getTenantIdentifier() == null ) {
if (noTenantState == null) {
noTenantState = new GenerationState(callback, incrementSize);
}
return noTenantState;
}
else {
GenerationState state;
if ( tenantSpecificState == null ) {
tenantSpecificState = new ConcurrentHashMap<String, GenerationState>();
state = new GenerationState(callback, incrementSize);
tenantSpecificState.put( callback.getTenantIdentifier(), state );
}
else {
state = tenantSpecificState.get( callback.getTenantIdentifier() );
if ( state == null ) {
state = new GenerationState(callback, incrementSize);
tenantSpecificState.put( callback.getTenantIdentifier(), state );
}
}
return state;
}
}
private GenerationState noTenantGenerationState() {
if ( noTenantState == null ) {
throw new IllegalStateException( "Could not locate previous generation state for no-tenant" );
}
return noTenantState;
}
@Override
public IntegralDataTypeHolder getLastSourceValue() {
return noTenantGenerationState().lastSourceValue;
}
@Override
public boolean applyIncrementSizeToSourceValues() {
return true;
}
}

View File

@ -38,7 +38,12 @@ public enum StandardOptimizerDescriptor {
* Describes the optimizer for use with tables/sequences that store the chunk information. Here, specifically the * Describes the optimizer for use with tables/sequences that store the chunk information. Here, specifically the
* lo value is stored in the database. * lo value is stored in the database.
*/ */
POOLED_LO( "pooled-lo", PooledLoOptimizer.class, true ); POOLED_LO( "pooled-lo", PooledLoOptimizer.class, true ),
/**
* Describes the optimizer for use with tables/sequences that store the chunk information. Here, specifically the
* lo value is stored in the database and ThreadLocal used to cache the generation state.
*/
POOLED_LOTL( "pooled-lotl", PooledThreadLocalLoOptimizer.class, true );
private static final Logger log = Logger.getLogger( StandardOptimizerDescriptor.class ); private static final Logger log = Logger.getLogger( StandardOptimizerDescriptor.class );
@ -96,6 +101,9 @@ public enum StandardOptimizerDescriptor {
else if ( POOLED_LO.externalName.equals( externalName ) ) { else if ( POOLED_LO.externalName.equals( externalName ) ) {
return POOLED_LO; return POOLED_LO;
} }
else if ( POOLED_LOTL.externalName.equals( externalName ) ) {
return POOLED_LOTL;
}
else { else {
log.debugf( "Unknown optimizer key [%s]; returning null assuming Optimizer impl class name", externalName ); log.debugf( "Unknown optimizer key [%s]; returning null assuming Optimizer impl class name", externalName );
return null; return null;

View File

@ -6,6 +6,7 @@
*/ */
package org.hibernate.id.enhanced; package org.hibernate.id.enhanced;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.hibernate.id.IdentifierGeneratorHelper; import org.hibernate.id.IdentifierGeneratorHelper;
@ -237,7 +238,6 @@ public class OptimizerUnitTest extends BaseUnitTestCase {
assertEquals( 3, sequence.getTimesCalled() ); assertEquals( 3, sequence.getTimesCalled() );
assertEquals( 7, sequence.getCurrentValue() ); assertEquals( 7, sequence.getCurrentValue() );
} }
@Test @Test
public void testRecoveredPooledLoOptimizerUsage() { public void testRecoveredPooledLoOptimizerUsage() {
final SourceMock sequence = new SourceMock( 1, 3 ); final SourceMock sequence = new SourceMock( 1, 3 );
@ -259,6 +259,39 @@ public class OptimizerUnitTest extends BaseUnitTestCase {
assertEquals( 4, sequence.getCurrentValue() ); assertEquals( 4, sequence.getCurrentValue() );
} }
@Test
public void testBasicPooledThreadLocalLoOptimizerUsage() {
final SourceMock sequence = new SourceMock( 1, 5000 ); // pass 5000 to match default for PooledThreadLocalLoOptimizer.THREAD_LOCAL_BLOCK_SIZE
final Optimizer optimizer = buildPooledThreadLocalLoOptimizer( 1, 5000 );
assertEquals( 0, sequence.getTimesCalled() );
assertEquals( -1, sequence.getCurrentValue() );
Long next = ( Long ) optimizer.generate( sequence );
assertEquals( 1, next.intValue() );
assertEquals( 1, sequence.getTimesCalled() );
assertEquals( 1, sequence.getCurrentValue() );
next = ( Long ) optimizer.generate( sequence );
assertEquals( 2, next.intValue() );
assertEquals( 1, sequence.getTimesCalled() );
assertEquals( 1, sequence.getCurrentValue() );
next = ( Long ) optimizer.generate( sequence );
assertEquals( 3, next.intValue() );
assertEquals( 1, sequence.getTimesCalled() );
assertEquals( 1, sequence.getCurrentValue() );
for( int looper = 0; looper < 5001; looper++) {
next = ( Long ) optimizer.generate( sequence );
}
assertEquals( 3 + 5001, next.intValue() );
assertEquals( 2, sequence.getTimesCalled() );
assertEquals( 5001, sequence.getCurrentValue() );
}
private static Optimizer buildNoneOptimizer(long initial, int increment) { private static Optimizer buildNoneOptimizer(long initial, int increment) {
return buildOptimizer( StandardOptimizerDescriptor.NONE, initial, increment ); return buildOptimizer( StandardOptimizerDescriptor.NONE, initial, increment );
} }
@ -275,6 +308,10 @@ public class OptimizerUnitTest extends BaseUnitTestCase {
return buildOptimizer( StandardOptimizerDescriptor.POOLED_LO, initial, increment ); return buildOptimizer( StandardOptimizerDescriptor.POOLED_LO, initial, increment );
} }
private static Optimizer buildPooledThreadLocalLoOptimizer(long initial, int increment) {
return buildOptimizer( StandardOptimizerDescriptor.POOLED_LOTL, initial, increment );
}
private static Optimizer buildOptimizer( private static Optimizer buildOptimizer(
StandardOptimizerDescriptor descriptor, StandardOptimizerDescriptor descriptor,
long initial, long initial,