A new `_shard_stores` API provides store information for shard copies of indices.

Store information reports on which nodes shard copies exist, the shard
copy version, indicating how recent they are, and any exceptions
encountered while opening the shard index or from earlier engine failure.

closes #10952
This commit is contained in:
Areek Zillur 2015-06-02 12:31:04 -04:00
parent d902012835
commit 7a21d846bb
32 changed files with 1703 additions and 119 deletions

View File

@ -96,6 +96,8 @@ import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@ -242,6 +244,7 @@ public class ActionModule extends AbstractModule {
registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);

View File

@ -107,11 +107,11 @@ public class ClusterHealthResponse extends ActionResponse implements Iterable<Cl
status = ClusterHealthStatus.GREEN;
for (ClusterIndexHealth indexHealth : indices.values()) {
activePrimaryShards += indexHealth.activePrimaryShards;
activeShards += indexHealth.activeShards;
relocatingShards += indexHealth.relocatingShards;
initializingShards += indexHealth.initializingShards;
unassignedShards += indexHealth.unassignedShards;
activePrimaryShards += indexHealth.getActivePrimaryShards();
activeShards += indexHealth.getActiveShards();
relocatingShards += indexHealth.getRelocatingShards();
initializingShards += indexHealth.getInitializingShards();
unassignedShards += indexHealth.getUnassignedShards();
if (indexHealth.getStatus() == ClusterHealthStatus.RED) {
status = ClusterHealthStatus.RED;
} else if (indexHealth.getStatus() == ClusterHealthStatus.YELLOW && status != ClusterHealthStatus.RED) {

View File

@ -50,4 +50,16 @@ public enum ClusterHealthStatus {
throw new IllegalArgumentException("No cluster health status for value [" + value + "]");
}
}
public static ClusterHealthStatus fromString(String status) {
if (status.equalsIgnoreCase("green")) {
return GREEN;
} else if (status.equalsIgnoreCase("yellow")) {
return YELLOW;
} else if (status.equalsIgnoreCase("red")) {
return RED;
} else {
throw new IllegalArgumentException("unknown cluster health status [" + status + "]");
}
}
}

View File

