Merge pull request #16625 from ywelsch/fix/fail-on-persist-shard-state-metadata

Write shard state metadata as soon as shard is created / initializing
This commit is contained in:
Yannick Welsch 2016-02-29 14:33:08 +01:00
commit 7fc9f03c8d
9 changed files with 159 additions and 162 deletions

View File

@ -187,12 +187,14 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
}
if (nodeShardState.storeException() == null) {
if (allocationId == null && nodeShardState.legacyVersion() != ShardStateMetaData.NO_VERSION) {
// old shard with no allocation id, assign dummy value so that it gets added below in case of matchAnyShard
allocationId = "_n/a_";
if (allocationId == null && nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION) {
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
} else if (allocationId != null) {
assert nodeShardState.legacyVersion() == ShardStateMetaData.NO_VERSION : "Allocation id and legacy version cannot be both present";
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
} else {
logger.trace("[{}] on node [{}] has no allocation id, out-dated shard (shard state version: [{}])", shard, nodeShardState.getNode(), nodeShardState.legacyVersion());
}
logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId);
} else {
logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId);
allocationId = null;
@ -299,9 +301,20 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
continue;
}
// no version means it does not exists, which is what the API returns, and what we expect to
if (nodeShardState.storeException() == null) {
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
if (version == ShardStateMetaData.NO_VERSION && nodeShardState.allocationId() == null) {
logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
} else if (version != ShardStateMetaData.NO_VERSION) {
assert nodeShardState.allocationId() == null : "Allocation id and legacy version cannot be both present";
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
} else {
// shard was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but
// did not make it to STARTED state before the cluster crashed (otherwise list of active allocation ids would be
// non-empty and allocation id - based allocation mode would be chosen).
// Prefer this shard copy again.
version = Long.MAX_VALUE;
logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), nodeShardState.allocationId());
}
} else {
// when there is an store exception, we disregard the reported version and assign it as no version (same as shard does not exist)
logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating no version", nodeShardState.storeException(), shard, nodeShardState.getNode(), version);

View File

@ -215,7 +215,7 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
@Override
public IndexWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
if (indexSettings.getIndex().equals(indexShard.getIndexSettings().getIndex()) == false) {
if (indexSettings.getIndex().equals(indexShard.indexSettings().getIndex()) == false) {
// this is from a different index
return TerminationHandle.NO_WAIT;
}

View File

@ -153,7 +153,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final EngineConfig engineConfig;
private final TranslogConfig translogConfig;
private final IndexEventListener indexEventListener;
private final IndexSettings idxSettings;
/** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
@ -205,7 +204,6 @@ public class IndexShard extends AbstractIndexShardComponent {
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.idxSettings = indexSettings;
this.codecService = new CodecService(mapperService, logger);
this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
@ -248,7 +246,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.suspendableRefContainer = new SuspendableRefContainer();
this.searcherWrapper = indexSearcherWrapper;
QueryShardContext queryShardContext = new QueryShardContext(idxSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
QueryShardContext queryShardContext = new QueryShardContext(indexSettings, indexCache.bitsetFilterCache(), indexFieldDataService, mapperService, similarityService, provider.getScriptService(), provider.getIndicesQueriesRegistry());
this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, queryShardContext);
}
@ -256,10 +254,6 @@ public class IndexShard extends AbstractIndexShardComponent {
return this.store;
}
public IndexSettings getIndexSettings() {
return idxSettings;
}
/** returns true if this shard supports indexing (i.e., write) operations. */
public boolean canIndex() {
return true;
@ -319,8 +313,9 @@ public class IndexShard extends AbstractIndexShardComponent {
* unless explicitly disabled.
*
* @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
* @throws IOException if shard state could not be persisted
*/
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) {
public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException {
final ShardRouting currentRouting = this.shardRouting;
if (!newRouting.shardId().equals(shardId())) {
throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
@ -328,57 +323,54 @@ public class IndexShard extends AbstractIndexShardComponent {
if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
}
try {
if (currentRouting != null) {
if (!newRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
}
// if its the same routing, return
if (currentRouting.equals(newRouting)) {
return;
}
if (currentRouting != null) {
if (!newRouting.primary() && currentRouting.primary()) {
logger.warn("suspect illegal state: trying to move shard from primary mode to replica mode");
}
// if its the same routing, return
if (currentRouting.equals(newRouting)) {
return;
}
}
if (state == IndexShardState.POST_RECOVERY) {
// if the state is started or relocating (cause it might move right away from started to relocating)
// then move to STARTED
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
// we want to refresh *before* we move to internal STARTED state
try {
getEngine().refresh("cluster_state_started");
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
if (state == IndexShardState.POST_RECOVERY) {
// if the state is started or relocating (cause it might move right away from started to relocating)
// then move to STARTED
if (newRouting.state() == ShardRoutingState.STARTED || newRouting.state() == ShardRoutingState.RELOCATING) {
// we want to refresh *before* we move to internal STARTED state
try {
getEngine().refresh("cluster_state_started");
} catch (Throwable t) {
logger.debug("failed to refresh due to move to cluster wide started", t);
}
boolean movedToStarted = false;
synchronized (mutex) {
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
if (state == IndexShardState.POST_RECOVERY) {
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
movedToStarted = true;
} else {
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
}
}
if (movedToStarted) {
indexEventListener.afterIndexShardStarted(this);
boolean movedToStarted = false;
synchronized (mutex) {
// do the check under a mutex, so we make sure to only change to STARTED if in POST_RECOVERY
if (state == IndexShardState.POST_RECOVERY) {
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
movedToStarted = true;
} else {
logger.debug("state [{}] not changed, not in POST_RECOVERY, global state is [{}]", state, newRouting.state());
}
}
if (movedToStarted) {
indexEventListener.afterIndexShardStarted(this);
}
}
}
if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
this.shardRouting = newRouting;
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
} finally {
if (persistState) {
persistMetadata(newRouting, currentRouting);
}
if (state == IndexShardState.RELOCATED &&
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
// if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery
// failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two
// active primaries.
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
}
this.shardRouting = newRouting;
indexEventListener.shardRoutingChanged(this, currentRouting, newRouting);
if (persistState) {
persistMetadata(newRouting, currentRouting);
}
}
@ -733,7 +725,7 @@ public class IndexShard extends AbstractIndexShardComponent {
luceneVersion = segment.getVersion();
}
}
return luceneVersion == null ? idxSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
}
/**
@ -1046,18 +1038,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
/**
* Deletes the shards metadata state. This method can only be executed if the shard is not active.
*
* @throws IOException if the delete fails
*/
public void deleteShardState() throws IOException {
if (this.routingEntry() != null && this.routingEntry().active()) {
throw new IllegalStateException("Can't delete shard state on an active shard");
}
MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
}
public boolean isActive() {
return active.get();
}
@ -1070,7 +1050,7 @@ public class IndexShard extends AbstractIndexShardComponent {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
boolean shouldExist = shardRouting.allocatedPostIndexCreate(idxSettings.getIndexMetaData());
boolean shouldExist = shardRouting.allocatedPostIndexCreate(indexSettings.getIndexMetaData());
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this, shouldExist, localNode);
@ -1344,27 +1324,25 @@ public class IndexShard extends AbstractIndexShardComponent {
}
// pkg private for testing
void persistMetadata(ShardRouting newRouting, ShardRouting currentRouting) {
void persistMetadata(ShardRouting newRouting, @Nullable ShardRouting currentRouting) throws IOException {
assert newRouting != null : "newRouting must not be null";
if (newRouting.active()) {
try {
final String writeReason;
if (currentRouting == null) {
writeReason = "freshly started, allocation id [" + newRouting.allocationId() + "]";
} else if (currentRouting.equals(newRouting) == false) {
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
} else {
logger.trace("{} skip writing shard state, has been written before", shardId);
return;
}
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath());
} catch (IOException e) { // this is how we used to handle it.... :(
logger.warn("failed to write shard state", e);
// we failed to write the shard state, we will try and write
// it next time...
// only persist metadata if routing information that is persisted in shard state metadata actually changed
if (currentRouting == null
|| currentRouting.primary() != newRouting.primary()
|| currentRouting.allocationId().equals(newRouting.allocationId()) == false) {
assert currentRouting == null || currentRouting.isSameAllocation(newRouting);
final String writeReason;
if (currentRouting == null) {
writeReason = "initial state with allocation id [" + newRouting.allocationId() + "]";
} else {
writeReason = "routing changed from " + currentRouting + " to " + newRouting;
}
logger.trace("{} writing shard state, reason [{}]", shardId, writeReason);
final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), getIndexUUID(), newRouting.allocationId());
ShardStateMetaData.FORMAT.write(newShardStateMetadata, newShardStateMetadata.legacyVersion, shardPath().getShardStatePath());
} else {
logger.trace("{} skip writing shard state, has been written before", shardId);
}
}
@ -1396,7 +1374,7 @@ public class IndexShard extends AbstractIndexShardComponent {
return new EngineConfig(shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
indexSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
}
public Releasable acquirePrimaryOperationLock() {

View File

@ -51,12 +51,12 @@ public final class ShadowIndexShard extends IndexShard {
/**
* In addition to the regular accounting done in
* {@link IndexShard#updateRoutingEntry(org.elasticsearch.cluster.routing.ShardRouting, boolean)},
* {@link IndexShard#updateRoutingEntry(ShardRouting, boolean)},
* if this shadow replica needs to be promoted to a primary, the shard is
* failed in order to allow a new primary to be re-allocated.
*/
@Override
public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) {
public void updateRoutingEntry(ShardRouting newRouting, boolean persistState) throws IOException {
if (newRouting.primary() == true) {// becoming a primary
throw new IllegalStateException("can't promote shard to primary");
}

View File

@ -904,7 +904,7 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
if (!CACHEABLE_SEARCH_TYPES.contains(context.searchType())) {
return false;
}
IndexSettings settings = context.indexShard().getIndexSettings();
IndexSettings settings = context.indexShard().indexSettings();
// if not explicitly set in the request, use the index setting, if not, use the request
if (request.requestCache() == null) {
if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {

View File

@ -342,8 +342,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
// first, we go and move files that were created with the recovery id suffix to
// the actual names, its ok if we have a corrupted index here, since we have replicas
// to recover from in case of a full cluster shutdown just when this code executes...
indexShard().deleteShardState(); // we have to delete it first since even if we fail to rename the shard
// might be invalid
renameAllTempFiles();
final Store store = store();
// now write checksums

View File

@ -71,7 +71,7 @@ public class GeoDistanceSortParser implements SortParser {
MultiValueMode sortMode = null;
NestedInnerQueryParseSupport nestedHelper = null;
final boolean indexCreatedBeforeV2_0 = context.indexShard().getIndexSettings().getIndexVersionCreated().before(Version.V_2_0_0);
final boolean indexCreatedBeforeV2_0 = context.indexShard().indexSettings().getIndexVersionCreated().before(Version.V_2_0_0);
boolean coerce = GeoDistanceSortBuilder.DEFAULT_COERCE;
boolean ignoreMalformed = GeoDistanceSortBuilder.DEFAULT_IGNORE_MALFORMED;

View File

@ -117,7 +117,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
*/
public void testNoMatchingAllocationIdFound() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2");
testAllocator.addData(node1, 1, "id1", randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "id1", randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -145,7 +145,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1", randomBoolean(), new CorruptIndexException("test", "test"));
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean(), new CorruptIndexException("test", "test"));
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
testAllocator.addData(node1, 3, null, randomBoolean(), new CorruptIndexException("test", "test"));
@ -164,7 +164,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
boolean useAllocationIds = randomBoolean();
if (useAllocationIds) {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1", randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean());
@ -188,8 +188,8 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
String replicaAllocId = Strings.randomBase64UUID();
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), primaryAllocId, replicaAllocId);
boolean node1HasPrimaryShard = randomBoolean();
testAllocator.addData(node1, 1, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
testAllocator.addData(node2, 1, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard);
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -206,7 +206,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1", randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0);
testAllocator.addData(node1, 3, null, randomBoolean());
@ -225,7 +225,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
final RoutingAllocation allocation;
if (randomBoolean()) {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
testAllocator.addData(node1, 1, "allocId1", randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, "allocId1", randomBoolean());
} else {
allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0);
testAllocator.addData(node1, 3, null, randomBoolean());
@ -250,13 +250,36 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
}
/**
* Tests that shard with allocation id is chosen if such a shard is available in version-based allocation mode. This happens if a shard
* was already selected in a 5.x cluster as primary for recovery, was initialized (and wrote a new state file) but did not make it to
* STARTED state before the cluster crashed (otherwise list of active allocation ids would be non-empty and allocation id - based
* allocation mode would be chosen).
*/
public void testVersionBasedAllocationPrefersShardWithAllocationId() {
RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0);
testAllocator.addData(node1, 10, null, randomBoolean());
testAllocator.addData(node2, ShardStateMetaData.NO_VERSION, "some allocId", randomBoolean());
testAllocator.addData(node3, 12, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), equalTo("some allocId"));
}
/**
* Tests that when restoring from a snapshot and we find a node with a shard copy and allocation
* deciders say yes, we allocate to that node.
*/
public void testRestore() {
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean shardStateHasAllocationId = randomBoolean();
String allocationId = shardStateHasAllocationId ? "some allocId" : null;
long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1;
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -268,8 +291,12 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say throttle, we add it to ignored shards.
*/
public void testRestoreThrottle() {
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean shardStateHasAllocationId = randomBoolean();
String allocationId = shardStateHasAllocationId ? "some allocId" : null;
long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1;
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
@ -280,8 +307,12 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say no, we still allocate to that node.
*/
public void testRestoreForcesAllocateIfShardAvailable() {
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "some allocId"), randomBoolean());
boolean shardStateHasAllocationId = randomBoolean();
String allocationId = shardStateHasAllocationId ? "some allocId" : null;
long legacyVersion = shardStateHasAllocationId ? ShardStateMetaData.NO_VERSION : 1;
boolean clusterHasActiveAllocationIds = shardStateHasAllocationId ? randomBoolean() : false;
RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders(), clusterHasActiveAllocationIds);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -293,7 +324,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* the unassigned list to be allocated later.
*/
public void testRestoreDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, false);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
@ -301,11 +332,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
}
private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders) {
Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders, boolean hasActiveAllocation) {
Version version = hasActiveAllocation ? Version.CURRENT : Version.V_2_0_0;
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
.putActiveAllocationIds(0, version == Version.CURRENT ? Sets.newHashSet("allocId") : Collections.emptySet()))
.putActiveAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.build();
RoutingTable routingTable = RoutingTable.builder()
@ -323,8 +354,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say yes, we allocate to that node.
*/
public void testRecoverOnAnyNode() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean hasActiveAllocation = randomBoolean();
String allocationId = hasActiveAllocation ? "allocId" : null;
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -336,8 +370,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say throttle, we add it to ignored shards.
*/
public void testRecoverOnAnyNodeThrottle() {
RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean hasActiveAllocation = randomBoolean();
String allocationId = hasActiveAllocation ? "allocId" : null;
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(throttleAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
@ -348,8 +385,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* deciders say no, we still allocate to that node.
*/
public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders());
testAllocator.addData(node1, 1, randomFrom(null, "allocId"), randomBoolean());
boolean hasActiveAllocation = randomBoolean();
String allocationId = hasActiveAllocation ? "allocId" : null;
long legacyVersion = hasActiveAllocation ? ShardStateMetaData.NO_VERSION : 1;
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders(), hasActiveAllocation);
testAllocator.addData(node1, legacyVersion, allocationId, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@ -361,7 +401,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
* BalancedShardAllocator assign the shard
*/
public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders(), randomBoolean());
testAllocator.addData(node1, ShardStateMetaData.NO_VERSION, null, randomBoolean());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
@ -369,13 +409,13 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
}
private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders) {
Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders, boolean hasActiveAllocation) {
Version version = hasActiveAllocation ? Version.CURRENT : Version.V_2_0_0;
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(version)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? Sets.newHashSet("allocId") : Collections.emptySet()))
.numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, hasActiveAllocation ? Sets.newHashSet("allocId") : Collections.emptySet()))
.build();
RoutingTable routingTable = RoutingTable.builder()

View File

@ -204,13 +204,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
// test if we still write it even if the shard is not active
ShardRouting inactiveRouting = TestShardRouting.newShardRouting(shard.shardRouting.index(), shard.shardRouting.shardId().id(), shard.shardRouting.currentNodeId(), null, null, true, ShardRoutingState.INITIALIZING);
shard.persistMetadata(inactiveRouting, shard.shardRouting);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, getShardStateMetadata(shard));
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
// check that we don't write shard state metadata if persist == false
ShardRouting updatedRouting = new ShardRouting(shard.shardRouting);
TestShardRouting.relocate(updatedRouting, "some node", 42L);
shard.updateRoutingEntry(updatedRouting, false);
@ -218,6 +212,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard)));
assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
// check that we write shard state metadata if persist == true
shard.updateRoutingEntry(routing, false); // move back state in IndexShard
routing = new ShardRouting(updatedRouting);
shard.updateRoutingEntry(routing, true);
@ -226,33 +221,6 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.primary(), shard.indexSettings().getUUID(), routing.allocationId()));
}
public void testDeleteShardState() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
IndexService test = indicesService.indexService("test");
IndexShard shard = test.getShardOrNull(0);
try {
shard.deleteShardState();
fail("shard is active metadata delete must fail");
} catch (IllegalStateException ex) {
// fine - only delete if non-active
}
ShardRouting routing = shard.routingEntry();
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
routing = TestShardRouting.newShardRouting(shard.shardId.getIndex(), shard.shardId.id(), routing.currentNodeId(), null, routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.allocationId());
shard.updateRoutingEntry(routing, true);
shard.deleteShardState();
assertNull("no shard state expected after delete on initializing", load(logger, env.availableShardPaths(shard.shardId)));
}
public void testFailShard() throws Exception {
createIndex("test");
ensureGreen();
@ -973,7 +941,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
assertHitCount(client().prepareSearch().get(), 1);
}
public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException {
public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException, IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);