Fail replica shards locally upon failures
When a replication operation (index/delete/update) fails on a replica, we fail that replica and allow master to allocate a new copy of it. At the moment, the node hosting the primary shard is responsible for notifying master of the failed replica. However, if the replica shard is still initializing (`POST_RECOVERY` state), we have a race condition between the failed-shard message and the message that moves the shard into the `STARTED` state. If the latter happens first, master will fail to resolve the failed-shard message.

This commit builds on #5800 and fails the engine of the replica shard if a replication operation fails on it. This protects us against the race above, as the failed shard will reject the `STARTED` command from master. It also makes us more resilient to other race conditions in this area.

Closes #5847
parent b6515e2979 · commit 12bbe28649
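In essence, the fix moves part of the failure handling onto the replica itself: rather than only notifying master, the node fails its local engine, which closes the shard so that any late cluster-state command (such as the `STARTED` command from master) is rejected. Below is a minimal sketch of that pattern; the types (Engine, Shard) are simplified stand-ins for illustration, not the actual Elasticsearch classes.

    // Sketch of the replica-side handling this commit introduces
    // (simplified, hypothetical types; not the real Elasticsearch API).
    public class ReplicaFailureSketch {

        interface Engine {
            // Failing the engine also closes it, so the shard afterwards
            // rejects commands such as a late STARTED command from master.
            void failEngine(String reason, Throwable cause);
        }

        interface Shard {
            Engine engine();
            void executeReplicaOperation(Object request) throws Exception;
        }

        void handleReplicaOperation(Shard shard, Object request) throws Exception {
            try {
                shard.executeReplicaOperation(request);
            } catch (Throwable t) {
                // Fail the shard locally before reporting the failure: the local
                // copy is now deterministically invalid, so the race between the
                // failed-shard message and the shard-started message can no
                // longer leave a stale copy marked as started.
                shard.engine().failEngine("replica operation failed", t);
                throw t; // Java 7 precise rethrow keeps `throws Exception` valid
            }
        }
    }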
TransportIndexReplicationOperationAction.java

@@ -22,7 +22,6 @@ package org.elasticsearch.action.support.replication;
 import com.google.common.collect.Lists;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;

@@ -46,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 /**
  *
  */
-public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
+public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
         extends TransportAction<Request, Response> {
 
     protected final ClusterService clusterService;
TransportIndicesReplicationOperationAction.java

@@ -21,7 +21,6 @@ package org.elasticsearch.action.support.replication;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.cluster.ClusterService;

@@ -42,7 +41,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
 /**
  */
 public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
-        ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
+        ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
         extends TransportAction<Request, Response> {
 
     protected final ClusterService clusterService;
TransportShardReplicationOperationAction.java

@@ -22,7 +22,10 @@ package org.elasticsearch.action.support.replication;
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.*;
+import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.UnavailableShardsException;
+import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.support.TransportAction;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterChangedEvent;

@@ -44,6 +47,8 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.service.IndexService;
+import org.elasticsearch.index.shard.service.IndexShard;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.node.NodeClosedException;
 import org.elasticsearch.rest.RestStatus;

@@ -54,11 +59,9 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.elasticsearch.ExceptionsHelper.detailedMessage;
-
 /**
  */
-public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
+public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ShardReplicationOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
 
     protected final TransportService transportService;
     protected final ClusterService clusterService;

@@ -242,7 +245,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 
         @Override
         public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
-            shardOperationOnReplica(request);
+            try {
+                shardOperationOnReplica(request);
+            } catch (Throwable t) {
+                failReplicaIfNeeded(request.request.index(), request.shardId, t);
+                throw t;
+            }
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }
     }

@@ -700,7 +708,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
 
            final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
            if (!nodeId.equals(clusterState.nodes().localNodeId())) {
-                DiscoveryNode node = clusterState.nodes().get(nodeId);
+                final DiscoveryNode node = clusterState.nodes().get(nodeId);
                transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                    @Override
                    public void handleResponse(TransportResponse.Empty vResponse) {

@@ -710,9 +718,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                    @Override
                    public void handleException(TransportException exp) {
                        if (!ignoreReplicaException(exp.unwrapCause())) {
-                            logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), exp);
+                            logger.warn("Failed to perform " + transportAction + " on remote replica " + node + shardIt.shardId(), exp);
                            shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
-                                    "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(exp) + "]");
+                                    "Failed to perform [" + transportAction + "] on replica, message [" + ExceptionsHelper.detailedMessage(exp) + "]");
                        }
                        finishIfPossible();
                    }

@@ -733,11 +741,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                        try {
                            shardOperationOnReplica(shardRequest);
                        } catch (Throwable e) {
-                            if (!ignoreReplicaException(e)) {
-                                logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
-                                shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
-                                        "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
-                            }
+                            failReplicaIfNeeded(shard.index(), shard.id(), e);
                        }
                        if (counter.decrementAndGet() == 0) {
                            listener.onResponse(response.response());

@@ -751,11 +755,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                            }
                        });
                    } catch (Throwable e) {
-                        if (!ignoreReplicaException(e)) {
-                            logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
-                            shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
-                                    "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
-                        }
+                        failReplicaIfNeeded(shard.index(), shard.id(), e);
                        // we want to decrement the counter here, in teh failure handling, cause we got rejected
                        // from executing on the thread pool
                        if (counter.decrementAndGet() == 0) {

@@ -766,11 +766,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                try {
                    shardOperationOnReplica(shardRequest);
                } catch (Throwable e) {
-                    if (!ignoreReplicaException(e)) {
-                        logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
-                        shardStateAction.shardFailed(shard, indexMetaData.getUUID(),
-                                "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
-                    }
+                    failReplicaIfNeeded(shard.index(), shard.id(), e);
                }
                if (counter.decrementAndGet() == 0) {
                    listener.onResponse(response.response());

@@ -778,6 +774,23 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                }
            }
        }
 
    }
 
+    private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
+        if (!ignoreReplicaException(t)) {
+            IndexService indexService = indicesService.indexService(index);
+            if (indexService == null) {
+                logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
+                return;
+            }
+            IndexShard indexShard = indexService.shard(shardId);
+            if (indexShard == null) {
+                logger.debug("ignoring failed replica [{}][{}] because index was already removed.", index, shardId);
+                return;
+            }
+            indexShard.failShard(transportAction + " failed on replica", t);
+        }
+    }
+
    public static class PrimaryResponse<Response, ReplicaRequest> {
Engine.java

@@ -134,8 +134,11 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
 
    void recover(RecoveryHandler recoveryHandler) throws EngineException;
 
+    /** fail engine due to some error. the engine will also be closed. */
+    void failEngine(String reason, @Nullable Throwable failure);
+
    static interface FailedEngineListener {
-        void onFailedEngine(ShardId shardId, Throwable t);
+        void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
    }
 
    /**
InternalEngine.java

@@ -209,7 +209,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
    @Override
    public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
        ByteSizeValue preValue = this.indexingBufferSize;
        try (InternalLock _ = readLock.acquire()) {
            this.indexingBufferSize = indexingBufferSize;
            IndexWriter indexWriter = this.indexWriter;
            if (indexWriter != null) {

@@ -309,7 +309,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
    }
 
    public GetResult get(Get get) throws EngineException {
        try (InternalLock _ = readLock.acquire()) {
            if (get.realtime()) {
                VersionValue versionValue = versionMap.get(versionKey(get.uid()));
                if (versionValue != null) {

@@ -368,7 +368,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    @Override
    public void create(Create create) throws EngineException {
        try (InternalLock _ = readLock.acquire()) {
            IndexWriter writer = this.indexWriter;
            if (writer == null) {
                throw new EngineClosedException(shardId, failedEngine);

@@ -378,14 +378,14 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            possibleMergeNeeded = true;
            flushNeeded = true;
        } catch (OutOfMemoryError | IllegalStateException | IOException t) {
            maybeFailEngine(t);
            throw new CreateFailedEngineException(shardId, create, t);
        }
    }
 
    private void maybeFailEngine(Throwable t) {
        if (t instanceof OutOfMemoryError || (t instanceof IllegalStateException && t.getMessage().contains("OutOfMemoryError"))) {
-            failEngine(t);
+            failEngine("out of memory", t);
        }
    }

@@ -451,7 +451,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    @Override
    public void index(Index index) throws EngineException {
        try (InternalLock _ = readLock.acquire()) {
            IndexWriter writer = this.indexWriter;
            if (writer == null) {
                throw new EngineClosedException(shardId, failedEngine);

@@ -523,7 +523,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    @Override
    public void delete(Delete delete) throws EngineException {
        try (InternalLock _ = readLock.acquire()) {
            IndexWriter writer = this.indexWriter;
            if (writer == null) {
                throw new EngineClosedException(shardId, failedEngine);

@@ -587,7 +587,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    @Override
    public void delete(DeleteByQuery delete) throws EngineException {
        try (InternalLock _ = readLock.acquire()) {
            IndexWriter writer = this.indexWriter;
            if (writer == null) {
                throw new EngineClosedException(shardId);

@@ -645,8 +645,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            return dirty || !searcherManager.isSearcherCurrent();
        } catch (IOException e) {
            logger.error("failed to access searcher manager", e);
-            failEngine(e);
-            throw new EngineException(shardId, "failed to access searcher manager",e);
+            failEngine("failed to access searcher manager", e);
+            throw new EngineException(shardId, "failed to access searcher manager", e);
        }
    }

@@ -668,7 +668,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
        }
        // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
        // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
        try (InternalLock _ = readLock.acquire()) {
            ensureOpen();
            // maybeRefresh will only allow one refresh to execute, and the rest will "pass through",
            // but, we want to make sure not to loose ant refresh calls, if one is taking time

@@ -687,7 +687,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
        } catch (EngineClosedException e) {
            throw e;
        } catch (Throwable t) {
-            failEngine(t);
+            failEngine("refresh failed", t);
            throw new RefreshFailedEngineException(shardId, t);
        }
    }

@@ -746,7 +746,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                }
            }
        } else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
            try (InternalLock _ = readLock.acquire()) {
                final IndexWriter indexWriter = currentIndexWriter();
                if (onGoingRecoveries.get() > 0) {
                    throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");

@@ -774,7 +774,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
            // translog on an index that was opened on a committed point in time that is "in the future"
            // of that translog
            try (InternalLock _ = readLock.acquire()) {
                final IndexWriter indexWriter = currentIndexWriter();
                // we allow to *just* commit if there is an ongoing recovery happening...
                // its ok to use this, only a flush will cause a new translogId, and we are locked here from

@@ -792,7 +792,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
        }
 
        // reread the last committed segment infos
        try (InternalLock _ = readLock.acquire()) {
            ensureOpen();
            readLastCommittedSegmentsInfo();
        } catch (Throwable e) {

@@ -801,14 +801,15 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            }
        }
 
-        } catch (FlushFailedEngineException ex){
+        } catch (FlushFailedEngineException ex) {
            maybeFailEngine(ex.getCause());
            throw ex;
        } finally {
            flushLock.unlock();
            flushing.decrementAndGet();
        }
    }
 
    private void ensureOpen() {
        if (indexWriter == null) {
            throw new EngineClosedException(shardId, failedEngine);

@@ -817,6 +818,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    /**
     * Returns the current index writer. This method will never return <code>null</code>
     *
+     * @throws EngineClosedException if the engine is already closed
     */
    private IndexWriter currentIndexWriter() {

@@ -857,7 +859,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            return;
        }
        possibleMergeNeeded = false;
        try (InternalLock _ = readLock.acquire()) {
            Merges.maybeMerge(currentIndexWriter());
        } catch (Throwable t) {
            maybeFailEngine(t);

@@ -872,7 +874,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
        }
        if (optimizeMutex.compareAndSet(false, true)) {
            ElasticsearchMergePolicy elasticsearchMergePolicy = null;
            try (InternalLock _ = readLock.acquire()) {
                final IndexWriter writer = currentIndexWriter();
 
                if (writer.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy) {

@@ -923,7 +925,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    @Override
    public SnapshotIndexCommit snapshotIndex() throws EngineException {
        try (InternalLock _ = readLock.acquire()) {
            flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
            ensureOpen();
            return deletionPolicy.snapshot();

@@ -1002,7 +1004,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    @Override
    public SegmentsStats segmentsStats() {
        try (InternalLock _ = readLock.acquire()) {
            ensureOpen();
            Searcher searcher = acquireSearcher("segments_stats");
            try {

@@ -1019,7 +1021,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
 
    @Override
    public List<Segment> segments() {
        try (InternalLock _ = readLock.acquire()) {
            ensureOpen();
            Map<String, Segment> segments = new HashMap<>();

@@ -1133,21 +1135,28 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
    class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
        @Override
        public void onFailedMerge(MergePolicy.MergeException e) {
-            failEngine(e);
+            failEngine("merge exception", e);
        }
    }
 
-    private void failEngine(Throwable failure) {
+    @Override
+    public void failEngine(String reason, @Nullable Throwable failure) {
        if (failEngineLock.tryLock()) {
+            assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
+            if (failedEngine != null) {
+                logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
+                return;
+            }
            try {
-                logger.warn("failed engine", failure);
-                failedEngine = failure;
+                logger.warn("failed engine [{}]", reason, failure);
+                // we must set a failure exception, generate one if not supplied
+                if (failure == null) {
+                    failedEngine = new EngineException(shardId(), reason);
+                } else {
+                    failedEngine = failure;
+                }
                for (FailedEngineListener listener : failedEngineListeners) {
-                    listener.onFailedEngine(shardId, failure);
+                    listener.onFailedEngine(shardId, reason, failure);
                }
            } finally {
                // close the engine whatever happens...

@@ -1155,7 +1164,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            }
 
        } else {
-            logger.debug("Tried to fail engine but could not acquire lock - engine should be failed by now", failure);
+            logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
        }
    }

@@ -1229,7 +1238,9 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                    assert isMergedSegment(reader);
                    final Engine.Searcher searcher = new SimpleSearcher("warmer", new IndexSearcher(reader));
                    final IndicesWarmer.WarmerContext context = new IndicesWarmer.WarmerContext(shardId, searcher);
-                    if (warmer != null) warmer.warm(context);
+                    if (warmer != null) {
+                        warmer.warm(context);
+                    }
                } catch (Throwable t) {
                    // Don't fail a merge if the warm-up failed
                    if (!closed) {

@@ -1281,7 +1292,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                !codecName.equals(InternalEngine.this.codecName) ||
                failOnMergeFailure != InternalEngine.this.failOnMergeFailure ||
                codecBloomLoad != codecService.isLoadBloomFilter()) {
            try (InternalLock _ = readLock.acquire()) {
                if (indexConcurrency != InternalEngine.this.indexConcurrency) {
                    logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngine.this.indexConcurrency, indexConcurrency);
                    InternalEngine.this.indexConcurrency = indexConcurrency;

@@ -1488,11 +1499,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            endRecovery();
        }
    }
 
    private static final class InternalLock implements Releasable {
        private final ThreadLocal<Boolean> lockIsHeld;
        private final Lock lock;
 
        InternalLock(Lock lock) {
            ThreadLocal<Boolean> tl = null;
            assert (tl = new ThreadLocal<>()) != null;

@@ -1511,8 +1522,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
            assert onAssertLock();
            return this;
        }
 
        protected boolean onAssertRelease() {
            lockIsHeld.set(Boolean.FALSE);
            return true;
IndexShard.java

@@ -156,6 +156,8 @@ public interface IndexShard extends IndexShardComponent {
 
    void recover(Engine.RecoveryHandler recoveryHandler) throws EngineException;
 
+    void failShard(String reason, @Nullable Throwable e);
+
    Engine.Searcher acquireSearcher(String source);
 
    Engine.Searcher acquireSearcher(String source, Mode mode);
InternalIndexShard.java

@@ -630,6 +630,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
        engine.recover(recoveryHandler);
    }
 
+    @Override
+    public void failShard(String reason, @Nullable Throwable e) {
+        // fail the engine. This will cause this shard to also be removed from the node's index service.
+        engine.failEngine(reason, e);
+    }
+
    @Override
    public Engine.Searcher acquireSearcher(String source) {
        return acquireSearcher(source, Mode.READ);
IndicesClusterStateService.java

@@ -37,6 +37,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.compress.CompressedString;

@@ -799,7 +800,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
 
    private class FailedEngineHandler implements Engine.FailedEngineListener {
        @Override
-        public void onFailedEngine(final ShardId shardId, final Throwable failure) {
+        public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) {
            ShardRouting shardRouting = null;
            final IndexService indexService = indicesService.indexService(shardId.index().name());
            if (indexService != null) {

@@ -809,29 +810,33 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                }
            }
            if (shardRouting == null) {
-                logger.warn("[{}][{}] engine failed, but can't find index shard", shardId.index().name(), shardId.id());
+                logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]",
+                        shardId.index().name(), shardId.id(), reason);
                return;
            }
            final ShardRouting fShardRouting = shardRouting;
            final String indexUUID = indexService.indexUUID(); // we know indexService is not null here.
+            final String failureMessage = "engine failure, message [" + reason + "]" +
+                    (failure == null ? "" : "[" + detailedMessage(failure) + "]");
            threadPool.generic().execute(new Runnable() {
                @Override
                public void run() {
                    synchronized (mutex) {
                        if (indexService.hasShard(shardId.id())) {
                            try {
-                                indexService.removeShard(shardId.id(), "engine failure [" + ExceptionsHelper.detailedMessage(failure) + "]");
+                                indexService.removeShard(shardId.id(), failureMessage);
                            } catch (IndexShardMissingException e) {
                                // the node got closed on us, ignore it
                            } catch (Throwable e1) {
-                                logger.warn("[{}][{}] failed to delete shard after failed engine", e1, indexService.index().name(), shardId.id());
+                                logger.warn("[{}][{}] failed to delete shard after failed engine ([{}])", e1, indexService.index().name(), shardId.id(), reason);
                            }
                        }
                        try {
                            failedShards.put(fShardRouting.shardId(), new FailedShard(fShardRouting.version()));
-                            shardStateAction.shardFailed(fShardRouting, indexUUID, "engine failure, message [" + detailedMessage(failure) + "]");
+                            shardStateAction.shardFailed(fShardRouting, indexUUID, failureMessage);
                        } catch (Throwable e1) {
-                            logger.warn("[{}][{}] failed to mark shard as failed after a failed engine", e1, indexService.index().name(), shardId.id());
+                            logger.warn("[{}][{}] failed to mark shard as failed after a failed engine ([{}])", e1, indexService.index().name(), shardId.id(), reason);
                        }
                    }
                }