diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java index db5406babf7..91863b6c6fb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java @@ -44,6 +44,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAct import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction; import org.elasticsearch.action.admin.indices.optimize.TransportOptimizeAction; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; +import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.TransportUpdateSettingsAction; import org.elasticsearch.action.admin.indices.status.TransportIndicesStatusAction; import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction; @@ -95,6 +96,7 @@ public class TransportActionModule extends AbstractModule { bind(TransportReplicationPingAction.class).asEagerSingleton(); bind(TransportIndicesStatusAction.class).asEagerSingleton(); + bind(TransportIndicesSegmentsAction.class).asEagerSingleton(); bind(TransportCreateIndexAction.class).asEagerSingleton(); bind(TransportDeleteIndexAction.class).asEagerSingleton(); bind(TransportOpenIndexAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java index c1056f6b9b0..723327dc41b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java @@ -57,6 +57,7 @@ public class TransportActions { public static final String REFRESH = "indices/refresh"; public static final String OPTIMIZE = "indices/optimize"; public static final String STATUS = "indices/status"; + public static final String SEGMENTS = "indices/segments"; public static final String EXISTS = "indices/exists"; public static final String ALIASES = "indices/aliases"; public static final String UPDATE_SETTINGS = "indices/updateSettings"; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndexSegments.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndexSegments.java new file mode 100644 index 00000000000..6f5efb3d233 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndexSegments.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.collect.Maps; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class IndexSegments implements Iterable { + + private final String index; + + private final Map indexShards; + + IndexSegments(String index, ShardSegments[] shards) { + this.index = index; + + Map> tmpIndexShards = Maps.newHashMap(); + for (ShardSegments shard : shards) { + List lst = tmpIndexShards.get(shard.shardRouting().id()); + if (lst == null) { + lst = Lists.newArrayList(); + tmpIndexShards.put(shard.shardRouting().id(), lst); + } + lst.add(shard); + } + indexShards = Maps.newHashMap(); + for (Map.Entry> entry : tmpIndexShards.entrySet()) { + indexShards.put(entry.getKey(), new IndexShardSegments(entry.getValue().get(0).shardRouting().shardId(), entry.getValue().toArray(new ShardSegments[entry.getValue().size()]))); + } + } + + public String index() { + return this.index; + } + + public String getIndex() { + return index(); + } + + /** + * A shard id to index shard segments map (note, index shard segments is the replication shard group that maps + * to the shard id). + */ + public Map shards() { + return this.indexShards; + } + + public Map getShards() { + return shards(); + } + + @Override public Iterator iterator() { + return indexShards.values().iterator(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndexShardSegments.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndexShardSegments.java new file mode 100644 index 00000000000..e75cea1954b --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndexShardSegments.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Iterator; + +public class IndexShardSegments implements Iterable { + + private final ShardId shardId; + + private final ShardSegments[] shards; + + IndexShardSegments(ShardId shardId, ShardSegments[] shards) { + this.shardId = shardId; + this.shards = shards; + } + + public ShardId shardId() { + return this.shardId; + } + + public ShardId getShardId() { + return this.shardId; + } + + public ShardSegments getAt(int i) { + return shards[i]; + } + + public ShardSegments[] shards() { + return this.shards; + } + + public ShardSegments[] getShards() { + return this.shards; + } + + @Override public Iterator iterator() { + return Iterators.forArray(shards); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java new file mode 100644 index 00000000000..ef1b0541c89 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java @@ -0,0 +1,166 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.collect.Lists; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.collect.Sets; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.engine.Segment; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class IndicesSegmentResponse extends BroadcastOperationResponse implements ToXContent { + + private ShardSegments[] shards; + + private Map indicesSegments; + + IndicesSegmentResponse() { + + } + + IndicesSegmentResponse(ShardSegments[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); + this.shards = shards; + } + + public Map getIndices() { + return this.indices(); + } + + public Map indices() { + if (indicesSegments != null) { + return indicesSegments; + } + Map indicesSegments = Maps.newHashMap(); + + Set indices = Sets.newHashSet(); + for (ShardSegments shard : shards) { + indices.add(shard.index()); + } + + for (String index : indices) { + List shards = Lists.newArrayList(); + for (ShardSegments shard : this.shards) { + if (shard.shardRouting().index().equals(index)) { + shards.add(shard); + } + } + indicesSegments.put(index, new IndexSegments(index, shards.toArray(new ShardSegments[shards.size()]))); + } + this.indicesSegments = indicesSegments; + return indicesSegments; + } + + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shards = new ShardSegments[in.readVInt()]; + for (int i = 0; i < shards.length; i++) { + shards[i] = ShardSegments.readShardSegments(in); + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(shards.length); + for (ShardSegments shard : shards) { + shard.writeTo(out); + } + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.INDICES); + + for (IndexSegments indexSegments : indices().values()) { + builder.startObject(indexSegments.index(), XContentBuilder.FieldCaseConversion.NONE); + + builder.startObject(Fields.SHARDS); + for (IndexShardSegments indexSegment : indexSegments) { + builder.startArray(Integer.toString(indexSegment.shardId().id())); + for (ShardSegments shardSegments : indexSegment) { + builder.startObject(); + + builder.startObject(Fields.ROUTING); + builder.field(Fields.STATE, shardSegments.shardRouting().state()); + builder.field(Fields.PRIMARY, shardSegments.shardRouting().primary()); + builder.field(Fields.NODE, shardSegments.shardRouting().currentNodeId()); + if (shardSegments.shardRouting().relocatingNodeId() != null) { + builder.field(Fields.RELOCATING_NODE, shardSegments.shardRouting().relocatingNodeId()); + } + builder.endObject(); + + builder.startObject(Fields.SEGMENTS); + for (Segment segment : shardSegments) { + builder.startObject(segment.name()); + builder.field(Fields.GENERATION, segment.generation()); + builder.field(Fields.NUM_DOCS, segment.numDocs()); + builder.field(Fields.DELETED_DOCS, segment.deletedDocs()); + builder.field(Fields.SIZE, segment.size().toString()); + builder.field(Fields.SIZE_IN_BYTES, segment.sizeInBytes()); + builder.field(Fields.COMMITTED, segment.committed()); + builder.field(Fields.SEARCH, segment.search()); + builder.endObject(); + } + builder.endObject(); + + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + + builder.endObject(); + } + + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString INDICES = new XContentBuilderString("indices"); + static final XContentBuilderString SHARDS = new XContentBuilderString("shards"); + static final XContentBuilderString ROUTING = new XContentBuilderString("routing"); + static final XContentBuilderString STATE = new XContentBuilderString("state"); + static final XContentBuilderString PRIMARY = new XContentBuilderString("primary"); + static final XContentBuilderString NODE = new XContentBuilderString("node"); + static final XContentBuilderString RELOCATING_NODE = new XContentBuilderString("relocating_node"); + + static final XContentBuilderString SEGMENTS = new XContentBuilderString("segments"); + static final XContentBuilderString GENERATION = new XContentBuilderString("generation"); + static final XContentBuilderString NUM_DOCS = new XContentBuilderString("num_docs"); + static final XContentBuilderString DELETED_DOCS = new XContentBuilderString("deleted_docs"); + static final XContentBuilderString SIZE = new XContentBuilderString("size"); + static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); + static final XContentBuilderString COMMITTED = new XContentBuilderString("committed"); + static final XContentBuilderString SEARCH = new XContentBuilderString("search"); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequest.java new file mode 100644 index 00000000000..901758095c4 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentsRequest.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest; +import org.elasticsearch.common.Strings; + +public class IndicesSegmentsRequest extends BroadcastOperationRequest { + + public IndicesSegmentsRequest() { + this(Strings.EMPTY_ARRAY); + } + + public IndicesSegmentsRequest(String... indices) { + super(indices); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java new file mode 100644 index 00000000000..c2181a4b0d9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/ShardSegments.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.engine.Segment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*; + +public class ShardSegments extends BroadcastShardOperationResponse implements Iterable { + + private ShardRouting shardRouting; + + private List segments; + + ShardSegments() { + } + + public ShardSegments(ShardRouting shardRouting, List segments) { + super(shardRouting.index(), shardRouting.id()); + this.shardRouting = shardRouting; + this.segments = segments; + } + + @Override public Iterator iterator() { + return segments.iterator(); + } + + public ShardRouting shardRouting() { + return this.shardRouting; + } + + public ShardRouting getShardRouting() { + return this.shardRouting; + } + + public List segments() { + return this.segments; + } + + public List getSegments() { + return segments; + } + + public static ShardSegments readShardSegments(StreamInput in) throws IOException { + ShardSegments shard = new ShardSegments(); + shard.readFrom(in); + return shard; + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardRouting = readShardRoutingEntry(in); + int size = in.readVInt(); + if (size == 0) { + segments = ImmutableList.of(); + } else { + segments = new ArrayList(size); + for (int i = 0; i < size; i++) { + segments.add(Segment.readSegment(in)); + } + } + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardRouting.writeTo(out); + out.writeVInt(segments.size()); + for (Segment segment : segments) { + segment.writeTo(out); + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java new file mode 100644 index 00000000000..5e1706f3c9f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java @@ -0,0 +1,162 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.segments; + +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest; +import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.service.InternalIndexService; +import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import static org.elasticsearch.common.collect.Lists.*; + +/** + * @author kimchy (shay.banon) + */ +public class TransportIndicesSegmentsAction extends TransportBroadcastOperationAction { + + private final IndicesService indicesService; + + @Inject public TransportIndicesSegmentsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, + IndicesService indicesService) { + super(settings, threadPool, clusterService, transportService); + this.indicesService = indicesService; + } + + @Override protected String executor() { + return ThreadPool.Names.CACHED; + } + + @Override protected String transportAction() { + return TransportActions.Admin.Indices.SEGMENTS; + } + + @Override protected String transportShardAction() { + return "indices/segments/shard"; + } + + @Override protected IndicesSegmentsRequest newRequest() { + return new IndicesSegmentsRequest(); + } + + @Override protected boolean ignoreNonActiveExceptions() { + return true; + } + + /** + * Segments goes across *all* shards. + */ + @Override protected GroupShardsIterator shards(IndicesSegmentsRequest request, String[] concreteIndices, ClusterState clusterState) { + return clusterState.routingTable().allShardsGrouped(concreteIndices); + } + + /** + * We want to go over all assigned nodes (to get recovery status) and not just active ones. + */ + @Override protected ShardRouting nextShardOrNull(ShardIterator shardIt) { + return shardIt.nextAssignedOrNull(); + } + + /** + * We want to go over all assigned nodes (to get recovery status) and not just active ones. + */ + @Override protected boolean hasNextShard(ShardIterator shardIt) { + return shardIt.hasNextAssigned(); + } + + @Override protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { + int successfulShards = 0; + int failedShards = 0; + List shardFailures = null; + final List shards = newArrayList(); + for (int i = 0; i < shardsResponses.length(); i++) { + Object shardResponse = shardsResponses.get(i); + if (shardResponse == null) { + // simply ignore non active shards + } else if (shardResponse instanceof BroadcastShardOperationFailedException) { + failedShards++; + if (shardFailures == null) { + shardFailures = newArrayList(); + } + shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse)); + } else { + shards.add((ShardSegments) shardResponse); + successfulShards++; + } + } + return new IndicesSegmentResponse(shards.toArray(new ShardSegments[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures); + } + + @Override protected IndexShardSegmentRequest newShardRequest() { + return new IndexShardSegmentRequest(); + } + + @Override protected IndexShardSegmentRequest newShardRequest(ShardRouting shard, IndicesSegmentsRequest request) { + return new IndexShardSegmentRequest(shard.index(), shard.id(), request); + } + + @Override protected ShardSegments newShardResponse() { + return new ShardSegments(); + } + + @Override protected ShardSegments shardOperation(IndexShardSegmentRequest request) throws ElasticSearchException { + InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.index()); + InternalIndexShard indexShard = (InternalIndexShard) indexService.shardSafe(request.shardId()); + return new ShardSegments(indexShard.routingEntry(), indexShard.engine().segments()); + } + + public static class IndexShardSegmentRequest extends BroadcastShardOperationRequest { + + IndexShardSegmentRequest() { + } + + IndexShardSegmentRequest(String index, int shardId, IndicesSegmentsRequest request) { + super(index, shardId); + } + + @Override public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java index b4f645ea9dc..e1e72939610 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusRequest.java @@ -80,10 +80,12 @@ public class IndicesStatusRequest extends BroadcastOperationRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(recovery); + out.writeBoolean(snapshot); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); recovery = in.readBoolean(); + snapshot = in.readBoolean(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 6aa7131a3ad..6003ddd53ba 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -49,6 +49,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest; @@ -71,6 +73,7 @@ import org.elasticsearch.client.action.admin.indices.mapping.put.PutMappingReque import org.elasticsearch.client.action.admin.indices.open.OpenIndexRequestBuilder; import org.elasticsearch.client.action.admin.indices.optimize.OptimizeRequestBuilder; import org.elasticsearch.client.action.admin.indices.refresh.RefreshRequestBuilder; +import org.elasticsearch.client.action.admin.indices.segments.IndicesSegmentsRequestBuilder; import org.elasticsearch.client.action.admin.indices.settings.UpdateSettingsRequestBuilder; import org.elasticsearch.client.action.admin.indices.status.IndicesStatusRequestBuilder; import org.elasticsearch.client.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder; @@ -130,6 +133,29 @@ public interface IndicesAdminClient { */ IndicesStatusRequestBuilder prepareStatus(String... indices); + /** + * The segments of one or more indices. + * + * @param request The indices segments request + * @return The result future + * @see Requests#indicesSegmentsRequest(String...) + */ + ActionFuture segments(IndicesSegmentsRequest request); + + /** + * The segments of one or more indices. + * + * @param request The indices segments request + * @param listener A listener to be notified with a result + * @see Requests#indicesSegmentsRequest(String...) + */ + void segments(IndicesSegmentsRequest request, ActionListener listener); + + /** + * The segments of one or more indices. + */ + IndicesSegmentsRequestBuilder prepareSegments(String... indices); + /** * Creates an index using an explicit request allowing to specify the settings of the index. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java index c0e046c08ac..370ff31a146 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java @@ -41,6 +41,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest; import org.elasticsearch.action.bulk.BulkRequest; @@ -188,6 +189,10 @@ public class Requests { return new IndicesStatusRequest(indices); } + public static IndicesSegmentsRequest indicesSegmentsRequest(String... indices) { + return new IndicesSegmentsRequest(indices); + } + /** * Creates an indices exists request. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/segments/IndicesSegmentsRequestBuilder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/segments/IndicesSegmentsRequestBuilder.java new file mode 100644 index 00000000000..47295d3cf5a --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/segments/IndicesSegmentsRequestBuilder.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.action.admin.indices.segments; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.client.action.admin.indices.support.BaseIndicesRequestBuilder; + +/** + * @author kimchy (shay.banon) + */ +public class IndicesSegmentsRequestBuilder extends BaseIndicesRequestBuilder { + + public IndicesSegmentsRequestBuilder(IndicesAdminClient indicesClient) { + super(indicesClient, new IndicesSegmentsRequest()); + } + + public IndicesSegmentsRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + @Override protected void doExecute(ActionListener listener) { + client.segments(request, listener); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeIndicesAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeIndicesAdminClient.java index 6d16af48dba..c2f701c2219 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeIndicesAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeIndicesAdminClient.java @@ -63,6 +63,9 @@ import org.elasticsearch.action.admin.indices.optimize.TransportOptimizeAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.TransportUpdateSettingsAction; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; @@ -92,6 +95,8 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement private final TransportIndicesStatusAction indicesStatusAction; + private final TransportIndicesSegmentsAction indicesSegmentsAction; + private final TransportCreateIndexAction createIndexAction; private final TransportDeleteIndexAction deleteIndexAction; @@ -124,7 +129,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement private final TransportDeleteIndexTemplateAction deleteIndexTemplateAction; - @Inject public NodeIndicesAdminClient(Settings settings, ThreadPool threadPool, TransportIndicesExistsAction indicesExistsAction, TransportIndicesStatusAction indicesStatusAction, + @Inject public NodeIndicesAdminClient(Settings settings, ThreadPool threadPool, TransportIndicesExistsAction indicesExistsAction, TransportIndicesStatusAction indicesStatusAction, TransportIndicesSegmentsAction indicesSegmentsAction, TransportCreateIndexAction createIndexAction, TransportDeleteIndexAction deleteIndexAction, TransportCloseIndexAction closeIndexAction, TransportOpenIndexAction openIndexAction, TransportRefreshAction refreshAction, TransportFlushAction flushAction, TransportOptimizeAction optimizeAction, @@ -135,6 +140,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement this.threadPool = threadPool; this.indicesExistsAction = indicesExistsAction; this.indicesStatusAction = indicesStatusAction; + this.indicesSegmentsAction = indicesSegmentsAction; this.createIndexAction = createIndexAction; this.deleteIndexAction = deleteIndexAction; this.closeIndexAction = closeIndexAction; @@ -173,6 +179,14 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement indicesStatusAction.execute(request, listener); } + @Override public ActionFuture segments(IndicesSegmentsRequest request) { + return indicesSegmentsAction.execute(request); + } + + @Override public void segments(IndicesSegmentsRequest request, ActionListener listener) { + indicesSegmentsAction.execute(request, listener); + } + @Override public ActionFuture create(CreateIndexRequest request) { return createIndexAction.execute(request); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java index ce7fbf32230..930b22832f3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/support/AbstractIndicesAdminClient.java @@ -33,6 +33,7 @@ import org.elasticsearch.client.action.admin.indices.mapping.put.PutMappingReque import org.elasticsearch.client.action.admin.indices.open.OpenIndexRequestBuilder; import org.elasticsearch.client.action.admin.indices.optimize.OptimizeRequestBuilder; import org.elasticsearch.client.action.admin.indices.refresh.RefreshRequestBuilder; +import org.elasticsearch.client.action.admin.indices.segments.IndicesSegmentsRequestBuilder; import org.elasticsearch.client.action.admin.indices.settings.UpdateSettingsRequestBuilder; import org.elasticsearch.client.action.admin.indices.status.IndicesStatusRequestBuilder; import org.elasticsearch.client.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder; @@ -100,6 +101,10 @@ public abstract class AbstractIndicesAdminClient implements InternalIndicesAdmin return new IndicesStatusRequestBuilder(this).setIndices(indices); } + @Override public IndicesSegmentsRequestBuilder prepareSegments(String... indices) { + return new IndicesSegmentsRequestBuilder(this).setIndices(indices); + } + @Override public UpdateSettingsRequestBuilder prepareUpdateSettings(String... indices) { return new UpdateSettingsRequestBuilder(this).setIndices(indices); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java index d993164884a..e67a5da3ac8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java @@ -42,6 +42,7 @@ import org.elasticsearch.client.transport.action.admin.indices.mapping.put.Clien import org.elasticsearch.client.transport.action.admin.indices.open.ClientTransportOpenIndexAction; import org.elasticsearch.client.transport.action.admin.indices.optimize.ClientTransportOptimizeAction; import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction; +import org.elasticsearch.client.transport.action.admin.indices.segments.ClientTransportIndicesSegmentsAction; import org.elasticsearch.client.transport.action.admin.indices.settings.ClientTransportUpdateSettingsAction; import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction; import org.elasticsearch.client.transport.action.admin.indices.template.delete.ClientTransportDeleteIndexTemplateAction; @@ -75,6 +76,7 @@ public class ClientTransportActionModule extends AbstractModule { bind(ClientTransportIndicesExistsAction.class).asEagerSingleton(); bind(ClientTransportIndicesStatusAction.class).asEagerSingleton(); + bind(ClientTransportIndicesSegmentsAction.class).asEagerSingleton(); bind(ClientTransportRefreshAction.class).asEagerSingleton(); bind(ClientTransportFlushAction.class).asEagerSingleton(); bind(ClientTransportOptimizeAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/indices/segments/ClientTransportIndicesSegmentsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/indices/segments/ClientTransportIndicesSegmentsAction.java new file mode 100644 index 00000000000..ad4389a6439 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/indices/segments/ClientTransportIndicesSegmentsAction.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.transport.action.admin.indices.segments; + +import org.elasticsearch.action.TransportActions; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.client.transport.action.support.BaseClientTransportAction; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.TransportService; + +/** + */ +public class ClientTransportIndicesSegmentsAction extends BaseClientTransportAction { + + @Inject public ClientTransportIndicesSegmentsAction(Settings settings, TransportService transportService) { + super(settings, transportService, IndicesSegmentResponse.class); + } + + @Override protected String action() { + return TransportActions.Admin.Indices.SEGMENTS; + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java index ae76ce74c99..fb7b4253d4c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java @@ -50,6 +50,8 @@ import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest; import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse; import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest; @@ -75,6 +77,7 @@ import org.elasticsearch.client.transport.action.admin.indices.mapping.put.Clien import org.elasticsearch.client.transport.action.admin.indices.open.ClientTransportOpenIndexAction; import org.elasticsearch.client.transport.action.admin.indices.optimize.ClientTransportOptimizeAction; import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction; +import org.elasticsearch.client.transport.action.admin.indices.segments.ClientTransportIndicesSegmentsAction; import org.elasticsearch.client.transport.action.admin.indices.settings.ClientTransportUpdateSettingsAction; import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction; import org.elasticsearch.client.transport.action.admin.indices.template.delete.ClientTransportDeleteIndexTemplateAction; @@ -97,6 +100,8 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli private final ClientTransportIndicesStatusAction indicesStatusAction; + private final ClientTransportIndicesSegmentsAction indicesSegmentsAction; + private final ClientTransportCreateIndexAction createIndexAction; private final ClientTransportDeleteIndexAction deleteIndexAction; @@ -130,7 +135,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli private final ClientTransportDeleteIndexTemplateAction deleteIndexTemplateAction; @Inject public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool, - ClientTransportIndicesExistsAction indicesExistsAction, ClientTransportIndicesStatusAction indicesStatusAction, + ClientTransportIndicesExistsAction indicesExistsAction, ClientTransportIndicesStatusAction indicesStatusAction, ClientTransportIndicesSegmentsAction indicesSegmentsAction, ClientTransportCreateIndexAction createIndexAction, ClientTransportDeleteIndexAction deleteIndexAction, ClientTransportCloseIndexAction closeIndexAction, ClientTransportOpenIndexAction openIndexAction, ClientTransportRefreshAction refreshAction, ClientTransportFlushAction flushAction, ClientTransportOptimizeAction optimizeAction, @@ -142,6 +147,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli this.threadPool = threadPool; this.indicesExistsAction = indicesExistsAction; this.indicesStatusAction = indicesStatusAction; + this.indicesSegmentsAction = indicesSegmentsAction; this.createIndexAction = createIndexAction; this.deleteIndexAction = deleteIndexAction; this.closeIndexAction = closeIndexAction; @@ -198,6 +204,23 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli }); } + @Override public ActionFuture segments(final IndicesSegmentsRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(DiscoveryNode node) throws ElasticSearchException { + return indicesSegmentsAction.execute(node, request); + } + }); + } + + @Override public void segments(final IndicesSegmentsRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException { + indicesSegmentsAction.execute(node, request, listener); + return null; + } + }); + } + @Override public ActionFuture create(final CreateIndexRequest request) { return nodesService.execute(new TransportClientNodesService.NodeCallback>() { @Override public ActionFuture doWithNode(DiscoveryNode node) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 9a37f33774e..9d4b02aa268 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -23,6 +23,8 @@ import org.apache.lucene.analysis.KeywordAnalyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.Term; import org.apache.lucene.index.TermDocs; import org.apache.lucene.search.*; @@ -35,6 +37,7 @@ import org.elasticsearch.index.analysis.AnalyzerScope; import org.elasticsearch.index.analysis.NamedAnalyzer; import java.io.IOException; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -385,6 +388,27 @@ public class Lucene { } } + private static final Field segmentReaderSegmentInfoField; + + static { + Field segmentReaderSegmentInfoFieldX = null; + try { + segmentReaderSegmentInfoFieldX = SegmentReader.class.getDeclaredField("si"); + segmentReaderSegmentInfoFieldX.setAccessible(true); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } + segmentReaderSegmentInfoField = segmentReaderSegmentInfoFieldX; + } + + public static SegmentInfo getSegmentInfo(SegmentReader reader) { + try { + return (SegmentInfo) segmentReaderSegmentInfoField.get(reader); + } catch (IllegalAccessException e) { + return null; + } + } + public static class CountCollector extends Collector { private final float minScore; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java index fbf1581fd96..83a733e60e3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -44,6 +44,8 @@ import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; +import java.util.List; + /** * @author kimchy (shay.banon) */ @@ -81,6 +83,8 @@ public interface Engine extends IndexShardComponent, CloseableComponent { Searcher searcher() throws EngineException; + List segments(); + /** * Returns true if a refresh is really needed. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Segment.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Segment.java new file mode 100644 index 00000000000..b0bff468ee8 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Segment.java @@ -0,0 +1,150 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.io.IOException; + +public class Segment implements Streamable { + + private String name; + private long generation; + public boolean committed; + public boolean search; + public long sizeInBytes = -1; + public int docCount = -1; + public int delDocCount = -1; + + Segment() { + } + + public Segment(String name) { + this.name = name; + this.generation = Long.parseLong(name.substring(1), Character.MAX_RADIX); + } + + public String name() { + return this.name; + } + + public String getName() { + return name(); + } + + public long generation() { + return this.generation; + } + + public long getGeneration() { + return this.generation; + } + + public boolean committed() { + return this.committed; + } + + public boolean isCommitted() { + return this.committed; + } + + public boolean search() { + return this.search; + } + + public boolean isSearch() { + return this.search; + } + + public int numDocs() { + return this.docCount; + } + + public int getNumDocs() { + return this.docCount; + } + + public int deletedDocs() { + return this.delDocCount; + } + + public int getDeletedDocs() { + return this.delDocCount; + } + + public ByteSizeValue size() { + return new ByteSizeValue(sizeInBytes); + } + + public ByteSizeValue getSize() { + return size(); + } + + public long sizeInBytes() { + return sizeInBytes; + } + + public long getSizeInBytes() { + return sizeInBytes(); + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Segment segment = (Segment) o; + + if (name != null ? !name.equals(segment.name) : segment.name != null) return false; + + return true; + } + + @Override public int hashCode() { + return name != null ? name.hashCode() : 0; + } + + public static Segment readSegment(StreamInput in) throws IOException { + Segment segment = new Segment(); + segment.readFrom(in); + return segment; + } + + @Override public void readFrom(StreamInput in) throws IOException { + name = in.readUTF(); + generation = Long.parseLong(name.substring(1), Character.MAX_RADIX); + committed = in.readBoolean(); + search = in.readBoolean(); + docCount = in.readInt(); + delDocCount = in.readInt(); + sizeInBytes = in.readLong(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeUTF(name); + out.writeBoolean(committed); + out.writeBoolean(search); + out.writeInt(docCount); + out.writeInt(delDocCount); + out.writeLong(sizeInBytes); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 48097aa2e6a..123f86b4941 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -23,6 +23,9 @@ import org.apache.lucene.index.ExtendedIndexSearcher; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.FilteredQuery; import org.apache.lucene.search.Query; @@ -65,6 +68,10 @@ import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -157,6 +164,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { private final AtomicLong translogIdGenerator = new AtomicLong(); + private SegmentInfos lastCommittedSegmentInfos; + @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, @@ -261,6 +270,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { indexingSearcher.set(null); indexingSearcher.get().release(); } + SegmentInfos infos = new SegmentInfos(); + infos.read(store.directory()); + lastCommittedSegmentInfos = infos; } catch (IOException e) { try { indexWriter.rollback(); @@ -819,6 +831,13 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } refreshVersioningTable(threadPool.estimatedTimeInMillis()); + try { + SegmentInfos infos = new SegmentInfos(); + infos.read(store.directory()); + lastCommittedSegmentInfos = infos; + } catch (Exception e) { + logger.warn("failed to read latest segment infos on flush", e); + } } finally { flushing.set(false); } @@ -1016,6 +1035,78 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } } + @Override public List segments() { + rwl.readLock().lock(); + try { + IndexWriter indexWriter = this.indexWriter; + if (indexWriter == null) { + throw new EngineClosedException(shardId, failedEngine); + } + Map segments = new HashMap(); + + // first, go over and compute the search ones... + Searcher searcher = searcher(); + try { + IndexReader[] readers = searcher.reader().getSequentialSubReaders(); + for (IndexReader reader : readers) { + assert reader instanceof SegmentReader; + SegmentInfo info = Lucene.getSegmentInfo((SegmentReader) reader); + assert !segments.containsKey(info.name); + Segment segment = new Segment(info.name); + segment.search = true; + segment.docCount = reader.numDocs(); + segment.delDocCount = reader.numDeletedDocs(); + try { + segment.sizeInBytes = info.sizeInBytes(true); + } catch (IOException e) { + logger.trace("failed to get size for [{}]", e, info.name); + } + segments.put(info.name, segment); + } + } finally { + searcher.release(); + } + + // now, correlate or add the committed ones... + if (lastCommittedSegmentInfos != null) { + SegmentInfos infos = lastCommittedSegmentInfos; + for (SegmentInfo info : infos) { + Segment segment = segments.get(info.name); + if (segment == null) { + segment = new Segment(info.name); + segment.search = false; + segment.committed = true; + segment.docCount = info.docCount; + try { + segment.delDocCount = indexWriter.numDeletedDocs(info); + } catch (IOException e) { + logger.trace("failed to get deleted docs for committed segment", e); + } + try { + segment.sizeInBytes = info.sizeInBytes(true); + } catch (IOException e) { + logger.trace("failed to get size for [{}]", e, info.name); + } + segments.put(info.name, segment); + } else { + segment.committed = true; + } + } + } + + Segment[] segmentsArr = segments.values().toArray(new Segment[segments.values().size()]); + Arrays.sort(segmentsArr, new Comparator() { + @Override public int compare(Segment o1, Segment o2) { + return (int) (o1.generation() - o2.generation()); + } + }); + + return Arrays.asList(segmentsArr); + } finally { + rwl.readLock().unlock(); + } + } + @Override public void close() throws ElasticSearchException { rwl.writeLock().lock(); try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java index 892fb192054..019d6143c99 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java @@ -47,6 +47,7 @@ import org.elasticsearch.rest.action.admin.indices.mapping.put.RestPutMappingAct import org.elasticsearch.rest.action.admin.indices.open.RestOpenIndexAction; import org.elasticsearch.rest.action.admin.indices.optimize.RestOptimizeAction; import org.elasticsearch.rest.action.admin.indices.refresh.RestRefreshAction; +import org.elasticsearch.rest.action.admin.indices.segments.RestIndicesSegmentsAction; import org.elasticsearch.rest.action.admin.indices.settings.RestGetSettingsAction; import org.elasticsearch.rest.action.admin.indices.settings.RestUpdateSettingsAction; import org.elasticsearch.rest.action.admin.indices.status.RestIndicesStatusAction; @@ -97,6 +98,7 @@ public class RestActionModule extends AbstractModule { bind(RestIndicesExistsAction.class).asEagerSingleton(); bind(RestIndicesStatusAction.class).asEagerSingleton(); + bind(RestIndicesSegmentsAction.class).asEagerSingleton(); bind(RestGetIndicesAliasesAction.class).asEagerSingleton(); bind(RestIndicesAliasesAction.class).asEagerSingleton(); bind(RestCreateIndexAction.class).asEagerSingleton(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/segments/RestIndicesSegmentsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/segments/RestIndicesSegmentsAction.java new file mode 100644 index 00000000000..dbaebcfb288 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/segments/RestIndicesSegmentsAction.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.rest.action.admin.indices.segments; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.XContentRestResponse; +import org.elasticsearch.rest.XContentThrowableRestResponse; +import org.elasticsearch.rest.action.support.RestXContentBuilder; + +import java.io.IOException; + +import static org.elasticsearch.rest.RestRequest.Method.*; +import static org.elasticsearch.rest.RestStatus.*; +import static org.elasticsearch.rest.action.support.RestActions.*; + +/** + */ +public class RestIndicesSegmentsAction extends BaseRestHandler { + + @Inject public RestIndicesSegmentsAction(Settings settings, Client client, RestController controller) { + super(settings, client); + controller.registerHandler(GET, "/_segments", this); + controller.registerHandler(GET, "/{index}/_segments", this); + } + + @Override public void handleRequest(final RestRequest request, final RestChannel channel) { + IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest(splitIndices(request.param("index"))); + // we just send back a response, no need to fork a listener + indicesSegmentsRequest.listenerThreaded(false); + BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operation_threading"), BroadcastOperationThreading.SINGLE_THREAD); + if (operationThreading == BroadcastOperationThreading.NO_THREADS) { + // since we don't spawn, don't allow no_threads, but change it to a single thread + operationThreading = BroadcastOperationThreading.SINGLE_THREAD; + } + indicesSegmentsRequest.operationThreading(operationThreading); + client.admin().indices().segments(indicesSegmentsRequest, new ActionListener() { + @Override public void onResponse(IndicesSegmentResponse response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + builder.startObject(); + builder.field("ok", true); + buildBroadcastShardsHeader(builder, response); + response.toXContent(builder, request); + builder.endObject(); + channel.sendResponse(new XContentRestResponse(request, OK, builder)); + } catch (Exception e) { + onFailure(e); + } + } + + @Override public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java index d3b7944dde5..65efcfc0548 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/AbstractSimpleEngineTests.java @@ -48,6 +48,7 @@ import org.testng.annotations.Test; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -132,9 +133,71 @@ public abstract class AbstractSimpleEngineTests { protected static final byte[] B_2 = new byte[]{2}; protected static final byte[] B_3 = new byte[]{3}; + @Test public void testSegments() throws Exception { + List segments = engine.segments(); + assertThat(segments.isEmpty(), equalTo(true)); + + // create a doc and refresh + ParsedDocument doc = new ParsedDocument("1", "1", "test", null, doc().add(uidField("1")).add(field("value", "test")).add(field(SourceFieldMapper.NAME, B_1, Field.Store.YES)).build(), Lucene.STANDARD_ANALYZER, B_1, false); + engine.create(new Engine.Create(null, newUid("1"), doc)); + + ParsedDocument doc2 = new ParsedDocument("2", "2", "test", null, doc().add(uidField("2")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_2, false); + engine.create(new Engine.Create(null, newUid("2"), doc2)); + engine.refresh(new Engine.Refresh(true)); + + segments = engine.segments(); + assertThat(segments.size(), equalTo(1)); + assertThat(segments.get(0).committed(), equalTo(false)); + assertThat(segments.get(0).search(), equalTo(true)); + assertThat(segments.get(0).numDocs(), equalTo(2)); + assertThat(segments.get(0).deletedDocs(), equalTo(0)); + + engine.flush(new Engine.Flush()); + + segments = engine.segments(); + assertThat(segments.size(), equalTo(1)); + assertThat(segments.get(0).committed(), equalTo(true)); + assertThat(segments.get(0).search(), equalTo(true)); + assertThat(segments.get(0).numDocs(), equalTo(2)); + assertThat(segments.get(0).deletedDocs(), equalTo(0)); + + + ParsedDocument doc3 = new ParsedDocument("3", "3", "test", null, doc().add(uidField("3")).add(field("value", "test")).build(), Lucene.STANDARD_ANALYZER, B_3, false); + engine.create(new Engine.Create(null, newUid("3"), doc3)); + engine.refresh(new Engine.Refresh(true)); + + segments = engine.segments(); + assertThat(segments.size(), equalTo(2)); + assertThat(segments.get(0).generation() < segments.get(1).generation(), equalTo(true)); + assertThat(segments.get(0).committed(), equalTo(true)); + assertThat(segments.get(0).search(), equalTo(true)); + assertThat(segments.get(0).numDocs(), equalTo(2)); + assertThat(segments.get(0).deletedDocs(), equalTo(0)); + + assertThat(segments.get(1).committed(), equalTo(false)); + assertThat(segments.get(1).search(), equalTo(true)); + assertThat(segments.get(1).numDocs(), equalTo(1)); + assertThat(segments.get(1).deletedDocs(), equalTo(0)); + + engine.delete(new Engine.Delete("test", "1", newUid("1"))); + engine.refresh(new Engine.Refresh(true)); + + segments = engine.segments(); + assertThat(segments.size(), equalTo(2)); + assertThat(segments.get(0).generation() < segments.get(1).generation(), equalTo(true)); + assertThat(segments.get(0).committed(), equalTo(true)); + assertThat(segments.get(0).search(), equalTo(true)); + assertThat(segments.get(0).numDocs(), equalTo(1)); + assertThat(segments.get(0).deletedDocs(), equalTo(1)); + + assertThat(segments.get(1).committed(), equalTo(false)); + assertThat(segments.get(1).search(), equalTo(true)); + assertThat(segments.get(1).numDocs(), equalTo(1)); + assertThat(segments.get(1).deletedDocs(), equalTo(0)); + } + @Test public void testSimpleOperations() throws Exception { Engine.Searcher searchResult = engine.searcher(); - assertThat(searchResult, engineSearcherTotalHits(0)); searchResult.release();