add a version to routing table, so it can be reused when not changed after publishing

This commit is contained in:
kimchy 2011-06-09 22:51:40 +03:00
parent 1d9942847e
commit f87b9e3656
4 changed files with 93 additions and 45 deletions

View File

@ -48,13 +48,20 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
public static final RoutingTable EMPTY_ROUTING_TABLE = newRoutingTableBuilder().build(); public static final RoutingTable EMPTY_ROUTING_TABLE = newRoutingTableBuilder().build();
private final long version;
// index to IndexRoutingTable map // index to IndexRoutingTable map
private final ImmutableMap<String, IndexRoutingTable> indicesRouting; private final ImmutableMap<String, IndexRoutingTable> indicesRouting;
RoutingTable(Map<String, IndexRoutingTable> indicesRouting) { RoutingTable(long version, Map<String, IndexRoutingTable> indicesRouting) {
this.version = version;
this.indicesRouting = ImmutableMap.copyOf(indicesRouting); this.indicesRouting = ImmutableMap.copyOf(indicesRouting);
} }
public long version() {
return this.version;
}
@Override public UnmodifiableIterator<IndexRoutingTable> iterator() { @Override public UnmodifiableIterator<IndexRoutingTable> iterator() {
return indicesRouting.values().iterator(); return indicesRouting.values().iterator();
} }
@ -203,15 +210,56 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
public static class Builder { public static class Builder {
private long version;
private final Map<String, IndexRoutingTable> indicesRouting = newHashMap(); private final Map<String, IndexRoutingTable> indicesRouting = newHashMap();
public Builder routingTable(RoutingTable routingTable) { public Builder routingTable(RoutingTable routingTable) {
version = routingTable.version;
for (IndexRoutingTable indexRoutingTable : routingTable) { for (IndexRoutingTable indexRoutingTable : routingTable) {
indicesRouting.put(indexRoutingTable.index(), indexRoutingTable); indicesRouting.put(indexRoutingTable.index(), indexRoutingTable);
} }
return this; return this;
} }
public Builder updateNodes(RoutingNodes routingNodes) {
// this is being called without pre initializing the routing table, so we must copy over the version as well
this.version = routingNodes.routingTable().version();
Map<String, IndexRoutingTable.Builder> indexRoutingTableBuilders = newHashMap();
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting shardRoutingEntry : routingNode) {
// every relocating shard has a double entry, ignore the target one.
if (shardRoutingEntry.state() == ShardRoutingState.INITIALIZING && shardRoutingEntry.relocatingNodeId() != null)
continue;
String index = shardRoutingEntry.index();
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
if (indexBuilder == null) {
indexBuilder = new IndexRoutingTable.Builder(index);
indexRoutingTableBuilders.put(index, indexBuilder);
}
boolean allocatedPostApi = routingNodes.routingTable().index(shardRoutingEntry.index()).shard(shardRoutingEntry.id()).allocatedPostApi();
indexBuilder.addShard(new ImmutableShardRouting(shardRoutingEntry), !allocatedPostApi);
}
}
for (MutableShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.ignoredUnassigned())) {
String index = shardRoutingEntry.index();
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
if (indexBuilder == null) {
indexBuilder = new IndexRoutingTable.Builder(index);
indexRoutingTableBuilders.put(index, indexBuilder);
}
boolean allocatedPostApi = routingNodes.routingTable().index(shardRoutingEntry.index()).shard(shardRoutingEntry.id()).allocatedPostApi();
indexBuilder.addShard(new ImmutableShardRouting(shardRoutingEntry), !allocatedPostApi);
}
for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) {
add(indexBuilder);
}
return this;
}
public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) throws IndexMissingException { public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) throws IndexMissingException {
if (indices == null || indices.length == 0) { if (indices == null || indices.length == 0) {
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]); indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
@ -263,47 +311,18 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return this; return this;
} }
public Builder updateNodes(RoutingNodes routingNodes) { public Builder version(long version) {
Map<String, IndexRoutingTable.Builder> indexRoutingTableBuilders = newHashMap(); this.version = version;
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting shardRoutingEntry : routingNode) {
// every relocating shard has a double entry, ignore the target one.
if (shardRoutingEntry.state() == ShardRoutingState.INITIALIZING && shardRoutingEntry.relocatingNodeId() != null)
continue;
String index = shardRoutingEntry.index();
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
if (indexBuilder == null) {
indexBuilder = new IndexRoutingTable.Builder(index);
indexRoutingTableBuilders.put(index, indexBuilder);
}
boolean allocatedPostApi = routingNodes.routingTable().index(shardRoutingEntry.index()).shard(shardRoutingEntry.id()).allocatedPostApi();
indexBuilder.addShard(new ImmutableShardRouting(shardRoutingEntry), !allocatedPostApi);
}
}
for (MutableShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.ignoredUnassigned())) {
String index = shardRoutingEntry.index();
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
if (indexBuilder == null) {
indexBuilder = new IndexRoutingTable.Builder(index);
indexRoutingTableBuilders.put(index, indexBuilder);
}
boolean allocatedPostApi = routingNodes.routingTable().index(shardRoutingEntry.index()).shard(shardRoutingEntry.id()).allocatedPostApi();
indexBuilder.addShard(new ImmutableShardRouting(shardRoutingEntry), !allocatedPostApi);
}
for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) {
add(indexBuilder);
}
return this; return this;
} }
public RoutingTable build() { public RoutingTable build() {
return new RoutingTable(indicesRouting); return new RoutingTable(version, indicesRouting);
} }
public static RoutingTable readFrom(StreamInput in) throws IOException { public static RoutingTable readFrom(StreamInput in) throws IOException {
Builder builder = new Builder(); Builder builder = new Builder();
builder.version = in.readLong();
int size = in.readVInt(); int size = in.readVInt();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
IndexRoutingTable index = IndexRoutingTable.Builder.readFrom(in); IndexRoutingTable index = IndexRoutingTable.Builder.readFrom(in);
@ -314,6 +333,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
} }
public static void writeTo(RoutingTable table, StreamOutput out) throws IOException { public static void writeTo(RoutingTable table, StreamOutput out) throws IOException {
out.writeLong(table.version);
out.writeVInt(table.indicesRouting.size()); out.writeVInt(table.indicesRouting.size());
for (IndexRoutingTable index : table.indicesRouting.values()) { for (IndexRoutingTable index : table.indicesRouting.values()) {
IndexRoutingTable.Builder.writeTo(index, out); IndexRoutingTable.Builder.writeTo(index, out);

View File

@ -21,11 +21,18 @@ package org.elasticsearch.cluster.service;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.operation.OperationRouting; import org.elasticsearch.cluster.routing.operation.OperationRouting;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -189,7 +196,11 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (previousClusterState != clusterState) { if (previousClusterState != clusterState) {
if (clusterState.nodes().localNodeMaster()) { if (clusterState.nodes().localNodeMaster()) {
// only the master controls the version numbers // only the master controls the version numbers
clusterState = new ClusterState(clusterState.version() + 1, clusterState); Builder builder = ClusterState.builder().state(clusterState).version(clusterState.version() + 1);
if (previousClusterState.routingTable() != clusterState.routingTable()) {
builder.routingTable(RoutingTable.builder().routingTable(clusterState.routingTable()).version(clusterState.routingTable().version() + 1));
}
clusterState = builder.build();
} else { } else {
// we got this cluster state from the master, filter out based on versions (don't call listeners) // we got this cluster state from the master, filter out based on versions (don't call listeners)
if (clusterState.version() < previousClusterState.version()) { if (clusterState.version() < previousClusterState.version()) {

View File

@ -21,7 +21,11 @@ package org.elasticsearch.discovery.local;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -232,7 +236,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
if (nodeSpecificClusterState.nodes().localNode() != null) { if (nodeSpecificClusterState.nodes().localNode() != null) {
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() { discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
return nodeSpecificClusterState; ClusterState.Builder builder = ClusterState.builder().state(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
return builder.build();
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {

View File

@ -414,22 +414,29 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}); });
} }
void handleNewClusterStateFromMaster(final ClusterState clusterState) { void handleNewClusterStateFromMaster(final ClusterState newState) {
if (master) { if (master) {
logger.warn("master should not receive new cluster state from [{}]", clusterState.nodes().masterNode()); logger.warn("master should not receive new cluster state from [{}]", newState.nodes().masterNode());
} else { } else {
if (clusterState.nodes().localNode() == null) { if (newState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", clusterState.nodes().masterNode()); logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode());
} else { } else {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + clusterState.nodes().masterNode() + "])", new ProcessedClusterStateUpdateTask() { clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) { @Override public ClusterState execute(ClusterState currentState) {
latestDiscoNodes = clusterState.nodes(); latestDiscoNodes = newState.nodes();
// check to see that we monitor the correct master of the cluster // check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) { if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) {
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster stare received and we monitor the wrong master [" + masterFD.masterNode() + "]"); masterFD.restart(latestDiscoNodes.masterNode(), "new cluster stare received and we monitor the wrong master [" + masterFD.masterNode() + "]");
} }
return clusterState;
ClusterState.Builder builder = ClusterState.builder().state(newState);
// if the routing table did not change, use the original one
if (newState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
return builder.build();
} }
@Override public void clusterStateProcessed(ClusterState clusterState) { @Override public void clusterStateProcessed(ClusterState clusterState) {