Indices Stats API + indexing statistics, closes #1273.

This commit is contained in:
Shay Banon 2011-08-24 11:24:52 +03:00
parent 265b9f0369
commit 80062fbe10
45 changed files with 2508 additions and 162 deletions

View File

@ -47,6 +47,7 @@ import org.elasticsearch.action.admin.indices.optimize.TransportOptimizeAction;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.settings.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.action.admin.indices.status.TransportIndicesStatusAction;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
@ -99,6 +100,7 @@ public class TransportActionModule extends AbstractModule {
bind(TransportIndexReplicationPingAction.class).asEagerSingleton();
bind(TransportReplicationPingAction.class).asEagerSingleton();
bind(TransportIndicesStatsAction.class).asEagerSingleton();
bind(TransportIndicesStatusAction.class).asEagerSingleton();
bind(TransportIndicesSegmentsAction.class).asEagerSingleton();
bind(TransportCreateIndexAction.class).asEagerSingleton();

View File

@ -59,6 +59,7 @@ public class TransportActions {
public static final String REFRESH = "indices/refresh";
public static final String OPTIMIZE = "indices/optimize";
public static final String STATUS = "indices/status";
public static final String STATS = "indices/stats";
public static final String SEGMENTS = "indices/segments";
public static final String EXISTS = "indices/exists";
public static final String ALIASES = "indices/aliases";

View File

@ -0,0 +1,240 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.StoreStats;
import java.io.IOException;
/**
*/
public class CommonStats implements Streamable, ToXContent {
@Nullable DocsStats docs;
@Nullable StoreStats store;
@Nullable IndexingStats indexing;
@Nullable MergeStats merge;
@Nullable RefreshStats refresh;
@Nullable FlushStats flush;
public void add(CommonStats stats) {
if (docs == null) {
if (stats.docs() != null) {
docs = new DocsStats();
docs.add(stats.docs());
}
} else {
docs.add(stats.docs());
}
if (store == null) {
if (stats.store() != null) {
store = new StoreStats();
store.add(stats.store());
}
} else {
store.add(stats.store());
}
if (indexing == null) {
if (stats.indexing() != null) {
indexing = new IndexingStats();
indexing.add(stats.indexing());
}
} else {
indexing.add(stats.indexing());
}
if (merge == null) {
if (stats.merge() != null) {
merge = new MergeStats();
merge.add(stats.merge());
}
} else {
merge.add(stats.merge());
}
if (refresh == null) {
if (stats.refresh() != null) {
refresh = new RefreshStats();
refresh.add(stats.refresh());
}
} else {
refresh.add(stats.refresh());
}
if (flush == null) {
if (stats.flush() != null) {
flush = new FlushStats();
flush.add(stats.flush());
}
} else {
flush.add(stats.flush());
}
}
@Nullable public DocsStats docs() {
return this.docs;
}
@Nullable public DocsStats getDocs() {
return this.docs;
}
@Nullable public StoreStats store() {
return store;
}
@Nullable public StoreStats getStore() {
return store;
}
@Nullable public IndexingStats indexing() {
return indexing;
}
@Nullable public IndexingStats getIndexing() {
return indexing;
}
@Nullable public MergeStats merge() {
return merge;
}
@Nullable public MergeStats getMerge() {
return merge;
}
@Nullable public RefreshStats refresh() {
return refresh;
}
@Nullable public RefreshStats getRefresh() {
return refresh;
}
@Nullable public FlushStats flush() {
return flush;
}
@Nullable public FlushStats getFlush() {
return flush;
}
public static CommonStats readCommonStats(StreamInput in) throws IOException {
CommonStats stats = new CommonStats();
stats.readFrom(in);
return stats;
}
@Override public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
docs = DocsStats.readDocStats(in);
}
if (in.readBoolean()) {
store = StoreStats.readStoreStats(in);
}
if (in.readBoolean()) {
indexing = IndexingStats.readIndexingStats(in);
}
if (in.readBoolean()) {
merge = MergeStats.readMergeStats(in);
}
if (in.readBoolean()) {
refresh = RefreshStats.readRefreshStats(in);
}
if (in.readBoolean()) {
flush = FlushStats.readFlushStats(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
if (docs == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
docs.writeTo(out);
}
if (store == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
store.writeTo(out);
}
if (indexing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
indexing.writeTo(out);
}
if (merge == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
merge.writeTo(out);
}
if (refresh == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
refresh.writeTo(out);
}
if (flush == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
flush.writeTo(out);
}
}
// note, requires a wrapping object
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (docs != null) {
docs.toXContent(builder, params);
}
if (store != null) {
store.toXContent(builder, params);
}
if (indexing != null) {
indexing.toXContent(builder, params);
}
if (merge != null) {
merge.toXContent(builder, params);
}
if (refresh != null) {
refresh.toXContent(builder, params);
}
if (flush != null) {
flush.toXContent(builder, params);
}
return builder;
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator;
/**
*/
public class IndexShardStats implements Iterable<ShardStats> {
private final ShardId shardId;
private final ShardStats[] shards;
IndexShardStats(ShardId shardId, ShardStats[] shards) {
this.shardId = shardId;
this.shards = shards;
}
public ShardId shardId() {
return this.shardId;
}
public ShardId getShardId() {
return shardId();
}
public ShardStats[] shards() {
return shards;
}
public ShardStats[] getShards() {
return shards;
}
public ShardStats getAt(int position) {
return shards[position];
}
@Override public Iterator<ShardStats> iterator() {
return Iterators.forArray(shards);
}
private CommonStats total = null;
public CommonStats total() {
if (total != null) {
return total;
}
CommonStats stats = new CommonStats();
for (ShardStats shard : shards) {
stats.add(shard.stats());
}
total = stats;
return stats;
}
private CommonStats primary = null;
public CommonStats primary() {
if (primary != null) {
return primary;
}
CommonStats stats = new CommonStats();
for (ShardStats shard : shards) {
if (shard.shardRouting().primary()) {
stats.add(shard.stats());
}
}
primary = stats;
return stats;
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class IndexStats implements Iterable<IndexShardStats> {
private final String index;
private final ShardStats shards[];
public IndexStats(String index, ShardStats[] shards) {
this.index = index;
this.shards = shards;
}
public String index() {
return this.index;
}
public ShardStats[] shards() {
return this.shards;
}
private Map<Integer, IndexShardStats> indexShards;
public Map<Integer, IndexShardStats> indexShards() {
if (indexShards != null) {
return indexShards;
}
Map<Integer, List<ShardStats>> tmpIndexShards = Maps.newHashMap();
for (ShardStats shard : shards) {
List<ShardStats> lst = tmpIndexShards.get(shard.shardRouting().id());
if (lst == null) {
lst = Lists.newArrayList();
tmpIndexShards.put(shard.shardRouting().id(), lst);
}
lst.add(shard);
}
indexShards = Maps.newHashMap();
for (Map.Entry<Integer, List<ShardStats>> entry : tmpIndexShards.entrySet()) {
indexShards.put(entry.getKey(), new IndexShardStats(entry.getValue().get(0).shardRouting().shardId(), entry.getValue().toArray(new ShardStats[entry.getValue().size()])));
}
return indexShards;
}
@Override public Iterator<IndexShardStats> iterator() {
return indexShards().values().iterator();
}
private CommonStats total = null;
public CommonStats getTotal() {
return total();
}
public CommonStats total() {
if (total != null) {
return total;
}
CommonStats stats = new CommonStats();
for (ShardStats shard : shards) {
stats.add(shard.stats());
}
total = stats;
return stats;
}
private CommonStats primary = null;
public CommonStats getPrimaries() {
return primaries();
}
public CommonStats primaries() {
if (primary != null) {
return primary;
}
CommonStats stats = new CommonStats();
for (ShardStats shard : shards) {
if (shard.shardRouting().primary()) {
stats.add(shard.stats());
}
}
primary = stats;
return stats;
}
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public class IndicesStats extends BroadcastOperationResponse implements ToXContent {
private ShardStats[] shards;
IndicesStats() {
}
IndicesStats(ShardStats[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.shards = shards;
}
public ShardStats[] shards() {
return this.shards;
}
public ShardStats[] getShards() {
return this.shards;
}
public ShardStats getAt(int position) {
return shards[position];
}
public IndexStats index(String index) {
return indices().get(index);
}
public Map<String, IndexStats> getIndices() {
return indices();
}
private Map<String, IndexStats> indicesStats;
public Map<String, IndexStats> indices() {
if (indicesStats != null) {
return indicesStats;
}
Map<String, IndexStats> indicesStats = Maps.newHashMap();
Set<String> indices = Sets.newHashSet();
for (ShardStats shard : shards) {
indices.add(shard.index());
}
for (String index : indices) {
List<ShardStats> shards = Lists.newArrayList();
for (ShardStats shard : this.shards) {
if (shard.shardRouting().index().equals(index)) {
shards.add(shard);
}
}
indicesStats.put(index, new IndexStats(index, shards.toArray(new ShardStats[shards.size()])));
}
this.indicesStats = indicesStats;
return indicesStats;
}
private CommonStats total = null;
public CommonStats getTotal() {
return total();
}
public CommonStats total() {
if (total != null) {
return total;
}
CommonStats stats = new CommonStats();
for (ShardStats shard : shards) {
stats.add(shard.stats());
}
total = stats;
return stats;
}
private CommonStats primary = null;
public CommonStats getPrimaries() {
return primaries();
}
public CommonStats primaries() {
if (primary != null) {
return primary;
}
CommonStats stats = new CommonStats();
for (ShardStats shard : shards) {
if (shard.shardRouting().primary()) {
stats.add(shard.stats());
}
}
primary = stats;
return stats;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shards = new ShardStats[in.readVInt()];
for (int i = 0; i < shards.length; i++) {
shards[i] = ShardStats.readShardStats(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(shards.length);
for (ShardStats shard : shards) {
shard.writeTo(out);
}
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("_all");
builder.startObject("primaries");
primaries().toXContent(builder, params);
builder.endObject();
builder.startObject("total");
primaries().toXContent(builder, params);
builder.endObject();
builder.startObject(Fields.INDICES);
for (IndexStats indexStats : indices().values()) {
builder.startObject(indexStats.index());
builder.startObject("primaries");
indexStats.primaries().toXContent(builder, params);
builder.endObject();
builder.startObject("total");
indexStats.total().toXContent(builder, params);
builder.endObject();
if ("shards".equalsIgnoreCase(params.param("level", null))) {
builder.startObject(Fields.SHARDS);
for (IndexShardStats indexShardStats : indexStats) {
builder.startArray(Integer.toString(indexShardStats.shardId().id()));
for (ShardStats shardStats : indexShardStats) {
builder.startObject();
builder.startObject(Fields.ROUTING)
.field(Fields.STATE, shardStats.shardRouting().state())
.field(Fields.PRIMARY, shardStats.shardRouting().primary())
.field(Fields.NODE, shardStats.shardRouting().currentNodeId())
.field(Fields.RELOCATING_NODE, shardStats.shardRouting().relocatingNodeId())
.endObject();
shardStats.stats().toXContent(builder, params);
builder.endObject();
}
builder.endArray();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString SHARDS = new XContentBuilderString("shards");
static final XContentBuilderString ROUTING = new XContentBuilderString("routing");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString PRIMARY = new XContentBuilderString("primary");
static final XContentBuilderString NODE = new XContentBuilderString("node");
static final XContentBuilderString RELOCATING_NODE = new XContentBuilderString("relocating_node");
}
}

View File

@ -0,0 +1,170 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* A request to get indices level stats. Allow to enable different stats to be returned.
*
* <p>By default, the {@link #docs(boolean)}, {@link #store(boolean)}, {@link #indexing(boolean)}
* are enabled. Other stats can be enabled as well.
*
* <p>All the stats to be returned can be cleared using {@link #clear()}, at which point, specific
* stats can be enabled.
*/
public class IndicesStatsRequest extends BroadcastOperationRequest {
private boolean docs = true;
private boolean store = true;
private boolean indexing = true;
private boolean merge = false;
private boolean refresh = false;
private boolean flush = false;
private String[] types = null;
public IndicesStatsRequest indices(String... indices) {
this.indices = indices;
return this;
}
/**
* Clears all stats.
*/
public IndicesStatsRequest clear() {
docs = false;
store = false;
indexing = false;
merge = false;
refresh = false;
flush = false;
return this;
}
/**
* Document types to return stats for. Mainly affects {@link #indexing(boolean)} when
* enabled, returning specific indexing stats for those types.
*/
public IndicesStatsRequest types(String... types) {
this.types = types;
return this;
}
/**
* Document types to return stats for. Mainly affects {@link #indexing(boolean)} when
* enabled, returning specific indexing stats for those types.
*/
public String[] types() {
return this.types;
}
public IndicesStatsRequest docs(boolean docs) {
this.docs = docs;
return this;
}
public boolean docs() {
return this.docs;
}
public IndicesStatsRequest store(boolean store) {
this.store = store;
return this;
}
public boolean store() {
return this.store;
}
public IndicesStatsRequest indexing(boolean indexing) {
this.indexing = indexing;
return this;
}
public boolean indexing() {
return this.indexing;
}
public IndicesStatsRequest merge(boolean merge) {
this.merge = merge;
return this;
}
public boolean merge() {
return this.merge;
}
public IndicesStatsRequest refresh(boolean refresh) {
this.refresh = refresh;
return this;
}
public boolean refresh() {
return this.refresh;
}
public IndicesStatsRequest flush(boolean flush) {
this.flush = flush;
return this;
}
public boolean flush() {
return this.flush;
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(docs);
out.writeBoolean(store);
out.writeBoolean(indexing);
out.writeBoolean(merge);
out.writeBoolean(flush);
out.writeBoolean(refresh);
if (types == null) {
out.writeVInt(0);
} else {
out.writeVInt(types.length);
for (String type : types) {
out.writeUTF(type);
}
}
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
docs = in.readBoolean();
store = in.readBoolean();
indexing = in.readBoolean();
merge = in.readBoolean();
flush = in.readBoolean();
refresh = in.readBoolean();
int size = in.readVInt();
if (size > 0) {
types = new String[size];
for (int i = 0; i < size; i++) {
types[i] = in.readUTF();
}
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.cluster.routing.ImmutableShardRouting.*;
/**
*/
public class ShardStats extends BroadcastShardOperationResponse {
private ShardRouting shardRouting;
CommonStats stats;
ShardStats() {
}
ShardStats(ShardRouting shardRouting) {
super(shardRouting.index(), shardRouting.id());
this.shardRouting = shardRouting;
this.stats = new CommonStats();
}
/**
* The shard routing information (cluster wide shard state).
*/
public ShardRouting shardRouting() {
return this.shardRouting;
}
/**
* The shard routing information (cluster wide shard state).
*/
public ShardRouting getShardRouting() {
return shardRouting();
}
public CommonStats stats() {
return this.stats;
}
public CommonStats getStats() {
return stats();
}
public static ShardStats readShardStats(StreamInput in) throws IOException {
ShardStats stats = new ShardStats();
stats.readFrom(in);
return stats;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardRouting = readShardRoutingEntry(in);
stats = CommonStats.readCommonStats(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardRouting.writeTo(out);
stats.writeTo(out);
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationRequest;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.common.collect.Lists.*;
/**
*/
public class TransportIndicesStatsAction extends TransportBroadcastOperationAction<IndicesStatsRequest, IndicesStats, TransportIndicesStatsAction.IndexShardStatsRequest, ShardStats> {
private final IndicesService indicesService;
@Inject public TransportIndicesStatsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
}
@Override protected String executor() {
return ThreadPool.Names.CACHED;
}
@Override protected String transportAction() {
return TransportActions.Admin.Indices.STATUS;
}
@Override protected String transportShardAction() {
return "indices/stats/shard";
}
@Override protected IndicesStatsRequest newRequest() {
return new IndicesStatsRequest();
}
@Override protected boolean ignoreNonActiveExceptions() {
return true;
}
/**
* Status goes across *all* shards.
*/
@Override protected GroupShardsIterator shards(IndicesStatsRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true);
}
@Override protected IndicesStats newResponse(IndicesStatsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
final List<ShardStats> shards = Lists.newArrayList();
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
shards.add((ShardStats) shardResponse);
successfulShards++;
}
}
return new IndicesStats(shards.toArray(new ShardStats[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@Override protected IndexShardStatsRequest newShardRequest() {
return new IndexShardStatsRequest();
}
@Override protected IndexShardStatsRequest newShardRequest(ShardRouting shard, IndicesStatsRequest request) {
return new IndexShardStatsRequest(shard.index(), shard.id(), request);
}
@Override protected ShardStats newShardResponse() {
return new ShardStats();
}
@Override protected ShardStats shardOperation(IndexShardStatsRequest request) throws ElasticSearchException {
InternalIndexService indexService = (InternalIndexService) indicesService.indexServiceSafe(request.index());
InternalIndexShard indexShard = (InternalIndexShard) indexService.shardSafe(request.shardId());
ShardStats stats = new ShardStats(indexShard.routingEntry());
if (request.request.docs()) {
stats.stats.docs = indexShard.docStats();
}
if (request.request.store()) {
stats.stats.store = indexShard.storeStats();
}
if (request.request.indexing()) {
stats.stats.indexing = indexShard.indexingStats(request.request.types());
}
if (request.request.merge()) {
stats.stats.merge = indexShard.mergeStats();
}
if (request.request.refresh()) {
stats.stats.refresh = indexShard.refreshStats();
}
if (request.request.flush()) {
stats.stats.flush = indexShard.flushStats();
}
return stats;
}
public static class IndexShardStatsRequest extends BroadcastShardOperationRequest {
IndicesStatsRequest request;
IndexShardStatsRequest() {
}
IndexShardStatsRequest(String index, int shardId, IndicesStatsRequest request) {
super(index, shardId);
this.request = request;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = new IndicesStatsRequest();
request.readFrom(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -70,13 +72,17 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
@Override protected PrimaryResponse<ShardDeleteByQueryResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
indexShard(shardRequest).deleteByQuery(request.querySource(), request.filteringAliases(), request.types());
IndexShard indexShard = indexShard(shardRequest);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types());
indexShard.deleteByQuery(deleteByQuery);
return new PrimaryResponse<ShardDeleteByQueryResponse>(new ShardDeleteByQueryResponse(), null);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
indexShard(shardRequest).deleteByQuery(request.querySource(), request.filteringAliases(), request.types());
IndexShard indexShard = indexShard(shardRequest);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types());
indexShard.deleteByQuery(deleteByQuery);
}
@Override protected ShardIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) {

View File

@ -50,7 +50,7 @@ public abstract class BroadcastOperationRequest implements ActionRequest {
return indices;
}
public BroadcastOperationRequest indices(String[] indices) {
public BroadcastOperationRequest indices(String... indices) {
this.indices = indices;
return this;
}

View File

@ -53,6 +53,8 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
@ -75,6 +77,7 @@ import org.elasticsearch.client.action.admin.indices.optimize.OptimizeRequestBui
import org.elasticsearch.client.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.client.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
import org.elasticsearch.client.action.admin.indices.settings.UpdateSettingsRequestBuilder;
import org.elasticsearch.client.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.client.action.admin.indices.status.IndicesStatusRequestBuilder;
import org.elasticsearch.client.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
import org.elasticsearch.client.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
@ -110,6 +113,21 @@ public interface IndicesAdminClient {
*/
IndicesExistsRequestBuilder prepareExists(String... indices);
/**
* Indices stats.
*/
ActionFuture<IndicesStats> stats(IndicesStatsRequest request);
/**
* Indices stats.
*/
void stats(IndicesStatsRequest request, ActionListener<IndicesStats> listener);
/**
* Indices stats.
*/
IndicesStatsRequestBuilder prepareStats(String... indices);
/**
* The status of one or more indices.
*

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.action.admin.indices.stats;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.action.admin.indices.support.BaseIndicesRequestBuilder;
/**
* A request to get indices level stats. Allow to enable different stats to be returned.
*
* <p>By default, the {@link #setDocs(boolean)}, {@link #setStore(boolean)}, {@link #setIndexing(boolean)}
* are enabled. Other stats can be enabled as well.
*
* <p>All the stats to be returned can be cleared using {@link #clear()}, at which point, specific
* stats can be enabled.
*/
public class IndicesStatsRequestBuilder extends BaseIndicesRequestBuilder<IndicesStatsRequest, IndicesStats> {
public IndicesStatsRequestBuilder(IndicesAdminClient indicesClient) {
super(indicesClient, new IndicesStatsRequest());
}
public IndicesStatsRequestBuilder clear() {
request.clear();
return this;
}
/**
* Sets specific indices to return the stats for.
*/
public IndicesStatsRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
/**
* Document types to return stats for. Mainly affects {@link #setIndexing(boolean)} when
* enabled, returning specific indexing stats for those types.
*/
public IndicesStatsRequestBuilder setTypes(String... types) {
request.types(types);
return this;
}
public IndicesStatsRequestBuilder setDocs(boolean docs) {
request.docs(docs);
return this;
}
public IndicesStatsRequestBuilder setStore(boolean store) {
request.store(store);
return this;
}
public IndicesStatsRequestBuilder setIndexing(boolean indexing) {
request.indexing(indexing);
return this;
}
public IndicesStatsRequestBuilder setMerge(boolean merge) {
request.merge(merge);
return this;
}
public IndicesStatsRequestBuilder setRefresh(boolean refresh) {
request.refresh(refresh);
return this;
}
public IndicesStatsRequestBuilder setFlush(boolean flush) {
request.flush(flush);
return this;
}
@Override protected void doExecute(ActionListener<IndicesStats> listener) {
client.stats(request, listener);
}
}

View File

@ -34,6 +34,9 @@ public class IndicesStatusRequestBuilder extends BaseIndicesRequestBuilder<Indic
super(indicesClient, new IndicesStatusRequest());
}
/**
* Sets specific indices to return the status for.
*/
public IndicesStatusRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;

View File

@ -69,6 +69,9 @@ import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsA
import org.elasticsearch.action.admin.indices.settings.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.status.TransportIndicesStatusAction;
@ -93,6 +96,8 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
private final TransportIndicesExistsAction indicesExistsAction;
private final TransportIndicesStatsAction indicesStatsAction;
private final TransportIndicesStatusAction indicesStatusAction;
private final TransportIndicesSegmentsAction indicesSegmentsAction;
@ -129,7 +134,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
private final TransportDeleteIndexTemplateAction deleteIndexTemplateAction;
@Inject public NodeIndicesAdminClient(Settings settings, ThreadPool threadPool, TransportIndicesExistsAction indicesExistsAction, TransportIndicesStatusAction indicesStatusAction, TransportIndicesSegmentsAction indicesSegmentsAction,
@Inject public NodeIndicesAdminClient(Settings settings, ThreadPool threadPool, TransportIndicesExistsAction indicesExistsAction, TransportIndicesStatsAction indicesStatsAction, TransportIndicesStatusAction indicesStatusAction, TransportIndicesSegmentsAction indicesSegmentsAction,
TransportCreateIndexAction createIndexAction, TransportDeleteIndexAction deleteIndexAction,
TransportCloseIndexAction closeIndexAction, TransportOpenIndexAction openIndexAction,
TransportRefreshAction refreshAction, TransportFlushAction flushAction, TransportOptimizeAction optimizeAction,
@ -139,6 +144,7 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
TransportPutIndexTemplateAction putIndexTemplateAction, TransportDeleteIndexTemplateAction deleteIndexTemplateAction) {
this.threadPool = threadPool;
this.indicesExistsAction = indicesExistsAction;
this.indicesStatsAction = indicesStatsAction;
this.indicesStatusAction = indicesStatusAction;
this.indicesSegmentsAction = indicesSegmentsAction;
this.createIndexAction = createIndexAction;
@ -171,6 +177,14 @@ public class NodeIndicesAdminClient extends AbstractIndicesAdminClient implement
indicesExistsAction.execute(request, listener);
}
@Override public ActionFuture<IndicesStats> stats(IndicesStatsRequest request) {
return indicesStatsAction.execute(request);
}
@Override public void stats(IndicesStatsRequest request, ActionListener<IndicesStats> lister) {
indicesStatsAction.execute(request, lister);
}
@Override public ActionFuture<IndicesStatusResponse> status(IndicesStatusRequest request) {
return indicesStatusAction.execute(request);
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.client.action.admin.indices.optimize.OptimizeRequestBui
import org.elasticsearch.client.action.admin.indices.refresh.RefreshRequestBuilder;
import org.elasticsearch.client.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
import org.elasticsearch.client.action.admin.indices.settings.UpdateSettingsRequestBuilder;
import org.elasticsearch.client.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.client.action.admin.indices.status.IndicesStatusRequestBuilder;
import org.elasticsearch.client.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
import org.elasticsearch.client.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
@ -97,6 +98,10 @@ public abstract class AbstractIndicesAdminClient implements InternalIndicesAdmin
return new RefreshRequestBuilder(this).setIndices(indices);
}
@Override public IndicesStatsRequestBuilder prepareStats(String... indices) {
return new IndicesStatsRequestBuilder(this).setIndices(indices);
}
@Override public IndicesStatusRequestBuilder prepareStatus(String... indices) {
return new IndicesStatusRequestBuilder(this).setIndices(indices);
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.client.transport.action.admin.indices.optimize.ClientTr
import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction;
import org.elasticsearch.client.transport.action.admin.indices.segments.ClientTransportIndicesSegmentsAction;
import org.elasticsearch.client.transport.action.admin.indices.settings.ClientTransportUpdateSettingsAction;
import org.elasticsearch.client.transport.action.admin.indices.stats.ClientTransportIndicesStatsAction;
import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction;
import org.elasticsearch.client.transport.action.admin.indices.template.delete.ClientTransportDeleteIndexTemplateAction;
import org.elasticsearch.client.transport.action.admin.indices.template.put.ClientTransportPutIndexTemplateAction;
@ -78,6 +79,7 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportPercolateAction.class).asEagerSingleton();
bind(ClientTransportIndicesExistsAction.class).asEagerSingleton();
bind(ClientTransportIndicesStatsAction.class).asEagerSingleton();
bind(ClientTransportIndicesStatusAction.class).asEagerSingleton();
bind(ClientTransportIndicesSegmentsAction.class).asEagerSingleton();
bind(ClientTransportRefreshAction.class).asEagerSingleton();

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.admin.indices.stats;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportService;
/**
*/
public class ClientTransportIndicesStatsAction extends BaseClientTransportAction<IndicesStatsRequest, IndicesStats> {
@Inject public ClientTransportIndicesStatsAction(Settings settings, TransportService transportService) {
super(settings, transportService, IndicesStats.class);
}
@Override protected String action() {
return TransportActions.Admin.Indices.STATS;
}
}

View File

@ -54,6 +54,8 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.UpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusRequest;
import org.elasticsearch.action.admin.indices.status.IndicesStatusResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
@ -79,6 +81,7 @@ import org.elasticsearch.client.transport.action.admin.indices.optimize.ClientTr
import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction;
import org.elasticsearch.client.transport.action.admin.indices.segments.ClientTransportIndicesSegmentsAction;
import org.elasticsearch.client.transport.action.admin.indices.settings.ClientTransportUpdateSettingsAction;
import org.elasticsearch.client.transport.action.admin.indices.stats.ClientTransportIndicesStatsAction;
import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction;
import org.elasticsearch.client.transport.action.admin.indices.template.delete.ClientTransportDeleteIndexTemplateAction;
import org.elasticsearch.client.transport.action.admin.indices.template.put.ClientTransportPutIndexTemplateAction;
@ -98,6 +101,8 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
private final ClientTransportIndicesExistsAction indicesExistsAction;
private final ClientTransportIndicesStatsAction indicesStatsAction;
private final ClientTransportIndicesStatusAction indicesStatusAction;
private final ClientTransportIndicesSegmentsAction indicesSegmentsAction;
@ -135,7 +140,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
private final ClientTransportDeleteIndexTemplateAction deleteIndexTemplateAction;
@Inject public InternalTransportIndicesAdminClient(Settings settings, TransportClientNodesService nodesService, ThreadPool threadPool,
ClientTransportIndicesExistsAction indicesExistsAction, ClientTransportIndicesStatusAction indicesStatusAction, ClientTransportIndicesSegmentsAction indicesSegmentsAction,
ClientTransportIndicesExistsAction indicesExistsAction, ClientTransportIndicesStatusAction indicesStatusAction, ClientTransportIndicesStatsAction indicesStatsAction, ClientTransportIndicesSegmentsAction indicesSegmentsAction,
ClientTransportCreateIndexAction createIndexAction, ClientTransportDeleteIndexAction deleteIndexAction,
ClientTransportCloseIndexAction closeIndexAction, ClientTransportOpenIndexAction openIndexAction,
ClientTransportRefreshAction refreshAction, ClientTransportFlushAction flushAction, ClientTransportOptimizeAction optimizeAction,
@ -146,6 +151,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
this.nodesService = nodesService;
this.threadPool = threadPool;
this.indicesExistsAction = indicesExistsAction;
this.indicesStatsAction = indicesStatsAction;
this.indicesStatusAction = indicesStatusAction;
this.indicesSegmentsAction = indicesSegmentsAction;
this.createIndexAction = createIndexAction;
@ -186,6 +192,22 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
}, listener);
}
@Override public ActionFuture<IndicesStats> stats(final IndicesStatsRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<IndicesStats>>() {
@Override public ActionFuture<IndicesStats> doWithNode(DiscoveryNode node) throws ElasticSearchException {
return indicesStatsAction.execute(node, request);
}
});
}
@Override public void stats(final IndicesStatsRequest request, final ActionListener<IndicesStats> listener) {
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<IndicesStats>() {
@Override public void doWithNode(DiscoveryNode node, ActionListener<IndicesStats> listener) throws ElasticSearchException {
indicesStatsAction.execute(node, request, listener);
}
}, listener);
}
@Override public ActionFuture<IndicesStatusResponse> status(final IndicesStatusRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<IndicesStatusResponse>>() {
@Override public ActionFuture<IndicesStatusResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {

View File

@ -20,7 +20,6 @@
package org.elasticsearch.common.lucene;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -35,7 +34,7 @@ public class Directories {
/**
* Returns the estimated size of a {@link Directory}.
*/
public static ByteSizeValue estimateSize(Directory directory) throws IOException {
public static long estimateSize(Directory directory) throws IOException {
long estimatedSize = 0;
String[] files = directory.listAll();
for (String file : files) {
@ -45,7 +44,7 @@ public class Directories {
// ignore, the file is not there no more
}
}
return new ByteSizeValue(estimatedSize);
return estimatedSize;
}
private Directories() {

View File

@ -53,4 +53,9 @@ public class MeanMetric implements Metric {
}
return 0.0;
}
public void clear() {
counter.reset();
sum.reset();
}
}

View File

@ -38,7 +38,7 @@ public interface ToXContent {
boolean paramAsBoolean(String key, boolean defaultValue);
Boolean paramAsBoolean(String key, Boolean defaultValue);
Boolean paramAsBooleanOptional(String key, Boolean defaultValue);
}
public static final Params EMPTY_PARAMS = new Params() {
@ -54,7 +54,7 @@ public interface ToXContent {
return defaultValue;
}
@Override public Boolean paramAsBoolean(String key, Boolean defaultValue) {
@Override public Boolean paramAsBooleanOptional(String key, Boolean defaultValue) {
return defaultValue;
}
};
@ -83,7 +83,7 @@ public interface ToXContent {
return Booleans.parseBoolean(param(key), defaultValue);
}
@Override public Boolean paramAsBoolean(String key, Boolean defaultValue) {
@Override public Boolean paramAsBooleanOptional(String key, Boolean defaultValue) {
String sValue = param(key);
if (sValue == null) {
return defaultValue;

View File

@ -317,6 +317,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY;
private long startTime;
private long endTime;
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
this.docMapper = docMapper;
this.uid = uid;
@ -397,6 +400,31 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public UidField uidField() {
return (UidField) doc.rootDoc().getFieldable(UidFieldMapper.NAME);
}
public Create startTime(long startTime) {
this.startTime = startTime;
return this;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public Create endTime(long endTime) {
this.endTime = endTime;
return this;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
}
static class Index implements IndexingOperation {
@ -407,6 +435,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private VersionType versionType = VersionType.INTERNAL;
private Origin origin = Origin.PRIMARY;
private long startTime;
private long endTime;
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
this.docMapper = docMapper;
this.uid = uid;
@ -487,6 +518,30 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public UidField uidField() {
return (UidField) doc.rootDoc().getFieldable(UidFieldMapper.NAME);
}
public Index startTime(long startTime) {
this.startTime = startTime;
return this;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public Index endTime(long endTime) {
this.endTime = endTime;
return this;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
}
static class Delete implements Operation {
@ -498,6 +553,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private Origin origin = Origin.PRIMARY;
private boolean notFound;
private long startTime;
private long endTime;
public Delete(String type, String id, Term uid) {
this.type = type;
this.id = id;
@ -555,6 +613,31 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
this.notFound = notFound;
return this;
}
public Delete startTime(long startTime) {
this.startTime = startTime;
return this;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public Delete endTime(long endTime) {
this.endTime = endTime;
return this;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
}
static class DeleteByQuery {
@ -564,6 +647,9 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final Filter aliasFilter;
private final String[] types;
private long startTime;
private long endTime;
public DeleteByQuery(Query query, byte[] source, @Nullable String[] filteringAliases, @Nullable Filter aliasFilter, String... types) {
this.query = query;
this.source = source;
@ -591,6 +677,30 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
public Filter aliasFilter() {
return aliasFilter;
}
public DeleteByQuery startTime(long startTime) {
this.startTime = startTime;
return this;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public DeleteByQuery endTime(long endTime) {
this.endTime = endTime;
return this;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.indexing;
import org.elasticsearch.index.engine.Engine;
/**
* @author kimchy (shay.banon)
*/
public abstract class IndexingOperationListener {
public Engine.Create preCreate(Engine.Create create) {
return create;
}
public void postCreate(Engine.Create create) {
}
public Engine.Index preIndex(Engine.Index index) {
return index;
}
public void postIndex(Engine.Index index) {
}
public Engine.Delete preDelete(Engine.Delete delete) {
return delete;
}
public void postDelete(Engine.Delete delete) {
}
public Engine.DeleteByQuery preDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
return deleteByQuery;
}
public void postDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.indexing;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class IndexingStats implements Streamable, ToXContent {
public static class Stats implements Streamable, ToXContent {
private long indexCount;
private long indexTimeInMillis;
private long deleteCount;
private long deleteTimeInMillis;
Stats() {
}
public Stats(long indexCount, long indexTimeInMillis, long deleteCount, long deleteTimeInMillis) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
this.deleteCount = deleteCount;
this.deleteTimeInMillis = deleteTimeInMillis;
}
public void add(Stats stats) {
indexCount += stats.indexCount;
indexTimeInMillis += stats.indexTimeInMillis;
deleteCount += stats.deleteCount;
deleteTimeInMillis += stats.deleteTimeInMillis;
}
public long indexCount() {
return indexCount;
}
public long getIndexCount() {
return indexCount;
}
public TimeValue indexTime() {
return new TimeValue(indexTimeInMillis);
}
public long indexTimeInMillis() {
return indexTimeInMillis;
}
public long getIndexTimeInMillis() {
return indexTimeInMillis;
}
public long deleteCount() {
return deleteCount;
}
public long getDeleteCount() {
return deleteCount;
}
public TimeValue deleteTime() {
return new TimeValue(deleteTimeInMillis);
}
public long deleteTimeInMillis() {
return deleteTimeInMillis;
}
public long getDeleteTimeInMillis() {
return deleteTimeInMillis;
}
public static Stats readStats(StreamInput in) throws IOException {
Stats stats = new Stats();
stats.readFrom(in);
return stats;
}
@Override public void readFrom(StreamInput in) throws IOException {
indexCount = in.readVLong();
indexTimeInMillis = in.readVLong();
deleteCount = in.readVLong();
deleteTimeInMillis = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
out.writeVLong(indexTimeInMillis);
out.writeVLong(deleteCount);
out.writeVLong(deleteTimeInMillis);
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.INDEX_TOTAL, indexCount);
builder.field(Fields.INDEX_TIME, indexTime().toString());
builder.field(Fields.INDEX_TIME_IN_MILLIS, indexTimeInMillis);
builder.field(Fields.DELETE_TOTAL, deleteCount);
builder.field(Fields.DELETE_TIME, deleteTime().toString());
builder.field(Fields.DELETE_TIME_IN_MILLIS, deleteTimeInMillis);
return builder;
}
}
private Stats totalStats;
@Nullable private Map<String, Stats> typeStats;
public IndexingStats() {
totalStats = new Stats();
}
public IndexingStats(Stats totalStats, @Nullable Map<String, Stats> typeStats) {
this.totalStats = totalStats;
this.typeStats = typeStats;
}
public void add(IndexingStats indexingStats) {
if (indexingStats == null) {
return;
}
totalStats.add(indexingStats.totalStats);
if (indexingStats.typeStats != null && !indexingStats.typeStats.isEmpty()) {
if (typeStats == null) {
typeStats = new HashMap<String, Stats>(indexingStats.typeStats.size());
}
for (Map.Entry<String, Stats> entry : indexingStats.typeStats.entrySet()) {
Stats stats = typeStats.get(entry.getKey());
if (stats == null) {
typeStats.put(entry.getKey(), entry.getValue());
} else {
stats.add(entry.getValue());
}
}
}
}
public Stats total() {
return this.totalStats;
}
public Map<String, Stats> typeStats() {
return this.typeStats;
}
public static IndexingStats readIndexingStats(StreamInput in) throws IOException {
IndexingStats indexingStats = new IndexingStats();
indexingStats.readFrom(in);
return indexingStats;
}
@Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.INDEXING);
totalStats.toXContent(builder, params);
if (typeStats != null && !typeStats.isEmpty()) {
builder.startObject(Fields.TYPES);
for (Map.Entry<String, Stats> entry : typeStats.entrySet()) {
builder.startObject(entry.getKey());
entry.getValue().toXContent(builder, params);
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString INDEXING = new XContentBuilderString("indexing");
static final XContentBuilderString TYPES = new XContentBuilderString("types");
static final XContentBuilderString INDEX_TOTAL = new XContentBuilderString("index_total");
static final XContentBuilderString INDEX_TIME = new XContentBuilderString("index_time");
static final XContentBuilderString INDEX_TIME_IN_MILLIS = new XContentBuilderString("index_time_in_millis");
static final XContentBuilderString DELETE_TOTAL = new XContentBuilderString("delete_total");
static final XContentBuilderString DELETE_TIME = new XContentBuilderString("delete_time");
static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis");
}
@Override public void readFrom(StreamInput in) throws IOException {
totalStats = Stats.readStats(in);
if (in.readBoolean()) {
int size = in.readVInt();
typeStats = new HashMap<String, Stats>(size);
for (int i = 0; i < size; i++) {
typeStats.put(in.readUTF(), Stats.readStats(in));
}
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
totalStats.writeTo(out);
if (typeStats == null || typeStats.isEmpty()) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(typeStats.size());
for (Map.Entry<String, Stats> entry : typeStats.entrySet()) {
out.writeUTF(entry.getKey());
entry.getValue().writeTo(out);
}
}
}
}

View File

@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@ -17,24 +17,15 @@
* under the License.
*/
package org.elasticsearch.index.shard.service;
package org.elasticsearch.index.indexing;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.common.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public abstract class OperationListener {
public class ShardIndexingModule extends AbstractModule {
public Engine.Create beforeCreate(Engine.Create create) {
return create;
}
public Engine.Index beforeIndex(Engine.Index index) {
return index;
}
public Engine.Delete beforeDelete(Engine.Delete delete) {
return delete;
@Override protected void configure() {
bind(ShardIndexingService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.indexing;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
/**
*/
public class ShardIndexingService extends AbstractIndexShardComponent {
private final StatsHolder totalStats = new StatsHolder();
private volatile Map<String, StatsHolder> typesStats = ImmutableMap.of();
private CopyOnWriteArrayList<IndexingOperationListener> listeners = null;
@Inject public ShardIndexingService(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
/**
* Returns the stats, including type specific stats. If the types are null/0 length, then nothing
* is returned for them. If they are set, then only types provided will be returned, or
* <tt>_all</tt> for all types.
*/
public IndexingStats stats(String... types) {
IndexingStats.Stats total = totalStats.stats();
Map<String, IndexingStats.Stats> typesSt = null;
if (types != null && types.length > 0) {
if (types.length == 1 && types[0].equals("_all")) {
typesSt = new HashMap<String, IndexingStats.Stats>(typesStats.size());
for (Map.Entry<String, StatsHolder> entry : typesStats.entrySet()) {
typesSt.put(entry.getKey(), entry.getValue().stats());
}
} else {
typesSt = new HashMap<String, IndexingStats.Stats>(types.length);
for (String type : types) {
StatsHolder statsHolder = typesStats.get(type);
if (statsHolder != null) {
typesSt.put(type, statsHolder.stats());
}
}
}
}
return new IndexingStats(total, typesSt);
}
public synchronized void addListener(IndexingOperationListener listener) {
if (listeners == null) {
listeners = new CopyOnWriteArrayList<IndexingOperationListener>();
}
listeners.add(listener);
}
public synchronized void removeListener(IndexingOperationListener listener) {
if (listeners == null) {
return;
}
listeners.remove(listener);
if (listeners.isEmpty()) {
listeners = null;
}
}
public Engine.Create preCreate(Engine.Create create) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
create = listener.preCreate(create);
}
}
return create;
}
public void postCreate(Engine.Create create) {
long took = create.endTime() - create.startTime();
totalStats.indexMetric.inc(took);
typeStats(create.type()).indexMetric.inc(took);
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postCreate(create);
}
}
}
public Engine.Index preIndex(Engine.Index index) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
index = listener.preIndex(index);
}
}
return index;
}
public void postIndex(Engine.Index index) {
long took = index.endTime() - index.startTime();
totalStats.indexMetric.inc(took);
typeStats(index.type()).indexMetric.inc(took);
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postIndex(index);
}
}
}
public Engine.Delete preDelete(Engine.Delete delete) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
delete = listener.preDelete(delete);
}
}
return delete;
}
public void postDelete(Engine.Delete delete) {
long took = delete.endTime() - delete.startTime();
totalStats.deleteMetric.inc(took);
typeStats(delete.type()).deleteMetric.inc(took);
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postDelete(delete);
}
}
}
public Engine.DeleteByQuery preDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
deleteByQuery = listener.preDeleteByQuery(deleteByQuery);
}
}
return deleteByQuery;
}
public void postDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
if (listeners != null) {
for (IndexingOperationListener listener : listeners) {
listener.postDeleteByQuery(deleteByQuery);
}
}
}
public void clear() {
totalStats.clear();
synchronized (this) {
typesStats = ImmutableMap.of();
}
}
private StatsHolder typeStats(String type) {
StatsHolder stats = typesStats.get(type);
if (stats == null) {
synchronized (this) {
stats = typesStats.get(type);
if (stats == null) {
stats = new StatsHolder();
typesStats = MapBuilder.newMapBuilder(typesStats).put(type, stats).immutableMap();
}
}
}
return stats;
}
static class StatsHolder {
public final MeanMetric indexMetric = new MeanMetric();
public final MeanMetric deleteMetric = new MeanMetric();
public IndexingStats.Stats stats() {
return new IndexingStats.Stats(
indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()),
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()));
}
public void clear() {
indexMetric.clear();
deleteMetric.clear();
}
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.indexing.IndexingOperationListener;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
@ -43,7 +44,7 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.OperationListener;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesService;
@ -84,7 +85,7 @@ public class PercolatorService extends AbstractIndexComponent {
if (percolatorIndexService != null) {
for (IndexShard indexShard : percolatorIndexService) {
try {
indexShard.addListener(realTimePercolatorOperationListener);
indexShard.indexingService().addListener(realTimePercolatorOperationListener);
} catch (Exception e) {
// ignore
}
@ -101,7 +102,7 @@ public class PercolatorService extends AbstractIndexComponent {
if (percolatorIndexService != null) {
for (IndexShard indexShard : percolatorIndexService) {
try {
indexShard.removeListener(realTimePercolatorOperationListener);
indexShard.indexingService().removeListener(realTimePercolatorOperationListener);
} catch (Exception e) {
// ignore
}
@ -197,7 +198,7 @@ public class PercolatorService extends AbstractIndexComponent {
// add a listener that will update based on changes done to the _percolate index
// the relevant indices with loaded queries
if (indexShard.shardId().index().name().equals(INDEX_NAME)) {
indexShard.addListener(realTimePercolatorOperationListener);
((InternalIndexShard) indexShard).indexingService().addListener(realTimePercolatorOperationListener);
}
}
@ -250,23 +251,23 @@ public class PercolatorService extends AbstractIndexComponent {
}
}
class RealTimePercolatorOperationListener extends OperationListener {
class RealTimePercolatorOperationListener extends IndexingOperationListener {
@Override public Engine.Create beforeCreate(Engine.Create create) {
@Override public Engine.Create preCreate(Engine.Create create) {
if (create.type().equals(index().name())) {
percolator.addQuery(create.id(), create.source());
}
return create;
}
@Override public Engine.Index beforeIndex(Engine.Index index) {
@Override public Engine.Index preIndex(Engine.Index index) {
if (index.type().equals(index().name())) {
percolator.addQuery(index.id(), index.source());
}
return index;
}
@Override public Engine.Delete beforeDelete(Engine.Delete delete) {
@Override public Engine.Delete preDelete(Engine.Delete delete) {
if (delete.type().equals(index().name())) {
percolator.removeQuery(delete.id());
}

View File

@ -50,6 +50,7 @@ import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayModule;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.indexing.ShardIndexingModule;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyModule;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
@ -279,6 +280,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
ModulesBuilder modules = new ModulesBuilder();
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
modules.add(new IndexShardModule(shardId));
modules.add(new ShardIndexingModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class)));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*/
public class DocsStats implements Streamable, ToXContent {
long count = 0;
long deleted = 0;
public DocsStats() {
}
public DocsStats(long count, long deleted) {
this.count = count;
this.deleted = deleted;
}
public void add(DocsStats docsStats) {
if (docsStats == null) {
return;
}
count += docsStats.count;
deleted += docsStats.deleted;
}
public long count() {
return this.count;
}
public long getCount() {
return this.count;
}
public long deleted() {
return this.deleted;
}
public long getDeleted() {
return this.deleted;
}
public static DocsStats readDocStats(StreamInput in) throws IOException {
DocsStats docsStats = new DocsStats();
docsStats.readFrom(in);
return docsStats;
}
@Override public void readFrom(StreamInput in) throws IOException {
count = in.readVLong();
deleted = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVLong(deleted);
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DOCS);
builder.field(Fields.COUNT, count);
builder.field(Fields.DELETED, deleted);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString DOCS = new XContentBuilderString("docs");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
static final XContentBuilderString DELETED = new XContentBuilderString("deleted");
}
}

View File

@ -26,11 +26,16 @@ import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.StoreStats;
/**
* @author kimchy (shay.banon)
@ -38,12 +43,18 @@ import org.elasticsearch.index.shard.IndexShardState;
@ThreadSafe
public interface IndexShard extends IndexShardComponent {
void addListener(OperationListener listener);
void removeListener(OperationListener listener);
ShardIndexingService indexingService();
ShardRouting routingEntry();
DocsStats docStats();
StoreStats storeStats();
IndexingStats indexingStats(String... types);
MergeStats mergeStats();
RefreshStats refreshStats();
FlushStats flushStats();
@ -62,7 +73,9 @@ public interface IndexShard extends IndexShardComponent {
void delete(Engine.Delete delete) throws ElasticSearchException;
void deleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
Engine.DeleteByQuery prepareDeleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException;
void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticSearchException;
Engine.GetResult get(Engine.Get get) throws ElasticSearchException;

View File

@ -47,11 +47,14 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.OptimizeFailedEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.refresh.RefreshStats;
@ -60,6 +63,7 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.recovery.RecoveryStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.InternalIndicesLifecycle;
@ -68,7 +72,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -102,6 +105,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private final IndexAliasesService indexAliasesService;
private final ShardIndexingService indexingService;
private final Object mutex = new Object();
@ -120,15 +125,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private RecoveryStatus peerRecoveryStatus;
private CopyOnWriteArrayList<OperationListener> listeners = null;
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService) {
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService) {
super(shardId, indexSettings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
@ -141,6 +144,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.queryParserService = queryParserService;
this.indexCache = indexCache;
this.indexAliasesService = indexAliasesService;
this.indexingService = indexingService;
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime("index.refresh_interval", engine.defaultRefreshInterval()));
@ -153,23 +157,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.checkIndex = indexSettings.getAsBoolean("index.shard.check_index", false);
}
@Override public synchronized void addListener(OperationListener listener) {
if (listeners == null) {
listeners = new CopyOnWriteArrayList<OperationListener>();
}
listeners.add(listener);
}
@Override public synchronized void removeListener(OperationListener listener) {
if (listeners == null) {
return;
}
listeners.remove(listener);
if (listeners.isEmpty()) {
listeners = null;
}
}
public MergeSchedulerProvider mergeScheduler() {
return this.mergeScheduler;
}
@ -186,6 +173,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return translog;
}
public ShardIndexingService indexingService() {
return this.indexingService;
}
@Override public ShardRouting routingEntry() {
return this.shardRouting;
}
@ -267,82 +258,82 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
@Override public Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException {
long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source);
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid()), doc);
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid()), doc).startTime(startTime);
}
@Override public ParsedDocument create(Engine.Create create) throws ElasticSearchException {
writeAllowed();
if (listeners != null) {
for (OperationListener listener : listeners) {
create = listener.beforeCreate(create);
}
}
create = indexingService.preCreate(create);
if (logger.isTraceEnabled()) {
logger.trace("index {}", create.docs());
}
engine.create(create);
create.endTime(System.nanoTime());
indexingService.postCreate(create);
return create.parsedDoc();
}
@Override public Engine.Index prepareIndex(SourceToParse source) throws ElasticSearchException {
long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
ParsedDocument doc = docMapper.parse(source);
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid()), doc);
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid()), doc).startTime(startTime);
}
@Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
writeAllowed();
if (listeners != null) {
for (OperationListener listener : listeners) {
index = listener.beforeIndex(index);
}
}
index = indexingService.preIndex(index);
if (logger.isTraceEnabled()) {
logger.trace("index {}", index.docs());
}
engine.index(index);
index.endTime(System.nanoTime());
indexingService.postIndex(index);
return index.parsedDoc();
}
@Override public Engine.Delete prepareDelete(String type, String id, long version) throws ElasticSearchException {
long startTime = System.nanoTime();
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(type);
return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id)).version(version);
return new Engine.Delete(type, id, docMapper.uidMapper().term(type, id)).version(version).startTime(startTime);
}
@Override public void delete(Engine.Delete delete) throws ElasticSearchException {
writeAllowed();
if (listeners != null) {
for (OperationListener listener : listeners) {
delete = listener.beforeDelete(delete);
}
}
delete = indexingService.preDelete(delete);
if (logger.isTraceEnabled()) {
logger.trace("delete [{}]", delete.uid().text());
}
engine.delete(delete);
delete.endTime(System.nanoTime());
indexingService.postDelete(delete);
}
@Override public void deleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {
writeAllowed();
@Override public Engine.DeleteByQuery prepareDeleteByQuery(byte[] querySource, @Nullable String[] filteringAliases, String... types) throws ElasticSearchException {
long startTime = System.nanoTime();
if (types == null) {
types = Strings.EMPTY_ARRAY;
}
innerDeleteByQuery(querySource, filteringAliases, types);
}
private void innerDeleteByQuery(byte[] querySource, String[] filteringAliases, String... types) {
Query query = queryParserService.parse(querySource).query();
query = filterQueryIfNeeded(query, types);
Filter aliasFilter = indexAliasesService.aliasFilter(filteringAliases);
if (logger.isTraceEnabled()) {
logger.trace("delete_by_query [{}]", query);
}
return new Engine.DeleteByQuery(query, querySource, filteringAliases, aliasFilter, types).startTime(startTime);
}
engine.delete(new Engine.DeleteByQuery(query, querySource, filteringAliases, aliasFilter, types));
@Override public void deleteByQuery(Engine.DeleteByQuery deleteByQuery) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
logger.trace("delete_by_query [{}]", deleteByQuery.query());
}
deleteByQuery = indexingService.preDeleteByQuery(deleteByQuery);
engine.delete(deleteByQuery);
deleteByQuery.endTime(System.nanoTime());
indexingService.postDeleteByQuery(deleteByQuery);
}
@Override public Engine.GetResult get(Engine.Get get) throws ElasticSearchException {
@ -395,6 +386,36 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
}
@Override public DocsStats docStats() {
Engine.Searcher searcher = null;
try {
searcher = engine.searcher();
return new DocsStats(searcher.reader().numDocs(), searcher.reader().numDeletedDocs());
} catch (Exception e) {
return new DocsStats();
} finally {
if (searcher != null) {
searcher.release();
}
}
}
@Override public IndexingStats indexingStats(String... types) {
return indexingService.stats(types);
}
@Override public StoreStats storeStats() {
try {
return store.stats();
} catch (IOException e) {
return new StoreStats();
}
}
@Override public MergeStats mergeStats() {
return mergeScheduler.stats();
}
@Override public void flush(Engine.Flush flush) throws ElasticSearchException {
writeAllowed();
if (logger.isTraceEnabled()) {
@ -434,10 +455,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
public void close(String reason) {
synchronized (mutex) {
if (listeners != null) {
listeners.clear();
}
listeners = null;
indexSettingsService.removeListener(applyRefreshSettings);
if (state != IndexShardState.CLOSED) {
if (refreshScheduledFuture != null) {
@ -520,7 +537,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
break;
case DELETE_BY_QUERY:
Translog.DeleteByQuery deleteByQuery = (Translog.DeleteByQuery) operation;
innerDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types());
engine.delete(prepareDeleteByQuery(deleteByQuery.source(), deleteByQuery.filteringAliases(), deleteByQuery.types()));
break;
default:
throw new ElasticSearchIllegalStateException("No operation defined for [" + operation + "]");

View File

@ -64,6 +64,8 @@ public interface Store extends IndexShardComponent {
*/
void fullDelete() throws IOException;
StoreStats stats() throws IOException;
/**
* The estimated size this store is using.
*/

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*/
public class StoreStats implements Streamable, ToXContent {
private long sizeInBytes;
public StoreStats() {
}
public StoreStats(long sizeInBytes) {
this.sizeInBytes = sizeInBytes;
}
public void add(StoreStats stats) {
if (stats == null) {
return;
}
sizeInBytes += stats.sizeInBytes;
}
public long sizeInBytes() {
return sizeInBytes;
}
public long getSizeInBytes() {
return sizeInBytes;
}
public ByteSizeValue size() {
return new ByteSizeValue(sizeInBytes);
}
public ByteSizeValue getSize() {
return size();
}
public static StoreStats readStoreStats(StreamInput in) throws IOException {
StoreStats store = new StoreStats();
store.readFrom(in);
return store;
}
@Override public void readFrom(StreamInput in) throws IOException {
sizeInBytes = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(sizeInBytes);
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.STORE);
builder.field(Fields.SIZE, size().toString());
builder.field(Fields.SIZE_IN_BYTES, sizeInBytes);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString STORE = new XContentBuilderString("store");
static final XContentBuilderString SIZE = new XContentBuilderString("size");
static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -125,8 +126,12 @@ public abstract class AbstractStore extends AbstractIndexShardComponent implemen
deleteContent();
}
@Override public StoreStats stats() throws IOException {
return new StoreStats(Directories.estimateSize(directory()));
}
@Override public ByteSizeValue estimateSize() throws IOException {
return Directories.estimateSize(directory());
return new ByteSizeValue(Directories.estimateSize(directory()));
}
@Override public void renameFile(String from, String to) throws IOException {

View File

@ -33,7 +33,6 @@ import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.env.NodeEnvironment;
@ -50,12 +49,12 @@ import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.CacheStats;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexGatewayModule;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperServiceModule;
import org.elasticsearch.index.merge.MergeStats;
@ -67,18 +66,17 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.service.InternalIndexService;
import org.elasticsearch.index.settings.IndexSettingsModule;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.index.store.IndexStoreModule;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.plugins.IndexPluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -172,35 +170,25 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
}
@Override public NodeIndicesStats stats() {
long storeTotalSize = 0;
long numberOfDocs = 0;
DocsStats docsStats = new DocsStats();
StoreStats storeStats = new StoreStats();
IndexingStats indexingStats = new IndexingStats();
CacheStats cacheStats = new CacheStats();
MergeStats mergeStats = new MergeStats();
RefreshStats refreshStats = new RefreshStats();
FlushStats flushStats = new FlushStats();
for (IndexService indexService : indices.values()) {
for (IndexShard indexShard : indexService) {
try {
storeTotalSize += ((InternalIndexShard) indexShard).store().estimateSize().bytes();
} catch (IOException e) {
// ignore
}
if (indexShard.state() == IndexShardState.STARTED) {
Engine.Searcher searcher = indexShard.searcher();
try {
numberOfDocs += searcher.reader().numDocs();
} finally {
searcher.release();
}
}
mergeStats.add(((InternalIndexShard) indexShard).mergeScheduler().stats());
storeStats.add(indexShard.storeStats());
docsStats.add(indexShard.docStats());
indexingStats.add(indexShard.indexingStats());
mergeStats.add(indexShard.mergeStats());
refreshStats.add(indexShard.refreshStats());
flushStats.add(indexShard.flushStats());
}
cacheStats.add(indexService.cache().stats());
}
return new NodeIndicesStats(new ByteSizeValue(storeTotalSize), numberOfDocs, cacheStats, mergeStats, refreshStats, flushStats);
return new NodeIndicesStats(storeStats, docsStats, indexingStats, cacheStats, mergeStats, refreshStats, flushStats);
}
/**

View File

@ -22,14 +22,16 @@ package org.elasticsearch.indices;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.cache.CacheStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.StoreStats;
import java.io.IOException;
import java.io.Serializable;
@ -41,9 +43,11 @@ import java.io.Serializable;
*/
public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
private ByteSizeValue storeSize;
private StoreStats storeStats;
private long numDocs;
private DocsStats docsStats;
private IndexingStats indexingStats;
private CacheStats cacheStats;
@ -56,41 +60,41 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
NodeIndicesStats() {
}
public NodeIndicesStats(ByteSizeValue storeSize, long numDocs, CacheStats cacheStats, MergeStats mergeStats, RefreshStats refreshStats, FlushStats flushStats) {
this.storeSize = storeSize;
this.numDocs = numDocs;
public NodeIndicesStats(StoreStats storeStats, DocsStats docsStats, IndexingStats indexingStats, CacheStats cacheStats, MergeStats mergeStats, RefreshStats refreshStats, FlushStats flushStats) {
this.storeStats = storeStats;
this.docsStats = docsStats;
this.indexingStats = indexingStats;
this.cacheStats = cacheStats;
this.mergeStats = mergeStats;
this.refreshStats = refreshStats;
this.flushStats = flushStats;
}
/**
* The size of the index storage taken on the node.
*/
public ByteSizeValue storeSize() {
return this.storeSize;
public StoreStats store() {
return this.storeStats;
}
/**
* The size of the index storage taken on the node.
*/
public ByteSizeValue getStoreSize() {
return storeSize;
public StoreStats getStore() {
return storeStats;
}
/**
* The number of docs on the node (an aggregation of the number of docs of all the shards allocated on the node).
*/
public long numDocs() {
return numDocs;
public DocsStats docs() {
return this.docsStats;
}
/**
* The number of docs on the node (an aggregation of the number of docs of all the shards allocated on the node).
*/
public long getNumDocs() {
return numDocs();
public DocsStats getDocs() {
return this.docsStats;
}
public IndexingStats indexing() {
return indexingStats;
}
public IndexingStats getIndexing() {
return indexing();
}
public CacheStats cache() {
@ -132,8 +136,9 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
}
@Override public void readFrom(StreamInput in) throws IOException {
storeSize = ByteSizeValue.readBytesSizeValue(in);
numDocs = in.readVLong();
storeStats = StoreStats.readStoreStats(in);
docsStats = DocsStats.readDocStats(in);
indexingStats = IndexingStats.readIndexingStats(in);
cacheStats = CacheStats.readCacheStats(in);
mergeStats = MergeStats.readMergeStats(in);
refreshStats = RefreshStats.readRefreshStats(in);
@ -141,8 +146,9 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
}
@Override public void writeTo(StreamOutput out) throws IOException {
storeSize.writeTo(out);
out.writeVLong(numDocs);
storeStats.writeTo(out);
docsStats.writeTo(out);
indexingStats.writeTo(out);
cacheStats.writeTo(out);
mergeStats.writeTo(out);
refreshStats.writeTo(out);
@ -152,13 +158,9 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.INDICES);
builder.field(Fields.SIZE, storeSize.toString());
builder.field(Fields.SIZE_IN_BYTES, storeSize.bytes());
builder.startObject(Fields.DOCS);
builder.field(Fields.NUM_DOCS, numDocs);
builder.endObject();
storeStats.toXContent(builder, params);
docsStats.toXContent(builder, params);
indexingStats.toXContent(builder, params);
cacheStats.toXContent(builder, params);
mergeStats.toXContent(builder, params);
refreshStats.toXContent(builder, params);
@ -170,11 +172,5 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
static final class Fields {
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString SIZE = new XContentBuilderString("size");
static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
static final XContentBuilderString DOCS = new XContentBuilderString("docs");
static final XContentBuilderString NUM_DOCS = new XContentBuilderString("num_docs");
}
}

View File

@ -82,7 +82,7 @@ public interface RestRequest extends ToXContent.Params {
boolean paramAsBoolean(String key, boolean defaultValue);
Boolean paramAsBoolean(String key, Boolean defaultValue);
Boolean paramAsBooleanOptional(String key, Boolean defaultValue);
TimeValue paramAsTime(String key, TimeValue defaultValue);

View File

@ -52,6 +52,7 @@ import org.elasticsearch.rest.action.admin.indices.refresh.RestRefreshAction;
import org.elasticsearch.rest.action.admin.indices.segments.RestIndicesSegmentsAction;
import org.elasticsearch.rest.action.admin.indices.settings.RestGetSettingsAction;
import org.elasticsearch.rest.action.admin.indices.settings.RestUpdateSettingsAction;
import org.elasticsearch.rest.action.admin.indices.stats.RestIndicesStatsAction;
import org.elasticsearch.rest.action.admin.indices.status.RestIndicesStatusAction;
import org.elasticsearch.rest.action.admin.indices.template.delete.RestDeleteIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.template.get.RestGetIndexTemplateAction;
@ -102,6 +103,7 @@ public class RestActionModule extends AbstractModule {
bind(RestReplicationPingAction.class).asEagerSingleton();
bind(RestIndicesExistsAction.class).asEagerSingleton();
bind(RestIndicesStatsAction.class).asEagerSingleton();
bind(RestIndicesStatusAction.class).asEagerSingleton();
bind(RestIndicesSegmentsAction.class).asEagerSingleton();
bind(RestGetIndicesAliasesAction.class).asEagerSingleton();

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices.stats;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestStatus.*;
import static org.elasticsearch.rest.action.support.RestActions.*;
/**
*/
public class RestIndicesStatsAction extends BaseRestHandler {
@Inject public RestIndicesStatsAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(GET, "/_stats", this);
controller.registerHandler(GET, "/{index}/_stats", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
boolean clear = request.paramAsBoolean("clear", false);
if (clear) {
indicesStatsRequest.clear();
}
indicesStatsRequest.docs(request.paramAsBoolean("docs", indicesStatsRequest.docs()));
indicesStatsRequest.store(request.paramAsBoolean("store", indicesStatsRequest.store()));
indicesStatsRequest.indexing(request.paramAsBoolean("indexing", indicesStatsRequest.indexing()));
indicesStatsRequest.merge(request.paramAsBoolean("merge", indicesStatsRequest.merge()));
indicesStatsRequest.refresh(request.paramAsBoolean("refresh", indicesStatsRequest.refresh()));
indicesStatsRequest.flush(request.paramAsBoolean("flush", indicesStatsRequest.flush()));
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStats>() {
@Override public void onResponse(IndicesStats response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
buildBroadcastShardsHeader(builder, response);
response.toXContent(builder, request);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}

View File

@ -59,7 +59,7 @@ public class RestGetAction extends BaseRestHandler {
getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
getRequest.routing(request.param("routing"));
getRequest.preference(request.param("preference"));
getRequest.realtime(request.paramAsBoolean("realtime", null));
getRequest.realtime(request.paramAsBooleanOptional("realtime", null));
String sField = request.param("fields");
if (sField != null) {

View File

@ -56,7 +56,7 @@ public class RestMultiGetAction extends BaseRestHandler {
multiGetRequest.listenerThreaded(false);
multiGetRequest.refresh(request.paramAsBoolean("refresh", multiGetRequest.refresh()));
multiGetRequest.preference(request.param("preference"));
multiGetRequest.realtime(request.paramAsBoolean("realtime", null));
multiGetRequest.realtime(request.paramAsBooleanOptional("realtime", null));
try {
multiGetRequest.add(request.param("index"), request.param("type"), request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength());

View File

@ -178,8 +178,8 @@ public class RestSearchAction extends BaseRestHandler {
}
searchSourceBuilder.explain(request.paramAsBoolean("explain", null));
searchSourceBuilder.version(request.paramAsBoolean("version", null));
searchSourceBuilder.explain(request.paramAsBooleanOptional("explain", null));
searchSourceBuilder.version(request.paramAsBooleanOptional("version", null));
String sField = request.param("fields");
if (sField != null) {

View File

@ -78,7 +78,7 @@ public abstract class AbstractRestRequest implements RestRequest {
return Booleans.parseBoolean(param(key), defaultValue);
}
@Override public Boolean paramAsBoolean(String key, Boolean defaultValue) {
@Override public Boolean paramAsBooleanOptional(String key, Boolean defaultValue) {
String sValue = param(key);
if (sValue == null) {
return defaultValue;

View File

@ -0,0 +1,113 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.indices.stats;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class SimpleIndexStatsTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node2");
}
@Test public void simpleIndexTemplateTests() throws Exception {
// rely on 1 replica for this tests
client.admin().indices().prepareCreate("test1").execute().actionGet();
client.admin().indices().prepareCreate("test2").execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealthResponse.timedOut(), equalTo(false));
client.prepareIndex("test1", "type1", Integer.toString(1)).setSource("field", "value").execute().actionGet();
client.prepareIndex("test1", "type2", Integer.toString(1)).setSource("field", "value").execute().actionGet();
client.prepareIndex("test2", "type", Integer.toString(1)).setSource("field", "value").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
IndicesStats stats = client.admin().indices().prepareStats().execute().actionGet();
assertThat(stats.primaries().docs().count(), equalTo(3l));
assertThat(stats.total().docs().count(), equalTo(6l));
assertThat(stats.primaries().indexing().total().indexCount(), equalTo(3l));
assertThat(stats.total().indexing().total().indexCount(), equalTo(6l));
assertThat(stats.total().store(), notNullValue());
// verify nulls
assertThat(stats.total().merge(), nullValue());
assertThat(stats.total().flush(), nullValue());
assertThat(stats.total().refresh(), nullValue());
assertThat(stats.index("test1").primaries().docs().count(), equalTo(2l));
assertThat(stats.index("test1").total().docs().count(), equalTo(4l));
assertThat(stats.index("test1").primaries().store(), notNullValue());
assertThat(stats.index("test1").primaries().merge(), nullValue());
assertThat(stats.index("test1").primaries().flush(), nullValue());
assertThat(stats.index("test1").primaries().refresh(), nullValue());
assertThat(stats.index("test2").primaries().docs().count(), equalTo(1l));
assertThat(stats.index("test2").total().docs().count(), equalTo(2l));
// check flags
stats = client.admin().indices().prepareStats()
.setDocs(false)
.setStore(false)
.setIndexing(false)
.setFlush(true)
.setRefresh(true)
.setMerge(true)
.execute().actionGet();
assertThat(stats.total().docs(), nullValue());
assertThat(stats.total().store(), nullValue());
assertThat(stats.total().indexing(), nullValue());
assertThat(stats.total().merge(), notNullValue());
assertThat(stats.total().flush(), notNullValue());
assertThat(stats.total().refresh(), notNullValue());
// check types
stats = client.admin().indices().prepareStats().setTypes("type1", "type").execute().actionGet();
assertThat(stats.primaries().indexing().typeStats().get("type1").indexCount(), equalTo(1l));
assertThat(stats.primaries().indexing().typeStats().get("type").indexCount(), equalTo(1l));
assertThat(stats.primaries().indexing().typeStats().get("type2"), nullValue());
}
}