initial work on local gateway

This commit is contained in:
kimchy 2010-08-29 01:24:23 +03:00
parent 163b7be639
commit 4f4471483d
27 changed files with 1103 additions and 95 deletions

View File

@ -94,7 +94,7 @@ public class ClusterState {
}
public RoutingNodes routingNodes() {
return routingTable.routingNodes(metaData);
return routingTable.routingNodes(metaData, blocks);
}
public RoutingNodes getRoutingNodes() {
@ -117,7 +117,7 @@ public class ClusterState {
if (routingNodes != null) {
return routingNodes;
}
routingNodes = routingTable.routingNodes(metaData);
routingNodes = routingTable.routingNodes(metaData, blocks);
return routingNodes;
}
@ -177,6 +177,11 @@ public class ClusterState {
return this;
}
public Builder version(long version) {
this.version = version;
return this;
}
public Builder state(ClusterState state) {
this.version = state.version();
this.nodes = state.nodes();

View File

@ -91,6 +91,10 @@ public class ClusterBlocks {
return levelHolders[level.id()].indices();
}
public boolean hasIndexBlock(String index, ClusterBlock block) {
return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
}
public void indexBlockedRaiseException(ClusterBlockLevel level, String index) throws ClusterBlockException {
if (!indexBlocked(level, index)) {
return;
@ -136,22 +140,6 @@ public class ClusterBlocks {
}
}
static class LevelHolder {
private final Set<ClusterBlock> global = Sets.newHashSet();
private final Map<String, Set<ClusterBlock>> indices = Maps.newHashMap();
LevelHolder() {
}
public Set<ClusterBlock> global() {
return global;
}
public Map<String, Set<ClusterBlock>> indices() {
return indices;
}
}
public static Builder builder() {
return new Builder();
}

View File

@ -303,12 +303,17 @@ public class MetaData implements Iterable<IndexMetaData> {
public static MetaData fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException {
Builder builder = new Builder();
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
XContentParser.Token token = parser.currentToken();
String currentFieldName = parser.currentName();
if (!"meta-data".equals(currentFieldName)) {
token = parser.nextToken();
currentFieldName = parser.currentName();
if (token == null) {
// no data...
return builder.build();
}
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();

View File

@ -24,11 +24,14 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.ShardsAllocation;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
@ -53,6 +56,7 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -218,7 +222,14 @@ public class MetaDataCreateIndexService extends AbstractComponent {
listener.timeout = timeoutTask;
}
return newClusterStateBuilder().state(currentState).metaData(newMetaData).build();
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
if (!request.blocks.isEmpty()) {
for (ClusterBlock block : request.blocks) {
blocks.addIndexBlock(request.index, block);
}
}
return newClusterStateBuilder().state(currentState).blocks(blocks).metaData(newMetaData).build();
} catch (Exception e) {
listener.onFailure(e);
return currentState;
@ -314,6 +325,8 @@ public class MetaDataCreateIndexService extends AbstractComponent {
TimeValue timeout = TimeValue.timeValueSeconds(5);
Set<ClusterBlock> blocks = Sets.newHashSet();
public Request(String cause, String index) {
this.cause = cause;
this.index = index;
@ -336,6 +349,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
return this;
}
public Request blocks(Set<ClusterBlock> blocks) {
this.blocks.addAll(blocks);
return this;
}
public Request timeout(TimeValue timeout) {
this.timeout = timeout;
return this;

View File

@ -153,7 +153,10 @@ public class DiscoveryNode implements Streamable, Serializable {
*/
public boolean dataNode() {
String data = attributes.get("data");
return data == null || data.equals("true");
if (data == null) {
return !clientNode();
}
return data.equals("true");
}
/**
@ -175,6 +178,24 @@ public class DiscoveryNode implements Streamable, Serializable {
return clientNode();
}
/**
* Can this node become master or not.
*/
public boolean masterNode() {
String master = attributes.get("master");
if (master == null) {
return !clientNode();
}
return master.equals("true");
}
/**
* Can this node become master or not.
*/
public boolean isMasterNode() {
return masterNode();
}
public static DiscoveryNode readNode(StreamInput in) throws IOException {
DiscoveryNode node = new DiscoveryNode();
node.readFrom(in);

View File

@ -46,13 +46,16 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
private final ImmutableMap<String, DiscoveryNode> dataNodes;
private final ImmutableMap<String, DiscoveryNode> masterNodes;
private final String masterNodeId;
private final String localNodeId;
private DiscoveryNodes(ImmutableMap<String, DiscoveryNode> nodes, ImmutableMap<String, DiscoveryNode> dataNodes, String masterNodeId, String localNodeId) {
private DiscoveryNodes(ImmutableMap<String, DiscoveryNode> nodes, ImmutableMap<String, DiscoveryNode> dataNodes, ImmutableMap<String, DiscoveryNode> masterNodes, String masterNodeId, String localNodeId) {
this.nodes = nodes;
this.dataNodes = dataNodes;
this.masterNodes = masterNodes;
this.masterNodeId = masterNodeId;
this.localNodeId = localNodeId;
}
@ -104,6 +107,14 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
return dataNodes();
}
public ImmutableMap<String, DiscoveryNode> masterNodes() {
return this.masterNodes;
}
public ImmutableMap<String, DiscoveryNode> getMasterNodes() {
return masterNodes();
}
public DiscoveryNode get(String nodeId) {
return nodes.get(nodeId);
}
@ -366,12 +377,16 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
public DiscoveryNodes build() {
ImmutableMap.Builder<String, DiscoveryNode> dataNodesBuilder = ImmutableMap.builder();
ImmutableMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableMap.builder();
for (Map.Entry<String, DiscoveryNode> nodeEntry : nodes.entrySet()) {
if (nodeEntry.getValue().dataNode()) {
dataNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
}
if (nodeEntry.getValue().masterNode()) {
masterNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
}
return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodeId, localNodeId);
}
return new DiscoveryNodes(ImmutableMap.copyOf(nodes), dataNodesBuilder.build(), masterNodesBuilder.build(), masterNodeId, localNodeId);
}
public static void writeTo(DiscoveryNodes nodes, StreamOutput out) throws IOException {

View File

@ -105,6 +105,34 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
return shards.get(shardId);
}
public boolean allPrimaryShardsActive() {
return primaryShardsActive() == shards().size();
}
public int primaryShardsActive() {
int counter = 0;
for (IndexShardRoutingTable shardRoutingTable : this) {
if (shardRoutingTable.primaryShard().active()) {
counter++;
}
}
return counter;
}
public boolean allPrimaryShardsUnassigned() {
return primaryShardsUnassigned() == shards.size();
}
public int primaryShardsUnassigned() {
int counter = 0;
for (IndexShardRoutingTable shardRoutingTable : this) {
if (shardRoutingTable.primaryShard().unassigned()) {
counter++;
}
}
return counter;
}
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
List<ShardRouting> shards = newArrayList();
for (IndexShardRoutingTable shardRoutingTable : this) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.util.concurrent.NotThreadSafe;
@ -35,6 +36,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private final MetaData metaData;
private final ClusterBlocks blocks;
private final RoutingTable routingTable;
private final Map<String, RoutingNode> nodesToShards = newHashMap();
@ -43,8 +46,9 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private final List<MutableShardRouting> ignoredUnassigned = newArrayList();
public RoutingNodes(MetaData metaData, RoutingTable routingTable) {
public RoutingNodes(MetaData metaData, ClusterBlocks blocks, RoutingTable routingTable) {
this.metaData = metaData;
this.blocks = blocks;
this.routingTable = routingTable;
Map<String, List<MutableShardRouting>> nodesToShards = newHashMap();
for (IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
@ -100,6 +104,14 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return metaData();
}
public ClusterBlocks blocks() {
return this.blocks;
}
public ClusterBlocks getBlocks() {
return this.blocks;
}
public int requiredAverageNumberOfShardsPerNode() {
return metaData.totalNumberOfShards() / nodesToShards.size();
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Iterables;
@ -72,8 +74,12 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return indicesRouting();
}
public RoutingNodes routingNodes(MetaData metaData) {
return new RoutingNodes(metaData, this);
public RoutingNodes routingNodes(ClusterState state) {
return routingNodes(state.metaData(), state.blocks());
}
public RoutingNodes routingNodes(MetaData metaData, ClusterBlocks blocks) {
return new RoutingNodes(metaData, blocks, this);
}
public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException {

View File

@ -135,16 +135,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override protected void doStart() throws ElasticSearchException {
Map<String, String> nodeAttributes = buildCommonNodesAttributes(settings);
Boolean zenMaster = componentSettings.getAsBoolean("master", null);
if (zenMaster != null) {
if (zenMaster.equals(Boolean.FALSE)) {
nodeAttributes.put("zen.master", "false");
}
} else if (nodeAttributes.containsKey("client")) {
if (nodeAttributes.get("client").equals("true")) {
nodeAttributes.put("zen.master", "false");
}
}
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
String nodeId = UUID.randomUUID().toString();
localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes);

View File

@ -79,12 +79,10 @@ public class ElectMasterService extends AbstractComponent {
// clean non master nodes
for (Iterator<DiscoveryNode> it = possibleNodes.iterator(); it.hasNext();) {
DiscoveryNode node = it.next();
if (node.attributes().containsKey("zen.master")) {
if (node.attributes().get("zen.master").equals("false")) {
if (!node.masterNode()) {
it.remove();
}
}
}
Collections.sort(possibleNodes, nodeComparator);
return possibleNodes;
}

View File

@ -49,7 +49,8 @@ public class NodeEnvironment extends AbstractComponent {
@Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException {
super(settings);
if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false)) {
if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false) ||
!settings.getAsBoolean("node.master", true)) {
nodeFile = null;
lock = null;
return;

View File

@ -0,0 +1,325 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.builder.BinaryXContentBuilder;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.gateway.GatewayException;
import org.elasticsearch.index.gateway.none.NoneIndexGatewayModule;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.*;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
import static org.elasticsearch.common.unit.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements Gateway, ClusterStateListener {
public static final ClusterBlock INDEX_NOT_RECOVERED_BLOCK = new ClusterBlock(3, "index not recovered (not enough nodes with shards allocated found)", ClusterBlockLevel.ALL);
private File location;
private final ClusterName clusterName;
private final ClusterService clusterService;
private final NodeEnvironment nodeEnv;
private final MetaDataCreateIndexService createIndexService;
private final TransportNodesListGatewayState listGatewayState;
private volatile LocalGatewayState currentState;
@Inject public LocalGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
NodeEnvironment nodeEnv, ClusterName clusterName, ThreadPool threadPool, TransportNodesListGatewayState listGatewayState) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.createIndexService = createIndexService;
this.nodeEnv = nodeEnv;
this.listGatewayState = listGatewayState.initGateway(this);
}
@Override public String type() {
return "local";
}
public LocalGatewayState currentState() {
return this.currentState;
}
@Override protected void doStart() throws ElasticSearchException {
// if this is not a possible master node or data node, bail, we won't save anything here...
if (!clusterService.state().nodes().localNode().masterNode() || !clusterService.state().nodes().localNode().dataNode()) {
location = null;
return;
}
// create the location where the state will be stored
this.location = new File(nodeEnv.nodeFile(), "_state");
this.location.mkdirs();
try {
long version = findLatestStateVersion();
if (version != -1) {
this.currentState = readState(Streams.copyToByteArray(new FileInputStream(new File(location, "state-" + version))));
}
} catch (Exception e) {
logger.warn("failed to read local state", e);
}
clusterService.add(this);
}
@Override protected void doStop() throws ElasticSearchException {
clusterService.remove(this);
}
@Override protected void doClose() throws ElasticSearchException {
}
@Override public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
Set<String> nodesIds = Sets.newHashSet();
nodesIds.addAll(clusterService.state().nodes().dataNodes().keySet());
nodesIds.addAll(clusterService.state().nodes().masterNodes().keySet());
TransportNodesListGatewayState.NodesLocalGatewayState nodesState = listGatewayState.list(nodesIds, null).actionGet();
TransportNodesListGatewayState.NodeLocalGatewayState electedState = null;
for (TransportNodesListGatewayState.NodeLocalGatewayState nodeState : nodesState) {
if (nodeState.state() == null) {
continue;
}
if (electedState == null) {
electedState = nodeState;
} else if (nodeState.state().version() > electedState.state().version()) {
electedState = nodeState;
}
}
if (electedState == null) {
logger.debug("no state elected");
listener.onSuccess();
return;
}
logger.debug("elected state from [{}]", electedState.node());
final LocalGatewayState state = electedState.state();
final AtomicInteger indicesCounter = new AtomicInteger(state.metaData().indices().size());
clusterService.submitStateUpdateTask("local-gateway-elected-state", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
MetaData.Builder metaDataBuilder = newMetaDataBuilder()
.metaData(currentState.metaData());
// mark the metadata as read from gateway
metaDataBuilder.markAsRecoveredFromGateway();
return newClusterStateBuilder().state(currentState)
.version(state.version())
.metaData(metaDataBuilder).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
// go over the meta data and create indices, we don't really need to copy over
// the meta data per index, since we create the index and it will be added automatically
for (final IndexMetaData indexMetaData : state.metaData()) {
try {
createIndexService.createIndex(new MetaDataCreateIndexService.Request("gateway", indexMetaData.index())
.settings(indexMetaData.settings())
.mappingsCompressed(indexMetaData.mappings())
.blocks(ImmutableSet.of(INDEX_NOT_RECOVERED_BLOCK))
.timeout(timeValueSeconds(30)),
new MetaDataCreateIndexService.Listener() {
@Override public void onResponse(MetaDataCreateIndexService.Response response) {
if (indicesCounter.decrementAndGet() == 0) {
listener.onSuccess();
}
}
@Override public void onFailure(Throwable t) {
logger.error("failed to create index [{}]", indexMetaData.index(), t);
}
});
} catch (IOException e) {
logger.error("failed to create index [{}]", indexMetaData.index(), e);
}
}
}
});
}
@Override public Class<? extends Module> suggestIndexGateway() {
return NoneIndexGatewayModule.class;
}
@Override public void reset() throws Exception {
}
@Override public void clusterChanged(final ClusterChangedEvent event) {
// nothing to do until we actually recover from hte gateway
if (!event.state().metaData().recoveredFromGateway()) {
return;
}
// go over the indices, if they are blocked, and all are allocated, update the cluster state that it is no longer blocked
for (Map.Entry<String, ImmutableSet<ClusterBlock>> entry : event.state().blocks().indices().entrySet()) {
final String index = entry.getKey();
ImmutableSet<ClusterBlock> indexBlocks = entry.getValue();
if (indexBlocks.contains(INDEX_NOT_RECOVERED_BLOCK)) {
IndexRoutingTable indexRoutingTable = event.state().routingTable().index(index);
if (indexRoutingTable != null && indexRoutingTable.allPrimaryShardsActive()) {
clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
blocks.removeIndexBlock(index, INDEX_NOT_RECOVERED_BLOCK);
return ClusterState.builder().state(currentState).blocks(blocks).build();
}
});
}
}
}
if (!event.routingTableChanged() && !event.metaDataChanged()) {
return;
}
// builder the current state
LocalGatewayState.Builder builder = LocalGatewayState.builder();
if (currentState != null) {
builder.state(currentState);
}
builder.version(event.state().version());
builder.metaData(event.state().metaData());
// remove from the current state all the shards that are primary and started, we won't need them anymore
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable.primaryShard().active()) {
builder.remove(indexShardRoutingTable.shardId());
}
}
}
// now, add all the ones that are active and on this node
RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId());
if (routingNode != null) {
// out node is not in play yet...
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
builder.put(shardRouting.shardId(), event.state().version());
}
}
}
try {
LocalGatewayState stateToWrite = builder.build();
BinaryXContentBuilder xContentBuilder = XContentFactory.contentBinaryBuilder(XContentType.JSON);
xContentBuilder.prettyPrint();
xContentBuilder.startObject();
LocalGatewayState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
File stateFile = new File(location, "state-" + event.state().version());
FileOutputStream fos = new FileOutputStream(stateFile);
fos.write(xContentBuilder.unsafeBytes(), 0, xContentBuilder.unsafeBytesLength());
fos.close();
FileSystemUtils.syncFile(stateFile);
currentState = stateToWrite;
} catch (IOException e) {
logger.warn("failed to write updated state", e);
}
// delete all the other files
File[] files = location.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return !name.equals("state-" + event.state().version());
}
});
for (File file : files) {
file.delete();
}
}
private long findLatestStateVersion() throws IOException {
long index = -1;
for (File stateFile : location.listFiles()) {
if (logger.isTraceEnabled()) {
logger.trace("[findLatestState]: Processing [" + stateFile.getName() + "]");
}
String name = stateFile.getName();
if (!name.startsWith("state-")) {
continue;
}
long fileIndex = Long.parseLong(name.substring(name.indexOf('-') + 1));
if (fileIndex >= index) {
// try and read the meta data
try {
readState(Streams.copyToByteArray(new FileInputStream(stateFile)));
index = fileIndex;
} catch (IOException e) {
logger.warn("[findLatestState]: Failed to read state from [" + name + "], ignoring...", e);
}
}
}
return index;
}
private LocalGatewayState readState(byte[] data) throws IOException {
XContentParser parser = null;
try {
parser = XContentFactory.xContent(XContentType.JSON).createParser(data);
return LocalGatewayState.Builder.fromXContent(parser, settings);
} finally {
if (parser != null) {
parser.close();
}
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.gateway.Gateway;
/**
* @author kimchy (shay.banon)
*/
public class LocalGatewayModule extends AbstractModule implements PreProcessModule {
@Override protected void configure() {
bind(Gateway.class).to(LocalGateway.class).asEagerSingleton();
bind(TransportNodesListGatewayState.class).asEagerSingleton();
}
@Override public void processModule(Module module) {
if (module instanceof ShardAllocationModule) {
((ShardAllocationModule) module).addNodeAllocation(LocalGatewayNodeAllocation.class);
}
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.allocation.NodeAllocation;
import org.elasticsearch.cluster.routing.allocation.NodeAllocations;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
* @author kimchy (shay.banon)
*/
public class LocalGatewayNodeAllocation extends NodeAllocation {
private final TransportNodesListGatewayState listGatewayState;
@Inject public LocalGatewayNodeAllocation(Settings settings, TransportNodesListGatewayState listGatewayState) {
super(settings);
this.listGatewayState = listGatewayState;
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingNodes routingNodes, DiscoveryNodes nodes) {
boolean changed = false;
for (IndexRoutingTable indexRoutingTable : routingNodes.routingTable()) {
// only do the allocation if there is a local "INDEX NOT RECOVERED" block
if (!routingNodes.blocks().hasIndexBlock(indexRoutingTable.index(), LocalGateway.INDEX_NOT_RECOVERED_BLOCK)) {
continue;
}
if (indexRoutingTable.allPrimaryShardsUnassigned()) {
// all primary are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign
Set<String> nodesIds = Sets.newHashSet();
nodesIds.addAll(nodes.dataNodes().keySet());
nodesIds.addAll(nodes.masterNodes().keySet());
TransportNodesListGatewayState.NodesLocalGatewayState nodesState = listGatewayState.list(nodesIds, null).actionGet();
// make a list of ShardId to Node, each one from the latest version
Map<ShardId, Tuple<DiscoveryNode, Long>> shards = Maps.newHashMap();
for (TransportNodesListGatewayState.NodeLocalGatewayState nodeState : nodesState) {
for (Map.Entry<ShardId, Long> entry : nodeState.state().shards().entrySet()) {
if (entry.getKey().index().name().equals(indexRoutingTable.index())) {
Tuple<DiscoveryNode, Long> t = shards.get(entry.getKey());
if (t == null || entry.getValue() > t.v2().longValue()) {
t = new Tuple<DiscoveryNode, Long>(nodeState.node(), entry.getValue());
shards.put(entry.getKey(), t);
}
}
}
}
// check if we managed to allocate to all of them, if not, move all relevant shards to ignored
if (shards.size() < indexRoutingTable.shards().size()) {
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.index().equals(indexRoutingTable.index())) {
it.remove();
routingNodes.ignoredUnassigned().add(shardRouting);
}
}
} else {
changed = true;
// we found all nodes to allocate to, do the allocation
for (Iterator<MutableShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext();) {
MutableShardRouting shardRouting = it.next();
if (shardRouting.primary()) {
DiscoveryNode node = shards.get(shardRouting.shardId()).v1();
RoutingNode routingNode = routingNodes.node(node.id());
routingNode.add(shardRouting);
it.remove();
}
}
}
}
}
return changed;
}
}

View File

@ -0,0 +1,222 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Map;
/**
* @author kimchy (shay.banon)
*/
public class LocalGatewayState {
public static class StartedShard {
private final long version;
private final ShardId shardId;
public StartedShard(long version, ShardId shardId) {
this.version = version;
this.shardId = shardId;
}
public long version() {
return version;
}
public ShardId shardId() {
return shardId;
}
}
private final long version;
private final MetaData metaData;
private final ImmutableMap<ShardId, Long> shards;
public LocalGatewayState(long version, MetaData metaData, Map<ShardId, Long> shards) {
this.version = version;
this.metaData = metaData;
this.shards = ImmutableMap.copyOf(shards);
}
public long version() {
return version;
}
public MetaData metaData() {
return metaData;
}
public ImmutableMap<ShardId, Long> shards() {
return this.shards;
}
public Long startedShardVersion(ShardId shardId) {
return shards.get(shardId);
}
public static Builder builder() {
return new Builder();
}
public static class Builder {
private long version;
private MetaData metaData;
private Map<ShardId, Long> shards = Maps.newHashMap();
public Builder state(LocalGatewayState state) {
this.version = state.version();
this.metaData = state.metaData();
this.shards.putAll(state.shards);
return this;
}
public Builder version(long version) {
this.version = version;
return this;
}
public Builder metaData(MetaData metaData) {
this.metaData = metaData;
return this;
}
public Builder remove(ShardId shardId) {
this.shards.remove(shardId);
return this;
}
public Builder put(ShardId shardId, long version) {
this.shards.put(shardId, version);
return this;
}
public LocalGatewayState build() {
return new LocalGatewayState(version, metaData, shards);
}
public static void toXContent(LocalGatewayState state, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject("state");
builder.field("version", state.version());
MetaData.Builder.toXContent(state.metaData(), builder, params);
builder.startArray("shards");
for (Map.Entry<ShardId, Long> entry : state.shards.entrySet()) {
builder.startObject();
builder.field("index", entry.getKey().index().name());
builder.field("id", entry.getKey().id());
builder.field("version", entry.getValue());
builder.endObject();
}
builder.endArray();
builder.endObject();
}
public static LocalGatewayState fromXContent(XContentParser parser, @Nullable Settings globalSettings) throws IOException {
Builder builder = new Builder();
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
if (token == null) {
// no data...
return builder.build();
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if ("meta-data".equals(currentFieldName)) {
builder.metaData = MetaData.Builder.fromXContent(parser, globalSettings);
}
} else if (token == XContentParser.Token.START_ARRAY) {
if ("shards".equals(currentFieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
String shardIndex = null;
int shardId = -1;
long version = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if ("index".equals(currentFieldName)) {
shardIndex = parser.text();
} else if ("id".equals(currentFieldName)) {
shardId = parser.intValue();
} else if ("version".equals(currentFieldName)) {
version = parser.longValue();
}
}
}
builder.shards.put(new ShardId(shardIndex, shardId), version);
}
}
}
} else if (token.isValue()) {
if ("version".equals(currentFieldName)) {
builder.version = parser.longValue();
}
}
}
return builder.build();
}
public static LocalGatewayState readFrom(StreamInput in, @Nullable Settings globalSettings) throws IOException {
LocalGatewayState.Builder builder = new Builder();
builder.version = in.readLong();
builder.metaData = MetaData.Builder.readFrom(in, globalSettings);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
builder.shards.put(ShardId.readShardId(in), in.readLong());
}
return builder.build();
}
public static void writeTo(LocalGatewayState state, StreamOutput out) throws IOException {
out.writeLong(state.version());
MetaData.Builder.writeTo(state.metaData(), out);
out.writeVInt(state.shards.size());
for (Map.Entry<ShardId, Long> entry : state.shards.entrySet()) {
entry.getKey().writeTo(out);
out.writeLong(entry.getValue());
}
}
}
}

