Remove and ban unmodifiableMap in cluster package

We're concerned that unmodifiableMap uses significantly more memory than
ImmutableMap did - especially in cluster state - so we ban it there outright
and move to ImmutableOpenMap.

Removes ClusterState$Builder#routingTable(RoutingTable$Builder) because that
method had the side effect of building the routing table which can only be
done once per RoutingTable$Builder now that it uses ImmutableOpenMap.
This commit is contained in:
Nik Everett 2015-10-01 19:31:31 +02:00
parent 7b74f0ddc9
commit a0288742e7
41 changed files with 676 additions and 427 deletions

View File

@ -314,6 +314,45 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>de.thetaphi</groupId>
<artifactId>forbiddenapis</artifactId>
<version>1.8</version>
<executions>
<execution>
<id>check-forbidden-apis-in-cluster</id>
<configuration>
<targetVersion>${maven.compiler.target}</targetVersion>
<!-- disallow undocumented classes like sun.misc.Unsafe: -->
<internalRuntimeForbidden>true</internalRuntimeForbidden>
<!-- if the used Java version is too new, don't fail, just do nothing: -->
<failOnUnsupportedJava>false</failOnUnsupportedJava>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'targetVersion': -->
<bundledSignature>jdk-unsafe</bundledSignature>
<bundledSignature>jdk-deprecated</bundledSignature>
<bundledSignature>jdk-system-out</bundledSignature>
</bundledSignatures>
<signaturesFiles>
<signaturesFile>${elasticsearch.tools.directory}/forbidden/core-signatures.txt</signaturesFile>
<signaturesFile>${elasticsearch.tools.directory}/forbidden/all-signatures.txt</signaturesFile>
<signaturesFile>${elasticsearch.tools.directory}/forbidden/third-party-signatures.txt</signaturesFile>
<signaturesFile>${elasticsearch.tools.directory}/forbidden/cluster-signatures.txt</signaturesFile>
</signaturesFiles>
<signatures>${forbidden.signatures}</signatures>
<includes>
<include>org/elasticsearch/cluster/**/*.class</include>
</includes>
<suppressAnnotations><annotation>**.SuppressForbidden</annotation></suppressAnnotations>
</configuration>
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
@ -357,5 +396,22 @@
</activation>
<!-- not including license-maven-plugin is sufficent to expose default license -->
</profile>
<profile>
<id>dev</id>
<build>
<plugins>
<plugin>
<groupId>de.thetaphi</groupId>
<artifactId>forbiddenapis</artifactId>
<executions>
<execution>
<id>check-forbidden-apis-in-cluster</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -19,7 +19,10 @@
package org.elasticsearch.action.admin.cluster.snapshots.status;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -95,9 +98,9 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
Set<String> nodesIds = new HashSet<>();
for (SnapshotsInProgress.Entry entry : currentSnapshots) {
for (SnapshotsInProgress.ShardSnapshotStatus status : entry.shards().values()) {
if (status.nodeId() != null) {
nodesIds.add(status.nodeId());
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> status : entry.shards().values()) {
if (status.value.nodeId() != null) {
nodesIds.add(status.value.nodeId());
}
}
}
@ -151,15 +154,15 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
for (SnapshotsInProgress.Entry entry : currentSnapshots) {
currentSnapshotIds.add(entry.snapshotId());
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
for (ImmutableMap.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.getValue();
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardEntry : entry.shards()) {
SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.value;
if (status.nodeId() != null) {
// We should have information about this shard from the shard:
TransportNodesSnapshotsStatus.NodeSnapshotStatus nodeStatus = nodeSnapshotStatusMap.get(status.nodeId());
if (nodeStatus != null) {
Map<ShardId, SnapshotIndexShardStatus> shardStatues = nodeStatus.status().get(entry.snapshotId());
if (shardStatues != null) {
SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.getKey());
SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.key);
if (shardStatus != null) {
// We have full information about this shard
shardStatusBuilder.add(shardStatus);
@ -169,7 +172,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
}
}
final SnapshotIndexShardStage stage;
switch (shardEntry.getValue().state()) {
switch (shardEntry.value.state()) {
case FAILED:
case ABORTED:
case MISSING:
@ -184,9 +187,9 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
stage = SnapshotIndexShardStage.DONE;
break;
default:
throw new IllegalArgumentException("Unknown snapshot state " + shardEntry.getValue().state());
throw new IllegalArgumentException("Unknown snapshot state " + shardEntry.value.state());
}
SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(shardEntry.getKey(), stage);
SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
shardStatusBuilder.add(shardStatus);
}
builder.add(new SnapshotStatus(entry.snapshotId(), entry.state(), Collections.unmodifiableList(shardStatusBuilder)));

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.state;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
@ -89,7 +90,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
routingTableBuilder.add(currentState.routingTable().getIndicesRouting().get(filteredIndex));
}
}
builder.routingTable(routingTableBuilder);
builder.routingTable(routingTableBuilder.build());
} else {
builder.routingTable(currentState.routingTable());
}

View File

