[CORE] Allow rebalancing primary shards on shared filesystems

Instead of failing the Engine for a shared filesystem, this change
allows a "soft close" of the Engine, where only the IndexWriter is
closed so that the replica can open an IndexWriter using the same
filesystem directory/mount.

Fixes #10469
This commit is contained in:
Simon Willnauer 2015-05-04 14:30:33 +02:00
parent 24e73a2c83
commit b5a5bc4f89
7 changed files with 276 additions and 22 deletions

View File

@ -458,7 +458,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
} catch (IOException e) {
indicesServices.addPendingDelete(lock.getShardId(), indexSettings);
logger.debug("{} failed to delete shard content - scheduled a retry", e, lock.getShardId().id());
logger.debug("[{}] failed to delete shard content - scheduled a retry", e, lock.getShardId().id());
}
}
}

View File

@ -1045,8 +1045,15 @@ public abstract class Engine implements Closeable {
protected abstract SearcherManager getSearcherManager();
/**
* Method to close the engine while the write lock is held.
*/
protected abstract void closeNoLock(String reason);
/**
* Flush the engine (committing segments to disk and truncating the
* translog) and close it.
*/
public void flushAndClose() throws IOException {
if (isClosed.get() == false) {
logger.trace("flushAndClose now acquire writeLock");

View File

@ -669,7 +669,7 @@ public class InternalEngine extends Engine {
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
if (waitIfOngoing) {
logger.trace("waiting fore in-flight flush to finish");
logger.trace("waiting for in-flight flush to finish");
flushLock.lock();
logger.trace("acquired flush lock after blocking");
} else {

View File

@ -94,7 +94,7 @@ public class RecoverySourceHandler {
private final IndexService indexService;
private final MappingUpdatedAction mappingUpdatedAction;
private final RecoveryResponse response;
protected final RecoveryResponse response;
private final CancellableThreads cancellableThreads = new CancellableThreads() {
@Override
protected void onCancel(String reason, @Nullable Throwable suppressedException) {

View File

@ -22,13 +22,17 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/**
* A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
* as relocated an closed to ensure that the engine is closed and the target can acquire the IW write lock.
@ -37,6 +41,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
private static final Translog.View EMPTY_VIEW = new EmptyView();
public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
@ -45,24 +50,78 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {
}
@Override
public void phase1(SnapshotIndexCommit snapshot, final Translog.View translogView) {
if (request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary()) {
// here we simply fail the primary shard since we can't move them (have 2 writers open at the same time)
// by failing the shard we play safe and just go through the entire reallocation procedure of the primary
// it would be ideal to make sure we flushed the translog here but that is not possible in the current design.
IllegalStateException exception = new IllegalStateException("Can't relocate primary - failing");
shard.failShard("primary_relocation", exception);
throw exception;
public RecoveryResponse recoverToTarget() {
boolean engineClosed = false;
try {
logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
if (isPrimaryRelocation()) {
logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
try {
// if we relocate we need to close the engine in order to open a new
// IndexWriter on the other end of the relocation
engineClosed = true;
shard.engine().flushAndClose();
} catch (IOException e) {
logger.warn("close engine failed", e);
shard.failShard("failed to close engine (phase1)", e);
}
}
prepareTargetForTranslog(EMPTY_VIEW);
finalizeRecovery();
return response;
} catch (Throwable t) {
if (engineClosed) {
// If the relocation fails then the primary is closed and can't be
// used anymore... (because it's closed) that's a problem, so in
// that case, fail the shard to reallocate a new IndexShard and
// create a new IndexWriter
logger.info("recovery failed for primary shadow shard, failing shard");
shard.failShard("primary relocation failed on shared filesystem", t);
} else {
logger.info("recovery failed on shared filesystem", t);
}
throw t;
}
logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
prepareTargetForTranslog(translogView);
}
@Override
protected int sendSnapshot(Translog.Snapshot snapshot) {
logger.trace("{} recovery [phase2] to {}: skipping transaction log operations for file sync", shard.shardId(), request.targetNode());
logger.trace("{} skipping recovery of translog snapshot on shared filesystem to: {}",
shard.shardId(), request.targetNode());
return 0;
}
private boolean isPrimaryRelocation() {
return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary();
}
/**
* An empty view since we don't recover from translog even in the shared FS case
*/
private static class EmptyView implements Translog.View {
@Override
public int totalOperations() {
return 0;
}
@Override
public long sizeInBytes() {
return 0;
}
@Override
public Translog.Snapshot snapshot() {
return null;
}
@Override
public long minTranslogId() {
return 0;
}
@Override
public void close() {
}
}
}

View File

@ -19,30 +19,42 @@
package org.elasticsearch.index;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@ -309,6 +321,174 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
}
@Test
public void testPrimaryRelocationWithConcurrentIndexing() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = createTempDir();
final String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureYellow(IDX);
// Node1 has the primary, now node2 has the replica
String node2 = internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
flushAndRefresh(IDX);
String node3 = internalCluster().startNode(nodeSettings);
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch started = new CountDownLatch(1);
final int numPhase1Docs = scaledRandomIntBetween(25, 200);
final int numPhase2Docs = scaledRandomIntBetween(25, 200);
final CountDownLatch phase1finished = new CountDownLatch(1);
final CountDownLatch phase2finished = new CountDownLatch(1);
Thread thread = new Thread() {
@Override
public void run() {
started.countDown();
while (counter.get() < (numPhase1Docs + numPhase2Docs)) {
final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
assertTrue(indexResponse.isCreated());
final int docCount = counter.get();
if (docCount == numPhase1Docs) {
phase1finished.countDown();
}
}
logger.info("--> stopping indexing thread");
phase2finished.countDown();
}
};
thread.start();
started.await();
phase1finished.await(); // wait for a certain number of documents to be indexed
logger.info("--> excluding {} from allocation", node1);
// now prevent primary from being allocated on node 1 move to node_3
Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build();
client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
// wait for more documents to be indexed post-recovery, also waits for
// indexing thread to stop
phase2finished.await();
ensureGreen(IDX);
thread.join();
logger.info("--> performing query");
flushAndRefresh();
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, counter.get());
assertHitCount(resp, numPhase1Docs + numPhase2Docs);
}
@Test
public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build();
String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = createTempDir();
final String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureYellow(IDX);
// Node1 has the primary, now node2 has the replica
String node2 = internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
flushAndRefresh(IDX);
String node3 = internalCluster().startNode(nodeSettings);
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch started = new CountDownLatch(1);
final int numPhase1Docs = scaledRandomIntBetween(25, 200);
final int numPhase2Docs = scaledRandomIntBetween(25, 200);
final int numPhase3Docs = scaledRandomIntBetween(25, 200);
final CountDownLatch phase1finished = new CountDownLatch(1);
final CountDownLatch phase2finished = new CountDownLatch(1);
final CountDownLatch phase3finished = new CountDownLatch(1);
final AtomicBoolean keepFailing = new AtomicBoolean(true);
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, node3).localNode(),
new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action,
TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
if (keepFailing.get() && action.equals(RecoveryTarget.Actions.TRANSLOG_OPS)) {
logger.info("--> failing translog ops");
throw new ElasticsearchException("failing on purpose");
}
super.sendRequest(node, requestId, action, request, options);
}
});
Thread thread = new Thread() {
@Override
public void run() {
started.countDown();
while (counter.get() < (numPhase1Docs + numPhase2Docs + numPhase3Docs)) {
final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
assertTrue(indexResponse.isCreated());
final int docCount = counter.get();
if (docCount == numPhase1Docs) {
phase1finished.countDown();
} else if (docCount == (numPhase1Docs + numPhase2Docs)) {
phase2finished.countDown();
}
}
logger.info("--> stopping indexing thread");
phase3finished.countDown();
}
};
thread.start();
started.await();
phase1finished.await(); // wait for a certain number of documents to be indexed
logger.info("--> excluding {} from allocation", node1);
// now prevent primary from being allocated on node 1 move to node_3
Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build();
client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
// wait for more documents to be indexed post-recovery, also waits for
// indexing thread to stop
phase2finished.await();
// stop failing
keepFailing.set(false);
// wait for more docs to be indexed
phase3finished.await();
ensureGreen(IDX);
thread.join();
logger.info("--> performing query");
flushAndRefresh();
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, counter.get());
}
@Test
public void testIndexWithShadowReplicasCleansUp() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()