View File

@ -0,0 +1,220 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (shay.banon)
*/
public class TransportNodesListGatewayState extends TransportNodesOperationAction<TransportNodesListGatewayState.Request, TransportNodesListGatewayState.NodesLocalGatewayState, TransportNodesListGatewayState.NodeRequest, TransportNodesListGatewayState.NodeLocalGatewayState> {
private LocalGateway gateway;
@Inject public TransportNodesListGatewayState(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService) {
super(settings, clusterName, threadPool, clusterService, transportService);
}
TransportNodesListGatewayState initGateway(LocalGateway gateway) {
this.gateway = gateway;
return this;
}
public ActionFuture<NodesLocalGatewayState> list(Set<String> nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(nodesIds).timeout(timeout));
}
@Override protected String transportAction() {
return "/gateway/local/state";
}
@Override protected String transportNodeAction() {
return "/gateway/local/state/node";
}
@Override protected Request newRequest() {
return new Request();
}
@Override protected NodeRequest newNodeRequest() {
return new NodeRequest();
}
@Override protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId);
}
@Override protected NodeLocalGatewayState newNodeResponse() {
return new NodeLocalGatewayState();
}
@Override protected NodesLocalGatewayState newResponse(Request request, AtomicReferenceArray responses) {
final List<NodeLocalGatewayState> nodesList = Lists.newArrayList();
final List<FailedNodeException> failures = Lists.newArrayList();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodeLocalGatewayState) { // will also filter out null response for unallocated ones
nodesList.add((NodeLocalGatewayState) resp);
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
}
}
return new NodesLocalGatewayState(clusterName, nodesList.toArray(new NodeLocalGatewayState[nodesList.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
}
@Override protected NodeLocalGatewayState nodeOperation(NodeRequest request) throws ElasticSearchException {
return new NodeLocalGatewayState(clusterService.state().nodes().localNode(), gateway.currentState());
}
@Override protected boolean accumulateExceptions() {
return true;
}
static class Request extends NodesOperationRequest {
public Request() {
}
public Request(Set<String> nodesIds) {
super(nodesIds.toArray(new String[nodesIds.size()]));
}
@Override public Request timeout(TimeValue timeout) {
super.timeout(timeout);
return this;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class NodesLocalGatewayState extends NodesOperationResponse<NodeLocalGatewayState> {
private FailedNodeException[] failures;
NodesLocalGatewayState() {
}
public NodesLocalGatewayState(ClusterName clusterName, NodeLocalGatewayState[] nodes, FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
}
public FailedNodeException[] failures() {
return failures;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeLocalGatewayState[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new NodeLocalGatewayState();
nodes[i].readFrom(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeLocalGatewayState response : nodes) {
response.writeTo(out);
}
}
}
static class NodeRequest extends NodeOperationRequest {
NodeRequest() {
}
NodeRequest(String nodeId) {
super(nodeId);
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
public static class NodeLocalGatewayState extends NodeOperationResponse {
private LocalGatewayState state;
NodeLocalGatewayState() {
}
public NodeLocalGatewayState(DiscoveryNode node, LocalGatewayState state) {
super(node);
this.state = state;
}
public LocalGatewayState state() {
return state;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
state = LocalGatewayState.Builder.readFrom(in, null);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (state == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
LocalGatewayState.Builder.writeTo(state, out);
}
}
}
}

View File

@ -69,17 +69,17 @@ public class ElectReplicaAsPrimaryDuringRelocationTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the replica shards");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(2));

View File

@ -72,7 +72,7 @@ public class FailedShardsRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the shards (primaries)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -90,7 +90,7 @@ public class FailedShardsRoutingTests {
}
logger.info("Start the shards (backups)");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -112,7 +112,7 @@ public class FailedShardsRoutingTests {
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
@ -123,11 +123,11 @@ public class FailedShardsRoutingTests {
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2));
logger.info("Fail the shards on node 3");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
@ -139,7 +139,7 @@ public class FailedShardsRoutingTests {
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
@ -150,11 +150,11 @@ public class FailedShardsRoutingTests {
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2));
logger.info("Start the shards on node 3");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
@ -227,7 +227,7 @@ public class FailedShardsRoutingTests {
}
logger.info("Start the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -250,12 +250,12 @@ public class FailedShardsRoutingTests {
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Fail backup shards on node2");
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
List<MutableShardRouting> failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING);
routingTable = strategy.applyFailedShards(clusterState, failedShards);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
@ -272,7 +272,7 @@ public class FailedShardsRoutingTests {
// fail them again...
routingTable = strategy.applyFailedShards(clusterState, failedShards);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));

View File

@ -68,13 +68,13 @@ public class PrimaryElectionRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the backup shard (on node2)");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -84,7 +84,7 @@ public class PrimaryElectionRoutingTests {
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));

View File

@ -68,7 +68,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

View File

@ -84,11 +84,11 @@ public class RebalanceAfterActiveTests {
}
logger.info("start all the primary shards, replicas will start initializing");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
@ -103,7 +103,7 @@ public class RebalanceAfterActiveTests {
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
@ -112,33 +112,33 @@ public class RebalanceAfterActiveTests {
}
logger.info("start the replica shards, rebalancing should start");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
// we only allow one relocation at a time
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(5));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(5));
logger.info("complete relocation, other half of relocation should happen");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
// we now only relocate 3, since 2 remain where they are!
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(7));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(3));
logger.info("complete relocation, thats it!");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(10));
// make sure we have an even relocation