@ -20,9 +20,7 @@
package org.elasticsearch.cluster;
import org.elasticsearch.cluster.routing.ShardRouting;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.common.collect.ImmutableOpenMap;
/**
* ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
@ -31,15 +29,14 @@ import java.util.Map;
* for the key used in the shardSizes map
*/
public class ClusterInfo {
private final Map<String, DiskUsage> leastAvailableSpaceUsage;
private final Map<String, DiskUsage> mostAvailableSpaceUsage;
final Map<String, Long> shardSizes;
private final ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage;
private final ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage;
final ImmutableOpenMap<String, Long> shardSizes;
public static final ClusterInfo EMPTY = new ClusterInfo();
private final Map<ShardRouting, String> routingToDataPath;
private final ImmutableOpenMap<ShardRouting, String> routingToDataPath;
protected ClusterInfo() {
this(Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP);
this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
}
/**
@ -51,7 +48,9 @@ public class ClusterInfo {
* @param routingToDataPath the shard routing to datapath mapping
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(final Map<String, DiskUsage> leastAvailableSpaceUsage, final Map<String, DiskUsage> mostAvailableSpaceUsage, final Map<String, Long> shardSizes, Map<ShardRouting, String> routingToDataPath) {
public ClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<String, Long> shardSizes,
ImmutableOpenMap<ShardRouting, String> routingToDataPath) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
@ -61,14 +60,14 @@ public class ClusterInfo {
/**
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
*/
public Map<String, DiskUsage> getNodeLeastAvailableDiskUsages() {
public ImmutableOpenMap<String, DiskUsage> getNodeLeastAvailableDiskUsages() {
return this.leastAvailableSpaceUsage;
}
/**
* Returns a node id to disk usage mapping for the path that has the most available space on the node.
*/
public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
public ImmutableOpenMap<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return this.mostAvailableSpaceUsage;
}

View File

@ -591,10 +591,6 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
return this;
}
public Builder routingTable(RoutingTable.Builder routingTable) {
return routingTable(routingTable.build());
}
public Builder routingResult(RoutingAllocation.Result routingResult) {
this.routingTable = routingResult.routingTable();
return this;

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@ -44,10 +45,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -70,10 +68,10 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
private volatile TimeValue updateFrequency;
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<ShardRouting, String> shardRoutingToDataPath;
private volatile Map<String, Long> shardSizes;
private volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;
private volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;
private volatile ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath;
private volatile ImmutableOpenMap<String, Long> shardSizes;
private volatile boolean isMaster = false;
private volatile boolean enabled;
private volatile TimeValue fetchTimeout;
@ -89,10 +87,10 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService,
ThreadPool threadPool) {
super(settings);
this.leastAvailableSpaceUsages = Collections.emptyMap();
this.mostAvailableSpaceUsages = Collections.emptyMap();
this.shardRoutingToDataPath = Collections.emptyMap();
this.shardSizes = Collections.emptyMap();
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
this.shardRoutingToDataPath = ImmutableOpenMap.of();
this.shardSizes = ImmutableOpenMap.of();
this.transportNodesStatsAction = transportNodesStatsAction;
this.transportIndicesStatsAction = transportIndicesStatsAction;
this.clusterService = clusterService;
@ -198,14 +196,14 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
logger.trace("Removing node from cluster info: {}", removedNode.getId());
}
if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) {
Map<String, DiskUsage> newMaxUsages = new HashMap<>(leastAvailableSpaceUsages);
ImmutableOpenMap.Builder<String, DiskUsage> newMaxUsages = ImmutableOpenMap.builder(leastAvailableSpaceUsages);
newMaxUsages.remove(removedNode.getId());
leastAvailableSpaceUsages = Collections.unmodifiableMap(newMaxUsages);
leastAvailableSpaceUsages = newMaxUsages.build();
}
if (mostAvailableSpaceUsages.containsKey(removedNode.getId())) {
Map<String, DiskUsage> newMinUsages = new HashMap<>(mostAvailableSpaceUsages);
ImmutableOpenMap.Builder<String, DiskUsage> newMinUsages = ImmutableOpenMap.builder(mostAvailableSpaceUsages);
newMinUsages.remove(removedNode.getId());
mostAvailableSpaceUsages = Collections.unmodifiableMap(newMinUsages);
mostAvailableSpaceUsages = newMinUsages.build();
}
}
}
@ -309,11 +307,11 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
Map<String, DiskUsage> newLeastAvaiableUsages = new HashMap<>();
Map<String, DiskUsage> newMostAvaiableUsages = new HashMap<>();
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
leastAvailableSpaceUsages = Collections.unmodifiableMap(newLeastAvaiableUsages);
mostAvailableSpaceUsages = Collections.unmodifiableMap(newMostAvaiableUsages);
leastAvailableSpaceUsages = newLeastAvaiableUsages.build();
mostAvailableSpaceUsages = newMostAvaiableUsages.build();
}
@Override
@ -329,8 +327,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
}
// we empty the usages list, to be safe - we don't know what's going on.
leastAvailableSpaceUsages = Collections.emptyMap();
mostAvailableSpaceUsages = Collections.emptyMap();
leastAvailableSpaceUsages = ImmutableOpenMap.of();
mostAvailableSpaceUsages = ImmutableOpenMap.of();
}
}
});
@ -339,11 +337,11 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
ShardStats[] stats = indicesStatsResponse.getShards();
final HashMap<String, Long> newShardSizes = new HashMap<>();
final HashMap<ShardRouting, String> newShardRoutingToDataPath = new HashMap<>();
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
shardSizes = Collections.unmodifiableMap(newShardSizes);
shardRoutingToDataPath = Collections.unmodifiableMap(newShardRoutingToDataPath);
shardSizes = newShardSizes.build();
shardRoutingToDataPath = newShardRoutingToDataPath.build();
}
@Override
@ -359,8 +357,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
}
// we empty the usages list, to be safe - we don't know what's going on.
shardSizes = Collections.emptyMap();
shardRoutingToDataPath = Collections.emptyMap();
shardSizes = ImmutableOpenMap.of();
shardRoutingToDataPath = ImmutableOpenMap.of();
}
}
});
@ -389,7 +387,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
return clusterInfo;
}
static void buildShardLevelInfo(ESLogger logger, ShardStats[] stats, HashMap<String, Long> newShardSizes, HashMap<ShardRouting, String> newShardRoutingToDataPath) {
static void buildShardLevelInfo(ESLogger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
for (ShardStats s : stats) {
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
long size = s.getStats().getStore().sizeInBytes();
@ -401,7 +400,9 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu
}
}
static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray, Map<String, DiskUsage> newLeastAvaiableUsages, Map<String, DiskUsage> newMostAvaiableUsages) {
static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray,
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages,
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages) {
for (NodeStats nodeStats : nodeStatsArray) {
if (nodeStats.getFs() == null) {
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name());

View File

@ -19,9 +19,13 @@
package org.elasticsearch.cluster;
import com.google.common.collect.ImmutableMap;
import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@ -32,14 +36,11 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.unmodifiableMap;
/**
* Meta data about snapshots that are currently executing
*/
@ -69,31 +70,31 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
private final State state;
private final SnapshotId snapshotId;
private final boolean includeGlobalState;
private final Map<ShardId, ShardSnapshotStatus> shards;
private final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
private final List<String> indices;
private final Map<String, List<ShardId>> waitingIndices;
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, List<String> indices, long startTime, Map<ShardId, ShardSnapshotStatus> shards) {
public Entry(SnapshotId snapshotId, boolean includeGlobalState, State state, List<String> indices, long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshotId = snapshotId;
this.includeGlobalState = includeGlobalState;
this.indices = indices;
this.startTime = startTime;
if (shards == null) {
this.shards = ImmutableMap.of();
this.waitingIndices = ImmutableMap.of();
this.shards = ImmutableOpenMap.of();
this.waitingIndices = ImmutableOpenMap.of();
} else {
this.shards = unmodifiableMap(shards);
this.shards = shards;
this.waitingIndices = findWaitingIndices(shards);
}
}
public Entry(Entry entry, State state, Map<ShardId, ShardSnapshotStatus> shards) {
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshotId, entry.includeGlobalState, state, entry.indices, entry.startTime, shards);
}
public Entry(Entry entry, Map<ShardId, ShardSnapshotStatus> shards) {
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards);
}
@ -101,7 +102,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
return this.snapshotId;
}
public Map<ShardId, ShardSnapshotStatus> shards() {
public ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards() {
return this.shards;
}
@ -113,7 +114,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
return indices;
}
public Map<String, List<ShardId>> waitingIndices() {
public ImmutableOpenMap<String, List<ShardId>> waitingIndices() {
return waitingIndices;
}
@ -155,28 +156,26 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
return result;
}
private ImmutableMap<String, List<ShardId>> findWaitingIndices(Map<ShardId, ShardSnapshotStatus> shards) {
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Map<String, List<ShardId>> waitingIndicesMap = new HashMap<>();
for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> entry : shards.entrySet()) {
if (entry.getValue().state() == State.WAITING) {
List<ShardId> waitingShards = waitingIndicesMap.get(entry.getKey().getIndex());
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.WAITING) {
List<ShardId> waitingShards = waitingIndicesMap.get(entry.key.getIndex());
if (waitingShards == null) {
waitingShards = new ArrayList<>();
waitingIndicesMap.put(entry.getKey().getIndex(), waitingShards);
waitingIndicesMap.put(entry.key.getIndex(), waitingShards);
}
waitingShards.add(entry.getKey());
waitingShards.add(entry.key);
}
}
if (!waitingIndicesMap.isEmpty()) {
ImmutableMap.Builder<String, List<ShardId>> waitingIndicesBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<ShardId>> entry : waitingIndicesMap.entrySet()) {
waitingIndicesBuilder.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
}
return waitingIndicesBuilder.build();
} else {
return ImmutableMap.of();
if (waitingIndicesMap.isEmpty()) {
return ImmutableOpenMap.of();
}
ImmutableOpenMap.Builder<String, List<ShardId>> waitingIndicesBuilder = ImmutableOpenMap.builder();
for (Map.Entry<String, List<ShardId>> entry : waitingIndicesMap.entrySet()) {
waitingIndicesBuilder.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
}
return waitingIndicesBuilder.build();
}
}
@ -187,9 +186,9 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
* @param shards list of shard statuses
* @return true if all shards have completed (either successfully or failed), false otherwise
*/
public static boolean completed(Collection<ShardSnapshotStatus> shards) {
for (ShardSnapshotStatus status : shards) {
if (status.state().completed() == false) {
public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
for (ObjectCursor<ShardSnapshotStatus> status : shards) {
if (status.value.state().completed() == false) {
return false;
}
}
@ -369,7 +368,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
indexBuilder.add(in.readString());
}
long startTime = in.readLong();
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableMap.builder();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
int shards = in.readVInt();
for (int j = 0; j < shards; j++) {
ShardId shardId = ShardId.readShardId(in);
@ -395,10 +394,10 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
}
out.writeLong(entry.startTime());
out.writeVInt(entry.shards().size());
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards().entrySet()) {
shardEntry.getKey().writeTo(out);
out.writeOptionalString(shardEntry.getValue().nodeId());
out.writeByte(shardEntry.getValue().state().value());
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards()) {
shardEntry.key.writeTo(out);
out.writeOptionalString(shardEntry.value.nodeId());
out.writeByte(shardEntry.value.state().value());
}
}
}
@ -444,9 +443,9 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
builder.timeValueField(Fields.START_TIME_MILLIS, Fields.START_TIME, entry.startTime());
builder.startArray(Fields.SHARDS);
{
for (Map.Entry<ShardId, ShardSnapshotStatus> shardEntry : entry.shards.entrySet()) {
ShardId shardId = shardEntry.getKey();
ShardSnapshotStatus status = shardEntry.getValue();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards) {
ShardId shardId = shardEntry.key;
ShardSnapshotStatus status = shardEntry.value;
builder.startObject();
{
builder.field(Fields.INDEX, shardId.getIndex());

View File

@ -21,7 +21,7 @@ package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import java.nio.charset.StandardCharsets;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -76,6 +76,7 @@ import org.joda.time.DateTimeZone;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@ -459,7 +460,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
if (request.state() == State.OPEN) {
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.index()));
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build());
updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build();
}
removalReason = "cleaning up after validating index on master";

