Serialize and expose timeout of acknowledged requests in REST layer (#26189)

Due to the weird way of structuring the serialization code in AcknowledgedRequest, many request types forgot to properly serialize the request timeout, for example "index deletion", "index rollover", "index shrink", "putting pipeline", and other requests. This means that if those requests were not directly sent to the master node, the acknowledgement timeout information would be lost (and the default used instead).
Some requests also don't properly expose the timeout mechanism in the REST layer, such as put / delete stored script. This commit fixes all that.
This commit is contained in:
Yannick Welsch 2017-08-16 07:43:05 +08:00 committed by GitHub
parent 292dd8f992
commit 01f6851691
25 changed files with 36 additions and 202 deletions

View File

@ -81,13 +81,11 @@ public class DeleteRepositoryRequest extends AcknowledgedRequest<DeleteRepositor
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
name = in.readString();
readTimeout(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
writeTimeout(out);
}
}

View File

@ -220,7 +220,6 @@ public class PutRepositoryRequest extends AcknowledgedRequest<PutRepositoryReque
name = in.readString();
type = in.readString();
settings = readSettingsFromStream(in);
readTimeout(in);
verify = in.readBoolean();
}
@ -230,7 +229,6 @@ public class PutRepositoryRequest extends AcknowledgedRequest<PutRepositoryReque
out.writeString(name);
out.writeString(type);
writeSettingsToStream(settings, out);
writeTimeout(out);
out.writeBoolean(verify);
}
}

View File

@ -81,13 +81,11 @@ public class VerifyRepositoryRequest extends AcknowledgedRequest<VerifyRepositor
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
name = in.readString();
readTimeout(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
writeTimeout(out);
}
}

View File

@ -128,7 +128,6 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
dryRun = in.readBoolean();
explain = in.readBoolean();
retryFailed = in.readBoolean();
readTimeout(in);
}
@Override
@ -138,7 +137,6 @@ public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteReq
out.writeBoolean(dryRun);
out.writeBoolean(explain);
out.writeBoolean(retryFailed);
writeTimeout(out);
}
@Override

View File

@ -148,7 +148,6 @@ public class ClusterUpdateSettingsRequest extends AcknowledgedRequest<ClusterUpd
super.readFrom(in);
transientSettings = readSettingsFromStream(in);
persistentSettings = readSettingsFromStream(in);
readTimeout(in);
}
@Override
@ -156,6 +155,5 @@ public class ClusterUpdateSettingsRequest extends AcknowledgedRequest<ClusterUpd
super.writeTo(out);
writeSettingsToStream(transientSettings, out);
writeSettingsToStream(persistentSettings, out);
writeTimeout(out);
}
}

View File

@ -467,14 +467,12 @@ public class IndicesAliasesRequest extends AcknowledgedRequest<IndicesAliasesReq
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
allAliasActions = in.readList(AliasActions::new);
readTimeout(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(allAliasActions);
writeTimeout(out);
}
public IndicesOptions indicesOptions() {

View File

@ -105,7 +105,6 @@ public class CloseIndexRequest extends AcknowledgedRequest<CloseIndexRequest> im
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
readTimeout(in);
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
@ -113,7 +112,6 @@ public class CloseIndexRequest extends AcknowledgedRequest<CloseIndexRequest> im
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
writeTimeout(out);
indicesOptions.writeIndicesOptions(out);
}
}

View File