View File

@ -87,7 +87,7 @@ public class ReplicaAllocatedAfterPrimaryTests {
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), nullValue());
logger.info("Start all the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

View File

@ -92,9 +92,10 @@ public class SingleShardNoReplicasRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).build();
routingTable = strategy.reroute(clusterState);
assertThat(routingTable == prevRoutingTable, equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Marking the shard as started");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -141,7 +142,7 @@ public class SingleShardNoReplicasRoutingTests {
assertThat(routingTable == prevRoutingTable, equalTo(true));
logger.info("Start the shard on node 2");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -190,7 +191,7 @@ public class SingleShardNoReplicasRoutingTests {
assertThat(routingTable.index("test").shard(0).shards().get(0).currentNodeId(), equalTo("node1"));
logger.info("Marking the shard as failed");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -255,7 +256,7 @@ public class SingleShardNoReplicasRoutingTests {
int nodeIndex = Integer.parseInt(nodeId.substring("node".length()));
assertThat(nodeIndex, lessThan(25));
}
RoutingNodes routingNodes = routingTable.routingNodes(metaData);
RoutingNodes routingNodes = clusterState.routingNodes();
Set<String> encounteredIndices = newHashSet();
for (RoutingNode routingNode : routingNodes) {
assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(0));
@ -348,7 +349,7 @@ public class SingleShardNoReplicasRoutingTests {
assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), equalTo(INITIALIZING));
}
RoutingNodes routingNodes = routingTable.routingNodes(metaData);
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(routingNodes.numberOfShardsOfType(INITIALIZING), equalTo(numberOfIndices));
assertThat(routingNodes.node("node1").numberOfShardsWithState(INITIALIZING), anyOf(equalTo(3), equalTo(4)));
assertThat(routingNodes.node("node2").numberOfShardsWithState(INITIALIZING), anyOf(equalTo(3), equalTo(4)));
@ -365,7 +366,7 @@ public class SingleShardNoReplicasRoutingTests {
assertThat(prevRoutingTable == routingTable, equalTo(true));
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -377,7 +378,7 @@ public class SingleShardNoReplicasRoutingTests {
assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), anyOf(equalTo(RELOCATING), equalTo(STARTED)));
}
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat("4 source shard routing are relocating", routingNodes.numberOfShardsOfType(RELOCATING), equalTo(4));
assertThat("4 target shard routing are initializing", routingNodes.numberOfShardsOfType(INITIALIZING), equalTo(4));
@ -394,7 +395,7 @@ public class SingleShardNoReplicasRoutingTests {
assertThat(routingTable.index("test" + i).shard(0).shards().size(), equalTo(1));
assertThat(routingTable.index("test" + i).shard(0).shards().get(0).state(), anyOf(equalTo(RELOCATING), equalTo(STARTED)));
}
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(routingNodes.numberOfShardsOfType(STARTED), equalTo(numberOfIndices));
for (RoutingNode routingNode : routingNodes) {
assertThat(routingNode.numberOfShardsWithState(STARTED), equalTo(2));

View File

@ -95,7 +95,7 @@ public class SingleShardOneReplicaRoutingTests {
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -118,7 +118,7 @@ public class SingleShardOneReplicaRoutingTests {
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the backup shard");
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();

View File

@ -99,7 +99,7 @@ public class TenShardsOneReplicaRoutingTests {
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the primary shard (on node1)");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -123,11 +123,11 @@ public class TenShardsOneReplicaRoutingTests {
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Start the backup shard");
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
@ -148,7 +148,7 @@ public class TenShardsOneReplicaRoutingTests {
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
@ -159,11 +159,11 @@ public class TenShardsOneReplicaRoutingTests {
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(6));
logger.info("Start the shards on node 3");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = routingTable.routingNodes(metaData);
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));

View File

@ -60,13 +60,13 @@ public class UpdateNumberOfReplicasTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start all the primary shards");
RoutingNodes routingNodes = routingTable.routingNodes(clusterState.metaData());
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start all the replica shards");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -82,7 +82,7 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node2"));
logger.info("add another replica");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(2).build();
metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(2).build();
@ -117,7 +117,7 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
@ -134,7 +134,7 @@ public class UpdateNumberOfReplicasTests {
assertThat(routingTable.index("test").shard(0).replicaShards().get(1).currentNodeId(), equalTo("node3"));
logger.info("now remove a replica");
routingNodes = routingTable.routingNodes(clusterState.metaData());
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = newRoutingTableBuilder().routingTable(routingTable).updateNumberOfReplicas(1).build();
metaData = MetaData.newMetaDataBuilder().metaData(clusterState.metaData()).updateNumberOfReplicas(1).build();