@ -51,21 +51,21 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
private int numberOfReplicas;
int activeShards = 0;
private int activeShards = 0;
int relocatingShards = 0;
private int relocatingShards = 0;
int initializingShards = 0;
private int initializingShards = 0;
int unassignedShards = 0;
private int unassignedShards = 0;
int activePrimaryShards = 0;
private int activePrimaryShards = 0;
ClusterHealthStatus status = ClusterHealthStatus.RED;
private ClusterHealthStatus status = ClusterHealthStatus.RED;
final Map<Integer, ClusterShardHealth> shards = Maps.newHashMap();
private final Map<Integer, ClusterShardHealth> shards = Maps.newHashMap();
List<String> validationFailures;
private List<String> validationFailures;
private ClusterIndexHealth() {
}
@ -77,33 +77,8 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
this.validationFailures = indexRoutingTable.validate(indexMetaData);
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
ClusterShardHealth shardHealth = new ClusterShardHealth(shardRoutingTable.shardId().id());
for (ShardRouting shardRouting : shardRoutingTable) {
if (shardRouting.active()) {
shardHealth.activeShards++;
if (shardRouting.relocating()) {
// the shard is relocating, the one it is relocating to will be in initializing state, so we don't count it
shardHealth.relocatingShards++;
}
if (shardRouting.primary()) {
shardHealth.primaryActive = true;
}
} else if (shardRouting.initializing()) {
shardHealth.initializingShards++;
} else if (shardRouting.unassigned()) {
shardHealth.unassignedShards++;
}
}
if (shardHealth.primaryActive) {
if (shardHealth.activeShards == shardRoutingTable.size()) {
shardHealth.status = ClusterHealthStatus.GREEN;
} else {
shardHealth.status = ClusterHealthStatus.YELLOW;
}
} else {
shardHealth.status = ClusterHealthStatus.RED;
}
shards.put(shardHealth.getId(), shardHealth);
int shardId = shardRoutingTable.shardId().id();
shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
}
// update the index status
@ -113,10 +88,10 @@ public class ClusterIndexHealth implements Iterable<ClusterShardHealth>, Streama
if (shardHealth.isPrimaryActive()) {
activePrimaryShards++;
}
activeShards += shardHealth.activeShards;
relocatingShards += shardHealth.relocatingShards;
initializingShards += shardHealth.initializingShards;
unassignedShards += shardHealth.unassignedShards;
activeShards += shardHealth.getActiveShards();
relocatingShards += shardHealth.getRelocatingShards();
initializingShards += shardHealth.getInitializingShards();
unassignedShards += shardHealth.getUnassignedShards();
if (shardHealth.getStatus() == ClusterHealthStatus.RED) {
status = ClusterHealthStatus.RED;

View File

@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.cluster.health;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -34,22 +36,47 @@ public class ClusterShardHealth implements Streamable {
ClusterHealthStatus status = ClusterHealthStatus.RED;
int activeShards = 0;
private int activeShards = 0;
int relocatingShards = 0;
private int relocatingShards = 0;
int initializingShards = 0;
private int initializingShards = 0;
int unassignedShards = 0;
private int unassignedShards = 0;
boolean primaryActive = false;
private boolean primaryActive = false;
private ClusterShardHealth() {
}
ClusterShardHealth(int shardId) {
public ClusterShardHealth(int shardId, final IndexShardRoutingTable shardRoutingTable) {
this.shardId = shardId;
for (ShardRouting shardRouting : shardRoutingTable) {
if (shardRouting.active()) {
activeShards++;
if (shardRouting.relocating()) {
// the shard is relocating, the one it is relocating to will be in initializing state, so we don't count it
relocatingShards++;
}
if (shardRouting.primary()) {
primaryActive = true;
}
} else if (shardRouting.initializing()) {
initializingShards++;
} else if (shardRouting.unassigned()) {
unassignedShards++;
}
}
if (primaryActive) {
if (activeShards == shardRoutingTable.size()) {
status = ClusterHealthStatus.GREEN;
} else {
status = ClusterHealthStatus.YELLOW;
}
} else {
status = ClusterHealthStatus.RED;
}
}
public int getId() {

View File

@ -84,17 +84,7 @@ public class ClusterSearchShardsResponse extends ActionResponse implements ToXCo
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes");
for (DiscoveryNode node : nodes) {
builder.startObject(node.getId(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("name", node.name());
builder.field("transport_address", node.getAddress());
if (!node.attributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attr : node.attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
builder.endObject();
}
builder.endObject();
node.toXContent(builder, params);
}
builder.endObject();
builder.startArray("shards");

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shards;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Request builder for {@link IndicesShardStoresRequest}
*/
public class IndicesShardStoreRequestBuilder extends MasterNodeReadOperationRequestBuilder<IndicesShardStoresRequest, IndicesShardStoresResponse, IndicesShardStoreRequestBuilder> {
public IndicesShardStoreRequestBuilder(ElasticsearchClient client, Action<IndicesShardStoresRequest, IndicesShardStoresResponse, IndicesShardStoreRequestBuilder> action, String... indices) {
super(client, action, new IndicesShardStoresRequest(indices));
}
/**
* Sets the indices for the shard stores request
*/
public IndicesShardStoreRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
/**
* Specifies what type of requested indices to ignore and wildcard indices expressions
* By default, expands wildcards to both open and closed indices
*/
public IndicesShardStoreRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request.indicesOptions(indicesOptions);
return this;
}
/**
* Set statuses to filter shards to get stores info on.
* @param shardStatuses acceptable values are "green", "yellow", "red" and "all"
* see {@link org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus} for details
*/
public IndicesShardStoreRequestBuilder setShardStatuses(String... shardStatuses) {
request.shardStatuses(shardStatuses);
return this;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shards;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
/**
* Action for {@link TransportIndicesShardStoresAction}
*
* Exposes shard store information for requested indices.
* Shard store information reports which nodes hold shard copies, how recent they are
* and any exceptions on opening the shard index or from previous engine failures
*/
public class IndicesShardStoresAction extends Action<IndicesShardStoresRequest, IndicesShardStoresResponse, IndicesShardStoreRequestBuilder> {
public static final IndicesShardStoresAction INSTANCE = new IndicesShardStoresAction();
public static final String NAME = "indices:monitor/shard_stores";
private IndicesShardStoresAction() {
super(NAME);
}
@Override
public IndicesShardStoresResponse newResponse() {
return new IndicesShardStoresResponse();
}
@Override
public IndicesShardStoreRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new IndicesShardStoreRequestBuilder(client, this);
}
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shards;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.EnumSet;
/**
* Request for {@link IndicesShardStoresAction}
*/
public class IndicesShardStoresRequest extends MasterNodeReadRequest<IndicesShardStoresRequest> implements IndicesRequest.Replaceable {
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.strictExpand();
private EnumSet<ClusterHealthStatus> statuses = EnumSet.of(ClusterHealthStatus.YELLOW, ClusterHealthStatus.RED);
/**
* Create a request for shard stores info for <code>indices</code>
*/
public IndicesShardStoresRequest(String... indices) {
this.indices = indices;
}
IndicesShardStoresRequest() {
}
/**
* Set statuses to filter shards to get stores info on.
* see {@link ClusterHealthStatus} for details.
* Defaults to "yellow" and "red" status
* @param shardStatuses acceptable values are "green", "yellow", "red" and "all"
*/
public IndicesShardStoresRequest shardStatuses(String... shardStatuses) {
statuses = EnumSet.noneOf(ClusterHealthStatus.class);
for (String statusString : shardStatuses) {
if ("all".equalsIgnoreCase(statusString)) {
statuses = EnumSet.allOf(ClusterHealthStatus.class);
return this;
}
statuses.add(ClusterHealthStatus.fromString(statusString));
}
return this;
}
/**
* Specifies what type of requested indices to ignore and wildcard indices expressions
* By default, expands wildcards to both open and closed indices
*/
public IndicesShardStoresRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
/**
* Sets the indices for the shard stores request
*/
@Override
public IndicesShardStoresRequest indices(String... indices) {
this.indices = indices;
return this;
}
/**
* Returns the shard criteria to get store information on
*/
public EnumSet<ClusterHealthStatus> shardStatuses() {
return statuses;
}
@Override
public String[] indices() {
return indices;
}
@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(indices);
out.writeVInt(statuses.size());
for (ClusterHealthStatus status : statuses) {
out.writeByte(status.value());
}
indicesOptions.writeIndicesOptions(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
int nStatus = in.readVInt();
statuses = EnumSet.noneOf(ClusterHealthStatus.class);
for (int i = 0; i < nStatus; i++) {
statuses.add(ClusterHealthStatus.fromValue(in.readByte()));
}
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
}

View File

@ -0,0 +1,385 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shards;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus.*;
/**
* Response for {@link IndicesShardStoresAction}
*
* Consists of {@link StoreStatus}s for requested indices grouped by
* indices and shard ids and a list of encountered node {@link Failure}s
*/
public class IndicesShardStoresResponse extends ActionResponse implements ToXContent {
/**
* Shard store information from a node
*/
public static class StoreStatus implements Streamable, ToXContent, Comparable<StoreStatus> {
private DiscoveryNode node;
private long version;
private Throwable storeException;
private Allocation allocation;
/**
* The status of the shard store with respect to the cluster
*/
public enum Allocation {
/**
* Allocated as primary
*/
PRIMARY((byte) 0),
/**
* Allocated as a replica
*/
REPLICA((byte) 1),
/**
* Not allocated
*/
UNUSED((byte) 2);
private final byte id;
Allocation(byte id) {
this.id = id;
}
private static Allocation fromId(byte id) {
switch (id) {
case 0: return PRIMARY;
case 1: return REPLICA;
case 2: return UNUSED;
default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
}
}
public String value() {
switch (id) {
case 0: return "primary";
case 1: return "replica";
case 2: return "unused";
default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
}
}
private static Allocation readFrom(StreamInput in) throws IOException {
return fromId(in.readByte());
}
private void writeTo(StreamOutput out) throws IOException {
out.writeByte(id);
}
}
private StoreStatus() {
}
public StoreStatus(DiscoveryNode node, long version, Allocation allocation, Throwable storeException) {
this.node = node;
this.version = version;
this.allocation = allocation;
this.storeException = storeException;
}
/**
* Node the store belongs to
*/
public DiscoveryNode getNode() {
return node;
}
/**
* Version of the store, used to select the store that will be
* used as a primary.
*/
public long getVersion() {
return version;
}
/**
* Exception while trying to open the
* shard index or from when the shard failed
*/
public Throwable getStoreException() {
return storeException;
}
/**
* The allocation status of the store.
* {@link Allocation#PRIMARY} indicates a primary shard copy
* {@link Allocation#REPLICA} indicates a replica shard copy
* {@link Allocation#UNUSED} indicates an unused shard copy
*/
public Allocation getAllocation() {
return allocation;
}
static StoreStatus readStoreStatus(StreamInput in) throws IOException {
StoreStatus storeStatus = new StoreStatus();
storeStatus.readFrom(in);
return storeStatus;
}
@Override
public void readFrom(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
version = in.readLong();
allocation = Allocation.readFrom(in);
if (in.readBoolean()) {
storeException = in.readThrowable();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
out.writeLong(version);
allocation.writeTo(out);
if (storeException != null) {
out.writeBoolean(true);
out.writeThrowable(storeException);
} else {
out.writeBoolean(false);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
node.toXContent(builder, params);
builder.field(Fields.VERSION, version);
builder.field(Fields.ALLOCATED, allocation.value());
if (storeException != null) {
builder.startObject(Fields.STORE_EXCEPTION);
ElasticsearchException.toXContent(builder, params, storeException);
builder.endObject();
}
return builder;
}
@Override
public int compareTo(StoreStatus other) {
if (storeException != null && other.storeException == null) {
return 1;
} else if (other.storeException != null && storeException == null) {
return -1;
} else {
int compare = Long.compare(other.version, version);
if (compare == 0) {
return Integer.compare(allocation.id, other.allocation.id);
}
return compare;
}
}
}
/**
* Single node failure while retrieving shard store information
*/
public static class Failure extends DefaultShardOperationFailedException {
private String nodeId;
public Failure(String nodeId, String index, int shardId, Throwable reason) {
super(index, shardId, reason);
this.nodeId = nodeId;
}
private Failure() {
}
public String nodeId() {
return nodeId;
}
public static Failure readFailure(StreamInput in) throws IOException {
Failure failure = new Failure();
failure.readFrom(in);
return failure;
}
@Override
public void readFrom(StreamInput in) throws IOException {
nodeId = in.readString();
super.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
super.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node", nodeId());
super.toXContent(builder, params);
return builder;
}
}
private ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses;
private ImmutableList<Failure> failures;
public IndicesShardStoresResponse(ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatuses, ImmutableList<Failure> failures) {
this.storeStatuses = storeStatuses;
this.failures = failures;
}
IndicesShardStoresResponse() {
this(ImmutableOpenMap.<String, ImmutableOpenIntMap<List<StoreStatus>>>of(), ImmutableList.<Failure>of());
}
/**
* Returns {@link StoreStatus}s
* grouped by their index names and shard ids.
*/
public ImmutableOpenMap<String, ImmutableOpenIntMap<List<StoreStatus>>> getStoreStatuses() {
return storeStatuses;
}
/**
* Returns node {@link Failure}s encountered
* while executing the request
*/
public ImmutableList<Failure> getFailures() {
return failures;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int numResponse = in.readVInt();
ImmutableOpenMap.Builder<String, ImmutableOpenIntMap<List<StoreStatus>>> storeStatusesBuilder = ImmutableOpenMap.builder();
for (int i = 0; i < numResponse; i++) {
String index = in.readString();
int indexEntries = in.readVInt();
ImmutableOpenIntMap.Builder<List<StoreStatus>> shardEntries = ImmutableOpenIntMap.builder();
for (int shardCount = 0; shardCount < indexEntries; shardCount++) {
int shardID = in.readInt();
int nodeEntries = in.readVInt();
List<StoreStatus> storeStatuses = new ArrayList<>(nodeEntries);
for (int nodeCount = 0; nodeCount < nodeEntries; nodeCount++) {
storeStatuses.add(readStoreStatus(in));
}
shardEntries.put(shardID, storeStatuses);
}
storeStatusesBuilder.put(index, shardEntries.build());
}
int numFailure = in.readVInt();
ImmutableList.Builder<Failure> failureBuilder = ImmutableList.builder();
for (int i = 0; i < numFailure; i++) {
failureBuilder.add(Failure.readFailure(in));
}
storeStatuses = storeStatusesBuilder.build();
failures = failureBuilder.build();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(storeStatuses.size());
for (ObjectObjectCursor<String, ImmutableOpenIntMap<List<StoreStatus>>> indexShards : storeStatuses) {
out.writeString(indexShards.key);
out.writeVInt(indexShards.value.size());
for (IntObjectCursor<List<StoreStatus>> shardStatusesEntry : indexShards.value) {
out.writeInt(shardStatusesEntry.key);
out.writeVInt(shardStatusesEntry.value.size());
for (StoreStatus storeStatus : shardStatusesEntry.value) {
storeStatus.writeTo(out);
}
}
}
out.writeVInt(failures.size());
for (ShardOperationFailedException failure : failures) {
failure.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (failures.size() > 0) {
builder.startArray(Fields.FAILURES);
for (ShardOperationFailedException failure : failures) {
builder.startObject();
failure.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
builder.startObject(Fields.INDICES);
for (ObjectObjectCursor<String, ImmutableOpenIntMap<List<StoreStatus>>> indexShards : storeStatuses) {
builder.startObject(indexShards.key);
builder.startObject(Fields.SHARDS);
for (IntObjectCursor<List<StoreStatus>> shardStatusesEntry : indexShards.value) {
builder.startObject(String.valueOf(shardStatusesEntry.key));
builder.startArray(Fields.STORES);
for (StoreStatus storeStatus : shardStatusesEntry.value) {
builder.startObject();
storeStatus.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
builder.endObject();
}
builder.endObject();
builder.endObject();
}
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString SHARDS = new XContentBuilderString("shards");
static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
static final XContentBuilderString STORES = new XContentBuilderString("stores");
// StoreStatus fields
static final XContentBuilderString VERSION = new XContentBuilderString("version");
static final XContentBuilderString STORE_EXCEPTION = new XContentBuilderString("store_exception");
static final XContentBuilderString ALLOCATED = new XContentBuilderString("allocation");
}
}

View File

@ -0,0 +1,229 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.shards;
import com.google.common.collect.ImmutableList;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.health.ClusterShardHealth;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.gateway.AsyncShardFetch;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Transport action that reads the cluster state for shards with the requested criteria (see {@link ClusterHealthStatus}) of specific indices
* and fetches store information from all the nodes using {@link TransportNodesListGatewayStartedShards}
*/
public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAction<IndicesShardStoresRequest, IndicesShardStoresResponse> {
private final TransportNodesListGatewayStartedShards listShardStoresInfo;
@Inject
public TransportIndicesShardStoresAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TransportNodesListGatewayStartedShards listShardStoresInfo) {
super(settings, IndicesShardStoresAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, IndicesShardStoresRequest.class);
this.listShardStoresInfo = listShardStoresInfo;
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected IndicesShardStoresResponse newResponse() {
return new IndicesShardStoresResponse();
}
@Override
protected void masterOperation(IndicesShardStoresRequest request, ClusterState state, ActionListener<IndicesShardStoresResponse> listener) {
final RoutingTable routingTables = state.routingTable();
final RoutingNodes routingNodes = state.routingNodes();
final String[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
final Set<ShardId> shardIdsToFetch = new HashSet<>();
// collect relevant shard ids of the requested indices for fetching store infos
for (String index : concreteIndices) {
IndexRoutingTable indexShardRoutingTables = routingTables.index(index);
if (indexShardRoutingTables == null) {
continue;
}
for (IndexShardRoutingTable routing : indexShardRoutingTables) {
ClusterShardHealth shardHealth = new ClusterShardHealth(routing.shardId().id(), routing);
if (request.shardStatuses().contains(shardHealth.getStatus())) {
shardIdsToFetch.add(routing.shardId());
}
}
}
// async fetch store infos from all the nodes
// NOTE: instead of fetching shard store info one by one from every node (nShards * nNodes requests)
// we could fetch all shard store info from every node once (nNodes requests)
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
// for fetching shard stores info, that operates on a list of shards instead of a single shard
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, state.metaData(), shardIdsToFetch, listener).start();
}
@Override
protected ClusterBlockException checkBlock(IndicesShardStoresRequest request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, indexNameExpressionResolver.concreteIndices(state, request));
}
private class AsyncShardStoresInfoFetches {
private final DiscoveryNodes nodes;
private final RoutingNodes routingNodes;
private final MetaData metaData;
private final Set<ShardId> shardIds;
private final ActionListener<IndicesShardStoresResponse> listener;
private CountDown expectedOps;
private final Queue<InternalAsyncFetch.Response> fetchResponses;
AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, MetaData metaData, Set<ShardId> shardIds, ActionListener<IndicesShardStoresResponse> listener) {
this.nodes = nodes;
this.routingNodes = routingNodes;
this.metaData = metaData;
this.shardIds = shardIds;
this.listener = listener;
this.fetchResponses = new ConcurrentLinkedQueue<>();
this.expectedOps = new CountDown(shardIds.size());
}
void start() {
if (shardIds.isEmpty()) {
listener.onResponse(new IndicesShardStoresResponse());
} else {
for (ShardId shardId : shardIds) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shardId, listShardStoresInfo);
fetch.fetchData(nodes, metaData, Collections.<String>emptySet());
}
}
}
private class InternalAsyncFetch extends AsyncShardFetch<NodeGatewayStartedShards> {
InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, TransportNodesListGatewayStartedShards action) {
super(logger, type, shardId, action);
}
@Override
protected synchronized void processAsyncFetch(ShardId shardId, NodeGatewayStartedShards[] responses, FailedNodeException[] failures) {
fetchResponses.add(new Response(shardId, responses, failures));
if (expectedOps.countDown()) {
finish();
}
}
void finish() {
ImmutableOpenMap.Builder<String, ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>>> indicesStoreStatusesBuilder = ImmutableOpenMap.builder();
ImmutableList.Builder<IndicesShardStoresResponse.Failure> failureBuilder = ImmutableList.builder();
for (Response fetchResponse : fetchResponses) {
ImmutableOpenIntMap<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexStoreStatuses = indicesStoreStatusesBuilder.get(fetchResponse.shardId.getIndex());
final ImmutableOpenIntMap.Builder<java.util.List<IndicesShardStoresResponse.StoreStatus>> indexShardsBuilder;
if (indexStoreStatuses == null) {
indexShardsBuilder = ImmutableOpenIntMap.builder();
} else {
indexShardsBuilder = ImmutableOpenIntMap.builder(indexStoreStatuses);
}
java.util.List<IndicesShardStoresResponse.StoreStatus> storeStatuses = indexShardsBuilder.get(fetchResponse.shardId.id());
if (storeStatuses == null) {
storeStatuses = new ArrayList<>();
}
for (NodeGatewayStartedShards response : fetchResponse.responses) {
if (shardExistsInNode(response)) {
IndicesShardStoresResponse.StoreStatus.Allocation allocation = getAllocation(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode());
storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), allocation, response.storeException()));
}
}
CollectionUtil.timSort(storeStatuses);
indexShardsBuilder.put(fetchResponse.shardId.id(), storeStatuses);
indicesStoreStatusesBuilder.put(fetchResponse.shardId.getIndex(), indexShardsBuilder.build());
for (FailedNodeException failure : fetchResponse.failures) {
failureBuilder.add(new IndicesShardStoresResponse.Failure(failure.nodeId(), fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), failure.getCause()));
}
}
listener.onResponse(new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), failureBuilder.build()));
}
private IndicesShardStoresResponse.StoreStatus.Allocation getAllocation(String index, int shardID, DiscoveryNode node) {
for (ShardRouting shardRouting : routingNodes.node(node.id())) {
ShardId shardId = shardRouting.shardId();
if (shardId.id() == shardID && shardId.getIndex().equals(index)) {
if (shardRouting.primary()) {
return IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY;
} else if (shardRouting.assignedToNode()) {
return IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA;
} else {
return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
}
}
}
return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
}
/**
* A shard exists/existed in a node only if shard state file exists in the node
*/
private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
return response.storeException() != null || response.version() != -1;
}
@Override
protected void reroute(ShardId shardId, String reason) {
// no-op
}
public class Response {
private final ShardId shardId;
private final NodeGatewayStartedShards[] responses;
private final FailedNodeException[] failures;
public Response(ShardId shardId, NodeGatewayStartedShards[] responses, FailedNodeException[] failures) {
this.shardId = shardId;
this.responses = responses;
this.failures = failures;
}
}
}
}
}

View File

@ -44,7 +44,7 @@ public class DefaultShardOperationFailedException implements ShardOperationFaile
private RestStatus status;
private DefaultShardOperationFailedException() {
protected DefaultShardOperationFailedException() {
}
public DefaultShardOperationFailedException(ElasticsearchException e) {

View File

@ -81,6 +81,9 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
@ -221,6 +224,29 @@ public interface IndicesAdminClient extends ElasticsearchClient {
*/
IndicesSegmentsRequestBuilder prepareSegments(String... indices);
/**
* The shard stores info of one or more indices.
*
* @param request The indices shard stores request
* @return The result future
* @see Requests#indicesShardStoresRequest(String...)
*/
ActionFuture<IndicesShardStoresResponse> shardStores(IndicesShardStoresRequest request);
/**
* The shard stores info of one or more indices.
*
* @param request The indices shard stores request
* @param listener A listener to be notified with a result
* @see Requests#indicesShardStoresRequest(String...)
*/
void shardStores(IndicesShardStoresRequest request, ActionListener<IndicesShardStoresResponse> listener);
/**
* The shard stores info of one or more indices.
*/
IndicesShardStoreRequestBuilder prepareShardStores(String... indices);
/**
* Creates an index using an explicit request allowing to specify the settings of the index.
*

View File

@ -49,6 +49,7 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.count.CountRequest;
@ -184,6 +185,15 @@ public class Requests {
return new IndicesSegmentsRequest(indices);
}
/**
* Creates an indices shard stores info request.
* @param indices The indices to get shard store information on
* @return The indices shard stores request
* @see org.elasticsearch.client.IndicesAdminClient#shardStores(IndicesShardStoresRequest)
*/
public static IndicesShardStoresRequest indicesShardStoresRequest(String... indices) {
return new IndicesShardStoresRequest(indices);
}
/**
* Creates an indices exists request.
*

View File

@ -176,6 +176,10 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
@ -1498,6 +1502,21 @@ public abstract class AbstractClient extends AbstractComponent implements Client
return new IndicesSegmentsRequestBuilder(this, IndicesSegmentsAction.INSTANCE).setIndices(indices);
}
@Override
public ActionFuture<IndicesShardStoresResponse> shardStores(IndicesShardStoresRequest request) {
return execute(IndicesShardStoresAction.INSTANCE, request);
}
@Override
public void shardStores(IndicesShardStoresRequest request, ActionListener<IndicesShardStoresResponse> listener) {
execute(IndicesShardStoresAction.INSTANCE, request, listener);
}
@Override
public IndicesShardStoreRequestBuilder prepareShardStores(String... indices) {
return new IndicesShardStoreRequestBuilder(this, IndicesShardStoresAction.INSTANCE, indices);
}
@Override
public ActionFuture<UpdateSettingsResponse> updateSettings(final UpdateSettingsRequest request) {
return execute(UpdateSettingsAction.INSTANCE, request);

View File

@ -398,18 +398,8 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
// nodes
if (metrics.contains(Metric.NODES)) {
builder.startObject("nodes");
for (DiscoveryNode node : nodes()) {
builder.startObject(node.id(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("name", node.name());
builder.field("transport_address", node.address().toString());
builder.startObject("attributes");
for (Map.Entry<String, String> attr : node.attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
builder.endObject();
builder.endObject();
for (DiscoveryNode node : nodes) {
node.toXContent(builder, params);
}
builder.endObject();
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.transport.TransportAddressSerializers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
@ -38,7 +40,7 @@ import static org.elasticsearch.common.transport.TransportAddressSerializers.add
/**
* A discovery node represents a node that is part of the cluster.
*/
public class DiscoveryNode implements Streamable {
public class DiscoveryNode implements Streamable, ToXContent {
/**
* Minimum version of a node to communicate with. This version corresponds to the minimum compatibility version
@ -372,4 +374,20 @@ public class DiscoveryNode implements Streamable {
}
return sb.toString();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(id(), XContentBuilder.FieldCaseConversion.NONE);
builder.field("name", name());
builder.field("transport_address", address().toString());
builder.startObject("attributes");
for (Map.Entry<String, String> attr : attributes().entrySet()) {
builder.field(attr.getKey(), attr.getValue());
}
builder.endObject();
builder.endObject();
return builder;
}
}

View File

@ -198,8 +198,14 @@ public class GatewayAllocator extends AbstractComponent {
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
long version = nodeShardState.version();
// -1 version means it does not exists, which is what the API returns, and what we expect to
if (nodeShardState.storeException() == null) {
logger.trace("[{}] on node [{}] has version [{}] of shard", shard, nodeShardState.getNode(), version);
nodesState.put(nodeShardState.getNode(), version);
} else {
// when there is an store exception, we disregard the reported version and assign it as -1 (same as shard does not exist)
logger.trace("[{}] on node [{}] has version [{}] but the store can not be opened, treating as version -1", nodeShardState.storeException(), shard, nodeShardState.getNode(), version);
nodesState.put(nodeShardState.getNode(), -1);
}
}
int numberOfAllocationsFound = 0;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gateway;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
@ -121,9 +122,18 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId));
if (shardStateMetaData != null) {
final IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name()); // it's a mystery why this is sometimes null
if (metaData != null && canOpenIndex(request.getShardId(), metaData) == false) {
logger.trace("{} can't open index for shard [{}]", shardId, shardStateMetaData);
return new NodeGatewayStartedShards(clusterService.localNode(), -1);
if (metaData != null) {
ShardPath shardPath = null;
try {
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings());
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
}
Store.tryOpenIndex(shardPath.resolveIndex());
} catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, exception);
}
}
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
// is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
@ -142,18 +152,6 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
}
}
private boolean canOpenIndex(ShardId shardId, IndexMetaData metaData) throws IOException {
// try and see if we an list unallocated
if (metaData == null) {
return false;
}
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, metaData.settings());
if (shardPath == null) {
return false;
}
return Store.canOpenIndex(logger, shardPath.resolveIndex());
}
@Override
protected boolean accumulateExceptions() {
return true;
@ -272,29 +270,48 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public static class NodeGatewayStartedShards extends BaseNodeResponse {
private long version = -1;
private Throwable storeException = null;
NodeGatewayStartedShards() {
}
public NodeGatewayStartedShards(DiscoveryNode node, long version) {
this(node, version, null);
}
public NodeGatewayStartedShards(DiscoveryNode node, long version, Throwable storeException) {
super(node);
this.version = version;
this.storeException = storeException;
}
public long version() {
return this.version;
}
public Throwable storeException() {
return this.storeException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
version = in.readLong();
if (in.readBoolean()) {
storeException = in.readThrowable();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(version);
if (storeException != null) {
out.writeBoolean(true);
out.writeThrowable(storeException);
} else {
out.writeBoolean(false);
}
}
}
}

View File

@ -386,14 +386,25 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* corruption markers.
*/
public static boolean canOpenIndex(ESLogger logger, Path indexLocation) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, new ShardId("", 1));
Lucene.readSegmentInfos(dir);
return true;
try {
tryOpenIndex(indexLocation);
} catch (Exception ex) {
logger.trace("Can't open index for path [{}]", ex, indexLocation);
return false;
}
return true;
}
/**
* Tries to open an index for the given location. This includes reading the
* segment infos and possible corruption markers. If the index can not
* be opened, an exception is thrown
*/
public static void tryOpenIndex(Path indexLocation) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, new ShardId("", 1));
Lucene.readSegmentInfos(dir);
}
}
/**

View File

@ -67,6 +67,7 @@ import org.elasticsearch.rest.action.admin.indices.open.RestOpenIndexAction;
import org.elasticsearch.rest.action.admin.indices.optimize.RestOptimizeAction;
import org.elasticsearch.rest.action.admin.indices.recovery.RestRecoveryAction;
import org.elasticsearch.rest.action.admin.indices.refresh.RestRefreshAction;
import org.elasticsearch.rest.action.admin.indices.shards.RestIndicesShardStoresAction;
import org.elasticsearch.rest.action.admin.indices.segments.RestIndicesSegmentsAction;
import org.elasticsearch.rest.action.admin.indices.settings.RestGetSettingsAction;
import org.elasticsearch.rest.action.admin.indices.settings.RestUpdateSettingsAction;
@ -155,6 +156,7 @@ public class RestActionModule extends AbstractModule {
bind(RestGetIndicesAction.class).asEagerSingleton();
bind(RestIndicesStatsAction.class).asEagerSingleton();
bind(RestIndicesSegmentsAction.class).asEagerSingleton();
bind(RestIndicesShardStoresAction.class).asEagerSingleton();
bind(RestGetAliasesAction.class).asEagerSingleton();
bind(RestAliasesExistAction.class).asEagerSingleton();
bind(RestIndexDeleteAliasesAction.class).asEagerSingleton();

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.shards;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestStatus.OK;
/**
* Rest action for {@link IndicesShardStoresAction}
*/
public class RestIndicesShardStoresAction extends BaseRestHandler {
@Inject
public RestIndicesShardStoresAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(GET, "/_shard_stores", this);
controller.registerHandler(GET, "/{index}/_shard_stores", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
IndicesShardStoresRequest indicesShardStoresRequest = new IndicesShardStoresRequest(Strings.splitStringByCommaToArray(request.param("index")));
if (request.hasParam("status")) {
indicesShardStoresRequest.shardStatuses(Strings.splitStringByCommaToArray(request.param("status")));
}
indicesShardStoresRequest.indicesOptions(IndicesOptions.fromRequest(request, indicesShardStoresRequest.indicesOptions()));
client.admin().indices().shardStores(indicesShardStoresRequest, new RestBuilderListener<IndicesShardStoresResponse>(channel) {
@Override
public RestResponse buildResponse(IndicesShardStoresResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContent(builder, request);
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
}
}

View File

@ -0,0 +1,222 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.segments;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Predicate;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.Test;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.*;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST)
public class IndicesShardStoreRequestTests extends ElasticsearchIntegrationTest {
@Test
public void testEmpty() {
ensureGreen();
IndicesShardStoresResponse rsp = client().admin().indices().prepareShardStores().get();
assertThat(rsp.getStoreStatuses().size(), equalTo(0));
}
@Test
public void testBasic() throws Exception {
String index = "test";
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate(index).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "2")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
));
indexRandomData(index);
ensureGreen(index);
// no unallocated shards
IndicesShardStoresResponse response = client().admin().indices().prepareShardStores(index).get();
assertThat(response.getStoreStatuses().size(), equalTo(0));
// all shards
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index).shardStatuses("all")).get();
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStores = response.getStoreStatuses().get(index);
assertThat(shardStores.values().size(), equalTo(2));
for (ObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.values()) {
for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.value) {
assertThat(storeStatus.getVersion(), greaterThan(-1l));
assertThat(storeStatus.getNode(), notNullValue());
assertThat(storeStatus.getStoreException(), nullValue());
}
}
// default with unassigned shards
ensureGreen(index);
logger.info("--> disable allocation");
disableAllocation(index);
logger.info("--> stop random node");
internalCluster().stopRandomNode(new IndexNodePredicate(index));
List<ShardRouting> unassignedShards = clusterService().state().routingTable().index(index).shardsWithState(ShardRoutingState.UNASSIGNED);
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index)).get();
assertThat(response.getStoreStatuses().containsKey(index), equalTo(true));
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStoresStatuses = response.getStoreStatuses().get(index);
assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size()));
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses) {
assertThat("must report for one store", storesStatus.value.size(), equalTo(1));
assertThat("reported store should be primary", storesStatus.value.get(0).getAllocation(), equalTo(IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY));
}
logger.info("--> enable allocation");
enableAllocation(index);
}
@Test
public void testIndices() throws Exception {
String index1 = "test1";
String index2 = "test2";
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate(index1).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "2")
));
assertAcked(prepareCreate(index2).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "2")
));
indexRandomData(index1);
indexRandomData(index2);
ensureGreen();
IndicesShardStoresResponse response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest().shardStatuses("all")).get();
ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> shardStatuses = response.getStoreStatuses();
assertThat(shardStatuses.containsKey(index1), equalTo(true));
assertThat(shardStatuses.containsKey(index2), equalTo(true));
assertThat(shardStatuses.get(index1).size(), equalTo(2));
assertThat(shardStatuses.get(index2).size(), equalTo(2));
// ensure index filtering works
response = client().admin().indices().shardStores(Requests.indicesShardStoresRequest(index1).shardStatuses("all")).get();
shardStatuses = response.getStoreStatuses();
assertThat(shardStatuses.containsKey(index1), equalTo(true));
assertThat(shardStatuses.containsKey(index2), equalTo(false));
assertThat(shardStatuses.get(index1).size(), equalTo(2));
}
@Test
public void testCorruptedShards() throws Exception {
String index = "test";
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate(index).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "5")
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
));
indexRandomData(index);
ensureGreen(index);
logger.info("--> disable allocation");
disableAllocation(index);
logger.info("--> corrupt random shard copies");
Map<Integer, Set<String>> corruptedShardIDMap = new HashMap<>();
for (String node : internalCluster().nodesInclude(index)) {
IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node);
IndexService indexShards = indexServices.indexServiceSafe(index);
for (Integer shardId : indexShards.shardIds()) {
IndexShard shard = indexShards.shardSafe(shardId);
if (randomBoolean()) {
shard.failShard("test", new CorruptIndexException("test corrupted", ""));
Set<String> nodes = corruptedShardIDMap.get(shardId);
if (nodes == null) {
nodes = new HashSet<>();
}
nodes.add(node);
corruptedShardIDMap.put(shardId, nodes);
}
}
}
IndicesShardStoresResponse rsp = client().admin().indices().prepareShardStores(index).setShardStatuses("all").get();
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> shardStatuses = rsp.getStoreStatuses().get(index);
assertNotNull(shardStatuses);
assertThat(shardStatuses.size(), greaterThan(0));
for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStatus : shardStatuses) {
for (IndicesShardStoresResponse.StoreStatus status : shardStatus.value) {
if (corruptedShardIDMap.containsKey(shardStatus.key)
&& corruptedShardIDMap.get(shardStatus.key).contains(status.getNode().name())) {
assertThat(status.getVersion(), greaterThanOrEqualTo(0l));
assertThat(status.getStoreException(), notNullValue());
} else {
assertThat(status.getVersion(), greaterThanOrEqualTo(0l));
assertNull(status.getStoreException());
}
}
}
logger.info("--> enable allocation");
enableAllocation(index);
}
private void indexRandomData(String index) throws ExecutionException, InterruptedException {
int numDocs = scaledRandomIntBetween(10, 20);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex(index, "type").setSource("field", "value");
}
indexRandom(true, builders);
client().admin().indices().prepareFlush().setForce(true).setWaitIfOngoing(true).execute().actionGet();
}
private final static class IndexNodePredicate implements Predicate<Settings> {
private final Set<String> nodesWithShard;
public IndexNodePredicate(String index) {
this.nodesWithShard = findNodesWithShard(index);
}
@Override
public boolean apply(Settings settings) {
return nodesWithShard.contains(settings.get("name"));
}
private Set<String> findNodesWithShard(String index) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
List<ShardRouting> startedShards = indexRoutingTable.shardsWithState(ShardRoutingState.STARTED);
Set<String> nodesWithShard = new HashSet<>();
for (ShardRouting startedShard : startedShards) {
nodesWithShard.add(state.nodes().get(startedShard.currentNodeId()).getName());
}
return nodesWithShard;
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.segments;
import com.google.common.collect.ImmutableList;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.junit.Test;
import java.io.IOException;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
public class IndicesShardStoreResponseTest extends ElasticsearchTestCase {
@Test
public void testBasicSerialization() throws Exception {
ImmutableOpenMap.Builder<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> indexStoreStatuses = ImmutableOpenMap.builder();
ImmutableList.Builder<IndicesShardStoresResponse.Failure> failures = ImmutableList.builder();
ImmutableOpenIntMap.Builder<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = ImmutableOpenIntMap.builder();
DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT);
List<IndicesShardStoresResponse.StoreStatus> storeStatusList = new ArrayList<>();
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, 2, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null));
storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, new IOException("corrupted")));
storeStatuses.put(0, storeStatusList);
storeStatuses.put(1, storeStatusList);
ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storesMap = storeStatuses.build();
indexStoreStatuses.put("test", storesMap);
indexStoreStatuses.put("test2", storesMap);
failures.add(new IndicesShardStoresResponse.Failure("node1", "test", 3, new NodeDisconnectedException(node1, "")));
IndicesShardStoresResponse storesResponse = new IndicesShardStoresResponse(indexStoreStatuses.build(), failures.build());
XContentBuilder contentBuilder = XContentFactory.jsonBuilder();
contentBuilder.startObject();
storesResponse.toXContent(contentBuilder, ToXContent.EMPTY_PARAMS);
contentBuilder.endObject();
BytesReference bytes = contentBuilder.bytes();
try (XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(bytes)) {
Map<String, Object> map = parser.map();
List failureList = (List) map.get("failures");
assertThat(failureList.size(), equalTo(1));
HashMap failureMap = (HashMap) failureList.get(0);
assertThat(failureMap.containsKey("index"), equalTo(true));
assertThat(((String) failureMap.get("index")), equalTo("test"));
assertThat(failureMap.containsKey("shard"), equalTo(true));
assertThat(((int) failureMap.get("shard")), equalTo(3));
assertThat(failureMap.containsKey("node"), equalTo(true));
assertThat(((String) failureMap.get("node")), equalTo("node1"));
Map<String, Object> indices = (Map<String, Object>) map.get("indices");
for (String index : new String[] {"test", "test2"}) {
assertThat(indices.containsKey(index), equalTo(true));
Map<String, Object> shards = ((Map<String, Object>) ((Map<String, Object>) indices.get(index)).get("shards"));
assertThat(shards.size(), equalTo(2));
for (String shardId : shards.keySet()) {
HashMap shardStoresStatus = (HashMap) shards.get(shardId);
assertThat(shardStoresStatus.containsKey("stores"), equalTo(true));
List stores = (ArrayList) shardStoresStatus.get("stores");
assertThat(stores.size(), equalTo(storeStatusList.size()));
for (int i = 0; i < stores.size(); i++) {
HashMap storeInfo = ((HashMap) stores.get(i));
IndicesShardStoresResponse.StoreStatus storeStatus = storeStatusList.get(i);
assertThat(storeInfo.containsKey("version"), equalTo(true));
assertThat(((int) storeInfo.get("version")), equalTo(((int) storeStatus.getVersion())));
assertThat(storeInfo.containsKey("allocation"), equalTo(true));
assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocation().value()));
assertThat(storeInfo.containsKey(storeStatus.getNode().id()), equalTo(true));
if (storeStatus.getStoreException() != null) {
assertThat(storeInfo.containsKey("store_exception"), equalTo(true));
}
}
}
}
}
}
@Test
public void testStoreStatusOrdering() throws Exception {
DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT);
List<IndicesShardStoresResponse.StoreStatus> orderedStoreStatuses = new ArrayList<>();
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, null));
orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, new IOException("corrupted")));
List<IndicesShardStoresResponse.StoreStatus> storeStatuses = new ArrayList<>(orderedStoreStatuses);
Collections.shuffle(storeStatuses);
CollectionUtil.timSort(storeStatuses);
assertThat(storeStatuses, equalTo(orderedStoreStatuses));
}
}

