[ENGINE] Remove reflection call to waitForMerges

Since the index writer is now final we can remove the readlock around
the forceMerge calls and use the official API to wait for merges.
This commit is contained in:
Simon Willnauer 2015-03-15 19:45:35 -07:00
parent 9bf4afd88e
commit e2d82504cc
7 changed files with 145 additions and 53 deletions

View File

@ -24,9 +24,9 @@ import org.elasticsearch.index.shard.ShardId;
/**
*
*/
public class OptimizeFailedEngineException extends EngineException {
public class ForceMergeFailedEngineException extends EngineException {
public OptimizeFailedEngineException(ShardId shardId, Throwable t) {
super(shardId, "Optimize failed", t);
public ForceMergeFailedEngineException(ShardId shardId, Throwable t) {
super(shardId, "force merge failed", t);
}
}

View File

@ -52,7 +52,6 @@ import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -83,11 +82,11 @@ public class InternalEngine extends Engine {
private final SearcherFactory searcherFactory;
private final SearcherManager searcherManager;
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
// we use flushNeeded here, since if there are no changes, then the commit won't write
// will not really happen, and then the commitUserData and the new translog will not be reflected
private volatile boolean flushNeeded = false;
private final Lock flushLock = new ReentrantLock();
private final ReentrantLock optimizeLock = new ReentrantLock();
protected final FlushingRecoveryCounter onGoingRecoveries;
// A uid (in the form of BytesRef) to the version map
@ -710,58 +709,59 @@ public class InternalEngine extends Engine {
lastDeleteVersionPruneTimeMSec = timeMSec;
}
// TODO: can we please remove this method?!
private void waitForMerges(boolean flushAfter, boolean upgrade) {
try {
Method method = IndexWriter.class.getDeclaredMethod("waitForMerges");
method.setAccessible(true);
method.invoke(indexWriter);
} catch (ReflectiveOperationException e) {
throw new OptimizeFailedEngineException(shardId, e);
}
if (flushAfter) {
flush(true, true, true);
}
if (upgrade) {
logger.info("Finished upgrade of " + shardId);
}
}
@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
/*
* The way we implement upgrades is a bit hackish in the sense that we set an instance
* variable and that this setting will thus apply to the next forced merge that will be run.
* This is ok because (1) this is the only place we call forceMerge, (2) we have a single
* thread for optimize, and the 'optimizeMutex' guarding this code, and (3) ConcurrentMergeScheduler
* syncs calls to findForcedMerges.
*/
MergePolicy mp = indexWriter.getConfig().getMergePolicy();
assert mp instanceof ElasticsearchMergePolicy : "MergePolicy is " + mp.getClass().getName();
if (upgrade) {
logger.info("Starting upgrade of " + shardId);
((ElasticsearchMergePolicy) mp).setUpgradeInProgress(true);
}
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
* that's fine since the IW.rollback should stop all the threads and trigger an IOException
* causing us to fail the forceMerge
*
* The way we implement upgrades is a bit hackish in the sense that we set an instance
* variable and that this setting will thus apply to the next forced merge that will be run.
* This is ok because (1) this is the only place we call forceMerge, (2) we have a single
* thread for optimize, and the 'optimizeLock' guarding this code, and (3) ConcurrentMergeScheduler
* syncs calls to findForcedMerges.
*/
assert indexWriter.getConfig().getMergePolicy() instanceof ElasticsearchMergePolicy : "MergePolicy is " + indexWriter.getConfig().getMergePolicy().getClass().getName();
ElasticsearchMergePolicy mp = (ElasticsearchMergePolicy) indexWriter.getConfig().getMergePolicy();
optimizeLock.lock();
try {
ensureOpen();
if (upgrade) {
logger.info("starting segment upgrade");
mp.setUpgradeInProgress(true);
}
store.incRef(); // increment the ref just to ensure nobody closes the store while we optimize
try {
if (onlyExpungeDeletes) {
indexWriter.forceMergeDeletes(false);
assert upgrade == false;
indexWriter.forceMergeDeletes(true /* blocks and waits for merges*/);
} else if (maxNumSegments <= 0) {
assert upgrade == false;
indexWriter.maybeMerge();
} else {
indexWriter.forceMerge(maxNumSegments, false);
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
}
if (flush) {
flush(true, true, true);
}
if (upgrade) {
logger.info("finished segment upgrade");
}
} catch (Throwable t) {
maybeFailEngine("optimize", t);
throw new OptimizeFailedEngineException(shardId, t);
} finally {
optimizeMutex.set(false);
store.decRef();
}
} catch (Throwable t) {
ForceMergeFailedEngineException ex = new ForceMergeFailedEngineException(shardId, t);
maybeFailEngine("force merge", ex);
throw ex;
} finally {
try {
mp.setUpgradeInProgress(false); // reset it just to make sure we reset it in a case of an error
} finally {
optimizeLock.unlock();
}
}
waitForMerges(flush, upgrade);
}
@Override

View File

@ -32,6 +32,7 @@ import org.apache.lucene.index.*;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
@ -73,7 +74,6 @@ import org.elasticsearch.index.translog.TranslogSizeMatcher;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert;
import org.junit.After;
@ -84,9 +84,11 @@ import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean;
import static org.apache.lucene.util.AbstractRandomizedTest.CHILD_JVM_ID;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
@ -95,7 +97,6 @@ import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*;
@TestLogging("index.translog:TRACE")
public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public static final String TRANSLOG_PRIMARY_LOCATION = "work/fs-translog/JVM_" + CHILD_JVM_ID + "/primary";
@ -1005,6 +1006,97 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
}
public void testForceMerge() {
int numDocs = randomIntBetween(10, 100);
for (int i=0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid(Integer.toString(i)), doc);
engine.index(index);
engine.refresh("test");
}
try (Engine.Searcher test = engine.acquireSearcher("test")) {
assertEquals(numDocs, test.reader().numDocs());
}
engine.forceMerge(true, 1, false, false);
assertEquals(engine.segments(true).size(), 1);
ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid(Integer.toString(0)), doc);
engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
engine.forceMerge(true, 10, true, false); //expunge deletes
assertEquals(engine.segments(true).size(), 1);
try (Engine.Searcher test = engine.acquireSearcher("test")) {
assertEquals(numDocs-1, test.reader().numDocs());
assertEquals(numDocs-1, test.reader().maxDoc());
}
doc = testParsedDocument(Integer.toString(1), Integer.toString(1), "test", null, -1, -1, testDocument(), B_1, false);
index = new Engine.Index(null, newUid(Integer.toString(1)), doc);
engine.delete(new Engine.Delete(index.type(), index.id(), index.uid()));
engine.forceMerge(true, 10, false, false); //expunge deletes
assertEquals(engine.segments(true).size(), 1);
try (Engine.Searcher test = engine.acquireSearcher("test")) {
assertEquals(numDocs-2, test.reader().numDocs());
assertEquals(numDocs-1, test.reader().maxDoc());
}
}
public void testForceMergeAndClose() throws IOException, InterruptedException {
int numIters = randomIntBetween(2, 10);
for (int j = 0; j < numIters; j++) {
try (Store store = createStore()) {
final Translog translog = createTranslog();
final InternalEngine engine = createEngine(store, translog);
final CountDownLatch startGun = new CountDownLatch(1);
final CountDownLatch indexed = new CountDownLatch(1);
Thread thread = new Thread() {
public void run() {
try {
try {
startGun.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int i = 0;
while (true) {
int numDocs = randomIntBetween(1, 20);
for (int j = 0; j < numDocs; j++) {
i++;
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid(Integer.toString(i)), doc);
engine.index(index);
}
engine.refresh("test");
indexed.countDown();
try {
engine.forceMerge(randomBoolean(), 1, false, randomBoolean());
} catch (ForceMergeFailedEngineException ex) {
// ok
return;
}
}
} catch (AlreadyClosedException | EngineClosedException ex) {
// fine
}
}
};
thread.start();
startGun.countDown();
int someIters = randomIntBetween(1, 10);
for (int i = 0; i < someIters; i++) {
engine.forceMerge(randomBoolean(), 1, false, randomBoolean());
}
indexed.await();
IOUtils.close(engine, translog);
}
}
}
@Test
public void testVersioningDeleteConflict() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);

