Fix optimize behavior with 'force' and 'flush' flags.

This does the following:
* Make the 'force' flag build a merge only when the delegate merge policy
  returns no merges
* Add async handling for 'flush' when 'waitForMerge' is false: the engine now
  waits for merges and flushes on a background thread instead of blocking
* Remove the flush at the beginning of optimize; users who want that behavior
  can flush explicitly before calling optimize
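
For illustration, a rough caller-side sketch of the resulting semantics (not part
of this change; the setters mirror the Engine.Optimize API exercised in the tests
below):

    // illustrative sketch only
    Engine.Optimize op = new Engine.Optimize()
            .maxNumSegments(1)
            .force(false)          // true would force a merge even when the delegate merge policy finds none
            .flush(true)           // flush once the merges have completed
            .waitForMerge(false);  // do not block the calling thread on the merges
    engine.optimize(op);
    // With waitForMerge(false) and flush(true), optimize() returns immediately and a
    // task on the OPTIMIZE thread pool waits for the merges, then issues the flush.
    // There is no longer an implicit flush before the optimize itself.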

closes #7886
closes #7904
closes #7920
Ryan Ernst 2014-09-29 15:20:19 -07:00
parent 25bce1db5d
commit 37b294aaec
3 changed files with 65 additions and 29 deletions

InternalEngine.java

@@ -51,6 +51,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.analysis.AnalysisService;
@@ -1009,12 +1010,20 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
             throw new OptimizeFailedEngineException(shardId, t);
         }
     }
 
+    private void waitForMerges(boolean flushAfter) {
+        try {
+            currentIndexWriter().waitForMerges();
+        } catch (IOException e) {
+            throw new OptimizeFailedEngineException(shardId, e);
+        }
+        if (flushAfter) {
+            flush(new Flush().force(true).waitIfOngoing(true));
+        }
+    }
+
     @Override
     public void optimize(Optimize optimize) throws EngineException {
-        if (optimize.flush()) {
-            flush(new Flush().force(true).waitIfOngoing(true));
-        }
         if (optimizeMutex.compareAndSet(false, true)) {
             ElasticsearchMergePolicy elasticsearchMergePolicy = null;
             try (InternalLock _ = readLock.acquire()) {
@@ -1054,18 +1063,23 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
                 }
                 optimizeMutex.set(false);
             }
         }
 
         // wait for the merges outside of the read lock
         if (optimize.waitForMerge()) {
-            try {
-                currentIndexWriter().waitForMerges();
-            } catch (IOException e) {
-                throw new OptimizeFailedEngineException(shardId, e);
-            }
-        }
-
-        if (optimize.flush()) {
-            flush(new Flush().force(true).waitIfOngoing(true));
+            waitForMerges(optimize.flush());
+        } else if (optimize.flush()) {
+            // we only need to monitor merges for async calls if we are going to flush
+            threadPool.executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
+                @Override
+                public void onFailure(Throwable t) {
+                    logger.error("Exception while waiting for merges asynchronously after optimize", t);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    waitForMerges(true);
+                }
+            });
         }
     }
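
Because the flush now happens on a background thread when waitForMerge is false,
a caller that needs to observe it can poll the last committed segments info, as
the test below does. A condensed sketch (reusing the awaitBusy helper and store
handle from InternalEngineTests; illustrative only):

    final long gen = store.readLastCommittedSegmentsInfo().getGeneration();
    engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).waitForMerge(false));
    awaitBusy(new Predicate<Object>() {
        @Override
        public boolean apply(Object o) {
            try {
                // the async merge monitor commits a new segments generation via flush
                return store.readLastCommittedSegmentsInfo().getGeneration() > gen;
            } catch (IOException e) {
                throw ExceptionsHelper.convertToRuntime(e);
            }
        }
    });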

ElasticsearchMergePolicy.java

@@ -196,20 +196,22 @@ public final class ElasticsearchMergePolicy extends MergePolicy {
     public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
             int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
             throws IOException {
-        if (force) {
-            List<SegmentCommitInfo> segments = Lists.newArrayList();
-            for (SegmentCommitInfo info : segmentInfos) {
-                if (segmentsToMerge.containsKey(info)) {
-                    segments.add(info);
-                }
-            }
-            if (!segments.isEmpty()) {
-                MergeSpecification spec = new IndexUpgraderMergeSpecification();
-                spec.add(new OneMerge(segments));
-                return spec;
-            }
-        }
-        return upgradedMergeSpecification(delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer));
+        MergeSpecification spec = delegate.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer);
+        if (spec == null && force) {
+            List<SegmentCommitInfo> segments = Lists.newArrayList();
+            for (SegmentCommitInfo info : segmentInfos) {
+                if (segmentsToMerge.containsKey(info)) {
+                    segments.add(info);
+                }
+            }
+            if (!segments.isEmpty()) {
+                spec = new IndexUpgraderMergeSpecification();
+                spec.add(new OneMerge(segments));
+                return spec;
+            }
+        }
+        return upgradedMergeSpecification(spec);
     }
 
     @Override
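
The reordering gives the wrapped merge policy the first chance to plan forced
merges; the hand-built single merge is now strictly a fallback for when the
delegate returns null (Lucene's convention for "no merges necessary") while the
caller still forced one, e.g. to rewrite an already-single-segment shard for an
upgrade. Schematically (a paraphrase with shorthand names, not the literal
source):

    // before: 'force' could shadow merges the delegate wanted to make
    //     if (force) { return specWith(new OneMerge(segments)); }
    //     return upgradedMergeSpecification(delegate.findForcedMerges(...));
    // after: the delegate decides first; 'force' only fills the gap
    //     spec = delegate.findForcedMerges(...);
    //     if (spec == null && force) { spec = specWith(new OneMerge(segments)); }
    //     return upgradedMergeSpecification(spec);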

InternalEngineTests.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.index.engine.internal;
 
+import com.google.common.base.Predicate;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -29,6 +30,7 @@ import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.AlreadyClosedException;
@@ -374,7 +376,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
             }
         });
 
-        Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
+        final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
        engine.start();
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), Lucene.STANDARD_ANALYZER, B_1, false);
         Engine.Index index = new Engine.Index(null, newUid("1"), doc);
@@ -410,13 +412,15 @@ public class InternalEngineTests extends ElasticsearchTestCase {
         index = new Engine.Index(null, newUid("4"), doc);
         engine.index(index);
         engine.flush(new Engine.Flush());
+        final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
 
         // now, optimize and wait for merges, see that we have no merge flag
         engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).waitForMerge(true));
         for (Segment segment : engine.segments()) {
             assertThat(segment.getMergeId(), nullValue());
         }
-
+        // we could have multiple underlying merges, so the generation may increase more than once
+        assertTrue(store.readLastCommittedSegmentsInfo().getGeneration() > gen1);
 
         // forcing an optimize will merge this single segment shard
         final boolean force = randomBoolean();
@@ -424,12 +428,28 @@ public class InternalEngineTests extends ElasticsearchTestCase {
             waitTillMerge.set(new CountDownLatch(1));
             waitForMerge.set(new CountDownLatch(1));
         }
-        engine.optimize(new Engine.Optimize().flush(true).maxNumSegments(1).force(force).waitForMerge(false));
+        final boolean flush = randomBoolean();
+        final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
+        engine.optimize(new Engine.Optimize().flush(flush).maxNumSegments(1).force(force).waitForMerge(false));
         waitTillMerge.get().await();
 
         for (Segment segment : engine.segments()) {
             assertThat(segment.getMergeId(), force ? notNullValue() : nullValue());
         }
         waitForMerge.get().countDown();
+
+        if (flush) {
+            awaitBusy(new Predicate<Object>() {
+                @Override
+                public boolean apply(Object o) {
+                    try {
+                        // we should have had just 1 merge, so last generation should be exact
+                        return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
+                    } catch (IOException e) {
+                        throw ExceptionsHelper.convertToRuntime(e);
+                    }
+                }
+            });
+        }
+
         engine.close();
     }