Flush API: Allow to provide `full` parameter for a complete clean, closes #210.

This commit is contained in:
kimchy 2010-06-05 06:10:02 +03:00
parent ef1866aed4
commit 5cdba0383b
13 changed files with 239 additions and 100 deletions

View File

@ -61,7 +61,7 @@ public class SimpleMemoryMonitorBenchmark {
Thread.sleep(5000);
StopWatch stopWatch = new StopWatch().start();
int COUNT = 200000;
int COUNT = 2000000;
System.out.println("Indexing [" + COUNT + "] ...");
for (int i = 0; i < COUNT; i++) {
client1.index(

View File

@ -42,6 +42,8 @@ public class FlushRequest extends BroadcastOperationRequest {
private boolean refresh = false;
private boolean full = false;
FlushRequest() {
}
@ -71,6 +73,21 @@ public class FlushRequest extends BroadcastOperationRequest {
return this;
}
/**
* Should a "full" flush be performed.
*/
public boolean full() {
return this.full;
}
/**
* Should a "full" flush be performed.
*/
public FlushRequest full(boolean full) {
this.full = full;
return this;
}
/**
* Should the listener be called on a separate thread if needed.
*/
@ -90,10 +107,12 @@ public class FlushRequest extends BroadcastOperationRequest {
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(refresh);
out.writeBoolean(full);
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refresh = in.readBoolean();
full = in.readBoolean();
}
}

View File

@ -32,25 +32,34 @@ class ShardFlushRequest extends BroadcastShardOperationRequest {
private boolean refresh;
private boolean full;
ShardFlushRequest() {
}
public ShardFlushRequest(String index, int shardId, FlushRequest request) {
super(index, shardId);
this.refresh = request.refresh();
this.full = request.full();
}
public boolean refresh() {
return this.refresh;
}
public boolean full() {
return this.full;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refresh = in.readBoolean();
full = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(refresh);
out.writeBoolean(full);
}
}

View File

@ -104,7 +104,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
@Override protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.flush(new Engine.Flush().refresh(request.refresh()));
indexShard.flush(new Engine.Flush().refresh(request.refresh()).full(request.full()));
return new ShardFlushResponse(request.index(), request.shardId());
}

View File

@ -50,6 +50,11 @@ public class FlushRequestBuilder {
return this;
}
public FlushRequestBuilder setFull(boolean full) {
request.full(full);
return this;
}
/**
* Executes the operation asynchronously and returns a future.
*/

View File

@ -137,6 +137,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class Flush {
private boolean full = false;
private boolean refresh = false;
/**
@ -154,8 +155,23 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this;
}
/**
* Should a "full" flush be issued, basically cleaning as much memory as possible.
*/
public boolean full() {
return this.full;
}
/**
* Should a "full" flush be issued, basically cleaning as much memory as possible.
*/
public Flush full(boolean full) {
this.full = full;
return this;
}
@Override public String toString() {
return "refresh[" + refresh + "]";
return "full[" + full + "], refresh[" + refresh + "]";
}
}

View File

@ -29,4 +29,8 @@ public class FlushFailedEngineException extends EngineException {
public FlushFailedEngineException(ShardId shardId, Throwable t) {
super(shardId, "Flush failed", t);
}
public FlushFailedEngineException(ShardId shardId, String message, Throwable t) {
super(shardId, "Flush failed [" + message + "]", t);
}
}

View File

@ -139,32 +139,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (logger.isDebugEnabled()) {
logger.debug("Starting engine with ram_buffer_size[" + ramBufferSize + "], refresh_interval[" + refreshInterval + "]");
}
IndexWriter indexWriter = null;
try {
// release locks when started
if (IndexWriter.isLocked(store.directory())) {
logger.trace("Shard is locked, releasing lock");
store.directory().clearLock(IndexWriter.WRITE_LOCK_NAME);
}
boolean create = !IndexReader.indexExists(store.directory());
indexWriter = new IndexWriter(store.directory(),
analysisService.defaultIndexAnalyzer(), create, deletionPolicy, IndexWriter.MaxFieldLength.UNLIMITED);
indexWriter.setMergeScheduler(mergeScheduler.newMergeScheduler());
indexWriter.setMergePolicy(mergePolicyProvider.newMergePolicy(indexWriter));
indexWriter.setSimilarity(similarityService.defaultIndexSimilarity());
indexWriter.setRAMBufferSizeMB(ramBufferSize.mbFrac());
indexWriter.setTermIndexInterval(termIndexInterval);
this.indexWriter = createWriter();
} catch (IOException e) {
safeClose(indexWriter);
throw new EngineCreationFailureException(shardId, "Failed to create engine", e);
}
this.indexWriter = indexWriter;
try {
IndexReader indexReader = indexWriter.getReader();
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
this.nrtResource = newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
this.nrtResource = buildNrtResource(indexWriter);
} catch (IOException e) {
try {
indexWriter.rollback();
@ -265,6 +247,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
@Override public void refresh(Refresh refresh) throws EngineException {
// this engine always acts as if waitForOperations=true
if (refreshMutex.compareAndSet(false, true)) {
IndexWriter currentWriter = indexWriter;
try {
if (dirty) {
dirty = false;
@ -277,8 +260,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
current.markForClose();
}
}
} catch (IOException e) {
throw new RefreshFailedEngineException(shardId, e);
} catch (Exception e) {
if (currentWriter != indexWriter) {
// an index writer got replaced on us, ignore
} else {
throw new RefreshFailedEngineException(shardId, e);
}
} finally {
refreshMutex.set(false);
}
@ -295,11 +282,31 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
try {
indexWriter.commit();
translog.newTranslog();
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
if (flush.full()) {
// disable refreshing, not dirty
dirty = false;
refreshMutex.set(true);
try {
// that's ok if the index writer failed and is in inconsistent state
// we will get an exception on a dirty operation, and will cause the shard
// to be allocated to a different node
indexWriter.close();
indexWriter = createWriter();
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
nrtResource = buildNrtResource(indexWriter);
current.markForClose();
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
} finally {
refreshMutex.set(false);
}
} else {
try {
indexWriter.commit();
translog.newTranslog();
} catch (IOException e) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
rwl.writeLock().unlock();
@ -458,6 +465,36 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
}
private IndexWriter createWriter() throws IOException {
IndexWriter indexWriter = null;
try {
// release locks when started
if (IndexWriter.isLocked(store.directory())) {
logger.trace("Shard is locked, releasing lock");
store.directory().clearLock(IndexWriter.WRITE_LOCK_NAME);
}
boolean create = !IndexReader.indexExists(store.directory());
indexWriter = new IndexWriter(store.directory(),
analysisService.defaultIndexAnalyzer(), create, deletionPolicy, IndexWriter.MaxFieldLength.UNLIMITED);
indexWriter.setMergeScheduler(mergeScheduler.newMergeScheduler());
indexWriter.setMergePolicy(mergePolicyProvider.newMergePolicy(indexWriter));
indexWriter.setSimilarity(similarityService.defaultIndexSimilarity());
indexWriter.setRAMBufferSizeMB(ramBufferSize.mbFrac());
indexWriter.setTermIndexInterval(termIndexInterval);
} catch (IOException e) {
safeClose(indexWriter);
throw e;
}
return indexWriter;
}
private AcquirableResource<ReaderSearcherHolder> buildNrtResource(IndexWriter indexWriter) throws IOException {
IndexReader indexReader = indexWriter.getReader();
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
return newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
}
private static class RobinSearchResult implements Searcher {
private final AcquirableResource<ReaderSearcherHolder> nrtHolder;

View File

@ -27,12 +27,12 @@ import org.elasticsearch.index.translog.TranslogException;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.concurrent.ThreadSafe;
import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -52,7 +52,7 @@ public class MemoryTranslog extends AbstractIndexShardComponent implements Trans
// we use LinkedBlockingQueue and not LinkedTransferQueue since we clear it on #newTranslog
// and with LinkedTransferQueue, nodes are not really cleared, just marked causing for memory
// not to be cleaned properly (besides, clear is heavy..., "while ... poll").
private final Queue<Operation> operations = new LinkedBlockingQueue<Operation>();
private volatile Queue<Operation> operations;
@Inject public MemoryTranslog(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
@ -74,14 +74,14 @@ public class MemoryTranslog extends AbstractIndexShardComponent implements Trans
@Override public void newTranslog() {
synchronized (mutex) {
estimatedMemorySize.set(0);
operations.clear();
operations = new LinkedTransferQueue<Operation>();
id = idGenerator.getAndIncrement();
}
}
@Override public void add(Operation operation) throws TranslogException {
operations.add(operation);
estimatedMemorySize.addAndGet(operation.estimateSize() + 20);
estimatedMemorySize.addAndGet(operation.estimateSize() + 50);
}
@Override public Snapshot snapshot() {

View File

@ -83,6 +83,14 @@ public class IndicesMemoryCleaner extends AbstractComponent {
}
}
public void forceCleanMemory(boolean full) {
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
indexShard.flush(new Engine.Flush().full(full));
}
}
}
/**
* Checks if memory needs to be cleaned and cleans it. Returns the amount of memory cleaned.
*/

View File

@ -23,7 +23,10 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.indices.IndicesMemoryCleaner;
import org.elasticsearch.monitor.memory.MemoryMonitor;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.*;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.ThreadLocals;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.inject.Inject;
import org.elasticsearch.util.settings.Settings;
@ -44,7 +47,7 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor
private final TimeValue interval;
private final int clearCacheThreshold;
private final int fullThreshold;
private final int cleanThreshold;
@ -65,7 +68,7 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor
private volatile ScheduledFuture scheduledFuture;
private AtomicLong totalCleans = new AtomicLong();
private AtomicLong totalClearCache = new AtomicLong();
private AtomicLong totalFull = new AtomicLong();
@Inject public AlphaMemoryMonitor(Settings settings, ThreadPool threadPool, IndicesMemoryCleaner indicesMemoryCleaner) {
super(settings);
@ -75,7 +78,7 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor
this.upperMemoryThreshold = componentSettings.getAsDouble("upper_memory_threshold", 0.8);
this.lowerMemoryThreshold = componentSettings.getAsDouble("lower_memory_threshold", 0.5);
this.interval = componentSettings.getAsTime("interval", timeValueMillis(500));
this.clearCacheThreshold = componentSettings.getAsInt("clear_cache_threshold", 2);
this.fullThreshold = componentSettings.getAsInt("full_threshold", 2);
this.cleanThreshold = componentSettings.getAsInt("clean_threshold", 10);
this.minimumFlushableSizeToClean = componentSettings.getAsSize("minimum_flushable_size_to_clean", new SizeValue(5, SizeUnit.MB));
this.translogNumberOfOperationsThreshold = componentSettings.getAsInt("translog_number_of_operations_threshold", 5000);
@ -108,83 +111,104 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent<MemoryMonitor
private class MemoryCleaner implements Runnable {
private int clearCacheCounter;
private int fullCounter;
private boolean performedClean;
private int cleanCounter;
private StopWatch stopWatch = new StopWatch().keepTaskList(false);
@Override public void run() {
// clear unreferenced in the cache
indicesMemoryCleaner.cacheClearUnreferenced();
try {
// clear unreferenced in the cache
indicesMemoryCleaner.cacheClearUnreferenced();
// try and clean translog based on a threshold, since we don't want to get a very large transaction log
// which means recovery it will take a long time (since the target re-index all this data)
IndicesMemoryCleaner.TranslogCleanResult translogCleanResult = indicesMemoryCleaner.cleanTranslog(translogNumberOfOperationsThreshold);
if (translogCleanResult.cleanedShards() > 0) {
long totalClean = totalCleans.incrementAndGet();
logger.debug("[" + totalClean + "] [Translog] " + translogCleanResult);
}
// try and clean translog based on a threshold, since we don't want to get a very large transaction log
// which means recovery it will take a long time (since the target re-index all this data)
IndicesMemoryCleaner.TranslogCleanResult translogCleanResult = indicesMemoryCleaner.cleanTranslog(translogNumberOfOperationsThreshold);
if (translogCleanResult.cleanedShards() > 0) {
long totalClean = totalCleans.incrementAndGet();
logger.debug("[" + totalClean + "] [Translog] " + translogCleanResult);
}
// the logic is simple, if the used memory is above the upper threshold, we need to clean
// we clean down as much as we can to down to the lower threshold
// the logic is simple, if the used memory is above the upper threshold, we need to clean
// we clean down as much as we can to down to the lower threshold
// in order not to get trashing, we only perform a clean after another clean if a the clean counter
// has expired.
// in order not to get trashing, we only perform a clean after another clean if a the clean counter
// has expired.
// we also do the same for GC invocations
// we also do the same for GC invocations
long upperMemory = maxMemory.bytes();
long totalMemory = totalMemory();
long usedMemory = totalMemory - freeMemory();
long upperThresholdMemory = (long) (upperMemory * upperMemoryThreshold);
long upperMemory = maxMemory.bytes();
long totalMemory = totalMemory();
long usedMemory = totalMemory - freeMemory();
long upperThresholdMemory = (long) (upperMemory * upperMemoryThreshold);
if (usedMemory - upperThresholdMemory <= 0) {
clearCacheCounter = 0;
performedClean = false;
cleanCounter = 0;
return;
}
if (performedClean) {
if (++cleanCounter < cleanThreshold) {
if (usedMemory - upperThresholdMemory <= 0) {
fullCounter = 0;
performedClean = false;
cleanCounter = 0;
return;
}
if (performedClean) {
if (++cleanCounter < cleanThreshold) {
return;
}
}
long lowerThresholdMemory = (long) (upperMemory * lowerMemoryThreshold);
long memoryToClean = usedMemory - lowerThresholdMemory;
if (fullCounter++ >= fullThreshold) {
long total = totalFull.incrementAndGet();
if (logger.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(total).append("] ");
sb.append("[Full ] Ran after [").append(fullThreshold).append("] consecutive clean swipes");
sb.append(", memory_to_clean [").append(new SizeValue(memoryToClean)).append(']');
sb.append(", lower_memory_threshold [").append(new SizeValue(lowerThresholdMemory)).append(']');
sb.append(", upper_memory_threshold [").append(new SizeValue(upperThresholdMemory)).append(']');
sb.append(", used_memory [").append(new SizeValue(usedMemory)).append(']');
sb.append(", total_memory[").append(new SizeValue(totalMemory)).append(']');
sb.append(", max_memory[").append(maxMemory).append(']');
logger.info(sb.toString());
}
indicesMemoryCleaner.cacheClear();
indicesMemoryCleaner.forceCleanMemory(true);
ThreadLocals.clearReferencesThreadLocals();
fullCounter = 0;
} else {
long totalClean = totalCleans.incrementAndGet();
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(totalClean).append("] ");
sb.append("[Cleaning] memory_to_clean [").append(new SizeValue(memoryToClean)).append(']');
sb.append(", lower_memory_threshold [").append(new SizeValue(lowerThresholdMemory)).append(']');
sb.append(", upper_memory_threshold [").append(new SizeValue(upperThresholdMemory)).append(']');
sb.append(", used_memory [").append(new SizeValue(usedMemory)).append(']');
sb.append(", total_memory[").append(new SizeValue(totalMemory)).append(']');
sb.append(", max_memory[").append(maxMemory).append(']');
logger.debug(sb.toString());
}
IndicesMemoryCleaner.MemoryCleanResult memoryCleanResult = indicesMemoryCleaner.cleanMemory(memoryToClean, minimumFlushableSizeToClean);
boolean forceClean = false;
if (memoryCleanResult.cleaned().bytes() < memoryToClean) {
forceClean = true;
indicesMemoryCleaner.forceCleanMemory(false);
}
if (logger.isDebugEnabled()) {
logger.debug("[" + totalClean + "] [Cleaned ] force_clean [" + forceClean + "], " + memoryCleanResult);
}
}
performedClean = true;
cleanCounter = 0;
} catch (Exception e) {
logger.info("Failed to run memory monitor", e);
}
long totalClean = totalCleans.incrementAndGet();
long lowerThresholdMemory = (long) (upperMemory * lowerMemoryThreshold);
long memoryToClean = usedMemory - lowerThresholdMemory;
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append('[').append(totalClean).append("] ");
sb.append("[Cleaning] memory_to_clean [").append(new SizeValue(memoryToClean)).append(']');
sb.append(", lower_memory_threshold [").append(new SizeValue(lowerThresholdMemory)).append(']');
sb.append(", upper_memory_threshold [").append(new SizeValue(upperThresholdMemory)).append(']');
sb.append(", used_memory [").append(new SizeValue(usedMemory)).append(']');
sb.append(", total_memory[").append(new SizeValue(totalMemory)).append(']');
sb.append(", max_memory[").append(maxMemory).append(']');
logger.debug(sb.toString());
}
IndicesMemoryCleaner.MemoryCleanResult memoryCleanResult = indicesMemoryCleaner.cleanMemory(memoryToClean, minimumFlushableSizeToClean);
if (logger.isDebugEnabled()) {
logger.debug("[" + totalClean + "] [Cleaned ] " + memoryCleanResult);
}
if (++clearCacheCounter >= clearCacheThreshold) {
long totalClear = totalClearCache.incrementAndGet();
logger.debug("[" + totalClear + "] [Cache ] cleared after [" + (cleanCounter / cleanThreshold) + "] memory clean swipes");
indicesMemoryCleaner.cacheClear();
ThreadLocals.clearReferencesThreadLocals();
clearCacheCounter = 0;
}
performedClean = true;
cleanCounter = 0;
}
}
}

View File

@ -62,6 +62,7 @@ public class RestFlushAction extends BaseRestHandler {
}
flushRequest.operationThreading(operationThreading);
flushRequest.refresh(request.paramAsBoolean("refresh", flushRequest.refresh()));
flushRequest.full(request.paramAsBoolean("full", flushRequest.full()));
client.admin().indices().flush(flushRequest, new ActionListener<FlushResponse>() {
@Override public void onResponse(FlushResponse response) {
try {

View File

@ -159,8 +159,24 @@ public class SimpleLuceneTests {
}
}
}
}
@Test public void testNRTSearchOnClosedWriter() throws Exception {
Directory dir = new RAMDirectory();
IndexWriter indexWriter = new IndexWriter(dir, Lucene.STANDARD_ANALYZER, true, IndexWriter.MaxFieldLength.UNLIMITED);
IndexReader reader = indexWriter.getReader();
for (int i = 0; i < 100; i++) {
indexWriter.addDocument(doc()
.add(field("id", Integer.toString(i)))
.boost(i).build());
}
reader = refreshReader(reader);
indexWriter.close();
TermDocs termDocs = reader.termDocs();
termDocs.next();
}
/**