View File

@ -217,7 +217,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
logger.info("test: optimize");
client().admin().indices().prepareOptimize("test").get();
client().admin().indices().prepareOptimize("test").setMaxNumSegments(1).get();
logger.info("test: optimize done");
// Record current throttling so far

View File

@ -1929,7 +1929,7 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test", "child", "c1").setParent("p1").setSource("c_field", "blue").get();
client().prepareIndex("test", "child", "c2").setParent("p1").setSource("c_field", "red").get();
client().prepareIndex("test", "child", "c3").setParent("p2").setSource("c_field", "red").get();
client().admin().indices().prepareOptimize("test").setFlush(true).get();
client().admin().indices().prepareOptimize("test").setMaxNumSegments(1).setFlush(true).get();
client().prepareIndex("test", "parent", "p3").setSource("p_field", "p_value3").get();
client().prepareIndex("test", "parent", "p4").setSource("p_field", "p_value4").get();
client().prepareIndex("test", "child", "c4").setParent("p3").setSource("c_field", "green").get();

View File

@ -948,7 +948,7 @@ public class CompletionSuggestSearchTests extends ElasticsearchIntegrationTest {
if (optimize) {
// make sure merging works just fine
client().admin().indices().prepareFlush(INDEX).execute().actionGet();
client().admin().indices().prepareOptimize(INDEX).execute().actionGet();
client().admin().indices().prepareOptimize(INDEX).setMaxNumSegments(randomIntBetween(1, 5)).get();
}
}

View File

@ -1229,7 +1229,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
*/
protected OptimizeResponse optimize() {
waitForRelocation();
OptimizeResponse actionGet = client().admin().indices().prepareOptimize().execute().actionGet();
OptimizeResponse actionGet = client().admin().indices().prepareOptimize().setMaxNumSegments(1).execute().actionGet();
assertNoFailures(actionGet);
return actionGet;
}