Merge branch 'master' into index-lifecycle

This commit is contained in:
Tal Levy 2018-06-01 11:44:38 -07:00
commit fbfb4ff8c7
42 changed files with 1202 additions and 247 deletions

View File

@ -23,10 +23,11 @@ import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import java.io.IOException;
@ -74,9 +75,9 @@ public final class ClusterClient {
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
*/
public PutPipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
public WritePipelineResponse putPipeline(PutPipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, emptySet(), headers);
WritePipelineResponse::fromXContent, emptySet(), headers);
}
/**
@ -85,9 +86,9 @@ public final class ClusterClient {
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html"> Put Pipeline API on elastic.co</a>
*/
public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener, Header... headers) {
public void putPipelineAsync(PutPipelineRequest request, ActionListener<WritePipelineResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
WritePipelineResponse::fromXContent, listener, emptySet(), headers);
}
/**
@ -111,4 +112,28 @@ public final class ClusterClient {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, listener, emptySet(), headers);
}
/**
* Delete an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html">
* Delete Pipeline API on elastic.co</a>
*/
public WritePipelineResponse deletePipeline(DeletePipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::deletePipeline,
WritePipelineResponse::fromXContent, emptySet(), headers);
}
/**
* Asynchronously delete an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html">
* Delete Pipeline API on elastic.co</a>
*/
public void deletePipelineAsync(DeletePipelineRequest request, ActionListener<WritePipelineResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline,
WritePipelineResponse::fromXContent, listener, emptySet(), headers);
}
}

View File

@ -60,6 +60,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
@ -648,6 +649,20 @@ final class RequestConverters {
return request;
}
static Request deletePipeline(DeletePipelineRequest deletePipelineRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
.addPathPart(deletePipelineRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
Params parameters = new Params(request);
parameters.withTimeout(deletePipelineRequest.timeout());
parameters.withMasterTimeout(deletePipelineRequest.masterNodeTimeout());
return request;
}
static Request listTasks(ListTasksRequest listTaskRequest) {
if (listTaskRequest.getTaskId() != null && listTaskRequest.getTaskId().isSet()) {
throw new IllegalArgumentException("TaskId cannot be used for list tasks request");

View File

@ -25,7 +25,8 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -121,7 +122,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType());
PutPipelineResponse putPipelineResponse =
WritePipelineResponse putPipelineResponse =
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}
@ -148,4 +149,17 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap());
}
public void testDeletePipeline() throws IOException {
String id = "some_pipeline_id";
{
createPipeline(id);
}
DeletePipelineRequest request = new DeletePipelineRequest(id);
WritePipelineResponse response =
execute(request, highLevelClient().cluster()::deletePipeline, highLevelClient().cluster()::deletePipelineAsync);
assertTrue(response.isAcknowledged());
}
}

View File

@ -63,6 +63,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
@ -1465,6 +1466,21 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(expectedParams, expectedRequest.getParameters());
}
public void testDeletePipeline() {
String pipelineId = "some_pipeline_id";
Map<String, String> expectedParams = new HashMap<>();
DeletePipelineRequest request = new DeletePipelineRequest(pipelineId);
setRandomMasterTimeout(request, expectedParams);
setRandomTimeout(request::timeout, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, expectedParams);
Request expectedRequest = RequestConverters.deletePipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
endpoint.add(pipelineId);
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpDelete.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
}
public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));

View File

@ -26,7 +26,8 @@ import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResp
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@ -212,7 +213,7 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
// end::put-pipeline-request-masterTimeout
// tag::put-pipeline-execute
PutPipelineResponse response = client.cluster().putPipeline(request); // <1>
WritePipelineResponse response = client.cluster().putPipeline(request); // <1>
// end::put-pipeline-execute
// tag::put-pipeline-response
@ -236,10 +237,10 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
);
// tag::put-pipeline-execute-listener
ActionListener<PutPipelineResponse> listener =
new ActionListener<PutPipelineResponse>() {
ActionListener<WritePipelineResponse> listener =
new ActionListener<WritePipelineResponse>() {
@Override
public void onResponse(PutPipelineResponse response) {
public void onResponse(WritePipelineResponse response) {
// <1>
}
@ -331,4 +332,74 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testDeletePipeline() throws IOException {
RestHighLevelClient client = highLevelClient();
{
createPipeline("my-pipeline-id");
}
{
// tag::delete-pipeline-request
DeletePipelineRequest request = new DeletePipelineRequest("my-pipeline-id"); // <1>
// end::delete-pipeline-request
// tag::delete-pipeline-request-timeout
request.timeout(TimeValue.timeValueMinutes(2)); // <1>
request.timeout("2m"); // <2>
// end::delete-pipeline-request-timeout
// tag::delete-pipeline-request-masterTimeout
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
request.masterNodeTimeout("1m"); // <2>
// end::delete-pipeline-request-masterTimeout
// tag::delete-pipeline-execute
WritePipelineResponse response = client.cluster().deletePipeline(request); // <1>
// end::delete-pipeline-execute
// tag::delete-pipeline-response
boolean acknowledged = response.isAcknowledged(); // <1>
// end::delete-pipeline-response
assertTrue(acknowledged);
}
}
public void testDeletePipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
createPipeline("my-pipeline-id");
}
{
DeletePipelineRequest request = new DeletePipelineRequest("my-pipeline-id");
// tag::delete-pipeline-execute-listener
ActionListener<WritePipelineResponse> listener =
new ActionListener<WritePipelineResponse>() {
@Override
public void onResponse(WritePipelineResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::delete-pipeline-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::delete-pipeline-execute-async
client.cluster().deletePipelineAsync(request, listener); // <1>
// end::delete-pipeline-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}

View File

@ -0,0 +1,80 @@
[[java-rest-high-cluster-delete-pipeline]]
=== Delete Pipeline API
[[java-rest-high-cluster-delete-pipeline-request]]
==== Delete Pipeline Request
A `DeletePipelineRequest` requires a pipeline `id` to delete.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-request]
--------------------------------------------------
<1> The pipeline id to delete
==== Optional arguments
The following arguments can optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-request-timeout]
--------------------------------------------------
<1> Timeout to wait for the all the nodes to acknowledge the pipeline deletion as a `TimeValue`
<2> Timeout to wait for the all the nodes to acknowledge the pipeline deletion as a `String`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-request-masterTimeout]
--------------------------------------------------
<1> Timeout to connect to the master node as a `TimeValue`
<2> Timeout to connect to the master node as a `String`
[[java-rest-high-cluster-delete-pipeline-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-execute]
--------------------------------------------------
<1> Execute the request and get back the response in a `WritePipelineResponse` object.
[[java-rest-high-cluster-delete-pipeline-async]]
==== Asynchronous Execution
The asynchronous execution of a delete pipeline request requires both the `DeletePipelineRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-execute-async]
--------------------------------------------------
<1> The `DeletePipelineRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `WritePipelineResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument
[[java-rest-high-cluster-delete-pipeline-response]]
==== Delete Pipeline Response
The returned `WritePipelineResponse` allows to retrieve information about the executed
operation as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[delete-pipeline-response]
--------------------------------------------------
<1> Indicates whether all of the nodes have acknowledged the request

View File

@ -22,8 +22,8 @@ The following arguments can optionally be provided:
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-request-timeout]
--------------------------------------------------
<1> Timeout to wait for the all the nodes to acknowledge the index creation as a `TimeValue`
<2> Timeout to wait for the all the nodes to acknowledge the index creation as a `String`
<1> Timeout to wait for the all the nodes to acknowledge the pipeline creation as a `TimeValue`
<2> Timeout to wait for the all the nodes to acknowledge the pipeline creation as a `String`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
@ -39,7 +39,7 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-reque
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[put-pipeline-execute]
--------------------------------------------------
<1> Execute the request and get back the response in a PutPipelineResponse object.
<1> Execute the request and get back the response in a WritePipelineResponse object.
[[java-rest-high-cluster-put-pipeline-async]]
==== Asynchronous Execution
@ -60,7 +60,7 @@ completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `PutPipelineResponse` looks like:
A typical listener for `WritePipelineResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
@ -73,7 +73,7 @@ provided as an argument
[[java-rest-high-cluster-put-pipeline-response]]
==== Put Pipeline Response
The returned `PutPipelineResponse` allows to retrieve information about the executed
The returned `WritePipelineResponse` allows to retrieve information about the executed
operation as follows:
["source","java",subs="attributes,callouts,macros"]

View File

@ -108,10 +108,12 @@ The Java High Level REST Client supports the following Cluster APIs:
* <<java-rest-high-cluster-put-settings>>
* <<java-rest-high-cluster-put-pipeline>>
* <<java-rest-high-cluster-get-pipeline>>
* <<java-rest-high-cluster-delete-pipeline>>
include::cluster/put_settings.asciidoc[]
include::cluster/put_pipeline.asciidoc[]
include::cluster/get_pipeline.asciidoc[]
include::cluster/delete_pipeline.asciidoc[]
== Snapshot APIs

View File

@ -23,7 +23,7 @@ There are several limitations to what you can name your index. The complete lis
- Cannot include `\`, `/`, `*`, `?`, `"`, `<`, `>`, `|`, ` ` (space character), `,`, `#`
- Indices prior to 7.0 could contain a colon (`:`), but that's been deprecated and won't be supported in 7.0+
- Cannot start with `-`, `_`, `+`
- Cannot be `.` or ``..`
- Cannot be `.` or `..`
- Cannot be longer than 255 bytes (note it is bytes, so multi-byte characters will count towards the 255 limit faster)
======================================================

View File

@ -104,6 +104,10 @@ final class ESLoggingHandler extends LoggingHandler {
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(in);
}
// now we decode the features
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
in.readStringArray();
}
// now we can decode the action name
sb.append(", action: ").append(in.readString());
}

View File

@ -1,62 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
public class PutPipelineResponse extends AcknowledgedResponse implements ToXContentObject {
private static final ConstructingObjectParser<PutPipelineResponse, Void> PARSER = new ConstructingObjectParser<>(
"cluster_put_pipeline", true, args -> new PutPipelineResponse((boolean) args[0]));
static {
declareAcknowledgedField(PARSER);
}
public PutPipelineResponse() {
}
public PutPipelineResponse(boolean acknowledged) {
super(acknowledged);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
}
public static PutPipelineResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}

View File

@ -22,10 +22,20 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
public class WritePipelineResponse extends AcknowledgedResponse {
public class WritePipelineResponse extends AcknowledgedResponse implements ToXContentObject {
private static final ConstructingObjectParser<WritePipelineResponse, Void> PARSER = new ConstructingObjectParser<>(
"write_pipeline_response", true, args -> new WritePipelineResponse((boolean) args[0]));
static {
declareAcknowledgedField(PARSER);
}
WritePipelineResponse() {
@ -46,4 +56,8 @@ public class WritePipelineResponse extends AcknowledgedResponse {
super.writeTo(out);
writeAcknowledged(out);
}
public static WritePipelineResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}

View File

@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient {
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";
private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
@ -130,8 +132,12 @@ public abstract class TransportClient extends AbstractClient {
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
+ "." + "transport_client", true).build();
final Settings settings =
Settings.builder()
.put(defaultSettings)
.put(pluginsService.updatedSettings())
.put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
.build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -61,6 +62,7 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
/**
* An interface that implementors use when a class requires a client to maybe have a feature.
*/
public interface FeatureAware {
/**
* An optional feature that is required for the client to have.
*
* @return an empty optional if no feature is required otherwise a string representing the required feature
*/
default Optional<String> getRequiredFeature() {
return Optional.empty();
}
/**
* Tests whether or not the custom should be serialized. The criteria are:
* <ul>
* <li>the output stream must be at least the minimum supported version of the custom</li>
* <li>the output stream must have the feature required by the custom (if any) or not be a transport client</li>
* </ul>
* <p>
* That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features
* that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but
* for connected nodes we always require that the node has the required feature.
*
* @param out the output stream
* @param custom the custom to serialize
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
return false;
}
if (custom.getRequiredFeature().isPresent()) {
final String requiredFeature = custom.getRequiredFeature().get();
// if it is a transport client we are lenient yet for a connected node it must have the required feature
return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
}
return true;
}
}
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {
/**
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
@ -99,6 +145,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
default boolean isPrivate() {
return false;
}
}
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
@ -244,6 +291,15 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
}
}
if (metaData.customs().isEmpty() == false) {
sb.append("metadata customs:\n");
for (final ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
final String type = cursor.key;
final MetaData.Custom custom = cursor.value;
sb.append(TAB).append(type).append(": ").append(custom);
}
sb.append("\n");
}
sb.append(blocks());
sb.append(nodes());
sb.append(routingTable());
@ -691,14 +747,14 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}

View File

@ -24,6 +24,8 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.FeatureAware;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
@ -117,9 +119,10 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
*/
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
EnumSet<XContentContext> context();
}
public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
@ -782,14 +785,14 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
}
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}

View File

@ -30,6 +30,8 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
@ -58,10 +60,12 @@ import java.util.Date;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
@ -98,6 +102,7 @@ public abstract class StreamOutput extends OutputStream {
}
private Version version = Version.CURRENT;
private Set<String> features = Collections.emptySet();
/**
* The version of the node on the other side of this stream.
@ -113,6 +118,27 @@ public abstract class StreamOutput extends OutputStream {
this.version = version;
}
/**
* Test if the stream has the specified feature. Features are used when serializing {@link ClusterState.Custom} or
* {@link MetaData.Custom}; see also {@link ClusterState.FeatureAware}.
*
* @param feature the feature to test
* @return true if the stream has the specified feature
*/
public boolean hasFeature(final String feature) {
return this.features.contains(feature);
}
/**
* Set the features on the stream. See {@link StreamOutput#hasFeature(String)}.
*
* @param features the features on the stream
*/
public void setFeatures(final Set<String> features) {
assert this.features.isEmpty() : this.features;
this.features = Collections.unmodifiableSet(new HashSet<>(features));
}
public long position() throws IOException {
throw new UnsupportedOperationException();
}

View File

@ -379,6 +379,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
EsExecutors.PROCESSORS_SETTING,
ThreadContext.DEFAULT_HEADERS_SETTING,
TcpTransport.DEFAULT_FEATURES_SETTING,
Loggers.LOG_DEFAULT_LEVEL_SETTING,
Loggers.LOG_LEVEL_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,

View File

@ -288,18 +288,6 @@ public class TextFieldMapper extends FieldMapper {
return super.toString() + ",prefixChars=" + minChars + ":" + maxChars;
}
@Override
public void checkCompatibility(MappedFieldType other, List<String> conflicts) {
super.checkCompatibility(other, conflicts);
PrefixFieldType otherFieldType = (PrefixFieldType) other;
if (otherFieldType.minChars != this.minChars) {
conflicts.add("mapper [" + name() + "] has different min_chars values");
}
if (otherFieldType.maxChars != this.maxChars) {
conflicts.add("mapper [" + name() + "] has different max_chars values");
}
}
@Override
public Query existsQuery(QueryShardContext context) {
throw new UnsupportedOperationException();
@ -479,6 +467,25 @@ public class TextFieldMapper extends FieldMapper {
}
return new PagedBytesIndexFieldData.Builder(fielddataMinFrequency, fielddataMaxFrequency, fielddataMinSegmentSize);
}
@Override
public void checkCompatibility(MappedFieldType other, List<String> conflicts) {
super.checkCompatibility(other, conflicts);
TextFieldType tft = (TextFieldType) other;
if (Objects.equals(this.prefixFieldType, tft.prefixFieldType) == false) {
if (this.prefixFieldType == null) {
conflicts.add("mapper [" + name()
+ "] has different [index_prefixes] settings, cannot change from disabled to enabled");
}
else if (tft.prefixFieldType == null) {
conflicts.add("mapper [" + name()
+ "] has different [index_prefixes] settings, cannot change from enabled to disabled");
}
else {
conflicts.add("mapper [" + name() + "] has different [index_prefixes] settings");
}
}
}
}
private int positionIncrementGap;

View File

@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionModule;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -56,6 +57,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;
/**
@ -79,6 +81,17 @@ import java.util.function.UnaryOperator;
*/
public abstract class Plugin implements Closeable {
/**
* A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
* also {@link ClusterState.FeatureAware}.
*
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
* customs
*/
protected Optional<String> getFeature() {
return Optional.empty();
}
/**
* Node level guice modules.
*/

View File

@ -41,8 +41,10 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
import java.lang.reflect.Constructor;
@ -57,16 +59,17 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;
@ -196,6 +199,7 @@ public class PluginsService extends AbstractComponent {
public Settings updatedSettings() {
Map<String, String> foundSettings = new HashMap<>();
final Map<String, String> features = new TreeMap<>();
final Settings.Builder builder = Settings.builder();
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
Settings settings = plugin.v2().additionalSettings();
@ -207,6 +211,23 @@ public class PluginsService extends AbstractComponent {
}
}
builder.put(settings);
final Optional<String> maybeFeature = plugin.v2().getFeature();
if (maybeFeature.isPresent()) {
final String feature = maybeFeature.get();
if (features.containsKey(feature)) {
final String message = String.format(
Locale.ROOT,
"duplicate feature [%s] in plugin [%s], already added in [%s]",
feature,
plugin.v1().getName(),
features.get(feature));
throw new IllegalArgumentException(message);
}
features.put(feature, plugin.v1().getName());
}
}
for (final String feature : features.keySet()) {
builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
}
return builder.put(this.settings).build();
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -93,6 +94,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@ -189,6 +191,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);
public static final String FEATURE_PREFIX = "transport.features";
public static final Setting<Settings> DEFAULT_FEATURES_SETTING = Setting.groupSetting(FEATURE_PREFIX + ".", Setting.Property.NodeScope);
private final String[] features;
private final CircuitBreakerService circuitBreakerService;
// package visibility for tests
protected final ScheduledPing scheduledPing;
@ -240,6 +246,18 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.networkService = networkService;
this.transportName = transportName;
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
final Settings defaultFeatures = DEFAULT_FEATURES_SETTING.get(settings);
if (defaultFeatures == null) {
this.features = new String[0];
} else {
defaultFeatures.names().forEach(key -> {
if (Booleans.parseBoolean(defaultFeatures.get(key)) == false) {
throw new IllegalArgumentException("feature settings must have default [true] value");
}
});
// use a sorted set to present the features in a consistent order
this.features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]);
}
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
@ -1103,6 +1121,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
stream.setVersion(version);
threadPool.getThreadContext().writeTo(stream);
if (version.onOrAfter(Version.V_7_0_0_alpha1)) {
stream.writeStringArray(features);
}
stream.writeString(action);
BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream);
final TransportRequestOptions finalOptions = options;
@ -1135,15 +1156,22 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
* Sends back an error response to the caller via the given channel
*
* @param nodeVersion the caller node version
* @param features the caller features
* @param channel the channel to send the response to
* @param error the error to return
* @param requestId the request ID this response replies to
* @param action the action this response replies to
*/
public void sendErrorResponse(Version nodeVersion, TcpChannel channel, final Exception error, final long requestId,
final String action) throws IOException {
public void sendErrorResponse(
final Version nodeVersion,
final Set<String> features,
final TcpChannel channel,
final Exception error,
final long requestId,
final String action) throws IOException {
try (BytesStreamOutput stream = new BytesStreamOutput()) {
stream.setVersion(nodeVersion);
stream.setFeatures(features);
RemoteTransportException tx = new RemoteTransportException(
nodeName(), new TransportAddress(channel.getLocalAddress()), action, error);
threadPool.getThreadContext().writeTo(stream);
@ -1163,15 +1191,28 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
/**
* Sends the response to the given channel. This method should be used to send {@link TransportResponse} objects back to the caller.
*
* @see #sendErrorResponse(Version, TcpChannel, Exception, long, String) for sending back errors to the caller
* @see #sendErrorResponse(Version, Set, TcpChannel, Exception, long, String) for sending back errors to the caller
*/
public void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId,
final String action, TransportResponseOptions options) throws IOException {
sendResponse(nodeVersion, channel, response, requestId, action, options, (byte) 0);
public void sendResponse(
final Version nodeVersion,
final Set<String> features,
final TcpChannel channel,
final TransportResponse response,
final long requestId,
final String action,
final TransportResponseOptions options) throws IOException {
sendResponse(nodeVersion, features, channel, response, requestId, action, options, (byte) 0);
}
private void sendResponse(Version nodeVersion, TcpChannel channel, final TransportResponse response, final long requestId,
final String action, TransportResponseOptions options, byte status) throws IOException {
private void sendResponse(
final Version nodeVersion,
final Set<String> features,
final TcpChannel channel,
final TransportResponse response,
final long requestId,
final String action,
TransportResponseOptions options,
byte status) throws IOException {
if (compress) {
options = TransportResponseOptions.builder(options).withCompress(true).build();
}
@ -1185,6 +1226,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
threadPool.getThreadContext().writeTo(stream);
stream.setVersion(nodeVersion);
stream.setFeatures(features);
BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);
final TransportResponseOptions finalOptions = options;
@ -1546,13 +1588,19 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
protected String handleRequest(TcpChannel channel, String profileName, final StreamInput stream, long requestId,
int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
throws IOException {
final Set<String> features;
if (version.onOrAfter(Version.V_7_0_0_alpha1)) {
features = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(stream.readStringArray())));
} else {
features = Collections.emptySet();
}
final String action = stream.readString();
transportService.onRequestReceived(requestId, action);
TransportChannel transportChannel = null;
try {
if (TransportStatus.isHandshake(status)) {
final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion());
sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
sendResponse(version, features, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
TransportStatus.setHandshake((byte) 0));
} else {
final RequestHandlerRegistry reg = transportService.getRequestHandler(action);
@ -1564,7 +1612,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} else {
getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
}
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName,
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName,
messageLengthBytes);
final TransportRequest request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(remoteAddress));
@ -1575,7 +1623,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} catch (Exception e) {
// the circuit breaker tripped
if (transportChannel == null) {
transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, profileName, 0);
transportChannel =
new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, 0);
}
try {
transportChannel.sendResponse(e);

View File

@ -16,16 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
public final class TcpTransportChannel implements TransportChannel {
private final TcpTransport transport;
private final Version version;
private final Set<String> features;
private final String action;
private final long requestId;
private final String profileName;
@ -34,9 +38,10 @@ public final class TcpTransportChannel implements TransportChannel {
private final String channelType;
private final TcpChannel channel;
TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action,
long requestId, Version version, String profileName, long reservedBytes) {
TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version,
Set<String> features, String profileName, long reservedBytes) {
this.version = version;
this.features = features;
this.channel = channel;
this.transport = transport;
this.action = action;
@ -59,7 +64,7 @@ public final class TcpTransportChannel implements TransportChannel {
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
try {
transport.sendResponse(version, channel, response, requestId, action, options);
transport.sendResponse(version, features, channel, response, requestId, action, options);
} finally {
release(false);
}
@ -68,7 +73,7 @@ public final class TcpTransportChannel implements TransportChannel {
@Override
public void sendResponse(Exception exception) throws IOException {
try {
transport.sendErrorResponse(version, channel, exception, requestId, action);
transport.sendErrorResponse(version, features, channel, exception, requestId, action);
} finally {
release(true);
}
@ -100,5 +105,6 @@ public final class TcpTransportChannel implements TransportChannel {
public TcpChannel getChannel() {
return channel;
}
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
public class PutPipelineResponseTests extends AbstractStreamableXContentTestCase<PutPipelineResponse> {
public void testToXContent() {
PutPipelineResponse response = new PutPipelineResponse(true);
String output = Strings.toString(response);
assertEquals("{\"acknowledged\":true}", output);
}
@Override
protected PutPipelineResponse doParseInstance(XContentParser parser) {
return PutPipelineResponse.fromXContent(parser);
}
@Override
protected PutPipelineResponse createTestInstance() {
return new PutPipelineResponse(randomBoolean());
}
@Override
protected PutPipelineResponse createBlankInstance() {
return new PutPipelineResponse();
}
@Override
protected PutPipelineResponse mutateInstance(PutPipelineResponse response) {
return new PutPipelineResponse(response.isAcknowledged() == false);
}
}

View File

@ -19,15 +19,17 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import java.io.IOException;
import static org.hamcrest.CoreMatchers.equalTo;
public class WritePipelineResponseTests extends ESTestCase {
public class WritePipelineResponseTests extends AbstractStreamableXContentTestCase<WritePipelineResponse> {
public void testSerializationWithoutError() throws IOException {
boolean isAcknowledged = randomBoolean();
@ -52,4 +54,30 @@ public class WritePipelineResponseTests extends ESTestCase {
assertThat(otherResponse.isAcknowledged(), equalTo(response.isAcknowledged()));
}
public void testToXContent() {
WritePipelineResponse response = new WritePipelineResponse(true);
String output = Strings.toString(response);
assertEquals("{\"acknowledged\":true}", output);
}
@Override
protected WritePipelineResponse doParseInstance(XContentParser parser) {
return WritePipelineResponse.fromXContent(parser);
}
@Override
protected WritePipelineResponse createTestInstance() {
return new WritePipelineResponse(randomBoolean());
}
@Override
protected WritePipelineResponse createBlankInstance() {
return new WritePipelineResponse();
}
@Override
protected WritePipelineResponse mutateInstance(WritePipelineResponse response) {
return new WritePipelineResponse(response.isAcknowledged() == false);
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.MockTransportClient;
import org.elasticsearch.transport.TcpTransport;
import java.io.IOException;
import java.util.Arrays;
@ -38,6 +39,8 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.object.HasToString.hasToString;
public class TransportClientTests extends ESTestCase {
@ -64,13 +67,23 @@ public class TransportClientTests extends ESTestCase {
}
}
public void testDefaultHeaderContainsPlugins() {
Settings baseSettings = Settings.builder()
public void testSettingsContainsTransportClient() {
final Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
ThreadContext threadContext = client.threadPool().getThreadContext();
assertEquals("true", threadContext.getHeader("transport_client"));
final Settings settings = TcpTransport.DEFAULT_FEATURES_SETTING.get(client.settings());
assertThat(settings.keySet(), hasItem("transport_client"));
assertThat(settings.get("transport_client"), equalTo("true"));
}
}
public void testDefaultHeader() {
final Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
assertEquals("true", threadContext.getHeader("test"));
}
}

View File

@ -0,0 +1,340 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
/**
* This test suite sets up a situation where the cluster has two plugins installed (node, and node-and-transport-client), and a transport
* client only has node-and-transport-client plugin installed. Each of these plugins inject customs into the cluster state and we want to
* check that the client can de-serialize a cluster state response based on the fact that the response should not contain customs that the
* transport client does not understand based on the fact that it only presents the node-and-transport-client-feature.
*/
@ESIntegTestCase.ClusterScope(scope = TEST)
public class ClusterStateIT extends ESIntegTestCase {
public abstract static class Custom implements MetaData.Custom {
private static final ParseField VALUE = new ParseField("value");
private final int value;
int value() {
return value;
}
Custom(final int value) {
this.value = value;
}
Custom(final StreamInput in) throws IOException {
value = in.readInt();
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.ALL_CONTEXTS;
}
@Override
public Diff<MetaData.Custom> diff(final MetaData.Custom previousState) {
return null;
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeInt(value);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(VALUE.getPreferredName(), value);
return builder;
}
}
public static class NodeCustom extends Custom {
public static final String TYPE = "node";
NodeCustom(final int value) {
super(value);
}
NodeCustom(final StreamInput in) throws IOException {
super(in);
}
@Override
public String getWriteableName() {
return TYPE;
}
@Override
public Optional<String> getRequiredFeature() {
return Optional.of("node");
}
}
public static class NodeAndTransportClientCustom extends Custom {
public static final String TYPE = "node-and-transport-client";
NodeAndTransportClientCustom(final int value) {
super(value);
}
public NodeAndTransportClientCustom(final StreamInput in) throws IOException {
super(in);
}
@Override
public String getWriteableName() {
return TYPE;
}
/*
* This custom should always be returned yet we randomize whether it has a required feature that the client is expected to have
* versus not requiring any feature. We use a field to make the random choice exactly once.
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private final Optional<String> requiredFeature = randomBoolean() ? Optional.empty() : Optional.of("node-and-transport-client");
@Override
public Optional<String> getRequiredFeature() {
return requiredFeature;
}
}
public abstract static class CustomPlugin extends Plugin {
private final List<NamedWriteableRegistry.Entry> namedWritables = new ArrayList<>();
private final List<NamedXContentRegistry.Entry> namedXContents = new ArrayList<>();
public CustomPlugin() {
registerBuiltinWritables();
}
protected <T extends MetaData.Custom> void registerMetaDataCustom(
final String name, final Writeable.Reader<T> reader, final CheckedFunction<XContentParser, T, IOException> parser) {
namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, name, reader));
namedXContents.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(name), parser));
}
protected abstract void registerBuiltinWritables();
protected abstract String getType();
protected abstract Custom getInstance();
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return namedWritables;
}
@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return namedXContents;
}
private final AtomicBoolean installed = new AtomicBoolean();
@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
clusterService.addListener(event -> {
final ClusterState state = event.state();
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
return;
}
final MetaData metaData = state.metaData();
if (state.nodes().isLocalNodeElectedMaster()) {
if (metaData.custom(getType()) == null) {
if (installed.compareAndSet(false, true)) {
clusterService.submitStateUpdateTask("install-metadata-custom", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.custom(getType()) == null) {
final MetaData.Builder builder = MetaData.builder(currentState.metaData());
builder.putCustom(getType(), getInstance());
return ClusterState.builder(currentState).metaData(builder).build();
} else {
return currentState;
}
}
@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(e);
}
});
}
}
}
});
return Collections.emptyList();
}
}
public static class NodePlugin extends CustomPlugin {
public Optional<String> getFeature() {
return Optional.of("node");
}
static final int VALUE = randomInt();
@Override
protected void registerBuiltinWritables() {
registerMetaDataCustom(
NodeCustom.TYPE,
NodeCustom::new,
parser -> {
throw new IOException(new UnsupportedOperationException());
});
}
@Override
protected String getType() {
return NodeCustom.TYPE;
}
@Override
protected Custom getInstance() {
return new NodeCustom(VALUE);
}
}
public static class NodeAndTransportClientPlugin extends CustomPlugin {
@Override
protected Optional<String> getFeature() {
return Optional.of("node-and-transport-client");
}
static final int VALUE = randomInt();
@Override
protected void registerBuiltinWritables() {
registerMetaDataCustom(
NodeAndTransportClientCustom.TYPE,
NodeAndTransportClientCustom::new,
parser -> {
throw new IOException(new UnsupportedOperationException());
});
}
@Override
protected String getType() {
return NodeAndTransportClientCustom.TYPE;
}
@Override
protected Custom getInstance() {
return new NodeAndTransportClientCustom(VALUE);
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(NodePlugin.class, NodeAndTransportClientPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(NodeAndTransportClientPlugin.class);
}
public void testOptionalCustoms() throws Exception {
// ensure that the customs are injected into the cluster state
assertBusy(() -> assertTrue(clusterService().state().metaData().customs().containsKey(NodeCustom.TYPE)));
assertBusy(() -> assertTrue(clusterService().state().metaData().customs().containsKey(NodeAndTransportClientCustom.TYPE)));
final ClusterStateResponse state = internalCluster().transportClient().admin().cluster().prepareState().get();
final ImmutableOpenMap<String, MetaData.Custom> customs = state.getState().metaData().customs();
final Set<String> keys = new HashSet<>(Arrays.asList(customs.keys().toArray(String.class)));
assertThat(keys, hasItem(IndexGraveyard.TYPE));
assertThat(keys, not(hasItem(NodeCustom.TYPE)));
assertThat(keys, hasItem(NodeAndTransportClientCustom.TYPE));
final MetaData.Custom actual = customs.get(NodeAndTransportClientCustom.TYPE);
assertThat(actual, instanceOf(NodeAndTransportClientCustom.class));
assertThat(((NodeAndTransportClientCustom)actual).value(), equalTo(NodeAndTransportClientPlugin.VALUE));
}
}

View File

@ -0,0 +1,175 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterState.FeatureAware;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Optional;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
public class FeatureAwareTests extends ESTestCase {
abstract static class Custom implements MetaData.Custom {
private final Version version;
Custom(final Version version) {
this.version = version;
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.ALL_CONTEXTS;
}
@Override
public Diff<MetaData.Custom> diff(final MetaData.Custom previousState) {
return null;
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
return builder;
}
@Override
public Version getMinimalSupportedVersion() {
return version;
}
}
static class NoRequiredFeatureCustom extends Custom {
NoRequiredFeatureCustom(final Version version) {
super(version);
}
@Override
public String getWriteableName() {
return "no-required-feature";
}
}
static class RequiredFeatureCustom extends Custom {
RequiredFeatureCustom(final Version version) {
super(version);
}
@Override
public String getWriteableName() {
return null;
}
@Override
public Optional<String> getRequiredFeature() {
return Optional.of("required-feature");
}
}
public void testVersion() {
final Version version = VersionUtils.randomVersion(random());
for (final Custom custom : Arrays.asList(new NoRequiredFeatureCustom(version), new RequiredFeatureCustom(version))) {
{
final BytesStreamOutput out = new BytesStreamOutput();
final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
out.setVersion(afterVersion);
if (custom.getRequiredFeature().isPresent()) {
out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
}
assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
}
{
final BytesStreamOutput out = new BytesStreamOutput();
final Version beforeVersion =
randomVersionBetween(random(), VersionUtils.getFirstVersion(), VersionUtils.getPreviousVersion(version));
out.setVersion(beforeVersion);
if (custom.getRequiredFeature().isPresent() && randomBoolean()) {
out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
}
assertFalse(FeatureAware.shouldSerializeCustom(out, custom));
}
}
}
public void testFeature() {
final Version version = VersionUtils.randomVersion(random());
final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
final Custom custom = new RequiredFeatureCustom(version);
{
// the feature is present and the client is not a transport client
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
assertTrue(custom.getRequiredFeature().isPresent());
out.setFeatures(Collections.singleton(custom.getRequiredFeature().get()));
assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
}
{
// the feature is present and the client is a transport client
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
assertTrue(custom.getRequiredFeature().isPresent());
out.setFeatures(new HashSet<>(Arrays.asList(custom.getRequiredFeature().get(), TransportClient.TRANSPORT_CLIENT_FEATURE)));
assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
}
}
public void testMissingFeature() {
final Version version = VersionUtils.randomVersion(random());
final Version afterVersion = randomVersionBetween(random(), version, Version.CURRENT);
final Custom custom = new RequiredFeatureCustom(version);
{
// the feature is missing but we should serialize it anyway because the client is not a transport client
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
assertTrue(FeatureAware.shouldSerializeCustom(out, custom));
}
{
// the feature is missing and we should not serialize it because the client is a transport client
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(afterVersion);
out.setFeatures(Collections.singleton(TransportClient.TRANSPORT_CLIENT_FEATURE));
assertFalse(FeatureAware.shouldSerializeCustom(out, custom));
}
}
}

View File

@ -734,25 +734,6 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase {
Query q6 = mapper.mappers().getMapper("field").fieldType().prefixQuery("goings",
CONSTANT_SCORE_REWRITE, queryShardContext);
assertThat(q6, instanceOf(PrefixQuery.class));
indexService.mapperService().merge("type", json, MergeReason.MAPPING_UPDATE);
String badUpdate = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties").startObject("field")
.field("type", "text")
.field("analyzer", "english")
.startObject("index_prefixes")
.field("min_chars", 1)
.field("max_chars", 10)
.endObject()
.endObject().endObject()
.endObject().endObject());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
indexService.mapperService()
.merge("type", new CompressedXContent(badUpdate), MergeReason.MAPPING_UPDATE);
});
assertThat(e.getMessage(), containsString("mapper [field._index_prefix] has different min_chars values"));
}
{

View File

@ -18,23 +18,20 @@
*/
package org.elasticsearch.index.mapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.FuzzyQuery;
import org.apache.lucene.search.RegexpQuery;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.unit.Fuzziness;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class TextFieldTypeTests extends FieldTypeTestCase {
@Override
protected MappedFieldType createDefaultFieldType() {
@ -71,7 +68,7 @@ public class TextFieldTypeTests extends FieldTypeTestCase {
tft.setFielddataMinSegmentSize(1000);
}
});
addModifier(new Modifier("index_prefixes", true) {
addModifier(new Modifier("index_prefixes", false) {
@Override
public void modify(MappedFieldType ft) {
TextFieldMapper.TextFieldType tft = (TextFieldMapper.TextFieldType)ft;

View File

@ -227,6 +227,7 @@ public class TcpTransportTests extends ESTestCase {
.streamInput(streamIn);
}
threadPool.getThreadContext().readHeaders(streamIn);
assertThat(streamIn.readStringArray(), equalTo(new String[0])); // features
assertEquals("foobar", streamIn.readString());
Req readReq = new Req("");
readReq.readFrom(streamIn);

View File

@ -26,7 +26,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.HttpHost;
import org.apache.lucene.search.Sort;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@ -68,12 +67,18 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
@ -105,6 +110,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
@ -130,9 +136,11 @@ import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.node.NodeMocksPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.ScriptMetaData;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchHit;
@ -1108,7 +1116,8 @@ public abstract class ESIntegTestCase extends ESTestCase {
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
final NamedWriteableRegistry namedWriteableRegistry = cluster().getNamedWriteableRegistry();
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
final Client masterClient = client();
ClusterState masterClusterState = masterClient.admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
@ -1124,16 +1133,37 @@ public abstract class ESIntegTestCase extends ESTestCase {
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version() && masterId.equals(localClusterState.nodes().getMasterNodeId())) {
if (masterClusterState.version() == localClusterState.version()
&& masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull("clusterstate JSON serialization does not match", differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} catch (AssertionError error) {
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}", masterClusterState.toString(), localClusterState.toString());
assertEquals("cluster state UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
/*
* The cluster state received by the transport client can miss customs that the client does not understand. This
* means that we only expect equality in the cluster state including customs if the master client and the local
* client are of the same type (both or neither are transport clients). Otherwise, we can only assert equality
* modulo non-core customs.
*/
if (isTransportClient(masterClient) == isTransportClient(client)) {
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("cluster state size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull(
"cluster state JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} else {
// remove non-core customs and compare the cluster states
assertNull(
"cluster state JSON serialization does not match (after removing some customs)",
differenceBetweenMapsIgnoringArrayOrder(
convertToMap(removePluginCustoms(masterClusterState)),
convertToMap(removePluginCustoms(localClusterState))));
}
} catch (final AssertionError error) {
logger.error(
"Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(),
localClusterState.toString());
throw error;
}
}
@ -1142,6 +1172,52 @@ public abstract class ESIntegTestCase extends ESTestCase {
}
/**
* Tests if the client is a transport client or wraps a transport client.
*
* @param client the client to test
* @return true if the client is a transport client or a wrapped transport client
*/
private boolean isTransportClient(final Client client) {
if (TransportClient.class.isAssignableFrom(client.getClass())) {
return true;
} else if (client instanceof RandomizingClient) {
return isTransportClient(((RandomizingClient) client).in());
}
return false;
}
private static final Set<String> SAFE_METADATA_CUSTOMS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(IndexGraveyard.TYPE, IngestMetadata.TYPE, RepositoriesMetaData.TYPE, ScriptMetaData.TYPE)));
private static final Set<String> SAFE_CUSTOMS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(RestoreInProgress.TYPE, SnapshotDeletionsInProgress.TYPE, SnapshotsInProgress.TYPE)));
/**
* Remove any customs except for customs that we know all clients understand.
*
* @param clusterState the cluster state to remove possibly-unknown customs from
* @return the cluster state with possibly-unknown customs removed
*/
private ClusterState removePluginCustoms(final ClusterState clusterState) {
final ClusterState.Builder builder = ClusterState.builder(clusterState);
clusterState.customs().keysIt().forEachRemaining(key -> {
if (SAFE_CUSTOMS.contains(key) == false) {
builder.removeCustom(key);
}
});
final MetaData.Builder mdBuilder = MetaData.builder(clusterState.metaData());
clusterState.metaData().customs().keysIt().forEachRemaining(key -> {
if (SAFE_METADATA_CUSTOMS.contains(key) == false) {
mdBuilder.removeCustom(key);
}
});
builder.metaData(mdBuilder);
return builder.build();
}
/**
* Ensures the cluster is in a searchable state for the given indices. This means a searchable copy of each
* shard is available on the cluster.

View File

@ -93,4 +93,8 @@ public class RandomizingClient extends FilterClient {
return "randomized(" + super.toString() + ")";
}
public Client in() {
return super.in();
}
}

View File

@ -23,7 +23,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.Constants;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@ -32,7 +31,6 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.network.NetworkService;
@ -45,6 +43,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.node.Node;

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.License.OperationMode;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.io.IOException;
import java.util.EnumSet;
@ -23,7 +24,7 @@ import java.util.EnumSet;
/**
* Contains metadata about registered licenses
*/
public class LicensesMetaData extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom,
public class LicensesMetaData extends AbstractNamedDiffable<MetaData.Custom> implements XPackPlugin.XPackMetaDataCustom,
MergableCustomMetaData<LicensesMetaData> {
public static final String TYPE = "licenses";

View File

@ -16,7 +16,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.license.DeleteLicenseAction;
@ -28,13 +27,7 @@ import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.license.PostStartBasicAction;
import org.elasticsearch.license.PostStartTrialAction;
import org.elasticsearch.license.PutLicenseAction;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
import org.elasticsearch.persistent.StartPersistentTaskAction;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -114,7 +107,6 @@ import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction;
@ -174,10 +166,19 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
static Optional<String> X_PACK_FEATURE = Optional.of("x-pack");
@Override
protected Optional<String> getFeature() {
return X_PACK_FEATURE;
}
private final Settings settings;
public XPackClientPlugin(final Settings settings) {
@ -208,11 +209,10 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
static Settings additionalSettings(final Settings settings, final boolean enabled, final boolean transportClientMode) {
if (enabled && transportClientMode) {
final Settings.Builder builder = Settings.builder();
builder.put(SecuritySettings.addTransportSettings(settings));
builder.put(SecuritySettings.addUserSettings(settings));
builder.put(ThreadContext.PREFIX + "." + "has_xpack", true);
return builder.build();
return Settings.builder()
.put(SecuritySettings.addTransportSettings(settings))
.put(SecuritySettings.addUserSettings(settings))
.build();
} else {
return Settings.EMPTY;
}
@ -326,7 +326,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
// ILM
DeleteLifecycleAction.INSTANCE,
GetLifecycleAction.INSTANCE,
PutLifecycleAction.INSTANCE,
PutLifecycleAction.INSTANCE,
ExplainLifecycleAction.INSTANCE
);
}

View File

@ -59,19 +59,15 @@ import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader;
import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import javax.security.auth.DestroyFailedException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.GeneralSecurityException;
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -316,4 +312,23 @@ public class XPackPlugin extends XPackClientPlugin implements ScriptPlugin, Exte
}
return config;
}
public interface XPackClusterStateCustom extends ClusterState.Custom {
@Override
default Optional<String> getRequiredFeature() {
return XPackClientPlugin.X_PACK_FEATURE;
}
}
public interface XPackMetaDataCustom extends MetaData.Custom {
@Override
default Optional<String> getRequiredFeature() {
return XPackClientPlugin.X_PACK_FEATURE;
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
@ -53,7 +54,7 @@ import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class MlMetadata implements MetaData.Custom {
public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
private static final ParseField JOBS_FIELD = new ParseField("jobs");
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");

View File

@ -12,13 +12,14 @@ import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public final class TokenMetaData extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public final class TokenMetaData extends AbstractNamedDiffable<ClusterState.Custom> implements XPackPlugin.XPackClusterStateCustom {
/**
* The type of {@link ClusterState} data.

View File

@ -13,12 +13,13 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Objects;
public class WatcherMetaData extends AbstractNamedDiffable<MetaData.Custom> implements MetaData.Custom {
public class WatcherMetaData extends AbstractNamedDiffable<MetaData.Custom> implements XPackPlugin.XPackMetaDataCustom {
public static final String TYPE = "watcher";

View File

@ -42,6 +42,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.PutRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
@ -91,6 +92,8 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
return;
}
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest()
.indices(request.getConfig().getIndexPattern())
.fields(request.getConfig().getAllFields().toArray(new String[0]));

View File

@ -129,7 +129,11 @@ public class ModelPlotsIT extends MlNativeAutodetectIntegTestCase {
startDatafeed(datafeedId, 0, System.currentTimeMillis());
waitUntilJobIsClosed(job.getId());
assertThat(getBuckets(job.getId()).size(), equalTo(23));
// As the initial time is random, there's a chance the first record is
// aligned on a bucket start. Thus we check the buckets are in [23, 24]
assertThat(getBuckets(job.getId()).size(), greaterThanOrEqualTo(23));
assertThat(getBuckets(job.getId()).size(), lessThanOrEqualTo(24));
Set<String> modelPlotTerms = modelPlotTerms(job.getId(), "by_field_value");
assertThat(modelPlotTerms, containsInAnyOrder("user_2", "user_3"));
}