From 508d1d40fba92cd0b2c544cef0213a9c18e9002f Mon Sep 17 00:00:00 2001 From: kimchy Date: Tue, 29 Mar 2011 13:05:36 +0200 Subject: [PATCH] Indices Status API: Remove settings/aliases section, and add `recovery`/`snapshot` flags, closes #809. --- .../admin/indices/status/IndexStatus.java | 14 +- .../indices/status/IndicesStatusRequest.java | 42 ++++ .../indices/status/IndicesStatusResponse.java | 49 ++--- .../status/TransportIndicesStatusAction.java | 182 ++++++++++-------- .../status/IndicesStatusRequestBuilder.java | 16 ++ .../status/RestIndicesStatusAction.java | 2 + .../fs/AbstractSimpleIndexGatewayTests.java | 2 +- 7 files changed, 177 insertions(+), 130 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java index 4ed5504c189..49aa97fbd10 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.admin.indices.status; import org.elasticsearch.common.collect.Maps; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.merge.MergeStats; @@ -39,11 +38,8 @@ public class IndexStatus implements Iterable { private final Map indexShards; - private final Settings settings; - - IndexStatus(String index, Settings settings, ShardStatus[] shards) { + IndexStatus(String index, ShardStatus[] shards) { this.index = index; - this.settings = settings; Map> tmpIndexShards = Maps.newHashMap(); for (ShardStatus shard : shards) { @@ -80,14 +76,6 @@ public class IndexStatus implements Iterable { return shards(); } - public Settings settings() { - return this.settings; - } - - public Settings getSettings() { - return settings(); - } - /** * Returns only the primary shards store size in bytes. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java index 4d760985bd9..b4f645ea9dc 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java @@ -22,12 +22,20 @@ package org.elasticsearch.action.admin.indices.status; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; /** * @author kimchy (shay.banon) */ public class IndicesStatusRequest extends BroadcastOperationRequest { + private boolean recovery = false; + + private boolean snapshot = false; + public IndicesStatusRequest() { this(Strings.EMPTY_ARRAY); } @@ -36,6 +44,30 @@ public class IndicesStatusRequest extends BroadcastOperationRequest { super(indices); } + /** + * Should the status include recovery information. Defaults to false. + */ + public IndicesStatusRequest recovery(boolean recovery) { + this.recovery = recovery; + return this; + } + + public boolean recovery() { + return this.recovery; + } + + /** + * Should the status include recovery information. Defaults to false. + */ + public IndicesStatusRequest snapshot(boolean snapshot) { + this.snapshot = snapshot; + return this; + } + + public boolean snapshot() { + return this.snapshot; + } + @Override public IndicesStatusRequest listenerThreaded(boolean listenerThreaded) { super.listenerThreaded(listenerThreaded); return this; @@ -44,4 +76,14 @@ public class IndicesStatusRequest extends BroadcastOperationRequest { @Override public BroadcastOperationRequest operationThreading(BroadcastOperationThreading operationThreading) { return super.operationThreading(operationThreading); } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(recovery); + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + recovery = in.readBoolean(); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java index 95caf8b9755..a6507230bfd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java @@ -23,10 +23,9 @@ import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -36,11 +35,11 @@ import org.elasticsearch.index.merge.MergeStats; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.action.admin.indices.status.ShardStatus.*; import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.collect.Maps.*; -import static org.elasticsearch.common.settings.ImmutableSettings.*; /** * @author kimchy (shay.banon) @@ -49,8 +48,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements protected ShardStatus[] shards; - private Map indicesSettings = ImmutableMap.of(); - private Map indicesStatus; IndicesStatusResponse() { @@ -59,12 +56,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List shardFailures) { super(totalShards, successfulShards, failedShards, shardFailures); this.shards = shards; - indicesSettings = newHashMap(); - for (ShardStatus shard : shards) { - if (!indicesSettings.containsKey(shard.shardRouting().index())) { - indicesSettings.put(shard.shardRouting().index(), clusterState.metaData().index(shard.shardRouting().index()).settings()); - } - } } public ShardStatus[] shards() { @@ -92,14 +83,20 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements return indicesStatus; } Map indicesStatus = newHashMap(); - for (String index : indicesSettings.keySet()) { + + Set indices = Sets.newHashSet(); + for (ShardStatus shard : shards) { + indices.add(shard.index()); + } + + for (String index : indices) { List shards = newArrayList(); - for (ShardStatus shard : shards()) { + for (ShardStatus shard : this.shards) { if (shard.shardRouting().index().equals(index)) { shards.add(shard); } } - indicesStatus.put(index, new IndexStatus(index, indicesSettings.get(index), shards.toArray(new ShardStatus[shards.size()]))); + indicesStatus.put(index, new IndexStatus(index, shards.toArray(new ShardStatus[shards.size()]))); } this.indicesStatus = indicesStatus; return indicesStatus; @@ -111,11 +108,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements for (ShardStatus status : shards()) { status.writeTo(out); } - out.writeVInt(indicesSettings.size()); - for (Map.Entry entry : indicesSettings.entrySet()) { - out.writeUTF(entry.getKey()); - writeSettingsToStream(entry.getValue(), out); - } } @Override public void readFrom(StreamInput in) throws IOException { @@ -124,11 +116,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements for (int i = 0; i < shards.length; i++) { shards[i] = readIndexShardStatus(in); } - indicesSettings = newHashMap(); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - indicesSettings.put(in.readUTF(), readSettingsFromStream(in)); - } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -140,18 +127,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements for (IndexStatus indexStatus : indices().values()) { builder.startObject(indexStatus.index(), XContentBuilder.FieldCaseConversion.NONE); - builder.array(Fields.ALIASES, indexStatus.settings().getAsArray("index.aliases")); - - builder.startObject(Fields.SETTINGS); - Settings settings = indexStatus.settings(); - if (settingsFilter != null) { - settings = settingsFilter.filterSettings(settings); - } - for (Map.Entry entry : settings.getAsMap().entrySet()) { - builder.field(entry.getKey(), entry.getValue()); - } - builder.endObject(); - builder.startObject(Fields.INDEX); if (indexStatus.storeSize() != null) { builder.field(Fields.PRIMARY_SIZE, indexStatus.primaryStoreSize().toString()); @@ -309,8 +284,6 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements static final class Fields { static final XContentBuilderString INDICES = new XContentBuilderString("indices"); - static final XContentBuilderString ALIASES = new XContentBuilderString("aliases"); - static final XContentBuilderString SETTINGS = new XContentBuilderString("settings"); static final XContentBuilderString INDEX = new XContentBuilderString("index"); static final XContentBuilderString PRIMARY_SIZE = new XContentBuilderString("primary_size"); static final XContentBuilderString PRIMARY_SIZE_IN_BYTES = new XContentBuilderString("primary_size_in_bytes"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index 408412647af..55554d810a8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -32,6 +32,8 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.gateway.IndexShardGatewayService; @@ -136,7 +138,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct } @Override protected IndexShardStatusRequest newShardRequest(ShardRouting shard, IndicesStatusRequest request) { - return new IndexShardStatusRequest(shard.index(), shard.id()); + return new IndexShardStatusRequest(shard.index(), shard.id(), request); } @Override protected ShardStatus newShardResponse() { @@ -169,86 +171,92 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct shardStatus.mergeStats = indexShard.mergeScheduler().stats(); } - // check on going recovery (from peer or gateway) - RecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus(); - if (peerRecoveryStatus == null) { - peerRecoveryStatus = peerRecoveryTarget.peerRecoveryStatus(indexShard.shardId()); - } - if (peerRecoveryStatus != null) { - PeerRecoveryStatus.Stage stage; - switch (peerRecoveryStatus.stage()) { - case INIT: - stage = PeerRecoveryStatus.Stage.INIT; - break; - case INDEX: - stage = PeerRecoveryStatus.Stage.INDEX; - break; - case TRANSLOG: - stage = PeerRecoveryStatus.Stage.TRANSLOG; - break; - case FINALIZE: - stage = PeerRecoveryStatus.Stage.FINALIZE; - break; - case DONE: - stage = PeerRecoveryStatus.Stage.DONE; - break; - default: - stage = PeerRecoveryStatus.Stage.INIT; + + if (request.recovery) { + // check on going recovery (from peer or gateway) + RecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus(); + if (peerRecoveryStatus == null) { + peerRecoveryStatus = peerRecoveryTarget.peerRecoveryStatus(indexShard.shardId()); + } + if (peerRecoveryStatus != null) { + PeerRecoveryStatus.Stage stage; + switch (peerRecoveryStatus.stage()) { + case INIT: + stage = PeerRecoveryStatus.Stage.INIT; + break; + case INDEX: + stage = PeerRecoveryStatus.Stage.INDEX; + break; + case TRANSLOG: + stage = PeerRecoveryStatus.Stage.TRANSLOG; + break; + case FINALIZE: + stage = PeerRecoveryStatus.Stage.FINALIZE; + break; + case DONE: + stage = PeerRecoveryStatus.Stage.DONE; + break; + default: + stage = PeerRecoveryStatus.Stage.INIT; + } + shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.time(), + peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(), + peerRecoveryStatus.currentFilesSize(), peerRecoveryStatus.currentTranslogOperations()); + } + + IndexShardGatewayService gatewayService = indexService.shardInjector(request.shardId()).getInstance(IndexShardGatewayService.class); + org.elasticsearch.index.gateway.RecoveryStatus gatewayRecoveryStatus = gatewayService.recoveryStatus(); + if (gatewayRecoveryStatus != null) { + GatewayRecoveryStatus.Stage stage; + switch (gatewayRecoveryStatus.stage()) { + case INIT: + stage = GatewayRecoveryStatus.Stage.INIT; + break; + case INDEX: + stage = GatewayRecoveryStatus.Stage.INDEX; + break; + case TRANSLOG: + stage = GatewayRecoveryStatus.Stage.TRANSLOG; + break; + case DONE: + stage = GatewayRecoveryStatus.Stage.DONE; + break; + default: + stage = GatewayRecoveryStatus.Stage.INIT; + } + shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(), + gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().reusedTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations()); } - shardStatus.peerRecoveryStatus = new PeerRecoveryStatus(stage, peerRecoveryStatus.startTime(), peerRecoveryStatus.time(), - peerRecoveryStatus.phase1TotalSize(), peerRecoveryStatus.phase1ExistingTotalSize(), - peerRecoveryStatus.currentFilesSize(), peerRecoveryStatus.currentTranslogOperations()); } - IndexShardGatewayService gatewayService = indexService.shardInjector(request.shardId()).getInstance(IndexShardGatewayService.class); - org.elasticsearch.index.gateway.RecoveryStatus gatewayRecoveryStatus = gatewayService.recoveryStatus(); - if (gatewayRecoveryStatus != null) { - GatewayRecoveryStatus.Stage stage; - switch (gatewayRecoveryStatus.stage()) { - case INIT: - stage = GatewayRecoveryStatus.Stage.INIT; - break; - case INDEX: - stage = GatewayRecoveryStatus.Stage.INDEX; - break; - case TRANSLOG: - stage = GatewayRecoveryStatus.Stage.TRANSLOG; - break; - case DONE: - stage = GatewayRecoveryStatus.Stage.DONE; - break; - default: - stage = GatewayRecoveryStatus.Stage.INIT; + if (request.snapshot) { + IndexShardGatewayService gatewayService = indexService.shardInjector(request.shardId()).getInstance(IndexShardGatewayService.class); + SnapshotStatus snapshotStatus = gatewayService.snapshotStatus(); + if (snapshotStatus != null) { + GatewaySnapshotStatus.Stage stage; + switch (snapshotStatus.stage()) { + case DONE: + stage = GatewaySnapshotStatus.Stage.DONE; + break; + case FAILURE: + stage = GatewaySnapshotStatus.Stage.FAILURE; + break; + case TRANSLOG: + stage = GatewaySnapshotStatus.Stage.TRANSLOG; + break; + case FINALIZE: + stage = GatewaySnapshotStatus.Stage.FINALIZE; + break; + case INDEX: + stage = GatewaySnapshotStatus.Stage.INDEX; + break; + default: + stage = GatewaySnapshotStatus.Stage.NONE; + break; + } + shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(), + snapshotStatus.index().totalSize(), snapshotStatus.translog().expectedNumberOfOperations()); } - shardStatus.gatewayRecoveryStatus = new GatewayRecoveryStatus(stage, gatewayRecoveryStatus.startTime(), gatewayRecoveryStatus.time(), - gatewayRecoveryStatus.index().totalSize(), gatewayRecoveryStatus.index().reusedTotalSize(), gatewayRecoveryStatus.index().currentFilesSize(), gatewayRecoveryStatus.translog().currentTranslogOperations()); - } - - SnapshotStatus snapshotStatus = gatewayService.snapshotStatus(); - if (snapshotStatus != null) { - GatewaySnapshotStatus.Stage stage; - switch (snapshotStatus.stage()) { - case DONE: - stage = GatewaySnapshotStatus.Stage.DONE; - break; - case FAILURE: - stage = GatewaySnapshotStatus.Stage.FAILURE; - break; - case TRANSLOG: - stage = GatewaySnapshotStatus.Stage.TRANSLOG; - break; - case FINALIZE: - stage = GatewaySnapshotStatus.Stage.FINALIZE; - break; - case INDEX: - stage = GatewaySnapshotStatus.Stage.INDEX; - break; - default: - stage = GatewaySnapshotStatus.Stage.NONE; - break; - } - shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(), - snapshotStatus.index().totalSize(), snapshotStatus.translog().expectedNumberOfOperations()); } return shardStatus; @@ -256,11 +264,29 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct public static class IndexShardStatusRequest extends BroadcastShardOperationRequest { + boolean recovery; + + boolean snapshot; + IndexShardStatusRequest() { } - IndexShardStatusRequest(String index, int shardId) { + IndexShardStatusRequest(String index, int shardId, IndicesStatusRequest request) { super(index, shardId); + recovery = request.recovery(); + snapshot = request.snapshot(); + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + recovery = in.readBoolean(); + snapshot = in.readBoolean(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(recovery); + out.writeBoolean(snapshot); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/status/IndicesStatusRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/status/IndicesStatusRequestBuilder.java index 441b29ff049..098f69cf70e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/status/IndicesStatusRequestBuilder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/status/IndicesStatusRequestBuilder.java @@ -39,6 +39,22 @@ public class IndicesStatusRequestBuilder extends BaseIndicesRequestBuilderfalse. + */ + public IndicesStatusRequestBuilder setRecovery(boolean recovery) { + request.recovery(recovery); + return this; + } + + /** + * Should the status include recovery information. Defaults to false. + */ + public IndicesStatusRequestBuilder setSnapshot(boolean snapshot) { + request.snapshot(snapshot); + return this; + } + @Override protected void doExecute(ActionListener listener) { client.status(request, listener); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java index 172baa7992e..7774c071962 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java @@ -57,6 +57,8 @@ public class RestIndicesStatusAction extends BaseRestHandler { IndicesStatusRequest indicesStatusRequest = new IndicesStatusRequest(splitIndices(request.param("index"))); // we just send back a response, no need to fork a listener indicesStatusRequest.listenerThreaded(false); + indicesStatusRequest.recovery(request.paramAsBoolean("recovery", indicesStatusRequest.recovery())); + indicesStatusRequest.snapshot(request.paramAsBoolean("snapshot", indicesStatusRequest.snapshot())); BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD); if (operationThreading == BroadcastOperationThreading.NO_THREADS) { // since we don't spawn, don't allow no_threads, but change it to a single thread diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java index 690b086141e..b88cd3c2d38 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java @@ -298,7 +298,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests assertThat(client("server1").prepareCount().setQuery(matchAllQuery()).execute().actionGet().count(), equalTo(1234l)); logger.info("--> checking reuse / recovery status"); - IndicesStatusResponse statusResponse = client("server1").admin().indices().prepareStatus().execute().actionGet(); + IndicesStatusResponse statusResponse = client("server1").admin().indices().prepareStatus().setRecovery(true).execute().actionGet(); for (IndexShardStatus indexShardStatus : statusResponse.index("test")) { for (ShardStatus shardStatus : indexShardStatus) { if (shardStatus.shardRouting().primary()) {