Reject illegal flush parameters (#40213)

This change rejects an illegal combination of flush parameters where
force is true, but wait_if_ongoing is false. This combination is trappy
and should be forbidden.

Closes #36342
This commit is contained in:
Nhat Nguyen 2019-03-19 21:23:25 -04:00
parent c4960ad736
commit 2756a3936b
7 changed files with 70 additions and 8 deletions

View File

@ -23,10 +23,8 @@ POST twitter/_flush
The flush API accepts the following request parameters:
[horizontal]
`wait_if_ongoing`:: If set to `true` the flush operation will block until the
flush can be executed if another flush operation is already executing.
The default is `false` and will cause an exception to be thrown on
the shard level if another flush operation is already running.
`wait_if_ongoing`:: If set to `true`(the default value) the flush operation will
block until the flush can be executed if another flush operation is already executing.
`force`:: Whether a flush should be forced even if it is not necessarily needed i.e.
if no changes will be committed to the index. This is useful if transaction log IDs

View File

@ -52,3 +52,32 @@
# periodic flush is async
- gte: { indices.test.primaries.flush.periodic: 0 }
- gte: { indices.test.primaries.flush.total: 1 }
---
"Flush parameters validation":
- skip:
version: " - 7.9.99"
reason: flush parameters validation is introduced in 8.0
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
catch: /action_request_validation_exception.+ wait_if_ongoing must be true for a force flush/
indices.flush:
index: test
force: true
wait_if_ongoing: false
- do:
indices.stats: { index: test }
- match: { indices.test.primaries.flush.total: 0 }
- do:
indices.flush:
index: test
force: true
wait_if_ongoing: true
- do:
indices.stats: { index: test }
- match: { indices.test.primaries.flush.total: 1 }

View File

@ -19,12 +19,15 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A flush request to flush one or more indices. The flush process of an index basically frees memory from the index
* by flushing data to the index storage and clearing the internal transaction log. By default, Elasticsearch uses
@ -82,6 +85,15 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
return this;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationError = super.validate();
if (force && waitIfOngoing == false) {
validationError = addValidationError("wait_if_ongoing must be true for a force flush", validationError);
}
return validationError;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -1687,6 +1687,11 @@ public class InternalEngine extends Engine {
@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
throw new IllegalArgumentException(
"wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing);
}
final byte[] newCommitId;
/*
* Unfortunately the lock order is important here. We have to acquire the readlock first otherwise

View File

@ -4867,7 +4867,7 @@ public class InternalEngineTests extends EngineTestCase {
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (rarely()) {
engine.flush(randomBoolean(), randomBoolean());
engine.flush(randomBoolean(), true);
}
}
engine.flush(false, randomBoolean());
@ -4893,7 +4893,7 @@ public class InternalEngineTests extends EngineTestCase {
for (int docId = 0; docId < numDocs; docId++) {
index(engine, docId);
if (frequently()) {
engine.flush(randomBoolean(), randomBoolean());
engine.flush(randomBoolean(), true);
}
}
engine.flush(false, randomBoolean());

View File

@ -131,8 +131,8 @@ public class ReadOnlyEngineTests extends EngineTestCase {
engine.syncTranslog();
engine.flushAndClose();
readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null , null, true, Function.identity());
Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), randomBoolean());
assertEquals(flush, readOnlyEngine.flush(randomBoolean(), randomBoolean()));
Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), true);
assertEquals(flush, readOnlyEngine.flush(randomBoolean(), true));
} finally {
IOUtils.close(readOnlyEngine);
}

View File

@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -59,6 +60,8 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
@ -104,6 +107,21 @@ public class FlushIT extends ESIntegTestCase {
}
}
public void testRejectIllegalFlushParameters() {
createIndex("test");
int numDocs = randomIntBetween(0, 10);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "_doc").setSource("{}", XContentType.JSON).get();
}
assertThat(expectThrows(ValidationException.class,
() -> client().admin().indices().flush(new FlushRequest().force(true).waitIfOngoing(false)).actionGet()).getMessage(),
containsString("wait_if_ongoing must be true for a force flush"));
assertThat(client().admin().indices().flush(new FlushRequest().force(true).waitIfOngoing(true)).actionGet()
.getShardFailures(), emptyArray());
assertThat(client().admin().indices().flush(new FlushRequest().force(false).waitIfOngoing(randomBoolean()))
.actionGet().getShardFailures(), emptyArray());
}
public void testSyncedFlush() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get();