View File

@ -128,7 +128,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
.build();
RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).routingTable(routingTableBuilder).metaData(newMetaData).build());
ClusterState.builder(currentState).routingTable(routingTableBuilder.build()).metaData(newMetaData).build());
ClusterBlocks blocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(request.index).build();

View File

@ -124,7 +124,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
rtBuilder.remove(index);
}
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build());
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}
@ -181,7 +181,7 @@ public class MetaDataIndexStateService extends AbstractComponent {
rtBuilder.addAsFromCloseToOpen(updatedState.metaData().index(index));
}
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build());
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

View File

@ -320,7 +320,7 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
}
ClusterState updatedState = ClusterState.builder(currentState).metaData(metaDataBuilder).routingTable(routingTableBuilder).blocks(blocks).build();
ClusterState updatedState = ClusterState.builder(currentState).metaData(metaDataBuilder).routingTable(routingTableBuilder.build()).blocks(blocks).build();
// now, reroute in case things change that require it (like number of replicas)
RoutingAllocation.Result routingResult = allocationService.reroute(updatedState);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Iterators;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -30,7 +31,15 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
/**
@ -82,8 +91,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
// fill in the inverse of node -> shards allocated
// also fill replicaSet information
for (IndexRoutingTable indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable) {
for (ObjectCursor<IndexRoutingTable> indexRoutingTable : routingTable.indicesRouting().values()) {
for (IndexShardRoutingTable indexShard : indexRoutingTable.value) {
for (ShardRouting shard : indexShard) {
// to get all the shards belonging to an index, including the replicas,
// we define a replica set and keep track of it. A replica set is identified

View File

@ -20,6 +20,8 @@
package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.Diff;
@ -27,6 +29,7 @@ import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.iterable.Iterables;
@ -41,8 +44,6 @@ import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import static java.util.Collections.unmodifiableMap;
/**
* Represents a global cluster-wide routing table for all indices including the
* version of the current routing state.
@ -58,11 +59,11 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
private final long version;
// index to IndexRoutingTable map
private final Map<String, IndexRoutingTable> indicesRouting;
private final ImmutableOpenMap<String, IndexRoutingTable> indicesRouting;
RoutingTable(long version, Map<String, IndexRoutingTable> indicesRouting) {
RoutingTable(long version, ImmutableOpenMap<String, IndexRoutingTable> indicesRouting) {
this.version = version;
this.indicesRouting = unmodifiableMap(indicesRouting);
this.indicesRouting = indicesRouting;
}
/**
@ -76,7 +77,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
@Override
public Iterator<IndexRoutingTable> iterator() {
return indicesRouting.values().iterator();
return indicesRouting.valuesIt();
}
public boolean hasIndex(String index) {
@ -87,11 +88,11 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return indicesRouting.get(index);
}
public Map<String, IndexRoutingTable> indicesRouting() {
public ImmutableOpenMap<String, IndexRoutingTable> indicesRouting() {
return indicesRouting;
}
public Map<String, IndexRoutingTable> getIndicesRouting() {
public ImmutableOpenMap<String, IndexRoutingTable> getIndicesRouting() {
return indicesRouting();
}
@ -126,7 +127,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
*/
public List<ShardRouting> allShards() {
List<ShardRouting> shards = new ArrayList<>();
String[] indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
String[] indices = indicesRouting.keys().toArray(String.class);
for (String index : indices) {
List<ShardRouting> allShardsIndex = allShards(index);
shards.addAll(allShardsIndex);
@ -303,8 +304,8 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeVInt(indicesRouting.size());
for (IndexRoutingTable index : indicesRouting.values()) {
index.writeTo(out);
for (ObjectCursor<IndexRoutingTable> index : indicesRouting.values()) {
index.value.writeTo(out);
}
}
@ -312,7 +313,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
private final long version;
private final Diff<Map<String, IndexRoutingTable>> indicesRouting;
private final Diff<ImmutableOpenMap<String, IndexRoutingTable>> indicesRouting;
public RoutingTableDiff(RoutingTable before, RoutingTable after) {
version = after.version;
@ -321,7 +322,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
public RoutingTableDiff(StreamInput in) throws IOException {
version = in.readLong();
indicesRouting = DiffableUtils.readJdkMapDiff(in, IndexRoutingTable.PROTO);
indicesRouting = DiffableUtils.readImmutableOpenMapDiff(in, IndexRoutingTable.PROTO);
}
@Override
@ -344,10 +345,13 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return new Builder(routingTable);
}
/**
* Builder for the routing table. Note that build can only be called one time.
*/
public static class Builder {
private long version;
private final Map<String, IndexRoutingTable> indicesRouting = new HashMap<>();
private final ImmutableOpenMap.Builder<String, IndexRoutingTable> indicesRouting = ImmutableOpenMap.builder();
public Builder() {
@ -404,7 +408,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) {
if (indices == null || indices.length == 0) {
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
indices = indicesRouting.keys().toArray(String.class);
}
for (String index : indices) {
IndexRoutingTable indexRoutingTable = indicesRouting.get(index);
@ -514,12 +518,17 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return this;
}
/**
* Builds the routing table. Note that this can only be called one time.
* If you need to build a new RoutingTable as a copy of this one you'll
* need to build a new RoutingTable.Builder.
*/
public RoutingTable build() {
// normalize the versions right before we build it...
for (IndexRoutingTable indexRoutingTable : indicesRouting.values()) {
indicesRouting.put(indexRoutingTable.index(), indexRoutingTable.normalizeVersions());
for (ObjectCursor<IndexRoutingTable> indexRoutingTable : indicesRouting.values()) {
indicesRouting.put(indexRoutingTable.value.index(), indexRoutingTable.value.normalizeVersions());
}
return new RoutingTable(version, indicesRouting);
return new RoutingTable(version, indicesRouting.build());
}
public static RoutingTable readFrom(StreamInput in) throws IOException {
@ -529,8 +538,8 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
public String prettyPrint() {
StringBuilder sb = new StringBuilder("routing_table (version ").append(version).append("):\n");
for (Map.Entry<String, IndexRoutingTable> entry : indicesRouting.entrySet()) {
sb.append(entry.getValue().prettyPrint()).append('\n');
for (ObjectObjectCursor<String, IndexRoutingTable> entry : indicesRouting) {
sb.append(entry.value.prettyPrint()).append('\n');
}
return sb.toString();
}

View File

@ -21,11 +21,10 @@ package org.elasticsearch.cluster.routing.allocation.allocator;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -40,10 +39,20 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.PriorityComparator;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.common.util.CollectionUtils.addAll;
/**
* The {@link BalancedShardsAllocator} re-balances the nodes allocations
@ -284,7 +293,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Start distributing Shards");
}
indices.addAll(allocation.routingTable().indicesRouting().keySet());
addAll(indices, allocation.routingTable().indicesRouting().keys());
buildModelFromAssigned(routing.shards(assignedFilter));
return allocateUnassigned(unassigned);
}
@ -428,7 +437,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
deltas[i] = sorter.delta();
}
new IntroSorter() {
float pivotWeight;
@Override
@ -554,10 +563,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return false;
}
boolean changed = false;
/*
* TODO: We could be smarter here and group the shards by index and then
* use the sorter to save some iterations.
* use the sorter to save some iterations.
*/
final AllocationDeciders deciders = allocation.deciders();
final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation);
@ -768,7 +777,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
if (candidate != null) {
/* allocate on the model even if not throttled */
maxNode.removeShard(candidate);
minNode.addShard(candidate, decision);

View File

@ -19,6 +19,10 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfo;
@ -30,6 +34,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -38,7 +43,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.util.Map;
import java.util.Set;
/**
@ -164,23 +168,23 @@ public class DiskThresholdDecider extends AllocationDecider {
@Override
public void onNewInfo(ClusterInfo info) {
Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
if (usages != null) {
boolean reroute = false;
String explanation = "";
// Garbage collect nodes that have been removed from the cluster
// from the map that tracks watermark crossing
Set<String> nodes = usages.keySet();
ObjectLookupContainer<String> nodes = usages.keys();
for (String node : nodeHasPassedWatermark) {
if (nodes.contains(node) == false) {
nodeHasPassedWatermark.remove(node);
}
}
for (Map.Entry<String, DiskUsage> entry : usages.entrySet()) {
String node = entry.getKey();
DiskUsage usage = entry.getValue();
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
String node = entry.key;
DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() ||
usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
@ -336,7 +340,7 @@ public class DiskThresholdDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
ImmutableOpenMap<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
return decision;
@ -451,7 +455,7 @@ public class DiskThresholdDecider extends AllocationDecider {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}
final ClusterInfo clusterInfo = allocation.clusterInfo();
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
final ImmutableOpenMap<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
final Decision decision = earlyTerminate(allocation, usages);
if (decision != null) {
return decision;
@ -488,7 +492,7 @@ public class DiskThresholdDecider extends AllocationDecider {
return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
}
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, Map<String, DiskUsage> usages) {
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
ClusterInfo clusterInfo = allocation.clusterInfo();
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
@ -521,15 +525,15 @@ public class DiskThresholdDecider extends AllocationDecider {
* @param usages Map of nodeId to DiskUsage for all known nodes
* @return DiskUsage representing given node using the average disk usage
*/
public DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {
public DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap<String, DiskUsage> usages) {
if (usages.size() == 0) {
return new DiskUsage(node.nodeId(), node.node().name(), "_na_", 0, 0);
}
long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
for (ObjectCursor<DiskUsage> du : usages.values()) {
totalBytes += du.value.getTotalBytes();
freeBytes += du.value.getFreeBytes();
}
return new DiskUsage(node.nodeId(), node.node().name(), "_na_", totalBytes / usages.size(), freeBytes / usages.size());
}
@ -592,7 +596,7 @@ public class DiskThresholdDecider extends AllocationDecider {
}
}
private Decision earlyTerminate(RoutingAllocation allocation, final Map<String, DiskUsage> usages) {
private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<String, DiskUsage> usages) {
// Always allow allocation if the decider is disabled
if (!enabled) {
return allocation.decision(Decision.YES, NAME, "disk threshold decider disabled");

View File

@ -20,8 +20,16 @@
package org.elasticsearch.cluster.service;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
@ -41,7 +49,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.*;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
@ -49,8 +63,18 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
@ -413,7 +437,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
// only the master controls the version numbers
Builder builder = ClusterState.builder(newClusterState).incrementVersion();
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build());
}
if (previousClusterState.metaData() != newClusterState.metaData()) {
builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));

View File

@ -19,23 +19,21 @@
package org.elasticsearch.cluster.settings;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.regex.Regex;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.regex.Regex;
/**
* A container for setting names and validation methods for those settings.
*/
public class DynamicSettings {
private final Map<String, Validator> dynamicSettings;
private final ImmutableOpenMap<String, Validator> dynamicSettings;
public static class Builder {
private Map<String, Validator> settings = new HashMap<>();
private ImmutableOpenMap.Builder<String, Validator> settings = ImmutableOpenMap.builder();
public void addSetting(String setting, Validator validator) {
Validator old = settings.put(setting, validator);
@ -45,12 +43,12 @@ public class DynamicSettings {
}
public DynamicSettings build() {
return new DynamicSettings(settings);
return new DynamicSettings(settings.build());
}
}
private DynamicSettings(Map<String, Validator> settings) {
this.dynamicSettings = Collections.unmodifiableMap(settings);
private DynamicSettings(ImmutableOpenMap<String, Validator> settings) {
this.dynamicSettings = settings;
}
public boolean isDynamicOrLoggingSetting(String key) {
@ -58,8 +56,8 @@ public class DynamicSettings {
}
public boolean hasDynamicSetting(String key) {
for (String dynamicSetting : dynamicSettings.keySet()) {
if (Regex.simpleMatch(dynamicSetting, key)) {
for (ObjectCursor<String> dynamicSetting : dynamicSettings.keys()) {
if (Regex.simpleMatch(dynamicSetting.value, key)) {
return true;
}
}
@ -67,9 +65,9 @@ public class DynamicSettings {
}
public String validateDynamicSetting(String dynamicSetting, String value, ClusterState clusterState) {
for (Map.Entry<String, Validator> setting : dynamicSettings.entrySet()) {
if (Regex.simpleMatch(setting.getKey(), dynamicSetting)) {
return setting.getValue().validate(dynamicSetting, value, clusterState);
for (ObjectObjectCursor<String, Validator> setting : dynamicSettings) {
if (Regex.simpleMatch(setting.key, dynamicSetting)) {
return setting.value.validate(dynamicSetting, value, clusterState);
}
}
return null;

View File

@ -23,10 +23,26 @@ import com.carrotsearch.hppc.DoubleArrayList;
import com.carrotsearch.hppc.FloatArrayList;
import com.carrotsearch.hppc.LongArrayList;
import com.carrotsearch.hppc.ObjectArrayList;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Iterators;
import org.apache.lucene.util.*;
import java.util.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefArray;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.InPlaceMergeSorter;
import org.apache.lucene.util.IntroSorter;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.RandomAccess;
/** Collections-related utility methods. */
public enum CollectionUtils {
@ -458,4 +474,9 @@ public enum CollectionUtils {
return result;
}
public static <E> void addAll(Collection<E> collection, Iterable<ObjectCursor<E>> iterable) {
for (ObjectCursor<E> c: iterable) {
collection.add(c.value);
}
}
}

View File

@ -20,7 +20,12 @@
package org.elasticsearch.gateway;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -253,7 +258,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
routingTableBuilder.version(0);
// now, reroute
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

View File

@ -37,7 +37,13 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
@ -158,10 +164,11 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
}
logger.info("auto importing dangled indices {} from [{}]", sb.toString(), request.fromNode);
ClusterState updatedState = ClusterState.builder(currentState).metaData(metaData).blocks(blocks).routingTable(routingTableBuilder).build();
RoutingTable routingTable = routingTableBuilder.build();
ClusterState updatedState = ClusterState.builder(currentState).metaData(metaData).blocks(blocks).routingTable(routingTable).build();
// now, reroute
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTable).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

View File

@ -325,8 +325,9 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
shards.size(), shards.size() - failedShards(shards));
}
ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rtBuilder).build();
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rtBuilder).build());
RoutingTable rt = rtBuilder.build();
ClusterState updatedState = builder.metaData(mdBuilder).blocks(blocks).routingTable(rt).build();
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(rt).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

View File

@ -19,7 +19,9 @@
package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -30,6 +32,7 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -210,12 +213,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
if (entry.state() == SnapshotsInProgress.State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = new HashMap<>();
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
for (Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
// Add all new shards to start processing on
if (localNodeId.equals(shard.getValue().nodeId())) {
if (shard.getValue().state() == SnapshotsInProgress.State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.getKey()))) {
logger.trace("[{}] - Adding shard to the queue", shard.getKey());
startedShards.put(shard.getKey(), new IndexShardSnapshotStatus());
if (localNodeId.equals(shard.value.nodeId())) {
if (shard.value.state() == SnapshotsInProgress.State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.key))) {
logger.trace("[{}] - Adding shard to the queue", shard.key);
startedShards.put(shard.key, new IndexShardSnapshotStatus());
}
}
}
@ -238,8 +241,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
// Abort all running shards for this snapshot
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
if (snapshotShards != null) {
for (Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key);
if (snapshotStatus != null) {
switch (snapshotStatus.stage()) {
case INIT:
@ -247,16 +250,16 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
snapshotStatus.abort();
break;
case FINALIZE:
logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish", entry.snapshotId(), shard.getKey());
logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish", entry.snapshotId(), shard.key);
break;
case DONE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.getKey());
updateIndexShardSnapshotStatus(entry.snapshotId(), shard.getKey(),
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshotId(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshotId(), shard.key,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.SUCCESS));
break;
case FAILURE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.getKey());
updateIndexShardSnapshotStatus(entry.snapshotId(), shard.getKey(),
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshotId(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshotId(), shard.key,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().localNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure()));
break;
default:
@ -368,7 +371,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
if (snapshot.state() == SnapshotsInProgress.State.STARTED || snapshot.state() == SnapshotsInProgress.State.ABORTED) {
Map<ShardId, IndexShardSnapshotStatus> localShards = currentSnapshotShards(snapshot.snapshotId());
if (localShards != null) {
Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> masterShards = snapshot.shards();
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> masterShards = snapshot.shards();
for(Map.Entry<ShardId, IndexShardSnapshotStatus> localShard : localShards.entrySet()) {
ShardId shardId = localShard.getKey();
IndexShardSnapshotStatus localShardStatus = localShard.getValue();
@ -518,7 +521,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
final Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = new HashMap<>();
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;
for (int i = 0; i < batchSize; i++) {
@ -538,11 +541,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent<SnapshotSh
if (updated) {
if (completed(shards.values()) == false) {
entries.add(new SnapshotsInProgress.Entry(entry, ImmutableMap.copyOf(shards)));
entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, SnapshotsInProgress.State.SUCCESS, ImmutableMap.copyOf(shards));
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, SnapshotsInProgress.State.SUCCESS, shards.build());
entries.add(updatedEntry);
// Finalize snapshot in the repository
snapshotsService.endSnapshot(updatedEntry);

View File

@ -19,15 +19,27 @@
package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -35,6 +47,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -50,7 +63,13 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
@ -226,7 +245,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
validate(new SnapshotId(request.repository(), request.name()));
}
private static void validate(SnapshotId snapshotId) {
String name = snapshotId.getSnapshot();
if (!Strings.hasLength(name)) {
@ -297,7 +316,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshotId().equals(snapshot.snapshotId())) {
// Replace the snapshot that was just created
ImmutableMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
Set<String> missing = indicesWithMissingShards.v1();
@ -525,23 +544,23 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
boolean snapshotChanged = false;
if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) {
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableMap.builder();
for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards().entrySet()) {
ShardSnapshotStatus shardStatus = shardEntry.getValue();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
ShardSnapshotStatus shardStatus = shardEntry.value;
if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
if (nodes.nodeExists(shardStatus.nodeId())) {
shards.put(shardEntry);
shards.put(shardEntry.key, shardEntry.value);
} else {
// TODO: Restart snapshot on another node?
snapshotChanged = true;
logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.getKey(), shardStatus.nodeId());
shards.put(shardEntry.getKey(), new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.key, shardStatus.nodeId());
shards.put(shardEntry.key, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
}
}
}
if (snapshotChanged) {
changed = true;
ImmutableMap<ShardId, ShardSnapshotStatus> shardsMap = shards.build();
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shardsMap = shards.build();
if (!snapshot.state().completed() && completed(shardsMap.values())) {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap);
endSnapshot(updatedSnapshot);
@ -596,7 +615,7 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
if (snapshot.state() == State.STARTED) {
Map<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(), routingTable);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(), routingTable);
if (shards != null) {
changed = true;
if (!snapshot.state().completed() && completed(shards.values())) {
@ -625,13 +644,14 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
}
}
private Map<ShardId, ShardSnapshotStatus> processWaitingShards(Map<ShardId, ShardSnapshotStatus> snapshotShards, RoutingTable routingTable) {
private ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShards(
ImmutableOpenMap<ShardId, ShardSnapshotStatus> snapshotShards, RoutingTable routingTable) {
boolean snapshotChanged = false;
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableMap.builder();
for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> shardEntry : snapshotShards.entrySet()) {
ShardSnapshotStatus shardStatus = shardEntry.getValue();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotShards) {
ShardSnapshotStatus shardStatus = shardEntry.value;
ShardId shardId = shardEntry.key;
if (shardStatus.state() == State.WAITING) {
ShardId shardId = shardEntry.getKey();
IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex());
if (indexShardRoutingTable != null) {
IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id());
@ -639,22 +659,22 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
if (shardRouting.primaryShard().started()) {
// Shard that we were waiting for has started on a node, let's process it
snapshotChanged = true;
logger.trace("starting shard that we were waiting for [{}] on node [{}]", shardEntry.getKey(), shardStatus.nodeId());
shards.put(shardEntry.getKey(), new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId()));
logger.trace("starting shard that we were waiting for [{}] on node [{}]", shardId, shardStatus.nodeId());
shards.put(shardId, new ShardSnapshotStatus(shardRouting.primaryShard().currentNodeId()));
continue;
} else if (shardRouting.primaryShard().initializing() || shardRouting.primaryShard().relocating()) {
// Shard that we were waiting for hasn't started yet or still relocating - will continue to wait
shards.put(shardEntry);
shards.put(shardId, shardStatus);
continue;
}
}
}
// Shard that we were waiting for went into unassigned state or disappeared - giving up
snapshotChanged = true;
logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardEntry.getKey(), shardStatus.nodeId());
shards.put(shardEntry.getKey(), new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned"));
logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId());
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned"));
} else {
shards.put(shardEntry);
shards.put(shardId, shardStatus);
}
}
if (snapshotChanged) {
@ -669,10 +689,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
if (curr != null) {
for (SnapshotsInProgress.Entry entry : curr.entries()) {
if (entry.state() == State.STARTED && !entry.waitingIndices().isEmpty()) {
for (String index : entry.waitingIndices().keySet()) {
if (event.indexRoutingTableChanged(index)) {
IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index);
for (ShardId shardId : entry.waitingIndices().get(index)) {
for (ObjectCursor<String> index : entry.waitingIndices().keys()) {
if (event.indexRoutingTableChanged(index.value)) {
IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value);
for (ShardId shardId : entry.waitingIndices().get(index.value)) {
ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard();
if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) {
return true;
@ -699,8 +719,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
return true;
}
for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
for (ShardSnapshotStatus shardStatus : snapshot.shards().values()) {
if (!shardStatus.state().completed() && node.getId().equals(shardStatus.nodeId())) {
for (ObjectCursor<ShardSnapshotStatus> shardStatus : snapshot.shards().values()) {
if (!shardStatus.value.state().completed() && node.getId().equals(shardStatus.value.nodeId())) {
// At least one shard was running on the removed node - we need to fail it
return true;
}
@ -716,15 +736,15 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param shards list of shard statuses
* @return list of failed and closed indices
*/
private Tuple<Set<String>, Set<String>> indicesWithMissingShards(Map<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards, MetaData metaData) {
private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards, MetaData metaData) {
Set<String> missing = new HashSet<>();
Set<String> closed = new HashSet<>();
for (Map.Entry<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards.entrySet()) {
if (entry.getValue().state() == State.MISSING) {
if (metaData.hasIndex(entry.getKey().getIndex()) && metaData.index(entry.getKey().getIndex()).getState() == IndexMetaData.State.CLOSE) {
closed.add(entry.getKey().getIndex());
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.MISSING) {
if (metaData.hasIndex(entry.key.getIndex()) && metaData.index(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
closed.add(entry.key.getIndex());
} else {
missing.add(entry.getKey().getIndex());
missing.add(entry.key.getIndex());
}
}
}
@ -761,9 +781,9 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshotId, entry.state(), failure);
ArrayList<ShardSearchFailure> failures = new ArrayList<>();
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
for (Map.Entry<ShardId, ShardSnapshotStatus> shardStatus : entry.shards().entrySet()) {
ShardId shardId = shardStatus.getKey();
ShardSnapshotStatus status = shardStatus.getValue();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardStatus : entry.shards()) {
ShardId shardId = shardStatus.key;
ShardSnapshotStatus status = shardStatus.value;
if (status.state().failed()) {
failures.add(new ShardSearchFailure(status.reason(), new SearchShardTarget(status.nodeId(), shardId.getIndex(), shardId.id())));
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId.getIndex(), shardId.id(), status.reason()));
@ -864,16 +884,16 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
} else {
// This snapshot is currently running - stopping shards first
waitForSnapshot = true;
Map<ShardId, ShardSnapshotStatus> shards;
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
if (snapshot.state() == State.STARTED && snapshot.shards() != null) {
// snapshot is currently running - stop started shards
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableMap.builder();
for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards().entrySet()) {
ShardSnapshotStatus status = shardEntry.getValue();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (!status.state().completed()) {
shardsBuilder.put(shardEntry.getKey(), new ShardSnapshotStatus(status.nodeId(), State.ABORTED));
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED));
} else {
shardsBuilder.put(shardEntry.getKey(), status);
shardsBuilder.put(shardEntry.key, status);
}
}
shards = shardsBuilder.build();
@ -884,9 +904,10 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
} else {
boolean hasUncompletedShards = false;
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
for (ShardSnapshotStatus shardStatus : snapshot.shards().values()) {
for (ObjectCursor<ShardSnapshotStatus> shardStatus : snapshot.shards().values()) {
// Check if we still have shard running on existing nodes
if (shardStatus.state().completed() == false && shardStatus.nodeId() != null && currentState.nodes().get(shardStatus.nodeId()) != null) {
if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null
&& currentState.nodes().get(shardStatus.value.nodeId()) != null) {
hasUncompletedShards = true;
break;
}
@ -991,8 +1012,8 @@ public class SnapshotsService extends AbstractLifecycleComponent<SnapshotsServic
* @param indices list of indices to be snapshotted
* @return list of shard to be included into current snapshot
*/
private ImmutableMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<String> indices) {
ImmutableMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableMap.builder();
private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<String> indices) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
MetaData metaData = clusterState.metaData();
for (String index : indices) {
IndexMetaData indexMetaData = metaData.index(index);

View File

@ -20,9 +20,14 @@
package org.elasticsearch.tribe;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -46,7 +51,11 @@ import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.rest.RestStatus;
import java.util.*;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@ -301,7 +310,7 @@ public class TribeService extends AbstractLifecycleComponent<TribeService> {
}
}
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable).build();
return ClusterState.builder(currentState).incrementVersion().blocks(blocks).nodes(nodes).metaData(metaData).routingTable(routingTable.build()).build();
}
private void removeIndex(ClusterBlocks.Builder blocks, MetaData.Builder metaData, RoutingTable.Builder routingTable, IndexMetaData index) {

View File

@ -21,17 +21,18 @@ package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.health.ClusterIndexHealth;
import org.elasticsearch.action.admin.cluster.health.ClusterShardHealth;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
@ -46,7 +47,10 @@ import java.io.IOException;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ClusterHealthResponsesTests extends ESTestCase {
@ -209,7 +213,7 @@ public class ClusterHealthResponsesTests extends ESTestCase {
metaData.put(indexMetaData, true);
routingTable.add(indexRoutingTable);
}
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable.build()).build();
int pendingTasks = randomIntBetween(0, 200);
int inFlight = randomIntBetween(0, 200);
int delayedUnassigned = randomIntBetween(0, 200);
@ -249,7 +253,7 @@ public class ClusterHealthResponsesTests extends ESTestCase {
MetaData.Builder metaData = MetaData.builder();
metaData.put(indexMetaData, true);
routingTable.add(indexRoutingTable);
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable.build()).build();
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", indexNameExpressionResolver.concreteIndices(clusterState, IndicesOptions.strictExpand(), (String[]) null), clusterState, 0, 0, 0, TimeValue.timeValueMillis(0));
clusterHealth = maybeSerialize(clusterHealth);
// currently we have no cluster level validation failures as index validation issues are reported per index.

View File

@ -27,7 +27,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.ShardId;
@ -35,8 +40,10 @@ import org.elasticsearch.index.shard.ShardId;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
@ -124,7 +131,7 @@ public class ClusterStateCreationUtils {
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build())));
state.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(indexShardRoutingBuilder.build())).build());
return state.build();
}
@ -158,7 +165,7 @@ public class ClusterStateCreationUtils {
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(1).id(), null, null, false, ShardRoutingState.STARTED, 0, null));
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
}
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder));
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build());
return state.build();
}
@ -214,7 +221,7 @@ public class ClusterStateCreationUtils {
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
state.routingTable(RoutingTable.builder());
state.routingTable(RoutingTable.builder().build());
return state.build();
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
@ -34,6 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
@ -53,7 +56,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -149,24 +151,24 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(ClusterInfoService.class, internalTestCluster.getMasterName());
ClusterInfo info = infoService.refresh();
assertNotNull("info should not be null", info);
final Map<String, DiskUsage> leastUsages = info.getNodeLeastAvailableDiskUsages();
final Map<String, DiskUsage> mostUsages = info.getNodeMostAvailableDiskUsages();
final Map<String, Long> shardSizes = info.shardSizes;
ImmutableOpenMap<String, DiskUsage> leastUsages = info.getNodeLeastAvailableDiskUsages();
ImmutableOpenMap<String, DiskUsage> mostUsages = info.getNodeMostAvailableDiskUsages();
ImmutableOpenMap<String, Long> shardSizes = info.shardSizes;
assertNotNull(leastUsages);
assertNotNull(shardSizes);
assertThat("some usages are populated", leastUsages.values().size(), Matchers.equalTo(2));
assertThat("some shard sizes are populated", shardSizes.values().size(), greaterThan(0));
for (DiskUsage usage : leastUsages.values()) {
logger.info("--> usage: {}", usage);
assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L));
for (ObjectCursor<DiskUsage> usage : leastUsages.values()) {
logger.info("--> usage: {}", usage.value);
assertThat("usage has be retrieved", usage.value.getFreeBytes(), greaterThan(0L));
}
for (DiskUsage usage : mostUsages.values()) {
logger.info("--> usage: {}", usage);
assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L));
for (ObjectCursor<DiskUsage> usage : mostUsages.values()) {
logger.info("--> usage: {}", usage.value);
assertThat("usage has be retrieved", usage.value.getFreeBytes(), greaterThan(0L));
}
for (Long size : shardSizes.values()) {
logger.info("--> shard size: {}", size);
assertThat("shard size is greater than 0", size, greaterThanOrEqualTo(0L));
for (ObjectCursor<Long> size : shardSizes.values()) {
logger.info("--> shard size: {}", size.value);
assertThat("shard size is greater than 0", size.value, greaterThanOrEqualTo(0L));
}
ClusterService clusterService = internalTestCluster.getInstance(ClusterService.class, internalTestCluster.getMasterName());
ClusterState state = clusterService.state();

View File

@ -21,13 +21,25 @@ package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -49,9 +61,9 @@ import java.util.List;
import static org.elasticsearch.cluster.metadata.AliasMetaData.newAliasMetaDataBuilder;
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomChange;
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomReason;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -197,7 +209,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
RoutingTable.Builder builder = RoutingTable.builder(clusterState.routingTable());
int numberOfIndices = clusterState.routingTable().indicesRouting().size();
if (numberOfIndices > 0) {
List<String> randomIndices = randomSubsetOf(randomInt(numberOfIndices - 1), clusterState.routingTable().indicesRouting().keySet().toArray(new String[numberOfIndices]));
List<String> randomIndices = randomSubsetOf(randomInt(numberOfIndices - 1), clusterState.routingTable().indicesRouting().keys().toArray(String.class));
for (String index : randomIndices) {
if (randomBoolean()) {
builder.remove(index);
@ -661,7 +673,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
Collections.<String>emptyList(),
Math.abs(randomLong()),
ImmutableMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>of()));
ImmutableOpenMap.of()));
case 1:
return new RestoreInProgress(new RestoreInProgress.Entry(
new SnapshotId(randomName("repo"), randomName("snap")),

View File

@ -21,13 +21,13 @@ package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
@ -36,8 +36,6 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Test;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
@ -95,7 +93,7 @@ public class DiskUsageTests extends ESTestCase {
}
}
}
public void testFillShardLevelInfo() {
ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
ShardRoutingHelper.initialize(test_0, "node1");
@ -113,8 +111,8 @@ public class DiskUsageTests extends ESTestCase {
new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, "0xdeadbeef", test_0.shardId()), commonStats0 , null),
new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, "0xdeadbeef", test_1.shardId()), commonStats1 , null)
};
HashMap<String, Long> shardSizes = new HashMap<>();
HashMap<ShardRouting, String> routingToPath = new HashMap<>();
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> routingToPath = ImmutableOpenMap.builder();
InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath);
assertEquals(2, shardSizes.size());
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0)));
@ -130,8 +128,8 @@ public class DiskUsageTests extends ESTestCase {
}
public void testFillDiskUsage() {
Map<String, DiskUsage> newLeastAvaiableUsages = new HashMap<>();
Map<String, DiskUsage> newMostAvaiableUsages = new HashMap<>();
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();
FsInfo.Path[] node1FSInfo = new FsInfo.Path[] {
new FsInfo.Path("/middle", "/dev/sda", 100, 90, 80),
new FsInfo.Path("/least", "/dev/sdb", 200, 190, 70),

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
@ -35,10 +36,6 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
/**
@ -116,30 +113,24 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
return new CountDownLatch(0);
}
@Override
public ClusterInfo getClusterInfo() {
ClusterInfo clusterInfo = super.getClusterInfo();
return new ClusterInfo(clusterInfo.getNodeLeastAvailableDiskUsages(), clusterInfo.getNodeMostAvailableDiskUsages(), clusterInfo.shardSizes, DEV_NULL_MAP);
return new DevNullClusterInfo(clusterInfo.getNodeLeastAvailableDiskUsages(), clusterInfo.getNodeMostAvailableDiskUsages(), clusterInfo.shardSizes);
}
public static final Map<ShardRouting, String> DEV_NULL_MAP = Collections.unmodifiableMap(new StaticValueMap("/dev/null"));
// a test only map that always returns the same value no matter what key is passed
private static final class StaticValueMap extends AbstractMap<ShardRouting, String> {
private final String value;
private StaticValueMap(String value) {
this.value = value;
/**
* ClusterInfo that always points to DevNull.
*/
public static class DevNullClusterInfo extends ClusterInfo {
public DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<String, Long> shardSizes) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
}
@Override
public String get(Object key) {
return value;
}
@Override
public Set<Entry<ShardRouting, String>> entrySet() {
throw new UnsupportedOperationException("this is a test-only map that only supports #get(Object key)");
public String getDataPath(ShardRouting shardRouting) {
return "/dev/null";
}
}
}

