Indices Status API: Remove settings/aliases section, and add `recovery`/`snapshot` flags, closes #809.

This commit is contained in:
kimchy 2011-03-29 13:05:36 +02:00
parent 95e36a073a
commit 508d1d40fb
7 changed files with 177 additions and 130 deletions

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.merge.MergeStats;
@ -39,11 +38,8 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
private final Map<Integer, IndexShardStatus> indexShards;
private final Settings settings;
IndexStatus(String index, Settings settings, ShardStatus[] shards) {
IndexStatus(String index, ShardStatus[] shards) {
this.index = index;
this.settings = settings;
Map<Integer, List<ShardStatus>> tmpIndexShards = Maps.newHashMap();
for (ShardStatus shard : shards) {
@ -80,14 +76,6 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
return shards();
}
public Settings settings() {
return this.settings;
}
public Settings getSettings() {
return settings();
}
/**
* Returns only the primary shards store size in bytes.
*/

View File

@ -22,12 +22,20 @@ package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class IndicesStatusRequest extends BroadcastOperationRequest {
private boolean recovery = false;
private boolean snapshot = false;
public IndicesStatusRequest() {
this(Strings.EMPTY_ARRAY);
}
@ -36,6 +44,30 @@ public class IndicesStatusRequest extends BroadcastOperationRequest {
super(indices);
}
/**
* Should the status include recovery information. Defaults to <tt>false</tt>.
*/
public IndicesStatusRequest recovery(boolean recovery) {
this.recovery = recovery;
return this;
}
public boolean recovery() {
return this.recovery;
}
/**
* Should the status include recovery information. Defaults to <tt>false</tt>.
*/
public IndicesStatusRequest snapshot(boolean snapshot) {
this.snapshot = snapshot;
return this;
}
public boolean snapshot() {
return this.snapshot;
}
@Override public IndicesStatusRequest listenerThreaded(boolean listenerThreaded) {
super.listenerThreaded(listenerThreaded);
return this;
@ -44,4 +76,14 @@ public class IndicesStatusRequest extends BroadcastOperationRequest {
@Override public BroadcastOperationRequest operationThreading(BroadcastOperationThreading operationThreading) {
return super.operationThreading(operationThreading);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(recovery);
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recovery = in.readBoolean();
}
}

View File

@ -23,10 +23,9 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -36,11 +35,11 @@ import org.elasticsearch.index.merge.MergeStats;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.action.admin.indices.status.ShardStatus.*;
import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.collect.Maps.*;
import static org.elasticsearch.common.settings.ImmutableSettings.*;
/**
* @author kimchy (shay.banon)
@ -49,8 +48,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
protected ShardStatus[] shards;
private Map<String, Settings> indicesSettings = ImmutableMap.of();
private Map<String, IndexStatus> indicesStatus;
IndicesStatusResponse() {
@ -59,12 +56,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
indicesSettings = newHashMap();
for (ShardStatus shard : shards) {
if (!indicesSettings.containsKey(shard.shardRouting().index())) {
indicesSettings.put(shard.shardRouting().index(), clusterState.metaData().index(shard.shardRouting().index()).settings());
}
}
}
public ShardStatus[] shards() {
@ -92,14 +83,20 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
return indicesStatus;
}
Map<String, IndexStatus> indicesStatus = newHashMap();
for (String index : indicesSettings.keySet()) {
Set<String> indices = Sets.newHashSet();
for (ShardStatus shard : shards) {
indices.add(shard.index());
}
for (String index : indices) {
List<ShardStatus> shards = newArrayList();
for (ShardStatus shard : shards()) {
for (ShardStatus shard : this.shards) {
if (shard.shardRouting().index().equals(index)) {
shards.add(shard);
}
}
indicesStatus.put(index, new IndexStatus(index, indicesSettings.get(index), shards.toArray(new ShardStatus[shards.size()])));
indicesStatus.put(index, new IndexStatus(index, shards.toArray(new ShardStatus[shards.size()])));
}
this.indicesStatus = indicesStatus;
return indicesStatus;
@ -111,11 +108,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
for (ShardStatus status : shards()) {
status.writeTo(out);
}
out.writeVInt(indicesSettings.size());
for (Map.Entry<String, Settings> entry : indicesSettings.entrySet()) {
out.writeUTF(entry.getKey());
writeSettingsToStream(entry.getValue(), out);
}
}
@Override public void readFrom(StreamInput in) throws IOException {
@ -124,11 +116,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
for (int i = 0; i < shards.length; i++) {
shards[i] = readIndexShardStatus(in);
}
indicesSettings = newHashMap();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
indicesSettings.put(in.readUTF(), readSettingsFromStream(in));
}
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@ -140,18 +127,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
for (IndexStatus indexStatus : indices().values()) {
builder.startObject(indexStatus.index(), XContentBuilder.FieldCaseConversion.NONE);
builder.array(Fields.ALIASES, indexStatus.settings().getAsArray("index.aliases"));
builder.startObject(Fields.SETTINGS);
Settings settings = indexStatus.settings();
if (settingsFilter != null) {
settings = settingsFilter.filterSettings(settings);
}
for (Map.Entry<String, String> entry : settings.getAsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
builder.endObject();
builder.startObject(Fields.INDEX);
if (indexStatus.storeSize() != null) {
builder.field(Fields.PRIMARY_SIZE, indexStatus.primaryStoreSize().toString());
@ -309,8 +284,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
static final class Fields {
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString ALIASES = new XContentBuilderString("aliases");
static final XContentBuilderString SETTINGS = new XContentBuilderString("settings");
static final XContentBuilderString INDEX = new XContentBuilderString("index");
static final XContentBuilderString PRIMARY_SIZE = new XContentBuilderString("primary_size");
static final XContentBuilderString PRIMARY_SIZE_IN_BYTES = new XContentBuilderString("primary_size_in_bytes");

View File

@ -32,6 +32,8 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
@ -136,7 +138,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
}
@Override protected IndexShardStatusRequest newShardRequest(ShardRouting shard, IndicesStatusRequest request) {
return new IndexShardStatusRequest(shard.index(), shard.id());
return new IndexShardStatusRequest(shard.index(), shard.id(), request);
}
@Override protected ShardStatus newShardResponse() {
@ -169,6 +171,8 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
shardStatus.mergeStats = indexShard.mergeScheduler().stats();
}
if (request.recovery) {
// check on going recovery (from peer or gateway)
RecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus();
if (peerRecoveryStatus == null) {
@ -223,7 +227,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(),
gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().reusedTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations());
}
}
if (request.snapshot) {
IndexShardGatewayService gatewayService = indexService.shardInjector(request.shardId()).getInstance(IndexShardGatewayService.class);
SnapshotStatus snapshotStatus = gatewayService.snapshotStatus();
if (snapshotStatus != null) {
GatewaySnapshotStatus.Stage stage;
@ -250,17 +257,36 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(),
snapshotStatus.index().totalSize(), snapshotStatus.translog().expectedNumberOfOperations());
}
}
return shardStatus;
}
public static class IndexShardStatusRequest extends BroadcastShardOperationRequest {
boolean recovery;
boolean snapshot;
IndexShardStatusRequest() {
}
IndexShardStatusRequest(String index, int shardId) {
IndexShardStatusRequest(String index, int shardId, IndicesStatusRequest request) {
super(index, shardId);
recovery = request.recovery();
snapshot = request.snapshot();
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recovery = in.readBoolean();
snapshot = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(recovery);
out.writeBoolean(snapshot);
}
}
}

View File

@ -39,6 +39,22 @@ public class IndicesStatusRequestBuilder extends BaseIndicesRequestBuilder<Indic
return this;
}
/**
* Should the status include recovery information. Defaults to <tt>false</tt>.
*/
public IndicesStatusRequestBuilder setRecovery(boolean recovery) {
request.recovery(recovery);
return this;
}
/**
* Should the status include recovery information. Defaults to <tt>false</tt>.
*/
public IndicesStatusRequestBuilder setSnapshot(boolean snapshot) {
request.snapshot(snapshot);
return this;
}
@Override protected void doExecute(ActionListener<IndicesStatusResponse> listener) {
client.status(request, listener);
}

View File

@ -57,6 +57,8 @@ public class RestIndicesStatusAction extends BaseRestHandler {
IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(splitIndices(request.param("index")));
// we just send back a response, no need to fork a listener
indicesStatusRequest.listenerThreaded(false);
indicesStatusRequest.recovery(request.paramAsBoolean("recovery", indicesStatusRequest.recovery()));
indicesStatusRequest.snapshot(request.paramAsBoolean("snapshot", indicesStatusRequest.snapshot()));
BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
// since we don't spawn, don't allow no_threads, but change it to a single thread

View File

@ -298,7 +298,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests
assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1234l));
logger.info("--> checking reuse / recovery status");
IndicesStatusResponse statusResponse = client("server1").admin().indices().prepareStatus().execute().actionGet();
IndicesStatusResponse statusResponse = client("server1").admin().indices().prepareStatus().setRecovery(true).execute().actionGet();
for (IndexShardStatus indexShardStatus : statusResponse.index("test")) {
for (ShardStatus shardStatus : indexShardStatus) {
if (shardStatus.shardRouting().primary()) {