Indices Status / Node Stats: Add (Lucene) index merge stats, closes #745.
This commit is contained in:
parent
2909060af8
commit
682ad7e2fc
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
|
||||
* and current merges.
|
||||
*/
|
||||
public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
|
||||
|
||||
private AtomicLong totalMerges = new AtomicLong();
|
||||
private AtomicLong totalMergeTime = new AtomicLong();
|
||||
private AtomicLong currentMerges = new AtomicLong();
|
||||
|
||||
public TrackingConcurrentMergeScheduler() {
|
||||
super();
|
||||
}
|
||||
|
||||
public long totalMerges() {
|
||||
return totalMerges.get();
|
||||
}
|
||||
|
||||
public long totalMergeTime() {
|
||||
return totalMergeTime.get();
|
||||
}
|
||||
|
||||
public long currentMerges() {
|
||||
return currentMerges.get();
|
||||
}
|
||||
|
||||
@Override protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
|
||||
long time = System.currentTimeMillis();
|
||||
currentMerges.incrementAndGet();
|
||||
try {
|
||||
super.doMerge(merge);
|
||||
} finally {
|
||||
currentMerges.decrementAndGet();
|
||||
totalMerges.incrementAndGet();
|
||||
totalMergeTime.addAndGet(System.currentTimeMillis() - time);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.lucene.index;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
// LUCENE MONITOR - Copied from SerialMergeScheduler
|
||||
public class TrackingSerialMergeScheduler extends MergeScheduler {
|
||||
|
||||
private AtomicLong totalMerges = new AtomicLong();
|
||||
private AtomicLong totalMergeTime = new AtomicLong();
|
||||
private AtomicLong currentMerges = new AtomicLong();
|
||||
|
||||
public long totalMerges() {
|
||||
return totalMerges.get();
|
||||
}
|
||||
|
||||
public long totalMergeTime() {
|
||||
return totalMergeTime.get();
|
||||
}
|
||||
|
||||
public long currentMerges() {
|
||||
return currentMerges.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Just do the merges in sequence. We do this
|
||||
* "synchronized" so that even if the application is using
|
||||
* multiple threads, only one merge may run at a time.
|
||||
*/
|
||||
@Override
|
||||
synchronized public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
|
||||
while (true) {
|
||||
MergePolicy.OneMerge merge = writer.getNextMerge();
|
||||
if (merge == null)
|
||||
break;
|
||||
|
||||
long time = System.currentTimeMillis();
|
||||
currentMerges.incrementAndGet();
|
||||
try {
|
||||
writer.merge(merge);
|
||||
} finally {
|
||||
currentMerges.decrementAndGet();
|
||||
totalMerges.incrementAndGet();
|
||||
totalMergeTime.addAndGet(System.currentTimeMillis() - time);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.status;
|
|||
|
||||
import org.elasticsearch.common.collect.Iterators;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
@ -162,6 +163,24 @@ public class IndexShardStatus implements Iterable<ShardStatus> {
|
|||
return docs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Total merges of this shard replication group.
|
||||
*/
|
||||
public MergeStats mergeStats() {
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
for (ShardStatus shard : shards) {
|
||||
mergeStats.add(shard.mergeStats());
|
||||
}
|
||||
return mergeStats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Total merges of this shard replication group.
|
||||
*/
|
||||
public MergeStats getMergeStats() {
|
||||
return this.mergeStats();
|
||||
}
|
||||
|
||||
@Override public Iterator<ShardStatus> iterator() {
|
||||
return Iterators.forArray(shards);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.status;
|
|||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -182,6 +183,24 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
|
|||
return docs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Total merges of this index.
|
||||
*/
|
||||
public MergeStats mergeStats() {
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
for (IndexShardStatus shard : this) {
|
||||
mergeStats.add(shard.mergeStats());
|
||||
}
|
||||
return mergeStats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Total merges of this index.
|
||||
*/
|
||||
public MergeStats getMergeStats() {
|
||||
return this.mergeStats();
|
||||
}
|
||||
|
||||
@Override public Iterator<IndexShardStatus> iterator() {
|
||||
return indexShards.values().iterator();
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -50,6 +51,8 @@ public class ShardStatus extends BroadcastShardOperationResponse {
|
|||
|
||||
DocsStatus docs;
|
||||
|
||||
MergeStats mergeStats;
|
||||
|
||||
PeerRecoveryStatus peerRecoveryStatus;
|
||||
|
||||
GatewayRecoveryStatus gatewayRecoveryStatus;
|
||||
|
@ -148,6 +151,20 @@ public class ShardStatus extends BroadcastShardOperationResponse {
|
|||
return docs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Index merge statistics.
|
||||
*/
|
||||
public MergeStats mergeStats() {
|
||||
return this.mergeStats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Index merge statistics.
|
||||
*/
|
||||
public MergeStats getMergeStats() {
|
||||
return this.mergeStats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Peer recovery status (<tt>null</tt> if not applicable). Both real time if an on going recovery
|
||||
* is in progress and summary once it is done.
|
||||
|
@ -256,6 +273,13 @@ public class ShardStatus extends BroadcastShardOperationResponse {
|
|||
out.writeVLong(gatewaySnapshotStatus.indexSize);
|
||||
out.writeVInt(gatewaySnapshotStatus.expectedNumberOfOperations());
|
||||
}
|
||||
|
||||
if (mergeStats == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
mergeStats.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
|
@ -287,5 +311,9 @@ public class ShardStatus extends BroadcastShardOperationResponse {
|
|||
gatewaySnapshotStatus = new GatewaySnapshotStatus(GatewaySnapshotStatus.Stage.fromValue(in.readByte()),
|
||||
in.readVLong(), in.readVLong(), in.readVLong(), in.readVInt());
|
||||
}
|
||||
|
||||
if (in.readBoolean()) {
|
||||
mergeStats = MergeStats.readMergeStats(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -166,6 +166,8 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
|
|||
} finally {
|
||||
searcher.release();
|
||||
}
|
||||
|
||||
shardStatus.mergeStats = indexShard.mergeScheduler().stats();
|
||||
}
|
||||
// check on going recovery (from peer or gateway)
|
||||
RecoveryStatus peerRecoveryStatus = indexShard.peerRecoveryStatus();
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.merge;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class MergeStats implements Streamable {
|
||||
|
||||
private long totalMerges;
|
||||
|
||||
private long currentMerges;
|
||||
|
||||
private long totalMergeTime;
|
||||
|
||||
public MergeStats() {
|
||||
|
||||
}
|
||||
|
||||
public MergeStats(long totalMerges, long currentMerges, long totalMergeTime) {
|
||||
this.totalMerges = totalMerges;
|
||||
this.currentMerges = currentMerges;
|
||||
this.totalMergeTime = totalMergeTime;
|
||||
}
|
||||
|
||||
public void add(long totalMerges, long currentMerges, long totalMergeTime) {
|
||||
this.totalMerges += totalMerges;
|
||||
this.currentMerges += currentMerges;
|
||||
this.totalMergeTime += totalMergeTime;
|
||||
}
|
||||
|
||||
public void add(MergeStats mergeStats) {
|
||||
this.totalMerges += mergeStats.totalMerges;
|
||||
this.currentMerges += mergeStats.currentMerges;
|
||||
this.totalMergeTime += mergeStats.totalMergeTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total number of merges executed.
|
||||
*/
|
||||
public long totalMerges() {
|
||||
return this.totalMerges;
|
||||
}
|
||||
|
||||
/**
|
||||
* The current number of merges executing.
|
||||
*/
|
||||
public long currentMerges() {
|
||||
return this.currentMerges;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time merges have been executed (in milliseconds).
|
||||
*/
|
||||
public long totalMergeTimeInMillis() {
|
||||
return this.totalMergeTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time merges have been executed.
|
||||
*/
|
||||
public TimeValue totalMergeTime() {
|
||||
return new TimeValue(totalMergeTime);
|
||||
}
|
||||
|
||||
public static MergeStats readMergeStats(StreamInput in) throws IOException {
|
||||
MergeStats stats = new MergeStats();
|
||||
stats.readFrom(in);
|
||||
return stats;
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
totalMerges = in.readVLong();
|
||||
currentMerges = in.readVLong();
|
||||
totalMergeTime = in.readVLong();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(totalMerges);
|
||||
out.writeVLong(currentMerges);
|
||||
out.writeVLong(totalMergeTime);
|
||||
}
|
||||
}
|
|
@ -23,12 +23,15 @@ import org.apache.lucene.index.*;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
|
@ -37,6 +40,8 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
|
|||
|
||||
private final int maxThreadCount;
|
||||
|
||||
private Set<CustomConcurrentMergeScheduler> schedulers = new CopyOnWriteArraySet<CustomConcurrentMergeScheduler>();
|
||||
|
||||
@Inject public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
super(shardId, indexSettings);
|
||||
|
||||
|
@ -46,17 +51,30 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
|
|||
}
|
||||
|
||||
@Override public MergeScheduler newMergeScheduler() {
|
||||
ConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(shardId);
|
||||
CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(shardId, this);
|
||||
concurrentMergeScheduler.setMaxThreadCount(maxThreadCount);
|
||||
schedulers.add(concurrentMergeScheduler);
|
||||
return concurrentMergeScheduler;
|
||||
}
|
||||
|
||||
private static class CustomConcurrentMergeScheduler extends ConcurrentMergeScheduler {
|
||||
@Override public MergeStats stats() {
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
|
||||
mergeStats.add(scheduler.totalMerges(), scheduler.currentMerges(), scheduler.totalMergeTime());
|
||||
}
|
||||
return mergeStats;
|
||||
}
|
||||
|
||||
public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
|
||||
|
||||
private final ShardId shardId;
|
||||
|
||||
private CustomConcurrentMergeScheduler(ShardId shardId) {
|
||||
private final ConcurrentMergeSchedulerProvider provider;
|
||||
|
||||
private CustomConcurrentMergeScheduler(ShardId shardId, ConcurrentMergeSchedulerProvider provider) {
|
||||
super();
|
||||
this.shardId = shardId;
|
||||
this.provider = provider;
|
||||
}
|
||||
|
||||
@Override public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
|
||||
|
@ -81,5 +99,10 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
|
|||
thread.setName("[" + shardId.index().name() + "][" + shardId.id() + "]: " + thread.getName());
|
||||
return thread;
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
super.close();
|
||||
provider.schedulers.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.index.merge.scheduler;
|
||||
|
||||
import org.apache.lucene.index.MergeScheduler;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.shard.IndexShardComponent;
|
||||
|
||||
/**
|
||||
|
@ -28,4 +29,6 @@ import org.elasticsearch.index.shard.IndexShardComponent;
|
|||
public interface MergeSchedulerProvider<T extends MergeScheduler> extends IndexShardComponent {
|
||||
|
||||
T newMergeScheduler();
|
||||
|
||||
MergeStats stats();
|
||||
}
|
||||
|
|
|
@ -22,32 +22,53 @@ package org.elasticsearch.index.merge.scheduler;
|
|||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.MergeScheduler;
|
||||
import org.apache.lucene.index.SerialMergeScheduler;
|
||||
import org.apache.lucene.index.TrackingSerialMergeScheduler;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent implements MergeSchedulerProvider {
|
||||
|
||||
private Set<CustomSerialMergeScheduler> schedulers = new CopyOnWriteArraySet<CustomSerialMergeScheduler>();
|
||||
|
||||
@Inject public SerialMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
super(shardId, indexSettings);
|
||||
logger.trace("using [serial] merge scheduler");
|
||||
}
|
||||
|
||||
@Override public MergeScheduler newMergeScheduler() {
|
||||
return new CustomSerialMergeScheduler();
|
||||
CustomSerialMergeScheduler scheduler = new CustomSerialMergeScheduler(this);
|
||||
schedulers.add(scheduler);
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
public static class CustomSerialMergeScheduler extends SerialMergeScheduler {
|
||||
@Override public MergeStats stats() {
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
for (CustomSerialMergeScheduler scheduler : schedulers) {
|
||||
mergeStats.add(scheduler.totalMerges(), scheduler.currentMerges(), scheduler.totalMergeTime());
|
||||
}
|
||||
return mergeStats;
|
||||
}
|
||||
|
||||
public static class CustomSerialMergeScheduler extends TrackingSerialMergeScheduler {
|
||||
|
||||
private final SerialMergeSchedulerProvider provider;
|
||||
|
||||
public CustomSerialMergeScheduler(SerialMergeSchedulerProvider provider) {
|
||||
this.provider = provider;
|
||||
}
|
||||
|
||||
@Override public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
|
||||
try {
|
||||
|
@ -65,5 +86,10 @@ public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent im
|
|||
}
|
||||
super.merge(writer);
|
||||
}
|
||||
|
||||
@Override public void close() {
|
||||
super.close();
|
||||
provider.schedulers.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.util.concurrent.ThreadSafe;
|
|||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.engine.*;
|
||||
import org.elasticsearch.index.mapper.*;
|
||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||
import org.elasticsearch.index.query.IndexQueryParser;
|
||||
import org.elasticsearch.index.query.IndexQueryParserMissingException;
|
||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||
|
@ -79,6 +80,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
|
||||
private final Store store;
|
||||
|
||||
private final MergeSchedulerProvider mergeScheduler;
|
||||
|
||||
private final Engine engine;
|
||||
|
||||
private final Translog translog;
|
||||
|
@ -103,12 +106,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
|
||||
private CopyOnWriteArrayList<OperationListener> listeners = null;
|
||||
|
||||
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, Engine engine, Translog translog,
|
||||
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
|
||||
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache) {
|
||||
super(shardId, indexSettings);
|
||||
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
|
||||
this.store = store;
|
||||
this.engine = engine;
|
||||
this.mergeScheduler = mergeScheduler;
|
||||
this.translog = translog;
|
||||
this.threadPool = threadPool;
|
||||
this.mapperService = mapperService;
|
||||
|
@ -145,6 +149,10 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
|||
}
|
||||
}
|
||||
|
||||
public MergeSchedulerProvider mergeScheduler() {
|
||||
return this.mergeScheduler;
|
||||
}
|
||||
|
||||
public Store store() {
|
||||
return this.store;
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.index.engine.IndexEngineModule;
|
|||
import org.elasticsearch.index.gateway.IndexGateway;
|
||||
import org.elasticsearch.index.gateway.IndexGatewayModule;
|
||||
import org.elasticsearch.index.mapper.MapperServiceModule;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.index.percolator.PercolatorModule;
|
||||
import org.elasticsearch.index.percolator.PercolatorService;
|
||||
import org.elasticsearch.index.query.IndexQueryParserModule;
|
||||
|
@ -165,6 +166,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
long fieldCacheEvictions = 0;
|
||||
long fieldCacheTotalSize = 0;
|
||||
long filterCacheTotalSize = 0;
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
for (IndexService indexService : indices.values()) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
try {
|
||||
|
@ -181,12 +183,13 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
|||
searcher.release();
|
||||
}
|
||||
}
|
||||
mergeStats.add(((InternalIndexShard) indexShard).mergeScheduler().stats());
|
||||
}
|
||||
fieldCacheEvictions += indexService.cache().fieldData().evictions();
|
||||
fieldCacheTotalSize += indexService.cache().fieldData().sizeInBytes();
|
||||
filterCacheTotalSize += indexService.cache().filter().sizeInBytes();
|
||||
}
|
||||
return new NodeIndicesStats(new ByteSizeValue(storeTotalSize), numberOfDocs, new ByteSizeValue(fieldCacheTotalSize), new ByteSizeValue(filterCacheTotalSize), fieldCacheEvictions);
|
||||
return new NodeIndicesStats(new ByteSizeValue(storeTotalSize), numberOfDocs, new ByteSizeValue(fieldCacheTotalSize), new ByteSizeValue(filterCacheTotalSize), fieldCacheEvictions, mergeStats);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
|
@ -47,16 +48,19 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
|
|||
|
||||
private long fieldCacheEvictions;
|
||||
|
||||
private MergeStats mergeStats;
|
||||
|
||||
NodeIndicesStats() {
|
||||
}
|
||||
|
||||
public NodeIndicesStats(ByteSizeValue storeSize, long numDocs, ByteSizeValue fieldCacheSize, ByteSizeValue filterCacheSize,
|
||||
long fieldCacheEvictions) {
|
||||
long fieldCacheEvictions, MergeStats mergeStats) {
|
||||
this.storeSize = storeSize;
|
||||
this.numDocs = numDocs;
|
||||
this.fieldCacheSize = fieldCacheSize;
|
||||
this.filterCacheSize = filterCacheSize;
|
||||
this.fieldCacheEvictions = fieldCacheEvictions;
|
||||
this.mergeStats = mergeStats;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -111,6 +115,14 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
|
|||
return fieldCacheEvictions();
|
||||
}
|
||||
|
||||
public MergeStats mergeStats() {
|
||||
return this.mergeStats;
|
||||
}
|
||||
|
||||
public MergeStats getMergeStats() {
|
||||
return this.mergeStats;
|
||||
}
|
||||
|
||||
public static NodeIndicesStats readIndicesStats(StreamInput in) throws IOException {
|
||||
NodeIndicesStats stats = new NodeIndicesStats();
|
||||
stats.readFrom(in);
|
||||
|
@ -123,6 +135,7 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
|
|||
fieldCacheSize = ByteSizeValue.readBytesSizeValue(in);
|
||||
filterCacheSize = ByteSizeValue.readBytesSizeValue(in);
|
||||
fieldCacheEvictions = in.readVLong();
|
||||
mergeStats = MergeStats.readMergeStats(in);
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
|
@ -131,6 +144,7 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
|
|||
fieldCacheSize.writeTo(out);
|
||||
filterCacheSize.writeTo(out);
|
||||
out.writeVLong(fieldCacheEvictions);
|
||||
mergeStats.writeTo(out);
|
||||
}
|
||||
|
||||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
|
@ -143,6 +157,14 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
|
|||
builder.field(Fields.FIELD_CACHE_SIZE_IN_BYTES, fieldCacheSize.bytes());
|
||||
builder.field(Fields.FILTER_CACHE_SIZE, filterCacheSize.toString());
|
||||
builder.field(Fields.FILTER_CACHE_SIZE_IN_BYTES, filterCacheSize.bytes());
|
||||
|
||||
builder.startObject(Fields.MERGES);
|
||||
builder.field(Fields.CURRENT, mergeStats.currentMerges());
|
||||
builder.field(Fields.TOTAL, mergeStats.totalMerges());
|
||||
builder.field(Fields.TOTAL_TIME, mergeStats.totalMergeTime());
|
||||
builder.field(Fields.TOTAL_TIME_IN_MILLIS, mergeStats.totalMergeTimeInMillis());
|
||||
builder.endObject();
|
||||
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -157,5 +179,10 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
|
|||
static final XContentBuilderString FIELD_CACHE_EVICTIONS = new XContentBuilderString("field_cache_evictions");
|
||||
static final XContentBuilderString FILTER_CACHE_SIZE = new XContentBuilderString("filter_cache_size");
|
||||
static final XContentBuilderString FILTER_CACHE_SIZE_IN_BYTES = new XContentBuilderString("filter_cache_size_in_bytes");
|
||||
static final XContentBuilderString MERGES = new XContentBuilderString("merges");
|
||||
static final XContentBuilderString CURRENT = new XContentBuilderString("current");
|
||||
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
|
||||
static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
|
||||
static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsFilter;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.merge.MergeStats;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestXContentBuilder;
|
||||
|
||||
|
@ -107,6 +108,16 @@ public class RestIndicesStatusAction extends BaseRestHandler {
|
|||
builder.endObject();
|
||||
}
|
||||
|
||||
MergeStats mergeStats = indexStatus.mergeStats();
|
||||
if (mergeStats != null) {
|
||||
builder.startObject("merges");
|
||||
builder.field("current", mergeStats.currentMerges());
|
||||
builder.field("total", mergeStats.totalMerges());
|
||||
builder.field("total_time", mergeStats.totalMergeTime());
|
||||
builder.field("total_time_in_millis", mergeStats.totalMergeTimeInMillis());
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
builder.startObject("shards");
|
||||
for (IndexShardStatus indexShardStatus : indexStatus) {
|
||||
builder.startArray(Integer.toString(indexShardStatus.shardId().id()));
|
||||
|
@ -144,6 +155,16 @@ public class RestIndicesStatusAction extends BaseRestHandler {
|
|||
builder.endObject();
|
||||
}
|
||||
|
||||
mergeStats = shardStatus.mergeStats();
|
||||
if (mergeStats != null) {
|
||||
builder.startObject("merges");
|
||||
builder.field("current", mergeStats.currentMerges());
|
||||
builder.field("total", mergeStats.totalMerges());
|
||||
builder.field("total_time", mergeStats.totalMergeTime());
|
||||
builder.field("total_time_in_millis", mergeStats.totalMergeTimeInMillis());
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
if (shardStatus.peerRecoveryStatus() != null) {
|
||||
PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus();
|
||||
builder.startObject("peer_recovery");
|
||||
|
|
Loading…
Reference in New Issue