View File

@ -36,8 +36,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -76,7 +74,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
@ -106,7 +104,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
@ -153,7 +151,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -40,8 +41,15 @@ import org.junit.Test;
import java.util.Collections;
import java.util.EnumSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.Matchers.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
/**
*/
@ -89,7 +97,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.INDEX_CREATED));
}
@ -102,7 +110,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsRecovery(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsRecovery(metaData.index("test")).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.CLUSTER_RECOVERED));
}
@ -115,7 +123,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsFromCloseToOpen(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsFromCloseToOpen(metaData.index("test")).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.INDEX_REOPENED));
}
@ -128,7 +136,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), Version.CURRENT, "test"), new IntHashSet())).build();
.routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), Version.CURRENT, "test"), new IntHashSet()).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
}
@ -141,7 +149,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), Version.CURRENT, "test"))).build();
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"), Version.CURRENT, "test")).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED));
}
@ -154,7 +162,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsFromDangling(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsFromDangling(metaData.index("test")).build()).build();
for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED));
}
@ -168,7 +176,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
@ -178,7 +186,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
builder.addIndexShard(indexShardRoutingTable);
}
builder.addReplica();
clusterState = ClusterState.builder(clusterState).routingTable(RoutingTable.builder(clusterState.routingTable()).add(builder)).build();
clusterState = ClusterState.builder(clusterState).routingTable(RoutingTable.builder(clusterState.routingTable()).add(builder).build()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.REPLICA_ADDED));
@ -211,7 +219,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
@ -241,7 +249,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// starting primaries
@ -305,7 +313,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
@ -331,7 +339,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build();
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -235,8 +236,8 @@ public class AddIncrementallyTests extends ESAllocationTestCase {
private void assertNumIndexShardsPerNode(ClusterState state, Matcher<Integer> matcher) {
for (String index : state.routingTable().indicesRouting().keySet()) {
assertNumIndexShardsPerNode(state, index, matcher);
for (ObjectCursor<String> index : state.routingTable().indicesRouting().keys()) {
assertNumIndexShardsPerNode(state, index.value, matcher);
}
}
@ -248,10 +249,10 @@ public class AddIncrementallyTests extends ESAllocationTestCase {
private void assertAtLeastOneIndexShardPerNode(ClusterState state) {
for (String index : state.routingTable().indicesRouting().keySet()) {
for (ObjectCursor<String> index : state.routingTable().indicesRouting().keys()) {
for (RoutingNode node : state.getRoutingNodes()) {
assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(1));
assertThat(node.shardsWithState(index.value, STARTED).size(), Matchers.greaterThanOrEqualTo(1));
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
@ -27,7 +28,11 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
@ -35,9 +40,9 @@ import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllo
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.hamcrest.Matchers;
import org.junit.Test;
@ -246,11 +251,11 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - treshold)));
final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + treshold)));
for (String index : nodes.getRoutingTable().indicesRouting().keySet()) {
for (ObjectCursor<String> index : nodes.getRoutingTable().indicesRouting().keys()) {
for (RoutingNode node : nodes) {
// logger.info(node.nodeId() +":"+index+ ": " + node.shardsWithState(index, INITIALIZING, STARTED).size() + " shards ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")");
assertThat(node.shardsWithState(index, STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards));
assertThat(node.shardsWithState(index, STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards));
assertThat(node.shardsWithState(index.value, STARTED).size(), Matchers.greaterThanOrEqualTo(minAvgNumberOfShards));
assertThat(node.shardsWithState(index.value, STARTED).size(), Matchers.lessThanOrEqualTo(maxAvgNumberOfShards));
}
}
}
@ -262,10 +267,10 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
final int minAvgNumberOfShards = Math.round(Math.round(Math.floor(avgNumShards - treshold)));
final int maxAvgNumberOfShards = Math.round(Math.round(Math.ceil(avgNumShards + treshold)));
for (String index : nodes.getRoutingTable().indicesRouting().keySet()) {
for (ObjectCursor<String> index : nodes.getRoutingTable().indicesRouting().keys()) {
for (RoutingNode node : nodes) {
int primaries = 0;
for (ShardRouting shard : node.shardsWithState(index, STARTED)) {
for (ShardRouting shard : node.shardsWithState(index.value, STARTED)) {
primaries += shard.primary() ? 1 : 0;
}
// logger.info(node.nodeId() + ": " + primaries + " primaries ("+minAvgNumberOfShards+" to "+maxAvgNumberOfShards+")");

View File

@ -24,7 +24,13 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Test;
@ -54,7 +60,7 @@ public class StartedShardsRoutingTests extends ESAllocationTestCase {
stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId()).addShard(initShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId()).addShard(startedShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId()).addShard(relocatingShard).build())));
.addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId()).addShard(relocatingShard).build())).build());
ClusterState state = stateBuilder.build();

