parent
4f938ad37e
commit
c29c18f0f2
|
@ -918,7 +918,7 @@ public class InternalEngine implements Engine {
|
|||
}
|
||||
|
||||
// TODO: can we please remove this method?!
|
||||
private void waitForMerges(boolean flushAfter) {
|
||||
private void waitForMerges(boolean flushAfter, boolean upgrade) {
|
||||
try {
|
||||
Method method = IndexWriter.class.getDeclaredMethod("waitForMerges");
|
||||
method.setAccessible(true);
|
||||
|
@ -929,6 +929,9 @@ public class InternalEngine implements Engine {
|
|||
if (flushAfter) {
|
||||
flush(FlushType.COMMIT_TRANSLOG, true, true);
|
||||
}
|
||||
if (upgrade) {
|
||||
logger.info("Finished upgrade of " + shardId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -937,7 +940,7 @@ public class InternalEngine implements Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException {
|
||||
public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
|
||||
if (optimizeMutex.compareAndSet(false, true)) {
|
||||
try (InternalLock _ = readLock.acquire()) {
|
||||
final IndexWriter writer = currentIndexWriter();
|
||||
|
@ -952,6 +955,7 @@ public class InternalEngine implements Engine {
|
|||
MergePolicy mp = writer.getConfig().getMergePolicy();
|
||||
assert mp instanceof ElasticsearchMergePolicy : "MergePolicy is " + mp.getClass().getName();
|
||||
if (upgrade) {
|
||||
logger.info("Starting upgrade of " + shardId);
|
||||
((ElasticsearchMergePolicy) mp).setUpgradeInProgress(true);
|
||||
}
|
||||
|
||||
|
@ -972,8 +976,8 @@ public class InternalEngine implements Engine {
|
|||
|
||||
// wait for the merges outside of the read lock
|
||||
if (waitForMerge) {
|
||||
waitForMerges(flush);
|
||||
} else if (flush) {
|
||||
waitForMerges(flush, upgrade);
|
||||
} else if (flush || upgrade) {
|
||||
// we only need to monitor merges for async calls if we are going to flush
|
||||
engineConfig.getThreadPool().executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
|
@ -983,7 +987,7 @@ public class InternalEngine implements Engine {
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
waitForMerges(true);
|
||||
waitForMerges(flush, upgrade);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue