Core: Remove ability to run optimize and upgrade async

This has been very trappy. Rather than continue to allow the buggy behavior
of letting upgrade/optimize requests sidestep the single-shard-per-node
limit that optimize is supposed to be subject to, this removes
the ability to run upgrade/optimize async.
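
In client terms the change removes the async knob outright; a minimal before/after sketch, using builder calls that appear elsewhere in this diff (`client` is assumed to be an Elasticsearch `Client`):

[source,java]
--------------------------------------------------
// Before: callers could skip waiting, letting the merge run on in the
// background past the single-shard-per-node limit.
client.admin().indices().prepareOptimize("test").setWaitForMerge(false).get();

// After: the call always blocks until the merge completes.
client.admin().indices().prepareOptimize("test").get();
--------------------------------------------------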

closes #9638
Ryan Ernst 2015-02-10 14:52:19 -08:00
parent faae98c5d8
commit f735baf306
17 changed files with 33 additions and 135 deletions

docs/reference/indices/optimize.asciidoc

@@ -7,6 +7,10 @@ operations (and relates to the number of segments a Lucene index holds
within each shard). The optimize operation allows you to reduce the number
of segments by merging them.
This call will block until the optimize is complete. If the HTTP connection
is lost, the request will continue in the background, and
any new requests will block until the previous optimize is complete.
[source,js]
--------------------------------------------------
$ curl -XPOST 'http://localhost:9200/twitter/_optimize'
@@ -33,10 +37,6 @@ deletes. Defaults to `false`. Note that this won't override the
`flush`:: Should a flush be performed after the optimize. Defaults to
`true`.
`wait_for_merge`:: Should the request wait for the merge to end. Defaults
to `true`. Note, a merge can potentially be a very heavy operation, so
it might make sense to run it with this set to `false`.
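
With `wait_for_merge` gone, the remaining parameters map directly onto the Java builder; a minimal sketch (the `setFlush` and `setMaxNumSegments` setters appear in this commit's tests; `setOnlyExpungeDeletes` is assumed by analogy with the request field):

[source,java]
--------------------------------------------------
// Blocks until the merge completes, since the wait_for_merge /
// setWaitForMerge escape hatch no longer exists.
client.admin().indices().prepareOptimize("twitter")
        .setMaxNumSegments(1)           // max_num_segments
        .setOnlyExpungeDeletes(false)   // only_expunge_deletes (assumed setter)
        .setFlush(true)                 // flush
        .get();
--------------------------------------------------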
[float]
[[optimize-multi-index]]
=== Multi Index

docs/reference/indices/upgrade.asciidoc

@@ -17,15 +17,9 @@ NOTE: Upgrading is an I/O intensive operation, and is limited to processing a
single shard per node at a time. It also is not allowed to run at the same
time as optimize.
[float]
[[upgrade-parameters]]
==== Request Parameters
The `upgrade` API accepts the following request parameters:
[horizontal]
`wait_for_completion`:: Should the request wait for the upgrade to complete. Defaults
to `false`.
This call will block until the upgrade is complete. If the HTTP connection
is lost, the request will continue in the background, and
any new requests will block until the previous upgrade is complete.
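
At the transport level an upgrade is just an optimize with the upgrade flag set, as the RestUpgradeAction hunk later in this commit shows; a hedged sketch of the equivalent Java call, assuming the usual `IndicesAdminClient#optimize` signature:

[source,java]
--------------------------------------------------
// Mirrors what RestUpgradeAction builds for POST /_upgrade; the call is
// now synchronous, like any other optimize.
OptimizeRequest upgradeReq = new OptimizeRequest("twitter");
upgradeReq.flush(true);
upgradeReq.upgrade(true);
// upgrade segments in place rather than merging down to one segment
upgradeReq.maxNumSegments(Integer.MAX_VALUE);
client.admin().indices().optimize(upgradeReq).actionGet();
--------------------------------------------------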
[float]
=== Check upgrade status

OptimizeRequest.java

@@ -30,9 +30,6 @@ import java.io.IOException;
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
* <tt>null</tt> for the indices.
* <p/>
* <p>{@link #waitForMerge(boolean)} allows to control if the call will block until the optimize completes and
* defaults to <tt>true</tt>.
* <p/>
* <p>{@link #maxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
* cause the optimize process to optimize down to half the configured number of segments.
*
@@ -43,14 +40,12 @@ import java.io.IOException;
public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest> {
public static final class Defaults {
public static final boolean WAIT_FOR_MERGE = true;
public static final int MAX_NUM_SEGMENTS = -1;
public static final boolean ONLY_EXPUNGE_DELETES = false;
public static final boolean FLUSH = true;
public static final boolean UPGRADE = false;
}
private boolean waitForMerge = Defaults.WAIT_FOR_MERGE;
private int maxNumSegments = Defaults.MAX_NUM_SEGMENTS;
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;
@@ -69,21 +64,6 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
}
/**
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
*/
public boolean waitForMerge() {
return waitForMerge;
}
/**
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
*/
public OptimizeRequest waitForMerge(boolean waitForMerge) {
this.waitForMerge = waitForMerge;
return this;
}
/**
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
* process to optimize down to half the configured number of segments.
@@ -151,7 +131,6 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
waitForMerge = in.readBoolean();
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
@@ -160,7 +139,6 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(waitForMerge);
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
@@ -170,8 +148,7 @@ public class OptimizeRequest extends BroadcastOperationRequest<OptimizeRequest>
@Override
public String toString() {
return "OptimizeRequest{" +
"waitForMerge=" + waitForMerge +
", maxNumSegments=" + maxNumSegments +
"maxNumSegments=" + maxNumSegments +
", onlyExpungeDeletes=" + onlyExpungeDeletes +
", flush=" + flush +
", upgrade=" + upgrade +

OptimizeRequestBuilder.java

@@ -27,9 +27,6 @@ import org.elasticsearch.client.IndicesAdminClient;
* A request to optimize one or more indices. In order to optimize on all the indices, pass an empty array or
* <tt>null</tt> for the indices.
* <p/>
* <p>{@link #setWaitForMerge(boolean)} allows to control if the call will block until the optimize completes and
* defaults to <tt>true</tt>.
* <p/>
* <p>{@link #setMaxNumSegments(int)} allows to control the number of segments to optimize down to. By default, will
* cause the optimize process to optimize down to half the configured number of segments.
*/
@@ -39,14 +36,6 @@ public class OptimizeRequestBuilder extends BroadcastOperationRequestBuilder<Opt
super(indicesClient, new OptimizeRequest());
}
/**
* Should the call block until the optimize completes. Defaults to <tt>true</tt>.
*/
public OptimizeRequestBuilder setWaitForMerge(boolean waitForMerge) {
request.waitForMerge(waitForMerge);
return this;
}
/**
* Will optimize the index down to <= maxNumSegments. By default, will cause the optimize
* process to optimize down to half the configured number of segments.

Engine.java

@@ -232,12 +232,12 @@ public abstract class Engine implements Closeable {
/**
* Optimizes to 1 segment
*/
abstract void forceMerge(boolean flush, boolean waitForMerge);
abstract void forceMerge(boolean flush);
/**
* Triggers a forced merge on this engine
*/
public abstract void forceMerge(boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException;
/**
* Snapshots the index and returns a handle to it. Will always try and "commit" the

InternalEngine.java

@@ -817,12 +817,12 @@ public class InternalEngine extends Engine {
}
@Override
public void forceMerge(boolean flush, boolean waitForMerge) {
forceMerge(flush, waitForMerge, 1, false, false);
public void forceMerge(boolean flush) {
forceMerge(flush, 1, false, false);
}
@Override
public void forceMerge(final boolean flush, boolean waitForMerge, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, final boolean upgrade) throws EngineException {
if (optimizeMutex.compareAndSet(false, true)) {
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
@@ -855,23 +855,7 @@
}
}
// wait for the merges outside of the read lock
if (waitForMerge) {
waitForMerges(flush, upgrade);
} else if (flush || upgrade) {
// we only need to monitor merges for async calls if we are going to flush
engineConfig.getThreadPool().executor(ThreadPool.Names.OPTIMIZE).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.error("Exception while waiting for merges asynchronously after optimize", t);
}
@Override
protected void doRun() throws Exception {
waitForMerges(flush, upgrade);
}
});
}
waitForMerges(flush, upgrade);
}
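
The guard pattern above — a CAS on `optimizeMutex`, then waiting for merges outside the read lock — is what makes a second concurrent optimize block rather than stack up. A simplified, self-contained sketch of that shape (illustrative only, not the engine code; the mutex release is assumed to happen in a `finally` block outside this hunk):

[source,java]
--------------------------------------------------
import java.util.concurrent.atomic.AtomicBoolean;

public class ForceMergeGuard {
    // mirrors InternalEngine's optimizeMutex: only one caller triggers the merge
    private final AtomicBoolean optimizeMutex = new AtomicBoolean(false);

    public void forceMerge() {
        if (optimizeMutex.compareAndSet(false, true)) {
            try {
                triggerMerge(); // stand-in for the actual IndexWriter merge call
            } finally {
                optimizeMutex.set(false); // assumed: released in a finally block
            }
        }
        // every caller, including one that lost the CAS race, blocks here, so
        // "any new requests will block until the previous optimize is complete"
        waitForMerges();
    }

    private void triggerMerge() { /* kick off segment merging */ }

    private void waitForMerges() { /* stand-in for IndexWriter#waitForMerges */ }
}
--------------------------------------------------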

IndexShard.java

@@ -625,8 +625,7 @@ public class IndexShard extends AbstractIndexShardComponent {
if (logger.isTraceEnabled()) {
logger.trace("optimize with {}", optimize);
}
engine().forceMerge(optimize.flush(), optimize.waitForMerge(), optimize
.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(), optimize.upgrade());
}
public SnapshotIndexCommit snapshotIndex() throws EngineException {

RestOptimizeAction.java

@@ -55,7 +55,6 @@ public class RestOptimizeAction extends BaseRestHandler {
OptimizeRequest optimizeRequest = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
optimizeRequest.listenerThreaded(false);
optimizeRequest.indicesOptions(IndicesOptions.fromRequest(request, optimizeRequest.indicesOptions()));
optimizeRequest.waitForMerge(request.paramAsBoolean("wait_for_merge", optimizeRequest.waitForMerge()));
optimizeRequest.maxNumSegments(request.paramAsInt("max_num_segments", optimizeRequest.maxNumSegments()));
optimizeRequest.onlyExpungeDeletes(request.paramAsBoolean("only_expunge_deletes", optimizeRequest.onlyExpungeDeletes()));
optimizeRequest.flush(request.paramAsBoolean("flush", optimizeRequest.flush()));

RestUpgradeAction.java

@@ -90,7 +90,6 @@ public class RestUpgradeAction extends BaseRestHandler {
void handlePost(RestRequest request, RestChannel channel, Client client) {
OptimizeRequest optimizeReq = new OptimizeRequest(Strings.splitStringByCommaToArray(request.param("index")));
optimizeReq.waitForMerge(request.paramAsBoolean("wait_for_completion", false));
optimizeReq.flush(true);
optimizeReq.upgrade(true);
optimizeReq.maxNumSegments(Integer.MAX_VALUE); // we just want to upgrade the segments, not actually optimize to a single segment

RecoveryFromGatewayTests.java

@@ -367,7 +367,7 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest {
}
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareOptimize("test").setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();
logger.info("--> disabling allocation while the cluster is shut down");

InternalEngineTests.java

@@ -411,30 +411,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public void testSegmentsWithMergeFlag() throws Exception {
final Store store = createStore();
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
final AtomicReference<CountDownLatch> waitTillMerge = new AtomicReference<>();
final AtomicReference<CountDownLatch> waitForMerge = new AtomicReference<>();
mergeSchedulerProvider.addListener(new MergeSchedulerProvider.Listener() {
@Override
public void beforeMerge(OnGoingMerge merge) {
try {
if (waitTillMerge.get() != null) {
waitTillMerge.get().countDown();
}
if (waitForMerge.get() != null) {
waitForMerge.get().await();
}
} catch (InterruptedException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
@Override
public void afterMerge(OnGoingMerge merge) {
}
});
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
@@ -456,24 +435,13 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
for (Segment segment : segments) {
assertThat(segment.getMergeId(), nullValue());
}
waitTillMerge.set(new CountDownLatch(1));
waitForMerge.set(new CountDownLatch(1));
engine.forceMerge(false, false);
waitTillMerge.get().await();
for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), notNullValue());
}
waitForMerge.get().countDown();
index = new Engine.Index(null, newUid("4"), doc);
engine.index(index);
engine.flush();
final long gen1 = store.readLastCommittedSegmentsInfo().getGeneration();
// now, optimize and wait for merges, see that we have no merge flag
engine.forceMerge(true, true);
engine.forceMerge(true);
for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), nullValue());
@@ -483,25 +451,14 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
final boolean flush = randomBoolean();
final long gen2 = store.readLastCommittedSegmentsInfo().getGeneration();
engine.forceMerge(flush, false);
waitTillMerge.get().await();
engine.forceMerge(flush);
for (Segment segment : engine.segments(false)) {
assertThat(segment.getMergeId(), nullValue());
}
waitForMerge.get().countDown();
if (flush) {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
try {
// we should have had just 1 merge, so last generation should be exact
return store.readLastCommittedSegmentsInfo().getLastGeneration() == gen2;
} catch (IOException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
}
});
// we should have had just 1 merge, so last generation should be exact
assertEquals(gen2 + 1, store.readLastCommittedSegmentsInfo().getLastGeneration());
}
engine.close();

UpdateSettingsTests.java

@@ -215,7 +215,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
logger.info("test: optimize");
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").get();
logger.info("test: optimize done");
// Record current throttling so far
@@ -253,7 +253,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
// Wait for merges to finish
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").get();
flush();
logger.info("test: test done");
@@ -369,7 +369,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
)
.get();
// Make sure we log the change:
assertTrue(mockAppender.sawUpdateMaxThreadCount);

IndexStatsTests.java

@@ -380,7 +380,7 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
logger.info("test: now optimize");
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").get();
flush();
logger.info("test: test done");
}
@@ -517,7 +517,7 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test1", "type2", Integer.toString(i)).setSource("field", "value").execute().actionGet();
client().admin().indices().prepareFlush().execute().actionGet();
}
client().admin().indices().prepareOptimize().setWaitForMerge(true).setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareOptimize().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats()
.setMerge(true)
.execute().actionGet();
@@ -544,7 +544,7 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
assertThat(stats.getTotal().getSegments().getVersionMapMemoryInBytes(), greaterThan(0l));
client().admin().indices().prepareFlush().get();
client().admin().indices().prepareOptimize().setWaitForMerge(true).setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareOptimize().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats().setSegments(true).get();
assertThat(stats.getTotal().getSegments(), notNullValue());

UpgradeTest.java

@@ -157,7 +157,7 @@ public class UpgradeTest extends ElasticsearchBackwardsCompatIntegrationTest {
logger.info("--> Single index upgrade complete");
logger.info("--> Running upgrade on the rest of the indexes");
runUpgrade(httpClient, null, "wait_for_completion", "true");
runUpgrade(httpClient, null);
logSegmentsState();
logger.info("--> Full upgrade complete");
assertUpgraded(httpClient, null);

SimpleChildQuerySearchTests.java

@@ -1929,7 +1929,7 @@ public class SimpleChildQuerySearchTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test", "child", "c1").setParent("p1").setSource("c_field", "blue").get();
client().prepareIndex("test", "child", "c2").setParent("p1").setSource("c_field", "red").get();
client().prepareIndex("test", "child", "c3").setParent("p2").setSource("c_field", "red").get();
client().admin().indices().prepareOptimize("test").setFlush(true).setWaitForMerge(true).get();
client().admin().indices().prepareOptimize("test").setFlush(true).get();
client().prepareIndex("test", "parent", "p3").setSource("p_field", "p_value3").get();
client().prepareIndex("test", "parent", "p4").setSource("p_field", "p_value4").get();
client().prepareIndex("test", "child", "c4").setParent("p3").setSource("c_field", "green").get();

SharedClusterSnapshotRestoreTests.java

@@ -1414,7 +1414,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
}
indexRandom(true, builders);
flushAndRefresh();
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get());
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setMaxNumSegments(1).get());
CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get();
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));

SnapshotBackwardsCompatibilityTest.java

@@ -188,7 +188,7 @@ public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCompatIntegrationTest {
}
indexRandom(true, builders);
flushAndRefresh();
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get());
assertNoFailures(client().admin().indices().prepareOptimize("test").setFlush(true).setMaxNumSegments(1).get());
CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get();
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));