From 3b4b05e2c971d4e5614a1684bc0d1ea94e6d3b9b Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 30 Oct 2013 14:12:50 -0400 Subject: [PATCH] Fix bug in TransportShardReplicationOperationAction retry mechanism This issue was causing some index requests against shards in POST_RECOVERY state to hang. --- ...nsportShardReplicationOperationAction.java | 18 +++-- .../recovery/RecoveryWhileUnderLoadTests.java | 73 ++++++++++++++++++- 2 files changed, 81 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index d4896e841ab..2bc0e77b5e6 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -422,14 +422,14 @@ public abstract class TransportShardReplicationOperationAction response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, request)); performReplicas(response); @@ -544,7 +548,7 @@ public abstract class TransportShardReplicationOperationAction creating test index ..."); - prepareCreate("test", 1); + assertAcked(prepareCreate("test", 1)); final AtomicLong idGenerator = new AtomicLong(); final AtomicLong indexCounter = new AtomicLong(); @@ -135,7 +137,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest { @Slow public void recoverWhileUnderLoadAllocateBackupsRelocatePrimariesTest() throws Exception { logger.info("--> creating test index ..."); - prepareCreate("test", 1); + assertAcked(prepareCreate("test", 1)); final AtomicLong idGenerator = new AtomicLong(); final AtomicLong indexCounter = new AtomicLong(); @@ -209,7 +211,7 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest { @Slow public void 
recoverWhileUnderLoadWithNodeShutdown() throws Exception {
         logger.info("--> creating test index ...");
-        prepareCreate("test", 2);
+        assertAcked(prepareCreate("test", 2));
 
         final AtomicLong idGenerator = new AtomicLong();
         final AtomicLong indexCounter = new AtomicLong();
@@ -297,6 +299,71 @@ public class RecoveryWhileUnderLoadTests extends AbstractIntegrationTest {
 
     }
 
+    @Test
+    @TestLogging("action.search.type:TRACE,action.admin.indices.refresh:TRACE,action.index:TRACE,action.support.replication:TRACE,cluster.service:DEBUG")
+    @Slow
+    public void recoverWhileRelocating() throws Exception {
+        final int numShards = between(5, 10);
+        final int numReplicas = 0;
+        cluster().ensureAtLeastNumNodes(3);
+        logger.info("--> creating test index ...");
+        int allowNodes = 2;
+        assertAcked(prepareCreate("test").setSettings(randomSettingsBuilder().put("number_of_shards", numShards).put("number_of_replicas", numReplicas).build()));
+        // Writers index continuously until 'stop' is set; indexCounter only counts index requests that returned.
+        final AtomicLong idGenerator = new AtomicLong();
+        final AtomicLong indexCounter = new AtomicLong();
+        final AtomicBoolean stop = new AtomicBoolean(false);
+        Thread[] writers = new Thread[5];
+        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
+        logger.info("--> starting {} indexing threads", writers.length);
+        for (int i = 0; i < writers.length; i++) {
+            final int indexerId = i;
+            final Client client = client();
+            writers[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        logger.info("**** starting indexing thread {}", indexerId);
+                        while (!stop.get()) {
+                            long id = idGenerator.incrementAndGet();
+                            client.prepareIndex("test", "type1", Long.toString(id) + "-" + indexerId)
+                                    .setSource(MapBuilder.newMapBuilder().put("test", "value" + id).map()).execute().actionGet();
+                            indexCounter.incrementAndGet();
+                        }
+                        logger.info("**** done indexing thread {}", indexerId);
+                    } catch (Throwable e) {
+                        logger.warn("**** failed indexing thread {}", e, indexerId);
+                    } finally {
+                        stopLatch.countDown();
+                    }
+                }
+            };
+            writers[i].start();
+        }
+        // Every 1000 docs flip allowNodes between 2 and 1 (2 / 2 == 1, 2 / 1 == 2); allowNodes(..) presumably forces shards to move - confirm.
+        for (int i = 0; i < 100000; i += 1000) {
+            logger.info("--> waiting for {} docs to be indexed ...", i);
+            waitForDocs(i);
+            logger.info("--> {} docs indexed", i);
+            allowNodes = 2 / allowNodes;
+            allowNodes("test", allowNodes);
+            logger.info("--> waiting for GREEN health status ...");
+            assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
+        }
+
+        logger.info("--> marking and waiting for indexing threads to stop ...");
+        stop.set(true);
+        stopLatch.await();
+        logger.info("--> indexing threads stopped");
+
+        assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForYellowStatus().execute().actionGet().isTimedOut(), equalTo(false));
+
+        logger.info("--> refreshing the index");
+        refreshAndAssert();
+        logger.info("--> verifying indexed content");
+        iterateAssertCount(numShards, indexCounter.get(), 10); // shard count is randomized above, so pass numShards
+    }
+
     private void iterateAssertCount(final int numberOfShards, final long numberOfDocs, final int iterations) throws Exception {
         SearchResponse[] iterationResults = new SearchResponse[iterations];
         boolean error = false;