[GATEWAY] Cleanup LocalGatewayShardsState

This commit tries to cleanup LocalGatewayShardsState to be more efficient
and easier to understand.
This commit is contained in:
Simon Willnauer 2014-12-09 17:12:42 +01:00
parent 544ef8cb17
commit 59534391da
7 changed files with 260 additions and 126 deletions

View File

@ -53,6 +53,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> activeShards;
final ImmutableList<ShardRouting> assignedShards;
final boolean allShardsStarted;
/**
* The initializing list, including ones that are initializing on a target node because of relocation.
@ -73,7 +74,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
ImmutableList.Builder<ShardRouting> activeShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> assignedShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> allInitializingShards = ImmutableList.builder();
boolean allShardsStarted = true;
for (ShardRouting shard : shards) {
if (shard.primary()) {
primary = shard;
@ -93,7 +94,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
if (shard.assignedToNode()) {
assignedShards.add(shard);
}
if (shard.state() != ShardRoutingState.STARTED) {
allShardsStarted = false;
}
}
this.allShardsStarted = allShardsStarted;
this.primary = primary;
if (primary != null) {
@ -240,22 +245,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return this.assignedShards;
}
/**
* Returns the number of shards in a specific state
*
* @param state state of the shards to count
* @return number of shards in <code>state</code>
*/
public int countWithState(ShardRoutingState state) {
int count = 0;
for (ShardRouting shard : this) {
if (state == shard.state()) {
count++;
}
}
return count;
}
public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(shards));
}
@ -268,18 +257,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, shuffler.shuffle(shards, seed));
}
public ShardIterator activeShardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards));
}
public ShardIterator activeShardsIt() {
return new PlainShardIterator(shardId, activeShards);
}
public ShardIterator activeShardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
}
/**
* Returns an iterator over active and initializing shards. Making sure though that
* its random within the active shards, and initializing shards are the last to iterate through.
@ -302,18 +279,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator assignedShardsRandomIt() {
return new PlainShardIterator(shardId, shuffler.shuffle(assignedShards));
}
public ShardIterator assignedShardsIt() {
return new PlainShardIterator(shardId, assignedShards);
}
public ShardIterator assignedShardsIt(int seed) {
return new PlainShardIterator(shardId, shuffler.shuffle(assignedShards, seed));
}
/**
* Returns an iterator only on the primary shard.
*/
@ -382,6 +347,13 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
/**
* Returns <code>true</code> iff all shards in the routing table are started otherwise <code>false</code>
*/
public boolean allShardsStarted() {
return allShardsStarted;
}
static class AttributesKey {
final String[] attributes;

View File

@ -50,6 +50,8 @@ public class MutableShardRouting extends ImmutableShardRouting {
super(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version);
}
/**
* Assign this shard to a node.
*

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
@ -39,7 +40,6 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.*;
import java.nio.file.*;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@ -61,7 +61,9 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
public LocalGatewayShardsState(Settings settings, NodeEnvironment nodeEnv, TransportNodesListGatewayStartedShards listGatewayStartedShards) throws Exception {
super(settings);
this.nodeEnv = nodeEnv;
listGatewayStartedShards.initGateway(this);
if (listGatewayStartedShards != null) { // for testing
listGatewayStartedShards.initGateway(this);
}
if (DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
@ -81,80 +83,87 @@ public class LocalGatewayShardsState extends AbstractComponent implements Cluste
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
return;
}
final ClusterState state = event.state();
if (state.blocks().disableStatePersistence() == false
&& state.nodes().localNode().dataNode()
&& event.routingTableChanged()) {
// now, add all the ones that are active and on this node
RoutingNode routingNode = state.readOnlyRoutingNodes().node(state.nodes().localNodeId());
final Map<ShardId, ShardStateInfo> newState;
if (routingNode != null) {
newState = persistRoutingNodeState(routingNode);
} else {
newState = Maps.newHashMap();
}
if (!event.state().nodes().localNode().dataNode()) {
return;
}
if (!event.routingTableChanged()) {
return;
}
Map<ShardId, ShardStateInfo> newState = Maps.newHashMap();
newState.putAll(this.currentState);
// remove from the current state all the shards that are completely started somewhere, we won't need them anymore
// and if they are still here, we will add them in the next phase
// Also note, this works well when closing an index, since a closed index will have no routing shards entries
// so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed)
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) {
newState.remove(indexShardRoutingTable.shardId());
// preserve all shards that:
// * are not already in the new map AND
// * belong to an active index AND
// * used to be on this node but are not yet completely stated on any other node
// since these shards are NOT active on this node the won't need to be written above - we just preserve these
// in this map until they are fully started anywhere else or are re-assigned and we need to update the state
final RoutingTable indexRoutingTables = state.routingTable();
for (Map.Entry<ShardId, ShardStateInfo> entry : this.currentState.entrySet()) {
ShardId shardId = entry.getKey();
if (newState.containsKey(shardId) == false) { // this shard used to be here
String indexName = shardId.index().getName();
if (state.metaData().hasIndex(indexName)) { // it's index is not deleted
IndexRoutingTable index = indexRoutingTables.index(indexName);
if (index != null && index.shard(shardId.id()).allShardsStarted() == false) {
// not all shards are active on another node so we put it back until they are active
newState.put(shardId, entry.getValue());
}
}
}
}
this.currentState = newState;
}
// remove deleted indices from the started shards
for (ShardId shardId : currentState.keySet()) {
if (!event.state().metaData().hasIndex(shardId.index().name())) {
newState.remove(shardId);
}
}
// now, add all the ones that are active and on this node
RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId());
if (routingNode != null) {
// our node is not in play yet...
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
newState.put(shardRouting.shardId(), new ShardStateInfo(shardRouting.version(), shardRouting.primary()));
}
}
}
// go over the write started shards if needed
for (Iterator<Map.Entry<ShardId, ShardStateInfo>> it = newState.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<ShardId, ShardStateInfo> entry = it.next();
ShardId shardId = entry.getKey();
ShardStateInfo shardStateInfo = entry.getValue();
String writeReason = null;
ShardStateInfo currentShardStateInfo = currentState.get(shardId);
if (currentShardStateInfo == null) {
writeReason = "freshly started, version [" + shardStateInfo.version + "]";
} else if (currentShardStateInfo.version != shardStateInfo.version) {
writeReason = "version changed from [" + currentShardStateInfo.version + "] to [" + shardStateInfo.version + "]";
}
// we update the write reason if we really need to write a new one...
if (writeReason == null) {
continue;
}
try {
writeShardState(writeReason, shardId, shardStateInfo, currentShardStateInfo);
} catch (Exception e) {
// we failed to write the shard state, remove it from our builder, we will try and write
// it next time...
it.remove();
}
}
this.currentState = newState;
}
Map<ShardId, ShardStateInfo> persistRoutingNodeState(RoutingNode routingNode) {
final Map<ShardId, ShardStateInfo> newState = Maps.newHashMap();
for (MutableShardRouting shardRouting : routingNode) {
if (shardRouting.active()) {
ShardId shardId = shardRouting.shardId();
ShardStateInfo shardStateInfo = new ShardStateInfo(shardRouting.version(), shardRouting.primary());
final ShardStateInfo previous = currentState.get(shardId);
if(maybeWriteShardState(shardId, shardStateInfo, previous) ) {
newState.put(shardId, shardStateInfo);
} else if (previous != null) {
currentState.put(shardId, previous);
}
}
}
return newState;
}
Map<ShardId, ShardStateInfo> getCurrentState() {
return currentState;
}
boolean maybeWriteShardState(ShardId shardId, ShardStateInfo shardStateInfo, ShardStateInfo previousState) {
final String writeReason;
if (previousState == null) {
writeReason = "freshly started, version [" + shardStateInfo.version + "]";
} else if (previousState.version < shardStateInfo.version) {
writeReason = "version changed from [" + previousState.version + "] to [" + shardStateInfo.version + "]";
} else {
logger.trace("skip writing shard state - has been written before shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]");
assert previousState.version <= shardStateInfo.version : "version should not go backwards for shardID: " + shardId + " previous version: [" + previousState.version + "] current version [" + shardStateInfo.version + "]";
return previousState.version == shardStateInfo.version;
}
try {
writeShardState(writeReason, shardId, shardStateInfo, previousState);
} catch (Exception e) {
logger.warn("failed to write shard state for shard " + shardId, e);
// we failed to write the shard state, we will try and write
// it next time...
}
return true;
}
private Map<ShardId, ShardStateInfo> loadShardsStateInfo() throws Exception {
Set<ShardId> shardIds = nodeEnv.findAllShardIds();
long highestVersion = -1;

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.Nullable;
/**
*/
public class ShardStateInfo {
public final class ShardStateInfo {
public final long version;
@ -35,4 +35,32 @@ public class ShardStateInfo {
this.version = version;
this.primary = primary;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardStateInfo that = (ShardStateInfo) o;
if (version != that.version) return false;
if (primary != null ? !primary.equals(that.primary) : that.primary != null) return false;
return true;
}
@Override
public int hashCode() {
int result = (int) (version ^ (version >>> 32));
result = 31 * result + (primary != null ? primary.hashCode() : 0);
return result;
}
@Override
public String toString() {
return "ShardStateInfo{" +
"version=" + version +
", primary=" + primary +
'}';
}
}

View File

@ -283,18 +283,4 @@ public class NodeEnvironmentTests extends ElasticsearchTestCase {
}
env.close();
}
public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(ImmutableSettings.EMPTY);
}
public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
Settings build = ImmutableSettings.builder()
.put(settings)
.put("path.home", newTempDirPath().toAbsolutePath())
.putArray("path.data", tmpPaths()).build();
return new NodeEnvironment(build, new Environment(build));
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway.local.state.shards;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class LocalGatewayShardStateTests extends ElasticsearchTestCase {
public void testWriteShardState() throws Exception {
LocalGatewayShardsState state = new LocalGatewayShardsState(ImmutableSettings.EMPTY, newNodeEnvironment(), null);
ShardId id = new ShardId("foo", 1);
long version = between(1, Integer.MAX_VALUE / 2);
boolean primary = randomBoolean();
ShardStateInfo state1 = new ShardStateInfo(version, primary);
state.maybeWriteShardState(id, state1, null);
ShardStateInfo shardStateInfo = state.loadShardInfo(id);
assertEquals(shardStateInfo, state1);
ShardStateInfo state2 = new ShardStateInfo(version, primary);
state.maybeWriteShardState(id, state2, state1);
shardStateInfo = state.loadShardInfo(id);
assertEquals(shardStateInfo, state1);
ShardStateInfo state3 = new ShardStateInfo(version+1, primary);
state.maybeWriteShardState(id, state3, state1);
shardStateInfo = state.loadShardInfo(id);
assertEquals(shardStateInfo, state3);
assertTrue(state.getCurrentState().isEmpty());
}
public void testPersistRoutingNode() throws Exception {
LocalGatewayShardsState state = new LocalGatewayShardsState(ImmutableSettings.EMPTY, newNodeEnvironment(), null);
int numShards = between(0, 100);
List<MutableShardRouting> shards = new ArrayList<>();
List<MutableShardRouting> active = new ArrayList<>();
for (int i = 0; i < numShards; i++) {
long version = between(1, Integer.MAX_VALUE / 2);
ShardRoutingState shardRoutingState = randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.RELOCATING, ShardRoutingState.STARTED);
MutableShardRouting mutableShardRouting = new MutableShardRouting("idx", i, "foo", randomBoolean(), shardRoutingState, version);
if (mutableShardRouting.active()) {
active.add(mutableShardRouting);
}
shards.add(mutableShardRouting);
}
RoutingNode node = new RoutingNode("foo", new DiscoveryNode("foo", null, Version.CURRENT), shards);
Map<ShardId, ShardStateInfo> shardIdShardStateInfoMap = state.persistRoutingNodeState(node);
assertEquals(shardIdShardStateInfoMap.size(), active.size());
for (Map.Entry<ShardId, ShardStateInfo> written : shardIdShardStateInfoMap.entrySet()) {
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey());
assertEquals(shardStateInfo, written.getValue());
if (randomBoolean()) {
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id())));
}
}
assertTrue(state.getCurrentState().isEmpty());
state.getCurrentState().putAll(shardIdShardStateInfoMap);
if (randomBoolean()) { // sometimes write the same thing twice
shardIdShardStateInfoMap = state.persistRoutingNodeState(node);
assertEquals(shardIdShardStateInfoMap.size(), active.size());
for (Map.Entry<ShardId, ShardStateInfo> written : shardIdShardStateInfoMap.entrySet()) {
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey());
assertEquals(shardStateInfo, written.getValue());
if (randomBoolean()) {
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id())));
}
}
}
List<MutableShardRouting> nextRoundOfShards = new ArrayList<>();
for (MutableShardRouting routing : shards) {
nextRoundOfShards.add(new MutableShardRouting(routing, routing.version() + 1));
}
node = new RoutingNode("foo", new DiscoveryNode("foo", null, Version.CURRENT), nextRoundOfShards);
Map<ShardId, ShardStateInfo> shardIdShardStateInfoMapNew = state.persistRoutingNodeState(node);
assertEquals(shardIdShardStateInfoMapNew.size(), active.size());
for (Map.Entry<ShardId, ShardStateInfo> written : shardIdShardStateInfoMapNew.entrySet()) {
ShardStateInfo shardStateInfo = state.loadShardInfo(written.getKey());
assertEquals(shardStateInfo, written.getValue());
ShardStateInfo oldStateInfo = shardIdShardStateInfoMap.get(written.getKey());
assertEquals(oldStateInfo.version, written.getValue().version - 1);
if (randomBoolean()) {
assertNull(state.loadShardInfo(new ShardId("no_such_index", written.getKey().id())));
}
}
}
}

View File

@ -583,4 +583,16 @@ public abstract class ElasticsearchTestCase extends AbstractRandomizedTest {
return absPaths;
}
public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(ImmutableSettings.EMPTY);
}
public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
Settings build = ImmutableSettings.builder()
.put(settings)
.put("path.home", newTempDirPath().toAbsolutePath())
.putArray("path.data", tmpPaths()).build();
return new NodeEnvironment(build, new Environment(build));
}
}