HHH-10384 Fix thread safety issues in thread local optimiser
This commit is contained in:
parent
b51f498ac4
commit
bef14a5890
|
@ -28,16 +28,10 @@ import org.jboss.logging.Logger;
|
|||
public class PooledLoThreadLocalOptimizer extends AbstractOptimizer {
|
||||
private static final CoreMessageLogger LOG = Logger.getMessageLogger(
|
||||
CoreMessageLogger.class,
|
||||
PooledLoThreadLocalOptimizer.class.getName()
|
||||
PooledLoOptimizer.class.getName()
|
||||
);
|
||||
|
||||
private static class GenerationState {
|
||||
private GenerationState(final AccessCallback callback, final int incrementSize) {
|
||||
lastSourceValue = callback.getNextValue();
|
||||
upperLimitValue = lastSourceValue.copy().add( incrementSize );
|
||||
value = lastSourceValue.copy();
|
||||
}
|
||||
|
||||
// last value read from db source
|
||||
private IntegralDataTypeHolder lastSourceValue;
|
||||
// the current generator value
|
||||
|
@ -47,7 +41,7 @@ public class PooledLoThreadLocalOptimizer extends AbstractOptimizer {
|
|||
}
|
||||
|
||||
/**
|
||||
* Constructs a PooledThreadLocalLoOptimizer.
|
||||
* Constructs a PooledLoThreadLocalOptimizer.
|
||||
*
|
||||
* @param returnClass The Java type of the values to be generated
|
||||
* @param incrementSize The increment size.
|
||||
|
@ -62,43 +56,39 @@ public class PooledLoThreadLocalOptimizer extends AbstractOptimizer {
|
|||
|
||||
@Override
|
||||
public Serializable generate(AccessCallback callback) {
|
||||
|
||||
GenerationState local = null;
|
||||
|
||||
if ( callback.getTenantIdentifier() == null ) {
|
||||
// for non-multi-tenancy, using a pool per thread
|
||||
local = localAssignedIds.get();
|
||||
}
|
||||
else if ( tenantSpecificState != null ) {
|
||||
// for multi-tenancy, using a pool per unique tenant
|
||||
local = tenantSpecificState.get( callback.getTenantIdentifier() );
|
||||
}
|
||||
|
||||
if ( local != null && local.value.lt( local.upperLimitValue ) ) {
|
||||
return local.value.makeValueThenIncrement();
|
||||
if ( local != null && local.value.lt( local.upperLimitValue ) ) {
|
||||
return local.value.makeValueThenIncrement();
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
final GenerationState generationState = locateGenerationState( callback );
|
||||
final GenerationState generationState = locateGenerationState(callback.getTenantIdentifier());
|
||||
|
||||
if ( callback.getTenantIdentifier() != null ) {
|
||||
return generationState.value.makeValueThenIncrement();
|
||||
if (generationState.lastSourceValue == null
|
||||
|| !generationState.value.lt(generationState.upperLimitValue)) {
|
||||
generationState.lastSourceValue = callback.getNextValue();
|
||||
generationState.upperLimitValue = generationState.lastSourceValue.copy().add(incrementSize);
|
||||
generationState.value = generationState.lastSourceValue.copy();
|
||||
// handle cases where initial-value is less that one (hsqldb for instance).
|
||||
while (generationState.value.lt(1)) {
|
||||
generationState.value.increment();
|
||||
}
|
||||
}
|
||||
else {
|
||||
if ( local == null ) {
|
||||
localAssignedIds.set( generationState );
|
||||
}
|
||||
// if we reached the upper limit value, increment to next block of sequences
|
||||
if ( !generationState.value.lt( generationState.upperLimitValue ) ) {
|
||||
generationState.lastSourceValue = callback.getNextValue();
|
||||
generationState.upperLimitValue = generationState.lastSourceValue.copy().add( incrementSize );
|
||||
generationState.value = generationState.lastSourceValue.copy();
|
||||
// handle cases where initial-value is less that one (hsqldb for instance).
|
||||
while ( generationState.value.lt( 1 ) ) {
|
||||
generationState.value.increment();
|
||||
}
|
||||
}
|
||||
if(callback.getTenantIdentifier() != null) {
|
||||
return generationState.value.makeValueThenIncrement();
|
||||
} else {
|
||||
if ( local == null ) {
|
||||
local = new GenerationState();
|
||||
localAssignedIds.set( local );
|
||||
}
|
||||
local.upperLimitValue = generationState.upperLimitValue.copy();
|
||||
local.value = generationState.value.copy();
|
||||
local.lastSourceValue = generationState.lastSourceValue.copy();
|
||||
generationState.value = generationState.upperLimitValue.copy();
|
||||
return local.value.makeValueThenIncrement();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -107,10 +97,10 @@ public class PooledLoThreadLocalOptimizer extends AbstractOptimizer {
|
|||
private Map<String, GenerationState> tenantSpecificState;
|
||||
private final ThreadLocal<GenerationState> localAssignedIds = new ThreadLocal<GenerationState>();
|
||||
|
||||
private GenerationState locateGenerationState(final AccessCallback callback) {
|
||||
if ( callback.getTenantIdentifier() == null ) {
|
||||
private GenerationState locateGenerationState(String tenantIdentifier) {
|
||||
if ( tenantIdentifier == null ) {
|
||||
if ( noTenantState == null ) {
|
||||
noTenantState = new GenerationState( callback, incrementSize );
|
||||
noTenantState = new GenerationState();
|
||||
}
|
||||
return noTenantState;
|
||||
}
|
||||
|
@ -118,18 +108,17 @@ public class PooledLoThreadLocalOptimizer extends AbstractOptimizer {
|
|||
GenerationState state;
|
||||
if ( tenantSpecificState == null ) {
|
||||
tenantSpecificState = new ConcurrentHashMap<String, GenerationState>();
|
||||
state = new GenerationState( callback, incrementSize );
|
||||
tenantSpecificState.put( callback.getTenantIdentifier(), state );
|
||||
state = new GenerationState();
|
||||
tenantSpecificState.put( tenantIdentifier, state );
|
||||
}
|
||||
else {
|
||||
state = tenantSpecificState.get( callback.getTenantIdentifier() );
|
||||
state = tenantSpecificState.get( tenantIdentifier );
|
||||
if ( state == null ) {
|
||||
state = new GenerationState( callback, incrementSize );
|
||||
tenantSpecificState.put( callback.getTenantIdentifier(), state );
|
||||
state = new GenerationState();
|
||||
tenantSpecificState.put( tenantIdentifier, state );
|
||||
}
|
||||
}
|
||||
return state;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue