diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java
index 34441d4160f..4254b132b57 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ClusterClient.java
@@ -23,10 +23,11 @@ import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
-import org.elasticsearch.action.ingest.PutPipelineRequest;
-import org.elasticsearch.action.ingest.PutPipelineResponse;
+import org.elasticsearch.action.ingest.DeletePipelineRequest;
+import org.elasticsearch.action.ingest.WritePipelineResponse;
import java.io.IOException;
@@ -74,9 +75,9 @@ public final class ClusterClient {
* See
* Put Pipeline API on elastic.co
*/
- public PutPipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
+ public WritePipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::putPipeline,
- PutPipelineResponse::fromXContent, emptySet(), headers);
+ WritePipelineResponse::fromXContent, emptySet(), headers);
}
/**
@@ -85,9 +86,9 @@ public final class ClusterClient {
* See
* Put Pipeline API on elastic.co
*/
- public void putPipelineAsync(PutPipelineRequest request, ActionListener listener, Header... headers) {
+ public void putPipelineAsync(PutPipelineRequest request, ActionListener listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
- PutPipelineResponse::fromXContent, listener, emptySet(), headers);
+ WritePipelineResponse::fromXContent, listener, emptySet(), headers);
}
/**
@@ -111,4 +112,28 @@ public final class ClusterClient {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, listener, emptySet(), headers);
}
+
+ /**
+ * Delete an existing pipeline
+ *
+ * See
+ *
+ * Delete Pipeline API on elastic.co
+ */
+ public WritePipelineResponse deletePipeline(DeletePipelineRequest request, Header... headers) throws IOException {
+ return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::deletePipeline,
+ WritePipelineResponse::fromXContent, emptySet(), headers);
+ }
+
+ /**
+ * Asynchronously delete an existing pipeline
+ *
+ * See
+ *
+ * Delete Pipeline API on elastic.co
+ */
+ public void deletePipelineAsync(DeletePipelineRequest request, ActionListener listener, Header... headers) {
+ restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline,
+ WritePipelineResponse::fromXContent, listener, emptySet(), headers);
+ }
}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
index e137d3d2f5c..6c8bb845259 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java
@@ -60,6 +60,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
@@ -648,6 +649,20 @@ final class RequestConverters {
return request;
}
+ static Request deletePipeline(DeletePipelineRequest deletePipelineRequest) {
+ String endpoint = new EndpointBuilder()
+ .addPathPartAsIs("_ingest/pipeline")
+ .addPathPart(deletePipelineRequest.getId())
+ .build();
+ Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
+
+ Params parameters = new Params(request);
+ parameters.withTimeout(deletePipelineRequest.timeout());
+ parameters.withMasterTimeout(deletePipelineRequest.masterNodeTimeout());
+
+ return request;
+ }
+
static Request listTasks(ListTasksRequest listTaskRequest) {
if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) {
throw new IllegalArgumentException("TaskId cannot be used for list tasks request");
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java
index caab4c282f4..42db51e81b7 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java
@@ -25,7 +25,8 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
-import org.elasticsearch.action.ingest.PutPipelineResponse;
+import org.elasticsearch.action.ingest.DeletePipelineRequest;
+import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@@ -121,7 +122,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType());
- PutPipelineResponse putPipelineResponse =
+ WritePipelineResponse putPipelineResponse =
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}
@@ -148,4 +149,17 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap());
}
+
+ public void testDeletePipeline() throws IOException {
+ String id = "some_pipeline_id";
+ {
+ createPipeline(id);
+ }
+
+ DeletePipelineRequest request = new DeletePipelineRequest(id);
+
+ WritePipelineResponse response =
+ execute(request, highLevelClient().cluster()::deletePipeline, highLevelClient().cluster()::deletePipelineAsync);
+ assertTrue(response.isAcknowledged());
+ }
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
index bf69aa76636..f61d79b8d42 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java
@@ -63,6 +63,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
@@ -1465,6 +1466,21 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(expectedParams, expectedRequest.getParameters());
}
+ public void testDeletePipeline() {
+ String pipelineId = "some_pipeline_id";
+ Map expectedParams = new HashMap<>();
+ DeletePipelineRequest request = new DeletePipelineRequest(pipelineId);
+ setRandomMasterTimeout(request, expectedParams);
+ setRandomTimeout(request::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams);
+ Request expectedRequest = RequestConverters.deletePipeline(request);
+ StringJoiner endpoint = new StringJoiner("/", "/", "");
+ endpoint.add("_ingest/pipeline");
+ endpoint.add(pipelineId);
+ assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
+ assertEquals(HttpDelete.METHOD_NAME, expectedRequest.getMethod());
+ assertEquals(expectedParams, expectedRequest.getParameters());
+ }
+
public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java
index 07785ecc03d..0da577f17e8 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java
@@ -26,7 +26,8 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
-import org.elasticsearch.action.ingest.PutPipelineResponse;
+import org.elasticsearch.action.ingest.DeletePipelineRequest;
+import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -212,7 +213,7 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
// end::put-pipeline-request-masterTimeout
// tag::put-pipeline-execute
- PutPipelineResponse response = client.cluster().putPipeline(request); // <1>
+ WritePipelineResponse response = client.cluster().putPipeline(request); // <1>
// end::put-pipeline-execute
// tag::put-pipeline-response
@@ -236,10 +237,10 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
);
// tag::put-pipeline-execute-listener
- ActionListener listener =
- new ActionListener() {
+ ActionListener listener =
+ new ActionListener() {
@Override
- public void onResponse(PutPipelineResponse response) {
+ public void onResponse(WritePipelineResponse response) {
// <1>
}
@@ -331,4 +332,74 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
+
+ public void testDeletePipeline() throws IOException {
+ RestHighLevelClient client = highLevelClient();
+
+ {
+ createPipeline("my-pipeline-id");
+ }
+
+ {
+ // tag::delete-pipeline-request
+ DeletePipelineRequest request = new DeletePipelineRequest("my-pipeline-id"); // <1>
+ // end::delete-pipeline-request
+
+ // tag::delete-pipeline-request-timeout
+ request.timeout(TimeValue.timeValueMinutes(2)); // <1>
+ request.timeout("2m"); // <2>
+ // end::delete-pipeline-request-timeout
+
+ // tag::delete-pipeline-request-masterTimeout
+ request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
+ request.masterNodeTimeout("1m"); // <2>
+ // end::delete-pipeline-request-masterTimeout
+
+ // tag::delete-pipeline-execute
+ WritePipelineResponse response = client.cluster().deletePipeline(request); // <1>
+ // end::delete-pipeline-execute
+
+ // tag::delete-pipeline-response
+ boolean acknowledged = response.isAcknowledged(); // <1>
+ // end::delete-pipeline-response
+ assertTrue(acknowledged);
+ }
+ }
+
+ public void testDeletePipelineAsync() throws Exception {
+ RestHighLevelClient client = highLevelClient();
+
+ {
+ createPipeline("my-pipeline-id");
+ }
+
+ {
+ DeletePipelineRequest request = new DeletePipelineRequest("my-pipeline-id");
+
+ // tag::delete-pipeline-execute-listener
+ ActionListener listener =
+ new ActionListener() {
+ @Override
+ public void onResponse(WritePipelineResponse response) {
+ // <1>
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ // <2>
+ }
+ };
+ // end::delete-pipeline-execute-listener
+
+ // Replace the empty listener by a blocking listener in test
+ final CountDownLatch latch = new CountDownLatch(1);
+ listener = new LatchedActionListener<>(listener, latch);
+
+ // tag::delete-pipeline-execute-async
+ client.cluster().deletePipelineAsync(request, listener); // <1>
+ // end::delete-pipeline-execute-async
+
+ assertTrue(latch.await(30L, TimeUnit.SECONDS));
+ }
+ }
}
diff --git a/docs/java-rest/high-level/cluster/delete_pipeline.asciidoc b/docs/java-rest/high-level/cluster/delete_pipeline.asciidoc
new file mode 100644
index 00000000000..f809f831f78
--- /dev/null
+++ b/docs/java-rest/high-level/cluster/delete_pipeline.asciidoc
@@ -0,0 +1,80 @@
+[[java-rest-high-cluster-delete-pipeline]]
+=== Delete Pipeline API
+
+[[java-rest-high-cluster-delete-pipeline-request]]
+==== Delete Pipeline Request
+
+A `DeletePipelineRequest` requires a pipeline `id` to delete.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-request]
+--------------------------------------------------
+<1> The pipeline id to delete
+
+==== Optional arguments
+The following arguments can optionally be provided:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-request-timeout]
+--------------------------------------------------
+<1> Timeout to wait for the all the nodes to acknowledge the pipeline deletion as a `TimeValue`
+<2> Timeout to wait for the all the nodes to acknowledge the pipeline deletion as a `String`
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-request-masterTimeout]
+--------------------------------------------------
+<1> Timeout to connect to the master node as a `TimeValue`
+<2> Timeout to connect to the master node as a `String`
+
+[[java-rest-high-cluster-delete-pipeline-sync]]
+==== Synchronous Execution
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-execute]
+--------------------------------------------------
+<1> Execute the request and get back the response in a `WritePipelineResponse` object.
+
+[[java-rest-high-cluster-delete-pipeline-async]]
+==== Asynchronous Execution
+
+The asynchronous execution of a delete pipeline request requires both the `DeletePipelineRequest`
+instance and an `ActionListener` instance to be passed to the asynchronous
+method:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-execute-async]
+--------------------------------------------------
+<1> The `DeletePipelineRequest` to execute and the `ActionListener` to use when
+the execution completes
+
+The asynchronous method does not block and returns immediately. Once it is
+completed the `ActionListener` is called back using the `onResponse` method
+if the execution successfully completed or using the `onFailure` method if
+it failed.
+
+A typical listener for `WritePipelineResponse` looks like:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-execute-listener]
+--------------------------------------------------
+<1> Called when the execution is successfully completed. The response is
+provided as an argument
+<2> Called in case of failure. The raised exception is provided as an argument
+
+[[java-rest-high-cluster-delete-pipeline-response]]
+==== Delete Pipeline Response
+
+The returned `WritePipelineResponse` allows to retrieve information about the executed
+ operation as follows:
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-response]
+--------------------------------------------------
+<1> Indicates whether all of the nodes have acknowledged the request
diff --git a/docs/java-rest/high-level/cluster/put_pipeline.asciidoc b/docs/java-rest/high-level/cluster/put_pipeline.asciidoc
index d50a6741cc0..942b75b74cd 100644
--- a/docs/java-rest/high-level/cluster/put_pipeline.asciidoc
+++ b/docs/java-rest/high-level/cluster/put_pipeline.asciidoc
@@ -22,8 +22,8 @@ The following arguments can optionally be provided:
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request-timeout]
--------------------------------------------------
-<1> Timeout to wait for the all the nodes to acknowledge the index creation as a `TimeValue`
-<2> Timeout to wait for the all the nodes to acknowledge the index creation as a `String`
+<1> Timeout to wait for the all the nodes to acknowledge the pipeline creation as a `TimeValue`
+<2> Timeout to wait for the all the nodes to acknowledge the pipeline creation as a `String`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
@@ -39,7 +39,7 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-reque
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute]
--------------------------------------------------
-<1> Execute the request and get back the response in a PutPipelineResponse object.
+<1> Execute the request and get back the response in a WritePipelineResponse object.
[[java-rest-high-cluster-put-pipeline-async]]
==== Asynchronous Execution
@@ -60,7 +60,7 @@ completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
-A typical listener for `PutPipelineResponse` looks like:
+A typical listener for `WritePipelineResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
@@ -73,7 +73,7 @@ provided as an argument
[[java-rest-high-cluster-put-pipeline-response]]
==== Put Pipeline Response
-The returned `PutPipelineResponse` allows to retrieve information about the executed
+The returned `WritePipelineResponse` allows to retrieve information about the executed
operation as follows:
["source","java",subs="attributes,callouts,macros"]
diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc
index 412fa2aec42..34873a248b7 100644
--- a/docs/java-rest/high-level/supported-apis.asciidoc
+++ b/docs/java-rest/high-level/supported-apis.asciidoc
@@ -108,10 +108,12 @@ The Java High Level REST Client supports the following Cluster APIs:
* <>
* <>
* <>
+* <>
include::cluster/put_settings.asciidoc[]
include::cluster/put_pipeline.asciidoc[]
include::cluster/get_pipeline.asciidoc[]
+include::cluster/delete_pipeline.asciidoc[]
== Snapshot APIs
diff --git a/docs/reference/indices/create-index.asciidoc b/docs/reference/indices/create-index.asciidoc
index ade59153644..b013c747a32 100644
--- a/docs/reference/indices/create-index.asciidoc
+++ b/docs/reference/indices/create-index.asciidoc
@@ -23,7 +23,7 @@ There are several limitations to what you can name your index. The complete lis
- Cannot include `\`, `/`, `*`, `?`, `"`, `<`, `>`, `|`, ` ` (space character), `,`, `#`
- Indices prior to 7.0 could contain a colon (`:`), but that's been deprecated and won't be supported in 7.0+
- Cannot start with `-`, `_`, `+`
-- Cannot be `.` or ``..`
+- Cannot be `.` or `..`
- Cannot be longer than 255 bytes (note it is bytes, so multi-byte characters will count towards the 255 limit faster)
======================================================
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java
index 47a31f268a6..62e52a8726f 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/ESLoggingHandler.java
@@ -104,6 +104,10 @@ final class ESLoggingHandler extends LoggingHandler {
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(in);
}
+ // now we decode the features
+ if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+ in.readStringArray();
+ }
// now we can decode the action name
sb.append(", action: ").append(in.readString());
}
diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java
deleted file mode 100644
index 13960ca99ef..00000000000
--- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.ingest;
-
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.ConstructingObjectParser;
-import org.elasticsearch.common.xcontent.ToXContentObject;
-import org.elasticsearch.common.xcontent.XContentParser;
-
-import java.io.IOException;
-
-public class PutPipelineResponse extends AcknowledgedResponse implements ToXContentObject {
-
- private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
- "cluster_put_pipeline", true, args -> new PutPipelineResponse((boolean) args[0]));
-
- static {
- declareAcknowledgedField(PARSER);
- }
-
- public PutPipelineResponse() {
- }
-
- public PutPipelineResponse(boolean acknowledged) {
- super(acknowledged);
- }
-
- @Override
- public void readFrom(StreamInput in) throws IOException {
- super.readFrom(in);
- readAcknowledged(in);
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
- writeAcknowledged(out);
- }
-
- public static PutPipelineResponse fromXContent(XContentParser parser) {
- return PARSER.apply(parser, null);
- }
-}
diff --git a/server/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponse.java
index ced6085e0ac..36301d6735a 100644
--- a/server/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponse.java
+++ b/server/src/main/java/org/elasticsearch/action/ingest/WritePipelineResponse.java
@@ -22,10 +22,20 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
-public class WritePipelineResponse extends AcknowledgedResponse {
+public class WritePipelineResponse extends AcknowledgedResponse implements ToXContentObject {
+
+ private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
+ "write_pipeline_response", true, args -> new WritePipelineResponse((boolean) args[0]));
+
+ static {
+ declareAcknowledgedField(PARSER);
+ }
WritePipelineResponse() {
@@ -46,4 +56,8 @@ public class WritePipelineResponse extends AcknowledgedResponse {
super.writeTo(out);
writeAcknowledged(out);
}
+
+ public static WritePipelineResponse fromXContent(XContentParser parser) {
+ return PARSER.apply(parser, null);
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java
index f6d3a87f10d..40904e9a824 100644
--- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java
+++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java
@@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient {
public static final Setting CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
+ public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";
+
private static PluginsService newPluginService(final Settings settings, Collection> plugins) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
@@ -130,8 +132,12 @@ public abstract class TransportClient extends AbstractClient {
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
- final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
- + "." + "transport_client", true).build();
+ final Settings settings =
+ Settings.builder()
+ .put(defaultSettings)
+ .put(pluginsService.updatedSettings())
+ .put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
+ .build();
final List resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
index 2b991d1dc61..6bc555eae0b 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -61,6 +62,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
/**
@@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable
public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
- public interface Custom extends NamedDiffable, ToXContentFragment {
+ /**
+ * An interface that implementors use when a class requires a client to maybe have a feature.
+ */
+ public interface FeatureAware {
+
+ /**
+ * An optional feature that is required for the client to have.
+ *
+ * @return an empty optional if no feature is required otherwise a string representing the required feature
+ */
+ default Optional getRequiredFeature() {
+ return Optional.empty();
+ }
+
+ /**
+ * Tests whether or not the custom should be serialized. The criteria are:
+ *
+ *
the output stream must be at least the minimum supported version of the custom
+ *
the output stream must have the feature required by the custom (if any) or not be a transport client
+ *
+ *
+ * That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features
+ * that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but
+ * for connected nodes we always require that the node has the required feature.
+ *
+ * @param out the output stream
+ * @param custom the custom to serialize
+ * @param the type of the custom
+ * @return true if the custom should be serialized and false otherwise
+ */
+ static boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
+ if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
+ return false;
+ }
+ if (custom.getRequiredFeature().isPresent()) {
+ final String requiredFeature = custom.getRequiredFeature().get();
+ // if it is a transport client we are lenient yet for a connected node it must have the required feature
+ return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
+ }
+ return true;
+ }
+
+ }
+
+ public interface Custom extends NamedDiffable, ToXContentFragment, FeatureAware {
/**
* Returns true iff this {@link Custom} is private to the cluster and should never be send to a client.
@@ -99,6 +145,7 @@ public class ClusterState implements ToXContentFragment, Diffable
default boolean isPrivate() {
return false;
}
+
}
private static final NamedDiffableValueSerializer CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
@@ -244,6 +291,15 @@ public class ClusterState implements ToXContentFragment, Diffable
sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
}
}
+ if (metaData.customs().isEmpty() == false) {
+ sb.append("metadata customs:\n");
+ for (final ObjectObjectCursor cursor : metaData.customs()) {
+ final String type = cursor.key;
+ final MetaData.Custom custom = cursor.value;
+ sb.append(TAB).append(type).append(": ").append(custom);
+ }
+ sb.append("\n");
+ }
sb.append(blocks());
sb.append(nodes());
sb.append(routingTable());
@@ -691,14 +747,14 @@ public class ClusterState implements ToXContentFragment, Diffable
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
- for (ObjectCursor cursor : customs.values()) {
- if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+ for (final ObjectCursor cursor : customs.values()) {
+ if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
- for (ObjectCursor cursor : customs.values()) {
- if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+ for (final ObjectCursor cursor : customs.values()) {
+ if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
index b18c82712b3..8c732225195 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
@@ -24,6 +24,8 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterState.FeatureAware;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
@@ -117,9 +119,10 @@ public class MetaData implements Iterable, Diffable, To
*/
public static EnumSet ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);
- public interface Custom extends NamedDiffable, ToXContentFragment {
+ public interface Custom extends NamedDiffable, ToXContentFragment, ClusterState.FeatureAware {
EnumSet context();
+
}
public static final Setting SETTING_READ_ONLY_SETTING =
@@ -782,14 +785,14 @@ public class MetaData implements Iterable, Diffable, To
}
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
- for (ObjectCursor cursor : customs.values()) {
- if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+ for (final ObjectCursor cursor : customs.values()) {
+ if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
- for (ObjectCursor cursor : customs.values()) {
- if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
+ for (final ObjectCursor cursor : customs.values()) {
+ if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
index ab0f47bf14d..e8c4d197fda 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
@@ -30,6 +30,8 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
@@ -58,10 +60,12 @@ import java.util.Date;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
@@ -98,6 +102,7 @@ public abstract class StreamOutput extends OutputStream {
}
private Version version = Version.CURRENT;
+ private Set features = Collections.emptySet();
/**
* The version of the node on the other side of this stream.
@@ -113,6 +118,27 @@ public abstract class StreamOutput extends OutputStream {
this.version = version;
}
+ /**
+ * Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
+ * {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
+ *
+ * @param feature the feature to test
+ * @return true if the stream has the specified feature
+ */
+ public boolean hasFeature(final String feature) {
+ return this.features.contains(feature);
+ }
+
+ /**
+ * Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
+ *
+ * @param features the features on the stream
+ */
+ public void setFeatures(final Set features) {
+ assert this.features.isEmpty() : this.features;
+ this.features = Collections.unmodifiableSet(new HashSet<>(features));
+ }
+
public long position() throws IOException {
throw new UnsupportedOperationException();
}
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index d9cf0f630c0..e616613a425 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -379,6 +379,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
EsExecutors.PROCESSORS_SETTING,
ThreadContext.DEFAULT_HEADERS_SETTING,
+ TcpTransport.DEFAULT_FEATURES_SETTING,
Loggers.LOG_DEFAULT_LEVEL_SETTING,
Loggers.LOG_LEVEL_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java
index b5e15f1a026..9e2063adb14 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java
@@ -288,18 +288,6 @@ public class TextFieldMapper extends FieldMapper {
return super.toString() + ",prefixChars=" + minChars + ":" + maxChars;
}
- @Override
- public void checkCompatibility(MappedFieldType other, List conflicts) {
- super.checkCompatibility(other, conflicts);
- PrefixFieldType otherFieldType = (PrefixFieldType) other;
- if (otherFieldType.minChars != this.minChars) {
- conflicts.add("mapper [" + name() + "] has different min_chars values");
- }
- if (otherFieldType.maxChars != this.maxChars) {
- conflicts.add("mapper [" + name() + "] has different max_chars values");
- }
- }
-
@Override
public Query existsQuery(QueryShardContext context) {
throw new UnsupportedOperationException();
@@ -479,6 +467,25 @@ public class TextFieldMapper extends FieldMapper {
}
return new PagedBytesIndexFieldData.Builder(fielddataMinFrequency, fielddataMaxFrequency, fielddataMinSegmentSize);
}
+
+ @Override
+ public void checkCompatibility(MappedFieldType other, List conflicts) {
+ super.checkCompatibility(other, conflicts);
+ TextFieldType tft = (TextFieldType) other;
+ if (Objects.equals(this.prefixFieldType, tft.prefixFieldType) == false) {
+ if (this.prefixFieldType == null) {
+ conflicts.add("mapper [" + name()
+ + "] has different [index_prefixes] settings, cannot change from disabled to enabled");
+ }
+ else if (tft.prefixFieldType == null) {
+ conflicts.add("mapper [" + name()
+ + "] has different [index_prefixes] settings, cannot change from enabled to disabled");
+ }
+ else {
+ conflicts.add("mapper [" + name() + "] has different [index_prefixes] settings");
+ }
+ }
+ }
}
private int positionIncrementGap;
diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java
index 6d2c21a764a..bdee87cc77c 100644
--- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java
+++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java
@@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
diff --git a/server/src/main/java/org/elasticsearch/plugins/Plugin.java b/server/src/main/java/org/elasticsearch/plugins/Plugin.java
index 82c8bf1bbcb..0ef703448b7 100644
--- a/server/src/main/java/org/elasticsearch/plugins/Plugin.java
+++ b/server/src/main/java/org/elasticsearch/plugins/Plugin.java
@@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionModule;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -56,6 +57,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.UnaryOperator;
/**
@@ -79,6 +81,17 @@ import java.util.function.UnaryOperator;
*/
public abstract class Plugin implements Closeable {
+ /**
+ * A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
+ * also {@link ClusterState.FeatureAware}.
+ *
+ * @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
+ * customs
+ */
+ protected Optional getFeature() {
+ return Optional.empty();
+ }
+
/**
* Node level guice modules.
*/
diff --git a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java
index 68a19bb9bca..5b64b5be639 100644
--- a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java
+++ b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java
@@ -41,8 +41,10 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -57,16 +59,17 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;
@@ -196,6 +199,7 @@ public class PluginsService extends AbstractComponent {
public Settings updatedSettings() {
Map foundSettings = new HashMap<>();
+ final Map features = new TreeMap<>();
final Settings.Builder builder = Settings.builder();
for (Tuple plugin : plugins) {
Settings settings = plugin.v2().additionalSettings();
@@ -207,6 +211,23 @@ public class PluginsService extends AbstractComponent {
}
}
builder.put(settings);
+ final Optional maybeFeature = plugin.v2().getFeature();
+ if (maybeFeature.isPresent()) {
+ final String feature = maybeFeature.get();
+ if (features.containsKey(feature)) {
+ final String message = String.format(
+ Locale.ROOT,
+ "duplicate feature [%s] in plugin [%s], already added in [%s]",
+ feature,
+ plugin.v1().getName(),
+ features.get(feature));
+ throw new IllegalArgumentException(message);
+ }
+ features.put(feature, plugin.v1().getName());
+ }
+ }
+ for (final String feature : features.keySet()) {
+ builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
}
return builder.put(this.settings).build();
}
diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java
index 0b3d4e1b0a1..df46c945540 100644
--- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java
+++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java
@@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.Booleans;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@@ -93,6 +94,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -189,6 +191,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);
+ public static final String FEATURE_PREFIX = "transport.features";
+ public static final Setting DEFAULT_FEATURES_SETTING = Setting.groupSetting(FEATURE_PREFIX + ".", Setting.Property.NodeScope);
+ private final String[] features;
+
private final CircuitBreakerService circuitBreakerService;
// package visibility for tests
protected final ScheduledPing scheduledPing;
@@ -240,6 +246,18 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.networkService = networkService;
this.transportName = transportName;
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
+ final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
+ if (defaultFeatures == null) {
+ this.features = new String[0];
+ } else {
+ defaultFeatures.names().forEach(key -> {
+ if (Booleans.parseBoolean(defaultFeatures.get(key)) == false) {
+ throw new IllegalArgumentException("feature settings must have default [true] value");
+ }
+ });
+ // use a sorted set to present the features in a consistent order
+ this.features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]);
+ }
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
@@ -1103,6 +1121,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
stream.setVersion(version);
threadPool.getThreadContext().writeTo(stream);
+ if (version.onOrAfter(Version.V_7_0_0_alpha1)) {
+ stream.writeStringArray(features);
+ }
stream.writeString(action);
BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream);
final TransportRequestOptions finalOptions = options;
@@ -1135,15 +1156,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
* Sends back an error response to the caller via the given channel
*
* @param nodeVersion the caller node version
+ * @param features the caller features
* @param channel the channel to send the response to
* @param error the error to return
* @param requestId the request ID this response replies to
* @param action the action this response replies to
*/
- public void sendErrorResponse(Version nodeVersion, TcpChannel channel, final Exception error, final long requestId,
- final String action) throws IOException {
+ public void sendErrorResponse(
+ final Version nodeVersion,
+ final Set features,
+ final TcpChannel channel,
+ final Exception error,
+ final long requestId,
+ final String action) throws IOException {
try (BytesStreamOutput stream = new BytesStreamOutput()) {
stream.setVersion(nodeVersion);
+ stream.setFeatures(features);
RemoteTransportException tx = new RemoteTransportException(
nodeName(), new TransportAddress(channel.getLocalAddress()), action, error);
threadPool.getThreadContext().writeTo(stream);
@@ -1163,15 +1191,28 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
/**
* Sends the response to the given channel. This method should be used to send {@link TransportResponse} objects back to the caller.
*
- * @see #sendErrorResponse(Version, TcpChannel, Exception, long, String) for sending back errors to the caller
+ * @see #sendErrorResponse(Version, Set, TcpChannel, Exception, long, String) for sending back errors to the caller
*/
- public void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId,
- final String action, TransportResponseOptions options) throws IOException {
- sendResponse(nodeVersion, channel, response, requestId, action, options, (byte) 0);
+ public void sendResponse(
+ final Version nodeVersion,
+ final Set features,
+ final TcpChannel channel,
+ final TransportResponse response,
+ final long requestId,
+ final String action,
+ final TransportResponseOptions options) throws IOException {
+ sendResponse(nodeVersion, features, channel, response, requestId, action, options, (byte) 0);
}
- private void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId,
- final String action, TransportResponseOptions options, byte status) throws IOException {
+ private void sendResponse(
+ final Version nodeVersion,
+ final Set features,
+ final TcpChannel channel,
+ final TransportResponse response,
+ final long requestId,
+ final String action,
+ TransportResponseOptions options,
+ byte status) throws IOException {
if (compress) {
options = TransportResponseOptions.builder(options).withCompress(true).build();
}
@@ -1185,6 +1226,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
threadPool.getThreadContext().writeTo(stream);
stream.setVersion(nodeVersion);
+ stream.setFeatures(features);
BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);
final TransportResponseOptions finalOptions = options;
@@ -1546,13 +1588,19 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected String handleRequest(TcpChannel channel, String profileName, final StreamInput stream, long requestId,
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
throws IOException {
+ final Set features;
+ if (version.onOrAfter(Version.V_7_0_0_alpha1)) {
+ features = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(stream.readStringArray())));
+ } else {
+ features = Collections.emptySet();
+ }
final String action = stream.readString();
transportService.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status)) {
final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion());
- sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
+ sendResponse(version, features, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
TransportStatus.setHandshake((byte) 0));
} else {
final RequestHandlerRegistry reg = transportService.getRequestHandler(action);
@@ -1564,7 +1612,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} else {
getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
}
- transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName,
+ transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName,
messageLengthBytes);
final TransportRequest request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(remoteAddress));
@@ -1575,7 +1623,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} catch (Exception e) {
// the circuit breaker tripped
if (transportChannel == null) {
- transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName, 0);
+ transportChannel =
+ new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, 0);
}
try {
transportChannel.sendResponse(e);
diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java
index eb4c244c7a9..1bf1d027329 100644
--- a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java
+++ b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java
@@ -16,16 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
public final class TcpTransportChannel implements TransportChannel {
private final TcpTransport transport;
private final Version version;
+ private final Set features;
private final String action;
private final long requestId;
private final String profileName;
@@ -34,9 +38,10 @@ public final class TcpTransportChannel implements TransportChannel {
private final String channelType;
private final TcpChannel channel;
- TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action,
- long requestId, Version version, String profileName, long reservedBytes) {
+ TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version,
+ Set features, String profileName, long reservedBytes) {
this.version = version;
+ this.features = features;
this.channel = channel;
this.transport = transport;
this.action = action;
@@ -59,7 +64,7 @@ public final class TcpTransportChannel implements TransportChannel {
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
try {
- transport.sendResponse(version, channel, response, requestId, action, options);
+ transport.sendResponse(version, features, channel, response, requestId, action, options);
} finally {
release(false);
}
@@ -68,7 +73,7 @@ public final class TcpTransportChannel implements TransportChannel {
@Override
public void sendResponse(Exception exception) throws IOException {
try {
- transport.sendErrorResponse(version, channel, exception, requestId, action);
+ transport.sendErrorResponse(version, features, channel, exception, requestId, action);
} finally {
release(true);
}
@@ -100,5 +105,6 @@ public final class TcpTransportChannel implements TransportChannel {
public TcpChannel getChannel() {
return channel;
}
+
}
diff --git a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java
deleted file mode 100644
index 438d3e55044..00000000000
--- a/server/src/test/java/org/elasticsearch/action/ingest/PutPipelineResponseTests.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.action.ingest;
-
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.test.AbstractStreamableXContentTestCase;
-
-public class PutPipelineResponseTests extends AbstractStreamableXContentTestCase {
-
- public void testToXContent() {
- PutPipelineResponse response = new PutPipelineResponse(true);
- String output = Strings.toString(response);
- assertEquals("{\"acknowledged\":true}", output);
- }
-
- @Override
- protected PutPipelineResponse doParseInstance(XContentParser parser) {
- return PutPipelineResponse.fromXContent(parser);
- }
-
- @Override
- protected PutPipelineResponse createTestInstance() {
- return new PutPipelineResponse(randomBoolean());
- }
-
- @Override
- protected PutPipelineResponse createBlankInstance() {
- return new PutPipelineResponse();
- }
-
- @Override
- protected PutPipelineResponse mutateInstance(PutPipelineResponse response) {
- return new PutPipelineResponse(response.isAcknowledged() == false);
- }
-}
diff --git a/server/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java
index 00327603ba8..f68545c3f79 100644
--- a/server/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/ingest/WritePipelineResponseTests.java
@@ -19,15 +19,17 @@
package org.elasticsearch.action.ingest;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
-public class WritePipelineResponseTests extends ESTestCase {
+public class WritePipelineResponseTests extends AbstractStreamableXContentTestCase {
public void testSerializationWithoutError() throws IOException {
boolean isAcknowledged = randomBoolean();
@@ -52,4 +54,30 @@ public class WritePipelineResponseTests extends ESTestCase {
assertThat(otherResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
}
+
+ public void testToXContent() {
+ WritePipelineResponse response = new WritePipelineResponse(true);
+ String output = Strings.toString(response);
+ assertEquals("{\"acknowledged\":true}", output);
+ }
+
+ @Override
+ protected WritePipelineResponse doParseInstance(XContentParser parser) {
+ return WritePipelineResponse.fromXContent(parser);
+ }
+
+ @Override
+ protected WritePipelineResponse createTestInstance() {
+ return new WritePipelineResponse(randomBoolean());
+ }
+
+ @Override
+ protected WritePipelineResponse createBlankInstance() {
+ return new WritePipelineResponse();
+ }
+
+ @Override
+ protected WritePipelineResponse mutateInstance(WritePipelineResponse response) {
+ return new WritePipelineResponse(response.isAcknowledged() == false);
+ }
}
diff --git a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java
index 1830698d90c..1dc30e951b6 100644
--- a/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java
+++ b/server/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java
@@ -31,6 +31,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.MockTransportClient;
+import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
import java.util.Arrays;
@@ -38,6 +39,8 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.object.HasToString.hasToString;
public class TransportClientTests extends ESTestCase {
@@ -64,13 +67,23 @@ public class TransportClientTests extends ESTestCase {
}
}
- public void testDefaultHeaderContainsPlugins() {
- Settings baseSettings = Settings.builder()
+ public void testSettingsContainsTransportClient() {
+ final Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
- ThreadContext threadContext = client.threadPool().getThreadContext();
- assertEquals("true", threadContext.getHeader("transport_client"));
+ final Settings settings = TcpTransport.DEFAULT_FEATURES_SETTING.get(client.settings());
+ assertThat(settings.keySet(), hasItem("transport_client"));
+ assertThat(settings.get("transport_client"), equalTo("true"));
+ }
+ }
+
+ public void testDefaultHeader() {
+ final Settings baseSettings = Settings.builder()
+ .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
+ .build();
+ try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
+ final ThreadContext threadContext = client.threadPool().getThreadContext();
assertEquals("true", threadContext.getHeader("test"));
}
}
diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java
new file mode 100644
index 00000000000..07a974a2ca7
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateIT.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexGraveyard;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.CheckedFunction;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TcpTransport;
+import org.elasticsearch.watcher.ResourceWatcherService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
+import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * This test suite sets up a situation where the cluster has two plugins installed (node, and node-and-transport-client), and a transport
+ * client only has node-and-transport-client plugin installed. Each of these plugins inject customs into the cluster state and we want to
+ * check that the client can de-serialize a cluster state response based on the fact that the response should not contain customs that the
+ * transport client does not understand based on the fact that it only presents the node-and-transport-client-feature.
+ */
+@ESIntegTestCase.ClusterScope(scope = TEST)
+public class ClusterStateIT extends ESIntegTestCase {
+
+ public abstract static class Custom implements MetaData.Custom {
+
+ private static final ParseField VALUE = new ParseField("value");
+
+ private final int value;
+
+ int value() {
+ return value;
+ }
+
+ Custom(final int value) {
+ this.value = value;
+ }
+
+ Custom(final StreamInput in) throws IOException {
+ value = in.readInt();
+ }
+
+ @Override
+ public EnumSet context() {
+ return MetaData.ALL_CONTEXTS;
+ }
+
+ @Override
+ public Diff diff(final MetaData.Custom previousState) {
+ return null;
+ }
+
+ @Override
+ public void writeTo(final StreamOutput out) throws IOException {
+ out.writeInt(value);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.field(VALUE.getPreferredName(), value);
+ return builder;
+ }
+
+ }
+
+ public static class NodeCustom extends Custom {
+
+ public static final String TYPE = "node";
+
+ NodeCustom(final int value) {
+ super(value);
+ }
+
+ NodeCustom(final StreamInput in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return TYPE;
+ }
+
+ @Override
+ public Optional getRequiredFeature() {
+ return Optional.of("node");
+ }
+
+ }
+
+ public static class NodeAndTransportClientCustom extends Custom {
+
+ public static final String TYPE = "node-and-transport-client";
+
+ NodeAndTransportClientCustom(final int value) {
+ super(value);
+ }
+
+ public NodeAndTransportClientCustom(final StreamInput in) throws IOException {
+ super(in);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return TYPE;
+ }
+
+ /*
+ * This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have
+ * versus not requiring any feature. We use a field to make the random choice exactly once.
+ */
+ @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ private final Optional requiredFeature = randomBoolean() ? Optional.empty() : Optional.of("node-and-transport-client");
+
+ @Override
+ public Optional getRequiredFeature() {
+ return requiredFeature;
+ }
+
+ }
+
+ public abstract static class CustomPlugin extends Plugin {
+
+ private final List namedWritables = new ArrayList<>();
+ private final List namedXContents = new ArrayList<>();
+
+ public CustomPlugin() {
+ registerBuiltinWritables();
+ }
+
+ protected void registerMetaDataCustom(
+ final String name, final Writeable.Reader reader, final CheckedFunction parser) {
+ namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, name, reader));
+ namedXContents.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(name), parser));
+ }
+
+ protected abstract void registerBuiltinWritables();
+
+ protected abstract String getType();
+
+ protected abstract Custom getInstance();
+
+ @Override
+ public List getNamedWriteables() {
+ return namedWritables;
+ }
+
+ @Override
+ public List getNamedXContent() {
+ return namedXContents;
+ }
+
+ private final AtomicBoolean installed = new AtomicBoolean();
+
+ @Override
+ public Collection