Make primary relocation handoff non-blocking (#19013)
Primary relocation and indexing concurrently can currently lead to a deadlock situation as indexing operations are blocked on a (bounded) thread pool during the hand-off phase between old and new primary. This change replaces blocking of indexing operations by putting operations that cannot be executed during relocation hand-off in a queue to be executed once relocation completes. Closes #18553.
This commit is contained in:
parent
50b97ba5f5
commit
b4064ce43f
|
@ -53,7 +53,7 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
|
|||
}
|
||||
|
||||
public void internalAddListener(ActionListener<T> listener) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false);
|
||||
boolean executeImmediate = false;
|
||||
synchronized (this) {
|
||||
if (executedListeners) {
|
||||
|
@ -106,4 +106,4 @@ public abstract class AbstractListenableActionFuture<T, L> extends AdapterAction
|
|||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
|
|||
if (listener instanceof ThreadedActionListener) {
|
||||
return listener;
|
||||
}
|
||||
return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
return new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -72,17 +72,25 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
|
|||
private final ThreadPool threadPool;
|
||||
private final String executor;
|
||||
private final ActionListener<Response> listener;
|
||||
private final boolean forceExecution;
|
||||
|
||||
public ThreadedActionListener(ESLogger logger, ThreadPool threadPool, String executor, ActionListener<Response> listener) {
|
||||
public ThreadedActionListener(ESLogger logger, ThreadPool threadPool, String executor, ActionListener<Response> listener,
|
||||
boolean forceExecution) {
|
||||
this.logger = logger;
|
||||
this.threadPool = threadPool;
|
||||
this.executor = executor;
|
||||
this.listener = listener;
|
||||
this.forceExecution = forceExecution;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(final Response response) {
|
||||
threadPool.executor(executor).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return forceExecution;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
listener.onResponse(response);
|
||||
|
@ -98,6 +106,11 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
|
|||
@Override
|
||||
public void onFailure(final Throwable e) {
|
||||
threadPool.executor(executor).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return forceExecution;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
listener.onFailure(e);
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
@ -91,6 +92,7 @@ public abstract class TransportReplicationAction<
|
|||
private final ShardStateAction shardStateAction;
|
||||
private final WriteConsistencyLevel defaultWriteConsistencyLevel;
|
||||
private final TransportRequestOptions transportOptions;
|
||||
private final String executor;
|
||||
|
||||
// package private for testing
|
||||
final String transportReplicaAction;
|
||||
|
@ -108,6 +110,7 @@ public abstract class TransportReplicationAction<
|
|||
this.clusterService = clusterService;
|
||||
this.indicesService = indicesService;
|
||||
this.shardStateAction = shardStateAction;
|
||||
this.executor = executor;
|
||||
|
||||
this.transportPrimaryAction = actionName + "[p]";
|
||||
this.transportReplicaAction = actionName + "[r]";
|
||||
|
@ -157,7 +160,7 @@ public abstract class TransportReplicationAction<
|
|||
|
||||
/**
|
||||
* Synchronous replica operation on nodes with replica copies. This is done under the lock form
|
||||
* {@link #acquireReplicaOperationLock(ShardId, long)}.
|
||||
* {@link #acquireReplicaOperationLock(ShardId, long, ActionListener)}.
|
||||
*/
|
||||
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest);
|
||||
|
||||
|
@ -235,12 +238,33 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(Request request, TransportChannel channel, Task task) throws Exception {
|
||||
ReplicationTask replicationTask = (ReplicationTask) task;
|
||||
boolean success = false;
|
||||
PrimaryShardReference primaryShardReference = getPrimaryShardReference(request.shardId());
|
||||
public void messageReceived(Request request, TransportChannel channel, Task task) {
|
||||
new AsyncPrimaryAction(request, channel, (ReplicationTask) task).run();
|
||||
}
|
||||
}
|
||||
|
||||
class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<PrimaryShardReference> {
|
||||
|
||||
private final Request request;
|
||||
private final TransportChannel channel;
|
||||
private final ReplicationTask replicationTask;
|
||||
|
||||
AsyncPrimaryAction(Request request, TransportChannel channel, ReplicationTask replicationTask) {
|
||||
this.request = request;
|
||||
this.channel = channel;
|
||||
this.replicationTask = replicationTask;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
acquirePrimaryShardReference(request.shardId(), this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(PrimaryShardReference primaryShardReference) {
|
||||
try {
|
||||
if (primaryShardReference.isRelocated()) {
|
||||
primaryShardReference.close(); // release shard operation lock as soon as possible
|
||||
setPhase(replicationTask, "primary_delegation");
|
||||
// delegate primary phase to relocation target
|
||||
// it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
|
||||
|
@ -268,7 +292,7 @@ public abstract class TransportReplicationAction<
|
|||
setPhase(replicationTask, "primary");
|
||||
final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
|
||||
final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings());
|
||||
final ActionListener<Response> listener = createResponseListener(channel, replicationTask, primaryShardReference);
|
||||
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
|
||||
createReplicatedOperation(request, new ActionListener<PrimaryResult>() {
|
||||
@Override
|
||||
public void onResponse(PrimaryResult result) {
|
||||
|
@ -280,29 +304,30 @@ public abstract class TransportReplicationAction<
|
|||
listener.onFailure(e);
|
||||
}
|
||||
}, primaryShardReference, executeOnReplicas).execute();
|
||||
success = true;
|
||||
}
|
||||
} finally {
|
||||
if (success == false) {
|
||||
primaryShardReference.close();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
|
||||
onFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult> createReplicatedOperation(
|
||||
Request request, ActionListener<PrimaryResult> listener,
|
||||
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
|
||||
return new ReplicationOperation<>(request, primaryShardReference, listener,
|
||||
executeOnReplicas, checkWriteConsistency(), replicasProxy, clusterService::state, logger, actionName
|
||||
);
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
setPhase(replicationTask, "finished");
|
||||
try {
|
||||
channel.sendResponse(t);
|
||||
} catch (IOException e) {
|
||||
e.addSuppressed(t);
|
||||
logger.warn("failed to send response", e);
|
||||
}
|
||||
}
|
||||
|
||||
private ActionListener<Response> createResponseListener(final TransportChannel channel, final ReplicationTask replicationTask,
|
||||
final PrimaryShardReference primaryShardReference) {
|
||||
private ActionListener<Response> createResponseListener(final PrimaryShardReference primaryShardReference) {
|
||||
return new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
finish();
|
||||
primaryShardReference.close(); // release shard operation lock before responding to caller
|
||||
setPhase(replicationTask, "finished");
|
||||
try {
|
||||
channel.sendResponse(response);
|
||||
} catch (IOException e) {
|
||||
|
@ -310,15 +335,10 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
}
|
||||
|
||||
private void finish() {
|
||||
primaryShardReference.close();
|
||||
setPhase(replicationTask, "finished");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
primaryShardReference.close(); // release shard operation lock before responding to caller
|
||||
setPhase(replicationTask, "finished");
|
||||
primaryShardReference.close();
|
||||
try {
|
||||
channel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
|
@ -327,6 +347,14 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult> createReplicatedOperation(
|
||||
Request request, ActionListener<PrimaryResult> listener,
|
||||
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
|
||||
return new ReplicationOperation<>(request, primaryShardReference, listener,
|
||||
executeOnReplicas, checkWriteConsistency(), replicasProxy, clusterService::state, logger, actionName
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
protected class PrimaryResult implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
|
||||
|
@ -388,7 +416,7 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
}
|
||||
|
||||
private final class AsyncReplicaAction extends AbstractRunnable {
|
||||
private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {
|
||||
private final ReplicaRequest request;
|
||||
private final TransportChannel channel;
|
||||
/**
|
||||
|
@ -405,6 +433,18 @@ public abstract class TransportReplicationAction<
|
|||
this.task = task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
try {
|
||||
ReplicaResult replicaResult = shardOperationOnReplica(request);
|
||||
releasable.close(); // release shard operation lock before responding to caller
|
||||
replicaResult.respond(new ResponseListener());
|
||||
} catch (Throwable t) {
|
||||
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
|
||||
AsyncReplicaAction.this.onFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (t instanceof RetryOnReplicaException) {
|
||||
|
@ -451,11 +491,7 @@ public abstract class TransportReplicationAction<
|
|||
protected void doRun() throws Exception {
|
||||
setPhase(task, "replica");
|
||||
assert request.shardId() != null : "request shardId must be set";
|
||||
ReplicaResult result;
|
||||
try (Releasable ignored = acquireReplicaOperationLock(request.shardId(), request.primaryTerm())) {
|
||||
result = shardOperationOnReplica(request);
|
||||
}
|
||||
result.respond(new ResponseListener());
|
||||
acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), this);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -733,10 +769,10 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
|
||||
/**
|
||||
* returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
|
||||
* tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
|
||||
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
|
||||
*/
|
||||
protected PrimaryShardReference getPrimaryShardReference(ShardId shardId) {
|
||||
protected void acquirePrimaryShardReference(ShardId shardId, ActionListener<PrimaryShardReference> onReferenceAcquired) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
IndexShard indexShard = indexService.getShard(shardId.id());
|
||||
// we may end up here if the cluster state used to route the primary is so stale that the underlying
|
||||
|
@ -746,17 +782,29 @@ public abstract class TransportReplicationAction<
|
|||
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
|
||||
"actual shard is not a primary " + indexShard.routingEntry());
|
||||
}
|
||||
return new PrimaryShardReference(indexShard, indexShard.acquirePrimaryOperationLock());
|
||||
|
||||
ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
onReferenceAcquired.onFailure(e);
|
||||
}
|
||||
};
|
||||
|
||||
indexShard.acquirePrimaryOperationLock(onAcquired, executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire an operation on replicas. The lock is closed as soon as
|
||||
* replication is completed on the node.
|
||||
* tries to acquire an operation on replicas. The lock is closed as soon as replication is completed on the node.
|
||||
*/
|
||||
protected Releasable acquireReplicaOperationLock(ShardId shardId, long primaryTerm) {
|
||||
protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, ActionListener<Releasable> onLockAcquired) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
IndexShard indexShard = indexService.getShard(shardId.id());
|
||||
return indexShard.acquireReplicaOperationLock(primaryTerm);
|
||||
indexShard.acquireReplicaOperationLock(primaryTerm, onLockAcquired, executor);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,117 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Container that represents a resource with reference counting capabilities. Provides operations to suspend acquisition of new references.
|
||||
* This is useful for resource management when resources are intermittently unavailable.
|
||||
*
|
||||
* Assumes less than Integer.MAX_VALUE references are concurrently being held at one point in time.
|
||||
*/
|
||||
public final class SuspendableRefContainer {
|
||||
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
|
||||
private final Semaphore semaphore;
|
||||
|
||||
public SuspendableRefContainer() {
|
||||
// fair semaphore to ensure that blockAcquisition() does not starve under thread contention
|
||||
this.semaphore = new Semaphore(TOTAL_PERMITS, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries acquiring a reference. Returns reference holder if reference acquisition is not blocked at the time of invocation (see
|
||||
* {@link #blockAcquisition()}). Returns null if reference acquisition is blocked at the time of invocation.
|
||||
*
|
||||
* @return reference holder if reference acquisition is not blocked, null otherwise
|
||||
* @throws InterruptedException if the current thread is interrupted
|
||||
*/
|
||||
public Releasable tryAcquire() throws InterruptedException {
|
||||
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
|
||||
return idempotentRelease(1);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a reference. Blocks if reference acquisition is blocked at the time of invocation.
|
||||
*
|
||||
* @return reference holder
|
||||
* @throws InterruptedException if the current thread is interrupted
|
||||
*/
|
||||
public Releasable acquire() throws InterruptedException {
|
||||
semaphore.acquire();
|
||||
return idempotentRelease(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a reference. Blocks if reference acquisition is blocked at the time of invocation.
|
||||
*
|
||||
* @return reference holder
|
||||
*/
|
||||
public Releasable acquireUninterruptibly() {
|
||||
semaphore.acquireUninterruptibly();
|
||||
return idempotentRelease(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disables reference acquisition and waits until all existing references are released.
|
||||
* When released, reference acquisition is enabled again.
|
||||
* This guarantees that between successful acquisition and release, no one is holding a reference.
|
||||
*
|
||||
* @return references holder to all references
|
||||
*/
|
||||
public Releasable blockAcquisition() {
|
||||
semaphore.acquireUninterruptibly(TOTAL_PERMITS);
|
||||
return idempotentRelease(TOTAL_PERMITS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that ensures permits are only released once
|
||||
*
|
||||
* @return reference holder
|
||||
*/
|
||||
private Releasable idempotentRelease(int permits) {
|
||||
AtomicBoolean closed = new AtomicBoolean();
|
||||
return () -> {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
semaphore.release(permits);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of references currently being held.
|
||||
*/
|
||||
public int activeRefs() {
|
||||
int availablePermits = semaphore.availablePermits();
|
||||
if (availablePermits == 0) {
|
||||
// when blockAcquisition is holding all permits
|
||||
return 0;
|
||||
} else {
|
||||
return TOTAL_PERMITS - availablePermits;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -32,6 +32,7 @@ import org.apache.lucene.store.AlreadyClosedException;
|
|||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
|
||||
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
|
||||
|
@ -54,7 +55,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.Callback;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.SuspendableRefContainer;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
|
@ -128,6 +128,7 @@ import java.util.Objects;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -187,7 +188,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
|
||||
private final ShardPath path;
|
||||
|
||||
private final SuspendableRefContainer suspendableRefContainer;
|
||||
private final IndexShardOperationsLock indexShardOperationsLock;
|
||||
|
||||
private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
|
||||
// for primaries, we only allow to write when actually started (so the cluster has decided we started)
|
||||
|
@ -260,7 +261,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
} else {
|
||||
cachingPolicy = new UsageTrackingQueryCachingPolicy();
|
||||
}
|
||||
suspendableRefContainer = new SuspendableRefContainer();
|
||||
indexShardOperationsLock = new IndexShardOperationsLock(shardId, logger, threadPool);
|
||||
searcherWrapper = indexSearcherWrapper;
|
||||
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
|
||||
refreshListeners = buildRefreshListeners();
|
||||
|
@ -436,20 +437,29 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
}
|
||||
|
||||
public IndexShard relocated(String reason) throws IndexShardNotStartedException {
|
||||
try (Releasable block = suspendableRefContainer.blockAcquisition()) {
|
||||
// no shard operation locks are being held here, move state from started to relocated
|
||||
synchronized (mutex) {
|
||||
if (state != IndexShardState.STARTED) {
|
||||
throw new IndexShardNotStartedException(shardId, state);
|
||||
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
|
||||
try {
|
||||
indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> {
|
||||
// no shard operation locks are being held here, move state from started to relocated
|
||||
assert indexShardOperationsLock.getActiveOperationsCount() == 0 :
|
||||
"in-flight operations in progress while moving shard state to relocated";
|
||||
synchronized (mutex) {
|
||||
if (state != IndexShardState.STARTED) {
|
||||
throw new IndexShardNotStartedException(shardId, state);
|
||||
}
|
||||
changeState(IndexShardState.RELOCATED, reason);
|
||||
}
|
||||
changeState(IndexShardState.RELOCATED, reason);
|
||||
}
|
||||
});
|
||||
} catch (TimeoutException e) {
|
||||
logger.warn("timed out waiting for relocation hand-off to complete");
|
||||
// This is really bad as ongoing replication operations are preventing this shard from completing relocation hand-off.
|
||||
// Fail primary relocation source and target shards.
|
||||
failShard("timed out waiting for relocation hand-off to complete", null);
|
||||
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public IndexShardState state() {
|
||||
return state;
|
||||
}
|
||||
|
@ -842,6 +852,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
} finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times
|
||||
IOUtils.close(engine);
|
||||
indexShardOperationsLock.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1534,17 +1545,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners);
|
||||
}
|
||||
|
||||
public Releasable acquirePrimaryOperationLock() {
|
||||
/**
|
||||
* Acquire a primary operation lock whenever the shard is ready for indexing. If the lock is directly available, the provided
|
||||
* ActionListener will be called on the calling thread. During relocation hand-off, lock acquisition can be delayed. The provided
|
||||
* ActionListener will then be called using the provided executor.
|
||||
*/
|
||||
public void acquirePrimaryOperationLock(ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
|
||||
verifyNotClosed();
|
||||
verifyPrimary();
|
||||
return suspendableRefContainer.acquireUninterruptibly();
|
||||
|
||||
indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* acquires operation log. If the given primary term is lower then the one in {@link #shardRouting}
|
||||
* an {@link IllegalArgumentException} is thrown.
|
||||
* Acquire a replica operation lock whenever the shard is ready for indexing (see acquirePrimaryOperationLock). If the given primary
|
||||
* term is lower then the one in {@link #shardRouting} an {@link IllegalArgumentException} is thrown.
|
||||
*/
|
||||
public Releasable acquireReplicaOperationLock(long opPrimaryTerm) {
|
||||
public void acquireReplicaOperationLock(long opPrimaryTerm, ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
|
||||
verifyNotClosed();
|
||||
verifyReplicationTarget();
|
||||
if (primaryTerm > opPrimaryTerm) {
|
||||
|
@ -1552,11 +1569,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
|
||||
shardId, opPrimaryTerm, primaryTerm));
|
||||
}
|
||||
return suspendableRefContainer.acquireUninterruptibly();
|
||||
|
||||
indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, true);
|
||||
}
|
||||
|
||||
public int getActiveOperationsCount() {
|
||||
return suspendableRefContainer.activeRefs(); // refCount is incremented on creation and decremented on close
|
||||
return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ThreadedActionListener;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class IndexShardOperationsLock implements Closeable {
|
||||
private final ShardId shardId;
|
||||
private final ESLogger logger;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
|
||||
// fair semaphore to ensure that blockOperations() does not starve under thread contention
|
||||
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
|
||||
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed due to relocation hand-off
|
||||
private volatile boolean closed;
|
||||
|
||||
public IndexShardOperationsLock(ShardId shardId, ESLogger logger, ThreadPool threadPool) {
|
||||
this.shardId = shardId;
|
||||
this.logger = logger;
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
closed = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for in-flight operations to finish and executes onBlocked under the guarantee that no new operations are started. Queues
|
||||
* operations that are occurring in the meanwhile and runs them once onBlocked has executed.
|
||||
*
|
||||
* @param timeout the maximum time to wait for the in-flight operations block
|
||||
* @param timeUnit the time unit of the {@code timeout} argument
|
||||
* @param onBlocked the action to run once the block has been acquired
|
||||
* @throws InterruptedException if calling thread is interrupted
|
||||
* @throws TimeoutException if timed out waiting for in-flight operations to finish
|
||||
* @throws IndexShardClosedException if operation lock has been closed
|
||||
*/
|
||||
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
|
||||
if (closed) {
|
||||
throw new IndexShardClosedException(shardId);
|
||||
}
|
||||
try {
|
||||
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
|
||||
try {
|
||||
onBlocked.run();
|
||||
} finally {
|
||||
semaphore.release(TOTAL_PERMITS);
|
||||
}
|
||||
} else {
|
||||
throw new TimeoutException("timed out during blockOperations");
|
||||
}
|
||||
} finally {
|
||||
final List<ActionListener<Releasable>> queuedActions;
|
||||
synchronized (this) {
|
||||
queuedActions = delayedOperations;
|
||||
delayedOperations = null;
|
||||
}
|
||||
if (queuedActions != null) {
|
||||
// Try acquiring permits on fresh thread (for two reasons):
|
||||
// - blockOperations is called on recovery thread which can be expected to be interrupted when recovery is cancelled.
|
||||
// Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
|
||||
// ThreadedActionListener if the queue of the thread pool on which it submits is full.
|
||||
// - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
|
||||
// handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
for (ActionListener<Releasable> queuedAction : queuedActions) {
|
||||
acquire(queuedAction, null, false);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided
|
||||
* ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock
|
||||
* acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations
|
||||
* terminates.
|
||||
*
|
||||
* @param onAcquired ActionListener that is invoked once acquisition is successful or failed
|
||||
* @param executorOnDelay executor to use for delayed call
|
||||
* @param forceExecution whether the runnable should force its execution in case it gets rejected
|
||||
*/
|
||||
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
|
||||
if (closed) {
|
||||
onAcquired.onFailure(new IndexShardClosedException(shardId));
|
||||
return;
|
||||
}
|
||||
Releasable releasable;
|
||||
try {
|
||||
synchronized (this) {
|
||||
releasable = tryAcquire();
|
||||
if (releasable == null) {
|
||||
// blockOperations is executing, this operation will be retried by blockOperations once it finishes
|
||||
if (delayedOperations == null) {
|
||||
delayedOperations = new ArrayList<>();
|
||||
}
|
||||
if (executorOnDelay != null) {
|
||||
delayedOperations.add(
|
||||
new ThreadedActionListener<>(logger, threadPool, executorOnDelay, onAcquired, forceExecution));
|
||||
} else {
|
||||
delayedOperations.add(onAcquired);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
onAcquired.onFailure(e);
|
||||
return;
|
||||
}
|
||||
onAcquired.onResponse(releasable);
|
||||
}
|
||||
|
||||
@Nullable private Releasable tryAcquire() throws InterruptedException {
|
||||
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
|
||||
AtomicBoolean closed = new AtomicBoolean();
|
||||
return () -> {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
semaphore.release(1);
|
||||
}
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public int getActiveOperationsCount() {
|
||||
int availablePermits = semaphore.availablePermits();
|
||||
if (availablePermits == 0) {
|
||||
// when blockOperations is holding all permits
|
||||
return 0;
|
||||
} else {
|
||||
return TOTAL_PERMITS - availablePermits;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -39,7 +39,6 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.CancellableThreads;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardClosedException;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
|
@ -360,22 +359,20 @@ public class RecoverySourceHandler {
|
|||
cancellableThreads.checkForCancel();
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode());
|
||||
|
||||
|
||||
cancellableThreads.execute(recoveryTarget::finalizeRecovery);
|
||||
|
||||
if (isPrimaryRelocation()) {
|
||||
logger.trace("[{}][{}] performing relocation hand-off to {}", indexName, shardId, request.targetNode());
|
||||
try {
|
||||
cancellableThreads.execute(() -> shard.relocated("to " + request.targetNode()));
|
||||
} catch (Throwable e) {
|
||||
logger.debug("[{}][{}] completing relocation hand-off to {} failed", e, indexName, shardId, request.targetNode());
|
||||
throw e;
|
||||
}
|
||||
/**
|
||||
* if the recovery process fails after setting the shard state to RELOCATED, both relocation source and
|
||||
* target are failed (see {@link IndexShard#updateRoutingEntry}).
|
||||
*/
|
||||
try {
|
||||
shard.relocated("to " + request.targetNode());
|
||||
} catch (IllegalIndexShardStateException e) {
|
||||
// we can ignore this exception since, on the other node, when it moved to phase3
|
||||
// it will also send shard started, which might cause the index shard we work against
|
||||
// to move be closed by the time we get to the relocated method
|
||||
}
|
||||
}
|
||||
stopWatch.stop();
|
||||
logger.trace("[{}][{}] finalizing recovery to {}: took [{}]",
|
||||
|
|
|
@ -263,12 +263,6 @@ public class RecoveryTargetService extends AbstractComponent implements IndexEve
|
|||
return;
|
||||
}
|
||||
|
||||
if (cause instanceof IndexShardClosedException) {
|
||||
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, "source shard is " +
|
||||
"closed", cause), false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (cause instanceof AlreadyClosedException) {
|
||||
onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, "source shard is " +
|
||||
"closed", cause), false);
|
||||
|
|
|
@ -136,7 +136,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
|
|||
@Override
|
||||
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
|
||||
// remove unneeded threading by wrapping listener with SAME to prevent super.doExecute from wrapping it with LISTENER
|
||||
super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener));
|
||||
super.doExecute(task, request, new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SAME, listener, false));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -390,7 +390,15 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
AtomicBoolean executed = new AtomicBoolean();
|
||||
Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() {
|
||||
|
||||
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
|
||||
boolean executeOnPrimary = true;
|
||||
// whether shard has been marked as relocated already (i.e. relocation completed)
|
||||
if (primaryShard.relocating() && randomBoolean()) {
|
||||
isRelocated.set(true);
|
||||
executeOnPrimary = false;
|
||||
}
|
||||
action.new AsyncPrimaryAction(request, createTransportChannel(listener), task) {
|
||||
@Override
|
||||
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
|
||||
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
|
||||
|
@ -403,15 +411,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
|
||||
boolean executeOnPrimary = true;
|
||||
// whether shard has been marked as relocated already (i.e. relocation completed)
|
||||
if (primaryShard.relocating() && randomBoolean()) {
|
||||
isRelocated.set(true);
|
||||
executeOnPrimary = false;
|
||||
}
|
||||
primaryPhase.messageReceived(request, createTransportChannel(listener), task);
|
||||
}.run();
|
||||
if (executeOnPrimary) {
|
||||
assertTrue(executed.get());
|
||||
assertTrue(listener.isDone());
|
||||
|
@ -445,7 +445,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
ReplicationTask task = maybeTask();
|
||||
AtomicBoolean executed = new AtomicBoolean();
|
||||
Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() {
|
||||
action.new AsyncPrimaryAction(request, createTransportChannel(listener), task) {
|
||||
@Override
|
||||
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
|
||||
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
|
||||
|
@ -458,8 +458,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
primaryPhase.messageReceived(request, createTransportChannel(listener), task);
|
||||
}.run();
|
||||
assertThat(executed.get(), equalTo(true));
|
||||
assertPhase(task, "finished");
|
||||
}
|
||||
|
@ -579,16 +578,18 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings));
|
||||
state = ClusterState.builder(state).metaData(metaData).build();
|
||||
setState(clusterService, state);
|
||||
Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() {
|
||||
AtomicBoolean executed = new AtomicBoolean();
|
||||
action.new AsyncPrimaryAction(new Request(shardId), createTransportChannel(new PlainActionFuture<>()), null) {
|
||||
@Override
|
||||
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
|
||||
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
|
||||
boolean executeOnReplicas) {
|
||||
assertFalse(executeOnReplicas);
|
||||
assertFalse(executed.getAndSet(true));
|
||||
return new NoopReplicationOperation(request, actionListener);
|
||||
}
|
||||
};
|
||||
primaryPhase.messageReceived(new Request(shardId), createTransportChannel(new PlainActionFuture<>()), null);
|
||||
}.run();
|
||||
assertThat(executed.get(), equalTo(true));
|
||||
}
|
||||
|
||||
public void testCounterOnPrimary() throws Exception {
|
||||
|
@ -604,17 +605,16 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
final boolean throwExceptionOnCreation = i == 1;
|
||||
final boolean throwExceptionOnRun = i == 2;
|
||||
final boolean respondWithError = i == 3;
|
||||
Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() {
|
||||
|
||||
action.new AsyncPrimaryAction(request, createTransportChannel(listener), task) {
|
||||
@Override
|
||||
protected ReplicationOperation<Request, Request, Action.PrimaryResult> createReplicatedOperation(Request request,
|
||||
ActionListener<Action.PrimaryResult> listener, Action.PrimaryShardReference primaryShardReference,
|
||||
ActionListener<Action.PrimaryResult> actionListener, Action.PrimaryShardReference primaryShardReference,
|
||||
boolean executeOnReplicas) {
|
||||
assertIndexShardCounter(1);
|
||||
if (throwExceptionOnCreation) {
|
||||
throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
|
||||
}
|
||||
return new NoopReplicationOperation(request, listener) {
|
||||
return new NoopReplicationOperation(request, actionListener) {
|
||||
@Override
|
||||
public void execute() throws Exception {
|
||||
assertIndexShardCounter(1);
|
||||
|
@ -629,18 +629,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
try {
|
||||
primaryPhase.messageReceived(request, createTransportChannel(listener), task);
|
||||
} catch (ElasticsearchException e) {
|
||||
if (throwExceptionOnCreation || throwExceptionOnRun) {
|
||||
assertThat(e.getMessage(), containsString("simulated"));
|
||||
assertIndexShardCounter(0);
|
||||
return; // early terminate
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}.run();
|
||||
assertIndexShardCounter(0);
|
||||
assertTrue(listener.isDone());
|
||||
assertPhase(task, "finished");
|
||||
|
@ -648,7 +637,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
try {
|
||||
listener.get();
|
||||
} catch (ExecutionException e) {
|
||||
if (respondWithError) {
|
||||
if (throwExceptionOnCreation || throwExceptionOnRun || respondWithError) {
|
||||
Throwable cause = e.getCause();
|
||||
assertThat(cause, instanceOf(ElasticsearchException.class));
|
||||
assertThat(cause.getMessage(), containsString("simulated"));
|
||||
|
@ -787,9 +776,9 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryShardReference getPrimaryShardReference(ShardId shardId) {
|
||||
protected void acquirePrimaryShardReference(ShardId shardId, ActionListener<PrimaryShardReference> onReferenceAcquired) {
|
||||
count.incrementAndGet();
|
||||
return new PrimaryShardReference(null, null) {
|
||||
PrimaryShardReference primaryShardReference = new PrimaryShardReference(null, null) {
|
||||
@Override
|
||||
public boolean isRelocated() {
|
||||
return isRelocated.get();
|
||||
|
@ -812,13 +801,15 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
public void close() {
|
||||
count.decrementAndGet();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
onReferenceAcquired.onResponse(primaryShardReference);
|
||||
}
|
||||
|
||||
protected Releasable acquireReplicaOperationLock(ShardId shardId, long primaryTerm) {
|
||||
@Override
|
||||
protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, ActionListener<Releasable> onLockAcquired) {
|
||||
count.incrementAndGet();
|
||||
return count::decrementAndGet;
|
||||
onLockAcquired.onResponse(count::decrementAndGet);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util.concurrent;
|
||||
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class SuspendableRefContainerTests extends ESTestCase {
|
||||
|
||||
public void testBasicAcquire() throws InterruptedException {
|
||||
SuspendableRefContainer refContainer = new SuspendableRefContainer();
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
|
||||
Releasable lock1 = randomLockingMethod(refContainer);
|
||||
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||
Releasable lock2 = randomLockingMethod(refContainer);
|
||||
assertThat(refContainer.activeRefs(), equalTo(2));
|
||||
lock1.close();
|
||||
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||
lock1.close(); // check idempotence
|
||||
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||
lock2.close();
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testAcquisitionBlockingBlocksNewAcquisitions() throws InterruptedException {
|
||||
SuspendableRefContainer refContainer = new SuspendableRefContainer();
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
|
||||
try (Releasable block = refContainer.blockAcquisition()) {
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
assertThat(refContainer.tryAcquire(), nullValue());
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
}
|
||||
try (Releasable lock = refContainer.tryAcquire()) {
|
||||
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||
}
|
||||
|
||||
// same with blocking acquire
|
||||
AtomicBoolean acquired = new AtomicBoolean();
|
||||
Thread t = new Thread(() -> {
|
||||
try (Releasable lock = randomBoolean() ? refContainer.acquire() : refContainer.acquireUninterruptibly()) {
|
||||
acquired.set(true);
|
||||
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||
} catch (InterruptedException e) {
|
||||
fail("Interrupted");
|
||||
}
|
||||
});
|
||||
try (Releasable block = refContainer.blockAcquisition()) {
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
t.start();
|
||||
// check that blocking acquire really blocks
|
||||
assertThat(acquired.get(), equalTo(false));
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
}
|
||||
t.join();
|
||||
assertThat(acquired.get(), equalTo(true));
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testAcquisitionBlockingWaitsOnExistingAcquisitions() throws InterruptedException {
|
||||
SuspendableRefContainer refContainer = new SuspendableRefContainer();
|
||||
|
||||
AtomicBoolean acquired = new AtomicBoolean();
|
||||
Thread t = new Thread(() -> {
|
||||
try (Releasable block = refContainer.blockAcquisition()) {
|
||||
acquired.set(true);
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
}
|
||||
});
|
||||
try (Releasable lock = randomLockingMethod(refContainer)) {
|
||||
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||
t.start();
|
||||
assertThat(acquired.get(), equalTo(false));
|
||||
assertThat(refContainer.activeRefs(), equalTo(1));
|
||||
}
|
||||
t.join();
|
||||
assertThat(acquired.get(), equalTo(true));
|
||||
assertThat(refContainer.activeRefs(), equalTo(0));
|
||||
}
|
||||
|
||||
private Releasable randomLockingMethod(SuspendableRefContainer refContainer) throws InterruptedException {
|
||||
switch (randomInt(2)) {
|
||||
case 0: return refContainer.tryAcquire();
|
||||
case 1: return refContainer.acquire();
|
||||
case 2: return refContainer.acquireUninterruptibly();
|
||||
}
|
||||
throw new IllegalArgumentException("randomLockingMethod inconsistent");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,215 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.common.inject.internal.Nullable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class IndexShardOperationsLockTests extends ESTestCase {
|
||||
|
||||
private static ThreadPool threadPool;
|
||||
|
||||
private IndexShardOperationsLock block;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupThreadPool() {
|
||||
threadPool = new TestThreadPool("IndexShardOperationsLockTests");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownThreadPool() {
|
||||
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
|
||||
threadPool = null;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void createIndexShardOperationsLock() {
|
||||
block = new IndexShardOperationsLock(new ShardId("blubb", "id", 0), logger, threadPool);
|
||||
}
|
||||
|
||||
@After
|
||||
public void checkNoInflightOperations() {
|
||||
assertThat(block.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE));
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
|
||||
int numThreads = 10;
|
||||
|
||||
List<PlainActionFuture<Releasable>> futures = new ArrayList<>();
|
||||
List<Thread> operationThreads = new ArrayList<>();
|
||||
CountDownLatch latch = new CountDownLatch(numThreads / 2);
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
releasable.close();
|
||||
super.onResponse(releasable);
|
||||
}
|
||||
};
|
||||
Thread thread = new Thread() {
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
block.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
}
|
||||
};
|
||||
futures.add(future);
|
||||
operationThreads.add(thread);
|
||||
}
|
||||
|
||||
threadPool.generic().execute(() -> {
|
||||
try {
|
||||
latch.await();
|
||||
blockAndWait().close();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
for (Thread thread : operationThreads) {
|
||||
thread.start();
|
||||
}
|
||||
|
||||
for (PlainActionFuture<Releasable> future : futures) {
|
||||
assertNotNull(future.get(1, TimeUnit.MINUTES));
|
||||
}
|
||||
|
||||
for (Thread thread : operationThreads) {
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
|
||||
block.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
assertTrue(future.isDone());
|
||||
future.get().close();
|
||||
}
|
||||
|
||||
public void testOperationsIfClosed() throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
|
||||
block.close();
|
||||
block.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
ExecutionException exception = expectThrows(ExecutionException.class, future::get);
|
||||
assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class));
|
||||
}
|
||||
|
||||
public void testBlockIfClosed() throws ExecutionException, InterruptedException {
|
||||
block.close();
|
||||
expectThrows(IndexShardClosedException.class, () -> block.blockOperations(randomInt(10), TimeUnit.MINUTES,
|
||||
() -> { throw new IllegalArgumentException("fake error"); }));
|
||||
}
|
||||
|
||||
public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
|
||||
try (Releasable releasable = blockAndWait()) {
|
||||
block.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
assertFalse(future.isDone());
|
||||
}
|
||||
future.get(1, TimeUnit.MINUTES).close();
|
||||
}
|
||||
|
||||
protected Releasable blockAndWait() throws InterruptedException {
|
||||
CountDownLatch blockAcquired = new CountDownLatch(1);
|
||||
CountDownLatch releaseBlock = new CountDownLatch(1);
|
||||
CountDownLatch blockReleased = new CountDownLatch(1);
|
||||
boolean throwsException = randomBoolean();
|
||||
IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0));
|
||||
threadPool.generic().execute(() -> {
|
||||
try {
|
||||
block.blockOperations(1, TimeUnit.MINUTES, () -> {
|
||||
try {
|
||||
blockAcquired.countDown();
|
||||
releaseBlock.await();
|
||||
if (throwsException) {
|
||||
throw exception;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException();
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
if (e != exception) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} finally {
|
||||
blockReleased.countDown();
|
||||
}
|
||||
});
|
||||
blockAcquired.await();
|
||||
return () -> {
|
||||
releaseBlock.countDown();
|
||||
try {
|
||||
blockReleased.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void testActiveOperationsCount() throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> future1 = new PlainActionFuture<>();
|
||||
block.acquire(future1, ThreadPool.Names.GENERIC, true);
|
||||
assertTrue(future1.isDone());
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(1));
|
||||
|
||||
PlainActionFuture<Releasable> future2 = new PlainActionFuture<>();
|
||||
block.acquire(future2, ThreadPool.Names.GENERIC, true);
|
||||
assertTrue(future2.isDone());
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(2));
|
||||
|
||||
future1.get().close();
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(1));
|
||||
future1.get().close(); // check idempotence
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(1));
|
||||
future2.get().close();
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(0));
|
||||
|
||||
try (Releasable releasable = blockAndWait()) {
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(0));
|
||||
}
|
||||
|
||||
PlainActionFuture<Releasable> future3 = new PlainActionFuture<>();
|
||||
block.acquire(future3, ThreadPool.Names.GENERIC, true);
|
||||
assertTrue(future3.isDone());
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(1));
|
||||
future3.get().close();
|
||||
assertThat(block.getActiveOperationsCount(), equalTo(0));
|
||||
}
|
||||
}
|
|
@ -44,6 +44,7 @@ import org.elasticsearch.action.index.IndexRequest;
|
|||
import org.elasticsearch.action.index.TransportIndexAction;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterInfoService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.InternalClusterInfoService;
|
||||
|
@ -105,6 +106,7 @@ import org.elasticsearch.test.FieldMaskingReader;
|
|||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
|
@ -121,6 +123,7 @@ import java.util.concurrent.BrokenBarrierException;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -295,13 +298,13 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
client().admin().indices().prepareDelete("test").get();
|
||||
assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
|
||||
try {
|
||||
indexShard.acquirePrimaryOperationLock();
|
||||
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX);
|
||||
fail("we should not be able to increment anymore");
|
||||
} catch (IndexShardClosedException e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm());
|
||||
indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX);
|
||||
fail("we should not be able to increment anymore");
|
||||
} catch (IndexShardClosedException e) {
|
||||
// expected
|
||||
|
@ -339,21 +342,33 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
if (newPrimaryShardRouting.isRelocationTarget() == false) {
|
||||
try {
|
||||
indexShard.acquireReplicaOperationLock(primaryTerm);
|
||||
indexShard.acquireReplicaOperationLock(primaryTerm, null, ThreadPool.Names.INDEX);
|
||||
fail("shard shouldn't accept operations as replica");
|
||||
} catch (IllegalStateException ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
Releasable operation1 = indexShard.acquirePrimaryOperationLock();
|
||||
Releasable operation1 = acquirePrimaryOperationLockBlockingly(indexShard);
|
||||
assertEquals(1, indexShard.getActiveOperationsCount());
|
||||
Releasable operation2 = indexShard.acquirePrimaryOperationLock();
|
||||
Releasable operation2 = acquirePrimaryOperationLockBlockingly(indexShard);
|
||||
assertEquals(2, indexShard.getActiveOperationsCount());
|
||||
|
||||
Releasables.close(operation1, operation2);
|
||||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
}
|
||||
|
||||
private Releasable acquirePrimaryOperationLockBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
indexShard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX);
|
||||
return fut.get();
|
||||
}
|
||||
|
||||
private Releasable acquireReplicaOperationLockBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
indexShard.acquireReplicaOperationLock(opPrimaryTerm, fut, ThreadPool.Names.INDEX);
|
||||
return fut.get();
|
||||
}
|
||||
|
||||
public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException {
|
||||
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
|
||||
ensureGreen("test");
|
||||
|
@ -399,20 +414,20 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
if (newShardRouting.primary() == false) {
|
||||
try {
|
||||
indexShard.acquirePrimaryOperationLock();
|
||||
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX);
|
||||
fail("shard shouldn't accept primary ops");
|
||||
} catch (IllegalStateException ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Releasable operation1 = indexShard.acquireReplicaOperationLock(primaryTerm);
|
||||
Releasable operation1 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm);
|
||||
assertEquals(1, indexShard.getActiveOperationsCount());
|
||||
Releasable operation2 = indexShard.acquireReplicaOperationLock(primaryTerm);
|
||||
Releasable operation2 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm);
|
||||
assertEquals(2, indexShard.getActiveOperationsCount());
|
||||
|
||||
try {
|
||||
indexShard.acquireReplicaOperationLock(primaryTerm - 1);
|
||||
indexShard.acquireReplicaOperationLock(primaryTerm - 1, null, ThreadPool.Names.INDEX);
|
||||
fail("you can not increment the operation counter with an older primary term");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertThat(e.getMessage(), containsString("operation term"));
|
||||
|
@ -420,7 +435,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
|
||||
// but you can increment with a newer one..
|
||||
indexShard.acquireReplicaOperationLock(primaryTerm + 1 + randomInt(20)).close();
|
||||
acquireReplicaOperationLockBlockingly(indexShard, primaryTerm + 1 + randomInt(20)).close();
|
||||
Releasables.close(operation1, operation2);
|
||||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
}
|
||||
|
@ -882,13 +897,18 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||
final IndexShard shard = test.getShardOrNull(0);
|
||||
assertBusy(() -> assertThat(shard.state(), equalTo(IndexShardState.STARTED)));
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
Thread recoveryThread = new Thread(() -> {
|
||||
latch.countDown();
|
||||
shard.relocated("simulated recovery");
|
||||
try {
|
||||
shard.relocated("simulated recovery");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
try (Releasable ignored = shard.acquirePrimaryOperationLock()) {
|
||||
try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) {
|
||||
// start finalization of recovery
|
||||
recoveryThread.start();
|
||||
latch.await();
|
||||
|
@ -898,12 +918,50 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
// recovery can be now finalized
|
||||
recoveryThread.join();
|
||||
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
|
||||
try (Releasable ignored = shard.acquirePrimaryOperationLock()) {
|
||||
try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) {
|
||||
// lock can again be acquired
|
||||
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
|
||||
assertAcked(client().admin().indices().prepareCreate("test").setSettings(
|
||||
Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
|
||||
).get());
|
||||
ensureGreen();
|
||||
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
||||
IndexService test = indicesService.indexService(resolveIndex("test"));
|
||||
final IndexShard shard = test.getShardOrNull(0);
|
||||
assertBusy(() -> assertThat(shard.state(), equalTo(IndexShardState.STARTED)));
|
||||
Thread recoveryThread = new Thread(() -> {
|
||||
try {
|
||||
shard.relocated("simulated recovery");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
|
||||
recoveryThread.start();
|
||||
List<PlainActionFuture<Releasable>> onLockAcquiredActions = new ArrayList<>();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
PlainActionFuture<Releasable> onLockAcquired = new PlainActionFuture<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
releasable.close();
|
||||
super.onResponse(releasable);
|
||||
}
|
||||
};
|
||||
shard.acquirePrimaryOperationLock(onLockAcquired, ThreadPool.Names.INDEX);
|
||||
onLockAcquiredActions.add(onLockAcquired);
|
||||
}
|
||||
|
||||
for (PlainActionFuture<Releasable> onLockAcquired : onLockAcquiredActions) {
|
||||
assertNotNull(onLockAcquired.get(30, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
recoveryThread.join();
|
||||
}
|
||||
|
||||
public void testStressRelocated() throws Exception {
|
||||
assertAcked(client().admin().indices().prepareCreate("test").setSettings(
|
||||
Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
|
||||
|
@ -920,10 +978,10 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
indexThreads[i] = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try (Releasable operationLock = shard.acquirePrimaryOperationLock()) {
|
||||
try (Releasable operationLock = acquirePrimaryOperationLockBlockingly(shard)) {
|
||||
allPrimaryOperationLocksAcquired.countDown();
|
||||
barrier.await();
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
} catch (InterruptedException | BrokenBarrierException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
@ -932,7 +990,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
}
|
||||
AtomicBoolean relocated = new AtomicBoolean();
|
||||
final Thread recoveryThread = new Thread(() -> {
|
||||
shard.relocated("simulated recovery");
|
||||
try {
|
||||
shard.relocated("simulated recovery");
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
relocated.set(true);
|
||||
});
|
||||
// ensure we wait for all primary operation locks to be acquired
|
||||
|
@ -1410,8 +1472,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
IndexShard newShard = new IndexShard(initializingShardRouting, indexService.getIndexSettings(), shard.shardPath(),
|
||||
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
|
||||
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
|
||||
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)
|
||||
);
|
||||
indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners));
|
||||
return newShard;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.elasticsearch.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -31,9 +32,11 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -103,7 +106,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
|
|||
assertTrue(response.success());
|
||||
}
|
||||
|
||||
public void testSyncFailsIfOperationIsInFlight() throws InterruptedException {
|
||||
public void testSyncFailsIfOperationIsInFlight() throws InterruptedException, ExecutionException {
|
||||
createIndex("test");
|
||||
client().prepareIndex("test", "test", "1").setSource("{}").get();
|
||||
IndexService test = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test"));
|
||||
|
@ -111,7 +114,9 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
|
|||
|
||||
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
|
||||
final ShardId shardId = shard.shardId();
|
||||
try (Releasable operationLock = shard.acquirePrimaryOperationLock()) {
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
shard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX);
|
||||
try (Releasable operationLock = fut.get()) {
|
||||
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
|
||||
flushService.attemptSyncedFlush(shardId, listener);
|
||||
listener.latch.await();
|
||||
|
|
|
@ -432,22 +432,23 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/18553")
|
||||
public void testIndexAndRelocateConcurrently() throws ExecutionException, InterruptedException {
|
||||
int halfNodes = randomIntBetween(1, 3);
|
||||
Settings blueSetting = Settings.builder().put("node.attr.color", "blue").build();
|
||||
InternalTestCluster.Async<List<String>> blueFuture = internalCluster().startNodesAsync(blueSetting, blueSetting);
|
||||
InternalTestCluster.Async<List<String>> blueFuture = internalCluster().startNodesAsync(halfNodes, blueSetting);
|
||||
Settings redSetting = Settings.builder().put("node.attr.color", "red").build();
|
||||
InternalTestCluster.Async<java.util.List<String>> redFuture = internalCluster().startNodesAsync(redSetting, redSetting);
|
||||
InternalTestCluster.Async<java.util.List<String>> redFuture = internalCluster().startNodesAsync(halfNodes, redSetting);
|
||||
blueFuture.get();
|
||||
redFuture.get();
|
||||
logger.info("blue nodes: {}", blueFuture.get());
|
||||
logger.info("red nodes: {}", redFuture.get());
|
||||
ensureStableCluster(4);
|
||||
ensureStableCluster(halfNodes * 2);
|
||||
|
||||
assertAcked(prepareCreate("test").setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.exclude.color", "blue")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.put(indexSettings())));
|
||||
.put(indexSettings())
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) // NORELEASE: set to randomInt(halfNodes - 1) once replica data loss is fixed
|
||||
));
|
||||
ensureYellow();
|
||||
assertAllShardsOnNodes("test", redFuture.get().toArray(new String[2]));
|
||||
int numDocs = randomIntBetween(100, 150);
|
||||
|
@ -479,9 +480,11 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
numDocs *= 2;
|
||||
|
||||
logger.info(" --> waiting for relocation to complete");
|
||||
ensureGreen("test");// move all shards to the new node (it waits on relocation)
|
||||
ensureGreen("test"); // move all shards to the new nodes (it waits on relocation)
|
||||
|
||||
final int numIters = randomIntBetween(10, 20);
|
||||
for (int i = 0; i < numIters; i++) {
|
||||
logger.info(" --> checking iteration {}", i);
|
||||
SearchResponse afterRelocation = client().prepareSearch().setSize(ids.size()).get();
|
||||
assertNoFailures(afterRelocation);
|
||||
assertSearchHits(afterRelocation, ids.toArray(new String[ids.size()]));
|
||||
|
|
Loading…
Reference in New Issue