[CORE] Allow primary promotion on shadow replica without failing the shard

Today we fail the shard when a shadow replica on a shared filesystem needs to be
promoted to primary. This commit allows the promotion instead, by re-initializing the
shard on the master, preventing reallocation of all replicas. (A simplified sketch of
the new flow follows the changed-files summary below.)
Simon Willnauer 2015-02-19 12:54:24 +01:00
parent 306b7b0f2b
commit 1ed6451229
8 changed files with 72 additions and 41 deletions
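
Before the per-file diffs, here is a minimal, self-contained sketch of the master-side flow this commit introduces. The types below are simplified stand-ins for MutableShardRouting, RoutingNodes and AllocationService, not the real classes, and the method name promoteShadowReplica is invented for illustration:

// A hedged sketch of the new master-side promotion flow; all types here are
// simplified stand-ins (assumptions), not the real
// org.elasticsearch.cluster.routing classes.
public class ShadowPromotionSketch {

    enum State { INITIALIZING, STARTED, RELOCATING }

    static class ShardRouting {
        boolean primary;
        State state = State.STARTED;
        long version;
        String relocatingNodeId;

        // Mirrors the new MutableShardRouting#reinitializeShard(): the shard
        // keeps its node assignment but drops back to INITIALIZING so it can
        // recover in its new role, and the routing version is bumped.
        void reinitializeShard() {
            assert state == State.STARTED;
            version++;
            state = State.INITIALIZING;
        }
    }

    // Mirrors the new branch in AllocationService: after a replica is elected
    // primary, a shard backed by shadow replicas is re-initialized in place
    // instead of being failed and reallocated.
    static void promoteShadowReplica(ShardRouting candidate) {
        candidate.primary = true; // swapPrimaryFlag(...) in the real code
        if (candidate.relocatingNodeId != null) {
            candidate.relocatingNodeId = null; // cancelRelocation(...)
        }
        candidate.reinitializeShard();
    }

    public static void main(String[] args) {
        ShardRouting replica = new ShardRouting();
        promoteShadowReplica(replica);
        // Prints: primary=true state=INITIALIZING version=1
        System.out.println("primary=" + replica.primary
                + " state=" + replica.state + " version=" + replica.version);
    }
}

Under this model a promoted shadow replica never leaves its node: it only drops back to INITIALIZING and recovers as a primary, which is why the remaining replicas need not be reallocated.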


@@ -101,16 +101,12 @@ public class MutableShardRouting extends ImmutableShardRouting {
     }
 
     /**
-     * Set the shards state to <code>UNASSIGNED</code>.
-     * //TODO document the state
+     * Moves the shard from started to initializing and bumps the version
      */
-    void deassignNode() {
+    void reinitializeShard() {
+        assert state == ShardRoutingState.STARTED;
         version++;
-        assert state != ShardRoutingState.UNASSIGNED;
-        state = ShardRoutingState.UNASSIGNED;
-        this.currentNodeId = null;
-        this.relocatingNodeId = null;
+        state = ShardRoutingState.INITIALIZING;
     }
 
     /**


@@ -520,6 +520,16 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         return nodesToShards.values().toArray(new RoutingNode[nodesToShards.size()]);
     }
 
+    public void reinitShadowPrimary(MutableShardRouting candidate) {
+        if (candidate.relocating()) {
+            cancelRelocation(candidate);
+        }
+        candidate.reinitializeShard();
+        inactivePrimaryCount++;
+        inactiveShardCount++;
+    }
+
     public final static class UnassignedShards implements Iterable<MutableShardRouting> {
 
         private final List<MutableShardRouting> unassigned;


@@ -21,11 +21,13 @@ package org.elasticsearch.cluster.routing.allocation;
 
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.*;
 import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
@@ -285,6 +287,7 @@ public class AllocationService extends AbstractComponent {
                     shardsToFail.add(routing);
                 }
             }
+            }
         }
 
         for (ShardRouting shardToFail : shardsToFail) {
@@ -293,10 +296,12 @@ public class AllocationService extends AbstractComponent {
         // now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
         // routingNodes.hasUnassignedPrimaries() will potentially be false
         for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
             if (shardEntry.primary()) {
                 MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
                 if (candidate != null) {
+                    IndexMetaData index = allocation.metaData().index(candidate.index());
                     routingNodes.swapPrimaryFlag(shardEntry, candidate);
                     if (candidate.relocatingNodeId() != null) {
                         changed = true;
@@ -311,6 +316,10 @@ public class AllocationService extends AbstractComponent {
                             }
                         }
                     }
+                    if (IndexMetaData.isIndexUsingShadowReplicas(index.settings())) {
+                        routingNodes.reinitShadowPrimary(candidate);
+                        changed = true;
+                    }
                 }
             }
         }


@@ -1187,13 +1187,25 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
-    protected void createNewEngine() {
+    private void createNewEngine() {
         synchronized (mutex) {
             if (state == IndexShardState.CLOSED) {
                 throw new EngineClosedException(shardId);
             }
             assert this.currentEngineReference.get() == null;
-            this.currentEngineReference.set(engineFactory.newReadWriteEngine(config));
+            this.currentEngineReference.set(newEngine());
         }
     }
 
+    protected Engine newEngine() {
+        return engineFactory.newReadWriteEngine(config);
+    }
+
+    /**
+     * Returns <code>true</code> iff this shard allows primary promotion, otherwise <code>false</code>
+     */
+    public boolean allowsPrimaryPromotion() {
+        return true;
+    }
 }
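
The hunk above splits engine creation template-method style: createNewEngine() keeps the locking and state checks, while the new overridable newEngine() only chooses the engine flavor. A hedged sketch with simplified stand-in types (Engine and EngineFactory below are placeholders, not the real interfaces):

// Sketch of the template-method split: the base class keeps the lifecycle
// bookkeeping and subclasses only choose the engine flavor.
interface Engine {}
interface EngineFactory {
    Engine newReadWriteEngine();
    Engine newReadOnlyEngine();
}

class BaseShard {
    protected final EngineFactory engineFactory;
    private Engine engine;

    BaseShard(EngineFactory engineFactory) { this.engineFactory = engineFactory; }

    // Fixed skeleton: state checks and the engine reference stay here.
    final void createNewEngine() {
        assert engine == null;
        engine = newEngine();
    }

    // Overridable step: normal shards get a read-write engine...
    protected Engine newEngine() { return engineFactory.newReadWriteEngine(); }
}

class ShadowShard extends BaseShard {
    ShadowShard(EngineFactory factory) { super(factory); }

    // ...while shadow replicas swap in the read-only engine.
    @Override
    protected Engine newEngine() { return engineFactory.newReadOnlyEngine(); }
}

ShadowIndexShard, in the next file, overrides only this flavor-choosing step instead of duplicating the whole createNewEngine() body.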


@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.index.shard;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
@@ -31,6 +32,7 @@ import org.elasticsearch.index.cache.filter.ShardFilterCache;
 import org.elasticsearch.index.cache.query.ShardQueryCache;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
@@ -62,7 +64,7 @@ import org.elasticsearch.threadpool.ThreadPool;
  * promoted to a primary causes the shard to fail, kicking off a re-allocation
  * of the primary shard.
  */
-public class ShadowIndexShard extends IndexShard {
+public final class ShadowIndexShard extends IndexShard {
 
     private final Object mutex = new Object();
@@ -98,28 +100,19 @@ public class ShadowIndexShard extends IndexShard {
      */
     @Override
     public IndexShard routingEntry(ShardRouting newRouting) {
-        ShardRouting shardRouting = this.routingEntry();
-        super.routingEntry(newRouting);
-        // check for a shadow replica that now needs to be transformed into
-        // a normal primary today we simply fail it to force reallocation
-        if (shardRouting != null && shardRouting.primary() == false && // currently a replica
-                newRouting.primary() == true) {// becoming a primary
-            failShard("can't promote shadow replica to primary",
-                    new ElasticsearchIllegalStateException("can't promote shadow replica to primary"));
+        if (newRouting.primary() == true) {// becoming a primary
+            throw new ElasticsearchIllegalStateException("can't promote shard to primary");
         }
-        return this;
+        return super.routingEntry(newRouting);
     }
 
     @Override
-    protected void createNewEngine() {
-        synchronized (mutex) {
-            if (state == IndexShardState.CLOSED) {
-                throw new EngineClosedException(shardId);
-            }
-            assert this.currentEngineReference.get() == null;
+    protected Engine newEngine() {
+        assert this.shardRouting.primary() == false;
         // Use the read-only engine for shadow replicas
-            this.currentEngineReference.set(engineFactory.newReadOnlyEngine(config));
-        }
+        return engineFactory.newReadOnlyEngine(config);
     }
+
+    public boolean allowsPrimaryPromotion() {
+        return false;
+    }
 }


@@ -47,7 +47,6 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexShardAlreadyExistsException;
 import org.elasticsearch.index.IndexShardMissingException;
 import org.elasticsearch.index.aliases.IndexAlias;
@@ -70,7 +69,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.collect.Maps.newHashMap;
 import static org.elasticsearch.ExceptionsHelper.detailedMessage;
@@ -95,7 +93,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     // a list of shards that failed during recovery
     // we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
     private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
-    private final NodeEnvironment nodeEnvironment;
 
     static class FailedShard {
         public final long version;
@@ -111,15 +108,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
 
     private final boolean sendRefreshMapping;
-    private final AtomicLong recoveryIdGenerator = new AtomicLong();
 
     @Inject
     public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
                                       ThreadPool threadPool, RecoveryTarget recoveryTarget,
                                       ShardStateAction shardStateAction,
                                       NodeIndexDeletedAction nodeIndexDeletedAction,
-                                      NodeMappingRefreshAction nodeMappingRefreshAction,
-                                      NodeEnvironment nodeEnvironment) {
+                                      NodeMappingRefreshAction nodeMappingRefreshAction) {
         super(settings);
         this.indicesService = indicesService;
         this.clusterService = clusterService;
@@ -130,7 +125,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         this.nodeMappingRefreshAction = nodeMappingRefreshAction;
         this.sendRefreshMapping = componentSettings.getAsBoolean("send_refresh_mapping", true);
-        this.nodeEnvironment = nodeEnvironment;
     }
 
     @Override
@@ -584,11 +578,16 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                 }
             }
             if (shardHasBeenRemoved == false && !shardRouting.equals(indexShard.routingEntry())) {
+                if (shardRouting.primary() && indexShard.routingEntry().primary() == false && shardRouting.initializing() && indexShard.allowsPrimaryPromotion() == false) {
+                    logger.debug("{} reinitialize shard on primary promotion", indexShard.shardId());
+                    indexService.removeShard(shardId, "promoted to primary");
+                } else {
                     // if we happen to remove the shardRouting by id above we don't need to jump in here!
                     indexShard.routingEntry(shardRouting);
                     indexService.shardInjectorSafe(shardId).getInstance(IndexShardGatewayService.class).routingStateChanged();
+                }
             }
         }
 
         if (shardRouting.initializing()) {
             applyInitializingShard(routingTable, nodes, indexMetaData, routingTable.index(shardRouting.index()).shard(shardRouting.id()), shardRouting);
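
To summarize the data-node half shown in this hunk: a shard that forbids in-place promotion is removed and recreated instead of having its routing entry mutated. A hedged sketch, using simplified stand-in interfaces rather than the real Elasticsearch APIs (updateRoutingEntry is an invented name):

// Hedged sketch of the data-node handling above. These interfaces are
// simplified stand-ins (assumptions), not the real Elasticsearch classes.
interface SketchShardRouting { boolean primary(); boolean initializing(); int id(); }

interface SketchIndexShard {
    SketchShardRouting routingEntry();
    void updateRoutingEntry(SketchShardRouting routing);
    boolean allowsPrimaryPromotion();
}

interface SketchIndexService { void removeShard(int shardId, String reason); }

final class PromotionSketch {
    // A shard that forbids in-place promotion (a shadow replica) is removed;
    // the regular initializing-shard path then recreates it as a read-write
    // primary. All other routing changes are applied in place.
    static void applyRoutingChange(SketchIndexService indexService,
                                   SketchIndexShard shard,
                                   SketchShardRouting newRouting) {
        boolean becomingPrimary = newRouting.primary()
                && shard.routingEntry().primary() == false
                && newRouting.initializing();
        if (becomingPrimary && shard.allowsPrimaryPromotion() == false) {
            indexService.removeShard(newRouting.id(), "promoted to primary");
        } else {
            shard.updateRoutingEntry(newRouting);
        }
    }
}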


@@ -32,6 +32,8 @@ import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.nio.file.Path;
@@ -165,6 +167,15 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
         assertTrue(gResp2.toString(), gResp2.isExists());
         assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
         assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
 
+        client().prepareIndex(IDX, "doc", "1").setSource("foo", "foobar").get();
+        client().prepareIndex(IDX, "doc", "2").setSource("foo", "foobar").get();
+
+        gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get();
+        gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get();
+        assertTrue(gResp1.isExists());
+        assertTrue(gResp2.toString(), gResp2.isExists());
+        assertThat(gResp1.getField("foo").getValue().toString(), equalTo("foobar"));
+        assertThat(gResp2.getField("foo").getValue().toString(), equalTo("foobar"));
     }
 
     @Test


@@ -27,6 +27,7 @@ import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.StoreRateLimiting;
 import org.apache.lucene.util.AbstractRandomizedTest;
 import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -86,7 +87,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
                                      @IndexSettings Settings indexSettings) {
             if (indexShard != null && shardId.equals(sid)) {
                 logger.info("Shard state before potentially flushing is {}", indexShard.state());
-                if (validCheckIndexStates.contains(indexShard.state()) && indexShard.engine() instanceof InternalEngine) {
+                if (validCheckIndexStates.contains(indexShard.state()) && IndexMetaData.isOnSharedFilesystem(indexSettings) == false) {
                     // When the the internal engine closes we do a rollback, which removes uncommitted segments
                     // By doing a commit flush we perform a Lucene commit, but don't clear the translog,
                     // so that even in tests where don't flush we can check the integrity of the Lucene index