View File

@ -24,7 +24,7 @@ import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.MockInternalClusterInfoService;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -43,6 +43,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.shard.ShardId;
@ -50,13 +51,10 @@ import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import org.junit.Test;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@ -80,16 +78,18 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", "node1", "/dev/null", 100, 10)); // 90% used
usages.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 35)); // 65% used
usages.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 60)); // 40% used
usages.put("node4", new DiskUsage("node4", "node4", "/dev/null", 100, 80)); // 20% used
ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
usagesBuilder.put("node1", new DiskUsage("node1", "node1", "/dev/null", 100, 10)); // 90% used
usagesBuilder.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 35)); // 65% used
usagesBuilder.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 60)); // 40% used
usagesBuilder.put("node4", new DiskUsage("node4", "node4", "/dev/null", 100, 80)); // 20% used
ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
shardSizes.put("[test][0][r]", 10L);
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
shardSizesBuilder.put("[test][0][p]", 10L); // 10 bytes
shardSizesBuilder.put("[test][0][r]", 10L);
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
@ -273,17 +273,19 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "30b")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "9b").build();
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 10)); // 90% used
usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 10)); // 90% used
usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 60)); // 40% used
usages.put("node4", new DiskUsage("node4", "n4", "/dev/null", 100, 80)); // 20% used
usages.put("node5", new DiskUsage("node5", "n5", "/dev/null", 100, 85)); // 15% used
ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 10)); // 90% used
usagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 10)); // 90% used
usagesBuilder.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 60)); // 40% used
usagesBuilder.put("node4", new DiskUsage("node4", "n4", "/dev/null", 100, 80)); // 20% used
usagesBuilder.put("node5", new DiskUsage("node5", "n5", "/dev/null", 100, 85)); // 15% used
ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
shardSizes.put("[test][0][r]", 10L);
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
shardSizesBuilder.put("[test][0][p]", 10L); // 10 bytes
shardSizesBuilder.put("[test][0][r]", 10L);
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
@ -344,8 +346,10 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
logger.info("--> nodeWithoutPrimary: {}", nodeWithoutPrimary);
// Make node without the primary now habitable to replicas
usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used
final ClusterInfo clusterInfo2 = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
usagesBuilder = ImmutableOpenMap.builder(usages);
usagesBuilder.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", "/dev/null", 100, 35)); // 65% used
usages = usagesBuilder.build();
final ClusterInfo clusterInfo2 = new DevNullClusterInfo(usages, usages, shardSizes);
cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
@ -538,13 +542,15 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "71%").build();
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 31)); // 69% used
usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 1)); // 99% used
ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 31)); // 69% used
usagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 1)); // 99% used
ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
shardSizesBuilder.put("[test][0][p]", 10L); // 10 bytes
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
@ -604,14 +610,16 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.85).build();
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 50)); // 50% used
usages.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 0)); // 100% used
ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
usagesBuilder.put("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 50)); // 50% used
usagesBuilder.put("node3", new DiskUsage("node3", "node3", "/dev/null", 100, 0)); // 100% used
ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
shardSizes.put("[test][0][r]", 10L); // 10 bytes
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
shardSizesBuilder.put("[test][0][p]", 10L); // 10 bytes
shardSizesBuilder.put("[test][0][r]", 10L); // 10 bytes
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
@ -675,11 +683,11 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
RoutingNode rn = new RoutingNode("node1", newNode("node1"));
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY);
Map<String, DiskUsage> usages = new HashMap<>();
ImmutableOpenMap.Builder<String, DiskUsage> usages = ImmutableOpenMap.builder();
usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 50)); // 50% used
usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 0)); // 100% used
DiskUsage node1Usage = decider.averageUsage(rn, usages);
DiskUsage node1Usage = decider.averageUsage(rn, usages.build());
assertThat(node1Usage.getTotalBytes(), equalTo(100L));
assertThat(node1Usage.getFreeBytes(), equalTo(25L));
}
@ -705,17 +713,19 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, 0.7)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, 0.8).build();
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used
usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used
usages.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 40)); // 60% used
ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 40)); // 60% used
usagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 40)); // 60% used
usagesBuilder.put("node3", new DiskUsage("node3", "n3", "/dev/null", 100, 40)); // 60% used
ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 14L); // 14 bytes
shardSizes.put("[test][0][r]", 14L);
shardSizes.put("[test2][0][p]", 1L); // 1 bytes
shardSizes.put("[test2][0][r]", 1L);
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
shardSizesBuilder.put("[test][0][p]", 14L); // 14 bytes
shardSizesBuilder.put("[test][0][r]", 14L);
shardSizesBuilder.put("[test2][0][p]", 1L); // 1 bytes
shardSizesBuilder.put("[test2][0][r]", 1L);
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
AllocationDeciders deciders = new AllocationDeciders(Settings.EMPTY,
new HashSet<>(Arrays.asList(
@ -811,14 +821,17 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "70%").build();
// We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 20)); // 80% used
usages.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 100)); // 0% used
ImmutableOpenMap.Builder<String, DiskUsage> usagesBuilder = ImmutableOpenMap.builder();
usagesBuilder.put("node1", new DiskUsage("node1", "n1", "/dev/null", 100, 20)); // 80% used
usagesBuilder.put("node2", new DiskUsage("node2", "n2", "/dev/null", 100, 100)); // 0% used
ImmutableOpenMap<String, DiskUsage> usages = usagesBuilder.build();
Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 40L);
shardSizes.put("[test][1][p]", 40L);
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(usages), Collections.unmodifiableMap(usages), Collections.unmodifiableMap(shardSizes), MockInternalClusterInfoService.DEV_NULL_MAP);
ImmutableOpenMap.Builder<String, Long> shardSizesBuilder = ImmutableOpenMap.builder();
shardSizesBuilder.put("[test][0][p]", 40L);
shardSizesBuilder.put("[test][1][p]", 40L);
ImmutableOpenMap<String, Long> shardSizes = shardSizesBuilder.build();
final ClusterInfo clusterInfo = new DevNullClusterInfo(usages, usages, shardSizes);
DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
MetaData metaData = MetaData.builder()
@ -854,7 +867,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.build()
)
);
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
@ -874,7 +887,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
.build()
)
);
clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));

