From 2c150419ce565ed8a0d3d83af25b2bbcba323eae Mon Sep 17 00:00:00 2001
From: Shay Banon
Date: Wed, 3 Oct 2012 16:36:13 -0400
Subject: [PATCH] add additional (internal) flush option

this flush option allows to just commit an index writer, without actually
clearing the translog
---
 .../indices/flush/TransportFlushAction.java   |   2 +-
 .../elasticsearch/index/engine/Engine.java    |  51 +++++--
 .../index/engine/robin/RobinEngine.java       | 136 ++++++++++++------
 3 files changed, 130 insertions(+), 59 deletions(-)

diff --git a/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java
index 92bacdb898f..de123bda24e 100644
--- a/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java
+++ b/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java
@@ -116,7 +116,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction

diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java
--- a/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/src/main/java/org/elasticsearch/index/engine/Engine.java
     <T> T snapshot(SnapshotHandler<T> snapshotHandler) throws EngineException;
 
+    /**
+     * Snapshots the index and returns a handle to it. Will always try and "commit" the
+     * lucene index to make sure we have a "fresh" copy of the files to snapshot.
+     */
+    SnapshotIndexCommit snapshotIndex() throws EngineException;
+
     void recover(RecoveryHandler recoveryHandler) throws EngineException;
 
     static interface FailedEngineListener {
@@ -141,8 +147,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
         void phase3(Translog.Snapshot snapshot) throws ElasticSearchException;
     }
 
-    /**
-     */
     static interface SnapshotHandler<T> {
 
         T snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException;
@@ -211,9 +215,28 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
 
     static class Flush {
 
-        private boolean full = false;
+        public static enum Type {
+            /**
+             * A flush that causes a new writer to be created.
+             */
+            NEW_WRITER,
+            /**
+             * A flush that just commits the writer, without cleaning the translog.
+             */
+            COMMIT,
+            /**
+             * A flush that does a commit, as well as clears the translog.
+             */
+            COMMIT_TRANSLOG
+        }
+
+        private Type type = Type.COMMIT_TRANSLOG;
         private boolean refresh = false;
         private boolean force = false;
+        /**
+         * Should the flush operation wait if there is an ongoing flush operation.
+         */
+        private boolean waitIfOngoing = false;
 
         /**
          * Should a refresh be performed after flushing. Defaults to false.
@@ -230,18 +253,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
             return this;
         }
 
-        /**
-         * Should a "full" flush be issued, basically cleaning as much memory as possible.
-         */
-        public boolean full() {
-            return this.full;
+        public Type type() {
+            return this.type;
         }
 
         /**
          * Should a "full" flush be issued, basically cleaning as much memory as possible.
         */
-        public Flush full(boolean full) {
-            this.full = full;
+        public Flush type(Type type) {
+            this.type = type;
             return this;
         }
 
@@ -254,9 +274,18 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
             return this;
         }
 
+        public boolean waitIfOngoing() {
+            return this.waitIfOngoing;
+        }
+
+        public Flush waitIfOngoing(boolean waitIfOngoing) {
+            this.waitIfOngoing = waitIfOngoing;
+            return this;
+        }
+
         @Override
         public String toString() {
-            return "full[" + full + "], refresh[" + refresh + "], force[" + force + "]";
+            return "type[" + type + "], refresh[" + refresh + "], force[" + force + "]";
         }
     }
 
diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
index e0e17066aec..55e3c8d308d 100644
--- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java
@@ -25,6 +25,7 @@ import org.apache.lucene.search.*;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.UnicodeUtil;
 import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.ElasticSearchIllegalStateException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
 import org.elasticsearch.common.Nullable;
@@ -69,8 +70,11 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.elasticsearch.common.lucene.Lucene.safeClose;
@@ -81,51 +85,33 @@ import static org.elasticsearch.common.lucene.Lucene.safeClose;
 public class RobinEngine extends AbstractIndexShardComponent implements Engine {
 
     private volatile ByteSizeValue indexingBufferSize;
-    private volatile int termIndexInterval;
-    private volatile int termIndexDivisor;
-    private volatile int indexConcurrency;
-
-    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
-
-    private final AtomicBoolean optimizeMutex = new AtomicBoolean();
-    private long gcDeletesInMillis;
-    private volatile boolean enableGcDeletes = true;
+    private final boolean asyncLoadBloomFilter;
 
     private final ThreadPool threadPool;
     private final ShardIndexingService indexingService;
-    private final IndexSettingsService indexSettingsService;
-    @Nullable private final InternalIndicesWarmer warmer;
-    private final Store store;
-    private final SnapshotDeletionPolicy deletionPolicy;
-    private final Translog translog;
-    private final MergePolicyProvider mergePolicyProvider;
-    private final MergeSchedulerProvider mergeScheduler;
-    private final AnalysisService analysisService;
-    private final SimilarityService similarityService;
-    private final BloomCache bloomCache;
-    private final boolean asyncLoadBloomFilter;
+
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock();
 
     private volatile IndexWriter indexWriter;
-    // LUCENE MONITOR: 3.6 Remove using the custom SearchManager and use the Lucene 3.6 one
     private final SearcherFactory searcherFactory = new RobinSearchFactory();
     private volatile SearcherManager searcherManager;
@@ -136,14 +122,15 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
 
     private volatile boolean possibleMergeNeeded = false;
 
+    private final AtomicBoolean optimizeMutex = new AtomicBoolean();
     // we use flushNeeded here, since if there are no changes, then the commit won't write
     // will not really happen, and then the commitUserData and the new translog will not be reflected
     private volatile boolean flushNeeded = false;
 
+    private final AtomicInteger flushing = new AtomicInteger();
+    private final Lock flushLock = new ReentrantLock();
-    private volatile int disableFlushCounter = 0;
+    private volatile int onGoingRecoveries = 0;
 
-    // indexing searcher is initialized
-    private final AtomicBoolean flushing = new AtomicBoolean();
 
     private final ConcurrentMap versionMap;
@@ -226,7 +213,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
         if (indexingBufferSize == Engine.INACTIVE_SHARD_INDEXING_BUFFER && preValue != Engine.INACTIVE_SHARD_INDEXING_BUFFER) {
             logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, indexingBufferSize);
             try {
-                flush(new Flush().full(true));
+                flush(new Flush().type(Flush.Type.NEW_WRITER));
             } catch (EngineClosedException e) {
                 // ignore
             } catch (FlushNotAllowedEngineException e) {
@@ -256,12 +243,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
             throw new EngineClosedException(shardId);
         }
         if (logger.isDebugEnabled()) {
-            logger.debug("Starting engine");
+            logger.debug("starting engine");
         }
         try {
             this.indexWriter = createWriter();
         } catch (IOException e) {
-            throw new EngineCreationFailureException(shardId, "Failed to create engine", e);
+            throw new EngineCreationFailureException(shardId, "failed to create engine", e);
         }
 
         try {
@@ -296,7 +283,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
                         // ignore
                     }
                 }
-                throw new EngineCreationFailureException(shardId, "Failed to open reader on writer", e);
+                throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
             }
         } finally {
             rwl.writeLock().unlock();
         }
@@ -804,24 +791,27 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
         if (indexWriter == null) {
             throw new EngineClosedException(shardId, failedEngine);
         }
-        // check outside the lock as well so we can check without blocking on the write lock
-        if (disableFlushCounter > 0) {
-            throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
+        if (flush.type() == Flush.Type.NEW_WRITER || flush.type() == Flush.Type.COMMIT_TRANSLOG) {
+            // check outside the lock as well so we can check without blocking on the write lock
+            if (onGoingRecoveries > 0) {
+                throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + flush.type() + "] is not allowed");
+            }
         }
-        // don't allow for concurrent flush operations...
-        if (!flushing.compareAndSet(false, true)) {
-            throw new FlushNotAllowedEngineException(shardId, "Already flushing...");
+        int currentFlushing = flushing.incrementAndGet();
+        if (currentFlushing > 1 && !flush.waitIfOngoing()) {
+            flushing.decrementAndGet();
+            throw new FlushNotAllowedEngineException(shardId, "already flushing...");
         }
+        flushLock.lock();
         try {
-            boolean makeTransientCurrent = false;
-            if (flush.full()) {
+            if (flush.type() == Flush.Type.NEW_WRITER) {
                 rwl.writeLock().lock();
                 try {
                     if (indexWriter == null) {
                         throw new EngineClosedException(shardId, failedEngine);
                     }
-                    if (disableFlushCounter > 0) {
+                    if (onGoingRecoveries > 0) {
                         throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
                     }
                     // disable refreshing, not dirty
@@ -861,13 +851,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
                 } finally {
                     rwl.writeLock().unlock();
                 }
-            } else {
+            } else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
+                boolean makeTransientCurrent = false;
                 rwl.readLock().lock();
                 try {
                     if (indexWriter == null) {
                         throw new EngineClosedException(shardId, failedEngine);
                     }
-                    if (disableFlushCounter > 0) {
+                    if (onGoingRecoveries > 0) {
                         throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
                     }
 
@@ -914,7 +905,41 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
                 } finally {
                     rwl.readLock().unlock();
                 }
+            } else if (flush.type() == Flush.Type.COMMIT) {
+                // note, its ok to just commit without cleaning the translog, its perfectly fine to replay a
+                // translog on an index that was opened on a committed point in time that is "in the future"
+                // of that translog
+                rwl.readLock().lock();
+                try {
+                    if (indexWriter == null) {
+                        throw new EngineClosedException(shardId, failedEngine);
+                    }
+                    // we allow to *just* commit if there is an ongoing recovery happening...
+                    // its ok to use this, only a flush will cause a new translogId, and we are locked here from
+                    // other flushes use flushLock
+                    try {
+                        long translogId = translog.currentId();
+                        indexWriter.commit(MapBuilder.newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
+                    } catch (OutOfMemoryError e) {
+                        translog.revertTransient();
+                        failEngine(e);
+                        throw new FlushFailedEngineException(shardId, e);
+                    } catch (IllegalStateException e) {
+                        if (e.getMessage().contains("OutOfMemoryError")) {
+                            failEngine(e);
+                        }
+                        throw new FlushFailedEngineException(shardId, e);
+                    } catch (Exception e) {
+                        throw new FlushFailedEngineException(shardId, e);
+                    }
+                } finally {
+                    rwl.readLock().unlock();
+                }
+            } else {
+                throw new ElasticSearchIllegalStateException("flush type [" + flush.type() + "] not supported");
             }
+
+            // reread the last committed segment infos
             try {
                 SegmentInfos infos = new SegmentInfos();
                 infos.read(store.directory());
@@ -925,7 +950,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
                 }
             }
         } finally {
-            flushing.set(false);
+            flushLock.unlock();
+            flushing.decrementAndGet();
         }
     }
 
@@ -1061,13 +1087,29 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
         }
     }
 
+    @Override
+    public SnapshotIndexCommit snapshotIndex() throws EngineException {
+        rwl.readLock().lock();
+        try {
+            flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
+            if (indexWriter == null) {
+                throw new EngineClosedException(shardId, failedEngine);
+            }
+            return deletionPolicy.snapshot();
+        } catch (IOException e) {
+            throw new SnapshotFailedEngineException(shardId, e);
+        } finally {
+            rwl.readLock().unlock();
+        }
+    }
+
     @Override
     public void recover(RecoveryHandler recoveryHandler) throws EngineException {
         // take a write lock here so it won't happen while a flush is in progress
         // this means that next commits will not be allowed once the lock is released
         rwl.writeLock().lock();
         try {
-            disableFlushCounter++;
+            onGoingRecoveries++;
         } finally {
             rwl.writeLock().unlock();
         }
@@ -1076,14 +1118,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
         try {
             phase1Snapshot = deletionPolicy.snapshot();
         } catch (Exception e) {
-            --disableFlushCounter;
+            --onGoingRecoveries;
             throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e);
         }
 
         try {
             recoveryHandler.phase1(phase1Snapshot);
         } catch (Exception e) {
-            --disableFlushCounter;
+            --onGoingRecoveries;
             phase1Snapshot.release();
             if (closed) {
                 e = new EngineClosedException(shardId, e);
             }
@@ -1095,7 +1137,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
         try {
             phase2Snapshot = translog.snapshot();
         } catch (Exception e) {
-            --disableFlushCounter;
+            --onGoingRecoveries;
             phase1Snapshot.release();
             if (closed) {
                 e = new EngineClosedException(shardId, e);
             }
@@ -1106,7 +1148,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
         try {
             recoveryHandler.phase2(phase2Snapshot);
         } catch (Exception e) {
-            --disableFlushCounter;
+            --onGoingRecoveries;
             phase1Snapshot.release();
             phase2Snapshot.release();
             if (closed) {
@@ -1123,7 +1165,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
         } catch (Exception e) {
             throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
         } finally {
-            --disableFlushCounter;
+            --onGoingRecoveries;
             rwl.writeLock().unlock();
             phase1Snapshot.release();
             phase2Snapshot.release();
@@ -1369,7 +1411,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
             rwl.readLock().unlock();
         }
         if (requiresFlushing) {
-            flush(new Flush().full(true));
+            flush(new Flush().type(Flush.Type.NEW_WRITER));
        }
     }
 }
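
Not part of the patch: a minimal usage sketch of the API this commit introduces, to make the intended call patterns concrete. It assumes the patched Engine and Engine.Flush from org.elasticsearch.index.engine and SnapshotIndexCommit from org.elasticsearch.index.deletionpolicy (package assumed); the wrapper class and helper method names below are invented for illustration. It shows how the old full(false)/full(true) flushes map onto COMMIT_TRANSLOG and NEW_WRITER, and how the new internal COMMIT type together with waitIfOngoing(true) is meant to be used, mirroring what snapshotIndex() does inside RobinEngine.

// Illustrative sketch only -- not part of the commit. FlushTypeExample and its methods
// are hypothetical; Engine, Engine.Flush and SnapshotIndexCommit come from the patched code.
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;

public class FlushTypeExample {

    // Default behaviour (the old full(false)): commit the Lucene index and clear the translog.
    static void flushAndClearTranslog(Engine engine) {
        engine.flush(new Engine.Flush().type(Engine.Flush.Type.COMMIT_TRANSLOG));
    }

    // The old full(true) flush now maps to NEW_WRITER: commit and recreate the IndexWriter.
    static void flushWithNewWriter(Engine engine) {
        engine.flush(new Engine.Flush().type(Engine.Flush.Type.NEW_WRITER));
    }

    // The new internal option: just commit the writer and leave the translog alone.
    // waitIfOngoing(true) makes the call wait behind a concurrent flush instead of
    // failing with FlushNotAllowedEngineException.
    static void commitOnly(Engine engine) {
        engine.flush(new Engine.Flush().type(Engine.Flush.Type.COMMIT).waitIfOngoing(true));
    }

    // snapshotIndex() performs such a COMMIT flush internally and returns a deletion-policy
    // snapshot of the resulting commit point; the caller must release it when done.
    static void snapshotIndexFiles(Engine engine) {
        SnapshotIndexCommit snapshot = engine.snapshotIndex();
        try {
            // copy the files referenced by the snapshot somewhere safe ...
        } finally {
            snapshot.release();
        }
    }
}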