From 0549e9d1c2fd11f55fdeba1dc320f83ed40bf642 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sun, 21 Aug 2011 00:12:36 +0300 Subject: [PATCH] create simple metrics aggregator classes, and use them where applicable in the code, abstracting away the actual aggregation method (and use jsr166e long addr) --- .../TrackingConcurrentMergeScheduler.java | 21 ++++--- .../index/TrackingSerialMergeScheduler.java | 21 ++++--- .../common/metrics/CounterMetric.java | 49 ++++++++++++++++ .../common/metrics/MeanMetric.java | 56 +++++++++++++++++++ .../elasticsearch/common/metrics/Metric.java | 25 +++++++++ .../common/netty/OpenChannelsHandler.java | 10 ++-- .../data/resident/ResidentFieldDataCache.java | 8 +-- .../field/data/soft/SoftFieldDataCache.java | 8 +-- .../field/data/weak/WeakFieldDataCache.java | 8 +-- .../filter/resident/ResidentFilterCache.java | 8 +-- .../cache/filter/soft/SoftFilterCache.java | 8 +-- .../support/AbstractWeightedFilterCache.java | 38 ++++++------- .../cache/filter/weak/WeakFilterCache.java | 8 +-- .../shard/service/InternalIndexShard.java | 10 ++-- .../transport/TransportService.java | 15 ++--- 15 files changed, 205 insertions(+), 88 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/CounterMetric.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/MeanMetric.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/Metric.java diff --git a/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java b/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java index 8229d985270..5aad4552b06 100644 --- a/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java +++ b/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingConcurrentMergeScheduler.java @@ -20,10 +20,11 @@ package org.apache.lucene.index; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; /** * An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total @@ -33,9 +34,8 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler { private final ESLogger logger; - private final AtomicLong totalMerges = new AtomicLong(); - private final AtomicLong totalMergeTime = new AtomicLong(); - private final AtomicLong currentMerges = new AtomicLong(); + private final MeanMetric totalMerges = new MeanMetric(); + private final CounterMetric currentMerges = new CounterMetric(); public TrackingConcurrentMergeScheduler(ESLogger logger) { super(); @@ -43,30 +43,29 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler { } public long totalMerges() { - return totalMerges.get(); + return totalMerges.count(); } public long totalMergeTime() { - return totalMergeTime.get(); + return totalMerges.sum(); } public long currentMerges() { - return currentMerges.get(); + return currentMerges.count(); } @Override protected void doMerge(MergePolicy.OneMerge merge) throws IOException { long time = System.currentTimeMillis(); - currentMerges.incrementAndGet(); + currentMerges.inc(); if (logger.isTraceEnabled()) { logger.trace("merge [{}] starting...", merge.info.name); } try { super.doMerge(merge); } finally { - currentMerges.decrementAndGet(); - totalMerges.incrementAndGet(); + currentMerges.dec(); long took = System.currentTimeMillis() - time; - totalMergeTime.addAndGet(took); + totalMerges.inc(took); if (took > 20000) { // if more than 20 seconds, DEBUG log it logger.debug("merge [{}] done, took [{}]", merge.info.name, TimeValue.timeValueMillis(took)); } else if (logger.isTraceEnabled()) { diff --git a/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java b/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java index 8b86110bb6a..c0f875bfe6c 100644 --- a/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java +++ b/modules/elasticsearch/src/main/java/org/apache/lucene/index/TrackingSerialMergeScheduler.java @@ -20,34 +20,34 @@ package org.apache.lucene.index; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; // LUCENE MONITOR - Copied from SerialMergeScheduler public class TrackingSerialMergeScheduler extends MergeScheduler { private final ESLogger logger; - private final AtomicLong totalMerges = new AtomicLong(); - private final AtomicLong totalMergeTime = new AtomicLong(); - private final AtomicLong currentMerges = new AtomicLong(); + private final MeanMetric totalMerges = new MeanMetric(); + private final CounterMetric currentMerges = new CounterMetric(); public TrackingSerialMergeScheduler(ESLogger logger) { this.logger = logger; } public long totalMerges() { - return totalMerges.get(); + return totalMerges.count(); } public long totalMergeTime() { - return totalMergeTime.get(); + return totalMerges.sum(); } public long currentMerges() { - return currentMerges.get(); + return currentMerges.count(); } /** @@ -67,14 +67,13 @@ public class TrackingSerialMergeScheduler extends MergeScheduler { } long time = System.currentTimeMillis(); - currentMerges.incrementAndGet(); + currentMerges.inc(); try { writer.merge(merge); } finally { - currentMerges.decrementAndGet(); - totalMerges.incrementAndGet(); + currentMerges.dec(); long took = System.currentTimeMillis() - time; - totalMergeTime.addAndGet(took); + totalMerges.inc(took); if (took > 20000) { // if more than 20 seconds, DEBUG log it logger.debug("merge [{}] done, took [{}]", merge.info.name, TimeValue.timeValueMillis(took)); } else if (logger.isTraceEnabled()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/CounterMetric.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/CounterMetric.java new file mode 100644 index 00000000000..a711fe1c8fa --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/CounterMetric.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.metrics; + +import org.elasticsearch.common.util.concurrent.jsr166e.LongAdder; + +/** + */ +public class CounterMetric implements Metric { + + private final LongAdder counter = new LongAdder(); + + public void inc() { + counter.increment(); + } + + public void inc(long n) { + counter.add(n); + } + + public void dec() { + counter.decrement(); + } + + public void dec(long n) { + counter.add(-n); + } + + public long count() { + return counter.sum(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/MeanMetric.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/MeanMetric.java new file mode 100644 index 00000000000..4fe3d03f2a7 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/MeanMetric.java @@ -0,0 +1,56 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.metrics; + +import org.elasticsearch.common.util.concurrent.jsr166e.LongAdder; + +/** + */ +public class MeanMetric implements Metric { + + private final LongAdder counter = new LongAdder(); + private final LongAdder sum = new LongAdder(); + + public void inc(long n) { + counter.increment(); + sum.add(n); + } + + public void dec(long n) { + counter.decrement(); + sum.add(-n); + } + + public long count() { + return counter.sum(); + } + + public long sum() { + return sum.sum(); + } + + public double mean() { + long count = count(); + if (count > 0) { + return sum.sum() / (double) count; + } + return 0.0; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/Metric.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/Metric.java new file mode 100644 index 00000000000..298e9db68ff --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/metrics/Metric.java @@ -0,0 +1,25 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.metrics; + +/** + */ +public interface Metric { +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java index 9eb0b7da86a..a38232597ed 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/netty/OpenChannelsHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.netty; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.netty.channel.Channel; import org.elasticsearch.common.netty.channel.ChannelEvent; import org.elasticsearch.common.netty.channel.ChannelFuture; @@ -31,7 +32,6 @@ import org.elasticsearch.common.netty.channel.ChannelUpstreamHandler; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; /** * @author kimchy (shay.banon) @@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicLong; public class OpenChannelsHandler implements ChannelUpstreamHandler { private Set openChannels = ConcurrentCollections.newConcurrentSet(); - private AtomicLong openChannelsCount = new AtomicLong(); + private CounterMetric openChannelsMetric = new CounterMetric(); private final ChannelFutureListener remover = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { boolean removed = openChannels.remove(future.getChannel()); if (removed) { - openChannelsCount.decrementAndGet(); + openChannelsMetric.dec(); } } }; @@ -57,7 +57,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler { if (evt.getState() == ChannelState.OPEN) { boolean added = openChannels.add(ctx.getChannel()); if (added) { - openChannelsCount.incrementAndGet(); + openChannelsMetric.inc(); ctx.getChannel().getCloseFuture().addListener(remover); } } @@ -66,7 +66,7 @@ public class OpenChannelsHandler implements ChannelUpstreamHandler { } public long numberOfOpenChannels() { - return openChannelsCount.get(); + return openChannelsMetric.count(); } public void close() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/resident/ResidentFieldDataCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/resident/ResidentFieldDataCache.java index 017e741bba2..8455102cd73 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/resident/ResidentFieldDataCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/resident/ResidentFieldDataCache.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.base.Objects; import org.elasticsearch.common.collect.MapEvictionListener; import org.elasticsearch.common.collect.MapMaker; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -36,7 +37,6 @@ import org.elasticsearch.index.settings.IndexSettingsService; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * @author kimchy (shay.banon) @@ -48,7 +48,7 @@ public class ResidentFieldDataCache extends AbstractConcurrentMapFieldDataCache private volatile int maxSize; private volatile TimeValue expire; - private final AtomicLong evictions = new AtomicLong(); + private final CounterMetric evictions = new CounterMetric(); private final ApplySettings applySettings = new ApplySettings(); @@ -85,11 +85,11 @@ public class ResidentFieldDataCache extends AbstractConcurrentMapFieldDataCache } @Override public long evictions() { - return evictions.get(); + return evictions.count(); } @Override public void onEviction(@Nullable String s, @Nullable FieldData fieldData) { - evictions.incrementAndGet(); + evictions.inc(); } static { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/soft/SoftFieldDataCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/soft/SoftFieldDataCache.java index 7b745ade686..9cc2a9baab0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/soft/SoftFieldDataCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/soft/SoftFieldDataCache.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.MapEvictionListener; import org.elasticsearch.common.collect.MapMaker; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.cache.field.data.support.AbstractConcurrentMapFieldDataCache; @@ -30,14 +31,13 @@ import org.elasticsearch.index.field.data.FieldData; import org.elasticsearch.index.settings.IndexSettings; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; /** * @author kimchy (shay.banon) */ public class SoftFieldDataCache extends AbstractConcurrentMapFieldDataCache implements MapEvictionListener { - private final AtomicLong evictions = new AtomicLong(); + private final CounterMetric evictions = new CounterMetric(); @Inject public SoftFieldDataCache(Index index, @IndexSettings Settings indexSettings) { super(index, indexSettings); @@ -48,7 +48,7 @@ public class SoftFieldDataCache extends AbstractConcurrentMapFieldDataCache impl } @Override public long evictions() { - return evictions.get(); + return evictions.count(); } @Override public String type() { @@ -56,6 +56,6 @@ public class SoftFieldDataCache extends AbstractConcurrentMapFieldDataCache impl } @Override public void onEviction(@Nullable String s, @Nullable FieldData fieldData) { - evictions.incrementAndGet(); + evictions.inc(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/weak/WeakFieldDataCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/weak/WeakFieldDataCache.java index ade4fbd3a58..1669df66956 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/weak/WeakFieldDataCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/field/data/weak/WeakFieldDataCache.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.MapEvictionListener; import org.elasticsearch.common.collect.MapMaker; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.cache.field.data.support.AbstractConcurrentMapFieldDataCache; @@ -30,14 +31,13 @@ import org.elasticsearch.index.field.data.FieldData; import org.elasticsearch.index.settings.IndexSettings; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; /** * @author kimchy (shay.banon) */ public class WeakFieldDataCache extends AbstractConcurrentMapFieldDataCache implements MapEvictionListener { - private final AtomicLong evictions = new AtomicLong(); + private final CounterMetric evictions = new CounterMetric(); @Inject public WeakFieldDataCache(Index index, @IndexSettings Settings indexSettings) { super(index, indexSettings); @@ -52,10 +52,10 @@ public class WeakFieldDataCache extends AbstractConcurrentMapFieldDataCache impl } @Override public long evictions() { - return evictions.get(); + return evictions.count(); } @Override public void onEviction(@Nullable String s, @Nullable FieldData fieldData) { - evictions.incrementAndGet(); + evictions.inc(); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java index ca245a440e8..07786431533 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapEvictionListener; import org.elasticsearch.common.collect.MapMaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.docset.DocSet; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -35,7 +36,6 @@ import org.elasticsearch.index.settings.IndexSettingsService; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * A resident reference based filter cache that has weak keys on the IndexReader. @@ -49,7 +49,7 @@ public class ResidentFilterCache extends AbstractConcurrentMapFilterCache implem private volatile int maxSize; private volatile TimeValue expire; - private final AtomicLong evictions = new AtomicLong(); + private final CounterMetric evictions = new CounterMetric(); private final ApplySettings applySettings = new ApplySettings(); @@ -85,11 +85,11 @@ public class ResidentFilterCache extends AbstractConcurrentMapFilterCache implem } @Override public long evictions() { - return evictions.get(); + return evictions.count(); } @Override public void onEviction(Filter filter, DocSet docSet) { - evictions.incrementAndGet(); + evictions.inc(); } static { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java index 87a7af874b7..3194e83fdc1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapEvictionListener; import org.elasticsearch.common.collect.MapMaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.docset.DocSet; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -35,7 +36,6 @@ import org.elasticsearch.index.settings.IndexSettingsService; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * A soft reference based filter cache that has soft keys on the IndexReader. @@ -49,7 +49,7 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements private volatile int maxSize; private volatile TimeValue expire; - private final AtomicLong evictions = new AtomicLong(); + private final CounterMetric evictions = new CounterMetric(); private final ApplySettings applySettings = new ApplySettings(); @@ -87,11 +87,11 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements } @Override public long evictions() { - return evictions.get(); + return evictions.count(); } @Override public void onEviction(Filter filter, DocSet docSet) { - evictions.incrementAndGet(); + evictions.inc(); } static { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractWeightedFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractWeightedFilterCache.java index 379827e2644..bf79e93dde9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractWeightedFilterCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractWeightedFilterCache.java @@ -29,6 +29,8 @@ import org.elasticsearch.common.concurrentlinkedhashmap.Weigher; import org.elasticsearch.common.lab.LongsLAB; import org.elasticsearch.common.lucene.docset.DocSet; import org.elasticsearch.common.lucene.search.NoCacheFilter; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -40,13 +42,11 @@ import org.elasticsearch.index.settings.IndexSettings; import java.io.IOException; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent implements FilterCache, IndexReader.ReaderFinishedListener, EvictionListener> { final ConcurrentMap seenReaders = ConcurrentCollections.newConcurrentMap(); - final AtomicInteger seenReadersCount = new AtomicInteger(); + final CounterMetric seenReadersCount = new CounterMetric(); final boolean labEnabled; final ByteSizeValue labMaxAlloc; @@ -55,10 +55,8 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent final int labMaxAllocBytes; final int labChunkSizeBytes; - protected final AtomicLong evictions = new AtomicLong(); - - final AtomicLong totalSizeInBytes = new AtomicLong(); - final AtomicInteger totalCount = new AtomicInteger(); + final CounterMetric evictionsMetric = new CounterMetric(); + final MeanMetric totalMetric = new MeanMetric(); protected AbstractWeightedFilterCache(Index index, @IndexSettings Settings indexSettings) { super(index, indexSettings); @@ -90,14 +88,13 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent if (removed == null) { return; } - seenReadersCount.decrementAndGet(); + seenReadersCount.dec(); ConcurrentMap> cache = cache(); for (FilterCacheKey key : cache.keySet()) { if (key.readerKey() == readerKey) { FilterCacheValue removed2 = cache.remove(key); if (removed2 != null) { - totalCount.decrementAndGet(); - totalSizeInBytes.addAndGet(-removed2.value().sizeInBytes()); + totalMetric.dec(removed2.value().sizeInBytes()); } } } @@ -115,26 +112,25 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent if (removed == null) { return; } - seenReadersCount.decrementAndGet(); + seenReadersCount.dec(); ConcurrentMap> cache = cache(); for (FilterCacheKey key : cache.keySet()) { if (key.readerKey() == reader.getCoreCacheKey()) { FilterCacheValue removed2 = cache.remove(key); if (removed2 != null) { - totalCount.decrementAndGet(); - totalSizeInBytes.addAndGet(-removed2.value().sizeInBytes()); + totalMetric.dec(removed2.value().sizeInBytes()); } } } } @Override public EntriesStats entriesStats() { - int seenReadersCount = this.seenReadersCount.get(); - return new EntriesStats(totalSizeInBytes.get(), seenReadersCount == 0 ? 0 : totalCount.get() / seenReadersCount); + long seenReadersCount = this.seenReadersCount.count(); + return new EntriesStats(totalMetric.sum(), seenReadersCount == 0 ? 0 : totalMetric.count() / seenReadersCount); } @Override public long evictions() { - return evictions.get(); + return evictionsMetric.count(); } @Override public Filter cache(Filter filterToCache) { @@ -176,7 +172,7 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent Boolean previous = cache.seenReaders.putIfAbsent(reader.getCoreCacheKey(), Boolean.TRUE); if (previous == null) { reader.addReaderFinishedListener(cache); - cache.seenReadersCount.incrementAndGet(); + cache.seenReadersCount.inc(); } } @@ -189,8 +185,7 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent cacheValue = new FilterCacheValue(docSet, longsLAB); FilterCacheValue previous = innerCache.putIfAbsent(cacheKey, cacheValue); if (previous == null) { - cache.totalSizeInBytes.addAndGet(cacheValue.value().sizeInBytes()); - cache.totalCount.incrementAndGet(); + cache.totalMetric.inc(cacheValue.value().sizeInBytes()); } } @@ -226,10 +221,9 @@ public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent @Override public void onEviction(FilterCacheKey filterCacheKey, FilterCacheValue docSetFilterCacheValue) { if (filterCacheKey != null) { if (seenReaders.containsKey(filterCacheKey.readerKey())) { - evictions.incrementAndGet(); - totalCount.decrementAndGet(); + evictionsMetric.inc(); if (docSetFilterCacheValue != null) { - totalSizeInBytes.addAndGet(-docSetFilterCacheValue.value().sizeInBytes()); + totalMetric.dec(docSetFilterCacheValue.value().sizeInBytes()); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java index 1588053f511..b2dbbe4cd14 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapEvictionListener; import org.elasticsearch.common.collect.MapMaker; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.docset.DocSet; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -35,7 +36,6 @@ import org.elasticsearch.index.settings.IndexSettingsService; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * A weak reference based filter cache that has weak keys on the IndexReader. @@ -49,7 +49,7 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements private volatile int maxSize; private volatile TimeValue expire; - private final AtomicLong evictions = new AtomicLong(); + private final CounterMetric evictions = new CounterMetric(); private final ApplySettings applySettings = new ApplySettings(); @@ -85,11 +85,11 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements } @Override public long evictions() { - return evictions.get(); + return evictions.count(); } @Override public void onEviction(Filter filter, DocSet docSet) { - evictions.incrementAndGet(); + evictions.inc(); } static { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 84d2695b284..8a6dabfe885 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadSafe; @@ -68,7 +69,6 @@ import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.index.mapper.SourceToParse.*; @@ -122,8 +122,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings(); - private final AtomicLong totalRefresh = new AtomicLong(); - private final AtomicLong totalRefreshTime = new AtomicLong(); + private final MeanMetric totalRefreshMetric = new MeanMetric(); @Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService) { @@ -382,12 +381,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } long time = System.currentTimeMillis(); engine.refresh(refresh); - totalRefresh.incrementAndGet(); - totalRefreshTime.addAndGet(System.currentTimeMillis() - time); + totalRefreshMetric.inc(System.currentTimeMillis() - time); } @Override public RefreshStats refreshStats() { - return new RefreshStats(totalRefresh.get(), totalRefreshTime.get()); + return new RefreshStats(totalRefreshMetric.count(), totalRefreshMetric.sum()); } @Override public void flush(Engine.Flush flush) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index 6287b1d9870..5639fc678a3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; @@ -108,7 +109,7 @@ public class TransportService extends AbstractLifecycleComponent