Move caching of the size of a directory to `StoreDirectory`. (#30581)
In spite of the existing caching, I have seen a number of nodes hot threads where one thread had been spending all its cpu on computing the size of a directory. I am proposing to move the computation of the size of the directory to `StoreDirectory` in order to skip recomputing the size of the directory if no changes have been made. This should help with users that have read-only indices, which is very common for time-based indices.
This commit is contained in:
parent
984523dda9
commit
03dcf22e06
|
@ -64,6 +64,11 @@ public abstract class SingleObjectCache<T>{
|
|||
return cached;
|
||||
}
|
||||
|
||||
/** Return the potentially stale cached entry. */
|
||||
protected final T getNoRefresh() {
|
||||
return cached;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new instance to cache
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.elasticsearch.common.lucene.store.FilterIndexOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.SingleObjectCache;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
|
||||
final class ByteSizeCachingDirectory extends FilterDirectory {
|
||||
|
||||
private static class SizeAndModCount {
|
||||
final long size;
|
||||
final long modCount;
|
||||
final boolean pendingWrite;
|
||||
|
||||
SizeAndModCount(long length, long modCount, boolean pendingWrite) {
|
||||
this.size = length;
|
||||
this.modCount = modCount;
|
||||
this.pendingWrite = pendingWrite;
|
||||
}
|
||||
}
|
||||
|
||||
private static long estimateSizeInBytes(Directory directory) throws IOException {
|
||||
long estimatedSize = 0;
|
||||
String[] files = directory.listAll();
|
||||
for (String file : files) {
|
||||
try {
|
||||
estimatedSize += directory.fileLength(file);
|
||||
} catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) {
|
||||
// ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while
|
||||
// calling Files.size, you can also sometimes hit AccessDeniedException
|
||||
}
|
||||
}
|
||||
return estimatedSize;
|
||||
}
|
||||
|
||||
private final SingleObjectCache<SizeAndModCount> size;
|
||||
// Both these variables need to be accessed under `this` lock.
|
||||
private long modCount = 0;
|
||||
private long numOpenOutputs = 0;
|
||||
|
||||
ByteSizeCachingDirectory(Directory in, TimeValue refreshInterval) {
|
||||
super(in);
|
||||
size = new SingleObjectCache<SizeAndModCount>(refreshInterval, new SizeAndModCount(0L, -1L, true)) {
|
||||
@Override
|
||||
protected SizeAndModCount refresh() {
|
||||
// It is ok for the size of the directory to be more recent than
|
||||
// the mod count, we would just recompute the size of the
|
||||
// directory on the next call as well. However the opposite
|
||||
// would be bad as we would potentially have a stale cache
|
||||
// entry for a long time. So we fetch the values of modCount and
|
||||
// numOpenOutputs BEFORE computing the size of the directory.
|
||||
final long modCount;
|
||||
final boolean pendingWrite;
|
||||
synchronized(ByteSizeCachingDirectory.this) {
|
||||
modCount = ByteSizeCachingDirectory.this.modCount;
|
||||
pendingWrite = ByteSizeCachingDirectory.this.numOpenOutputs != 0;
|
||||
}
|
||||
final long size;
|
||||
try {
|
||||
// Compute this OUTSIDE of the lock
|
||||
size = estimateSizeInBytes(getDelegate());
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
return new SizeAndModCount(size, modCount, pendingWrite);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean needsRefresh() {
|
||||
if (super.needsRefresh() == false) {
|
||||
// The size was computed recently, don't recompute
|
||||
return false;
|
||||
}
|
||||
SizeAndModCount cached = getNoRefresh();
|
||||
if (cached.pendingWrite) {
|
||||
// The cached entry was generated while there were pending
|
||||
// writes, so the size might be stale: recompute.
|
||||
return true;
|
||||
}
|
||||
synchronized(ByteSizeCachingDirectory.this) {
|
||||
// If there are pending writes or if new files have been
|
||||
// written/deleted since last time: recompute
|
||||
return numOpenOutputs != 0 || cached.modCount != modCount;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/** Return the cumulative size of all files in this directory. */
|
||||
long estimateSizeInBytes() throws IOException {
|
||||
try {
|
||||
return size.getOrRefresh().size;
|
||||
} catch (UncheckedIOException e) {
|
||||
// we wrapped in the cache and unwrap here
|
||||
throw e.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
return wrapIndexOutput(super.createOutput(name, context));
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
|
||||
return wrapIndexOutput(super.createTempOutput(prefix, suffix, context));
|
||||
}
|
||||
|
||||
private IndexOutput wrapIndexOutput(IndexOutput out) {
|
||||
synchronized (this) {
|
||||
numOpenOutputs++;
|
||||
}
|
||||
return new FilterIndexOutput(out.toString(), out) {
|
||||
@Override
|
||||
public void writeBytes(byte[] b, int length) throws IOException {
|
||||
// Don't write to atomicXXX here since it might be called in
|
||||
// tight loops and memory barriers are costly
|
||||
super.writeBytes(b, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(byte b) throws IOException {
|
||||
// Don't write to atomicXXX here since it might be called in
|
||||
// tight loops and memory barriers are costly
|
||||
super.writeByte(b);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// Close might cause some data to be flushed from in-memory buffers, so
|
||||
// increment the modification counter too.
|
||||
try {
|
||||
super.close();
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
numOpenOutputs--;
|
||||
modCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteFile(String name) throws IOException {
|
||||
try {
|
||||
super.deleteFile(name);
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
modCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -50,7 +50,6 @@ import org.apache.lucene.util.ArrayUtil;
|
|||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.apache.lucene.util.Version;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -67,7 +66,6 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.SingleObjectCache;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
||||
import org.elasticsearch.common.util.concurrent.RefCounted;
|
||||
import org.elasticsearch.common.util.iterable.Iterables;
|
||||
|
@ -91,7 +89,6 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
|
@ -146,7 +143,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
|
||||
private final ShardLock shardLock;
|
||||
private final OnClose onClose;
|
||||
private final SingleObjectCache<StoreStats> statsCache;
|
||||
|
||||
private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
|
||||
@Override
|
||||
|
@ -164,12 +160,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
OnClose onClose) throws IOException {
|
||||
super(shardId, indexSettings);
|
||||
final Settings settings = indexSettings.getSettings();
|
||||
this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId));
|
||||
Directory dir = directoryService.newDirectory();
|
||||
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
|
||||
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
|
||||
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(dir, refreshInterval);
|
||||
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", settings, shardId));
|
||||
this.shardLock = shardLock;
|
||||
this.onClose = onClose;
|
||||
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
|
||||
this.statsCache = new StoreStatsCache(refreshInterval, directory);
|
||||
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
|
||||
|
||||
assert onClose != null;
|
||||
assert shardLock != null;
|
||||
|
@ -377,7 +374,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
|
||||
public StoreStats stats() throws IOException {
|
||||
ensureOpen();
|
||||
return statsCache.getOrRefresh();
|
||||
return new StoreStats(directory.estimateSize());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -731,11 +728,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
|
||||
private final Logger deletesLogger;
|
||||
|
||||
StoreDirectory(Directory delegateDirectory, Logger deletesLogger) {
|
||||
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) {
|
||||
super(delegateDirectory);
|
||||
this.deletesLogger = deletesLogger;
|
||||
}
|
||||
|
||||
/** Estimate the cumulative size of all files in this directory in bytes. */
|
||||
long estimateSize() throws IOException {
|
||||
return ((ByteSizeCachingDirectory) getDelegate()).estimateSizeInBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
assert false : "Nobody should close this directory except of the Store itself";
|
||||
|
@ -1428,38 +1430,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
};
|
||||
}
|
||||
|
||||
private static class StoreStatsCache extends SingleObjectCache<StoreStats> {
|
||||
private final Directory directory;
|
||||
|
||||
StoreStatsCache(TimeValue refreshInterval, Directory directory) throws IOException {
|
||||
super(refreshInterval, new StoreStats(estimateSize(directory)));
|
||||
this.directory = directory;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StoreStats refresh() {
|
||||
try {
|
||||
return new StoreStats(estimateSize(directory));
|
||||
} catch (IOException ex) {
|
||||
throw new ElasticsearchException("failed to refresh store stats", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private static long estimateSize(Directory directory) throws IOException {
|
||||
long estimatedSize = 0;
|
||||
String[] files = directory.listAll();
|
||||
for (String file : files) {
|
||||
try {
|
||||
estimatedSize += directory.fileLength(file);
|
||||
} catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) {
|
||||
// ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while
|
||||
// calling Files.size, you can also sometimes hit AccessDeniedException
|
||||
}
|
||||
}
|
||||
return estimatedSize;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FilterDirectory;
|
||||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ByteSizeCachingDirectoryTests extends ESTestCase {
|
||||
|
||||
private static class LengthCountingDirectory extends FilterDirectory {
|
||||
|
||||
int numFileLengthCalls;
|
||||
|
||||
LengthCountingDirectory(Directory in) {
|
||||
super(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long fileLength(String name) throws IOException {
|
||||
numFileLengthCalls++;
|
||||
return super.fileLength(name);
|
||||
}
|
||||
}
|
||||
|
||||
public void testBasics() throws IOException {
|
||||
try (Directory dir = newDirectory()) {
|
||||
try (IndexOutput out = dir.createOutput("quux", IOContext.DEFAULT)) {
|
||||
out.writeBytes(new byte[11], 11);
|
||||
}
|
||||
LengthCountingDirectory countingDir = new LengthCountingDirectory(dir);
|
||||
|
||||
ByteSizeCachingDirectory cachingDir = new ByteSizeCachingDirectory(countingDir, new TimeValue(0));
|
||||
assertEquals(11, cachingDir.estimateSizeInBytes());
|
||||
assertEquals(11, cachingDir.estimateSizeInBytes());
|
||||
assertEquals(1, countingDir.numFileLengthCalls);
|
||||
|
||||
try (IndexOutput out = cachingDir.createOutput("foo", IOContext.DEFAULT)) {
|
||||
out.writeBytes(new byte[5], 5);
|
||||
|
||||
cachingDir.estimateSizeInBytes();
|
||||
// +2 because there are 3 files
|
||||
assertEquals(3, countingDir.numFileLengthCalls);
|
||||
// An index output is open so no caching
|
||||
cachingDir.estimateSizeInBytes();
|
||||
assertEquals(5, countingDir.numFileLengthCalls);
|
||||
}
|
||||
|
||||
assertEquals(16, cachingDir.estimateSizeInBytes());
|
||||
assertEquals(7, countingDir.numFileLengthCalls);
|
||||
assertEquals(16, cachingDir.estimateSizeInBytes());
|
||||
assertEquals(7, countingDir.numFileLengthCalls);
|
||||
|
||||
try (IndexOutput out = cachingDir.createTempOutput("bar", "baz", IOContext.DEFAULT)) {
|
||||
out.writeBytes(new byte[4], 4);
|
||||
|
||||
cachingDir.estimateSizeInBytes();
|
||||
assertEquals(10, countingDir.numFileLengthCalls);
|
||||
// An index output is open so no caching
|
||||
cachingDir.estimateSizeInBytes();
|
||||
assertEquals(13, countingDir.numFileLengthCalls);
|
||||
}
|
||||
|
||||
assertEquals(20, cachingDir.estimateSizeInBytes());
|
||||
// +3 because there are 3 files
|
||||
assertEquals(16, countingDir.numFileLengthCalls);
|
||||
assertEquals(20, cachingDir.estimateSizeInBytes());
|
||||
assertEquals(16, countingDir.numFileLengthCalls);
|
||||
|
||||
cachingDir.deleteFile("foo");
|
||||
|
||||
assertEquals(15, cachingDir.estimateSizeInBytes());
|
||||
// +2 because there are 2 files now
|
||||
assertEquals(18, countingDir.numFileLengthCalls);
|
||||
assertEquals(15, cachingDir.estimateSizeInBytes());
|
||||
assertEquals(18, countingDir.numFileLengthCalls);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue