[STORE] Add simple cache for StoreStats
this commit tries to reduce the filesystem calls to fetch metadata by using a simple cache on top of the stats call. Relates to #9683
This commit is contained in:
parent
a7e238dbb8
commit
85c611a1b7
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.lucene.store;
|
||||
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* IndexOutput that delegates all calls to another IndexOutput
|
||||
*/
|
||||
public class FilterIndexOutput extends IndexOutput {
|
||||
|
||||
protected final IndexOutput out;
|
||||
|
||||
public FilterIndexOutput(String resourceDescription, IndexOutput out) {
|
||||
super(resourceDescription);
|
||||
this.out = out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
out.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
return out.getFilePointer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getChecksum() throws IOException {
|
||||
return out.getChecksum();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(byte b) throws IOException {
|
||||
out.writeByte(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] b, int offset, int length) throws IOException {
|
||||
out.writeBytes(b, offset, length);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.util;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* A very simple single object cache that allows non-blocking refresh calls
|
||||
* triggered by expiry time.
|
||||
*/
|
||||
public abstract class SingleObjectCache<T>{
|
||||
|
||||
private volatile T cached;
|
||||
private Lock refreshLock = new ReentrantLock();
|
||||
private final TimeValue refreshInterval;
|
||||
protected long lastRefreshTimestamp = 0;
|
||||
|
||||
protected SingleObjectCache(TimeValue refreshInterval, T initialValue) {
|
||||
if (initialValue == null) {
|
||||
throw new IllegalArgumentException("initialValue must not be null");
|
||||
}
|
||||
this.refreshInterval = refreshInterval;
|
||||
cached = initialValue;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns the currently cached object and potentially refreshes the cache before returning.
|
||||
*/
|
||||
public T getOrRefresh() {
|
||||
if (needsRefresh()) {
|
||||
if(refreshLock.tryLock()) {
|
||||
try {
|
||||
if (needsRefresh()) { // check again!
|
||||
cached = refresh();
|
||||
assert cached != null;
|
||||
lastRefreshTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
} finally {
|
||||
refreshLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
assert cached != null;
|
||||
return cached;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new instance to cache
|
||||
*/
|
||||
protected abstract T refresh();
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> iff the cache needs to be refreshed.
|
||||
*/
|
||||
protected boolean needsRefresh() {
|
||||
if (refreshInterval.millis() == 0) {
|
||||
return true;
|
||||
}
|
||||
final long currentTime = System.currentTimeMillis();
|
||||
return (currentTime - lastRefreshTimestamp) > refreshInterval.millis();
|
||||
}
|
||||
}
|
|
@ -26,6 +26,7 @@ import org.apache.lucene.codecs.CodecUtil;
|
|||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.store.*;
|
||||
import org.apache.lucene.util.*;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -37,11 +38,12 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.lucene.Directories;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.Callback;
|
||||
import org.elasticsearch.common.util.SingleObjectCache;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
||||
import org.elasticsearch.common.util.concurrent.RefCounted;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
|
@ -87,6 +89,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
private static final int VERSION_START = 0;
|
||||
private static final int VERSION = VERSION_STACK_TRACE;
|
||||
private static final String CORRUPTED = "corrupted_";
|
||||
public static final String INDEX_STORE_STATS_REFRESH_INTERVAL = "index.store.stats_refresh_interval";
|
||||
|
||||
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
||||
private final DirectoryService directoryService;
|
||||
|
@ -94,6 +97,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
|
||||
private final ShardLock shardLock;
|
||||
private final OnClose onClose;
|
||||
private final SingleObjectCache<StoreStats> statsCache;
|
||||
|
||||
private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
|
||||
@Override
|
||||
|
@ -114,6 +118,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor), Loggers.getLogger("index.store.deletes", indexSettings, shardId));
|
||||
this.shardLock = shardLock;
|
||||
this.onClose = onClose;
|
||||
final TimeValue refreshInterval = indexSettings.getAsTime(INDEX_STORE_STATS_REFRESH_INTERVAL, TimeValue.timeValueSeconds(10));
|
||||
|
||||
this.statsCache = new StoreStatsCache(refreshInterval, directory, directoryService);
|
||||
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
|
||||
|
||||
assert onClose != null;
|
||||
assert shardLock != null;
|
||||
assert shardLock.getShardId().equals(shardId);
|
||||
|
@ -283,7 +292,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
|
||||
public StoreStats stats() throws IOException {
|
||||
ensureOpen();
|
||||
return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos());
|
||||
return statsCache.getOrRefresh();
|
||||
}
|
||||
|
||||
public void renameFile(String from, String to) throws IOException {
|
||||
|
@ -621,6 +630,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
this.deletesLogger = deletesLogger;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
assert false : "Nobody should close this directory except of the Store itself";
|
||||
|
@ -644,6 +654,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
public String toString() {
|
||||
return "store(" + in.toString() + ")";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Log that we are about to delete this file, to the index.store.deletes component. */
|
||||
|
@ -1329,4 +1340,37 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static class StoreStatsCache extends SingleObjectCache<StoreStats> {
|
||||
private final Directory directory;
|
||||
private final DirectoryService directoryService;
|
||||
|
||||
public StoreStatsCache(TimeValue refreshInterval, Directory directory, DirectoryService directoryService) throws IOException {
|
||||
super(refreshInterval, new StoreStats(estimateSize(directory), directoryService.throttleTimeInNanos()));
|
||||
this.directory = directory;
|
||||
this.directoryService = directoryService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StoreStats refresh() {
|
||||
try {
|
||||
return new StoreStats(estimateSize(directory), directoryService.throttleTimeInNanos());
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("failed to refresh store stats");
|
||||
}
|
||||
}
|
||||
|
||||
private static long estimateSize(Directory directory) throws IOException {
|
||||
long estimatedSize = 0;
|
||||
String[] files = directory.listAll();
|
||||
for (String file : files) {
|
||||
try {
|
||||
estimatedSize += directory.fileLength(file);
|
||||
} catch (NoSuchFileException | FileNotFoundException e) {
|
||||
// ignore, the file is not there no more
|
||||
}
|
||||
}
|
||||
return estimatedSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,19 +22,18 @@ package org.elasticsearch.index.store;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.elasticsearch.common.lucene.store.FilterIndexOutput;
|
||||
|
||||
/**
|
||||
* abstract class for verifying what was written.
|
||||
* subclasses override {@link #writeByte(byte)} and {@link #writeBytes(byte[], int, int)}
|
||||
*/
|
||||
// do NOT optimize this class for performance
|
||||
public abstract class VerifyingIndexOutput extends IndexOutput {
|
||||
protected final IndexOutput out;
|
||||
|
||||
public abstract class VerifyingIndexOutput extends FilterIndexOutput {
|
||||
|
||||
/** Sole constructor */
|
||||
VerifyingIndexOutput(IndexOutput out) {
|
||||
super("VerifyingIndexOutput(out=" + out + ")");
|
||||
this.out = out;
|
||||
super("VerifyingIndexOutput(out=" + out.toString() + ")", out);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -42,21 +41,4 @@ public abstract class VerifyingIndexOutput extends IndexOutput {
|
|||
* called after all data has been written to this output.
|
||||
*/
|
||||
public abstract void verify() throws IOException;
|
||||
|
||||
// default implementations... forwarding to delegate
|
||||
|
||||
@Override
|
||||
public final void close() throws IOException {
|
||||
out.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long getChecksum() throws IOException {
|
||||
return out.getChecksum();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long getFilePointer() {
|
||||
return out.getFilePointer();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.SingleObjectCache;
|
||||
import org.elasticsearch.monitor.os.OsStats;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -32,31 +34,35 @@ public class FsService extends AbstractComponent {
|
|||
|
||||
private final FsProbe probe;
|
||||
|
||||
private final TimeValue refreshInterval;
|
||||
|
||||
private FsStats cachedStats;
|
||||
private final SingleObjectCache<FsStats> fsStatsCache;
|
||||
|
||||
@Inject
|
||||
public FsService(Settings settings, FsProbe probe) throws IOException {
|
||||
super(settings);
|
||||
this.probe = probe;
|
||||
this.cachedStats = probe.stats();
|
||||
|
||||
this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(1));
|
||||
|
||||
TimeValue refreshInterval = settings.getAsTime("monitor.fs.refresh_interval", TimeValue.timeValueSeconds(1));
|
||||
fsStatsCache = new FsStatsCache(refreshInterval, probe.stats());
|
||||
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
|
||||
}
|
||||
|
||||
public synchronized FsStats stats() {
|
||||
try {
|
||||
if ((System.currentTimeMillis() - cachedStats.getTimestamp()) > refreshInterval.millis()) {
|
||||
cachedStats = probe.stats();
|
||||
}
|
||||
return cachedStats;
|
||||
} catch (IOException ex) {
|
||||
logger.warn("can't fetch fs stats", ex);
|
||||
public FsStats stats() {
|
||||
return fsStatsCache.getOrRefresh();
|
||||
}
|
||||
|
||||
private class FsStatsCache extends SingleObjectCache<FsStats> {
|
||||
public FsStatsCache(TimeValue interval, FsStats initValue) {
|
||||
super(interval, initValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FsStats refresh() {
|
||||
try {
|
||||
return probe.stats();
|
||||
} catch (IOException ex) {
|
||||
logger.warn("Failed to fetch fs stats - returning empty instance");
|
||||
return new FsStats();
|
||||
}
|
||||
}
|
||||
return cachedStats;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.SingleObjectCache;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
|
@ -31,29 +32,26 @@ import java.util.Enumeration;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class NetworkService extends AbstractComponent {
|
||||
public final class NetworkService extends AbstractComponent {
|
||||
|
||||
private final NetworkProbe probe;
|
||||
|
||||
private final NetworkInfo info;
|
||||
|
||||
private final TimeValue refreshInterval;
|
||||
|
||||
private NetworkStats cachedStats;
|
||||
private final SingleObjectCache<NetworkStats> networkStatsCache;
|
||||
|
||||
@Inject
|
||||
public NetworkService(Settings settings, NetworkProbe probe) {
|
||||
super(settings);
|
||||
this.probe = probe;
|
||||
|
||||
this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(5));
|
||||
TimeValue refreshInterval = settings.getAsTime("monitor.network.refresh_interval", TimeValue.timeValueSeconds(5));
|
||||
|
||||
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
|
||||
|
||||
this.info = probe.networkInfo();
|
||||
this.info.refreshInterval = refreshInterval.millis();
|
||||
this.cachedStats = probe.networkStats();
|
||||
|
||||
networkStatsCache = new NetworkStatsCache(refreshInterval, probe.networkStats());
|
||||
if (logger.isDebugEnabled()) {
|
||||
StringBuilder netDebug = new StringBuilder("net_info");
|
||||
try {
|
||||
|
@ -104,17 +102,26 @@ public class NetworkService extends AbstractComponent {
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("ifconfig\n\n" + ifconfig());
|
||||
}
|
||||
stats(); // pull the stats one time
|
||||
}
|
||||
|
||||
public NetworkInfo info() {
|
||||
return this.info;
|
||||
}
|
||||
|
||||
public synchronized NetworkStats stats() {
|
||||
if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
|
||||
cachedStats = probe.networkStats();
|
||||
public NetworkStats stats() {
|
||||
return networkStatsCache.getOrRefresh();
|
||||
}
|
||||
|
||||
private class NetworkStatsCache extends SingleObjectCache<NetworkStats> {
|
||||
public NetworkStatsCache(TimeValue interval, NetworkStats initValue) {
|
||||
super(interval, initValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected NetworkStats refresh() {
|
||||
return probe.networkStats();
|
||||
}
|
||||
return cachedStats;
|
||||
}
|
||||
|
||||
public String ifconfig() {
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.SingleObjectCache;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -33,22 +34,19 @@ public class OsService extends AbstractComponent {
|
|||
|
||||
private final OsInfo info;
|
||||
|
||||
private final TimeValue refreshInterval;
|
||||
|
||||
private OsStats cachedStats;
|
||||
private SingleObjectCache<OsStats> osStatsCache;
|
||||
|
||||
@Inject
|
||||
public OsService(Settings settings, OsProbe probe) {
|
||||
super(settings);
|
||||
this.probe = probe;
|
||||
|
||||
this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(1));
|
||||
TimeValue refreshInterval = settings.getAsTime("monitor.os.refresh_interval", TimeValue.timeValueSeconds(1));
|
||||
|
||||
this.info = probe.osInfo();
|
||||
this.info.refreshInterval = refreshInterval.millis();
|
||||
this.info.availableProcessors = Runtime.getRuntime().availableProcessors();
|
||||
this.cachedStats = probe.osStats();
|
||||
|
||||
osStatsCache = new OsStatsCache(refreshInterval, probe.osStats());
|
||||
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
|
||||
}
|
||||
|
||||
|
@ -57,9 +55,17 @@ public class OsService extends AbstractComponent {
|
|||
}
|
||||
|
||||
public synchronized OsStats stats() {
|
||||
if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
|
||||
cachedStats = probe.osStats();
|
||||
return osStatsCache.getOrRefresh();
|
||||
}
|
||||
|
||||
private class OsStatsCache extends SingleObjectCache<OsStats> {
|
||||
public OsStatsCache(TimeValue interval, OsStats initValue) {
|
||||
super(interval, initValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected OsStats refresh() {
|
||||
return probe.osStats();
|
||||
}
|
||||
return cachedStats;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,31 +23,26 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.SingleObjectCache;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class ProcessService extends AbstractComponent {
|
||||
public final class ProcessService extends AbstractComponent {
|
||||
|
||||
private final ProcessProbe probe;
|
||||
|
||||
private final ProcessInfo info;
|
||||
|
||||
private final TimeValue refreshInterval;
|
||||
|
||||
private ProcessStats cachedStats;
|
||||
private final SingleObjectCache<ProcessStats> processStatsCache;
|
||||
|
||||
@Inject
|
||||
public ProcessService(Settings settings, ProcessProbe probe) {
|
||||
super(settings);
|
||||
this.probe = probe;
|
||||
|
||||
this.refreshInterval = componentSettings.getAsTime("refresh_interval", TimeValue.timeValueSeconds(1));
|
||||
|
||||
final TimeValue refreshInterval = settings.getAsTime("monitor.process.refresh_interval", TimeValue.timeValueSeconds(1));
|
||||
processStatsCache = new ProcessStatsCache(refreshInterval, probe.processStats());
|
||||
this.info = probe.processInfo();
|
||||
this.info.refreshInterval = refreshInterval.millis();
|
||||
this.cachedStats = probe.processStats();
|
||||
|
||||
logger.debug("Using probe [{}] with refresh_interval [{}]", probe, refreshInterval);
|
||||
}
|
||||
|
||||
|
@ -55,10 +50,18 @@ public class ProcessService extends AbstractComponent {
|
|||
return this.info;
|
||||
}
|
||||
|
||||
public synchronized ProcessStats stats() {
|
||||
if ((System.currentTimeMillis() - cachedStats.timestamp()) > refreshInterval.millis()) {
|
||||
cachedStats = probe.processStats();
|
||||
public ProcessStats stats() {
|
||||
return processStatsCache.getOrRefresh();
|
||||
}
|
||||
|
||||
private class ProcessStatsCache extends SingleObjectCache<ProcessStats> {
|
||||
public ProcessStatsCache(TimeValue interval, ProcessStats initValue) {
|
||||
super(interval, initValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ProcessStats refresh() {
|
||||
return probe.processStats();
|
||||
}
|
||||
return cachedStats;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
|||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.monitor.sigar.SigarService;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||
|
@ -33,7 +34,9 @@ import org.junit.Test;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.*;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0)
|
||||
|
@ -132,6 +135,7 @@ public class ClusterStatsTests extends ElasticsearchIntegrationTest {
|
|||
internalCluster().ensureAtMostNumDataNodes(5);
|
||||
internalCluster().ensureAtLeastNumDataNodes(1);
|
||||
SigarService sigarService = internalCluster().getInstance(SigarService.class);
|
||||
assertAcked(prepareCreate("test1").setSettings(settingsBuilder().put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, 0).build()));
|
||||
index("test1", "type", "1", "f", "f");
|
||||
/*
|
||||
* Ensure at least one shard is allocated otherwise the FS stats might
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.plugins.AbstractPlugin;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
|
@ -52,6 +53,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
@ -151,7 +154,7 @@ public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest {
|
|||
internalCluster().startNodesAsync(2,
|
||||
ImmutableSettings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "200ms").build())
|
||||
.get();
|
||||
createIndex("test");
|
||||
assertAcked(prepareCreate("test").setSettings(settingsBuilder().put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, 0).build()));
|
||||
ensureGreen("test");
|
||||
InternalTestCluster internalTestCluster = internalCluster();
|
||||
// Get the cluster info service on the master node
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util;
|
||||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class SingleObjectCacheTests extends ElasticsearchTestCase {
|
||||
|
||||
public void testRefresh() {
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
final AtomicBoolean needsRefresh = new AtomicBoolean(true);
|
||||
SingleObjectCache<Integer> cache = new SingleObjectCache<Integer>(TimeValue.timeValueMillis(100000), 0) {
|
||||
|
||||
@Override
|
||||
protected Integer refresh() {
|
||||
return count.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean needsRefresh() {
|
||||
return needsRefresh.get();
|
||||
}
|
||||
};
|
||||
assertEquals(1, cache.getOrRefresh().intValue());
|
||||
assertEquals(2, cache.getOrRefresh().intValue());
|
||||
needsRefresh.set(false);
|
||||
assertEquals(2, cache.getOrRefresh().intValue());
|
||||
needsRefresh.set(true);
|
||||
assertEquals(3, cache.getOrRefresh().intValue());
|
||||
}
|
||||
|
||||
public void testRefreshDoesntBlock() throws InterruptedException {
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
final AtomicBoolean needsRefresh = new AtomicBoolean(true);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final CountDownLatch waiting = new CountDownLatch(1);
|
||||
final SingleObjectCache<Integer> cache = new SingleObjectCache<Integer>(TimeValue.timeValueMillis(1000), 0) {
|
||||
|
||||
@Override
|
||||
protected Integer refresh() {
|
||||
if (count.get() == 1) {
|
||||
try {
|
||||
waiting.countDown();
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
assert false;
|
||||
}
|
||||
}
|
||||
return count.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean needsRefresh() {
|
||||
return needsRefresh.get();
|
||||
}
|
||||
};
|
||||
assertEquals(1, cache.getOrRefresh().intValue());
|
||||
needsRefresh.set(true);
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
Integer value = cache.getOrRefresh();
|
||||
assertEquals(2, value.intValue());
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
waiting.await();
|
||||
assertEquals(1, cache.getOrRefresh().intValue());
|
||||
needsRefresh.set(false);
|
||||
latch.countDown();
|
||||
t.join();
|
||||
assertEquals(2, cache.getOrRefresh().intValue());
|
||||
|
||||
}
|
||||
}
|
|
@ -32,6 +32,8 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.apache.lucene.util.TestUtil;
|
||||
import org.apache.lucene.util.Version;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -1050,4 +1052,35 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
|
|||
|
||||
assertEquals(count.get(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStoreStats() throws IOException {
|
||||
final ShardId shardId = new ShardId(new Index("index"), 1);
|
||||
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
|
||||
Settings settings = ImmutableSettings.builder().put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, TimeValue.timeValueMinutes(0)).build();
|
||||
Store store = new Store(shardId, settings, directoryService, randomDistributor(directoryService), new DummyShardLock(shardId));
|
||||
|
||||
StoreStats stats = store.stats();
|
||||
assertEquals(stats.getSize().bytes(), 0);
|
||||
|
||||
Directory dir = store.directory();
|
||||
long length = 0;
|
||||
try (IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT)) {
|
||||
int iters = scaledRandomIntBetween(10, 100);
|
||||
for (int i = 0; i < iters; i++) {
|
||||
BytesRef bytesRef = new BytesRef(TestUtil.randomRealisticUnicodeString(random(), 10, 1024));
|
||||
output.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
|
||||
}
|
||||
stats = store.stats();
|
||||
assertEquals(stats.getSize().bytes(), 0);
|
||||
length = output.getFilePointer();
|
||||
}
|
||||
|
||||
assertTrue(store.directory().listAll().length > 0);
|
||||
stats = store.stats();
|
||||
assertEquals(stats.getSizeInBytes(), length);
|
||||
|
||||
store.deleteContent();
|
||||
IOUtils.close(store);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.discovery.DiscoveryService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState.Stage;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState.Type;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
|
@ -401,7 +402,7 @@ public class IndexRecoveryTests extends ElasticsearchIntegrationTest {
|
|||
|
||||
logger.info("--> creating test index: {}", name);
|
||||
assertAcked(prepareCreate(name, nodeCount, settingsBuilder().put("number_of_shards", shardCount)
|
||||
.put("number_of_replicas", replicaCount)));
|
||||
.put("number_of_replicas", replicaCount).put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, 0)));
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing sample data");
|
||||
|
|
Loading…
Reference in New Issue