Merge remote-tracking branch 'dakrone/sharedfs-recovery-any-node'

This commit is contained in:
Lee Hinman 2015-05-04 14:15:30 -06:00
commit eabeae980b
4 changed files with 62 additions and 2 deletions

View File

@ -165,6 +165,7 @@ public class IndexMetaData implements Diffable<IndexMetaData> {
public static final String SETTING_LEGACY_ROUTING_HASH_FUNCTION = "index.legacy.routing.hash.type";
public static final String SETTING_LEGACY_ROUTING_USE_TYPE = "index.legacy.routing.use_type";
public static final String SETTING_DATA_PATH = "index.data_path";
public static final String SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE = "index.shared_filesystem.recover_on_any_node";
public static final String INDEX_UUID_NA_VALUE = "_na_";
// hard-coded hash function as of 2.0

View File

@ -372,6 +372,14 @@ public class GatewayAllocator extends AbstractComponent {
return changed;
}
/**
* Build a map of DiscoveryNodes to shard state number for the given shard.
* A state of -1 means the shard does not exist on the node, where any
* shard state >= 0 is the state version of the shard on that node's disk.
*
* A shard on shared storage will return at least shard state 0 for all
* nodes, indicating that the shard can be allocated to any node.
*/
private ObjectLongOpenHashMap<DiscoveryNode> buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard, IndexMetaData indexMetaData) {
ObjectLongOpenHashMap<DiscoveryNode> shardStates = cachedShardsState.get(shard.shardId());
ObjectOpenHashSet<String> nodeIds;
@ -405,10 +413,18 @@ public class GatewayAllocator extends AbstractComponent {
logListActionFailures(shard, "state", response.failures());
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {
long version = nodeShardState.version();
Settings idxSettings = indexMetaData.settings();
if (IndexMetaData.isOnSharedFilesystem(idxSettings) &&
idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false)) {
// Shared filesystems use 0 as a minimum shard state, which
// means that the shard can be allocated to any node
version = Math.max(0, version);
}
// -1 version means it does not exists, which is what the API returns, and what we expect to
logger.trace("[{}] on node [{}] has version [{}] of shard",
shard, nodeShardState.getNode(), nodeShardState.version());
shardStates.put(nodeShardState.getNode(), nodeShardState.version());
shard, nodeShardState.getNode(), version);
shardStates.put(nodeShardState.getNode(), version);
}
return shardStates;
}

View File

@ -71,6 +71,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_BLOCKS_READ);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_BLOCKS_WRITE);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_BLOCKS_METADATA);
indexDynamicSettings.addDynamicSetting(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE);
indexDynamicSettings.addDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE);
indexDynamicSettings.addDynamicSetting(IndexShard.INDEX_REFRESH_INTERVAL, Validator.TIME);
indexDynamicSettings.addDynamicSetting(GatewayAllocator.INDEX_RECOVERY_INITIAL_SHARDS);

View File

@ -448,4 +448,46 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
assertThat(hits[2].field("foo").getValue().toString(), equalTo("eggplant"));
assertThat(hits[3].field("foo").getValue().toString(), equalTo("foo"));
}
@Test
public void testIndexOnSharedFSRecoversToAnyNode() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNode(nodeSettings);
Path dataPath = createTempDir();
String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true)
.build();
// only one node, so all primaries will end up on node1
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string,index=not_analyzed").get();
ensureGreen(IDX);
// Index some documents
client().prepareIndex(IDX, "doc", "1").setSource("foo", "foo").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "3").setSource("foo", "baz").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "eggplant").get();
// start a second node
internalCluster().startNode(nodeSettings);
// node1 is master, stop that one, since we only have primaries,
// usually this would mean data loss, but not on shared fs!
internalCluster().stopCurrentMasterNode();
ensureGreen(IDX);
refresh();
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).addFieldDataField("foo").addSort("foo", SortOrder.ASC).get();
assertHitCount(resp, 4);
}
}