Fix bug in TransportShardReplicationOperationAction retry mechanism

This issue was causing some index requests against shards in POST_RECOVERY state to hang.
Igor Motov 2013-10-30 14:12:50 -04:00
parent 5d46e69154
commit 3b4b05e2c9
2 changed files with 81 additions and 10 deletions
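
The hang fixed here is, in part, a registration race: a request samples a cluster state, fails because the shard is not ready (e.g. still in POST_RECOVERY), and registers a cluster-state listener so the next change re-drives it; but a change applied while the listener is being registered was never observed, so nothing ever woke the request up. The postAdded() hunk below closes that window by re-checking the state version after registration. A minimal, self-contained sketch of the pattern; SimpleClusterService and Listener are invented stand-ins for illustration, not the real ClusterService API:

// Illustrative sketch only -- SimpleClusterService/Listener are not the real ES API.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

public class RetryRaceSketch {

    interface Listener {
        void postAdded();       // invoked once registration completes
        void clusterChanged();  // invoked on every subsequent state change
    }

    static class SimpleClusterService {
        final AtomicLong version = new AtomicLong();
        final List<Listener> listeners = new CopyOnWriteArrayList<>();

        long version() { return version.get(); }

        void add(Listener listener) {
            listeners.add(listener);
            listener.postAdded();
        }

        void publishNewState() { // e.g. a shard leaving POST_RECOVERY
            version.incrementAndGet();
            for (Listener listener : listeners) {
                listener.clusterChanged();
            }
        }
    }

    public static void main(String[] args) {
        SimpleClusterService clusterService = new SimpleClusterService();

        // 1. The operation runs against an observed state and fails retryably.
        final long observedVersion = clusterService.version();

        // 2. The state changes *while* the retry listener is being registered;
        //    without the re-check in postAdded() this change is simply missed.
        clusterService.publishNewState();

        clusterService.add(new Listener() {
            @Override
            public void postAdded() {
                // the fix: retry immediately if the version moved underneath us
                if (observedVersion != clusterService.version()) {
                    System.out.println("state changed during registration -> retry now");
                }
            }

            @Override
            public void clusterChanged() {
                System.out.println("state changed after registration -> retry");
            }
        });
    }
}

Running it prints "state changed during registration -> retry now": the version re-check catches the change that slipped in between the failed attempt and the listener registration.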

TransportShardReplicationOperationAction.java

@@ -422,14 +422,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                     @Override
                     public void run() {
                         try {
-                            performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState);
+                            performOnPrimary(shard.id(), shard, clusterState);
                         } catch (Throwable t) {
                             listener.onFailure(t);
                         }
                     }
                 });
             } else {
-                performOnPrimary(shard.id(), fromClusterEvent, shard, clusterState);
+                performOnPrimary(shard.id(), shard, clusterState);
             }
         } catch (Throwable t) {
             listener.onFailure(t);
@@ -490,9 +490,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                 @Override
                 public void postAdded() {
                     logger.trace("listener to cluster state added, trying to index again");
-                    if (start(true)) {
-                        // if we managed to start and perform the operation on the primary, we can remove this listener
-                        clusterService.remove(this);
+                    // check if state version changed while we were adding this listener
+                    if (clusterState.version() != clusterService.state().version()) {
+                        logger.trace("state change while we were trying to add listener, trying to index again");
+                        if (start(true)) {
+                            // if we managed to start and perform the operation on the primary, we can remove this listener
+                            clusterService.remove(this);
+                        }
                     }
                 }
@@ -535,7 +539,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
             }
         }
 
-        void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final ShardRouting shard, ClusterState clusterState) {
+        void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) {
             try {
                 PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, request));
                 performReplicas(response);
@@ -544,7 +548,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
                 if (retryPrimaryException(e)) {
                     primaryOperationStarted.set(false);
                     logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
-                    retry(fromDiscoveryListener, null);
+                    retry(false, null);
                     return;
                 }
                 if (e instanceof ElasticSearchException && ((ElasticSearchException) e).status() == RestStatus.CONFLICT) {
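
Read together, the two halves of the fix are: postAdded() now only retries when the state really changed during registration, and performOnPrimary() no longer threads the fromDiscoveryListener flag into retry(), so a retryable primary failure now always calls retry(false, null), i.e. always re-registers a listener rather than assuming one is still installed. A runnable toy model of that second change, built on the assumption (drawn from the hunks above) that the old retry(true, ...) skipped re-registration; every name here is illustrative, not the real Elasticsearch API:

// Toy model only -- none of these names are the real Elasticsearch API.
import java.util.ArrayDeque;
import java.util.Deque;

public class RetryFlagSketch {
    static final Deque<Runnable> listeners = new ArrayDeque<>();
    static boolean shardReady = false;            // shard still in POST_RECOVERY
    static final boolean OLD_BEHAVIOR = false;    // flip to true to reproduce the hang

    // One attempt at the primary operation; fromClusterEvent mirrors the
    // removed fromDiscoveryListener parameter.
    static void performOnPrimary(boolean fromClusterEvent) {
        if (shardReady) {
            System.out.println("operation succeeded");
            return;
        }
        boolean retryFlag = OLD_BEHAVIOR ? fromClusterEvent : false;
        if (retryFlag) {
            return;  // old code: assumes a listener is still installed -> hang
        }
        listeners.add(() -> performOnPrimary(true));  // always re-register
    }

    static void publishClusterChange() {          // fire listeners, removing each as it runs
        Deque<Runnable> fired = new ArrayDeque<>(listeners);
        listeners.clear();
        fired.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        performOnPrimary(false);   // initial attempt fails, registers a listener
        publishClusterChange();    // unrelated change: retry fails, re-registers
        shardReady = true;         // recovery completes
        publishClusterChange();    // retry now succeeds (old behavior: silence)
    }
}

Flipping OLD_BEHAVIOR to true reproduces the failure mode: the listener-triggered retry returns without re-registering, and the final state change finds nobody listening.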

RecoveryWhileUnderLoadTests.java

@@ -31,6 +31,7 @@ import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.junit.annotations.TestLogging;
 import org.elasticsearch.test.AbstractIntegrationTest;
@@ -43,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.hamcrest.Matchers.equalTo;
@@ -58,7 +60,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
     @Slow
     public void recoverWhileUnderLoadAllocateBackupsTest() throws Exception {
         logger.info("--> creating test index ...");
-        prepareCreate("test", 1);
+        assertAcked(prepareCreate("test", 1));
 
         final AtomicLong idGenerator = new AtomicLong();
         final AtomicLong indexCounter = new AtomicLong();
@@ -135,7 +137,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
     @Slow
     public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception {
         logger.info("--> creating test index ...");
-        prepareCreate("test", 1);
+        assertAcked(prepareCreate("test", 1));
 
         final AtomicLong idGenerator = new AtomicLong();
         final AtomicLong indexCounter = new AtomicLong();
@@ -209,7 +211,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
     @Slow
     public void recoverWhileUnderLoadWithNodeShutdown() throws Exception {
         logger.info("--> creating test index ...");
-        prepareCreate("test", 2);
+        assertAcked(prepareCreate("test", 2));
 
         final AtomicLong idGenerator = new AtomicLong();
         final AtomicLong indexCounter = new AtomicLong();
@@ -297,6 +299,71 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
     }
 
+    @Test
+    @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE,action.index:TRACE,action.support.replication:TRACE,cluster.service:DEBUG")
+    @Slow
+    public void recoverWhileRelocating() throws Exception {
+        final int numShards = between(5, 10);
+        final int numReplicas = 0;
+        cluster().ensureAtLeastNumNodes(3);
+        logger.info("--> creating test index ...");
+        int allowNodes = 2;
+        assertAcked(prepareCreate("test").setSettings(randomSettingsBuilder().put("number_of_shards", numShards).put("number_of_replicas", numReplicas).build()));
+
+        final AtomicLong idGenerator = new AtomicLong();
+        final AtomicLong indexCounter = new AtomicLong();
+        final AtomicBoolean stop = new AtomicBoolean(false);
+        Thread[] writers = new Thread[5];
+        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
+
+        logger.info("--> starting {} indexing threads", writers.length);
+        for (int i = 0; i < writers.length; i++) {
+            final int indexerId = i;
+            final Client client = client();
+            writers[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        logger.info("**** starting indexing thread {}", indexerId);
+                        while (!stop.get()) {
+                            long id = idGenerator.incrementAndGet();
+                            client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
+                                    .setSource(MapBuilder.<String, Object>newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
+                            indexCounter.incrementAndGet();
+                        }
+                        logger.info("**** done indexing thread {}", indexerId);
+                    } catch (Throwable e) {
+                        logger.warn("**** failed indexing thread {}", e, indexerId);
+                    } finally {
+                        stopLatch.countDown();
+                    }
+                }
+            };
+            writers[i].start();
+        }
+
+        for (int i = 0; i < 100000; i += 1000) {
+            logger.info("--> waiting for {} docs to be indexed ...", i);
+            waitForDocs(i);
+            logger.info("--> {} docs indexed", i);
+            allowNodes = 2 / allowNodes;
+            allowNodes("test", allowNodes);
+            logger.info("--> waiting for GREEN health status ...");
+            assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
+        }
+
+        logger.info("--> marking and waiting for indexing threads to stop ...");
+        stop.set(true);
+        stopLatch.await();
+        logger.info("--> indexing threads stopped");
+
+        assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().execute().actionGet().isTimedOut(), equalTo(false));
+
+        logger.info("--> refreshing the index");
+        refreshAndAssert();
+        logger.info("--> verifying indexed content");
+        iterateAssertCount(5, indexCounter.get(), 10);
+    }
+
     private void iterateAssertCount(final int numberOfShards, final long numberOfDocs, final int iterations) throws Exception {
         SearchResponse[] iterationResults = new SearchResponse[iterations];
         boolean error = false;