add additional (internal) flush option

This flush option allows just committing the index writer, without actually clearing the translog.
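
For illustration, a minimal sketch of how the three resulting flush variants would be selected (Engine and Flush as defined in the diff below; the call sites themselves are assumptions):

    // commit Lucene and move to a new translog: the default, and what the public flush API does
    engine.flush(new Engine.Flush().type(Engine.Flush.Type.COMMIT_TRANSLOG));

    // tear the IndexWriter down and recreate it, cleaning as much memory as possible
    engine.flush(new Engine.Flush().type(Engine.Flush.Type.NEW_WRITER));

    // the new internal option: commit the writer but leave the translog intact, so that,
    // for example, a snapshot can copy a fresh Lucene commit while the translog stays replayable
    engine.flush(new Engine.Flush().type(Engine.Flush.Type.COMMIT).waitIfOngoing(true));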
Shay Banon 2012-10-03 16:36:13 -04:00
parent 2532761c51
commit 2c150419ce
3 changed files with 130 additions and 59 deletions

TransportFlushAction.java

@@ -116,7 +116,7 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
@Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) throws ElasticSearchException {
IndexShard indexShard = indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
indexShard.flush(new Engine.Flush().refresh(request.refresh()).full(request.full()).force(request.force()));
indexShard.flush(new Engine.Flush().refresh(request.refresh()).type(request.full() ? Engine.Flush.Type.NEW_WRITER : Engine.Flush.Type.COMMIT_TRANSLOG).force(request.force()));
return new ShardFlushResponse(request.index(), request.shardId());
}
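
The change above keeps the public API's behaviour: the request's full flag now selects between the two pre-existing semantics, and nothing in the flush request maps to the new internal type. Restated as a hedged sketch (names as in the diff):

    // full=true keeps meaning "recreate the writer"; the default commits and clears the translog
    Engine.Flush.Type type = request.full()
            ? Engine.Flush.Type.NEW_WRITER
            : Engine.Flush.Type.COMMIT_TRANSLOG;
    // Engine.Flush.Type.COMMIT is internal only and is not reachable from this action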

Engine.java

@@ -114,6 +114,12 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
<T> T snapshot(SnapshotHandler<T> snapshotHandler) throws EngineException;
/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the
* lucene index to make sure we have a "fresh" copy of the files to snapshot.
*/
SnapshotIndexCommit snapshotIndex() throws EngineException;
void recover(RecoveryHandler recoveryHandler) throws EngineException;
static interface FailedEngineListener {
@@ -141,8 +147,6 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
void phase3(Translog.Snapshot snapshot) throws ElasticSearchException;
}
/**
*/
static interface SnapshotHandler<T> {
T snapshot(SnapshotIndexCommit snapshotIndexCommit, Translog.Snapshot translogSnapshot) throws EngineException;
@@ -211,9 +215,28 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
static class Flush {
private boolean full = false;
public static enum Type {
/**
* A flush that causes a new writer to be created.
*/
NEW_WRITER,
/**
* A flush that just commits the writer, without cleaning the translog.
*/
COMMIT,
/**
* A flush that does a commit, as well as clears the translog.
*/
COMMIT_TRANSLOG
}
private Type type = Type.COMMIT_TRANSLOG;
private boolean refresh = false;
private boolean force = false;
/**
* Should the flush operation wait if there is an ongoing flush operation.
*/
private boolean waitIfOngoing = false;
/**
* Should a refresh be performed after flushing. Defaults to <tt>false</tt>.
@@ -230,18 +253,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this;
}
/**
* Should a "full" flush be issued, basically cleaning as much memory as possible.
*/
public boolean full() {
return this.full;
public Type type() {
return this.type;
}
/**
* Should a "full" flush be issued, basically cleaning as much memory as possible.
*/
public Flush full(boolean full) {
this.full = full;
public Flush type(Type type) {
this.type = type;
return this;
}
@@ -254,9 +274,18 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this;
}
public boolean waitIfOngoing() {
return this.waitIfOngoing;
}
public Flush waitIfOngoing(boolean waitIfOngoing) {
this.waitIfOngoing = waitIfOngoing;
return this;
}
@Override
public String toString() {
return "full[" + full + "], refresh[" + refresh + "], force[" + force + "]";
return "type[" + type + "], refresh[" + refresh + "], force[" + force + "]";
}
}
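
Flush is a plain builder-style settings object, so call sites chain the setters; a usage sketch (engine.flush(Flush) as used elsewhere in the diff):

    Engine.Flush flush = new Engine.Flush()
            .type(Engine.Flush.Type.COMMIT_TRANSLOG) // commit Lucene, then make the transient translog current
            .refresh(false)                          // do not refresh after the flush
            .force(false)                            // do not force a commit if nothing changed
            .waitIfOngoing(true);                    // queue behind a concurrent flush instead of failing
    engine.flush(flush);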

RobinEngine.java

@@ -25,6 +25,7 @@ import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable;
@@ -69,8 +70,11 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.common.lucene.Lucene.safeClose;
@@ -81,51 +85,33 @@ import static org.elasticsearch.common.lucene.Lucene.safeClose;
public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private volatile ByteSizeValue indexingBufferSize;
private volatile int termIndexInterval;
private volatile int termIndexDivisor;
private volatile int indexConcurrency;
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
private long gcDeletesInMillis;
private volatile boolean enableGcDeletes = true;
private final boolean asyncLoadBloomFilter;
private final ThreadPool threadPool;
private final ShardIndexingService indexingService;
private final IndexSettingsService indexSettingsService;
@Nullable
private final InternalIndicesWarmer warmer;
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final Translog translog;
private final MergePolicyProvider mergePolicyProvider;
private final MergeSchedulerProvider mergeScheduler;
private final AnalysisService analysisService;
private final SimilarityService similarityService;
private final BloomCache bloomCache;
private final boolean asyncLoadBloomFilter;
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private volatile IndexWriter indexWriter;
// LUCENE MONITOR: 3.6 - remove the custom SearcherManager and use the Lucene 3.6 one
private final SearcherFactory searcherFactory = new RobinSearchFactory();
private volatile SearcherManager searcherManager;
@@ -136,14 +122,15 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
private volatile boolean possibleMergeNeeded = false;
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
// we use flushNeeded here since, if there are no changes, the commit will not really happen,
// and then the commitUserData and the new translog will not be reflected
private volatile boolean flushNeeded = false;
private final AtomicInteger flushing = new AtomicInteger();
private final Lock flushLock = new ReentrantLock();
private volatile int disableFlushCounter = 0;
private volatile int onGoingRecoveries = 0;
// indexing searcher is initialized
private final AtomicBoolean flushing = new AtomicBoolean();
private final ConcurrentMap<String, VersionValue> versionMap;
@@ -226,7 +213,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (indexingBufferSize == Engine.INACTIVE_SHARD_INDEXING_BUFFER && preValue != Engine.INACTIVE_SHARD_INDEXING_BUFFER) {
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, indexingBufferSize);
try {
flush(new Flush().full(true));
flush(new Flush().type(Flush.Type.NEW_WRITER));
} catch (EngineClosedException e) {
// ignore
} catch (FlushNotAllowedEngineException e) {
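
Here the old full(true) flush becomes a NEW_WRITER flush: per the Javadoc above, its point is to clean as much memory as possible, which shrinking indexingBufferSize alone does not achieve for RAM the live writer already holds. The surrounding try/catch (a sketch of the diff's own pattern) deliberately swallows the two benign failure modes:

    try {
        flush(new Flush().type(Flush.Type.NEW_WRITER)); // recycle the writer to actually release memory
    } catch (EngineClosedException e) {
        // ignore: the engine is closed, there is nothing left to free
    } catch (FlushNotAllowedEngineException e) {
        // ignore: a recovery is in progress, memory will be reclaimed on a later flush
    }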
@@ -256,12 +243,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
throw new EngineClosedException(shardId);
}
if (logger.isDebugEnabled()) {
logger.debug("Starting engine");
logger.debug("starting engine");
}
try {
this.indexWriter = createWriter();
} catch (IOException e) {
throw new EngineCreationFailureException(shardId, "Failed to create engine", e);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
try {
@@ -296,7 +283,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
// ignore
}
}
throw new EngineCreationFailureException(shardId, "Failed to open reader on writer", e);
throw new EngineCreationFailureException(shardId, "failed to open reader on writer", e);
}
} finally {
rwl.writeLock().unlock();
@@ -804,24 +791,27 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
// check outside the lock as well so we can check without blocking on the write lock
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
if (flush.type() == Flush.Type.NEW_WRITER || flush.type() == Flush.Type.COMMIT_TRANSLOG) {
// check outside the lock as well so we can check without blocking on the write lock
if (onGoingRecoveries > 0) {
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + flush.type() + "] is not allowed");
}
}
// don't allow for concurrent flush operations...
if (!flushing.compareAndSet(false, true)) {
throw new FlushNotAllowedEngineException(shardId, "Already flushing...");
int currentFlushing = flushing.incrementAndGet();
if (currentFlushing > 1 && !flush.waitIfOngoing()) {
flushing.decrementAndGet();
throw new FlushNotAllowedEngineException(shardId, "already flushing...");
}
flushLock.lock();
try {
boolean makeTransientCurrent = false;
if (flush.full()) {
if (flush.type() == Flush.Type.NEW_WRITER) {
rwl.writeLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
if (disableFlushCounter > 0) {
if (onGoingRecoveries > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
// disable refreshing, not dirty
@@ -861,13 +851,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} finally {
rwl.writeLock().unlock();
}
} else {
} else if (flush.type() == Flush.Type.COMMIT_TRANSLOG) {
boolean makeTransientCurrent = false;
rwl.readLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
if (disableFlushCounter > 0) {
if (onGoingRecoveries > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
@@ -914,7 +905,41 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} finally {
rwl.readLock().unlock();
}
} else if (flush.type() == Flush.Type.COMMIT) {
// note, it's ok to just commit without cleaning the translog; it's perfectly fine to replay a
// translog on an index that was opened on a committed point in time that is "in the future"
// of that translog
rwl.readLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
// we allow a *just commit* even if there is an ongoing recovery happening...
// it's ok to do this: only a flush will cause a new translogId, and we are guarded from
// other flushes here by the flushLock
try {
long translogId = translog.currentId();
indexWriter.commit(MapBuilder.<String, String>newMapBuilder().put(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)).map());
} catch (OutOfMemoryError e) {
translog.revertTransient();
failEngine(e);
throw new FlushFailedEngineException(shardId, e);
} catch (IllegalStateException e) {
if (e.getMessage().contains("OutOfMemoryError")) {
failEngine(e);
}
throw new FlushFailedEngineException(shardId, e);
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
} finally {
rwl.readLock().unlock();
}
} else {
throw new ElasticSearchIllegalStateException("flush type [" + flush.type() + "] not supported");
}
// reread the last committed segment infos
try {
SegmentInfos infos = new SegmentInfos();
infos.read(store.directory());
@@ -925,7 +950,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
} finally {
flushing.set(false);
flushLock.unlock();
flushing.decrementAndGet();
}
}
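
Note that the flushing flag also changes from an AtomicBoolean to an AtomicInteger paired with a ReentrantLock, which is what makes waitIfOngoing possible: the counter is a cheap probe for an ongoing flush, and the lock is where opted-in callers queue. A distilled, self-contained sketch of the guard (class and method names are illustrative, not from the codebase):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class FlushGuard {
        private final AtomicInteger flushing = new AtomicInteger();
        private final Lock flushLock = new ReentrantLock();

        void flush(boolean waitIfOngoing, Runnable body) {
            if (flushing.incrementAndGet() > 1 && !waitIfOngoing) {
                flushing.decrementAndGet(); // undo our registration before bailing out
                throw new IllegalStateException("already flushing...");
            }
            flushLock.lock(); // callers that chose to wait queue up here
            try {
                body.run(); // at most one flush body runs at a time
            } finally {
                flushLock.unlock();
                flushing.decrementAndGet();
            }
        }
    }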
@@ -1061,13 +1087,29 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
@Override
public SnapshotIndexCommit snapshotIndex() throws EngineException {
rwl.readLock().lock();
try {
flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
if (indexWriter == null) {
throw new EngineClosedException(shardId, failedEngine);
}
return deletionPolicy.snapshot();
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
}
}
@Override
public void recover(RecoveryHandler recoveryHandler) throws EngineException {
// take a write lock here so recovery does not start while a flush is in progress
// this also means that translog-committing flushes will be refused once the lock is released
rwl.writeLock().lock();
try {
disableFlushCounter++;
onGoingRecoveries++;
} finally {
rwl.writeLock().unlock();
}
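
snapshotIndex() is the consumer of the new COMMIT type: it forces a plain commit, waiting out any concurrent flush, and then pins the resulting commit point through the deletion policy so merges cannot delete its files. A hedged usage sketch (getFiles() comes from Lucene's IndexCommit; the rest is as in the diff):

    SnapshotIndexCommit commit = engine.snapshotIndex();
    try {
        // copy commit.getFiles() somewhere safe; the translog was left untouched, so its
        // operations can still be replayed over this, or any newer, commit point
    } finally {
        commit.release(); // let the deletion policy drop the pinned commit
    }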
@@ -1076,14 +1118,14 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
phase1Snapshot = deletionPolicy.snapshot();
} catch (Exception e) {
--disableFlushCounter;
--onGoingRecoveries;
throw new RecoveryEngineException(shardId, 1, "Snapshot failed", e);
}
try {
recoveryHandler.phase1(phase1Snapshot);
} catch (Exception e) {
--disableFlushCounter;
--onGoingRecoveries;
phase1Snapshot.release();
if (closed) {
e = new EngineClosedException(shardId, e);
@@ -1095,7 +1137,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
phase2Snapshot = translog.snapshot();
} catch (Exception e) {
--disableFlushCounter;
--onGoingRecoveries;
phase1Snapshot.release();
if (closed) {
e = new EngineClosedException(shardId, e);
@@ -1106,7 +1148,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
try {
recoveryHandler.phase2(phase2Snapshot);
} catch (Exception e) {
--disableFlushCounter;
--onGoingRecoveries;
phase1Snapshot.release();
phase2Snapshot.release();
if (closed) {
@@ -1123,7 +1165,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
} catch (Exception e) {
throw new RecoveryEngineException(shardId, 3, "Execution failed", e);
} finally {
--disableFlushCounter;
--onGoingRecoveries;
rwl.writeLock().unlock();
phase1Snapshot.release();
phase2Snapshot.release();
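
The rename from disableFlushCounter to onGoingRecoveries also makes the contract explicit: while the counter is non-zero, NEW_WRITER and COMMIT_TRANSLOG flushes are refused, but a plain COMMIT still goes through. A distilled sketch of the counter's lifecycle (the real code decrements in each catch block rather than one enclosing finally, because the phases release different snapshots on failure):

    rwl.writeLock().lock();      // a recovery cannot start while a flush holds the lock
    try {
        onGoingRecoveries++;     // from here on, translog-clearing flushes throw FlushNotAllowedEngineException
    } finally {
        rwl.writeLock().unlock();
    }
    try {
        // ... phase1 (index files), phase2 (translog snapshot), phase3 (catch-up under the write lock) ...
    } finally {
        --onGoingRecoveries;     // every exit path must give the counter back
    }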
@@ -1369,7 +1411,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
rwl.readLock().unlock();
}
if (requiresFlushing) {
flush(new Flush().full(true));
flush(new Flush().type(Flush.Type.NEW_WRITER));
}
}
}