From c65aff7775ba21b63c464d6a56d1956e4a8404f8 Mon Sep 17 00:00:00 2001
From: Shay Banon
Date: Tue, 12 Feb 2013 17:02:26 +0100
Subject: [PATCH] Index with no replicas might lose ongoing documents while
 relocating a shard

fixes #26421
---
 ...nsportShardReplicationOperationAction.java |  43 +-
 .../integration/recovery/RelocationTests.java | 436 ++++++++++++++++++
 2 files changed, 476 insertions(+), 3 deletions(-)
 create mode 100644 src/test/java/org/elasticsearch/test/integration/recovery/RelocationTests.java

diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
index 9003eb6703b..2e71cb3b929 100644
--- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
+++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
@@ -20,6 +20,7 @@
 package org.elasticsearch.action.support.replication;
 
 import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.ElasticSearchIllegalStateException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.*;
 import org.elasticsearch.action.support.TransportAction;
@@ -551,23 +552,56 @@ public abstract class TransportShardReplicationOperationAction
             response) {
-            if (ignoreReplicas() || shardIt.size() == 1 /* no replicas */) {
+            if (ignoreReplicas()) {
                 postPrimaryOperation(request, response);
                 listener.onResponse(response.response());
                 return;
             }
 
+            ShardRouting shard;
+
             // we double check on the state, if it got changed we need to make sure we take the latest one cause
             // maybe a replica shard started its recovery process and we need to apply it there...
+
+            // we also need to make sure if the new state has a new primary shard (that we indexed to before) started
+            // and assigned to another node (while the indexing happened). In that case, we want to apply it on the
+            // new primary shard as well...
             ClusterState newState = clusterService.state();
+            ShardRouting newPrimaryShard = null;
             if (clusterState != newState) {
+                shardIt.reset();
+                ShardRouting originalPrimaryShard = null;
+                while ((shard = shardIt.nextOrNull()) != null) {
+                    if (shard.primary()) {
+                        originalPrimaryShard = shard;
+                        break;
+                    }
+                }
+                if (originalPrimaryShard == null || !originalPrimaryShard.active()) {
+                    throw new ElasticSearchIllegalStateException("unexpected state, failed to find primary shard on an index operation that succeeded");
+                }
+
                 clusterState = newState;
                 shardIt = shards(newState, request);
+                while ((shard = shardIt.nextOrNull()) != null) {
+                    if (shard.primary()) {
+                        if (originalPrimaryShard.currentNodeId().equals(shard.currentNodeId())) {
+                            newPrimaryShard = null;
+                        } else {
+                            newPrimaryShard = shard;
+                        }
+                        break;
+                    }
+                }
             }
 
             // initialize the counter
             int replicaCounter = shardIt.assignedReplicasIncludingRelocating();
+            if (newPrimaryShard != null) {
+                replicaCounter++;
+            }
+
             if (replicaCounter == 0) {
                 postPrimaryOperation(request, response);
                 listener.onResponse(response.response());
@@ -584,10 +618,13 @@ public abstract class TransportShardReplicationOperationAction
diff --git a/src/test/java/org/elasticsearch/test/integration/recovery/RelocationTests.java b/src/test/java/org/elasticsearch/test/integration/recovery/RelocationTests.java
new file mode 100644
--- /dev/null
+++ b/src/test/java/org/elasticsearch/test/integration/recovery/RelocationTests.java
@@ -0,0 +1,436 @@
+        logger.info("--> starting [node1] ...");
+        startNode("node1");
+
+        logger.info("--> creating test index ...");
+        client("node1").admin().indices().prepareCreate("test")
+                .setSettings(ImmutableSettings.settingsBuilder()
+                        .put("index.number_of_shards", 1)
+                        .put("index.number_of_replicas", 0)
+                )
+                .execute().actionGet();
+
+        logger.info("--> index 10 docs");
+        for (int i = 0; i < 10; i++) {
+            client("node1").prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+        }
+        logger.info("--> flush so we have an actual index");
+        client("node1").admin().indices().prepareFlush().execute().actionGet();
+        logger.info("--> index more docs so we have something in the translog");
+        for (int i = 10; i < 20; i++) {
+            client("node1").prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+        }
+
+        logger.info("--> verifying count");
+        client("node1").admin().indices().prepareRefresh().execute().actionGet();
+        assertThat(client("node1").prepareCount("test").execute().actionGet().count(), equalTo(20l));
+
+        logger.info("--> start another node");
+        startNode("node2");
+        ClusterHealthResponse clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
+        assertThat(clusterHealthResponse.timedOut(), equalTo(false));
+
+        logger.info("--> relocate the shard from node1 to node2");
+        client("node1").admin().cluster().prepareReroute()
+                .add(new MoveAllocationCommand(new ShardId("test", 0), "node1", "node2"))
+                .execute().actionGet();
+
+        clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForRelocatingShards(0).execute().actionGet();
+        assertThat(clusterHealthResponse.timedOut(), equalTo(false));
+        clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForRelocatingShards(0).execute().actionGet();
+        assertThat(clusterHealthResponse.timedOut(), equalTo(false));
+
+        logger.info("--> verifying count again...");
+        client("node1").admin().indices().prepareRefresh().execute().actionGet();
+        assertThat(client("node1").prepareCount("test").execute().actionGet().count(), equalTo(20l));
+    }
+
+    @Test
+    public void testPrimaryRelocationWhileIndexingWith1RelocationAnd1Writer() throws Exception {
+        testPrimaryRelocationWhileIndexing(1, 1, false);
+    }
+
+    @Test
+    public void testPrimaryRelocationWhileIndexingWith5RelocationAnd1Writer() throws Exception {
+        testPrimaryRelocationWhileIndexing(5, 1, false);
+    }
+
+    @Test
+    public void testPrimaryRelocationWhileIndexingWith10RelocationAnd5Writers() throws Exception {
+        testPrimaryRelocationWhileIndexing(10, 5, false);
+    }
+
+//    @Test
+//    public void testPrimaryRelocationWhileBulkIndexingWith1RelocationAnd1Writer() throws Exception {
+//        testPrimaryRelocationWhileIndexing(1, 1, true);
+//    }
+//
+//    @Test
+//    public void testPrimaryRelocationWhileBulkIndexingWith10RelocationAnd1Writer() throws Exception {
+//        testPrimaryRelocationWhileIndexing(10, 1, true);
+//    }
+//
+//    @Test
+//    public void testPrimaryRelocationWhileBulkIndexingWith10RelocationAnd5Writers() throws Exception {
+//        testPrimaryRelocationWhileIndexing(10, 5, true);
+//    }
+
+    private void testPrimaryRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
+        logger.info("--> starting [node1] ...");
+        startNode("node1");
+
+        logger.info("--> creating test index ...");
+        client("node1").admin().indices().prepareCreate("test")
+                .setSettings(settingsBuilder()
+                        .put("index.number_of_shards", 1)
+                        .put("index.number_of_replicas", 0)
+                ).execute().actionGet();
+
+        logger.info("--> starting [node2] ...");
+        startNode("node2");
+
+        final AtomicLong idGenerator = new AtomicLong();
+        final AtomicLong indexCounter = new AtomicLong();
+        final AtomicBoolean stop = new AtomicBoolean(false);
+        Thread[] writers = new Thread[numberOfWriters];
+        final CountDownLatch stopLatch = new CountDownLatch(writers.length);
+
+        logger.info("--> starting {} indexing threads", writers.length);
+        for (int i = 0; i < writers.length; i++) {
+            final int indexerId = i;
+            writers[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        logger.info("**** starting indexing thread {}", indexerId);
+                        while (!stop.get()) {
+                            if (batch) {
+                                BulkRequestBuilder bulkRequest = client("node1").prepareBulk();
+                                for (int i = 0; i < 100; i++) {
+                                    long id = idGenerator.incrementAndGet();
+                                    if (id % 1000 == 0) {
+                                        client("node1").admin().indices().prepareFlush().execute().actionGet();
+                                    }
+                                    bulkRequest.add(client("node1").prepareIndex("test", "type1", Long.toString(id))
+                                            .setSource("test", "value" + id));
+                                }
+                                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+                                for (BulkItemResponse bulkItemResponse : bulkResponse) {
+                                    if (!bulkItemResponse.failed()) {
+                                        indexCounter.incrementAndGet();
+                                    } else {
+                                        logger.warn("**** failed bulk indexing thread {}, {}/{}", indexerId, bulkItemResponse.failure().getId(), bulkItemResponse.failure().getMessage());
+                                    }
+                                }
+                            } else {
+                                long id = idGenerator.incrementAndGet();
+                                if (id % 1000 == 0) {
+                                    client("node1").admin().indices().prepareFlush().execute().actionGet();
+                                }
+                                client("node1").prepareIndex("test", "type1", Long.toString(id))
+                                        .setSource("test", "value" + id).execute().actionGet();
+                                indexCounter.incrementAndGet();
+                            }
+                        }
+                        logger.info("**** done indexing thread {}", indexerId);
+                    } catch (Exception e) {
+                        logger.warn("**** failed indexing thread {}", e, indexerId);
+                    } finally {
+                        stopLatch.countDown();
+                    }
+                }
+            };
+            writers[i].start();
+        }
+
+        logger.info("--> waiting for 2000 docs to be indexed ...");
+        while (client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 2000) {
+            Thread.sleep(100);
+            client("node1").admin().indices().prepareRefresh().execute().actionGet();
+        }
+        logger.info("--> 2000 docs indexed");
+
+        logger.info("--> starting relocations...");
+        for (int i = 0; i < numberOfRelocations; i++) {
+            String fromNode = "node" + (1 + (i % 2));
+            String toNode = "node1".equals(fromNode) ? "node2" : "node1";
+            logger.info("--> START relocate the shard from {} to {}", fromNode, toNode);
+            client("node1").admin().cluster().prepareReroute()
+                    .add(new MoveAllocationCommand(new ShardId("test", 0), fromNode, toNode))
+                    .execute().actionGet();
+            ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForRelocatingShards(0).execute().actionGet();
+            assertThat(clusterHealthResponse.timedOut(), equalTo(false));
+            clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForRelocatingShards(0).execute().actionGet();
+            assertThat(clusterHealthResponse.timedOut(), equalTo(false));
+            logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
+        }
+        logger.info("--> done relocations");
+
+        logger.info("--> marking and waiting for indexing threads to stop ...");
+        stop.set(true);
+        stopLatch.await();
+        logger.info("--> indexing threads stopped");
+
+        logger.info("--> refreshing the index");
+        client("node1").admin().indices().prepareRefresh("test").execute().actionGet();
+        logger.info("--> searching the index");
+        boolean ranOnce = false;
+        for (int i = 0; i < 10; i++) {
+            try {
+                logger.info("--> START search test round {}", i + 1);
+                SearchHits hits = client("node1").prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().hits();
+                ranOnce = true;
+                if (hits.totalHits() != indexCounter.get()) {
+                    int[] hitIds = new int[(int) indexCounter.get()];
+                    for (int hit = 0; hit < indexCounter.get(); hit++) {
+                        hitIds[hit] = hit + 1;
+                    }
+                    TIntSet set = new TIntHashSet(hitIds);
+                    for (SearchHit hit : hits.hits()) {
+                        int id = Integer.parseInt(hit.id());
+                        if (!set.remove(id)) {
+                            logger.error("Extra id [{}]", id);
+                        }
+                    }
+                    set.forEach(new TIntProcedure() {
+                        @Override
+                        public boolean execute(int value) {
+                            logger.error("Missing id [{}]", value);
+                            return true;
+                        }
+                    });
+                }
+                assertThat(hits.totalHits(), equalTo(indexCounter.get()));
+                logger.info("--> DONE search test round {}", i + 1);
+            } catch (SearchPhaseExecutionException ex) {
+                // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough?
+ logger.warn("Got exception while searching.", ex); + } + } + if (!ranOnce) { + assert false; + } + } + + @Test + public void testReplicaRelocationWhileIndexingWith1RelocationAnd1Writer() throws Exception { + testReplicaRelocationWhileIndexing(1, 1, false); + } + + @Test + public void testReplicaRelocationWhileIndexingWith5RelocationAnd1Writer() throws Exception { + testReplicaRelocationWhileIndexing(5, 1, false); + } + + @Test + public void testReplicaRelocationWhileIndexingWith10RelocationAnd5Writers() throws Exception { + testReplicaRelocationWhileIndexing(10, 5, false); + } + + private void testReplicaRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception { + logger.info("--> starting [node1] ..."); + startNode("node1"); + + logger.info("--> creating test index ..."); + client("node1").admin().indices().prepareCreate("test") + .setSettings(settingsBuilder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + ).execute().actionGet(); + + logger.info("--> starting [node2] ..."); + startNode("node2"); + + ClusterHealthResponse healthResponse = client("node2").admin().cluster().prepareHealth().setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.timedOut(), equalTo(false)); + + logger.info("--> starting [node3] ..."); + startNode("node3"); + + healthResponse = client("node3").admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().execute().actionGet(); + assertThat(healthResponse.timedOut(), equalTo(false)); + + final AtomicLong idGenerator = new AtomicLong(); + final AtomicLong indexCounter = new AtomicLong(); + final AtomicBoolean stop = new AtomicBoolean(false); + Thread[] writers = new Thread[numberOfWriters]; + final CountDownLatch stopLatch = new CountDownLatch(writers.length); + + logger.info("--> starting {} indexing threads", writers.length); + for (int i = 0; i < writers.length; i++) { + final int indexerId = i; + writers[i] = new Thread() { + @Override + public void run() { + try { + logger.info("**** starting indexing thread {}", indexerId); + while (!stop.get()) { + if (batch) { + BulkRequestBuilder bulkRequest = client("node1").prepareBulk(); + for (int i = 0; i < 100; i++) { + long id = idGenerator.incrementAndGet(); + if (id % 1000 == 0) { + client("node1").admin().indices().prepareFlush().execute().actionGet(); + } + bulkRequest.add(client("node1").prepareIndex("test", "type1", Long.toString(id)) + .setSource("test", "value" + id)); + } + BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + for (BulkItemResponse bulkItemResponse : bulkResponse) { + if (!bulkItemResponse.failed()) { + indexCounter.incrementAndGet(); + } else { + logger.warn("**** failed bulk indexing thread {}, {}/{}", indexerId, bulkItemResponse.failure().getId(), bulkItemResponse.failure().getMessage()); + } + } + } else { + long id = idGenerator.incrementAndGet(); + if (id % 1000 == 0) { + client("node1").admin().indices().prepareFlush().execute().actionGet(); + } + client("node1").prepareIndex("test", "type1", Long.toString(id)) + .setSource("test", "value" + id).execute().actionGet(); + indexCounter.incrementAndGet(); + } + } + logger.info("**** done indexing thread {}", indexerId); + } catch (Exception e) { + logger.warn("**** failed indexing thread {}", e, indexerId); + } finally { + stopLatch.countDown(); + } + } + }; + writers[i].start(); + } + + logger.info("--> waiting for 2000 docs to be indexed ..."); + while 
(client("node1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count() < 2000) { + Thread.sleep(100); + client("node1").admin().indices().prepareRefresh().execute().actionGet(); + } + logger.info("--> 2000 docs indexed"); + + logger.info("--> starting relocations..."); + for (int i = 0; i < numberOfRelocations; i++) { + String fromNode = "node" + (2 + (i % 2)); + String toNode = "node2".equals(fromNode) ? "node3" : "node2"; + logger.info("--> START relocate the shard from {} to {}", fromNode, toNode); + client("node1").admin().cluster().prepareReroute() + .add(new MoveAllocationCommand(new ShardId("test", 0), fromNode, toNode)) + .execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForRelocatingShards(0).execute().actionGet(); + assertThat(clusterHealthResponse.timedOut(), equalTo(false)); + clusterHealthResponse = client("node2").admin().cluster().prepareHealth().setWaitForRelocatingShards(0).execute().actionGet(); + assertThat(clusterHealthResponse.timedOut(), equalTo(false)); + logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); + } + logger.info("--> done relocations"); + + logger.info("--> marking and waiting for indexing threads to stop ..."); + stop.set(true); + stopLatch.await(); + logger.info("--> indexing threads stopped"); + + logger.info("--> refreshing the index"); + client("node1").admin().indices().prepareRefresh("test").execute().actionGet(); + logger.info("--> searching the index"); + boolean ranOnce = false; + for (int i = 0; i < 10; i++) { + try { + logger.info("--> START search test round {}", i + 1); + SearchHits hits = client("node1").prepareSearch("test").setQuery(matchAllQuery()).setSize((int) indexCounter.get()).setNoFields().execute().actionGet().hits(); + ranOnce = true; + if (hits.totalHits() != indexCounter.get()) { + int[] hitIds = new int[(int) indexCounter.get()]; + for (int hit = 0; hit < indexCounter.get(); hit++) { + hitIds[hit] = hit + 1; + } + TIntSet set = new TIntHashSet(hitIds); + for (SearchHit hit : hits.hits()) { + int id = Integer.parseInt(hit.id()); + if (!set.remove(id)) { + logger.error("Extra id [{}]", id); + } + } + set.forEach(new TIntProcedure() { + @Override + public boolean execute(int value) { + logger.error("Missing id [{}]", value); + return true; + } + }); + } + assertThat(hits.totalHits(), equalTo(indexCounter.get())); + logger.info("--> DONE search test round {}", i + 1); + } catch (SearchPhaseExecutionException ex) { + // TODO: the first run fails with this failure, waiting for relocating nodes set to 0 is not enough? + logger.warn("Got exception while searching.", ex); + } + } + if (!ranOnce) { + assert false; + } + } +}