diff --git a/src/main/java/org/elasticsearch/index/IndexService.java b/src/main/java/org/elasticsearch/index/IndexService.java
index 548c714be0f..fa575e8e884 100644
--- a/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/src/main/java/org/elasticsearch/index/IndexService.java
@@ -458,7 +458,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
             }
         } catch (IOException e) {
             indicesServices.addPendingDelete(lock.getShardId(), indexSettings);
-            logger.debug("{} failed to delete shard content - scheduled a retry", e, lock.getShardId().id());
+            logger.debug("[{}] failed to delete shard content - scheduled a retry", e, lock.getShardId().id());
         }
     }
 }
diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java
index 18ce08ff52d..5eba1a29fc6 100644
--- a/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1045,8 +1045,15 @@ public abstract class Engine implements Closeable {
 
     protected abstract SearcherManager getSearcherManager();
 
+    /**
+     * Method to close the engine while the write lock is held.
+     */
     protected abstract void closeNoLock(String reason);
 
+    /**
+     * Flush the engine (committing segments to disk and truncating the
+     * translog) and close it.
+     */
     public void flushAndClose() throws IOException {
         if (isClosed.get() == false) {
             logger.trace("flushAndClose now acquire writeLock");
diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 84b64722cfd..e03cb7b97f6 100644
--- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -669,7 +669,7 @@ public class InternalEngine extends Engine {
         if (flushLock.tryLock() == false) {
             // if we can't get the lock right away we block if needed otherwise barf
             if (waitIfOngoing) {
-                logger.trace("waiting fore in-flight flush to finish");
+                logger.trace("waiting for in-flight flush to finish");
                 flushLock.lock();
                 logger.trace("acquired flush lock after blocking");
             } else {
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
index cb489f82233..c29cd2312a0 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -94,7 +94,7 @@ public class RecoverySourceHandler {
 
     private final IndexService indexService;
     private final MappingUpdatedAction mappingUpdatedAction;
-    private final RecoveryResponse response;
+    protected final RecoveryResponse response;
     private final CancellableThreads cancellableThreads = new CancellableThreads() {
         @Override
         protected void onCancel(String reason, @Nullable Throwable suppressedException) {
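A note on the `Engine` change above: the new javadoc pins down the contract that `flushAndClose()` commits everything before tearing the engine down. Below is a minimal sketch of that flush-then-close ordering, with the engine's real read/write locking reduced to a single atomic flag; apart from `flushAndClose`/`closeNoLock`, all names are hypothetical, and this is not Elasticsearch's actual `Engine` implementation.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch, assuming an atomic "closed" flag in place of the engine's
// real locking.
abstract class FlushingEngineSketch implements Closeable {
    private final AtomicBoolean closed = new AtomicBoolean();

    /** Stand-in for a flush: commit segments and truncate the translog. */
    protected abstract void flush() throws IOException;

    /** Stand-in for Engine#closeNoLock: tear down, caller holds the lock. */
    protected abstract void closeNoLock(String reason);

    /** Flush first so the last commit is durable, then close. */
    public void flushAndClose() throws IOException {
        if (closed.compareAndSet(false, true)) {
            try {
                flush();
            } finally {
                closeNoLock("flush and close"); // close even if the flush failed
            }
        }
    }

    /** Plain close: skips the flush, leaving translog replay to recovery. */
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            closeNoLock("close");
        }
    }
}
```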
diff --git a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
index d06bd46405a..5aa698e176b 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java
@@ -22,13 +22,17 @@ package org.elasticsearch.indices.recovery;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
+import org.elasticsearch.index.engine.RecoveryEngineException;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
+
 /**
  * A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
  * as relocated and closed to ensure that the engine is closed and the target can acquire the IndexWriter write lock.
  */
@@ -37,6 +41,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
 
     private final IndexShard shard;
     private final StartRecoveryRequest request;
+    private static final Translog.View EMPTY_VIEW = new EmptyView();
 
     public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
         super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
@@ -45,24 +50,78 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
     }
 
     @Override
-    public void phase1(SnapshotIndexCommit snapshot, final Translog.View translogView) {
-        if (request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary()) {
-            // here we simply fail the primary shard since we can't move them (have 2 writers open at the same time)
-            // by failing the shard we play safe and just go through the entire reallocation procedure of the primary
-            // it would be ideal to make sure we flushed the translog here but that is not possible in the current design.
-            IllegalStateException exception = new IllegalStateException("Can't relocate primary - failing");
-            shard.failShard("primary_relocation", exception);
-            throw exception;
+    public RecoveryResponse recoverToTarget() {
+        boolean engineClosed = false;
+        try {
+            logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
+            if (isPrimaryRelocation()) {
+                logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
+                try {
+                    // if we relocate we need to close the engine in order to open a new
+                    // IndexWriter on the other end of the relocation
+                    engineClosed = true;
+                    shard.engine().flushAndClose();
+                } catch (IOException e) {
+                    logger.warn("close engine failed", e);
+                    shard.failShard("failed to close engine (phase1)", e);
+                }
+            }
+            prepareTargetForTranslog(EMPTY_VIEW);
+            finalizeRecovery();
+            return response;
+        } catch (Throwable t) {
+            if (engineClosed) {
+                // If the relocation fails, the engine on the primary has already
+                // been closed, so the shard cannot be used anymore; in that
+                // case, fail the shard so that a new IndexShard (and with it
+                // a new IndexWriter) gets allocated
+                logger.info("recovery failed for primary shadow shard, failing shard");
+                shard.failShard("primary relocation failed on shared filesystem", t);
+            } else {
+                logger.info("recovery failed on shared filesystem", t);
+            }
+            throw t;
         }
-        logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
-        prepareTargetForTranslog(translogView);
     }
 
-    @Override
     protected int sendSnapshot(Translog.Snapshot snapshot) {
-        logger.trace("{} recovery [phase2] to {}: skipping transaction log operations for file sync", shard.shardId(), request.targetNode());
+        logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}",
+                shard.shardId(), request.targetNode());
         return 0;
     }
 
+    private boolean isPrimaryRelocation() {
+        return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary();
+    }
+
+    /**
+     * An empty view, since we don't recover any translog operations in the shared FS case
+     */
+    private static class EmptyView implements Translog.View {
+
+        @Override
+        public int totalOperations() {
+            return 0;
+        }
+
+        @Override
+        public long sizeInBytes() {
+            return 0;
+        }
+
+        @Override
+        public Translog.Snapshot snapshot() {
+            return null;
+        }
+
+        @Override
+        public long minTranslogId() {
+            return 0;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
 }
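`EMPTY_VIEW` is a null object: on a shared filesystem the target already sees the same files, so there are no translog operations to ship, and handing `prepareTargetForTranslog()` an always-empty view lets the shared recovery code run without a special case. A generic sketch of the pattern follows; `OperationView` is a stand-in interface, not the real `Translog.View` API.

```java
// Sketch of the null-object pattern; the interface is hypothetical.
interface OperationView extends AutoCloseable {

    int totalOperations();

    long sizeInBytes();

    // A shared "nothing to replay" instance, analogous to EMPTY_VIEW above.
    OperationView EMPTY = new OperationView() {
        @Override
        public int totalOperations() {
            return 0; // callers see an empty stream of operations
        }

        @Override
        public long sizeInBytes() {
            return 0;
        }

        @Override
        public void close() {
            // nothing to release
        }
    };
}
```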
diff --git a/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
index c0a63d22922..b3a7b97ecfd 100644
--- a/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
+++ b/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasTests.java
@@ -19,30 +19,42 @@
 package org.elasticsearch.index;
 
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShadowIndexShard;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.*;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -309,6 +321,174 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
         assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
     }
 
+    @Test
+    public void testPrimaryRelocationWithConcurrentIndexing() throws Exception {
+        Settings nodeSettings = ImmutableSettings.builder()
+                .put("node.add_id_to_custom_path", false)
+                .put("node.enable_custom_paths", true)
+                .build();
+
+        String node1 = internalCluster().startNode(nodeSettings);
+        Path dataPath = createTempDir();
+        final String IDX = "test";
+
+        Settings idxSettings = ImmutableSettings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
+                .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
+                .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
+                .build();
+
+        prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
+        ensureYellow(IDX);
+        // node1 has the primary; starting node2 allocates the replica there
+        String node2 = internalCluster().startNode(nodeSettings);
+        ensureGreen(IDX);
+        flushAndRefresh(IDX);
+        String node3 = internalCluster().startNode(nodeSettings);
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch started = new CountDownLatch(1);
+
+        final int numPhase1Docs = scaledRandomIntBetween(25, 200);
+        final int numPhase2Docs = scaledRandomIntBetween(25, 200);
+        final CountDownLatch phase1finished = new CountDownLatch(1);
+        final CountDownLatch phase2finished = new CountDownLatch(1);
+
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                started.countDown();
+                while (counter.get() < (numPhase1Docs + numPhase2Docs)) {
+                    final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
+                            Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
+                    assertTrue(indexResponse.isCreated());
+                    final int docCount = counter.get();
+                    if (docCount == numPhase1Docs) {
+                        phase1finished.countDown();
+                    }
+                }
+                logger.info("--> stopping indexing thread");
+                phase2finished.countDown();
+            }
+        };
+        thread.start();
+        started.await();
+        phase1finished.await(); // wait for a certain number of documents to be indexed
+        logger.info("--> excluding {} from allocation", node1);
+        // now prevent the primary from being allocated on node1 so it moves to node3
+        Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build();
+        client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
+        // wait for more documents to be indexed post-recovery; this also waits
+        // for the indexing thread to stop
+        phase2finished.await();
+        ensureGreen(IDX);
+        thread.join();
+        logger.info("--> performing query");
+        flushAndRefresh();
+
+        SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
+        assertHitCount(resp, counter.get());
+        assertHitCount(resp, numPhase1Docs + numPhase2Docs);
+    }
+
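The test above coordinates an indexing thread with the relocation through `CountDownLatch`es: phase 1 guarantees documents exist before the relocation is triggered, phase 2 guarantees indexing continues across it. A condensed, self-contained sketch of that phasing, with fixed sizes instead of `scaledRandomIntBetween` and a counter bump instead of a real index request:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class PhasedIndexingSketch {
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch phase1finished = new CountDownLatch(1);
        final int numPhase1Docs = 100; // fixed stand-ins for the randomized sizes
        final int numPhase2Docs = 100;

        Thread indexer = new Thread() {
            @Override
            public void run() {
                while (counter.get() < numPhase1Docs + numPhase2Docs) {
                    final int docCount = counter.incrementAndGet(); // "index" one doc
                    if (docCount == numPhase1Docs) {
                        phase1finished.countDown(); // enough docs: relocation may start
                    }
                }
            }
        };
        indexer.start();
        phase1finished.await(); // the real test updates the allocation filter here
        indexer.join();         // phase 2 docs were indexed concurrently with the move
        System.out.println("indexed " + counter.get() + " docs across both phases");
    }
}
```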
+    @Test
+    public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
+        Settings nodeSettings = ImmutableSettings.builder()
+                .put("node.add_id_to_custom_path", false)
+                .put("node.enable_custom_paths", true)
+                .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
+                .build();
+
+        String node1 = internalCluster().startNode(nodeSettings);
+        Path dataPath = createTempDir();
+        final String IDX = "test";
+
+        Settings idxSettings = ImmutableSettings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
+                .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
+                .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
+                .build();
+
+        prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
+        ensureYellow(IDX);
+        // node1 has the primary; starting node2 allocates the replica there
+        String node2 = internalCluster().startNode(nodeSettings);
+        ensureGreen(IDX);
+        flushAndRefresh(IDX);
+        String node3 = internalCluster().startNode(nodeSettings);
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch started = new CountDownLatch(1);
+
+        final int numPhase1Docs = scaledRandomIntBetween(25, 200);
+        final int numPhase2Docs = scaledRandomIntBetween(25, 200);
+        final int numPhase3Docs = scaledRandomIntBetween(25, 200);
+        final CountDownLatch phase1finished = new CountDownLatch(1);
+        final CountDownLatch phase2finished = new CountDownLatch(1);
+        final CountDownLatch phase3finished = new CountDownLatch(1);
+
+        final AtomicBoolean keepFailing = new AtomicBoolean(true);
+
+        MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1));
+        mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, node3).localNode(),
+                new MockTransportService.DelegateTransport(mockTransportService.original()) {
+
+                    @Override
+                    public void sendRequest(DiscoveryNode node, long requestId, String action,
+                                            TransportRequest request, TransportRequestOptions options)
+                            throws IOException, TransportException {
+                        if (keepFailing.get() && action.equals(RecoveryTarget.Actions.TRANSLOG_OPS)) {
+                            logger.info("--> failing translog ops");
+                            throw new ElasticsearchException("failing on purpose");
+                        }
+                        super.sendRequest(node, requestId, action, request, options);
+                    }
+                });
+
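The delegate above injects failures at the transport layer: every `TRANSLOG_OPS` request from node1 to node3 throws until `keepFailing` is cleared. The same idea in a self-contained form; the `Transport` interface here is a hypothetical stand-in, not the `MockTransportService` API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class FaultInjectingTransportSketch {

    /** Hypothetical stand-in for the transport being wrapped. */
    interface Transport {
        void sendRequest(String action, Object request) throws Exception;
    }

    /** Fails one action while the flag is set; forwards everything else. */
    static Transport failing(final Transport delegate, final String actionToFail,
                             final AtomicBoolean keepFailing) {
        return new Transport() {
            @Override
            public void sendRequest(String action, Object request) throws Exception {
                if (keepFailing.get() && actionToFail.equals(action)) {
                    throw new IllegalStateException("failing on purpose");
                }
                delegate.sendRequest(action, request); // all other traffic passes through
            }
        };
    }
}
```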
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                started.countDown();
+                while (counter.get() < (numPhase1Docs + numPhase2Docs + numPhase3Docs)) {
+                    final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
+                            Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
+                    assertTrue(indexResponse.isCreated());
+                    final int docCount = counter.get();
+                    if (docCount == numPhase1Docs) {
+                        phase1finished.countDown();
+                    } else if (docCount == (numPhase1Docs + numPhase2Docs)) {
+                        phase2finished.countDown();
+                    }
+                }
+                logger.info("--> stopping indexing thread");
+                phase3finished.countDown();
+            }
+        };
+        thread.start();
+        started.await();
+        phase1finished.await(); // wait for a certain number of documents to be indexed
+        logger.info("--> excluding {} from allocation", node1);
+        // now prevent the primary from being allocated on node1 so it moves to node3
+        Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build();
+        client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
+        // wait for more documents to be indexed post-recovery; the indexing
+        // thread keeps running until phase 3 completes
+        phase2finished.await();
+        // stop failing
+        keepFailing.set(false);
+        // wait for more docs to be indexed
+        phase3finished.await();
+        ensureGreen(IDX);
+        thread.join();
+        logger.info("--> performing query");
+        flushAndRefresh();
+
+        SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
+        assertHitCount(resp, counter.get());
+    }
+
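What makes this test pass is that the cluster keeps retrying a failed recovery; once `keepFailing` is cleared, the next attempt goes through and the index can reach green. A toy model of that retry-until-the-fault-clears behavior, purely illustrative and not the recovery scheduler:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RecoveryRetrySketch {

    /** Stand-in for one recovery attempt; fails while the fault is active. */
    static void attemptRecovery(AtomicBoolean keepFailing) {
        if (keepFailing.get()) {
            throw new IllegalStateException("failing on purpose");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicBoolean keepFailing = new AtomicBoolean(true);

        new Thread() { // clears the fault later, like the test's keepFailing.set(false)
            @Override
            public void run() {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException ignored) {
                }
                keepFailing.set(false);
            }
        }.start();

        int attempts = 0;
        while (true) {
            try {
                attemptRecovery(keepFailing);
                break; // succeeded once the fault was cleared
            } catch (IllegalStateException e) {
                attempts++;
                Thread.sleep(10); // back off before retrying
            }
        }
        System.out.println("recovered after " + attempts + " failed attempts");
    }
}
```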
     @Test
     public void testIndexWithShadowReplicasCleansUp() throws Exception {
         Settings nodeSettings = ImmutableSettings.builder()
diff --git a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java
index e20980b32ad..29b813c4499 100644
--- a/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java
+++ b/src/test/java/org/elasticsearch/test/engine/MockInternalEngine.java
@@ -20,6 +20,7 @@ package org.elasticsearch.test.engine;
 
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.SearcherManager;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.engine.EngineException;
 import org.elasticsearch.index.engine.InternalEngine;
@@ -29,9 +30,12 @@ import java.io.IOException;
 
 final class MockInternalEngine extends InternalEngine {
 
     private MockEngineSupport support;
+    private final boolean randomizeFlushOnClose;
+
     MockInternalEngine(EngineConfig config, FsTranslog translog, boolean skipInitialTranslogRecovery) throws EngineException {
         super(config, translog, skipInitialTranslogRecovery);
+        randomizeFlushOnClose = IndexMetaData.isOnSharedFilesystem(config.getIndexSettings()) == false;
     }
 
     private synchronized MockEngineSupport support() {
@@ -56,13 +60,17 @@ final class MockInternalEngine extends InternalEngine {
 
     @Override
     public void flushAndClose() throws IOException {
-        switch (support().flushOrClose(this, MockEngineSupport.CloseAction.FLUSH_AND_CLOSE)) {
-            case FLUSH_AND_CLOSE:
-                super.flushAndClose();
-                break;
-            case CLOSE:
-                super.close();
-                break;
+        if (randomizeFlushOnClose) {
+            switch (support().flushOrClose(this, MockEngineSupport.CloseAction.FLUSH_AND_CLOSE)) {
+                case FLUSH_AND_CLOSE:
+                    super.flushAndClose();
+                    break;
+                case CLOSE:
+                    super.close();
+                    break;
+            }
+        } else {
+            super.flushAndClose();
         }
     }
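Finally, the `MockInternalEngine` change gates the randomized flush-or-close behavior: on a shared filesystem `flushAndClose()` must really flush, because `sendSnapshot()` is skipped and the relocation target never replays translog operations, so anything not committed would be lost. A sketch of that settings-driven gate; the `Map` and the `"index.shared_filesystem"` key are stand-ins I'm assuming for the `IndexMetaData`/`Settings` plumbing.

```java
import java.util.Map;

class FlushOnCloseGateSketch {
    /** Mirrors the randomizeFlushOnClose decision above, with stand-in types. */
    static boolean randomizeFlushOnClose(Map<String, String> indexSettings) {
        boolean sharedFilesystem = Boolean.parseBoolean(
                indexSettings.getOrDefault("index.shared_filesystem", "false"));
        // Randomly downgrading flushAndClose() to close() is only safe when the
        // node owns its files; on shared storage the target depends on the
        // final commit, so the randomization must stay off.
        return sharedFilesystem == false;
    }
}
```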