View File

@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.MockInternalClusterInfoService;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
@ -45,9 +46,6 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
@ -123,17 +121,17 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
).build();
// actual test -- after all that bloat :)
Map<String, DiskUsage> leastAvailableUsages = new HashMap<>();
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsages = ImmutableOpenMap.builder();
leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full
leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 0)); // all full
Map<String, DiskUsage> mostAvailableUsage = new HashMap<>();
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsage = ImmutableOpenMap.builder();
mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, randomIntBetween(20, 100))); // 20 - 99 percent since after allocation there must be at least 10% left and shard is 10byte
mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, randomIntBetween(0, 10))); // this is weird and smells like a bug! it should be up to 20%?
Map<String, Long> shardSizes = new HashMap<>();
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes), Collections.EMPTY_MAP);
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of());
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation));
assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation));
@ -143,7 +141,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
NodeSettingsService nss = new NodeSettingsService(Settings.EMPTY);
ClusterInfoService cis = EmptyClusterInfoService.INSTANCE;
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss, cis, null);
Map<ShardRouting, String> shardRoutingMap = new HashMap<>();
ImmutableOpenMap.Builder<ShardRouting, String> shardRoutingMap = ImmutableOpenMap.builder();
DiscoveryNode node_0 = new DiscoveryNode("node_0", DummyTransportAddress.INSTANCE, Version.CURRENT);
DiscoveryNode node_1 = new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT);
@ -175,20 +173,20 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
).build();
// actual test -- after all that bloat :)
Map<String, DiskUsage> leastAvailableUsages = new HashMap<>();
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsages = ImmutableOpenMap.builder();
leastAvailableUsages.put("node_0", new DiskUsage("node_0", "node_0", "/node0/least", 100, 10)); // 90% used
leastAvailableUsages.put("node_1", new DiskUsage("node_1", "node_1", "/node1/least", 100, 9)); // 91% used
Map<String, DiskUsage> mostAvailableUsage = new HashMap<>();
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsage = ImmutableOpenMap.builder();
mostAvailableUsage.put("node_0", new DiskUsage("node_0", "node_0", "/node0/most", 100, 90)); // 10% used
mostAvailableUsage.put("node_1", new DiskUsage("node_1", "node_1", "/node1/most", 100, 90)); // 10% used
Map<String, Long> shardSizes = new HashMap<>();
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
shardSizes.put("[test][1][p]", 10L);
shardSizes.put("[test][2][p]", 10L);
final ClusterInfo clusterInfo = new ClusterInfo(Collections.unmodifiableMap(leastAvailableUsages), Collections.unmodifiableMap(mostAvailableUsage), Collections.unmodifiableMap(shardSizes), shardRoutingMap);
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build());
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation));
assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation));
@ -208,7 +206,6 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
ShardRouting test_2 = ShardRouting.newUnassigned("test", 2, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
ShardRoutingHelper.initialize(test_2, node_1.getId());
ShardRoutingHelper.moveToStarted(test_2);
shardRoutingMap.put(test_2, "/node1/most");
assertEquals("can stay since allocated on a different path with enough space", Decision.YES, decider.canRemain(test_2, new RoutingNode("node_1", node_1), allocation));
ShardRouting test_3 = ShardRouting.newUnassigned("test", 3, null, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
@ -219,12 +216,12 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
public void testShardSizeAndRelocatingSize() {
Map<String, Long> shardSizes = new HashMap<>();
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.put("[test][0][r]", 10L);
shardSizes.put("[test][1][r]", 100L);
shardSizes.put("[test][2][r]", 1000L);
shardSizes.put("[other][0][p]", 10000L);
ClusterInfo info = new ClusterInfo(Collections.EMPTY_MAP, Collections.EMPTY_MAP, shardSizes, MockInternalClusterInfoService.DEV_NULL_MAP);
ClusterInfo info = new DevNullClusterInfo(ImmutableOpenMap.of(), ImmutableOpenMap.of(), shardSizes.build());
ShardRouting test_0 = ShardRouting.newUnassigned("test", 0, null, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
ShardRoutingHelper.initialize(test_0, "node1");
ShardRoutingHelper.moveToStarted(test_0);

View File

@ -35,6 +35,8 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.hamcrest.Matchers;
import org.junit.Test;
@ -55,8 +57,6 @@ import static org.elasticsearch.index.shard.IndexShardState.CREATED;
import static org.elasticsearch.index.shard.IndexShardState.POST_RECOVERY;
import static org.elasticsearch.index.shard.IndexShardState.RECOVERING;
import static org.elasticsearch.index.shard.IndexShardState.STARTED;
import static org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import static org.elasticsearch.test.ESIntegTestCase.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -105,7 +105,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
} catch (Exception e) {
assertTrue(e.getMessage().contains("failing on purpose"));
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
assertFalse(resp.getState().routingTable().indicesRouting().keySet().contains("failed"));
assertFalse(resp.getState().routingTable().indicesRouting().keys().contains("failed"));
}
}
@ -149,7 +149,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase {
} catch (ElasticsearchException e) {
assertTrue(e.getMessage().contains("failing on purpose"));
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
assertFalse(resp.getState().routingTable().indicesRouting().keySet().contains("failed"));
assertFalse(resp.getState().routingTable().indicesRouting().keys().contains("failed"));
}

View File

@ -23,7 +23,10 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@ -54,12 +57,19 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.io.IOException;
import java.util.*;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
/**
*/
@ -88,7 +98,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[0]);
RoutingNodes routingNodes = new RoutingNodes(
ClusterState.builder(current)
.routingTable(RoutingTable.builder(current.routingTable()).remove("a").addAsRecovery(current.metaData().index("a")))
.routingTable(RoutingTable.builder(current.routingTable()).remove("a").addAsRecovery(current.metaData().index("a")).build())
.nodes(DiscoveryNodes.EMPTY_NODES)
.build(), false
);
@ -127,7 +137,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metaData().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable).build();
updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
RoutingAllocation.Result result = allocationService.reroute(updatedState);
return ClusterState.builder(updatedState).routingResult(result).build();

View File

@ -19,7 +19,6 @@
package org.elasticsearch.snapshots;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@ -29,7 +28,11 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.*;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStage;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
@ -79,8 +82,24 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.shard.IndexShard.INDEX_REFRESH_INTERVAL;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateMissing;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
@ -1824,7 +1843,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
@Override
public ClusterState execute(ClusterState currentState) {
// Simulate orphan snapshot
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableMap.builder();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
shards.put(new ShardId("test-idx", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId("test-idx", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId("test-idx", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));

View File

@ -0,0 +1,2 @@
@defaultMessage Prefer ImmutableOpenMap for cluster state
java.util.Collections#unmodifiableMap(java.util.Map)