View File

@ -283,7 +283,7 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
indexRandom(true, docs);
assertAllShardsOnNodes("test", backwardsCluster().backwardsNodePattern());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
disableAllocation("test");
backwardsCluster().allowOnAllNodes("test");
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
@ -295,7 +295,7 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
}
indexRandom(true, docs);
}
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
enableAllocation("test");
ensureYellow();
final int numIters = randomIntBetween(1, 20);
for (int i = 0; i < numIters; i++) {
@ -328,7 +328,7 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
for (String index : indices) {
assertAllShardsOnNodes(index, backwardsCluster().backwardsNodePattern());
}
client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
disableAllocation(indices);
backwardsCluster().allowOnAllNodes(indices);
logClusterState();
boolean upgraded;
@ -346,7 +346,7 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
}
indexRandom(true, docs);
} while (upgraded);
client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
enableAllocation(indices);
ensureYellow();
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);

View File

@ -729,16 +729,4 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
}
return files;
}
private void disableAllocation(String index) {
client().admin().indices().prepareUpdateSettings(index).setSettings(Settings.builder().put(
"index.routing.allocation.enable", "none"
)).get();
}
private void enableAllocation(String index) {
client().admin().indices().prepareUpdateSettings(index).setSettings(Settings.builder().put(
"index.routing.allocation.enable", "all"
)).get();
}
}

