improve async merge process, don't spawn a thread unless a merge is really needed, and add an optimized "maybeMerge" operation

This commit is contained in:
kimchy 2011-03-04 20:05:52 +02:00
parent 20ed540fe7
commit c097735196
3 changed files with 89 additions and 30 deletions

View File

@ -69,6 +69,13 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
*/
boolean refreshNeeded();
/**
* Returns <tt>true</tt> if a possible merge is really needed.
*/
boolean possibleMergeNeeded();
void maybeMerge() throws EngineException;
/**
* Refreshes the engine for new search operations to reflect the latest
* changes. Pass <tt>true</tt> if the refresh operation should include
@ -131,10 +138,21 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
private final boolean waitForOperations;
private boolean force = false;
public Refresh(boolean waitForOperations) {
this.waitForOperations = waitForOperations;
}
public Refresh force(boolean force) {
this.force = force;
return this;
}
public boolean force() {
return this.force;
}
public boolean waitForOperations() {
return waitForOperations;
}

View File

@ -113,6 +113,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
// flag indicating if a dirty operation has occurred since the last refresh
private volatile boolean dirty = false;
private volatile boolean possibleMergeNeeded = false;
private volatile int disableFlushCounter = 0;
// indexing searcher is initialized
@ -228,6 +230,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
innerCreate(create, writer);
dirty = true;
possibleMergeNeeded = true;
if (create.refresh()) {
refresh(new Refresh(false));
}
@ -318,6 +321,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
innerIndex(index, writer);
dirty = true;
possibleMergeNeeded = true;
if (index.refresh()) {
refresh(new Refresh(false));
}
@ -402,6 +406,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
}
innerDelete(delete, writer);
dirty = true;
possibleMergeNeeded = true;
if (delete.refresh()) {
refresh(new Refresh(false));
}
@ -485,6 +490,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
writer.deleteDocuments(delete.query());
translog.add(new Translog.DeleteByQuery(delete));
dirty = true;
possibleMergeNeeded = true;
} catch (IOException e) {
throw new DeleteByQueryFailedEngineException(shardId, delete, e);
} finally {
@ -521,6 +527,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
return dirty;
}
@Override public boolean possibleMergeNeeded() {
return this.possibleMergeNeeded;
}
@Override public void refresh(Refresh refresh) throws EngineException {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
@ -535,7 +545,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
throw new EngineClosedException(shardId);
}
try {
if (dirty) {
if (dirty || refresh.force()) {
// we eagerly set dirty to false so we won't miss refresh requests
dirty = false;
AcquirableResource<ReaderSearcherHolder> current = nrtResource;
@ -586,6 +596,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (disableFlushCounter > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
if (indexingSearcher.get() != null) {
indexingSearcher.get().release();
indexingSearcher.set(null);
}
if (flush.full()) {
// disable refreshing, not dirty
dirty = false;
@ -613,11 +627,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
versionMap.clear();
dirty = true; // force a refresh
// we need to do a refresh here so we sync versioning support
refresh(new Refresh(true));
if (indexingSearcher.get() != null) {
indexingSearcher.get().release();
indexingSearcher.set(null);
}
refresh(new Refresh(true).force(true));
} finally {
rwl.writeLock().unlock();
flushing.set(false);
@ -628,6 +638,30 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
// }
}
@Override public void maybeMerge() throws EngineException {
if (!possibleMergeNeeded) {
return;
}
possibleMergeNeeded = false;
rwl.readLock().lock();
try {
if (indexWriter == null) {
throw new EngineClosedException(shardId);
}
if (indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
((EnableMergePolicy) indexWriter.getMergePolicy()).enableMerge();
}
indexWriter.maybeMerge();
} catch (Exception e) {
throw new OptimizeFailedEngineException(shardId, e);
} finally {
rwl.readLock().unlock();
if (indexWriter != null && indexWriter.getMergePolicy() instanceof EnableMergePolicy) {
((EnableMergePolicy) indexWriter.getMergePolicy()).disableMerge();
}
}
}
@Override public void optimize(Optimize optimize) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
rwl.readLock().lock();
@ -642,6 +676,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
indexWriter.expungeDeletes(false);
} else if (optimize.maxNumSegments() <= 0) {
indexWriter.maybeMerge();
possibleMergeNeeded = false;
} else {
indexWriter.optimize(optimize.maxNumSegments(), false);
}
@ -659,14 +694,11 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
if (optimize.waitForMerge()) {
indexWriter.waitForMerges();
}
// once we did the optimization, we are "dirty" since we removed deletes potentially which
// affects TermEnum
dirty = true;
if (optimize.flush()) {
flush(new Flush());
}
if (optimize.refresh()) {
refresh(new Refresh(false));
refresh(new Refresh(false).force(true));
}
}

View File

@ -562,7 +562,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
// so, make sure we periodically call it, this need to be a small enough value so mergine will actually
// happen and reduce the number of segments
if (mergeInterval.millis() > 0) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, new EngineMerger());
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
logger.debug("scheduling optimizer / merger every {}", mergeInterval);
} else {
logger.debug("scheduled optimizer / merger disabled");
@ -616,27 +616,36 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private class EngineMerger implements Runnable {
@Override public void run() {
try {
// -1 means maybe merge
engine.optimize(new Engine.Optimize().maxNumSegments(-1).waitForMerge(false).flush(false).refresh(false));
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (OptimizeFailedEngineException e) {
if (e.getCause() instanceof InterruptedException) {
// ignore, we are being shutdown
} else if (e.getCause() instanceof ClosedByInterruptException) {
// ignore, we are being shutdown
} else if (e.getCause() instanceof ThreadInterruptedException) {
// ignore, we are being shutdown
} else {
logger.warn("Failed to perform scheduled engine optimize/merge", e);
if (!engine().possibleMergeNeeded()) {
if (state != IndexShardState.CLOSED) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
}
} catch (Exception e) {
logger.warn("Failed to perform scheduled engine optimize/merge", e);
}
if (state != IndexShardState.CLOSED) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, this);
return;
}
threadPool.executor(ThreadPool.Names.MERGE).execute(new Runnable() {
@Override public void run() {
try {
engine.maybeMerge();
} catch (EngineClosedException e) {
// we are being closed, ignore
} catch (OptimizeFailedEngineException e) {
if (e.getCause() instanceof InterruptedException) {
// ignore, we are being shutdown
} else if (e.getCause() instanceof ClosedByInterruptException) {
// ignore, we are being shutdown
} else if (e.getCause() instanceof ThreadInterruptedException) {
// ignore, we are being shutdown
} else {
logger.warn("Failed to perform scheduled engine optimize/merge", e);
}
} catch (Exception e) {
logger.warn("Failed to perform scheduled engine optimize/merge", e);
}
if (state != IndexShardState.CLOSED) {
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this);
}
}
});
}
}