@ -487,7 +487,6 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
cause = in.readString();
index = in.readString();
settings = readSettingsFromStream(in);
readTimeout(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
final String type = in.readString();
@ -518,7 +517,6 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
out.writeString(cause);
out.writeString(index);
writeSettingsToStream(settings, out);
writeTimeout(out);
out.writeVInt(mappings.size());
for (Map.Entry<String, String> entry : mappings.entrySet()) {
out.writeString(entry.getKey());

View File

@ -20,34 +20,15 @@
package org.elasticsearch.action.admin.indices.delete;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
public class DeleteIndexRequestBuilder extends MasterNodeOperationRequestBuilder<DeleteIndexRequest, DeleteIndexResponse, DeleteIndexRequestBuilder> {
public class DeleteIndexRequestBuilder extends AcknowledgedRequestBuilder<DeleteIndexRequest, DeleteIndexResponse, DeleteIndexRequestBuilder> {
public DeleteIndexRequestBuilder(ElasticsearchClient client, DeleteIndexAction action, String... indices) {
super(client, action, new DeleteIndexRequest(indices));
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* to <tt>60s</tt>.
*/
public DeleteIndexRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* Timeout to wait for the index deletion to be acknowledged by current cluster nodes. Defaults
* to <tt>10s</tt>.
*/
public DeleteIndexRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
/**
* Specifies what type of requested indices to ignore and wildcard indices expressions.
* <p>

View File

@ -313,7 +313,6 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
source = XContentHelper.convertToJson(new BytesArray(source), false, false, XContentFactory.xContentType(source));
}
updateAllTypes = in.readBoolean();
readTimeout(in);
concreteIndex = in.readOptionalWriteable(Index::new);
}
@ -325,7 +324,6 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
out.writeOptionalString(type);
out.writeString(source);
out.writeBoolean(updateAllTypes);
writeTimeout(out);
out.writeOptionalWriteable(concreteIndex);
}
}

View File

@ -105,7 +105,6 @@ public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> impl
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
readTimeout(in);
indicesOptions = IndicesOptions.readIndicesOptions(in);
}
@ -113,7 +112,6 @@ public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> impl
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
writeTimeout(out);
indicesOptions.writeIndicesOptions(out);
}
}

View File

@ -166,7 +166,6 @@ public class UpdateSettingsRequest extends AcknowledgedRequest<UpdateSettingsReq
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
settings = readSettingsFromStream(in);
readTimeout(in);
preserveExisting = in.readBoolean();
}
@ -176,7 +175,6 @@ public class UpdateSettingsRequest extends AcknowledgedRequest<UpdateSettingsReq
out.writeStringArrayNullable(indices);
indicesOptions.writeIndicesOptions(out);
writeSettingsToStream(settings, out);
writeTimeout(out);
out.writeBoolean(preserveExisting);
}
}

View File

@ -86,7 +86,6 @@ public class UpgradeSettingsRequest extends AcknowledgedRequest<UpgradeSettingsR
String oldestLuceneSegment = in.readString();
versions.put(index, new Tuple<>(upgradeVersion, oldestLuceneSegment));
}
readTimeout(in);
}
@Override
@ -98,6 +97,5 @@ public class UpgradeSettingsRequest extends AcknowledgedRequest<UpgradeSettingsR
Version.writeVersion(entry.getValue().v1(), out);
out.writeString(entry.getValue().v2());
}
writeTimeout(out);
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.support.master;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -70,22 +71,20 @@ public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Requ
return timeout;
}
/**
* Reads the timeout value
*/
protected void readTimeout(StreamInput in) throws IOException {
timeout = new TimeValue(in);
}
/**
* writes the timeout value
*/
protected void writeTimeout(StreamOutput out) throws IOException {
timeout.writeTo(out);
}
@Override
public TimeValue ackTimeout() {
return timeout;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
timeout = new TimeValue(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
timeout.writeTo(out);
}
}

View File

@ -47,6 +47,9 @@ public class RestDeleteStoredScriptAction extends BaseRestHandler {
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String id = request.param("id");
DeleteStoredScriptRequest deleteStoredScriptRequest = new DeleteStoredScriptRequest(id);
deleteStoredScriptRequest.timeout(request.paramAsTime("timeout", deleteStoredScriptRequest.timeout()));
deleteStoredScriptRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteStoredScriptRequest.masterNodeTimeout()));
return channel -> client.admin().cluster().deleteStoredScript(deleteStoredScriptRequest, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -58,6 +58,8 @@ public class RestPutStoredScriptAction extends BaseRestHandler {
StoredScriptSource source = StoredScriptSource.parse(content, xContentType);
PutStoredScriptRequest putRequest = new PutStoredScriptRequest(id, context, content, request.getXContentType(), source);
putRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putRequest.masterNodeTimeout()));
putRequest.timeout(request.paramAsTime("timeout", putRequest.timeout()));
return channel -> client.admin().cluster().putStoredScript(putRequest, new AcknowledgedRestListener<>(channel));
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.admin.cluster.storedscripts;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@ -28,7 +27,6 @@ import org.elasticsearch.script.StoredScriptSource;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
public class PutStoredScriptRequestTests extends ESTestCase {
@ -50,25 +48,4 @@ public class PutStoredScriptRequestTests extends ESTestCase {
}
}
}
public void testSerializationBwc() throws IOException {
final byte[] rawStreamBytes = Base64.getDecoder().decode("ADwDCG11c3RhY2hlAQZzY3JpcHQCe30A");
final Version version = randomFrom(Version.V_5_0_0, Version.V_5_0_1, Version.V_5_0_2,
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(rawStreamBytes)) {
in.setVersion(version);
PutStoredScriptRequest serialized = new PutStoredScriptRequest();
serialized.readFrom(in);
assertEquals(XContentType.JSON, serialized.xContentType());
assertEquals("script", serialized.id());
assertEquals(new BytesArray("{}"), serialized.content());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
serialized.writeTo(out);
out.flush();
assertArrayEquals(rawStreamBytes, out.bytes().toBytesRef().bytes);
}
}
}
}

View File

@ -19,15 +19,12 @@
package org.elasticsearch.action.admin.indices.analyze;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.Base64;
public class AnalyzeRequestTests extends ESTestCase {
@ -92,20 +89,4 @@ public class AnalyzeRequestTests extends ESTestCase {
}
}
}
public void testSerializationBwc() throws IOException {
// AnalyzeRequest serializedRequest = new AnalyzeRequest("foo");
// serializedRequest.text("text");
// serializedRequest.normalizer("normalizer");
// Using Version.V_6_0_0_beta1
final byte[] data = Base64.getDecoder().decode("AAABA2ZvbwEEdGV4dAAAAAAAAAABCm5vcm1hbGl6ZXI=");
final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_0_0, Version.V_5_4_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
AnalyzeRequest request = new AnalyzeRequest();
request.readFrom(in);
assertEquals("foo", request.index());
assertNull("normalizer support after 6.0.0", request.normalizer());
}
}
}