View File

@ -87,7 +87,7 @@ public class FunctionScoreBackwardCompatibilityTests extends ElasticsearchBackwa
checkFunctionScoreStillWorks(ids);
logClusterState();
// prevent any kind of allocation during the upgrade we recover from gateway
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
disableAllocation("test");
boolean upgraded;
int upgradedNodesCounter = 1;
do {
@ -97,7 +97,7 @@ public class FunctionScoreBackwardCompatibilityTests extends ElasticsearchBackwa
logClusterState();
checkFunctionScoreStillWorks(ids);
} while (upgraded);
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
enableAllocation("test");
logger.debug("done function_score while upgrading");
}

View File

@ -111,7 +111,7 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length)));
client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
disableAllocation(indices);
backwardsCluster().allowOnAllNodes(indices);
logClusterState();
boolean upgraded;
@ -124,7 +124,7 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
} while (upgraded);
client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
enableAllocation(indices);
logger.info("--> close indices");
client().admin().indices().prepareClose("index_before_*").get();
@ -201,7 +201,7 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
}
if (frequently()) {
logger.info("--> upgrade");
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
disableAllocation("test");
backwardsCluster().allowOnAllNodes("test");
logClusterState();
boolean upgraded;
@ -214,7 +214,7 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCo
countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);
} while (upgraded);
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
enableAllocation("test");
}
if (cluster().numDataNodes() > 1 && randomBoolean()) { // only bump the replicas if we have enough nodes
logger.info("--> move from 0 to 1 replica");

View File

@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.impl.client.HttpClients;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
@ -1228,6 +1229,24 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return actionGet.isExists();
}
/**
* Syntactic sugar for enabling allocation for <code>indices</code>
*/
protected final void enableAllocation(String... indices) {
client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put(
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all"
)).get();
}
/**
* Syntactic sugar for disabling allocation for <code>indices</code>
*/
protected final void disableAllocation(String... indices) {
client().admin().indices().prepareUpdateSettings(indices).setSettings(Settings.builder().put(
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none"
)).get();
}
/**
* Returns a random admin client. This client can either be a node or a transport client pointing to any of
* the nodes in the cluster.

View File

@ -0,0 +1,73 @@
[[indices-shards-stores]]
== Indices Shard Stores
Provides store information for shard copies of indices.
Store information reports on which nodes shard copies exist, the shard
copy version, indicating how recent they are, and any exceptions
encountered while opening the shard index or from earlier engine failure.
By default, only lists store information for shards that have at least one
unallocated copy. When the cluster health status is yellow, this will list
store information for shards that have at least one unassigned replica.
When the cluster health status is red, this will list store information
for shards, which has unassigned primaries.
Endpoints include shard stores information for a specific index, several
indices, or all:
[source,js]
--------------------------------------------------
curl -XGET 'http://localhost:9200/test/_shard_stores'
curl -XGET 'http://localhost:9200/test1,test2/_shard_stores'
curl -XGET 'http://localhost:9200/_shard_stores'
--------------------------------------------------
The scope of shards to list store information can be changed through
`status` param. Defaults to 'yellow' and 'red'. 'yellow' lists store information of
shards with at least one unassigned replica and 'red' for shards with unassigned
primary shard.
Use 'green' to list store information for shards with all assigned copies.
[source,js]
--------------------------------------------------
curl -XGET 'http://localhost:9200/_shard_stores?status=green'
--------------------------------------------------
Response:
The shard stores information is grouped by indices and shard ids.
[source,js]
--------------------------------------------------
{
...
"0": { <1>
"stores": [ <2>
{
"sPa3OgxLSYGvQ4oPs-Tajw": { <3>
"name": "node_t0",
"transport_address": "local[1]",
"attributes": {
"enable_custom_paths": "true",
"mode": "local"
}
},
"version": 4, <4>
"allocation" : "primary" | "replica" | "unused", <6>
"store_exception": ... <5>
},
...
]
},
...
}
--------------------------------------------------
<1> The key is the corresponding shard id for the store information
<2> A list of store information for all copies of the shard
<3> The node information that hosts a copy of the store, the key
is the unique node id.
<4> The version of the store copy
<5> The status of the store copy, whether it is used as a
primary, replica or not used at all
<6> Any exception encountered while opening the shard index or
from earlier engine failure

View File

@ -0,0 +1,41 @@
{
"indices.shard_stores": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-shard-stores.html",
"methods": ["GET"],
"url": {
"path": "/_shard_stores",
"paths": ["/_shard_stores", "/{index}/_shard_stores"],
"parts": {
"index": {
"type" : "list",
"description" : "A comma-separated list of index names; use `_all` or empty string to perform the operation on all indices"
}
},
"params": {
"status" : {
"type" : "list",
"options" : ["green", "yellow", "red", "all"],
"description" : "A comma-separated list of statuses used to filter on shards to get store information for"
},
"ignore_unavailable": {
"type" : "boolean",
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
},
"allow_no_indices": {
"type" : "boolean",
"description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
},
"expand_wildcards": {
"type" : "enum",
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"operation_threading": {
"description" : "TODO: ?"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,86 @@
---
"no indices test":
- do:
indices.shard_stores:
allow_no_indices: true
- match: { indices: {}}
- do:
catch: missing
indices.shard_stores:
allow_no_indices: false
---
"basic index test":
- do:
indices.create:
index: index1
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
index:
index: index1
type: type
body: { foo: bar }
refresh: true
- do:
cluster.health:
wait_for_status: green
- do:
indices.shard_stores:
index: index1
status: "green"
- match: { indices.index1.shards.0.stores.0.allocation: "PRIMARY" }
- gte: { indices.index1.shards.0.stores.0.version: 0 }
---
"multiple indices test":
- do:
indices.create:
index: index1
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
indices.create:
index: index2
body:
settings:
number_of_shards: "2"
number_of_replicas: "0"
- do:
index:
index: index1
type: type
body: { foo: bar }
refresh: true
- do:
index:
index: index2
type: type
body: { foo: bar }
refresh: true
- do:
cluster.health:
wait_for_status: green
- do:
indices.shard_stores:
status: "green"
- match: { indices.index1.shards.0.stores.0.allocation: "PRIMARY" }
- gte: { indices.index1.shards.0.stores.0.version: 0 }
- match: { indices.index2.shards.0.stores.0.allocation: "PRIMARY" }
- gte: { indices.index2.shards.0.stores.0.version: 0 }
- match: { indices.index2.shards.1.stores.0.allocation: "PRIMARY" }
- gte: { indices.index2.shards.1.stores.0.version: 0 }