Add the shard's store status to the explain API
This adds information similar to what the [shard stores API](https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-shards-stores.html) returns to the cluster allocation explanation API (in fact, internally it uses that API). This means that when a decision would otherwise indicate that a shard can go somewhere, you now have more information:

```json
{
  "shard" : {
    "index" : "i",
    "index_uuid" : "QzoKda9aQCG_hCaZQ18GEg",
    "id" : 0,
    "primary" : true
  },
  "assigned" : false,
  "unassigned_info" : {
    "reason" : "CLUSTER_RECOVERED",
    "at" : "2016-04-11T20:58:04.088Z"
  },
  "allocation_delay" : "0s",
  "allocation_delay_ms" : 0,
  "remaining_delay" : "0s",
  "remaining_delay_ms" : 0,
  "nodes" : {
    "24Qmw4tdRTuVOtjAdtmr5Q" : {
      "node_name" : "Vampire by Night",
      "node_attributes" : { },
      "final_decision" : "YES",
      "weight" : 7.0,
      "decisions" : [ ],
      "store" : {
        "allocation_id" : "aC6qVWA7TT2pgsalYxxUJQ",
        "store_exception" : "IndexFormatTooOldException[Format version is not supported (resource BufferedChecksumIndexInput(SimpleFSIndexInput(path=\"/home/hinmanm/scratch/elasticsearch-5.0.0-alpha1-SNAPSHOT/data/elasticsearch/nodes/0/indices/QzoKda9aQCG_hCaZQ18GEg/0/index/segments_1\"))): -1906795950 (needs to be between 1071082519 and 1071082519). This version of Lucene only supports indexes created with release 5.0 and later.]",
        "allocation" : "UNUSED"
      }
    }
  }
}
```

The "store" section is new; it includes the allocation status, the allocation id, and the store exception if there is one.

Relates to #17372
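For reference, a sketch of the kind of request that produces output like the above (the `/_cluster/allocation/explain` endpoint is assumed here; `index`, `shard`, and `primary` mirror the request fields used by the transport action in this change, and the values are placeholders):

```json
GET /_cluster/allocation/explain
{
  "index" : "i",
  "shard" : 0,
  "primary" : true
}
```

Sending the request with an empty body asks the API to pick the first unassigned shard it can find and explain that one.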
commit 1987107d75 (parent 2f6405ee27)

@@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.cluster.allocation;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;

@@ -30,11 +32,16 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardStateMetaData;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * A {@code ClusterAllocationExplanation} is an explanation of why a shard may or may not be allocated to nodes. It also includes weights
@@ -49,10 +56,15 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
    private final Map<DiscoveryNode, Float> nodeWeights;
    private final UnassignedInfo unassignedInfo;
    private final long remainingDelayNanos;
    private final List<IndicesShardStoresResponse.StoreStatus> shardStores;
    private final Set<String> activeAllocationIds;

    private Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> nodeStoreStatus = null;

    public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId,
                                        UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision,
                                        Map<DiscoveryNode, Float> nodeWeights, long remainingDelayNanos) {
                                        UnassignedInfo unassignedInfo, Map<DiscoveryNode, Decision> nodeToDecision,
                                        Map<DiscoveryNode, Float> nodeWeights, long remainingDelayNanos,
                                        List<IndicesShardStoresResponse.StoreStatus> shardStores, Set<String> activeAllocationIds) {
        this.shard = shard;
        this.primary = primary;
        this.assignedNodeId = assignedNodeId;

@@ -60,6 +72,8 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
        this.nodeToDecision = nodeToDecision == null ? Collections.emptyMap() : nodeToDecision;
        this.nodeWeights = nodeWeights == null ? Collections.emptyMap() : nodeWeights;
        this.remainingDelayNanos = remainingDelayNanos;
        this.shardStores = shardStores;
        this.activeAllocationIds = activeAllocationIds;
    }

    public ClusterAllocationExplanation(StreamInput in) throws IOException {
@@ -88,31 +102,20 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
        }
        this.nodeWeights = ntw;
        remainingDelayNanos = in.readVLong();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        this.getShard().writeTo(out);
        out.writeBoolean(this.isPrimary());
        out.writeOptionalString(this.getAssignedNodeId());
        out.writeOptionalWriteable(this.getUnassignedInfo());

        Map<DiscoveryNode, Decision> ntd = this.getNodeDecisions();
        out.writeVInt(ntd.size());
        for (Map.Entry<DiscoveryNode, Decision> entry : ntd.entrySet()) {
            entry.getKey().writeTo(out);
            Decision.writeTo(entry.getValue(), out);
        size = in.readVInt();
        List<IndicesShardStoresResponse.StoreStatus> stores = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            stores.add(IndicesShardStoresResponse.StoreStatus.readStoreStatus(in));
        }
        Map<DiscoveryNode, Float> ntw = this.getNodeWeights();
        out.writeVInt(ntw.size());
        for (Map.Entry<DiscoveryNode, Float> entry : ntw.entrySet()) {
            entry.getKey().writeTo(out);
            out.writeFloat(entry.getValue());
        this.shardStores = stores;
        size = in.readVInt();
        Set<String> activeIds = new HashSet<>(size);
        for (int i = 0; i < size; i++) {
            activeIds.add(in.readString());
        }
        out.writeVLong(remainingDelayNanos);
        this.activeAllocationIds = activeIds;
    }

    public ShardId getShard() {
        return this.shard;
    }
@@ -156,6 +159,23 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
        return this.remainingDelayNanos;
    }

    /** Return a map of {@code DiscoveryNode} to store status for the explained shard */
    public Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> getNodeStoreStatus() {
        if (nodeStoreStatus == null) {
            Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> nodeToStatus = new HashMap<>(shardStores.size());
            for (IndicesShardStoresResponse.StoreStatus status : shardStores) {
                nodeToStatus.put(status.getNode(), status);
            }
            nodeStoreStatus = nodeToStatus;
        }
        return nodeStoreStatus;
    }

    /** Return a set of the active allocation ids for this shard */
    public Set<String> getActiveAllocationIds() {
        return this.activeAllocationIds;
    }

    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(); {
            builder.startObject("shard"); {
@@ -191,11 +211,30 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
                }
                builder.endObject(); // end attributes
                Decision d = nodeToDecision.get(node);
                if (node.getId().equals(assignedNodeId)) {
                    builder.field("final_decision", "CURRENTLY_ASSIGNED");
                } else {
                    builder.field("final_decision", d.type().toString());
                String finalDecision = node.getId().equals(assignedNodeId) ? "CURRENTLY_ASSIGNED" : d.type().toString();
                IndicesShardStoresResponse.StoreStatus storeStatus = getNodeStoreStatus().get(node);
                builder.startObject("store"); {
                    if (storeStatus != null) {
                        final Throwable storeErr = storeStatus.getStoreException();
                        if (storeErr != null) {
                            builder.field("store_exception", ExceptionsHelper.detailedMessage(storeErr));
                            // Cannot allocate, final decision is "STORE_ERROR"
                            finalDecision = "STORE_ERROR";
                        }
                        if (activeAllocationIds.isEmpty() || activeAllocationIds.contains(storeStatus.getAllocationId())) {
                            // If either we don't have allocation IDs, or they contain the store allocation id, show the allocation
                            // status
                            builder.field("shard_copy", storeStatus.getAllocationStatus());
                        } else {
                            // Otherwise, this is a stale copy of the data (allocation ids don't match)
                            builder.field("shard_copy", "STALE_COPY");
                            // Cannot allocate, final decision is "STORE_STALE"
                            finalDecision = "STORE_STALE";
                        }
                    }
                }
                builder.endObject(); // end store
                builder.field("final_decision", finalDecision);
                builder.field("weight", entry.getValue());
                d.toXContent(builder, params);
            }
@@ -206,4 +245,34 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
        builder.endObject(); // end wrapping object
        return builder;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        this.getShard().writeTo(out);
        out.writeBoolean(this.isPrimary());
        out.writeOptionalString(this.getAssignedNodeId());
        out.writeOptionalWriteable(this.getUnassignedInfo());

        Map<DiscoveryNode, Decision> ntd = this.getNodeDecisions();
        out.writeVInt(ntd.size());
        for (Map.Entry<DiscoveryNode, Decision> entry : ntd.entrySet()) {
            entry.getKey().writeTo(out);
            Decision.writeTo(entry.getValue(), out);
        }
        Map<DiscoveryNode, Float> ntw = this.getNodeWeights();
        out.writeVInt(ntw.size());
        for (Map.Entry<DiscoveryNode, Float> entry : ntw.entrySet()) {
            entry.getKey().writeTo(out);
            out.writeFloat(entry.getValue());
        }
        out.writeVLong(remainingDelayNanos);
        out.writeVInt(shardStores.size());
        for (IndicesShardStoresResponse.StoreStatus status : shardStores) {
            status.writeTo(out);
        }
        out.writeVInt(activeAllocationIds.size());
        for (String id : activeAllocationIds) {
            out.writeString(id);
        }
    }
}
@@ -22,6 +22,9 @@ package org.elasticsearch.action.admin.cluster.allocation;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterInfoService;

@@ -47,6 +50,7 @@ import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@@ -68,19 +72,22 @@ public class TransportClusterAllocationExplainAction
    private final ClusterInfoService clusterInfoService;
    private final AllocationDeciders allocationDeciders;
    private final ShardsAllocator shardAllocator;
    private final TransportIndicesShardStoresAction shardStoresAction;

    @Inject
    public TransportClusterAllocationExplainAction(Settings settings, TransportService transportService, ClusterService clusterService,
                                                   ThreadPool threadPool, ActionFilters actionFilters,
                                                   IndexNameExpressionResolver indexNameExpressionResolver,
                                                   AllocationService allocationService, ClusterInfoService clusterInfoService,
                                                   AllocationDeciders allocationDeciders, ShardsAllocator shardAllocator) {
                                                   AllocationDeciders allocationDeciders, ShardsAllocator shardAllocator,
                                                   TransportIndicesShardStoresAction shardStoresAction) {
        super(settings, ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
              indexNameExpressionResolver, ClusterAllocationExplainRequest::new);
        this.allocationService = allocationService;
        this.clusterInfoService = clusterInfoService;
        this.allocationDeciders = allocationDeciders;
        this.shardAllocator = shardAllocator;
        this.shardStoresAction = shardStoresAction;
    }

    @Override
@@ -123,7 +130,8 @@ public class TransportClusterAllocationExplainAction
     * includeYesDecisions} is true, returns all decisions, otherwise returns only 'NO' and 'THROTTLE' decisions.
     */
    public static ClusterAllocationExplanation explainShard(ShardRouting shard, RoutingAllocation allocation, RoutingNodes routingNodes,
                                                            boolean includeYesDecisions, ShardsAllocator shardAllocator) {
                                                            boolean includeYesDecisions, ShardsAllocator shardAllocator,
                                                            List<IndicesShardStoresResponse.StoreStatus> shardStores) {
        // don't short circuit deciders, we want a full explanation
        allocation.debugDecision(true);
        // get the existing unassigned info if available

@@ -140,13 +148,15 @@ public class TransportClusterAllocationExplainAction
            }
        }
        long remainingDelayNanos = 0;
        final MetaData metadata = allocation.metaData();
        final IndexMetaData indexMetaData = metadata.index(shard.index());
        if (ui != null) {
            final MetaData metadata = allocation.metaData();
            final Settings indexSettings = metadata.index(shard.index()).getSettings();
            final Settings indexSettings = indexMetaData.getSettings();
            remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings);
        }
        return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(), ui, nodeToDecision,
                                                shardAllocator.weighShard(allocation, shard), remainingDelayNanos);
                                                shardAllocator.weighShard(allocation, shard), remainingDelayNanos, shardStores,
                                                indexMetaData.activeAllocationIds(shard.getId()));
    }

    @Override
@@ -156,30 +166,30 @@ public class TransportClusterAllocationExplainAction
        final RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, state.nodes(),
                                                                   clusterInfoService.getClusterInfo(), System.nanoTime());

        ShardRouting shardRouting = null;
        ShardRouting foundShard = null;
        if (request.useAnyUnassignedShard()) {
            // If we can use any shard, just pick the first unassigned one (if there are any)
            RoutingNodes.UnassignedShards.UnassignedIterator ui = routingNodes.unassigned().iterator();
            if (ui.hasNext()) {
                shardRouting = ui.next();
                foundShard = ui.next();
            }
        } else {
            String index = request.getIndex();
            int shard = request.getShard();
            if (request.isPrimary()) {
                // If we're looking for the primary shard, there's only one copy, so pick it directly
                shardRouting = allocation.routingTable().shardRoutingTable(index, shard).primaryShard();
                foundShard = allocation.routingTable().shardRoutingTable(index, shard).primaryShard();
            } else {
                // If looking for a replica, go through all the replica shards
                List<ShardRouting> replicaShardRoutings = allocation.routingTable().shardRoutingTable(index, shard).replicaShards();
                if (replicaShardRoutings.size() > 0) {
                    // Pick the first replica at the very least
                    shardRouting = replicaShardRoutings.get(0);
                    foundShard = replicaShardRoutings.get(0);
                    // In case there are multiple replicas where some are assigned and some aren't,
                    // try to find one that is unassigned at least
                    for (ShardRouting replica : replicaShardRoutings) {
                        if (replica.unassigned()) {
                            shardRouting = replica;
                            foundShard = replica;
                            break;
                        }
                    }
@@ -187,14 +197,34 @@ public class TransportClusterAllocationExplainAction
            }
        }

        if (shardRouting == null) {
        if (foundShard == null) {
            listener.onFailure(new ElasticsearchException("unable to find any shards to explain [{}] in the routing table", request));
            return;
        }
        final ShardRouting shardRouting = foundShard;
        logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting);

        ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes,
                                                        request.includeYesDecisions(), shardAllocator);
        listener.onResponse(new ClusterAllocationExplainResponse(cae));
        getShardStores(shardRouting, new ActionListener<IndicesShardStoresResponse>() {
            @Override
            public void onResponse(IndicesShardStoresResponse shardStoreResponse) {
                ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStatuses =
                        shardStoreResponse.getStoreStatuses().get(shardRouting.getIndexName());
                List<IndicesShardStoresResponse.StoreStatus> shardStoreStatus = shardStatuses.get(shardRouting.id());
                ClusterAllocationExplanation cae = explainShard(shardRouting, allocation, routingNodes,
                                                                request.includeYesDecisions(), shardAllocator, shardStoreStatus);
                listener.onResponse(new ClusterAllocationExplainResponse(cae));
            }

            @Override
            public void onFailure(Throwable e) {
                listener.onFailure(e);
            }
        });
    }

    private void getShardStores(ShardRouting shard, final ActionListener<IndicesShardStoresResponse> listener) {
        IndicesShardStoresRequest request = new IndicesShardStoresRequest(shard.getIndexName());
        request.shardStatuses("all");
        shardStoresAction.execute(request, listener);
    }
}
@@ -164,7 +164,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
            return allocationStatus;
        }

        static StoreStatus readStoreStatus(StreamInput in) throws IOException {
        public static StoreStatus readStoreStatus(StreamInput in) throws IOException {
            StoreStatus storeStatus = new StoreStatus();
            storeStatus.readFrom(in);
            return storeStatus;
@@ -20,14 +20,21 @@
package org.elasticsearch.action.admin.cluster.allocation;

import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -70,4 +77,89 @@ public final class ClusterAllocationExplainIT extends ESIntegTestCase {
        assertFalse(cae.isAssigned());
        assertThat("expecting a remaining delay, got: " + cae.getRemainingDelayNanos(), cae.getRemainingDelayNanos(), greaterThan(0L));
    }

    public void testUnassignedShards() throws Exception {
        logger.info("--> starting 3 nodes");
        String noAttrNode = internalCluster().startNode();
        String barAttrNode = internalCluster().startNode(Settings.builder().put("node.attr.bar", "baz"));
        String fooBarAttrNode = internalCluster().startNode(Settings.builder()
                .put("node.attr.foo", "bar")
                .put("node.attr.bar", "baz"));

        // Wait for all 3 nodes to be up
        logger.info("--> waiting for 3 nodes to be up");
        client().admin().cluster().health(Requests.clusterHealthRequest().waitForNodes("3")).actionGet();

        client().admin().indices().prepareCreate("anywhere")
                .setSettings(Settings.builder()
                        .put("index.number_of_shards", 5)
                        .put("index.number_of_replicas", 1))
                .get();

        client().admin().indices().prepareCreate("only-baz")
                .setSettings(Settings.builder()
                        .put("index.routing.allocation.include.bar", "baz")
                        .put("index.number_of_shards", 5)
                        .put("index.number_of_replicas", 1))
                .get();

        client().admin().indices().prepareCreate("only-foo")
                .setSettings(Settings.builder()
                        .put("index.routing.allocation.include.foo", "bar")
                        .put("index.number_of_shards", 1)
                        .put("index.number_of_replicas", 1))
                .get();

        ensureGreen("anywhere", "only-baz");
        ensureYellow("only-foo");

        ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain()
                .setIndex("only-foo")
                .setShard(0)
                .setPrimary(false)
                .get();
        ClusterAllocationExplanation cae = resp.getExplanation();
        assertThat(cae.getShard().getIndexName(), equalTo("only-foo"));
        assertFalse(cae.isPrimary());
        assertFalse(cae.isAssigned());
        assertThat(UnassignedInfo.Reason.INDEX_CREATED, equalTo(cae.getUnassignedInfo().getReason()));
        assertThat("expecting no remaining delay: " + cae.getRemainingDelayNanos(), cae.getRemainingDelayNanos(), equalTo(0L));
        Map<DiscoveryNode, Decision> nodeToDecisions = cae.getNodeDecisions();
        Map<DiscoveryNode, Float> nodeToWeight = cae.getNodeWeights();
        Map<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> nodeToStatus = cae.getNodeStoreStatus();

        Float noAttrWeight = -1f;
        Float barAttrWeight = -1f;
        Float fooBarAttrWeight = -1f;
        for (Map.Entry<DiscoveryNode, Decision> entry : nodeToDecisions.entrySet()) {
            DiscoveryNode node = entry.getKey();
            String nodeName = node.getName();
            Decision d = entry.getValue();
            float weight = nodeToWeight.get(node);
            IndicesShardStoresResponse.StoreStatus storeStatus = nodeToStatus.get(node);

            assertEquals(d.type(), Decision.Type.NO);
            if (noAttrNode.equals(nodeName)) {
                assertThat(d.toString(), containsString("node does not match index include filters [foo:\"bar\"]"));
                noAttrWeight = weight;
                assertNull(storeStatus);
            } else if (barAttrNode.equals(nodeName)) {
                assertThat(d.toString(), containsString("node does not match index include filters [foo:\"bar\"]"));
                barAttrWeight = weight;
                assertNull(storeStatus);
            } else if (fooBarAttrNode.equals(nodeName)) {
                assertThat(d.toString(), containsString("the shard cannot be allocated on the same node id"));
                fooBarAttrWeight = weight;
                assertEquals(storeStatus.getAllocationStatus(),
                             IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY);
            } else {
                fail("unexpected node with name: " + nodeName +
                     ", I have: " + noAttrNode + ", " + barAttrNode + ", " + fooBarAttrNode);
            }
        }
        assertThat(noAttrWeight, greaterThan(barAttrWeight));
        assertThat(noAttrWeight, greaterThan(fooBarAttrWeight));
        assertFalse(barAttrWeight == fooBarAttrWeight);
    }
}
@@ -19,17 +19,27 @@
package org.elasticsearch.action.admin.cluster.allocation;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@@ -66,8 +76,15 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
        }

        long remainingDelay = randomIntBetween(0, 500);
        DiscoveryNode nodeWithStore = new DiscoveryNode("node-1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
        IndicesShardStoresResponse.StoreStatus storeStatus = new IndicesShardStoresResponse.StoreStatus(nodeWithStore, 42, "eggplant",
                IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, new ElasticsearchException("stuff's broke, yo"));
        List<IndicesShardStoresResponse.StoreStatus> storeStatusList = Collections.singletonList(storeStatus);
        Set<String> allocationIds = new HashSet<>();
        allocationIds.add("eggplant");
        allocationIds.add("potato");
        ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true, "assignedNode", null,
                nodeToDecisions, nodeToWeight, remainingDelay);
                nodeToDecisions, nodeToWeight, remainingDelay, storeStatusList, allocationIds);
        BytesStreamOutput out = new BytesStreamOutput();
        cae.writeTo(out);
        StreamInput in = StreamInput.wrap(out.bytes());
@@ -80,7 +97,50 @@
        for (Map.Entry<DiscoveryNode, Decision> entry : cae2.getNodeDecisions().entrySet()) {
            assertEquals(nodeToDecisions.get(entry.getKey()), entry.getValue());
        }
        for (Map.Entry<DiscoveryNode, IndicesShardStoresResponse.StoreStatus> entry : cae2.getNodeStoreStatus().entrySet()) {
            assertEquals(nodeWithStore, entry.getKey());
            IndicesShardStoresResponse.StoreStatus status = entry.getValue();
            assertEquals(storeStatus.getLegacyVersion(), status.getLegacyVersion());
            assertEquals(storeStatus.getAllocationId(), status.getAllocationId());
            assertEquals(storeStatus.getAllocationStatus(), status.getAllocationStatus());
            assertEquals(ExceptionsHelper.detailedMessage(storeStatus.getStoreException()),
                         ExceptionsHelper.detailedMessage(status.getStoreException()));
        }
        assertEquals(nodeToWeight, cae2.getNodeWeights());
        assertEquals(remainingDelay, cae2.getRemainingDelayNanos());
        assertEquals(allocationIds, cae2.getActiveAllocationIds());
    }

    public void testStaleShardExplanation() throws Exception {
        ShardId shard = new ShardId("test", "uuid", 0);
        Map<DiscoveryNode, Decision> nodeToDecisions = new HashMap<>();
        Map<DiscoveryNode, Float> nodeToWeight = new HashMap<>();
        DiscoveryNode dn = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
        Decision.Multi d = new Decision.Multi();
        d.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
        d.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
        d.add(Decision.single(Decision.Type.THROTTLE, "throttle label", "wait a sec"));
        nodeToDecisions.put(dn, d);
        nodeToWeight.put(dn, 1.5f);

        long remainingDelay = 42;
        DiscoveryNode nodeWithStore = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, emptyMap(), emptySet(), Version.CURRENT);
        IndicesShardStoresResponse.StoreStatus storeStatus = new IndicesShardStoresResponse.StoreStatus(nodeWithStore, 42, "eggplant",
                IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null);
        List<IndicesShardStoresResponse.StoreStatus> storeStatusList = Collections.singletonList(storeStatus);
        Set<String> allocationIds = new HashSet<>();
        allocationIds.add("potato");
        ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true, "assignedNode", null,
                nodeToDecisions, nodeToWeight, remainingDelay, storeStatusList, allocationIds);
        XContentBuilder builder = XContentFactory.jsonBuilder();
        cae.toXContent(builder, ToXContent.EMPTY_PARAMS);
        assertEquals("{\"shard\":{\"index\":\"test\",\"index_uuid\":\"uuid\",\"id\":0,\"primary\":true}," +
                "\"assigned\":true,\"assigned_node_id\":\"assignedNode\"," +
                "\"nodes\":{\"node1\":{\"node_name\":\"\",\"node_attributes\":{},\"store\":{\"shard_copy\":\"STALE_COPY\"}," +
                "\"final_decision\":\"STORE_STALE\",\"weight\":1.5,\"decisions\":[{\"decider\":\"no label\",\"decision\":\"NO\"," +
                "\"explanation\":\"because I said no\"},{\"decider\":\"yes label\",\"decision\":\"YES\"," +
                "\"explanation\":\"yes please\"},{\"decider\":\"throttle label\",\"decision\":\"THROTTLE\"," +
                "\"explanation\":\"wait a sec\"}]}}}",
                builder.string());
    }
}