Engine: Let AlreadyClosedException and EngineClosedException bubble up

Whe we call optimize we ignore Exceptions that indicate a closed shard.
However, when a shard is closed while an optimize request is in flight it
might also trigger an AlreadyClosedException from the IndexWriter when we
get the config or ForceMergeFailedEngineException with the EngineClosedException
wrapped inside. Because these are not identified as exceptions that indicate
a closed shard (TransportActions.isShardNotAvailableException(..)) optimize
would sometimes report failures when shards were relocating while optimize was called
and sometimes not. This caused weird test failures, see #13266 .
Instead, we should let EngineClosedException bubble up and also recognize
AlreadyClosedException as an indicator for a closed shard.
This commit is contained in:
Britta Weber 2015-09-08 14:23:34 +02:00
parent 11c87106ce
commit 2288d44eeb
10 changed files with 55 additions and 54 deletions

View File

@ -583,7 +583,6 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.index.query.QueryParsingException.class,
org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class,
org.elasticsearch.index.engine.DeleteByQueryFailedEngineException.class,
org.elasticsearch.index.engine.ForceMergeFailedEngineException.class,
org.elasticsearch.discovery.MasterNotDiscoveredException.class,
org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException.class,
org.elasticsearch.node.NodeClosedException.class,

View File

@ -74,7 +74,7 @@ public class TransportOptimizeAction extends TransportBroadcastByNodeAction<Opti
}
@Override
protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) {
protected EmptyResult shardOperation(OptimizeRequest request, ShardRouting shardRouting) throws IOException {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
indexShard.optimize(request);
return EmptyResult.INSTANCE;

View File

@ -119,7 +119,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
}
@Override
protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) {
protected ShardUpgradeResult shardOperation(UpgradeRequest request, ShardRouting shardRouting) throws IOException {
IndexShard indexShard = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).shardSafe(shardRouting.shardId().id());
org.apache.lucene.util.Version oldestLuceneSegment = indexShard.upgrade(request);
// We are using the current version of Elasticsearch as upgrade version since we update mapping to match the current version

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.support;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
@ -36,7 +37,8 @@ public class TransportActions {
actual instanceof IndexNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
actual instanceof NoShardAvailableActionException ||
actual instanceof UnavailableShardsException) {
actual instanceof UnavailableShardsException ||
actual instanceof AlreadyClosedException) {
return true;
}
return false;

View File

@ -181,7 +181,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
* @param shardRouting the shard on which to execute the operation
* @return the result of the shard-level operation for the shard
*/
protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting);
protected abstract ShardOperationResult shardOperation(Request request, ShardRouting shardRouting) throws IOException;
/**
* Determines the shards on which this operation will be executed on. The operation is executed once per shard.

View File

@ -502,14 +502,14 @@ public abstract class Engine implements Closeable {
/**
* Optimizes to 1 segment
*/
public void forceMerge(boolean flush) {
public void forceMerge(boolean flush) throws IOException {
forceMerge(flush, 1, false, false, false);
}
/**
* Triggers a forced merge on this engine
*/
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException;
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
/**
* Snapshots the index and returns a handle to it. If needed will try and "commit" the

View File

@ -1,39 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
/**
*
*/
public class ForceMergeFailedEngineException extends EngineException {
public ForceMergeFailedEngineException(ShardId shardId, Throwable t) {
super(shardId, "force merge failed", t);
}
public ForceMergeFailedEngineException(StreamInput in) throws IOException{
super(in);
}
}

View File

@ -45,6 +45,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
@ -823,7 +824,7 @@ public class InternalEngine extends Engine {
@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException {
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, EngineClosedException, IOException {
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
* that's fine since the IW.rollback should stop all the threads and trigger an IOException
@ -865,9 +866,8 @@ public class InternalEngine extends Engine {
store.decRef();
}
} catch (Throwable t) {
ForceMergeFailedEngineException ex = new ForceMergeFailedEngineException(shardId, t);
maybeFailEngine("force merge", ex);
throw ex;
maybeFailEngine("force merge", t);
throw t;
} finally {
try {
mp.setUpgradeInProgress(false, false); // reset it just to make sure we reset it in a case of an error

View File

@ -697,7 +697,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public void optimize(OptimizeRequest optimize) {
public void optimize(OptimizeRequest optimize) throws IOException {
verifyStarted();
if (logger.isTraceEnabled()) {
logger.trace("optimize with {}", optimize);
@ -708,7 +708,7 @@ public class IndexShard extends AbstractIndexShardComponent {
/**
* Upgrades the shard to the current version of Lucene and returns the minimum segment version
*/
public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) {
public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOException {
verifyStarted();
if (logger.isTraceEnabled()) {
logger.trace("upgrade with {}", upgrade);

View File

@ -41,6 +41,7 @@ import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityIT;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Base64;
@ -94,6 +95,7 @@ import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
@ -1000,8 +1002,7 @@ public class InternalEngineTests extends ESTestCase {
indexed.countDown();
try {
engine.forceMerge(randomBoolean(), 1, false, randomBoolean(), randomBoolean());
} catch (ForceMergeFailedEngineException ex) {
// ok
} catch (IOException e) {
return;
}
}
@ -2019,4 +2020,42 @@ public class InternalEngineTests extends ESTestCase {
assertThat(topDocs.totalHits, equalTo(numDocs));
}
}
public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
AtomicReference<Throwable> throwable = new AtomicReference<>();
String operation = randomFrom("optimize", "refresh", "flush");
Thread mergeThread = new Thread() {
@Override
public void run() {
boolean stop = false;
logger.info("try with {}", operation);
while (stop == false) {
try {
switch (operation) {
case "optimize": {
engine.forceMerge(true, 1, false, false, false);
break;
}
case "refresh": {
engine.refresh("test refresh");
break;
}
case "flush": {
engine.flush(true, false);
break;
}
}
} catch (Throwable t) {
throwable.set(t);
stop = true;
}
}
}
};
mergeThread.start();
engine.close();
mergeThread.join();
logger.info("exception caught: ", throwable.get());
assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get()));
}
}