Merge remote-tracking branch 'es/master' into ccr

* es/master: (24 commits)
  Add missing_bucket option in the composite agg (#29465)
  Rename index_prefix to index_prefixes (#30932)
  Rename methods in PersistentTasksService (#30837)
  [DOCS] Fix watcher file location
  Update the version checks around range bucket keys, now that the change was backported.
  Use dedicated ML APIs in tests (#30941)
  [DOCS] Remove reference to platinum Docker image (#30916)
  Minor clean-up in InternalRange. (#30886)
  stable filemode for zip distributions (#30854)
  [DOCS] Adds missing TLS settings for auditing (#30822)
  [test] packaging: use shell when running commands (#30852)
  Fix location of AbstractHttpServerTransport (#30888)
  [test] packaging test logging for suse distros
  Moved keyword tokenizer to analysis-common module (#30642)
  Upgrade to Lucene-7.4-snapshot-1cbadda4d3 (#30928)
  Limit the scope of BouncyCastle dependency (#30358)
  [DOCS] Reset edit links (#30909)
  Fix IndexTemplateMetaData parsing from xContent (#30917)
  Remove log traces in AzureStorageServiceImpl and fix test (#30924)
  Deprecate accepting malformed requests in stored script API (#28939)
  ...
This commit is contained in:
Martijn van Groningen 2018-05-30 09:55:34 +02:00
commit 56472d6505
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
409 changed files with 7674 additions and 2649 deletions

View File

@ -1,5 +1,5 @@
elasticsearch = 7.0.0-alpha1
lucene = 7.4.0-snapshot-cc2ee23050
lucene = 7.4.0-snapshot-1cbadda4d3
# optional dependencies
spatial4j = 0.7

View File

@ -21,8 +21,6 @@ package org.elasticsearch.client;
import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
@ -68,28 +66,6 @@ public final class ClusterClient {
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
}
/**
* Get current tasks using the Task Management API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
*/
public ListTasksResponse listTasks(ListTasksRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
emptySet(), headers);
}
/**
* Asynchronously get current tasks using the Task Management API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
*/
public void listTasksAsync(ListTasksRequest request, ActionListener<ListTasksResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
listener, emptySet(), headers);
}
/**
* Add a pipeline or update an existing pipeline in the cluster
* <p>

View File

@ -192,6 +192,7 @@ public class RestHighLevelClient implements Closeable {
private final IndicesClient indicesClient = new IndicesClient(this);
private final ClusterClient clusterClient = new ClusterClient(this);
private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final TasksClient tasksClient = new TasksClient(this);
/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
@ -264,6 +265,15 @@ public class RestHighLevelClient implements Closeable {
return snapshotClient;
}
/**
* Provides a {@link TasksClient} which can be used to access the Tasks API.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html">Task Management API on elastic.co</a>
*/
public final TasksClient tasks() {
return tasksClient;
}
/**
* Executes a bulk request using the Bulk API
*

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import java.io.IOException;
import static java.util.Collections.emptySet;
/**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Tasks API.
* <p>
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html">Task Management API on elastic.co</a>
*/
public class TasksClient {
private final RestHighLevelClient restHighLevelClient;
TasksClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}
/**
* Get current tasks using the Task Management API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
*/
public ListTasksResponse list(ListTasksRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
emptySet(), headers);
}
/**
* Asynchronously get current tasks using the Task Management API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
*/
public void listAsync(ListTasksRequest request, ActionListener<ListTasksResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, ListTasksResponse::fromXContent,
listener, emptySet(), headers);
}
}

View File

@ -20,9 +20,6 @@
package org.elasticsearch.client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
@ -37,16 +34,13 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -117,31 +111,6 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
}
public void testListTasks() throws IOException {
ListTasksRequest request = new ListTasksRequest();
ListTasksResponse response = execute(request, highLevelClient().cluster()::listTasks, highLevelClient().cluster()::listTasksAsync);
assertThat(response, notNullValue());
assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
// It's possible that there are other tasks except 'cluster:monitor/tasks/lists[n]' and 'action":"cluster:monitor/tasks/lists'
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
boolean listTasksFound = false;
for (TaskGroup taskGroup : response.getTaskGroups()) {
TaskInfo parent = taskGroup.getTaskInfo();
if ("cluster:monitor/tasks/lists".equals(parent.getAction())) {
assertThat(taskGroup.getChildTasks().size(), equalTo(1));
TaskGroup childGroup = taskGroup.getChildTasks().iterator().next();
assertThat(childGroup.getChildTasks().isEmpty(), equalTo(true));
TaskInfo child = childGroup.getTaskInfo();
assertThat(child.getAction(), equalTo("cluster:monitor/tasks/lists[n]"));
assertThat(child.getParentTaskId(), equalTo(parent.getTaskId()));
listTasksFound = true;
}
}
assertTrue("List tasks were not found", listTasksFound);
}
public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentType xContentType = randomFrom(XContentType.values());

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
public class TasksIT extends ESRestHighLevelClientTestCase {
public void testListTasks() throws IOException {
ListTasksRequest request = new ListTasksRequest();
ListTasksResponse response = execute(request, highLevelClient().tasks()::list, highLevelClient().tasks()::listAsync);
assertThat(response, notNullValue());
assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
// It's possible that there are other tasks except 'cluster:monitor/tasks/lists[n]' and 'action":"cluster:monitor/tasks/lists'
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
boolean listTasksFound = false;
for (TaskGroup taskGroup : response.getTaskGroups()) {
TaskInfo parent = taskGroup.getTaskInfo();
if ("cluster:monitor/tasks/lists".equals(parent.getAction())) {
assertThat(taskGroup.getChildTasks().size(), equalTo(1));
TaskGroup childGroup = taskGroup.getChildTasks().iterator().next();
assertThat(childGroup.getChildTasks().isEmpty(), equalTo(true));
TaskInfo child = childGroup.getTaskInfo();
assertThat(child.getAction(), equalTo("cluster:monitor/tasks/lists[n]"));
assertThat(child.getParentTaskId(), equalTo(parent.getTaskId()));
listTasksFound = true;
}
}
assertTrue("List tasks were not found", listTasksFound);
}
}

View File

@ -19,13 +19,8 @@
package org.elasticsearch.client.documentation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
@ -39,21 +34,15 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
/**
* This class is used to generate the Java Cluster API documentation.
@ -193,89 +182,6 @@ public class ClusterClientDocumentationIT extends ESRestHighLevelClientTestCase
}
}
public void testListTasks() throws IOException {
RestHighLevelClient client = highLevelClient();
{
// tag::list-tasks-request
ListTasksRequest request = new ListTasksRequest();
// end::list-tasks-request
// tag::list-tasks-request-filter
request.setActions("cluster:*"); // <1>
request.setNodes("nodeId1", "nodeId2"); // <2>
request.setParentTaskId(new TaskId("parentTaskId", 42)); // <3>
// end::list-tasks-request-filter
// tag::list-tasks-request-detailed
request.setDetailed(true); // <1>
// end::list-tasks-request-detailed
// tag::list-tasks-request-wait-completion
request.setWaitForCompletion(true); // <1>
request.setTimeout(TimeValue.timeValueSeconds(50)); // <2>
request.setTimeout("50s"); // <3>
// end::list-tasks-request-wait-completion
}
ListTasksRequest request = new ListTasksRequest();
// tag::list-tasks-execute
ListTasksResponse response = client.cluster().listTasks(request);
// end::list-tasks-execute
assertThat(response, notNullValue());
// tag::list-tasks-response-tasks
List<TaskInfo> tasks = response.getTasks(); // <1>
// end::list-tasks-response-tasks
// tag::list-tasks-response-calc
Map<String, List<TaskInfo>> perNodeTasks = response.getPerNodeTasks(); // <1>
List<TaskGroup> groups = response.getTaskGroups(); // <2>
// end::list-tasks-response-calc
// tag::list-tasks-response-failures
List<ElasticsearchException> nodeFailures = response.getNodeFailures(); // <1>
List<TaskOperationFailure> taskFailures = response.getTaskFailures(); // <2>
// end::list-tasks-response-failures
assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
}
public void testListTasksAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
ListTasksRequest request = new ListTasksRequest();
// tag::list-tasks-execute-listener
ActionListener<ListTasksResponse> listener =
new ActionListener<ListTasksResponse>() {
@Override
public void onResponse(ListTasksResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::list-tasks-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::list-tasks-execute-async
client.cluster().listTasksAsync(request, listener); // <1>
// end::list-tasks-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testPutPipeline() throws IOException {
RestHighLevelClient client = highLevelClient();

View File

@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
/**
* This class is used to generate the Java Cluster API documentation.
* This class is used to generate the Java Snapshot API documentation.
* You need to wrap your code between two tags like:
* // tag::example
* // end::example

View File

@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.documentation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
/**
* This class is used to generate the Java Tasks API documentation.
* You need to wrap your code between two tags like:
* // tag::example
* // end::example
*
* Where example is your tag name.
*
* Then in the documentation, you can extract what is between tag and end tags with
* ["source","java",subs="attributes,callouts,macros"]
* --------------------------------------------------
* include-tagged::{doc-tests}/{@link TasksClientDocumentationIT}.java[example]
* --------------------------------------------------
*
* The column width of the code block is 84. If the code contains a line longer
* than 84, the line will be cut and a horizontal scroll bar will be displayed.
* (the code indentation of the tag is not included in the width)
*/
public class TasksClientDocumentationIT extends ESRestHighLevelClientTestCase {
public void testListTasks() throws IOException {
RestHighLevelClient client = highLevelClient();
{
// tag::list-tasks-request
ListTasksRequest request = new ListTasksRequest();
// end::list-tasks-request
// tag::list-tasks-request-filter
request.setActions("cluster:*"); // <1>
request.setNodes("nodeId1", "nodeId2"); // <2>
request.setParentTaskId(new TaskId("parentTaskId", 42)); // <3>
// end::list-tasks-request-filter
// tag::list-tasks-request-detailed
request.setDetailed(true); // <1>
// end::list-tasks-request-detailed
// tag::list-tasks-request-wait-completion
request.setWaitForCompletion(true); // <1>
request.setTimeout(TimeValue.timeValueSeconds(50)); // <2>
request.setTimeout("50s"); // <3>
// end::list-tasks-request-wait-completion
}
ListTasksRequest request = new ListTasksRequest();
// tag::list-tasks-execute
ListTasksResponse response = client.tasks().list(request);
// end::list-tasks-execute
assertThat(response, notNullValue());
// tag::list-tasks-response-tasks
List<TaskInfo> tasks = response.getTasks(); // <1>
// end::list-tasks-response-tasks
// tag::list-tasks-response-calc
Map<String, List<TaskInfo>> perNodeTasks = response.getPerNodeTasks(); // <1>
List<TaskGroup> groups = response.getTaskGroups(); // <2>
// end::list-tasks-response-calc
// tag::list-tasks-response-failures
List<ElasticsearchException> nodeFailures = response.getNodeFailures(); // <1>
List<TaskOperationFailure> taskFailures = response.getTaskFailures(); // <2>
// end::list-tasks-response-failures
assertThat(response.getNodeFailures(), equalTo(emptyList()));
assertThat(response.getTaskFailures(), equalTo(emptyList()));
assertThat(response.getTasks().size(), greaterThanOrEqualTo(2));
}
public void testListTasksAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
ListTasksRequest request = new ListTasksRequest();
// tag::list-tasks-execute-listener
ActionListener<ListTasksResponse> listener =
new ActionListener<ListTasksResponse>() {
@Override
public void onResponse(ListTasksResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::list-tasks-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::list-tasks-execute-async
client.tasks().listAsync(request, listener); // <1>
// end::list-tasks-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}

View File

@ -106,15 +106,23 @@ tasks.withType(AbstractArchiveTask) {
baseName = "elasticsearch${ subdir.contains('oss') ? '-oss' : ''}"
}
Closure commonZipConfig = {
dirMode 0755
fileMode 0644
}
task buildIntegTestZip(type: Zip) {
configure(commonZipConfig)
with archiveFiles(transportModulesFiles, 'zip', false)
}
task buildZip(type: Zip) {
configure(commonZipConfig)
with archiveFiles(modulesFiles(false), 'zip', false)
}
task buildOssZip(type: Zip) {
configure(commonZipConfig)
with archiveFiles(modulesFiles(true), 'zip', true)
}

View File

@ -104,11 +104,9 @@ include::indices/put_template.asciidoc[]
The Java High Level REST Client supports the following Cluster APIs:
* <<java-rest-high-cluster-put-settings>>
* <<java-rest-high-cluster-list-tasks>>
* <<java-rest-high-cluster-put-pipeline>>
include::cluster/put_settings.asciidoc[]
include::cluster/list_tasks.asciidoc[]
include::cluster/put_pipeline.asciidoc[]
== Snapshot APIs
@ -122,3 +120,11 @@ The Java High Level REST Client supports the following Snapshot APIs:
include::snapshot/get_repository.asciidoc[]
include::snapshot/create_repository.asciidoc[]
include::snapshot/delete_repository.asciidoc[]
== Tasks APIs
The Java High Level REST Client supports the following Tasks APIs:
* <<java-rest-high-tasks-list>>
include::tasks/list_tasks.asciidoc[]

View File

@ -1,4 +1,4 @@
[[java-rest-high-cluster-list-tasks]]
[[java-rest-high-tasks-list]]
=== List Tasks API
The List Tasks API allows to get information about the tasks currently executing in the cluster.
@ -10,7 +10,7 @@ A `ListTasksRequest`:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-request]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request]
--------------------------------------------------
There is no required parameters. By default the client will list all tasks and will not wait
for task completion.
@ -19,7 +19,7 @@ for task completion.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-request-filter]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request-filter]
--------------------------------------------------
<1> Request only cluster-related tasks
<2> Request all tasks running on nodes nodeId1 and nodeId2
@ -27,13 +27,13 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-request
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-request-detailed]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request-detailed]
--------------------------------------------------
<1> Should the information include detailed, potentially slow to generate data. Defaults to `false`
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-request-wait-completion]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request-wait-completion]
--------------------------------------------------
<1> Should this request wait for all found tasks to complete. Defaults to `false`
<2> Timeout for the request as a `TimeValue`. Applicable only if `setWaitForCompletion` is `true`.
@ -45,7 +45,7 @@ Defaults to 30 seconds
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-execute]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-execute]
--------------------------------------------------
[[java-rest-high-cluster-list-tasks-async]]
@ -57,7 +57,7 @@ passed to the asynchronous method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-execute-async]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-execute-async]
--------------------------------------------------
<1> The `ListTasksRequest` to execute and the `ActionListener` to use
when the execution completes
@ -71,7 +71,7 @@ A typical listener for `ListTasksResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-execute-listener]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
@ -82,20 +82,20 @@ provided as an argument
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-response-tasks]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-tasks]
--------------------------------------------------
<1> List of currently running tasks
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-response-calc]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-calc]
--------------------------------------------------
<1> List of tasks grouped by a node
<2> List of tasks grouped by a parent task
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[list-tasks-response-failures]
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-failures]
--------------------------------------------------
<1> List of node failures
<2> List of tasks failures

View File

@ -348,6 +348,34 @@ GET /_search
\... will sort the composite bucket in descending order when comparing values from the `date_histogram` source
and in ascending order when comparing values from the `terms` source.
====== Missing bucket
By default documents without a value for a given source are ignored.
It is possible to include them in the response by setting `missing_bucket` to
`true` (defaults to `false`):
[source,js]
--------------------------------------------------
GET /_search
{
"aggs" : {
"my_buckets": {
"composite" : {
"sources" : [
{ "product_name": { "terms" : { "field": "product", "missing_bucket": true } } }
]
}
}
}
}
--------------------------------------------------
// CONSOLE
In the example above the source `product_name` will emit an explicit `null` value
for documents without a value for the field `product`.
The `order` specified in the source dictates whether the `null` values should rank
first (ascending order, `asc`) or last (descending order, `desc`).
==== Size
The `size` parameter can be set to define how many composite buckets should be returned.

View File

@ -26,6 +26,7 @@ include::{xes-repo-dir}/settings/configuring-xes.asciidoc[]
include::{xes-repo-dir}/setup/bootstrap-checks-xes.asciidoc[]
:edit_url:
include::upgrade.asciidoc[]
include::migration/index.asciidoc[]
@ -66,6 +67,7 @@ include::{xes-repo-dir}/rest-api/index.asciidoc[]
include::{xes-repo-dir}/commands/index.asciidoc[]
:edit_url:
include::how-to.asciidoc[]
include::testing.asciidoc[]

View File

@ -89,7 +89,7 @@ The following parameters are accepted by `text` fields:
What information should be stored in the index, for search and highlighting purposes.
Defaults to `positions`.
<<index-prefix-config,`index_prefix`>>::
<<index-prefix-config,`index_prefixes`>>::
If enabled, term prefixes of between 2 and 5 characters are indexed into a
separate field. This allows prefix searches to run more efficiently, at
@ -138,7 +138,7 @@ The following parameters are accepted by `text` fields:
[[index-prefix-config]]
==== Index Prefix configuration
Text fields may also index term prefixes to speed up prefix searches. The `index_prefix`
Text fields may also index term prefixes to speed up prefix searches. The `index_prefixes`
parameter is configured as below. Either or both of `min_chars` and `max_chars` may be excluded.
Both values are treated as inclusive
@ -151,7 +151,7 @@ PUT my_index
"properties": {
"full_name": {
"type": "text",
"index_prefix" : {
"index_prefixes" : {
"min_chars" : 1, <1>
"max_chars" : 10 <2>
}

View File

@ -1,8 +1,10 @@
When you shut down a node, the allocation process waits for one minute
before starting to replicate the shards on that node to other nodes
in the cluster, causing a lot of wasted I/O. You can avoid racing the clock
by disabling allocation before shutting down the node:
When you shut down a node, the allocation process waits for
`index.unassigned.node_left.delayed_timeout` (by default, one minute) before
starting to replicate the shards on that node to other nodes in the cluster,
which can involve a lot of I/O. Since the node is shortly going to be
restarted, this I/O is unnecessary. You can avoid racing the clock by disabling
allocation before shutting down the node:
[source,js]
--------------------------------------------------

View File

@ -193,6 +193,7 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin {
tokenizers.put("pattern", PatternTokenizerFactory::new);
tokenizers.put("uax_url_email", UAX29URLEmailTokenizerFactory::new);
tokenizers.put("whitespace", WhitespaceTokenizerFactory::new);
tokenizers.put("keyword", KeywordTokenizerFactory::new);
return tokenizers;
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.analysis;
package org.elasticsearch.analysis.common;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.core.KeywordTokenizer;
@ -30,7 +30,7 @@ public class KeywordTokenizerFactory extends AbstractTokenizerFactory {
private final int bufferSize;
public KeywordTokenizerFactory(IndexSettings indexSettings, Environment environment, String name, Settings settings) {
KeywordTokenizerFactory(IndexSettings indexSettings, Environment environment, String name, Settings settings) {
super(indexSettings, name, settings);
bufferSize = settings.getAsInt("buffer_size", 256);
}

View File

@ -24,7 +24,6 @@ import org.apache.lucene.analysis.en.PorterStemFilterFactory;
import org.apache.lucene.analysis.miscellaneous.LimitTokenCountFilterFactory;
import org.apache.lucene.analysis.reverse.ReverseStringFilterFactory;
import org.apache.lucene.analysis.snowball.SnowballPorterFilterFactory;
import org.elasticsearch.index.analysis.KeywordTokenizerFactory;
import org.elasticsearch.index.analysis.SoraniNormalizationFilterFactory;
import org.elasticsearch.index.analysis.SynonymTokenFilterFactory;
import org.elasticsearch.indices.analysis.AnalysisFactoryTestCase;
@ -56,6 +55,7 @@ public class CommonAnalysisFactoryTests extends AnalysisFactoryTestCase {
tokenizers.put("pattern", PatternTokenizerFactory.class);
tokenizers.put("uax29urlemail", UAX29URLEmailTokenizerFactory.class);
tokenizers.put("whitespace", WhitespaceTokenizerFactory.class);
tokenizers.put("keyword", KeywordTokenizerFactory.class);
return tokenizers;
}

View File

@ -5,9 +5,22 @@
indices.analyze:
body:
text: Foo Bar!
explain: true
tokenizer: keyword
- length: { tokens: 1 }
- match: { tokens.0.token: Foo Bar! }
- length: { detail.tokenizer.tokens: 1 }
- match: { detail.tokenizer.name: keyword }
- match: { detail.tokenizer.tokens.0.token: Foo Bar! }
- do:
indices.analyze:
body:
text: Foo Bar!
explain: true
tokenizer:
type: keyword
- length: { detail.tokenizer.tokens: 1 }
- match: { detail.tokenizer.name: _anonymous_tokenizer }
- match: { detail.tokenizer.tokens.0.token: Foo Bar! }
---
"nGram":

View File

@ -97,3 +97,19 @@
- length: { tokens: 2 }
- match: { tokens.0.token: sha }
- match: { tokens.1.token: hay }
---
"Custom normalizer in request":
- do:
indices.analyze:
body:
text: ABc
explain: true
filter: ["lowercase"]
- length: { detail.tokenizer.tokens: 1 }
- length: { detail.tokenfilters.0.tokens: 1 }
- match: { detail.tokenizer.name: keyword_for_normalizer }
- match: { detail.tokenizer.tokens.0.token: ABc }
- match: { detail.tokenfilters.0.name: lowercase }
- match: { detail.tokenfilters.0.tokens.0.token: abc }

View File

@ -0,0 +1 @@
98c920972b2f5e8563540e805d87e6a3bc888972

View File

@ -1 +0,0 @@
1e28b448387ec05d655f8c81ee54e13ff2975a4d

View File

@ -198,6 +198,7 @@ public class SearchTemplateIT extends ESSingleNodeTestCase {
getResponse = client().admin().cluster().prepareGetStoredScript("testTemplate").get();
assertNull(getResponse.getSource());
assertWarnings("the template context is now deprecated. Specify templates in a \"script\" element.");
}
public void testIndexedTemplate() throws Exception {
@ -267,6 +268,7 @@ public class SearchTemplateIT extends ESSingleNodeTestCase {
.setScript("2").setScriptType(ScriptType.STORED).setScriptParams(templateParams)
.get();
assertHitCount(searchResponse.getResponse(), 1);
assertWarnings("the template context is now deprecated. Specify templates in a \"script\" element.");
}
// Relates to #10397
@ -311,6 +313,7 @@ public class SearchTemplateIT extends ESSingleNodeTestCase {
.get();
assertHitCount(searchResponse.getResponse(), 1);
}
assertWarnings("the template context is now deprecated. Specify templates in a \"script\" element.");
}
public void testIndexedTemplateWithArray() throws Exception {
@ -339,6 +342,7 @@ public class SearchTemplateIT extends ESSingleNodeTestCase {
.setScript("4").setScriptType(ScriptType.STORED).setScriptParams(arrayTemplateParams)
.get();
assertHitCount(searchResponse.getResponse(), 5);
assertWarnings("the template context is now deprecated. Specify templates in a \"script\" element.");
}
}

View File

@ -56,6 +56,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpStats;

View File

@ -273,7 +273,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
try (Netty4HttpServerTransport transport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress.boundAddresses());
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (Netty4HttpClient client = new Netty4HttpClient()) {
final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8"));
@ -352,7 +352,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
try (Netty4HttpServerTransport transport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher)) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress.boundAddresses());
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
AtomicBoolean channelClosed = new AtomicBoolean(false);

View File

@ -0,0 +1 @@
844e2b76f4bc6e646e1c3257d668ac598e03f36a

View File

@ -1 +0,0 @@
452c9a9f86b79b9b3eaa7d6aa782e189d5bcfe8f

View File

@ -16,9 +16,11 @@
body:
filter: [icu_normalizer]
text: Foo Bar Ruß
tokenizer: keyword
- length: { tokens: 1 }
- match: { tokens.0.token: foo bar russ }
tokenizer: standard
- length: { tokens: 3 }
- match: { tokens.0.token: foo}
- match: { tokens.1.token: bar }
- match: { tokens.2.token: russ }
---
"Normalization charfilter":
- do:
@ -26,9 +28,11 @@
body:
char_filter: [icu_normalizer]
text: Foo Bar Ruß
tokenizer: keyword
- length: { tokens: 1 }
- match: { tokens.0.token: foo bar russ }
tokenizer: standard
- length: { tokens: 3 }
- match: { tokens.0.token: foo }
- match: { tokens.1.token: bar }
- match: { tokens.2.token: russ }
---
"Folding filter":
- do:
@ -36,9 +40,11 @@
body:
filter: [icu_folding]
text: Foo Bar résumé
tokenizer: keyword
- length: { tokens: 1 }
- match: { tokens.0.token: foo bar resume }
tokenizer: standard
- length: { tokens: 3 }
- match: { tokens.0.token: foo }
- match: { tokens.1.token: bar }
- match: { tokens.2.token: resume }
---
"Normalization with a UnicodeSet Filter":
- do:
@ -64,25 +70,34 @@
index: test
body:
char_filter: ["charfilter_icu_normalizer"]
tokenizer: keyword
tokenizer: standard
text: charfilter Föo Bâr Ruß
- length: { tokens: 1 }
- match: { tokens.0.token: charfilter föo bâr ruß }
- length: { tokens: 4 }
- match: { tokens.0.token: charfilter }
- match: { tokens.1.token: föo }
- match: { tokens.2.token: bâr }
- match: { tokens.3.token: ruß }
- do:
indices.analyze:
index: test
body:
tokenizer: keyword
tokenizer: standard
filter: ["tokenfilter_icu_normalizer"]
text: tokenfilter Föo Bâr Ruß
- length: { tokens: 1 }
- match: { tokens.0.token: tokenfilter föo Bâr ruß }
- length: { tokens: 4 }
- match: { tokens.0.token: tokenfilter }
- match: { tokens.1.token: föo }
- match: { tokens.2.token: Bâr }
- match: { tokens.3.token: ruß }
- do:
indices.analyze:
index: test
body:
tokenizer: keyword
tokenizer: standard
filter: ["tokenfilter_icu_folding"]
text: icufolding Föo Bâr Ruß
- length: { tokens: 1 }
- match: { tokens.0.token: icufolding foo bâr russ }
- length: { tokens: 4 }
- match: { tokens.0.token: icufolding }
- match: { tokens.1.token: foo }
- match: { tokens.2.token: bâr }
- match: { tokens.3.token: russ }

View File

@ -0,0 +1 @@
2f2bd2d67c7952e4ae14ab3f742824a45d0d1719

View File

@ -1 +0,0 @@
48c76a922bdfc7f50b1b6fe22e9456c555f3f990

View File

@ -0,0 +1 @@
46ad7ebcfcdbdb60dd54aae4d720356a7a51c7c0

View File

@ -1 +0,0 @@
4db5777df468b0867ff6539c9ab687e0ed6cab41

View File

@ -0,0 +1 @@
548e9f2b4d4a985dc174b2eee4007c0bd5642e68

View File

@ -1 +0,0 @@
0e09e6b011ab2b1a0e3e0e1df2ab2a91dca8ba23

View File

@ -0,0 +1 @@
b90e66f4104f0234cfef335762f65a6fed695231

View File

@ -1 +0,0 @@
ceefa0f9789ab9ea5c8ab9f67ed7a601a3ae6aa9

View File

@ -0,0 +1 @@
929a4eb52b11f6d3f0df9c8eba014f5ee2464c67

View File

@ -1 +0,0 @@
b013adc183e52a74795ad3d3032f4d0f9db30b73

View File

@ -5,7 +5,7 @@
indices.analyze:
body:
text: studenci
tokenizer: keyword
tokenizer: standard
filter: [polish_stem]
- length: { tokens: 1 }
- match: { tokens.0.token: student }

View File

@ -0,0 +1 @@
0e6575a411b65cd95e0e54f04d3da278b68be521

View File

@ -1 +0,0 @@
95300f29418f60e57e022d934d3462be9e1e2225

View File

@ -35,6 +35,7 @@ import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
@ -45,6 +46,7 @@ import org.elasticsearch.repositories.RepositoryException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@ -52,66 +54,59 @@ import java.util.Map;
public class AzureStorageServiceImpl extends AbstractComponent implements AzureStorageService {
final Map<String, AzureStorageSettings> storageSettings;
final Map<String, CloudBlobClient> clients = new HashMap<>();
final Map<String, CloudBlobClient> clients;
public AzureStorageServiceImpl(Settings settings, Map<String, AzureStorageSettings> storageSettings) {
super(settings);
this.storageSettings = storageSettings;
if (storageSettings.isEmpty()) {
// If someone did not register any settings, they basically can't use the plugin
throw new IllegalArgumentException("If you want to use an azure repository, you need to define a client configuration.");
}
logger.debug("starting azure storage client instance");
// We register all regular azure clients
for (Map.Entry<String, AzureStorageSettings> azureStorageSettingsEntry : this.storageSettings.entrySet()) {
logger.debug("registering regular client for account [{}]", azureStorageSettingsEntry.getKey());
createClient(azureStorageSettingsEntry.getValue());
}
this.storageSettings = storageSettings;
this.clients = createClients(storageSettings);
}
void createClient(AzureStorageSettings azureStorageSettings) {
try {
logger.trace("creating new Azure storage client using account [{}], key [{}], endpoint suffix [{}]",
azureStorageSettings.getAccount(), azureStorageSettings.getKey(), azureStorageSettings.getEndpointSuffix());
private Map<String, CloudBlobClient> createClients(final Map<String, AzureStorageSettings> storageSettings) {
final Map<String, CloudBlobClient> clients = new HashMap<>();
for (Map.Entry<String, AzureStorageSettings> azureStorageEntry : storageSettings.entrySet()) {
final String clientName = azureStorageEntry.getKey();
final AzureStorageSettings clientSettings = azureStorageEntry.getValue();
try {
logger.trace("creating new Azure storage client with name [{}]", clientName);
String storageConnectionString =
"DefaultEndpointsProtocol=https;"
+ "AccountName=" + clientSettings.getAccount() + ";"
+ "AccountKey=" + clientSettings.getKey();
String storageConnectionString =
"DefaultEndpointsProtocol=https;"
+ "AccountName=" + azureStorageSettings.getAccount() + ";"
+ "AccountKey=" + azureStorageSettings.getKey();
final String endpointSuffix = clientSettings.getEndpointSuffix();
if (Strings.hasLength(endpointSuffix)) {
storageConnectionString += ";EndpointSuffix=" + endpointSuffix;
}
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
String endpointSuffix = azureStorageSettings.getEndpointSuffix();
if (endpointSuffix != null && !endpointSuffix.isEmpty()) {
storageConnectionString += ";EndpointSuffix=" + endpointSuffix;
// Create the blob client.
CloudBlobClient client = storageAccount.createCloudBlobClient();
// Register the client
clients.put(clientSettings.getAccount(), client);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Can not create azure storage client [{}]", clientName), e);
}
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
// Create the blob client.
CloudBlobClient client = storageAccount.createCloudBlobClient();
// Register the client
this.clients.put(azureStorageSettings.getAccount(), client);
} catch (Exception e) {
logger.error("can not create azure storage client: {}", e.getMessage());
}
return Collections.unmodifiableMap(clients);
}
CloudBlobClient getSelectedClient(String clientName, LocationMode mode) {
logger.trace("selecting a client named [{}], mode [{}]", clientName, mode.name());
AzureStorageSettings azureStorageSettings = this.storageSettings.get(clientName);
if (azureStorageSettings == null) {
throw new IllegalArgumentException("Can not find named azure client [" + clientName + "]. Check your settings.");
throw new IllegalArgumentException("Unable to find client with name [" + clientName + "]");
}
CloudBlobClient client = this.clients.get(azureStorageSettings.getAccount());
if (client == null) {
throw new IllegalArgumentException("Can not find an azure client named [" + azureStorageSettings.getAccount() + "]");
throw new IllegalArgumentException("No account defined for client with name [" + clientName + "]");
}
// NOTE: for now, just set the location mode in case it is different;

View File

@ -23,7 +23,6 @@ import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.core.Base64;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
@ -36,6 +35,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import static org.elasticsearch.repositories.azure.AzureStorageServiceImpl.blobNameFromUri;
@ -49,31 +49,14 @@ import static org.hamcrest.Matchers.nullValue;
public class AzureStorageServiceTests extends ESTestCase {
private MockSecureSettings buildSecureSettings() {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.azure1.account", "myaccount1");
secureSettings.setString("azure.client.azure1.key", "mykey1");
secureSettings.setString("azure.client.azure2.account", "myaccount2");
secureSettings.setString("azure.client.azure2.key", "mykey2");
secureSettings.setString("azure.client.azure3.account", "myaccount3");
secureSettings.setString("azure.client.azure3.key", "mykey3");
return secureSettings;
}
private Settings buildSettings() {
Settings settings = Settings.builder()
.setSecureSettings(buildSecureSettings())
.build();
return settings;
}
public void testReadSecuredSettings() {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.azure1.account", "myaccount1");
secureSettings.setString("azure.client.azure1.key", "mykey1");
secureSettings.setString("azure.client.azure1.key", encodeKey("mykey1"));
secureSettings.setString("azure.client.azure2.account", "myaccount2");
secureSettings.setString("azure.client.azure2.key", "mykey2");
secureSettings.setString("azure.client.azure2.key", encodeKey("mykey2"));
secureSettings.setString("azure.client.azure3.account", "myaccount3");
secureSettings.setString("azure.client.azure3.key", "mykey3");
secureSettings.setString("azure.client.azure3.key", encodeKey("mykey3"));
Settings settings = Settings.builder().setSecureSettings(secureSettings)
.put("azure.client.azure3.endpoint_suffix", "my_endpoint_suffix").build();
@ -88,9 +71,9 @@ public class AzureStorageServiceTests extends ESTestCase {
public void testCreateClientWithEndpointSuffix() {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.azure1.account", "myaccount1");
secureSettings.setString("azure.client.azure1.key", Base64.encode("mykey1".getBytes(StandardCharsets.UTF_8)));
secureSettings.setString("azure.client.azure1.key", encodeKey("mykey1"));
secureSettings.setString("azure.client.azure2.account", "myaccount2");
secureSettings.setString("azure.client.azure2.key", Base64.encode("mykey2".getBytes(StandardCharsets.UTF_8)));
secureSettings.setString("azure.client.azure2.key", encodeKey("mykey2"));
Settings settings = Settings.builder().setSecureSettings(secureSettings)
.put("azure.client.azure1.endpoint_suffix", "my_endpoint_suffix").build();
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceImpl(settings, AzureStorageSettings.load(settings));
@ -103,7 +86,7 @@ public class AzureStorageServiceTests extends ESTestCase {
public void testGetSelectedClientWithNoPrimaryAndSecondary() {
try {
new AzureStorageServiceMockForSettings(Settings.EMPTY);
new AzureStorageServiceImpl(Settings.EMPTY, Collections.emptyMap());
fail("we should have raised an IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), is("If you want to use an azure repository, you need to define a client configuration."));
@ -111,11 +94,11 @@ public class AzureStorageServiceTests extends ESTestCase {
}
public void testGetSelectedClientNonExisting() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMockForSettings(buildSettings());
AzureStorageServiceImpl azureStorageService = createAzureService(buildSettings());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
azureStorageService.getSelectedClient("azure4", LocationMode.PRIMARY_ONLY);
});
assertThat(e.getMessage(), is("Can not find named azure client [azure4]. Check your settings."));
assertThat(e.getMessage(), is("Unable to find client with name [azure4]"));
}
public void testGetSelectedClientDefaultTimeout() {
@ -123,7 +106,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.setSecureSettings(buildSecureSettings())
.put("azure.client.azure3.timeout", "30s")
.build();
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMockForSettings(timeoutSettings);
AzureStorageServiceImpl azureStorageService = createAzureService(timeoutSettings);
CloudBlobClient client1 = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), nullValue());
CloudBlobClient client3 = azureStorageService.getSelectedClient("azure3", LocationMode.PRIMARY_ONLY);
@ -131,13 +114,13 @@ public class AzureStorageServiceTests extends ESTestCase {
}
public void testGetSelectedClientNoTimeout() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMockForSettings(buildSettings());
AzureStorageServiceImpl azureStorageService = createAzureService(buildSettings());
CloudBlobClient client1 = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client1.getDefaultRequestOptions().getTimeoutIntervalInMs(), is(nullValue()));
}
public void testGetSelectedClientBackoffPolicy() {
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMockForSettings(buildSettings());
AzureStorageServiceImpl azureStorageService = createAzureService(buildSettings());
CloudBlobClient client1 = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), is(notNullValue()));
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), instanceOf(RetryExponentialRetry.class));
@ -149,7 +132,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure1.max_retries", 7)
.build();
AzureStorageServiceImpl azureStorageService = new AzureStorageServiceMockForSettings(timeoutSettings);
AzureStorageServiceImpl azureStorageService = createAzureService(timeoutSettings);
CloudBlobClient client1 = azureStorageService.getSelectedClient("azure1", LocationMode.PRIMARY_ONLY);
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), is(notNullValue()));
assertThat(client1.getDefaultRequestOptions().getRetryPolicyFactory(), instanceOf(RetryExponentialRetry.class));
@ -159,7 +142,7 @@ public class AzureStorageServiceTests extends ESTestCase {
Settings settings = Settings.builder()
.setSecureSettings(buildSecureSettings())
.build();
AzureStorageServiceMockForSettings mock = new AzureStorageServiceMockForSettings(settings);
AzureStorageServiceImpl mock = createAzureService(settings);
assertThat(mock.storageSettings.get("azure1").getProxy(), nullValue());
assertThat(mock.storageSettings.get("azure2").getProxy(), nullValue());
assertThat(mock.storageSettings.get("azure3").getProxy(), nullValue());
@ -172,7 +155,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure1.proxy.port", 8080)
.put("azure.client.azure1.proxy.type", "http")
.build();
AzureStorageServiceMockForSettings mock = new AzureStorageServiceMockForSettings(settings);
AzureStorageServiceImpl mock = createAzureService(settings);
Proxy azure1Proxy = mock.storageSettings.get("azure1").getProxy();
assertThat(azure1Proxy, notNullValue());
@ -192,7 +175,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure2.proxy.port", 8081)
.put("azure.client.azure2.proxy.type", "http")
.build();
AzureStorageServiceMockForSettings mock = new AzureStorageServiceMockForSettings(settings);
AzureStorageServiceImpl mock = createAzureService(settings);
Proxy azure1Proxy = mock.storageSettings.get("azure1").getProxy();
assertThat(azure1Proxy, notNullValue());
assertThat(azure1Proxy.type(), is(Proxy.Type.HTTP));
@ -211,7 +194,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure1.proxy.port", 8080)
.put("azure.client.azure1.proxy.type", "socks")
.build();
AzureStorageServiceMockForSettings mock = new AzureStorageServiceMockForSettings(settings);
AzureStorageServiceImpl mock = createAzureService(settings);
Proxy azure1Proxy = mock.storageSettings.get("azure1").getProxy();
assertThat(azure1Proxy, notNullValue());
assertThat(azure1Proxy.type(), is(Proxy.Type.SOCKS));
@ -227,7 +210,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure1.proxy.type", randomFrom("socks", "http"))
.build();
SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceMockForSettings(settings));
SettingsException e = expectThrows(SettingsException.class, () -> createAzureService(settings));
assertEquals("Azure Proxy type has been set but proxy host or port is not defined.", e.getMessage());
}
@ -238,7 +221,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure1.proxy.type", randomFrom("socks", "http"))
.build();
SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceMockForSettings(settings));
SettingsException e = expectThrows(SettingsException.class, () -> createAzureService(settings));
assertEquals("Azure Proxy type has been set but proxy host or port is not defined.", e.getMessage());
}
@ -249,7 +232,7 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure1.proxy.port", 8080)
.build();
SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceMockForSettings(settings));
SettingsException e = expectThrows(SettingsException.class, () -> createAzureService(settings));
assertEquals("Azure Proxy port or host have been set but proxy type is not defined.", e.getMessage());
}
@ -261,26 +244,10 @@ public class AzureStorageServiceTests extends ESTestCase {
.put("azure.client.azure1.proxy.port", 8080)
.build();
SettingsException e = expectThrows(SettingsException.class, () -> new AzureStorageServiceMockForSettings(settings));
SettingsException e = expectThrows(SettingsException.class, () -> createAzureService(settings));
assertEquals("Azure proxy host is unknown.", e.getMessage());
}
/**
* This internal class just overload createClient method which is called by AzureStorageServiceImpl.doStart()
*/
class AzureStorageServiceMockForSettings extends AzureStorageServiceImpl {
AzureStorageServiceMockForSettings(Settings settings) {
super(settings, AzureStorageSettings.load(settings));
}
// We fake the client here
@Override
void createClient(AzureStorageSettings azureStorageSettings) {
this.clients.put(azureStorageSettings.getAccount(),
new CloudBlobClient(URI.create("https://" + azureStorageSettings.getAccount())));
}
}
public void testBlobNameFromUri() throws URISyntaxException {
String name = blobNameFromUri(new URI("https://myservice.azure.net/container/path/to/myfile"));
assertThat(name, is("path/to/myfile"));
@ -291,4 +258,27 @@ public class AzureStorageServiceTests extends ESTestCase {
name = blobNameFromUri(new URI("https://127.0.0.1/container/path/to/myfile"));
assertThat(name, is("path/to/myfile"));
}
private static MockSecureSettings buildSecureSettings() {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.azure1.account", "myaccount1");
secureSettings.setString("azure.client.azure1.key", encodeKey("mykey1"));
secureSettings.setString("azure.client.azure2.account", "myaccount2");
secureSettings.setString("azure.client.azure2.key", encodeKey("mykey2"));
secureSettings.setString("azure.client.azure3.account", "myaccount3");
secureSettings.setString("azure.client.azure3.key", encodeKey("mykey3"));
return secureSettings;
}
private static Settings buildSettings() {
return Settings.builder().setSecureSettings(buildSecureSettings()).build();
}
private static AzureStorageServiceImpl createAzureService(final Settings settings) {
return new AzureStorageServiceImpl(settings, AzureStorageSettings.load(settings));
}
private static String encodeKey(final String value) {
return Base64.encode(value.getBytes(StandardCharsets.UTF_8));
}
}

View File

@ -42,7 +42,7 @@ import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.AbstractHttpServerTransport;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.BytesChannelContext;

View File

@ -82,38 +82,26 @@ In general it's probably best to avoid running external commands when a good
Java alternative exists. For example most filesystem operations can be done with
the java.nio.file APIs. For those that aren't, use an instance of [Shell](src/main/java/org/elasticsearch/packaging/util/Shell.java)
Despite the name, commands run with this class are not run in a shell, and any
familiar features of shells like variables or expansion won't work.
If you do need the shell, you must explicitly invoke the shell's command. For
example to run a command with Bash, use the `bash -c command` syntax. Note that
the entire script must be in a single string argument
This class runs scripts in either bash with the `bash -c <script>` syntax,
or in powershell with the `powershell.exe -Command <script>` syntax.
```java
Shell sh = new Shell();
sh.run("bash", "-c", "echo $foo; echo $bar");
// equivalent to `bash -c 'echo $foo; echo $bar'`
sh.bash("echo $foo; echo $bar");
// equivalent to `powershell.exe -Command 'Write-Host $foo; Write-Host $bar'`
sh.powershell("Write-Host $foo; Write-Host $bar");
```
Similary for powershell - again, the entire powershell script must go in a
single string argument
### Notes about powershell
```java
sh.run("powershell.exe", "-Command", "Write-Host $foo; Write-Host $bar");
```
Powershell scripts for the most part have backwards compatibility with legacy
cmd.exe commands and their syntax. Most of the commands you'll want to use
in powershell are [Cmdlets](https://msdn.microsoft.com/en-us/library/ms714395.aspx)
which generally don't have a one-to-one mapping with an executable file.
On Linux, most commands you'll want to use will be executable files and will
work fine without a shell
```java
sh.run("tar", "-xzpf", "elasticsearch-6.1.0.tar.gz");
```
On Windows you'll mostly want to use powershell as it can do a lot more and
gives much better feedback than Windows' legacy command line. Unfortunately that
means that you'll need to use the `powershell.exe -Command` syntax as
powershell's [Cmdlets](https://msdn.microsoft.com/en-us/library/ms714395.aspx)
don't correspond to executable files and are not runnable by `Runtime` directly.
When writing powershell commands this way, make sure to test them as some types
of formatting can cause it to return a successful exit code but not run
anything.
When writing powershell commands in this project it's worth testing them by
hand, as sometimes when a script can't be interpreted correctly it will
fail silently.

View File

@ -64,7 +64,7 @@ public class Archives {
if (distribution.packaging == Distribution.Packaging.TAR) {
if (Platforms.LINUX) {
sh.run("tar", "-C", baseInstallPath.toString(), "-xzpf", distributionFile.toString());
sh.bash("tar -C " + baseInstallPath + " -xzpf " + distributionFile);
} else {
throw new RuntimeException("Distribution " + distribution + " is not supported on windows");
}
@ -72,11 +72,12 @@ public class Archives {
} else if (distribution.packaging == Distribution.Packaging.ZIP) {
if (Platforms.LINUX) {
sh.run("unzip", distributionFile.toString(), "-d", baseInstallPath.toString());
sh.bash("unzip " + distributionFile + " -d " + baseInstallPath);
} else {
sh.run("powershell.exe", "-Command",
sh.powershell(
"Add-Type -AssemblyName 'System.IO.Compression.Filesystem'; " +
"[IO.Compression.ZipFile]::ExtractToDirectory('" + distributionFile + "', '" + baseInstallPath + "')");
"[IO.Compression.ZipFile]::ExtractToDirectory('" + distributionFile + "', '" + baseInstallPath + "')"
);
}
} else {
@ -102,35 +103,35 @@ public class Archives {
private static void setupArchiveUsersLinux(Path installPath) {
final Shell sh = new Shell();
if (sh.runIgnoreExitCode("getent", "group", "elasticsearch").isSuccess() == false) {
if (sh.bashIgnoreExitCode("getent group elasticsearch").isSuccess() == false) {
if (isDPKG()) {
sh.run("addgroup", "--system", "elasticsearch");
sh.bash("addgroup --system elasticsearch");
} else {
sh.run("groupadd", "-r", "elasticsearch");
sh.bash("groupadd -r elasticsearch");
}
}
if (sh.runIgnoreExitCode("id", "elasticsearch").isSuccess() == false) {
if (sh.bashIgnoreExitCode("id elasticsearch").isSuccess() == false) {
if (isDPKG()) {
sh.run("adduser",
"--quiet",
"--system",
"--no-create-home",
"--ingroup", "elasticsearch",
"--disabled-password",
"--shell", "/bin/false",
sh.bash("adduser " +
"--quiet " +
"--system " +
"--no-create-home " +
"--ingroup elasticsearch " +
"--disabled-password " +
"--shell /bin/false " +
"elasticsearch");
} else {
sh.run("useradd",
"--system",
"-M",
"--gid", "elasticsearch",
"--shell", "/sbin/nologin",
"--comment", "elasticsearch user",
sh.bash("useradd " +
"--system " +
"-M " +
"--gid elasticsearch " +
"--shell /sbin/nologin " +
"--comment 'elasticsearch user' " +
"elasticsearch");
}
}
sh.run("chown", "-R", "elasticsearch:elasticsearch", installPath.toString());
sh.bash("chown -R elasticsearch:elasticsearch " + installPath);
}
public static void verifyArchiveInstallation(Installation installation, Distribution distribution) {

View File

@ -59,16 +59,16 @@ public class Cleanup {
if (Platforms.WINDOWS) {
// the view of processes returned by Get-Process doesn't expose command line arguments, so we use WMI here
sh.runIgnoreExitCode("powershell.exe", "-Command",
sh.powershellIgnoreExitCode(
"Get-WmiObject Win32_Process | " +
"Where-Object { $_.CommandLine -Match 'org.elasticsearch.bootstrap.Elasticsearch' } | " +
"ForEach-Object { $_.Terminate() }");
"ForEach-Object { $_.Terminate() }"
);
} else {
sh.runIgnoreExitCode("pkill", "-u", "elasticsearch");
sh.runIgnoreExitCode("bash", "-c",
"ps aux | grep -i 'org.elasticsearch.bootstrap.Elasticsearch' | awk {'print $2'} | xargs kill -9");
sh.bashIgnoreExitCode("pkill -u elasticsearch");
sh.bashIgnoreExitCode("ps aux | grep -i 'org.elasticsearch.bootstrap.Elasticsearch' | awk {'print $2'} | xargs kill -9");
}
@ -78,8 +78,8 @@ public class Cleanup {
// remove elasticsearch users
if (Platforms.LINUX) {
sh.runIgnoreExitCode("userdel", "elasticsearch");
sh.runIgnoreExitCode("groupdel", "elasticsearch");
sh.bashIgnoreExitCode("userdel elasticsearch");
sh.bashIgnoreExitCode("groupdel elasticsearch");
}
// delete files that may still exist
@ -95,7 +95,7 @@ public class Cleanup {
// disable elasticsearch service
// todo add this for windows when adding tests for service intallation
if (Platforms.LINUX && isSystemd()) {
sh.run("systemctl", "unmask", "systemd-sysctl.service");
sh.bash("systemctl unmask systemd-sysctl.service");
}
}
@ -103,19 +103,19 @@ public class Cleanup {
final Shell sh = new Shell();
if (isRPM()) {
sh.runIgnoreExitCode("rpm", "--quiet", "-e", "elasticsearch", "elasticsearch-oss");
sh.bashIgnoreExitCode("rpm --quiet -e elasticsearch elasticsearch-oss");
}
if (isYUM()) {
sh.runIgnoreExitCode("yum", "remove", "-y", "elasticsearch", "elasticsearch-oss");
sh.bashIgnoreExitCode("yum remove -y elasticsearch elasticsearch-oss");
}
if (isDPKG()) {
sh.runIgnoreExitCode("dpkg", "--purge", "elasticsearch", "elasticsearch-oss");
sh.bashIgnoreExitCode("dpkg --purge elasticsearch elasticsearch-oss");
}
if (isAptGet()) {
sh.runIgnoreExitCode("apt-get", "--quiet", "--yes", "purge", "elasticsearch", "elasticsearch-oss");
sh.bashIgnoreExitCode("apt-get --quiet --yes purge elasticsearch elasticsearch-oss");
}
}
}

View File

@ -28,41 +28,41 @@ public class Platforms {
if (WINDOWS) {
return false;
}
return new Shell().runIgnoreExitCode("which", "dpkg").isSuccess();
return new Shell().bashIgnoreExitCode("which dpkg").isSuccess();
}
public static boolean isAptGet() {
if (WINDOWS) {
return false;
}
return new Shell().runIgnoreExitCode("which", "apt-get").isSuccess();
return new Shell().bashIgnoreExitCode("which apt-get").isSuccess();
}
public static boolean isRPM() {
if (WINDOWS) {
return false;
}
return new Shell().runIgnoreExitCode("which", "rpm").isSuccess();
return new Shell().bashIgnoreExitCode("which rpm").isSuccess();
}
public static boolean isYUM() {
if (WINDOWS) {
return false;
}
return new Shell().runIgnoreExitCode("which", "yum").isSuccess();
return new Shell().bashIgnoreExitCode("which yum").isSuccess();
}
public static boolean isSystemd() {
if (WINDOWS) {
return false;
}
return new Shell().runIgnoreExitCode("which", "systemctl").isSuccess();
return new Shell().bashIgnoreExitCode("which systemctl").isSuccess();
}
public static boolean isSysVInit() {
if (WINDOWS) {
return false;
}
return new Shell().runIgnoreExitCode("which", "service").isSuccess();
return new Shell().bashIgnoreExitCode("which service").isSuccess();
}
}

View File

@ -29,6 +29,7 @@ import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
@ -57,7 +58,47 @@ public class Shell {
this.workingDirectory = workingDirectory;
}
public Result run(String... command) {
/**
* Runs a script in a bash shell, throwing an exception if its exit code is nonzero
*/
public Result bash(String script) {
return run(bashCommand(script));
}
/**
* Runs a script in a bash shell
*/
public Result bashIgnoreExitCode(String script) {
return runIgnoreExitCode(bashCommand(script));
}
private static String[] bashCommand(String script) {
return Stream.concat(Stream.of("bash", "-c"), Stream.of(script)).toArray(String[]::new);
}
/**
* Runs a script in a powershell shell, throwing an exception if its exit code is nonzero
*/
public Result powershell(String script) {
return run(powershellCommand(script));
}
/**
* Runs a script in a powershell shell
*/
public Result powershellIgnoreExitCode(String script) {
return runIgnoreExitCode(powershellCommand(script));
}
private static String[] powershellCommand(String script) {
return Stream.concat(Stream.of("powershell.exe", "-Command"), Stream.of(script)).toArray(String[]::new);
}
/**
* Runs an executable file, passing all elements of {@code command} after the first as arguments. Throws an exception if the process'
* exit code is nonzero
*/
private Result run(String[] command) {
Result result = runIgnoreExitCode(command);
if (result.isSuccess() == false) {
throw new RuntimeException("Command was not successful: [" + String.join(" ", command) + "] result: " + result.toString());
@ -65,7 +106,10 @@ public class Shell {
return result;
}
public Result runIgnoreExitCode(String... command) {
/**
* Runs an executable file, passing all elements of {@code command} after the first as arguments
*/
private Result runIgnoreExitCode(String[] command) {
ProcessBuilder builder = new ProcessBuilder();
builder.command(command);

View File

@ -68,6 +68,7 @@ setup() {
@test "[TAR] install archive" {
# Install the archive
install_archive
set_debug_logging
count=$(find /tmp -type d -name 'elasticsearch*' | wc -l)
[ "$count" -eq 1 ]

View File

@ -61,6 +61,7 @@ setup() {
[ ! -d "$ESHOME" ]; then
clean_before_test
install
set_debug_logging
fi
}

View File

@ -461,6 +461,13 @@ debug_collect_logs() {
describe_port 127.0.0.1 9201
}
set_debug_logging() {
if [ "$ESCONFIG" ] && [ -d "$ESCONFIG" ] && [ -f /etc/os-release ] && (grep -qi suse /etc/os-release); then
echo 'logger.org.elasticsearch.indices: DEBUG' >> "$ESCONFIG/elasticsearch.yml"
echo 'logger.org.elasticsearch.gateway: TRACE' >> "$ESCONFIG/elasticsearch.yml"
fi
}
# Waits for Elasticsearch to reach some status.
# $1 - expected status - defaults to green
wait_for_elasticsearch_status() {

View File

@ -75,19 +75,3 @@
- match: { detail.tokenizer.tokens.2.token: buzz }
- match: { detail.tokenfilters.0.name: "_anonymous_tokenfilter" }
- match: { detail.tokenfilters.0.tokens.0.token: bar }
---
"Custom normalizer in request":
- do:
indices.analyze:
body:
text: ABc
explain: true
filter: ["lowercase"]
- length: { detail.tokenizer.tokens: 1 }
- length: { detail.tokenfilters.0.tokens: 1 }
- match: { detail.tokenizer.name: keyword_for_normalizer }
- match: { detail.tokenizer.tokens.0.token: ABc }
- match: { detail.tokenfilters.0.name: lowercase }
- match: { detail.tokenfilters.0.tokens.0.token: abc }

View File

@ -323,3 +323,32 @@ setup:
- length: { aggregations.test.buckets: 2 }
- length: { aggregations.test.after_key: 1 }
- match: { aggregations.test.after_key.keyword: "foo" }
---
"Composite aggregation and array size":
- skip:
version: " - 6.99.99"
reason: starting in 7.0 the composite sources do not allocate arrays eagerly.
- do:
search:
index: test
body:
aggregations:
test:
composite:
size: 1000000000
sources: [
{
"keyword": {
"terms": {
"field": "keyword",
}
}
}
]
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- length: { aggregations.test.after_key: 1 }
- match: { aggregations.test.after_key.keyword: "foo" }

View File

@ -2,7 +2,7 @@
"search with index prefixes":
- skip:
version: " - 6.99.99"
reason: index_prefix is only available as of 6.3.0
reason: index_prefixes is only available as of 6.3.0
- do:
indices.create:
index: test
@ -12,7 +12,7 @@
properties:
text:
type: text
index_prefix:
index_prefixes:
min_chars: 1
max_chars: 10

View File

@ -0,0 +1 @@
0f75703c30756c31f7d09ec79191dab6fb35c958

View File

@ -1 +0,0 @@
96ab108569c77932ecb17c45421affece207df5c

View File

@ -0,0 +1 @@
c5c519fdea65726612f79e3dd942b7316966646e

View File

@ -1 +0,0 @@
72d09ca50979f716a57f53f2de33d55023a166ec

View File

@ -0,0 +1 @@
f345b6aa3c550dafc63de3e5a5c404691e782336

View File

@ -1 +0,0 @@
e118e4d05070378516b9055184b74498ba528dee

View File

@ -0,0 +1 @@
7a74855e37124a27af36390c9d15abe33614129e

View File

@ -1 +0,0 @@
2b2ea6bfe6fa159bbf205bf7f7fa2ed2c22bbffc

View File

@ -0,0 +1 @@
0e3df4b469465ef101254fdcbb08ebd8a19f1f9d

View File

@ -1 +0,0 @@
423e4fff9276101d845d6073dc6cd27504def207

View File

@ -0,0 +1 @@
05d236149c99c860e6b627a8f78ea32918c108c3

View File

@ -1 +0,0 @@
27561038da2edcae3ecc3a08b0a52824966af87a

View File

@ -0,0 +1 @@
d83e7e65eb268425f7bd5be2425d4a00b556bc47

View File

@ -1 +0,0 @@
d7d422159f705261784d121e24877119d9c95083

View File

@ -0,0 +1 @@
440a998b5bf99871bec4272a219de01b25751d5c

View File

@ -1 +0,0 @@
fc09508fde6ba87f241d7e3148d9e310c0db9cb9

View File

@ -0,0 +1 @@
2a5c031155fdfa743af321150c0dd654a6ea3c71

View File

@ -1 +0,0 @@
201fdf3432ff3fef0f48c38c2c0f482c144f6868

View File

@ -0,0 +1 @@
d021c9a461ff0f020d038ad5ecc5127973d4674a

View File

@ -1 +0,0 @@
917df8c8d08952a012a34050b183b6204ae7081b

View File

@ -0,0 +1 @@
9877a14c53e69b39fff2bf10d49a61171746d940

View File

@ -1 +0,0 @@
caff84fa66cb0376835c39f3d4ca7dfd2177d8f4

View File

@ -0,0 +1 @@
7d7e5101b46a120efa311509948c0d1f9bf30155

View File

@ -1 +0,0 @@
e1bce61a9d9129a8d0fdd3127a84665d29f53eb0

View File

@ -0,0 +1 @@
5a4c11db96ae70b9048243cc530fcbc76faa0978

View File

@ -1 +0,0 @@
3a2e4373d79fda968a078971efa2cb8ec9ff65b0

View File

@ -0,0 +1 @@
afb01af1450067b145ca2c1d737b5907288af560

View File

@ -1 +0,0 @@
7f14927e5c3c1c85c4c5b3681c28c5e36f241dda

View File

@ -0,0 +1 @@
473f0221e0b2ea45940d8ae6dcf16e39c81b18c2

View File

@ -1 +0,0 @@
6e708a38c957a655e0cfedb06a1b9aa892929db0

View File

@ -270,7 +270,8 @@ public class IndexTemplateMetaData extends AbstractDiffable<IndexTemplateMetaDat
public static class Builder {
private static final Set<String> VALID_FIELDS = Sets.newHashSet("template", "order", "mappings", "settings", "index_patterns");
private static final Set<String> VALID_FIELDS = Sets.newHashSet(
"template", "order", "mappings", "settings", "index_patterns", "aliases", "version");
static {
VALID_FIELDS.addAll(IndexMetaData.customPrototypes.keySet());
}

View File

@ -18,12 +18,9 @@
*/
package org.elasticsearch.common.geo.parsers;
import org.locationtech.jts.geom.Coordinate;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.GeoShapeType;
import java.io.StringReader;
import org.elasticsearch.common.geo.builders.CoordinatesBuilder;
import org.elasticsearch.common.geo.builders.EnvelopeBuilder;
import org.elasticsearch.common.geo.builders.GeometryCollectionBuilder;
@ -37,9 +34,11 @@ import org.elasticsearch.common.geo.builders.ShapeBuilder;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
import org.locationtech.jts.geom.Coordinate;
import java.io.IOException;
import java.io.StreamTokenizer;
import java.io.StringReader;
import java.util.List;
/**
@ -77,8 +76,7 @@ public class GeoWKTParser {
public static ShapeBuilder parseExpectedType(XContentParser parser, final GeoShapeType shapeType,
final GeoShapeFieldMapper shapeMapper)
throws IOException, ElasticsearchParseException {
StringReader reader = new StringReader(parser.text());
try {
try (StringReader reader = new StringReader(parser.text())) {
boolean ignoreZValue = (shapeMapper != null && shapeMapper.ignoreZValue().value() == true);
// setup the tokenizer; configured to read words w/o numbers
StreamTokenizer tokenizer = new StreamTokenizer(reader);
@ -95,8 +93,6 @@ public class GeoWKTParser {
ShapeBuilder builder = parseGeometry(tokenizer, shapeType, ignoreZValue);
checkEOF(tokenizer);
return builder;
} finally {
reader.close();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.http.netty4;
package org.elasticsearch.http;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
@ -30,8 +30,6 @@ import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.threadpool.ThreadPool;

View File

@ -234,8 +234,8 @@ public class Analysis {
final Path path = env.configFile().resolve(wordListPath);
try (BufferedReader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
return loadWordList(reader, "#");
try {
return loadWordList(path, "#");
} catch (CharacterCodingException ex) {
String message = String.format(Locale.ROOT,
"Unsupported character encoding detected while reading %s_path: %s - files must be UTF-8 encoded",
@ -247,15 +247,9 @@ public class Analysis {
}
}
public static List<String> loadWordList(Reader reader, String comment) throws IOException {
private static List<String> loadWordList(Path path, String comment) throws IOException {
final List<String> result = new ArrayList<>();
BufferedReader br = null;
try {
if (reader instanceof BufferedReader) {
br = (BufferedReader) reader;
} else {
br = new BufferedReader(reader);
}
try (BufferedReader br = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
String word;
while ((word = br.readLine()) != null) {
if (!Strings.hasText(word)) {
@ -265,9 +259,6 @@ public class Analysis {
result.add(word.trim());
}
}
} finally {
if (br != null)
br.close();
}
return result;
}

View File

@ -548,6 +548,10 @@ public final class AnalysisRegistry implements Closeable {
TokenizerFactory keywordTokenizerFactory,
Map<String, TokenFilterFactory> tokenFilters,
Map<String, CharFilterFactory> charFilters) {
if (keywordTokenizerFactory == null) {
throw new IllegalStateException("keyword tokenizer factory is null, normalizers require analysis-common module");
}
if (normalizerFactory instanceof CustomNormalizerProvider) {
((CustomNormalizerProvider) normalizerFactory).build(keywordTokenizerFactory, charFilters, tokenFilters);
}

View File

@ -1433,10 +1433,6 @@ public abstract class Engine implements Closeable {
@Override
public void close() {
release();
}
public void release() {
Releasables.close(searcher);
}
}

View File

@ -159,7 +159,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
get = indexShard.get(new Engine.Get(realtime, readFromTranslog, type, id, uidTerm)
.version(version).versionType(versionType));
if (get.exists() == false) {
get.release();
get.close();
}
}
}
@ -172,7 +172,7 @@ public final class ShardGetService extends AbstractIndexShardComponent {
// break between having loaded it from translog (so we only have _source), and having a document to load
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
} finally {
get.release();
get.close();
}
}

View File

@ -156,7 +156,7 @@ public class TextFieldMapper extends FieldMapper {
PrefixFieldMapper prefixMapper = null;
if (prefixFieldType != null) {
if (fieldType().isSearchable() == false) {
throw new IllegalArgumentException("Cannot set index_prefix on unindexed field [" + name() + "]");
throw new IllegalArgumentException("Cannot set index_prefixes on unindexed field [" + name() + "]");
}
if (fieldType.indexOptions() == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) {
prefixFieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
@ -203,7 +203,7 @@ public class TextFieldMapper extends FieldMapper {
builder.fielddataFrequencyFilter(minFrequency, maxFrequency, minSegmentSize);
DocumentMapperParser.checkNoRemainingFields(propName, frequencyFilter, parserContext.indexVersionCreated());
iterator.remove();
} else if (propName.equals("index_prefix")) {
} else if (propName.equals("index_prefixes")) {
Map<?, ?> indexPrefix = (Map<?, ?>) propNode;
int minChars = XContentMapValues.nodeIntegerValue(indexPrefix.remove("min_chars"),
Defaults.INDEX_PREFIX_MIN_CHARS);
@ -243,7 +243,7 @@ public class TextFieldMapper extends FieldMapper {
}
}
private static final class PrefixFieldType extends StringFieldType {
static final class PrefixFieldType extends StringFieldType {
final int minChars;
final int maxChars;
@ -268,14 +268,14 @@ public class TextFieldMapper extends FieldMapper {
}
void doXContent(XContentBuilder builder) throws IOException {
builder.startObject("index_prefix");
builder.startObject("index_prefixes");
builder.field("min_chars", minChars);
builder.field("max_chars", maxChars);
builder.endObject();
}
@Override
public MappedFieldType clone() {
public PrefixFieldType clone() {
return new PrefixFieldType(name(), minChars, maxChars);
}
@ -305,6 +305,22 @@ public class TextFieldMapper extends FieldMapper {
public Query existsQuery(QueryShardContext context) {
throw new UnsupportedOperationException();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
PrefixFieldType that = (PrefixFieldType) o;
return minChars == that.minChars &&
maxChars == that.maxChars;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), minChars, maxChars);
}
}
private static final class PrefixFieldMapper extends FieldMapper {
@ -355,6 +371,9 @@ public class TextFieldMapper extends FieldMapper {
this.fielddataMinFrequency = ref.fielddataMinFrequency;
this.fielddataMaxFrequency = ref.fielddataMaxFrequency;
this.fielddataMinSegmentSize = ref.fielddataMinSegmentSize;
if (ref.prefixFieldType != null) {
this.prefixFieldType = ref.prefixFieldType.clone();
}
}
public TextFieldType clone() {
@ -368,6 +387,7 @@ public class TextFieldMapper extends FieldMapper {
}
TextFieldType that = (TextFieldType) o;
return fielddata == that.fielddata
&& Objects.equals(prefixFieldType, that.prefixFieldType)
&& fielddataMinFrequency == that.fielddataMinFrequency
&& fielddataMaxFrequency == that.fielddataMaxFrequency
&& fielddataMinSegmentSize == that.fielddataMinSegmentSize;
@ -375,7 +395,7 @@ public class TextFieldMapper extends FieldMapper {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), fielddata,
return Objects.hash(super.hashCode(), fielddata, prefixFieldType,
fielddataMinFrequency, fielddataMaxFrequency, fielddataMinSegmentSize);
}
@ -420,6 +440,10 @@ public class TextFieldMapper extends FieldMapper {
this.prefixFieldType = prefixFieldType;
}
public PrefixFieldType getPrefixFieldType() {
return this.prefixFieldType;
}
@Override
public String typeName() {
return CONTENT_TYPE;

View File

@ -83,7 +83,7 @@ public final class ElasticsearchMergePolicy extends FilterMergePolicy {
@Override
public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, MergeContext mergeContext)
throws IOException {
if (upgradeInProgress) {
@ -122,7 +122,7 @@ public final class ElasticsearchMergePolicy extends FilterMergePolicy {
// has a chance to decide what to do (e.g. collapse the segments to satisfy maxSegmentCount)
}
return super.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer);
return super.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext);
}
/**

View File

@ -85,8 +85,6 @@ public class TermVectorsService {
termVectorsResponse.setExists(false);
return termVectorsResponse;
}
Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), false, request.type(), request.id(), uidTerm)
.version(request.version()).versionType(request.versionType()));
Fields termVectorsByField = null;
AggregatedDfs dfs = null;
@ -97,8 +95,9 @@ public class TermVectorsService {
handleFieldWildcards(indexShard, request);
}
final Engine.Searcher searcher = indexShard.acquireSearcher("term_vector");
try {
try (Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), false, request.type(), request.id(), uidTerm)
.version(request.version()).versionType(request.versionType()));
Engine.Searcher searcher = indexShard.acquireSearcher("term_vector")) {
Fields topLevelFields = MultiFields.getFields(get.searcher() != null ? get.searcher().reader() : searcher.reader());
DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
/* from an artificial document */
@ -143,14 +142,12 @@ public class TermVectorsService {
}
}
// write term vectors
termVectorsResponse.setFields(termVectorsByField, request.selectedFields(), request.getFlags(), topLevelFields, dfs, termVectorsFilter);
termVectorsResponse.setFields(termVectorsByField, request.selectedFields(), request.getFlags(), topLevelFields, dfs,
termVectorsFilter);
}
termVectorsResponse.setTookInMillis(TimeUnit.NANOSECONDS.toMillis(nanoTimeSupplier.getAsLong() - startTime));
} catch (Exception ex) {
throw new ElasticsearchException("failed to execute term vector request", ex);
} finally {
searcher.close();
get.release();
}
return termVectorsResponse;
}

View File

@ -56,7 +56,6 @@ import org.elasticsearch.index.analysis.IndonesianAnalyzerProvider;
import org.elasticsearch.index.analysis.IrishAnalyzerProvider;
import org.elasticsearch.index.analysis.ItalianAnalyzerProvider;
import org.elasticsearch.index.analysis.KeywordAnalyzerProvider;
import org.elasticsearch.index.analysis.KeywordTokenizerFactory;
import org.elasticsearch.index.analysis.LatvianAnalyzerProvider;
import org.elasticsearch.index.analysis.LithuanianAnalyzerProvider;
import org.elasticsearch.index.analysis.NorwegianAnalyzerProvider;
@ -225,7 +224,6 @@ public final class AnalysisModule {
private NamedRegistry<AnalysisProvider<TokenizerFactory>> setupTokenizers(List<AnalysisPlugin> plugins) {
NamedRegistry<AnalysisProvider<TokenizerFactory>> tokenizers = new NamedRegistry<>("tokenizer");
tokenizers.register("standard", StandardTokenizerFactory::new);
tokenizers.register("keyword", KeywordTokenizerFactory::new);
tokenizers.extractAndRegister(plugins, AnalysisPlugin::getTokenizers);
return tokenizers;
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
@ -38,18 +37,16 @@ import java.util.function.Predicate;
* Represents a executor node operation that corresponds to a persistent task
*/
public class AllocatedPersistentTask extends CancellableTask {
private volatile String persistentTaskId;
private volatile long allocationId;
private final AtomicReference<State> state;
@Nullable
private volatile Exception failure;
private volatile String persistentTaskId;
private volatile long allocationId;
private volatile @Nullable Exception failure;
private volatile PersistentTasksService persistentTasksService;
private volatile Logger logger;
private volatile TaskManager taskManager;
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
@ -101,31 +98,29 @@ public class AllocatedPersistentTask extends CancellableTask {
return failure;
}
boolean markAsCancelled() {
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
}
public State getState() {
return state.get();
}
public long getAllocationId() {
return allocationId;
}
public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
/**
* Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly.
*
* @param predicate the persistent task predicate to evaluate
* @param timeout a timeout for waiting
* @param listener the callback listener
*/
public void waitForPersistentTask(final Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
final @Nullable TimeValue timeout,
final PersistentTasksService.WaitForPersistentTaskListener<?> listener) {
persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener);
}
/**
* Waits for this persistent task to have the desired state.
*/
public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
@Nullable TimeValue timeout,
PersistentTasksService.WaitForPersistentTaskStatusListener<?> listener) {
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
final boolean isCompleted() {
return state.get() == State.COMPLETED;
}
boolean markAsCancelled() {
return state.compareAndSet(State.STARTED, State.PENDING_CANCEL);
}
public void markAsCompleted() {
@ -138,11 +133,10 @@ public class AllocatedPersistentTask extends CancellableTask {
} else {
completeAndNotifyIfNeeded(e);
}
}
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
final State prevState = state.getAndSet(State.COMPLETED);
if (prevState == State.COMPLETED) {
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
} else {
@ -153,7 +147,7 @@ public class AllocatedPersistentTask extends CancellableTask {
this.failure = failure;
if (prevState == State.STARTED) {
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new
persistentTasksService.sendCompletionRequest(getPersistentTaskId(), getAllocationId(), failure, new
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
@ -173,4 +167,10 @@ public class AllocatedPersistentTask extends CancellableTask {
}
}
}
public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}
}

View File

@ -123,7 +123,7 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
for (Long id : notVisitedTasks) {
AllocatedPersistentTask task = runningTasks.get(id);
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
if (task.isCompleted()) {
// Result was sent to the caller and the caller acknowledged acceptance of the result
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
@ -196,7 +196,8 @@ public class PersistentTasksNodeService extends AbstractComponent implements Clu
AllocatedPersistentTask task = runningTasks.remove(allocationId);
if (task.markAsCancelled()) {
// Cancel the local task using the task manager
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
String reason = "task has been removed, cancelling locally";
persistentTasksService.sendCancelRequest(task.getId(), reason, new ActionListener<CancelTasksResponse>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(),

View File

@ -22,14 +22,12 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
@ -37,20 +35,24 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* This service is used by persistent actions to propagate changes in the action state and notify about completion
* This service is used by persistent tasks and allocated persistent tasks to communicate changes
* to the master node so that the master can update the cluster state and can track of the states
* of the persistent tasks.
*/
public class PersistentTasksService extends AbstractComponent {
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;
@ -63,92 +65,115 @@ public class PersistentTasksService extends AbstractComponent {
}
/**
* Creates the specified persistent task and attempts to assign it to a node.
* Notifies the master node to create new persistent task and to assign it to a node.
*/
@SuppressWarnings("unchecked")
public <Params extends PersistentTaskParams> void startPersistentTask(String taskId, String taskName, @Nullable Params params,
ActionListener<PersistentTask<Params>> listener) {
StartPersistentTaskAction.Request createPersistentActionRequest =
new StartPersistentTaskAction.Request(taskId, taskName, params);
public <Params extends PersistentTaskParams> void sendStartRequest(final String taskId,
final String taskName,
final @Nullable Params taskParams,
final ActionListener<PersistentTask<Params>> listener) {
@SuppressWarnings("unchecked")
final ActionListener<PersistentTask<?>> wrappedListener =
ActionListener.wrap(t -> listener.onResponse((PersistentTask<Params>) t), listener::onFailure);
StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams);
execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener);
}
/**
* Notifies the master node about the completion of a persistent task.
* <p>
* When {@code failure} is {@code null}, the persistent task is considered as successfully completed.
*/
public void sendCompletionRequest(final String taskId,
final long taskAllocationId,
final @Nullable Exception taskFailure,
final ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request request = new CompletionPersistentTaskAction.Request(taskId, taskAllocationId, taskFailure);
execute(request, CompletionPersistentTaskAction.INSTANCE, listener);
}
/**
* Cancels a locally running task using the Task Manager API
*/
void sendCancelRequest(final long taskId, final String reason, final ActionListener<CancelTasksResponse> listener) {
CancelTasksRequest request = new CancelTasksRequest();
request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId));
request.setReason(reason);
try {
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, StartPersistentTaskAction.INSTANCE, createPersistentActionRequest,
ActionListener.wrap(o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener));
}
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
*/
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure);
try {
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, CompletionPersistentTaskAction.INSTANCE, restartRequest,
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
* Cancels a locally running task using the task manager
*/
void sendTaskManagerCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
DiscoveryNode localNode = clusterService.localNode();
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
cancelTasksRequest.setReason("persistent action was removed");
try {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), PERSISTENT_TASK_ORIGIN, cancelTasksRequest, listener,
client.admin().cluster()::cancelTasks);
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
* Updates status of the persistent task.
* Notifies the master node that the state of a persistent task has changed.
* <p>
* Persistent task implementers shouldn't call this method directly and use
* {@link AllocatedPersistentTask#updatePersistentStatus} instead
*/
void updateStatus(String taskId, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request updateStatusRequest =
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
try {
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest,
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
void updateStatus(final String taskId,
final long taskAllocationID,
final Task.Status status,
final ActionListener<PersistentTask<?>> listener) {
UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, status);
execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener);
}
/**
* Cancels if needed and removes a persistent task
* Notifies the master node to remove a persistent task from the cluster state
*/
public void cancelPersistentTask(String taskId, ActionListener<PersistentTask<?>> listener) {
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
try {
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, RemovePersistentTaskAction.INSTANCE, removeRequest,
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
public void sendRemoveRequest(final String taskId, final ActionListener<PersistentTask<?>> listener) {
RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId);
execute(request, RemovePersistentTaskAction.INSTANCE, listener);
}
/**
* Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't
* waits of it.
* Executes an asynchronous persistent task action using the client.
* <p>
* The origin is set in the context and the listener is wrapped to ensure the proper context is restored
*/
public void waitForPersistentTaskStatus(String taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
WaitForPersistentTaskStatusListener<?> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId));
private <Req extends ActionRequest, Resp extends PersistentTaskResponse, Builder extends ActionRequestBuilder<Req, Resp, Builder>>
void execute(final Req request, final Action<Req, Resp, Builder> action, final ActionListener<PersistentTask<?>> listener) {
try {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) {
client.execute(action, request,
new ContextPreservingActionListener<>(supplier,
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)));
}
} catch (Exception e) {
listener.onFailure(e);
}
}
/**
* Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly.
*
* @param taskId the persistent task id
* @param predicate the persistent task predicate to evaluate
* @param timeout a timeout for waiting
* @param listener the callback listener
*/
public void waitForPersistentTaskCondition(final String taskId,
final Predicate<PersistentTask<?>> predicate,
final @Nullable TimeValue timeout,
final WaitForPersistentTaskListener<?> listener) {
final Predicate<ClusterState> clusterStatePredicate = clusterState ->
predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId));
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
final ClusterState clusterState = observer.setAndGetObservedState();
if (clusterStatePredicate.test(clusterState)) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId));
} else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(state, taskId));
@ -163,18 +188,28 @@ public class PersistentTasksService extends AbstractComponent {
public void onTimeout(TimeValue timeout) {
listener.onTimeout(timeout);
}
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
}, clusterStatePredicate);
}
}
public void waitForPersistentTasksStatus(Predicate<PersistentTasksCustomMetaData> predicate,
@Nullable TimeValue timeout, ActionListener<Boolean> listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout,
logger, threadPool.getThreadContext());
if (predicate.test(stateObserver.setAndGetObservedState().metaData().custom(PersistentTasksCustomMetaData.TYPE))) {
/**
* Waits for persistent tasks to comply with a given predicate, then call back the listener accordingly.
*
* @param predicate the predicate to evaluate
* @param timeout a timeout for waiting
* @param listener the callback listener
*/
public void waitForPersistentTasksCondition(final Predicate<PersistentTasksCustomMetaData> predicate,
final @Nullable TimeValue timeout,
final ActionListener<Boolean> listener) {
final Predicate<ClusterState> clusterStatePredicate = clusterState ->
predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
if (clusterStatePredicate.test(observer.setAndGetObservedState())) {
listener.onResponse(true);
} else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(true);
@ -187,45 +222,15 @@ public class PersistentTasksService extends AbstractComponent {
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new IllegalStateException("timed out after " + timeout));
listener.onFailure(new IllegalStateException("Timed out when waiting for persistent tasks after " + timeout));
}
}, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)), timeout);
}, clusterStatePredicate, timeout);
}
}
public interface WaitForPersistentTaskStatusListener<Params extends PersistentTaskParams>
extends ActionListener<PersistentTask<Params>> {
public interface WaitForPersistentTaskListener<P extends PersistentTaskParams> extends ActionListener<PersistentTask<P>> {
default void onTimeout(TimeValue timeout) {
onFailure(new IllegalStateException("timed out after " + timeout));
}
}
private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
/**
* Executes a consumer after setting the origin and wrapping the listener so that the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse> void executeAsyncWithOrigin(
ThreadContext threadContext, String origin, Request request, ActionListener<Response> listener,
BiConsumer<Request, ActionListener<Response>> consumer) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
consumer.accept(request, new ContextPreservingActionListener<>(supplier, listener));
}
}
/**
* Executes an asynchronous action using the provided client. The origin is set in the context and the listener
* is wrapped to ensure the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void executeAsyncWithOrigin(
Client client, String origin, Action<Request, Response, RequestBuilder> action, Request request,
ActionListener<Response> listener) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener));
onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout));
}
}
@ -234,5 +239,4 @@ public class PersistentTasksService extends AbstractComponent {
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext;
}
}

Some files were not shown because too many files have changed in this diff Show More