Block older operations on primary term transition

Today a replica learns of a new primary term via a cluster state update
and there is not a clean transition between the older primary term and
the newer primary term. This commit modifies this situation so that:
 - a replica shard learns of a new primary term via replication
   operations executed under the mandate of the new primary
 - when a replica shard learns of a new primary term, it blocks
   operations on older terms from reaching the engine, with a clear
   transition point between the operations on the older term and the
   operations on the newer term

This work paves the way for a primary/replica sync on primary
promotion. Future work will also ensure a clean transition point on a
promoted primary, and prepare a replica shard for a sync with the
promoted primary.

Relates #24779
This commit is contained in:
Jason Tedor 2017-05-19 16:17:22 -04:00 committed by GitHub
parent cb7a8d5876
commit 4cd70cf986
13 changed files with 371 additions and 137 deletions

View File

@ -38,7 +38,6 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -52,7 +51,6 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@ -179,8 +177,8 @@ public abstract class TransportReplicationAction<
Request shardRequest, IndexShard primary) throws Exception;
/**
* Synchronous replica operation on nodes with replica copies. This is done under the lock form
* {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)}
* Synchronously execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
@ -584,7 +582,7 @@ public abstract class TransportReplicationAction<
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationLock(request.primaryTerm, this, executor);
replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
}
/**
@ -921,7 +919,7 @@ public abstract class TransportReplicationAction<
}
};
indexShard.acquirePrimaryOperationLock(onAcquired, executor);
indexShard.acquirePrimaryOperationPermit(onAcquired, executor);
}
class ShardReference implements Releasable {

View File

@ -612,11 +612,6 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
rescheduleFsyncTask(durability);
}
}
// update primary terms
for (final IndexShard shard : this.shards.values()) {
shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id()));
}
}
private void rescheduleFsyncTask(Translog.Durability durability) {

View File

@ -50,7 +50,6 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
@ -129,6 +128,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -195,7 +195,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final ShardPath path;
private final IndexShardOperationsLock indexShardOperationsLock;
private final IndexShardOperationPermits indexShardOperationPermits;
private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
// for primaries, we only allow to write when actually started (so the cluster has decided we started)
@ -272,7 +272,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
this.cachingPolicy = cachingPolicy;
}
indexShardOperationsLock = new IndexShardOperationsLock(shardId, logger, threadPool);
indexShardOperationPermits = new IndexShardOperationPermits(shardId, logger, threadPool);
searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners();
@ -328,7 +328,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return this.shardFieldData;
}
/**
* Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
*/
@ -340,6 +339,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* notifies the shard of an increase in the primary term
*/
public void updatePrimaryTerm(final long newTerm) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newTerm != primaryTerm) {
// Note that due to cluster state batching an initializing primary shard term can failed and re-assigned
@ -354,10 +354,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
//
// We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
// potentially preventing re-allocation.
assert shardRouting.primary() == false || shardRouting.initializing() == false :
"a started primary shard should never update it's term. shard: " + shardRouting
+ " current term [" + primaryTerm + "] new term [" + newTerm + "]";
assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]";
assert shardRouting.initializing() == false :
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newTerm + "]";
assert newTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]";
primaryTerm = newTerm;
}
}
@ -457,9 +460,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation locks are being held here, move state from started to relocated
assert indexShardOperationsLock.getActiveOperationsCount() == 0 :
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated";
synchronized (mutex) {
if (state != IndexShardState.STARTED) {
@ -974,7 +977,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, refreshListeners);
indexShardOperationsLock.close();
indexShardOperationPermits.close();
}
}
}
@ -1841,35 +1844,89 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* Acquire a primary operation lock whenever the shard is ready for indexing. If the lock is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, lock acquisition can be delayed. The provided
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*/
public void acquirePrimaryOperationLock(ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
verifyNotClosed();
verifyPrimary();
indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, false);
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false);
}
private final Object primaryTermMutex = new Object();
/**
* Acquire a replica operation lock whenever the shard is ready for indexing (see acquirePrimaryOperationLock). If the given primary
* term is lower then the one in {@link #shardRouting} an {@link IllegalArgumentException} is thrown.
* Acquire a replica operation permit whenever the shard is ready for indexing (see
* {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
*
* @param operationPrimaryTerm the operation primary term
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
*/
public void acquireReplicaOperationLock(long opPrimaryTerm, ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
public void acquireReplicaOperationPermit(
final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
verifyNotClosed();
verifyReplicationTarget();
if (primaryTerm > opPrimaryTerm) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
shardId, opPrimaryTerm, primaryTerm));
if (operationPrimaryTerm > primaryTerm) {
synchronized (primaryTermMutex) {
if (operationPrimaryTerm > primaryTerm) {
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm;
primaryTerm = operationPrimaryTerm;
});
} catch (final InterruptedException | TimeoutException e) {
onPermitAcquired.onFailure(e);
return;
}
}
}
}
indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, true);
assert operationPrimaryTerm <= primaryTerm
: "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]";
indexShardOperationPermits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (operationPrimaryTerm < primaryTerm) {
releasable.close();
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, primaryTerm, onPermitAcquired);
} else {
onPermitAcquired.onResponse(releasable);
}
}
@Override
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
},
executorOnDelay,
true);
}
private static void onOperationPrimaryTermIsTooOld(
final ShardId shardId,
final long operationPrimaryTerm,
final long primaryTerm,
final ActionListener<Releasable> onPermitAcquired) {
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
operationPrimaryTerm,
primaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
}
public int getActiveOperationsCount() {
return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
}
private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) {

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger;
@ -36,7 +37,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class IndexShardOperationsLock implements Closeable {
final class IndexShardOperationPermits implements Closeable {
private final ShardId shardId;
private final Logger logger;
private final ThreadPool threadPool;
@ -44,10 +45,10 @@ public class IndexShardOperationsLock implements Closeable {
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
// fair semaphore to ensure that blockOperations() does not starve under thread contention
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed due to relocation hand-off
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed
private volatile boolean closed;
public IndexShardOperationsLock(ShardId shardId, Logger logger, ThreadPool threadPool) {
IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) {
this.shardId = shardId;
this.logger = logger;
this.threadPool = threadPool;
@ -67,7 +68,7 @@ public class IndexShardOperationsLock implements Closeable {
* @param onBlocked the action to run once the block has been acquired
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation lock has been closed
* @throws IndexShardClosedException if operation permit has been closed
*/
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
if (closed) {
@ -75,6 +76,7 @@ public class IndexShardOperationsLock implements Closeable {
}
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
@ -91,7 +93,7 @@ public class IndexShardOperationsLock implements Closeable {
}
if (queuedActions != null) {
// Try acquiring permits on fresh thread (for two reasons):
// - blockOperations is called on recovery thread which can be expected to be interrupted when recovery is cancelled.
// - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
// Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
// ThreadedActionListener if the queue of the thread pool on which it submits is full.
// - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
@ -106,14 +108,14 @@ public class IndexShardOperationsLock implements Closeable {
}
/**
* Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided
* ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock
* acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations
* terminates.
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)},
* permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no
* longer blocked.
*
* @param onAcquired ActionListener that is invoked once acquisition is successful or failed
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
* @param forceExecution whether the runnable should force its execution in case it gets rejected
*/
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
if (closed) {

View File

@ -561,6 +561,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
@ -737,6 +738,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
*/
void updateRoutingEntry(ShardRouting shardRouting) throws IOException;
/**
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
*/
void updatePrimaryTerm(long primaryTerm);
/**
* Notifies the service of the current allocation ids in the cluster state.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.

View File

@ -1091,7 +1091,7 @@ public class TransportReplicationActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString());
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
@ -1103,7 +1103,7 @@ public class TransportReplicationActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString());
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

View File

@ -444,7 +444,7 @@ public class TransportWriteActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString());
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
@ -456,7 +456,7 @@ public class TransportWriteActionTests extends ESTestCase {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString());
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

View File

@ -18,9 +18,12 @@
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
@ -48,10 +51,23 @@ public class ShardStateIT extends ESIntegTestCase {
indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null);
logger.info("--> waiting for a yellow index");
// JDK 9 type inference gets confused, so we have to help the
// type inference
assertBusy(((Runnable) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(),
equalTo(ClusterHealthStatus.YELLOW))));
ensureYellow();
// this forces the primary term to propagate to the replicas
int id = 0;
while (true) {
// find an ID that routes to the right shard, we will only index to the shard that saw a primary failure
final String idAsString = Integer.toString(id);
final int hash = Math.floorMod(Murmur3HashFunction.hash(idAsString), 2);
if (hash == shard) {
client()
.index(new IndexRequest("test", "type", idAsString).source("{ \"f\": \"" + idAsString + "\"}", XContentType.JSON))
.get();
break;
} else {
id++;
}
}
final long term0 = shard == 0 ? 2 : 1;
final long term1 = shard == 1 ? 2 : 1;
@ -63,13 +79,13 @@ public class ShardStateIT extends ESIntegTestCase {
assertPrimaryTerms(term0, term1);
}
protected void assertPrimaryTerms(long term0, long term1) {
protected void assertPrimaryTerms(long shard0Term, long shard1Term) {
for (String node : internalCluster().getNodeNames()) {
logger.debug("--> asserting primary terms terms on [{}]", node);
ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState();
IndexMetaData metaData = state.metaData().index("test");
assertThat(metaData.primaryTerm(0), equalTo(term0));
assertThat(metaData.primaryTerm(1), equalTo(term1));
assertThat(metaData.primaryTerm(0), equalTo(shard0Term));
assertThat(metaData.primaryTerm(1), equalTo(shard1Term));
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(metaData.getIndex());
if (indexService != null) {

View File

@ -47,6 +47,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
@ -59,6 +61,7 @@ import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
@ -225,7 +228,6 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
assert shardRoutings().stream()
.filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false :
"replica with aId [" + replica.routingEntry().allocationId() + "] already exists";
replica.updatePrimaryTerm(primary.getPrimaryTerm());
replicas.add(replica);
updateAllocationIDsOnPrimary();
}
@ -254,17 +256,13 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
*/
public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException {
final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1;
IndexMetaData.Builder newMetaData =
IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm);
indexMetaData = newMetaData.build();
for (IndexShard shard: replicas) {
shard.updatePrimaryTerm(newTerm);
}
boolean found = replicas.remove(replica);
assert found;
assertTrue(replicas.remove(replica));
closeShards(primary);
primary = replica;
replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary());
primary.updatePrimaryTerm(newTerm);
updateAllocationIDsOnPrimary();
}
@ -476,15 +474,32 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final ReplicaRequest request,
final long globalCheckpoint,
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
try {
IndexShard replica = replicationGroup.replicas.stream()
IndexShard replica = replicationGroup.replicas.stream()
.filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get();
replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
performOnReplica(request, replica);
listener.onResponse(new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
} catch (Exception e) {
listener.onFailure(e);
}
replica.acquireReplicaOperationPermit(
request.primaryTerm(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
try {
replica.updateGlobalCheckpointOnReplica(globalCheckpoint);
performOnReplica(request, replica);
releasable.close();
listener.onResponse(
new ReplicaResponse(
replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable);
listener.onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
},
ThreadPool.Names.INDEX);
}
@Override

View File

@ -37,16 +37,15 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class IndexShardOperationsLockTests extends ESTestCase {
public class IndexShardOperationPermitsTests extends ESTestCase {
private static ThreadPool threadPool;
private IndexShardOperationsLock block;
private IndexShardOperationPermits permits;
@BeforeClass
public static void setupThreadPool() {
@ -61,13 +60,13 @@ public class IndexShardOperationsLockTests extends ESTestCase {
@Before
public void createIndexShardOperationsLock() {
block = new IndexShardOperationsLock(new ShardId("blubb", "id", 0), logger, threadPool);
permits = new IndexShardOperationPermits(new ShardId("blubb", "id", 0), logger, threadPool);
}
@After
public void checkNoInflightOperations() {
assertThat(block.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE));
assertThat(block.getActiveOperationsCount(), equalTo(0));
assertThat(permits.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE));
assertThat(permits.getActiveOperationsCount(), equalTo(0));
}
public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
@ -87,7 +86,7 @@ public class IndexShardOperationsLockTests extends ESTestCase {
Thread thread = new Thread() {
public void run() {
latch.countDown();
block.acquire(future, ThreadPool.Names.GENERIC, true);
permits.acquire(future, ThreadPool.Names.GENERIC, true);
}
};
futures.add(future);
@ -123,29 +122,29 @@ public class IndexShardOperationsLockTests extends ESTestCase {
public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
block.acquire(future, ThreadPool.Names.GENERIC, true);
permits.acquire(future, ThreadPool.Names.GENERIC, true);
assertTrue(future.isDone());
future.get().close();
}
public void testOperationsIfClosed() throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
block.close();
block.acquire(future, ThreadPool.Names.GENERIC, true);
permits.close();
permits.acquire(future, ThreadPool.Names.GENERIC, true);
ExecutionException exception = expectThrows(ExecutionException.class, future::get);
assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class));
}
public void testBlockIfClosed() throws ExecutionException, InterruptedException {
block.close();
expectThrows(IndexShardClosedException.class, () -> block.blockOperations(randomInt(10), TimeUnit.MINUTES,
permits.close();
expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
() -> { throw new IllegalArgumentException("fake error"); }));
}
public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
try (Releasable releasable = blockAndWait()) {
block.acquire(future, ThreadPool.Names.GENERIC, true);
permits.acquire(future, ThreadPool.Names.GENERIC, true);
assertFalse(future.isDone());
}
future.get(1, TimeUnit.HOURS).close();
@ -192,8 +191,8 @@ public class IndexShardOperationsLockTests extends ESTestCase {
context.putHeader("foo", "bar");
context.putTransient("bar", "baz");
// test both with and without a executor name
block.acquire(future, ThreadPool.Names.GENERIC, true);
block.acquire(future2, null, true);
permits.acquire(future, ThreadPool.Names.GENERIC, true);
permits.acquire(future2, null, true);
}
assertFalse(future.isDone());
}
@ -209,7 +208,7 @@ public class IndexShardOperationsLockTests extends ESTestCase {
IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0));
threadPool.generic().execute(() -> {
try {
block.blockOperations(1, TimeUnit.MINUTES, () -> {
permits.blockOperations(1, TimeUnit.MINUTES, () -> {
try {
blockAcquired.countDown();
releaseBlock.await();
@ -241,31 +240,31 @@ public class IndexShardOperationsLockTests extends ESTestCase {
public void testActiveOperationsCount() throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> future1 = new PlainActionFuture<>();
block.acquire(future1, ThreadPool.Names.GENERIC, true);
permits.acquire(future1, ThreadPool.Names.GENERIC, true);
assertTrue(future1.isDone());
assertThat(block.getActiveOperationsCount(), equalTo(1));
assertThat(permits.getActiveOperationsCount(), equalTo(1));
PlainActionFuture<Releasable> future2 = new PlainActionFuture<>();
block.acquire(future2, ThreadPool.Names.GENERIC, true);
permits.acquire(future2, ThreadPool.Names.GENERIC, true);
assertTrue(future2.isDone());
assertThat(block.getActiveOperationsCount(), equalTo(2));
assertThat(permits.getActiveOperationsCount(), equalTo(2));
future1.get().close();
assertThat(block.getActiveOperationsCount(), equalTo(1));
assertThat(permits.getActiveOperationsCount(), equalTo(1));
future1.get().close(); // check idempotence
assertThat(block.getActiveOperationsCount(), equalTo(1));
assertThat(permits.getActiveOperationsCount(), equalTo(1));
future2.get().close();
assertThat(block.getActiveOperationsCount(), equalTo(0));
assertThat(permits.getActiveOperationsCount(), equalTo(0));
try (Releasable releasable = blockAndWait()) {
assertThat(block.getActiveOperationsCount(), equalTo(0));
assertThat(permits.getActiveOperationsCount(), equalTo(0));
}
PlainActionFuture<Releasable> future3 = new PlainActionFuture<>();
block.acquire(future3, ThreadPool.Names.GENERIC, true);
permits.acquire(future3, ThreadPool.Names.GENERIC, true);
assertTrue(future3.isDone());
assertThat(block.getActiveOperationsCount(), equalTo(1));
assertThat(permits.getActiveOperationsCount(), equalTo(1));
future3.get().close();
assertThat(block.getActiveOperationsCount(), equalTo(0));
assertThat(permits.getActiveOperationsCount(), equalTo(0));
}
}

View File

@ -33,6 +33,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
@ -118,8 +119,12 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -130,11 +135,13 @@ import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
@ -262,20 +269,20 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
try {
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX);
indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX);
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
try {
indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX);
indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX);
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
}
public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
@ -287,10 +294,10 @@ public class IndexShardTests extends IndexShardTestCase {
// simulate promotion
indexShard = newStartedShard(false);
ShardRouting replicaRouting = indexShard.routingEntry();
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
} else {
indexShard = newStartedShard(true);
}
@ -298,15 +305,15 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, indexShard.getActiveOperationsCount());
if (indexShard.routingEntry().isRelocationTarget() == false) {
try {
indexShard.acquireReplicaOperationLock(primaryTerm, null, ThreadPool.Names.INDEX);
indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX);
fail("shard shouldn't accept operations as replica");
} catch (IllegalStateException ignored) {
}
}
Releasable operation1 = acquirePrimaryOperationLockBlockingly(indexShard);
Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquirePrimaryOperationLockBlockingly(indexShard);
Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(2, indexShard.getActiveOperationsCount());
Releasables.close(operation1, operation2);
@ -315,20 +322,20 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
private Releasable acquirePrimaryOperationLockBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
indexShard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX);
indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX);
return fut.get();
}
private Releasable acquireReplicaOperationLockBlockingly(IndexShard indexShard, long opPrimaryTerm)
private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
indexShard.acquireReplicaOperationLock(opPrimaryTerm, fut, ThreadPool.Names.INDEX);
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, ThreadPool.Names.INDEX);
return fut.get();
}
public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException {
public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;
@ -367,33 +374,165 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(0, indexShard.getActiveOperationsCount());
if (shardRouting.primary() == false) {
try {
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX);
fail("shard shouldn't accept primary ops");
} catch (IllegalStateException ignored) {
}
final IllegalStateException e =
expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX));
assertThat(e, hasToString(containsString("shard is not a primary")));
}
final long primaryTerm = indexShard.getPrimaryTerm();
Releasable operation1 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm);
final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm);
final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm);
assertEquals(2, indexShard.getActiveOperationsCount());
try {
indexShard.acquireReplicaOperationLock(primaryTerm - 1, null, ThreadPool.Names.INDEX);
fail("you can not increment the operation counter with an older primary term");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("operation term"));
assertThat(e.getMessage(), containsString("too old"));
{
final AtomicBoolean onResponse = new AtomicBoolean();
final AtomicBoolean onFailure = new AtomicBoolean();
final AtomicReference<Exception> onFailureException = new AtomicReference<>();
ActionListener<Releasable> onLockAcquired = new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
onResponse.set(true);
}
@Override
public void onFailure(Exception e) {
onFailure.set(true);
onFailureException.set(e);
}
};
indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX);
assertFalse(onResponse.get());
assertTrue(onFailure.get());
assertThat(onFailureException.get(), instanceOf(IllegalStateException.class));
assertThat(
onFailureException.get(), hasToString(containsString("operation primary term [" + (primaryTerm - 1) + "] is too old")));
}
// but you can increment with a newer one..
acquireReplicaOperationLockBlockingly(indexShard, primaryTerm + 1 + randomInt(20)).close();
Releasables.close(operation1, operation2);
assertEquals(0, indexShard.getActiveOperationsCount());
{
final AtomicBoolean onResponse = new AtomicBoolean();
final AtomicBoolean onFailure = new AtomicBoolean();
final CyclicBarrier barrier = new CyclicBarrier(2);
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
indexShard.acquireReplicaOperationPermit(
primaryTerm + 1 + randomInt(20),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
onResponse.set(true);
releasable.close();
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onFailure(Exception e) {
onFailure.set(true);
}
},
ThreadPool.Names.SAME);
});
thread.start();
barrier.await();
// our operation should be blocked until the previous operations complete
assertFalse(onResponse.get());
assertFalse(onFailure.get());
Releasables.close(operation1);
// our operation should still be blocked
assertFalse(onResponse.get());
assertFalse(onFailure.get());
Releasables.close(operation2);
barrier.await();
// now lock acquisition should have succeeded
assertTrue(onResponse.get());
assertFalse(onFailure.get());
thread.join();
assertEquals(0, indexShard.getActiveOperationsCount());
}
closeShards(indexShard);
}
public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);
final CyclicBarrier barrier = new CyclicBarrier(3);
final CountDownLatch latch = new CountDownLatch(2);
final long primaryTerm = indexShard.getPrimaryTerm();
final AtomicLong counter = new AtomicLong();
final AtomicReference<Exception> onFailure = new AtomicReference<>();
final LongFunction<Runnable> function = increment -> () -> {
assert increment > 0;
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
indexShard.acquireReplicaOperationPermit(
primaryTerm + increment,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
counter.incrementAndGet();
assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + increment));
latch.countDown();
releasable.close();
}
@Override
public void onFailure(Exception e) {
onFailure.set(e);
latch.countDown();
}
},
ThreadPool.Names.INDEX);
};
final long firstIncrement = 1 + (randomBoolean() ? 0 : 1);
final long secondIncrement = 1 + (randomBoolean() ? 0 : 1);
final Thread first = new Thread(function.apply(firstIncrement));
final Thread second = new Thread(function.apply(secondIncrement));
first.start();
second.start();
// the two threads synchronize attempting to acquire an operation permit
barrier.await();
// we wait for both operations to complete
latch.await();
first.join();
second.join();
final Exception e;
if ((e = onFailure.get()) != null) {
/*
* If one thread tried to set the primary term to a higher value than the other thread and the thread with the higher term won
* the race, then the other thread lost the race and only one operation should have been executed.
*/
assertThat(e, instanceOf(IllegalStateException.class));
assertThat(e, hasToString(matches("operation primary term \\[\\d+\\] is too old")));
assertThat(counter.get(), equalTo(1L));
} else {
assertThat(counter.get(), equalTo(2L));
}
assertThat(indexShard.getPrimaryTerm(), equalTo(primaryTerm + Math.max(firstIncrement, secondIncrement)));
closeShards(indexShard);
}
@ -701,7 +840,7 @@ public class IndexShardTests extends IndexShardTestCase {
}
});
try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) {
try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
// start finalization of recovery
recoveryThread.start();
latch.await();
@ -711,7 +850,7 @@ public class IndexShardTests extends IndexShardTestCase {
// recovery can be now finalized
recoveryThread.join();
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) {
try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
// lock can again be acquired
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
}
@ -740,7 +879,7 @@ public class IndexShardTests extends IndexShardTestCase {
super.onResponse(releasable);
}
};
shard.acquirePrimaryOperationLock(onLockAcquired, ThreadPool.Names.INDEX);
shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX);
onLockAcquiredActions.add(onLockAcquired);
}
@ -764,7 +903,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexThreads[i] = new Thread() {
@Override
public void run() {
try (Releasable operationLock = acquirePrimaryOperationLockBlockingly(shard)) {
try (Releasable operationLock = acquirePrimaryOperationPermitBlockingly(shard)) {
allPrimaryOperationLocksAcquired.countDown();
barrier.await();
} catch (InterruptedException | BrokenBarrierException | ExecutionException e) {

View File

@ -362,6 +362,11 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
this.shardRouting = shardRouting;
}
@Override
public void updatePrimaryTerm(long primaryTerm) {
term = primaryTerm;
}
@Override
public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<String> initializingAllocationIds) {
this.activeAllocationIds = activeAllocationIds;

View File

@ -114,7 +114,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
final ShardId shardId = shard.shardId();
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
shard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX);
shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX);
try (Releasable operationLock = fut.get()) {
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
flushService.attemptSyncedFlush(shardId, listener);