create simple metrics aggregator classes, and use them where applicable in the code, abstracting away the actual aggregation method (and use jsr166e long addr)

This commit is contained in:
Shay Banon 2011-08-21 00:12:36 +03:00
parent e67427d4af
commit 0549e9d1c2
15 changed files with 205 additions and 88 deletions

View File

@ -20,10 +20,11 @@
package org.apache.lucene.index;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
@ -33,9 +34,8 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private final ESLogger logger;
private final AtomicLong totalMerges = new AtomicLong();
private final AtomicLong totalMergeTime = new AtomicLong();
private final AtomicLong currentMerges = new AtomicLong();
private final MeanMetric totalMerges = new MeanMetric();
private final CounterMetric currentMerges = new CounterMetric();
public TrackingConcurrentMergeScheduler(ESLogger logger) {
super();
@ -43,30 +43,29 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
}
public long totalMerges() {
return totalMerges.get();
return totalMerges.count();
}
public long totalMergeTime() {
return totalMergeTime.get();
return totalMerges.sum();
}
public long currentMerges() {
return currentMerges.get();
return currentMerges.count();
}
@Override protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
long time = System.currentTimeMillis();
currentMerges.incrementAndGet();
currentMerges.inc();
if (logger.isTraceEnabled()) {
logger.trace("merge [{}] starting...", merge.info.name);
}
try {
super.doMerge(merge);
} finally {
currentMerges.decrementAndGet();
totalMerges.incrementAndGet();
currentMerges.dec();
long took = System.currentTimeMillis() - time;
totalMergeTime.addAndGet(took);
totalMerges.inc(took);
if (took > 20000) { // if more than 20 seconds, DEBUG log it
logger.debug("merge [{}] done, took [{}]", merge.info.name, TimeValue.timeValueMillis(took));
} else if (logger.isTraceEnabled()) {

View File

@ -20,34 +20,34 @@
package org.apache.lucene.index;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
// LUCENE MONITOR - Copied from SerialMergeScheduler
public class TrackingSerialMergeScheduler extends MergeScheduler {
private final ESLogger logger;
private final AtomicLong totalMerges = new AtomicLong();
private final AtomicLong totalMergeTime = new AtomicLong();
private final AtomicLong currentMerges = new AtomicLong();
private final MeanMetric totalMerges = new MeanMetric();
private final CounterMetric currentMerges = new CounterMetric();
public TrackingSerialMergeScheduler(ESLogger logger) {
this.logger = logger;
}
public long totalMerges() {
return totalMerges.get();
return totalMerges.count();
}
public long totalMergeTime() {
return totalMergeTime.get();
return totalMerges.sum();
}
public long currentMerges() {
return currentMerges.get();
return currentMerges.count();
}
/**
@ -67,14 +67,13 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
}
long time = System.currentTimeMillis();
currentMerges.incrementAndGet();
currentMerges.inc();
try {
writer.merge(merge);
} finally {
currentMerges.decrementAndGet();
totalMerges.incrementAndGet();
currentMerges.dec();
long took = System.currentTimeMillis() - time;
totalMergeTime.addAndGet(took);
totalMerges.inc(took);
if (took > 20000) { // if more than 20 seconds, DEBUG log it
logger.debug("merge [{}] done, took [{}]", merge.info.name, TimeValue.timeValueMillis(took));
} else if (logger.isTraceEnabled()) {

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.metrics;
import org.elasticsearch.common.util.concurrent.jsr166e.LongAdder;
/**
*/
public class CounterMetric implements Metric {
private final LongAdder counter = new LongAdder();
public void inc() {
counter.increment();
}
public void inc(long n) {
counter.add(n);
}
public void dec() {
counter.decrement();
}
public void dec(long n) {
counter.add(-n);
}
public long count() {
return counter.sum();
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.metrics;
import org.elasticsearch.common.util.concurrent.jsr166e.LongAdder;
/**
*/
public class MeanMetric implements Metric {
private final LongAdder counter = new LongAdder();
private final LongAdder sum = new LongAdder();
public void inc(long n) {
counter.increment();
sum.add(n);
}
public void dec(long n) {
counter.decrement();
sum.add(-n);
}
public long count() {
return counter.sum();
}
public long sum() {
return sum.sum();
}
public double mean() {
long count = count();
if (count > 0) {
return sum.sum() / (double) count;
}
return 0.0;
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.metrics;
/**
*/
public interface Metric {
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.netty;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.common.netty.channel.ChannelEvent;
import org.elasticsearch.common.netty.channel.ChannelFuture;
@ -31,7 +32,6 @@ import org.elasticsearch.common.netty.channel.ChannelUpstreamHandler;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicLong;
public class OpenChannelsHandler implements ChannelUpstreamHandler {
private Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
private AtomicLong openChannelsCount = new AtomicLong();
private CounterMetric openChannelsMetric = new CounterMetric();
private final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
boolean removed = openChannels.remove(future.getChannel());
if (removed) {
openChannelsCount.decrementAndGet();
openChannelsMetric.dec();
}
}
};
@ -57,7 +57,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
if (evt.getState() == ChannelState.OPEN) {
boolean added = openChannels.add(ctx.getChannel());
if (added) {
openChannelsCount.incrementAndGet();
openChannelsMetric.inc();
ctx.getChannel().getCloseFuture().addListener(remover);
}
}
@ -66,7 +66,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler {
}
public long numberOfOpenChannels() {
return openChannelsCount.get();
return openChannelsMetric.count();
}
public void close() {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.base.Objects;
import org.elasticsearch.common.collect.MapEvictionListener;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
@ -36,7 +37,6 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
@ -48,7 +48,7 @@ public class ResidentFieldDataCache extends AbstractConcurrentMapFieldDataCache
private volatile int maxSize;
private volatile TimeValue expire;
private final AtomicLong evictions = new AtomicLong();
private final CounterMetric evictions = new CounterMetric();
private final ApplySettings applySettings = new ApplySettings();
@ -85,11 +85,11 @@ public class ResidentFieldDataCache extends AbstractConcurrentMapFieldDataCache
}
@Override public long evictions() {
return evictions.get();
return evictions.count();
}
@Override public void onEviction(@Nullable String s, @Nullable FieldData fieldData) {
evictions.incrementAndGet();
evictions.inc();
}
static {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapEvictionListener;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.field.data.support.AbstractConcurrentMapFieldDataCache;
@ -30,14 +31,13 @@ import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.settings.IndexSettings;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
*/
public class SoftFieldDataCache extends AbstractConcurrentMapFieldDataCache implements MapEvictionListener<String, FieldData> {
private final AtomicLong evictions = new AtomicLong();
private final CounterMetric evictions = new CounterMetric();
@Inject public SoftFieldDataCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
@ -48,7 +48,7 @@ public class SoftFieldDataCache extends AbstractConcurrentMapFieldDataCache impl
}
@Override public long evictions() {
return evictions.get();
return evictions.count();
}
@Override public String type() {
@ -56,6 +56,6 @@ public class SoftFieldDataCache extends AbstractConcurrentMapFieldDataCache impl
}
@Override public void onEviction(@Nullable String s, @Nullable FieldData fieldData) {
evictions.incrementAndGet();
evictions.inc();
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapEvictionListener;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.field.data.support.AbstractConcurrentMapFieldDataCache;
@ -30,14 +31,13 @@ import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.settings.IndexSettings;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author kimchy (shay.banon)
*/
public class WeakFieldDataCache extends AbstractConcurrentMapFieldDataCache implements MapEvictionListener<String, FieldData> {
private final AtomicLong evictions = new AtomicLong();
private final CounterMetric evictions = new CounterMetric();
@Inject public WeakFieldDataCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
@ -52,10 +52,10 @@ public class WeakFieldDataCache extends AbstractConcurrentMapFieldDataCache impl
}
@Override public long evictions() {
return evictions.get();
return evictions.count();
}
@Override public void onEviction(@Nullable String s, @Nullable FieldData fieldData) {
evictions.incrementAndGet();
evictions.inc();
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapEvictionListener;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.docset.DocSet;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
@ -35,7 +36,6 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* A resident reference based filter cache that has weak keys on the <tt>IndexReader</tt>.
@ -49,7 +49,7 @@ public class ResidentFilterCache extends AbstractConcurrentMapFilterCache implem
private volatile int maxSize;
private volatile TimeValue expire;
private final AtomicLong evictions = new AtomicLong();
private final CounterMetric evictions = new CounterMetric();
private final ApplySettings applySettings = new ApplySettings();
@ -85,11 +85,11 @@ public class ResidentFilterCache extends AbstractConcurrentMapFilterCache implem
}
@Override public long evictions() {
return evictions.get();
return evictions.count();
}
@Override public void onEviction(Filter filter, DocSet docSet) {
evictions.incrementAndGet();
evictions.inc();
}
static {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapEvictionListener;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.docset.DocSet;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
@ -35,7 +36,6 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* A soft reference based filter cache that has soft keys on the <tt>IndexReader</tt>.
@ -49,7 +49,7 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
private volatile int maxSize;
private volatile TimeValue expire;
private final AtomicLong evictions = new AtomicLong();
private final CounterMetric evictions = new CounterMetric();
private final ApplySettings applySettings = new ApplySettings();
@ -87,11 +87,11 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
}
@Override public long evictions() {
return evictions.get();
return evictions.count();
}
@Override public void onEviction(Filter filter, DocSet docSet) {
evictions.incrementAndGet();
evictions.inc();
}
static {

View File

@ -29,6 +29,8 @@ import org.elasticsearch.common.concurrentlinkedhashmap.Weigher;
import org.elasticsearch.common.lab.LongsLAB;
import org.elasticsearch.common.lucene.docset.DocSet;
import org.elasticsearch.common.lucene.search.NoCacheFilter;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -40,13 +42,11 @@ import org.elasticsearch.index.settings.IndexSettings;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent implements FilterCache, IndexReader.ReaderFinishedListener, EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> {
final ConcurrentMap<Object, Boolean> seenReaders = ConcurrentCollections.newConcurrentMap();
final AtomicInteger seenReadersCount = new AtomicInteger();
final CounterMetric seenReadersCount = new CounterMetric();
final boolean labEnabled;
final ByteSizeValue labMaxAlloc;
@ -55,10 +55,8 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent
final int labMaxAllocBytes;
final int labChunkSizeBytes;
protected final AtomicLong evictions = new AtomicLong();
final AtomicLong totalSizeInBytes = new AtomicLong();
final AtomicInteger totalCount = new AtomicInteger();
final CounterMetric evictionsMetric = new CounterMetric();
final MeanMetric totalMetric = new MeanMetric();
protected AbstractWeightedFilterCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
@ -90,14 +88,13 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent
if (removed == null) {
return;
}
seenReadersCount.decrementAndGet();
seenReadersCount.dec();
ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> cache = cache();
for (FilterCacheKey key : cache.keySet()) {
if (key.readerKey() == readerKey) {
FilterCacheValue<DocSet> removed2 = cache.remove(key);
if (removed2 != null) {
totalCount.decrementAndGet();
totalSizeInBytes.addAndGet(-removed2.value().sizeInBytes());
totalMetric.dec(removed2.value().sizeInBytes());
}
}
}
@ -115,26 +112,25 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent
if (removed == null) {
return;
}
seenReadersCount.decrementAndGet();
seenReadersCount.dec();
ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> cache = cache();
for (FilterCacheKey key : cache.keySet()) {
if (key.readerKey() == reader.getCoreCacheKey()) {
FilterCacheValue<DocSet> removed2 = cache.remove(key);
if (removed2 != null) {
totalCount.decrementAndGet();
totalSizeInBytes.addAndGet(-removed2.value().sizeInBytes());
totalMetric.dec(removed2.value().sizeInBytes());
}
}
}
}
@Override public EntriesStats entriesStats() {
int seenReadersCount = this.seenReadersCount.get();
return new EntriesStats(totalSizeInBytes.get(), seenReadersCount == 0 ? 0 : totalCount.get() / seenReadersCount);
long seenReadersCount = this.seenReadersCount.count();
return new EntriesStats(totalMetric.sum(), seenReadersCount == 0 ? 0 : totalMetric.count() / seenReadersCount);
}
@Override public long evictions() {
return evictions.get();
return evictionsMetric.count();
}
@Override public Filter cache(Filter filterToCache) {
@ -176,7 +172,7 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent
Boolean previous = cache.seenReaders.putIfAbsent(reader.getCoreCacheKey(), Boolean.TRUE);
if (previous == null) {
reader.addReaderFinishedListener(cache);
cache.seenReadersCount.incrementAndGet();
cache.seenReadersCount.inc();
}
}
@ -189,8 +185,7 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent
cacheValue = new FilterCacheValue<DocSet>(docSet, longsLAB);
FilterCacheValue<DocSet> previous = innerCache.putIfAbsent(cacheKey, cacheValue);
if (previous == null) {
cache.totalSizeInBytes.addAndGet(cacheValue.value().sizeInBytes());
cache.totalCount.incrementAndGet();
cache.totalMetric.inc(cacheValue.value().sizeInBytes());
}
}
@ -226,10 +221,9 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent
@Override public void onEviction(FilterCacheKey filterCacheKey, FilterCacheValue<DocSet> docSetFilterCacheValue) {
if (filterCacheKey != null) {
if (seenReaders.containsKey(filterCacheKey.readerKey())) {
evictions.incrementAndGet();
totalCount.decrementAndGet();
evictionsMetric.inc();
if (docSetFilterCacheValue != null) {
totalSizeInBytes.addAndGet(-docSetFilterCacheValue.value().sizeInBytes());
totalMetric.dec(docSetFilterCacheValue.value().sizeInBytes());
}
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapEvictionListener;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.docset.DocSet;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
@ -35,7 +36,6 @@ import org.elasticsearch.index.settings.IndexSettingsService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* A weak reference based filter cache that has weak keys on the <tt>IndexReader</tt>.
@ -49,7 +49,7 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
private volatile int maxSize;
private volatile TimeValue expire;
private final AtomicLong evictions = new AtomicLong();
private final CounterMetric evictions = new CounterMetric();
private final ApplySettings applySettings = new ApplySettings();
@ -85,11 +85,11 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
}
@Override public long evictions() {
return evictions.get();
return evictions.count();
}
@Override public void onEviction(Filter filter, DocSet docSet) {
evictions.incrementAndGet();
evictions.inc();
}
static {

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
@ -68,7 +69,6 @@ import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.mapper.SourceToParse.*;
@ -122,8 +122,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
private final AtomicLong totalRefresh = new AtomicLong();
private final AtomicLong totalRefreshTime = new AtomicLong();
private final MeanMetric totalRefreshMetric = new MeanMetric();
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService) {
@ -382,12 +381,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
}
long time = System.currentTimeMillis();
engine.refresh(refresh);
totalRefresh.incrementAndGet();
totalRefreshTime.addAndGet(System.currentTimeMillis() - time);
totalRefreshMetric.inc(System.currentTimeMillis() - time);
}
@Override public RefreshStats refreshStats() {
return new RefreshStats(totalRefresh.get(), totalRefreshTime.get());
return new RefreshStats(totalRefreshMetric.count(), totalRefreshMetric.sum());
}
@Override public void flush(Engine.Flush flush) throws ElasticSearchException {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
@ -108,7 +109,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
public TransportStats stats() {
return new TransportStats(transport.serverOpen(), adapter.rxCount.get(), adapter.rxSize.get(), adapter.txCount.get(), adapter.txSize.get());
return new TransportStats(transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum());
}
public BoundTransportAddress boundAddress() {
@ -231,19 +232,15 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
class Adapter implements TransportServiceAdapter {
final AtomicLong rxCount = new AtomicLong();
final AtomicLong rxSize = new AtomicLong();
final AtomicLong txCount = new AtomicLong();
final AtomicLong txSize = new AtomicLong();
final MeanMetric rxMetric = new MeanMetric();
final MeanMetric txMetric = new MeanMetric();
@Override public void received(long size) {
rxCount.incrementAndGet();
rxSize.addAndGet(size);
rxMetric.inc(size);
}
@Override public void sent(long size) {
txCount.incrementAndGet();
txSize.addAndGet(size);
txMetric.inc(size);
}
@Override public TransportRequestHandler handler(String action) {