View File

@ -20,6 +20,7 @@ package org.elasticsearch.test.engine;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
@ -29,9 +30,12 @@ import java.io.IOException;
final class MockInternalEngine extends InternalEngine {
private MockEngineSupport support;
private final boolean randomizeFlushOnClose;
MockInternalEngine(EngineConfig config, FsTranslog translog, boolean skipInitialTranslogRecovery) throws EngineException {
super(config, translog, skipInitialTranslogRecovery);
randomizeFlushOnClose = IndexMetaData.isOnSharedFilesystem(config.getIndexSettings()) == false;
}
private synchronized MockEngineSupport support() {
@ -56,13 +60,17 @@ final class MockInternalEngine extends InternalEngine {
@Override
public void flushAndClose() throws IOException {
switch (support().flushOrClose(this, MockEngineSupport.CloseAction.FLUSH_AND_CLOSE)) {
case FLUSH_AND_CLOSE:
super.flushAndClose();
break;
case CLOSE:
super.close();
break;
if (randomizeFlushOnClose) {
switch (support().flushOrClose(this, MockEngineSupport.CloseAction.FLUSH_AND_CLOSE)) {
case FLUSH_AND_CLOSE:
super.flushAndClose();
break;
case CLOSE:
super.close();
break;
}
} else {
super.flushAndClose();
}
}