diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/monitor/memory/SimpleMemoryMonitorBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/monitor/memory/SimpleMemoryMonitorBenchmark.java index 0c14ea8a3ad..606d245eef1 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/monitor/memory/SimpleMemoryMonitorBenchmark.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/monitor/memory/SimpleMemoryMonitorBenchmark.java @@ -61,7 +61,7 @@ public class SimpleMemoryMonitorBenchmark { Thread.sleep(5000); StopWatch stopWatch = new StopWatch().start(); - int COUNT = 200000; + int COUNT = 2000000; System.out.println("Indexing [" + COUNT + "] ..."); for (int i = 0; i < COUNT; i++) { client1.index( diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java index dda1ce1e082..f834d80b455 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushRequest.java @@ -42,6 +42,8 @@ public class FlushRequest extends BroadcastOperationRequest { private boolean refresh = false; + private boolean full = false; + FlushRequest() { } @@ -71,6 +73,21 @@ public class FlushRequest extends BroadcastOperationRequest { return this; } + /** + * Should a "full" flush be performed. + */ + public boolean full() { + return this.full; + } + + /** + * Should a "full" flush be performed. + */ + public FlushRequest full(boolean full) { + this.full = full; + return this; + } + /** * Should the listener be called on a separate thread if needed. */ @@ -90,10 +107,12 @@ public class FlushRequest extends BroadcastOperationRequest { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(refresh); + out.writeBoolean(full); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refresh = in.readBoolean(); + full = in.readBoolean(); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index aba835e8bfb..b7efa5dc35c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -32,25 +32,34 @@ class ShardFlushRequest extends BroadcastShardOperationRequest { private boolean refresh; + private boolean full; + ShardFlushRequest() { } public ShardFlushRequest(String index, int shardId, FlushRequest request) { super(index, shardId); this.refresh = request.refresh(); + this.full = request.full(); } public boolean refresh() { return this.refresh; } + public boolean full() { + return this.full; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refresh = in.readBoolean(); + full = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(refresh); + out.writeBoolean(full); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 1416acf3665..2624efe39df 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -104,7 +104,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction 0) { throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed"); } - try { - indexWriter.commit(); - translog.newTranslog(); - } catch (IOException e) { - throw new FlushFailedEngineException(shardId, e); + if (flush.full()) { + // disable refreshing, not dirty + dirty = false; + refreshMutex.set(true); + try { + // that's ok if the index writer failed and is in inconsistent state + // we will get an exception on a dirty operation, and will cause the shard + // to be allocated to a different node + indexWriter.close(); + indexWriter = createWriter(); + AcquirableResource current = nrtResource; + nrtResource = buildNrtResource(indexWriter); + current.markForClose(); + } catch (IOException e) { + throw new FlushFailedEngineException(shardId, e); + } finally { + refreshMutex.set(false); + } + } else { + try { + indexWriter.commit(); + translog.newTranslog(); + } catch (IOException e) { + throw new FlushFailedEngineException(shardId, e); + } } } finally { rwl.writeLock().unlock(); @@ -458,6 +465,36 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } } + private IndexWriter createWriter() throws IOException { + IndexWriter indexWriter = null; + try { + // release locks when started + if (IndexWriter.isLocked(store.directory())) { + logger.trace("Shard is locked, releasing lock"); + store.directory().clearLock(IndexWriter.WRITE_LOCK_NAME); + } + boolean create = !IndexReader.indexExists(store.directory()); + indexWriter = new IndexWriter(store.directory(), + analysisService.defaultIndexAnalyzer(), create, deletionPolicy, IndexWriter.MaxFieldLength.UNLIMITED); + indexWriter.setMergeScheduler(mergeScheduler.newMergeScheduler()); + indexWriter.setMergePolicy(mergePolicyProvider.newMergePolicy(indexWriter)); + indexWriter.setSimilarity(similarityService.defaultIndexSimilarity()); + indexWriter.setRAMBufferSizeMB(ramBufferSize.mbFrac()); + indexWriter.setTermIndexInterval(termIndexInterval); + } catch (IOException e) { + safeClose(indexWriter); + throw e; + } + return indexWriter; + } + + private AcquirableResource buildNrtResource(IndexWriter indexWriter) throws IOException { + IndexReader indexReader = indexWriter.getReader(); + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity()); + return newAcquirableResource(new ReaderSearcherHolder(indexSearcher)); + } + private static class RobinSearchResult implements Searcher { private final AcquirableResource nrtHolder; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemoryTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemoryTranslog.java index fec167459ed..c44e341cef4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemoryTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/memory/MemoryTranslog.java @@ -27,12 +27,12 @@ import org.elasticsearch.index.translog.TranslogException; import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.concurrent.ThreadSafe; +import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.settings.Settings; import java.util.ArrayList; import java.util.Queue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; /** @@ -52,7 +52,7 @@ public class MemoryTranslog extends AbstractIndexShardComponent implements Trans // we use LinkedBlockingQueue and not LinkedTransferQueue since we clear it on #newTranslog // and with LinkedTransferQueue, nodes are not really cleared, just marked causing for memory // not to be cleaned properly (besides, clear is heavy..., "while ... poll"). - private final Queue operations = new LinkedBlockingQueue(); + private volatile Queue operations; @Inject public MemoryTranslog(ShardId shardId, @IndexSettings Settings indexSettings) { super(shardId, indexSettings); @@ -74,14 +74,14 @@ public class MemoryTranslog extends AbstractIndexShardComponent implements Trans @Override public void newTranslog() { synchronized (mutex) { estimatedMemorySize.set(0); - operations.clear(); + operations = new LinkedTransferQueue(); id = idGenerator.getAndIncrement(); } } @Override public void add(Operation operation) throws TranslogException { operations.add(operation); - estimatedMemorySize.addAndGet(operation.estimateSize() + 20); + estimatedMemorySize.addAndGet(operation.estimateSize() + 50); } @Override public Snapshot snapshot() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java index eb87d58cd23..90bd29d29aa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/IndicesMemoryCleaner.java @@ -83,6 +83,14 @@ public class IndicesMemoryCleaner extends AbstractComponent { } } + public void forceCleanMemory(boolean full) { + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + indexShard.flush(new Engine.Flush().full(full)); + } + } + } + /** * Checks if memory needs to be cleaned and cleans it. Returns the amount of memory cleaned. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java b/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java index c138c5b048c..8173be20404 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/monitor/memory/alpha/AlphaMemoryMonitor.java @@ -23,7 +23,10 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.indices.IndicesMemoryCleaner; import org.elasticsearch.monitor.memory.MemoryMonitor; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.util.*; +import org.elasticsearch.util.SizeUnit; +import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.ThreadLocals; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.inject.Inject; import org.elasticsearch.util.settings.Settings; @@ -44,7 +47,7 @@ public class AlphaMemoryMonitor extends AbstractLifecycleComponent 0) { - long totalClean = totalCleans.incrementAndGet(); - logger.debug("[" + totalClean + "] [Translog] " + translogCleanResult); - } + // try and clean translog based on a threshold, since we don't want to get a very large transaction log + // which means recovery it will take a long time (since the target re-index all this data) + IndicesMemoryCleaner.TranslogCleanResult translogCleanResult = indicesMemoryCleaner.cleanTranslog(translogNumberOfOperationsThreshold); + if (translogCleanResult.cleanedShards() > 0) { + long totalClean = totalCleans.incrementAndGet(); + logger.debug("[" + totalClean + "] [Translog] " + translogCleanResult); + } - // the logic is simple, if the used memory is above the upper threshold, we need to clean - // we clean down as much as we can to down to the lower threshold + // the logic is simple, if the used memory is above the upper threshold, we need to clean + // we clean down as much as we can to down to the lower threshold - // in order not to get trashing, we only perform a clean after another clean if a the clean counter - // has expired. + // in order not to get trashing, we only perform a clean after another clean if a the clean counter + // has expired. - // we also do the same for GC invocations + // we also do the same for GC invocations - long upperMemory = maxMemory.bytes(); - long totalMemory = totalMemory(); - long usedMemory = totalMemory - freeMemory(); - long upperThresholdMemory = (long) (upperMemory * upperMemoryThreshold); + long upperMemory = maxMemory.bytes(); + long totalMemory = totalMemory(); + long usedMemory = totalMemory - freeMemory(); + long upperThresholdMemory = (long) (upperMemory * upperMemoryThreshold); - if (usedMemory - upperThresholdMemory <= 0) { - clearCacheCounter = 0; - performedClean = false; - cleanCounter = 0; - return; - } - - if (performedClean) { - if (++cleanCounter < cleanThreshold) { + if (usedMemory - upperThresholdMemory <= 0) { + fullCounter = 0; + performedClean = false; + cleanCounter = 0; return; } + + if (performedClean) { + if (++cleanCounter < cleanThreshold) { + return; + } + } + + + long lowerThresholdMemory = (long) (upperMemory * lowerMemoryThreshold); + long memoryToClean = usedMemory - lowerThresholdMemory; + + if (fullCounter++ >= fullThreshold) { + long total = totalFull.incrementAndGet(); + if (logger.isInfoEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append('[').append(total).append("] "); + sb.append("[Full ] Ran after [").append(fullThreshold).append("] consecutive clean swipes"); + sb.append(", memory_to_clean [").append(new SizeValue(memoryToClean)).append(']'); + sb.append(", lower_memory_threshold [").append(new SizeValue(lowerThresholdMemory)).append(']'); + sb.append(", upper_memory_threshold [").append(new SizeValue(upperThresholdMemory)).append(']'); + sb.append(", used_memory [").append(new SizeValue(usedMemory)).append(']'); + sb.append(", total_memory[").append(new SizeValue(totalMemory)).append(']'); + sb.append(", max_memory[").append(maxMemory).append(']'); + logger.info(sb.toString()); + } + indicesMemoryCleaner.cacheClear(); + indicesMemoryCleaner.forceCleanMemory(true); + ThreadLocals.clearReferencesThreadLocals(); + fullCounter = 0; + } else { + long totalClean = totalCleans.incrementAndGet(); + if (logger.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append('[').append(totalClean).append("] "); + sb.append("[Cleaning] memory_to_clean [").append(new SizeValue(memoryToClean)).append(']'); + sb.append(", lower_memory_threshold [").append(new SizeValue(lowerThresholdMemory)).append(']'); + sb.append(", upper_memory_threshold [").append(new SizeValue(upperThresholdMemory)).append(']'); + sb.append(", used_memory [").append(new SizeValue(usedMemory)).append(']'); + sb.append(", total_memory[").append(new SizeValue(totalMemory)).append(']'); + sb.append(", max_memory[").append(maxMemory).append(']'); + logger.debug(sb.toString()); + } + + IndicesMemoryCleaner.MemoryCleanResult memoryCleanResult = indicesMemoryCleaner.cleanMemory(memoryToClean, minimumFlushableSizeToClean); + boolean forceClean = false; + if (memoryCleanResult.cleaned().bytes() < memoryToClean) { + forceClean = true; + indicesMemoryCleaner.forceCleanMemory(false); + } + + if (logger.isDebugEnabled()) { + logger.debug("[" + totalClean + "] [Cleaned ] force_clean [" + forceClean + "], " + memoryCleanResult); + } + } + + performedClean = true; + cleanCounter = 0; + } catch (Exception e) { + logger.info("Failed to run memory monitor", e); } - - long totalClean = totalCleans.incrementAndGet(); - - long lowerThresholdMemory = (long) (upperMemory * lowerMemoryThreshold); - long memoryToClean = usedMemory - lowerThresholdMemory; - if (logger.isDebugEnabled()) { - StringBuilder sb = new StringBuilder(); - sb.append('[').append(totalClean).append("] "); - sb.append("[Cleaning] memory_to_clean [").append(new SizeValue(memoryToClean)).append(']'); - sb.append(", lower_memory_threshold [").append(new SizeValue(lowerThresholdMemory)).append(']'); - sb.append(", upper_memory_threshold [").append(new SizeValue(upperThresholdMemory)).append(']'); - sb.append(", used_memory [").append(new SizeValue(usedMemory)).append(']'); - sb.append(", total_memory[").append(new SizeValue(totalMemory)).append(']'); - sb.append(", max_memory[").append(maxMemory).append(']'); - logger.debug(sb.toString()); - } - - IndicesMemoryCleaner.MemoryCleanResult memoryCleanResult = indicesMemoryCleaner.cleanMemory(memoryToClean, minimumFlushableSizeToClean); - if (logger.isDebugEnabled()) { - logger.debug("[" + totalClean + "] [Cleaned ] " + memoryCleanResult); - } - - if (++clearCacheCounter >= clearCacheThreshold) { - long totalClear = totalClearCache.incrementAndGet(); - logger.debug("[" + totalClear + "] [Cache ] cleared after [" + (cleanCounter / cleanThreshold) + "] memory clean swipes"); - indicesMemoryCleaner.cacheClear(); - ThreadLocals.clearReferencesThreadLocals(); - clearCacheCounter = 0; - } - - performedClean = true; - cleanCounter = 0; } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestFlushAction.java index 6cd5697bdca..b93b9aa754b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestFlushAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/flush/RestFlushAction.java @@ -62,6 +62,7 @@ public class RestFlushAction extends BaseRestHandler { } flushRequest.operationThreading(operationThreading); flushRequest.refresh(request.paramAsBoolean("refresh", flushRequest.refresh())); + flushRequest.full(request.paramAsBoolean("full", flushRequest.full())); client.admin().indices().flush(flushRequest, new ActionListener() { @Override public void onResponse(FlushResponse response) { try { diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java index d278b969a95..4ff115ff3ff 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/deps/lucene/SimpleLuceneTests.java @@ -159,8 +159,24 @@ public class SimpleLuceneTests { } } } + } + + @Test public void testNRTSearchOnClosedWriter() throws Exception { + Directory dir = new RAMDirectory(); + IndexWriter indexWriter = new IndexWriter(dir, Lucene.STANDARD_ANALYZER, true, IndexWriter.MaxFieldLength.UNLIMITED); + IndexReader reader = indexWriter.getReader(); + + for (int i = 0; i < 100; i++) { + indexWriter.addDocument(doc() + .add(field("id", Integer.toString(i))) + .boost(i).build()); + } + reader = refreshReader(reader); indexWriter.close(); + + TermDocs termDocs = reader.termDocs(); + termDocs.next(); } /**