View File

@ -19,8 +19,6 @@
package org.elasticsearch.action.admin.indices.create;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentType;
@ -28,7 +26,6 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Base64;
public class CreateIndexRequestTests extends ESTestCase {
@ -48,25 +45,4 @@ public class CreateIndexRequestTests extends ESTestCase {
}
}
}
public void testSerializationBwc() throws IOException {
final byte[] data = Base64.getDecoder().decode("ADwDAANmb28APAMBB215X3R5cGULeyJ0eXBlIjp7fX0AAAD////+AA==");
final Version version = randomFrom(Version.V_5_0_0, Version.V_5_0_1, Version.V_5_0_2, Version.V_5_1_1, Version.V_5_1_2,
Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
CreateIndexRequest serialized = new CreateIndexRequest();
serialized.readFrom(in);
assertEquals("foo", serialized.index());
BytesReference bytesReference = JsonXContent.contentBuilder().startObject().startObject("type").endObject().endObject().bytes();
assertEquals(bytesReference.utf8ToString(), serialized.mappings().get("my_type"));
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
serialized.writeTo(out);
out.flush();
assertArrayEquals(data, out.bytes().toBytesRef().bytes);
}
}
}
}

View File

@ -31,7 +31,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Base64;
public class PutMappingRequestTests extends ESTestCase {
@ -95,17 +94,4 @@ public class PutMappingRequestTests extends ESTestCase {
}
}
}
public void testSerializationBwc() throws IOException {
final byte[] data = Base64.getDecoder().decode("ADwDAQNmb28MAA8tLS0KZm9vOiAiYmFyIgoAPAMAAAA=");
final Version version = randomFrom(Version.V_5_0_0, Version.V_5_0_1, Version.V_5_0_2,
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
PutMappingRequest request = new PutMappingRequest();
request.readFrom(in);
String mapping = YamlXContent.contentBuilder().startObject().field("foo", "bar").endObject().string();
assertEquals(XContentHelper.convertToJson(new BytesArray(mapping), false, XContentType.YAML), request.source());
}
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@ -28,7 +27,6 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class PutPipelineRequestTests extends ESTestCase {
@ -45,23 +43,4 @@ public class PutPipelineRequestTests extends ESTestCase {
assertEquals(XContentType.JSON, serialized.getXContentType());
assertEquals("{}", serialized.getSource().utf8ToString());
}
public void testSerializationBwc() throws IOException {
final byte[] data = Base64.getDecoder().decode("ADwDATECe30=");
final Version version = randomFrom(Version.V_5_0_0, Version.V_5_0_1, Version.V_5_0_2,
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
PutPipelineRequest request = new PutPipelineRequest();
request.readFrom(in);
assertEquals(XContentType.JSON, request.getXContentType());
assertEquals("{}", request.getSource().utf8ToString());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
request.writeTo(out);
assertArrayEquals(data, out.bytes().toBytesRef().bytes);
}
}
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.ingest;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -35,7 +34,6 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
public class PipelineConfigurationTests extends ESTestCase {
@ -52,24 +50,6 @@ public class PipelineConfigurationTests extends ESTestCase {
assertEquals("{}", serialized.getConfig().utf8ToString());
}
public void testSerializationBwc() throws IOException {
final byte[] data = Base64.getDecoder().decode("ATECe30AAAA=");
try (StreamInput in = StreamInput.wrap(data)) {
final Version version = randomFrom(Version.V_5_0_0, Version.V_5_0_1, Version.V_5_0_2,
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
in.setVersion(version);
PipelineConfiguration configuration = PipelineConfiguration.readFrom(in);
assertEquals(XContentType.JSON, configuration.getXContentType());
assertEquals("{}", configuration.getConfig().utf8ToString());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
configuration.writeTo(out);
assertArrayEquals(data, out.bytes().toBytesRef().bytes);
}
}
}
public void testParser() throws IOException {
ContextParser<Void, PipelineConfiguration> parser = PipelineConfiguration.getParser();
XContentType xContentType = randomFrom(XContentType.values());

View File

@ -35,7 +35,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
@ -54,7 +53,6 @@ import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -250,26 +248,6 @@ public class PercolateQueryBuilderTests extends AbstractQueryTestCase<PercolateQ
assertThat(result.clauses().get(1).getOccur(), equalTo(BooleanClause.Occur.MUST_NOT));
}
public void testSerializationBwc() throws IOException {
final byte[] data = Base64.getDecoder().decode("P4AAAAAFZmllbGQEdHlwZQAAAAAAAA57ImZvbyI6ImJhciJ9AAAAAA==");
final Version version = randomFrom(Version.V_5_0_0, Version.V_5_0_1, Version.V_5_0_2,
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(in);
assertEquals("type", queryBuilder.getDocumentType());
assertEquals("field", queryBuilder.getField());
assertEquals("{\"foo\":\"bar\"}", queryBuilder.getDocument().utf8ToString());
assertEquals(XContentType.JSON, queryBuilder.getXContentType());
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.setVersion(version);
queryBuilder.writeTo(out);
assertArrayEquals(data, out.bytes().toBytesRef().bytes);
}
}
}
private static BytesReference randomSource() {
try {
XContentBuilder xContent = XContentFactory.jsonBuilder();

View File

@ -18,6 +18,14 @@
}
},
"params" : {
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"master_timeout": {
"type" : "time",
"description" : "Specify timeout for connection to master"
}
}
},
"body": null

View File

@ -18,6 +18,14 @@
}
},
"params" : {
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"master_timeout": {
"type" : "time",
"description" : "Specify timeout for connection to master"
},
"context": {
"type" : "string",
"description" : "Context name to compile script against"