Merge branch 'master' into ccr

* master:
  [DOCS] Omit shard failures assertion for incompatible responses  (#31430)
  [DOCS] Move licensing APIs to docs (#31445)
  Add Delete Snapshot High Level REST API
  Remove QueryCachingPolicy#ALWAYS_CACHE (#31451)
  [Docs] Extend Homebrew installation instructions (#28902)
  Choose JVM options ergonomically
  [Docs] Mention ip_range datatypes on ip type page (#31416)
  Multiplexing token filter (#31208)
  Fix use of time zone in date_histogram rewrite (#31407)
  Core: Remove index name resolver from base TransportAction (#31002)
  [DOCS] Fixes code snippet testing for machine learning (#31189)
  [DOCS] Removed  and  params from MLT. Closes #28128 (#31370)
  Security: fix joining cluster with production license (#31341)
  Unify http channels and exception handling (#31379)
  [DOCS] Moves the info API to docs (#31121)
  Preserve response headers on cluster update task (#31421)
  [DOCS] Add code snippet testing for more ML APIs (#31404)
  Do not preallocate bytes for channel buffer (#31400)
  Docs: Advice for reindexing many indices (#31279)
  Mute HttpExporterTests#testHttpExporterShutdown test Tracked by #31433
  Docs: Add note about removing prepareExecute from the java client (#31401)
  Make release notes ignore the `>test-failure` label. (#31309)
This commit is contained in:
Nhat Nguyen 2018-06-20 12:43:40 -04:00
commit efcb9a3603
264 changed files with 2322 additions and 1281 deletions

View File

@ -27,7 +27,6 @@ import org.gradle.api.tasks.OutputDirectory
import java.nio.file.Files
import java.nio.file.Path
import java.util.regex.Matcher
/**
* Generates REST tests for each snippet marked // TEST.
@ -100,6 +99,14 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
return snippet.language == 'js' || snippet.curl
}
/**
* Certain requests should not have the shard failure check because the
* format of the response is incompatible i.e. it is not a JSON object.
*/
static shouldAddShardFailureCheck(String path) {
return path.startsWith('_cat') == false && path.startsWith('_xpack/ml/datafeeds/') == false
}
/**
* Converts Kibana's block quoted strings into standard JSON. These
* {@code """} delimited strings can be embedded in CONSOLE and can
@ -309,13 +316,11 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
* no shard succeeds. But we need to fail the tests on all of these
* because they mean invalid syntax or broken queries or something
* else that we don't want to teach people to do. The REST test
* framework doesn't allow us to has assertions in the setup
* section so we have to skip it there. We also have to skip _cat
* actions because they don't return json so we can't is_false
* them. That is ok because they don't have this
* partial-success-is-success thing.
* framework doesn't allow us to have assertions in the setup
* section so we have to skip it there. We also omit the assertion
* from APIs that don't return a JSON object
*/
if (false == inSetup && false == path.startsWith('_cat')) {
if (false == inSetup && shouldAddShardFailureCheck(path)) {
current.println(" - is_false: _shards.failures")
}
}

View File

@ -19,9 +19,7 @@
package org.elasticsearch.gradle.doc
import org.elasticsearch.gradle.doc.SnippetsTask.Snippet
import org.gradle.api.InvalidUserDataException
import static org.elasticsearch.gradle.doc.RestTestsFromSnippetsTask.shouldAddShardFailureCheck
import static org.elasticsearch.gradle.doc.RestTestsFromSnippetsTask.replaceBlockQuote
class RestTestFromSnippetsTaskTest extends GroovyTestCase {
@ -47,4 +45,10 @@ class RestTestFromSnippetsTaskTest extends GroovyTestCase {
assertEquals("\"foo\": \"bort\\n baz\"",
replaceBlockQuote("\"foo\": \"\"\"bort\n baz\"\"\""));
}
void testIsDocWriteRequest() {
assertTrue(shouldAddShardFailureCheck("doc-index/_search"));
assertFalse(shouldAddShardFailureCheck("_cat"))
assertFalse(shouldAddShardFailureCheck("_xpack/ml/datafeeds/datafeed-id/_preview"));
}
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
@ -40,8 +39,8 @@ public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest,
@Inject
public TransportNoopBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NoopBulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, BulkRequest::new);
ActionFilters actionFilters) {
super(settings, NoopBulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest::new);
}
@Override

View File

@ -24,8 +24,8 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.SearchHit;
@ -40,10 +40,10 @@ import java.util.Collections;
public class TransportNoopSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
@Inject
public TransportNoopSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters
actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new,
indexNameExpressionResolver);
public TransportNoopSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters) {
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters,
(Writeable.Reader<SearchRequest>) SearchRequest::new);
}
@Override

View File

@ -39,6 +39,7 @@ import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyReposito
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
@ -844,6 +845,18 @@ final class RequestConverters {
return request;
}
static Request deleteSnapshot(DeleteSnapshotRequest deleteSnapshotRequest) {
String endpoint = new EndpointBuilder().addPathPartAsIs("_snapshot")
.addPathPart(deleteSnapshotRequest.repository())
.addPathPart(deleteSnapshotRequest.snapshot())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
Params parameters = new Params(request);
parameters.withMasterTimeout(deleteSnapshotRequest.masterNodeTimeout());
return request;
}
static Request putTemplate(PutIndexTemplateRequest putIndexTemplateRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPartAsIs("_template").addPathPart(putIndexTemplateRequest.name()).build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequ
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import java.io.IOException;
@ -161,4 +163,34 @@ public final class SnapshotClient {
restHighLevelClient.performRequestAsyncAndParseEntity(verifyRepositoryRequest, RequestConverters::verifyRepository, options,
VerifyRepositoryResponse::fromXContent, listener, emptySet());
}
/**
* Deletes a snapshot.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
*
* @param deleteSnapshotRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public DeleteSnapshotResponse delete(DeleteSnapshotRequest deleteSnapshotRequest, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(deleteSnapshotRequest, RequestConverters::deleteSnapshot, options,
DeleteSnapshotResponse::fromXContent, emptySet());
}
/**
* Asynchronously deletes a snapshot.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
*
* @param deleteSnapshotRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void deleteAsync(DeleteSnapshotRequest deleteSnapshotRequest, RequestOptions options,
ActionListener<DeleteSnapshotResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(deleteSnapshotRequest, RequestConverters::deleteSnapshot, options,
DeleteSnapshotResponse::fromXContent, listener, emptySet());
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRe
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
@ -1857,6 +1858,25 @@ public class RequestConvertersTests extends ESTestCase {
assertThat(expectedParams, equalTo(request.getParameters()));
}
public void testDeleteSnapshot() {
Map<String, String> expectedParams = new HashMap<>();
String repository = randomIndicesNames(1, 1)[0];
String snapshot = "snapshot-" + randomAlphaOfLengthBetween(2, 5).toLowerCase(Locale.ROOT);
String endpoint = String.format(Locale.ROOT, "/_snapshot/%s/%s", repository, snapshot);
DeleteSnapshotRequest deleteSnapshotRequest = new DeleteSnapshotRequest();
deleteSnapshotRequest.repository(repository);
deleteSnapshotRequest.snapshot(snapshot);
setRandomMasterTimeout(deleteSnapshotRequest, expectedParams);
Request request = RequestConverters.deleteSnapshot(deleteSnapshotRequest);
assertThat(endpoint, equalTo(request.getEndpoint()));
assertThat(HttpDelete.METHOD_NAME, equalTo(request.getMethod()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertNull(request.getEntity());
}
public void testPutTemplateRequest() throws Exception {
Map<String, String> names = new HashMap<>();
names.put("log", "log");

View File

@ -28,11 +28,14 @@ import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequ
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Locale;
import static org.hamcrest.Matchers.equalTo;
@ -46,6 +49,13 @@ public class SnapshotIT extends ESRestHighLevelClientTestCase {
highLevelClient().snapshot()::createRepositoryAsync);
}
private Response createTestSnapshot(String repository, String snapshot) throws IOException {
Request createSnapshot = new Request("put", String.format(Locale.ROOT, "_snapshot/%s/%s", repository, snapshot));
createSnapshot.addParameter("wait_for_completion", "true");
return highLevelClient().getLowLevelClient().performRequest(createSnapshot);
}
public void testCreateRepository() throws IOException {
PutRepositoryResponse response = createTestRepository("test", FsRepository.TYPE, "{\"location\": \".\"}");
assertTrue(response.isAcknowledged());
@ -108,4 +118,21 @@ public class SnapshotIT extends ESRestHighLevelClientTestCase {
highLevelClient().snapshot()::verifyRepositoryAsync);
assertThat(response.getNodes().size(), equalTo(1));
}
public void testDeleteSnapshot() throws IOException {
String repository = "test_repository";
String snapshot = "test_snapshot";
PutRepositoryResponse putRepositoryResponse = createTestRepository(repository, FsRepository.TYPE, "{\"location\": \".\"}");
assertTrue(putRepositoryResponse.isAcknowledged());
Response putSnapshotResponse = createTestSnapshot(repository, snapshot);
// check that the request went ok without parsing JSON here. When using the high level client, check acknowledgement instead.
assertEquals(200, putSnapshotResponse.getStatusLine().getStatusCode());
DeleteSnapshotRequest request = new DeleteSnapshotRequest(repository, snapshot);
DeleteSnapshotResponse response = execute(request, highLevelClient().snapshot()::delete, highLevelClient().snapshot()::deleteAsync);
assertTrue(response.isAcknowledged());
}
}

View File

@ -29,8 +29,12 @@ import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequ
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.settings.Settings;
@ -41,6 +45,7 @@ import org.elasticsearch.repositories.fs.FsRepository;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -69,6 +74,8 @@ public class SnapshotClientDocumentationIT extends ESRestHighLevelClientTestCase
private static final String repositoryName = "test_repository";
private static final String snapshotName = "test_snapshot";
public void testSnapshotCreateRepository() throws IOException {
RestHighLevelClient client = highLevelClient();
@ -360,10 +367,76 @@ public class SnapshotClientDocumentationIT extends ESRestHighLevelClientTestCase
}
}
public void testSnapshotDeleteSnapshot() throws IOException {
RestHighLevelClient client = highLevelClient();
createTestRepositories();
createTestSnapshots();
// tag::delete-snapshot-request
DeleteSnapshotRequest request = new DeleteSnapshotRequest(repositoryName);
request.snapshot(snapshotName);
// end::delete-snapshot-request
// tag::delete-snapshot-request-masterTimeout
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
request.masterNodeTimeout("1m"); // <2>
// end::delete-snapshot-request-masterTimeout
// tag::delete-snapshot-execute
DeleteSnapshotResponse response = client.snapshot().delete(request, RequestOptions.DEFAULT);
// end::delete-snapshot-execute
// tag::delete-snapshot-response
boolean acknowledged = response.isAcknowledged(); // <1>
// end::delete-snapshot-response
assertTrue(acknowledged);
}
public void testSnapshotDeleteSnapshotAsync() throws InterruptedException {
RestHighLevelClient client = highLevelClient();
{
DeleteSnapshotRequest request = new DeleteSnapshotRequest();
// tag::delete-snapshot-execute-listener
ActionListener<DeleteSnapshotResponse> listener =
new ActionListener<DeleteSnapshotResponse>() {
@Override
public void onResponse(DeleteSnapshotResponse deleteSnapshotResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::delete-snapshot-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::delete-snapshot-execute-async
client.snapshot().deleteAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::delete-snapshot-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
private void createTestRepositories() throws IOException {
PutRepositoryRequest request = new PutRepositoryRequest(repositoryName);
request.type(FsRepository.TYPE);
request.settings("{\"location\": \".\"}", XContentType.JSON);
assertTrue(highLevelClient().snapshot().createRepository(request, RequestOptions.DEFAULT).isAcknowledged());
}
private void createTestSnapshots() throws IOException {
Request createSnapshot = new Request("put", String.format(Locale.ROOT, "_snapshot/%s/%s", repositoryName, snapshotName));
createSnapshot.addParameter("wait_for_completion", "true");
Response response = highLevelClient().getLowLevelClient().performRequest(createSnapshot);
// check that the request went ok without parsing JSON here. When using the high level client, check acknowledgement instead.
assertEquals(200, response.getStatusLine().getStatusCode());
}
}

View File

@ -32,7 +32,7 @@ my @Groups = (
">enhancement", ">bug", ">regression", ">upgrade"
);
my %Ignore = map { $_ => 1 }
( ">non-issue", ">refactoring", ">docs", ">test", ":Core/Build" );
( ">non-issue", ">refactoring", ">docs", ">test", ">test-failure", ":Core/Build" );
my %Group_Labels = (
'>breaking' => 'Breaking changes',

View File

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tools.launchers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Tunes Elasticsearch JVM settings based on inspection of provided JVM options.
*/
final class JvmErgonomics {
private static final long KB = 1024L;
private static final long MB = 1024L * 1024L;
private static final long GB = 1024L * 1024L * 1024L;
private JvmErgonomics() {
throw new AssertionError("No instances intended");
}
/**
* Chooses additional JVM options for Elasticsearch.
*
* @param userDefinedJvmOptions A list of JVM options that have been defined by the user.
* @return A list of additional JVM options to set.
*/
static List<String> choose(List<String> userDefinedJvmOptions) {
List<String> ergonomicChoices = new ArrayList<>();
Long heapSize = extractHeapSize(userDefinedJvmOptions);
Map<String, String> systemProperties = extractSystemProperties(userDefinedJvmOptions);
if (heapSize != null) {
if (systemProperties.containsKey("io.netty.allocator.type") == false) {
if (heapSize <= 1 * GB) {
ergonomicChoices.add("-Dio.netty.allocator.type=unpooled");
} else {
ergonomicChoices.add("-Dio.netty.allocator.type=pooled");
}
}
}
return ergonomicChoices;
}
private static final Pattern MAX_HEAP_SIZE = Pattern.compile("^(-Xmx|-XX:MaxHeapSize=)(?<size>\\d+)(?<unit>\\w)?$");
// package private for testing
static Long extractHeapSize(List<String> userDefinedJvmOptions) {
for (String jvmOption : userDefinedJvmOptions) {
final Matcher matcher = MAX_HEAP_SIZE.matcher(jvmOption);
if (matcher.matches()) {
final long size = Long.parseLong(matcher.group("size"));
final String unit = matcher.group("unit");
if (unit == null) {
return size;
} else {
switch (unit.toLowerCase(Locale.ROOT)) {
case "k":
return size * KB;
case "m":
return size * MB;
case "g":
return size * GB;
default:
throw new IllegalArgumentException("Unknown unit [" + unit + "] for max heap size in [" + jvmOption + "]");
}
}
}
}
return null;
}
private static final Pattern SYSTEM_PROPERTY = Pattern.compile("^-D(?<key>[\\w+].*?)=(?<value>.*)$");
// package private for testing
static Map<String, String> extractSystemProperties(List<String> userDefinedJvmOptions) {
Map<String, String> systemProperties = new HashMap<>();
for (String jvmOption : userDefinedJvmOptions) {
final Matcher matcher = SYSTEM_PROPERTY.matcher(jvmOption);
if (matcher.matches()) {
systemProperties.put(matcher.group("key"), matcher.group("value"));
}
}
return systemProperties;
}
}

View File

@ -78,6 +78,8 @@ final class JvmOptionsParser {
}
if (invalidLines.isEmpty()) {
List<String> ergonomicJvmOptions = JvmErgonomics.choose(jvmOptions);
jvmOptions.addAll(ergonomicJvmOptions);
final String spaceDelimitedJvmOptions = spaceDelimitJvmOptions(jvmOptions);
Launchers.outPrintln(spaceDelimitedJvmOptions);
Launchers.exit(0);

View File

@ -0,0 +1,83 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tools.launchers;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class JvmErgonomicsTests extends LaunchersTestCase {
public void testExtractValidHeapSize() {
assertEquals(Long.valueOf(1024), JvmErgonomics.extractHeapSize(Collections.singletonList("-Xmx1024")));
assertEquals(Long.valueOf(2L * 1024 * 1024 * 1024), JvmErgonomics.extractHeapSize(Collections.singletonList("-Xmx2g")));
assertEquals(Long.valueOf(32 * 1024 * 1024), JvmErgonomics.extractHeapSize(Collections.singletonList("-Xmx32M")));
assertEquals(Long.valueOf(32 * 1024 * 1024), JvmErgonomics.extractHeapSize(Collections.singletonList("-XX:MaxHeapSize=32M")));
}
public void testExtractInvalidHeapSize() {
try {
JvmErgonomics.extractHeapSize(Collections.singletonList("-Xmx2T"));
fail("Expected IllegalArgumentException to be raised");
} catch (IllegalArgumentException expected) {
assertEquals("Unknown unit [T] for max heap size in [-Xmx2T]", expected.getMessage());
}
}
public void testExtractNoHeapSize() {
assertNull("No spaces allowed", JvmErgonomics.extractHeapSize(Collections.singletonList("-Xmx 1024")));
assertNull("JVM option is not present", JvmErgonomics.extractHeapSize(Collections.singletonList("")));
assertNull("Multiple JVM options per line", JvmErgonomics.extractHeapSize(Collections.singletonList("-Xms2g -Xmx2g")));
}
public void testExtractSystemProperties() {
Map<String, String> expectedSystemProperties = new HashMap<>();
expectedSystemProperties.put("file.encoding", "UTF-8");
expectedSystemProperties.put("kv.setting", "ABC=DEF");
Map<String, String> parsedSystemProperties = JvmErgonomics.extractSystemProperties(
Arrays.asList("-Dfile.encoding=UTF-8", "-Dkv.setting=ABC=DEF"));
assertEquals(expectedSystemProperties, parsedSystemProperties);
}
public void testExtractNoSystemProperties() {
Map<String, String> parsedSystemProperties = JvmErgonomics.extractSystemProperties(Arrays.asList("-Xms1024M", "-Xmx1024M"));
assertTrue(parsedSystemProperties.isEmpty());
}
public void testLittleMemoryErgonomicChoices() {
String smallHeap = randomFrom(Arrays.asList("64M", "512M", "1024M", "1G"));
List<String> expectedChoices = Collections.singletonList("-Dio.netty.allocator.type=unpooled");
assertEquals(expectedChoices, JvmErgonomics.choose(Arrays.asList("-Xms" + smallHeap, "-Xmx" + smallHeap)));
}
public void testPlentyMemoryErgonomicChoices() {
String largeHeap = randomFrom(Arrays.asList("1025M", "2048M", "2G", "8G"));
List<String> expectedChoices = Collections.singletonList("-Dio.netty.allocator.type=pooled");
assertEquals(expectedChoices, JvmErgonomics.choose(Arrays.asList("-Xms" + largeHeap, "-Xmx" + largeHeap)));
}
}

View File

@ -0,0 +1,73 @@
[[java-rest-high-snapshot-delete-snapshot]]
=== Delete Snapshot API
The Delete Snapshot API allows to delete a snapshot.
[[java-rest-high-snapshot-delete-snapshot-request]]
==== Delete Snapshot Request
A `DeleteSnapshotRequest`:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SnapshotClientDocumentationIT.java[delete-snapshot-request]
--------------------------------------------------
==== Optional Arguments
The following arguments can optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SnapshotClientDocumentationIT.java[delete-snapshot-request-masterTimeout]
--------------------------------------------------
<1> Timeout to connect to the master node as a `TimeValue`
<2> Timeout to connect to the master node as a `String`
[[java-rest-high-snapshot-delete-snapshot-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SnapshotClientDocumentationIT.java[delete-snapshot-execute]
--------------------------------------------------
[[java-rest-high-snapshot-delete-snapshot-async]]
==== Asynchronous Execution
The asynchronous execution of a delete snapshot request requires both the
`DeleteSnapshotRequest` instance and an `ActionListener` instance to be
passed to the asynchronous method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SnapshotClientDocumentationIT.java[delete-snapshot-execute-async]
--------------------------------------------------
<1> The `DeleteSnapshotRequest` to execute and the `ActionListener`
to use when the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `DeleteSnapshotResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SnapshotClientDocumentationIT.java[delete-snapshot-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of a failure. The raised exception is provided as an argument
[[java-rest-high-cluster-delete-snapshot-response]]
==== Delete Snapshot Response
The returned `DeleteSnapshotResponse` allows to retrieve information about the
executed operation as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/SnapshotClientDocumentationIT.java[delete-snapshot-response]
--------------------------------------------------
<1> Indicates the node has acknowledged the request

View File

@ -136,11 +136,13 @@ The Java High Level REST Client supports the following Snapshot APIs:
* <<java-rest-high-snapshot-create-repository>>
* <<java-rest-high-snapshot-delete-repository>>
* <<java-rest-high-snapshot-verify-repository>>
* <<java-rest-high-snapshot-delete-snapshot>>
include::snapshot/get_repository.asciidoc[]
include::snapshot/create_repository.asciidoc[]
include::snapshot/delete_repository.asciidoc[]
include::snapshot/verify_repository.asciidoc[]
include::snapshot/delete_snapshot.asciidoc[]
== Tasks APIs

View File

@ -35,6 +35,8 @@ include::tokenfilters/word-delimiter-tokenfilter.asciidoc[]
include::tokenfilters/word-delimiter-graph-tokenfilter.asciidoc[]
include::tokenfilters/multiplexer-tokenfilter.asciidoc[]
include::tokenfilters/stemmer-tokenfilter.asciidoc[]
include::tokenfilters/stemmer-override-tokenfilter.asciidoc[]

View File

@ -0,0 +1,116 @@
[[analysis-multiplexer-tokenfilter]]
=== Multiplexer Token Filter
A token filter of type `multiplexer` will emit multiple tokens at the same position,
each version of the token having been run through a different filter. Identical
output tokens at the same position will be removed.
WARNING: If the incoming token stream has duplicate tokens, then these will also be
removed by the multiplexer
[float]
=== Options
[horizontal]
filters:: a list of token filters to apply to incoming tokens. These can be any
token filters defined elsewhere in the index mappings. Filters can be chained
using a comma-delimited string, so for example `"lowercase, porter_stem"` would
apply the `lowercase` filter and then the `porter_stem` filter to a single token.
WARNING: Shingle or multi-word synonym token filters will not function normally
when they are declared in the filters array because they read ahead internally
which is unsupported by the multiplexer
preserve_original:: if `true` (the default) then emit the original token in
addition to the filtered tokens
[float]
=== Settings example
You can set it up like:
[source,js]
--------------------------------------------------
PUT /multiplexer_example
{
"settings" : {
"analysis" : {
"analyzer" : {
"my_analyzer" : {
"tokenizer" : "standard",
"filter" : [ "my_multiplexer" ]
}
},
"filter" : {
"my_multiplexer" : {
"type" : "multiplexer",
"filters" : [ "lowercase", "lowercase, porter_stem" ]
}
}
}
}
}
--------------------------------------------------
// CONSOLE
And test it like:
[source,js]
--------------------------------------------------
POST /multiplexer_example/_analyze
{
"analyzer" : "my_analyzer",
"text" : "Going HOME"
}
--------------------------------------------------
// CONSOLE
// TEST[continued]
And it'd respond:
[source,js]
--------------------------------------------------
{
"tokens": [
{
"token": "Going",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "going",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "go",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "HOME",
"start_offset": 6,
"end_offset": 10,
"type": "<ALPHANUM>",
"position": 1
},
{
"token": "home", <1>
"start_offset": 6,
"end_offset": 10,
"type": "<ALPHANUM>",
"position": 1
}
]
}
--------------------------------------------------
// TESTRESPONSE
<1> The stemmer has also emitted a token `home` at position 1, but because it is a
duplicate of this token it has been removed from the token stream

View File

@ -1028,11 +1028,38 @@ number of slices.
Whether query or indexing performance dominates the runtime depends on the
documents being reindexed and cluster resources.
[float]
=== Reindexing many indices
If you have many indices to reindex it is generally better to reindex them
one at a time rather than using a glob pattern to pick up many indices. That
way you can resume the process if there are any errors by removing the
partially completed index and starting over at that index. It also makes
parallelizing the process fairly simple: split the list of indices to reindex
and run each list in parallel.
One off bash scripts seem to work nicely for this:
[source,bash]
----------------------------------------------------------------
for index in i1 i2 i3 i4 i5; do
curl -HContent-Type:application/json -XPOST localhost:9200/_reindex?pretty -d'{
"source": {
"index": "'$index'"
},
"dest": {
"index": "'$index'-reindexed"
}
}'
done
----------------------------------------------------------------
// NOTCONSOLE
[float]
=== Reindex daily indices
You can use `_reindex` in combination with <<modules-scripting-painless, Painless>>
to reindex daily indices to apply a new template to the existing documents.
Notwithstanding the above advice, you can use `_reindex` in combination with
<<modules-scripting-painless, Painless>> to reindex daily indices to apply
a new template to the existing documents.
Assuming you have indices consisting of documents as follows:

View File

@ -158,6 +158,9 @@ On macOS, Elasticsearch can also be installed via https://brew.sh[Homebrew]:
brew install elasticsearch
--------------------------------------------------
If installation succeeds, Homebrew will finish by saying that you can start Elasticsearch by entering
`elasticsearch`. Do that now. The expected response is described below, under <<successfully-running-node>>.
[float]
=== Installation example with MSI Windows Installer
@ -216,6 +219,7 @@ And now we are ready to start our node and single cluster:
--------------------------------------------------
[float]
[[successfully-running-node]]
=== Successfully running node
If everything goes well with installation, you should see a bunch of messages that look like below:

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[delete-license]]
=== Delete License API
@ -41,3 +42,4 @@ When the license is successfully deleted, the API returns the following response
"acknowledged": true
}
------------------------------------------------------------
// NOTCONSOLE

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[get-basic-status]]
=== Get Basic Status API

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[get-license]]
=== Get License API
@ -52,11 +53,9 @@ GET _xpack/license
"license" : {
"status" : "active",
"uid" : "cbff45e7-c553-41f7-ae4f-9205eabd80xx",
"type" : "trial",
"type" : "basic",
"issue_date" : "2018-02-22T23:12:05.550Z",
"issue_date_in_millis" : 1519341125550,
"expiry_date" : "2018-03-24T23:12:05.550Z",
"expiry_date_in_millis" : 1521933125550,
"max_nodes" : 1000,
"issued_to" : "test",
"issuer" : "elasticsearch",
@ -65,11 +64,9 @@ GET _xpack/license
}
--------------------------------------------------
// TESTRESPONSE[s/"cbff45e7-c553-41f7-ae4f-9205eabd80xx"/$body.license.uid/]
// TESTRESPONSE[s/"trial"/$body.license.type/]
// TESTRESPONSE[s/"basic"/$body.license.type/]
// TESTRESPONSE[s/"2018-02-22T23:12:05.550Z"/$body.license.issue_date/]
// TESTRESPONSE[s/1519341125550/$body.license.issue_date_in_millis/]
// TESTRESPONSE[s/"2018-03-24T23:12:05.550Z"/$body.license.expiry_date/]
// TESTRESPONSE[s/1521933125550/$body.license.expiry_date_in_millis/]
// TESTRESPONSE[s/1000/$body.license.max_nodes/]
// TESTRESPONSE[s/"test"/$body.license.issued_to/]
// TESTRESPONSE[s/"elasticsearch"/$body.license.issuer/]

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[get-trial-status]]
=== Get Trial Status API

View File

@ -0,0 +1,22 @@
[role="xpack"]
[[licensing-apis]]
== Licensing APIs
You can use the following APIs to manage your licenses:
* <<delete-license>>
* <<get-license>>
* <<get-trial-status>>
* <<start-trial>>
* <<get-basic-status>>
* <<start-basic>>
* <<update-license>>
include::delete-license.asciidoc[]
include::get-license.asciidoc[]
include::get-trial-status.asciidoc[]
include::start-trial.asciidoc[]
include::get-basic-status.asciidoc[]
include::start-basic.asciidoc[]
include::update-license.asciidoc[]

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[start-basic]]
=== Start Basic API

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[start-trial]]
=== Start Trial API

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[update-license]]
=== Update License API
@ -123,6 +124,7 @@ receive the following response:
}
}
------------------------------------------------------------
// NOTCONSOLE
To complete the update, you must re-submit the API request and set the
`acknowledge` parameter to `true`. For example:

View File

@ -36,6 +36,8 @@ GET my_index/_search
// CONSOLE
// TESTSETUP
NOTE: You can also store ip ranges in a single field using an <<range,ip_range datatype>>.
[[ip-params]]
==== Parameters for `ip` fields

View File

@ -5,4 +5,10 @@
`isShardsAcked` has been replaced by `isShardsAcknowledged` in
`CreateIndexResponse`, `RolloverResponse` and
`CreateIndexClusterStateUpdateResponse`.
`CreateIndexClusterStateUpdateResponse`.
==== `prepareExecute` removed from the client api
The `prepareExecute` method which created a request builder has been
removed from the client api. Instead, construct a builder for the
appropriate request directly.

View File

@ -184,12 +184,6 @@ is the same as `like`.
`fields`::
A list of fields to fetch and analyze the text from.
`like_text`::
The text to find documents like it.
`ids` or `docs`::
A list of documents following the same syntax as the <<docs-multi-get,Multi GET API>>.
[float]
[[mlt-query-term-selection]]
==== Term Selection Parameters

View File

@ -18,9 +18,9 @@ directly to configure and access {xpack} features.
--
include::{xes-repo-dir}/rest-api/info.asciidoc[]
include::info.asciidoc[]
include::{xes-repo-dir}/rest-api/graph/explore.asciidoc[]
include::{xes-repo-dir}/rest-api/licensing.asciidoc[]
include::{es-repo-dir}/licensing/index.asciidoc[]
include::{xes-repo-dir}/rest-api/migration.asciidoc[]
include::{xes-repo-dir}/rest-api/ml-api.asciidoc[]
include::{xes-repo-dir}/rest-api/rollup-api.asciidoc[]

View File

@ -1,8 +1,9 @@
[role="xpack"]
[testenv="basic"]
[[info-api]]
== Info API
The info API provides general information about the installed {xpack}.
The info API provides general information about the installed {xpack} features.
[float]
=== Request
@ -55,30 +56,29 @@ Example response:
"date" : "2015-04-07T13:34:42Z"
},
"license" : {
"uid" : "893361dc-9749-4997-93cb-802e3dofh7aa",
"type" : "trial",
"mode" : "trial",
"status" : "active",
"expiry_date_in_millis" : 1914278399999
"uid" : "893361dc-9749-4997-93cb-xxx",
"type" : "basic",
"mode" : "basic",
"status" : "active"
},
"features" : {
"graph" : {
"description" : "Graph Data Exploration for the Elastic Stack",
"available" : true,
"available" : false,
"enabled" : true
},
"logstash" : {
"description" : "Logstash management component for X-Pack",
"available" : true,
"available" : false,
"enabled" : true
},
"ml" : {
"description" : "Machine Learning for the Elastic Stack",
"available" : true,
"available" : false,
"enabled" : true,
"native_code_info" : {
"version" : "6.0.0-alpha1-SNAPSHOT",
"build_hash" : "d081461967d61a"
"version" : "7.0.0-alpha1-SNAPSHOT",
"build_hash" : "99a07c016d5a73"
}
},
"monitoring" : {
@ -93,12 +93,12 @@ Example response:
},
"security" : {
"description" : "Security for the Elastic Stack",
"available" : true,
"available" : false,
"enabled" : true
},
"watcher" : {
"description" : "Alerting, Notification and Automation for the Elastic Stack",
"available" : true,
"available" : false,
"enabled" : true
}
},
@ -107,10 +107,10 @@ Example response:
------------------------------------------------------------
// TESTRESPONSE[s/"hash" : "2798b1a3ce779b3611bb53a0082d4d741e4d3168",/"hash" : "$body.build.hash",/]
// TESTRESPONSE[s/"date" : "2015-04-07T13:34:42Z"/"date" : "$body.build.date"/]
// TESTRESPONSE[s/"uid" : "893361dc-9749-4997-93cb-802e3dofh7aa",/"uid": "$body.license.uid",/]
// TESTRESPONSE[s/"uid" : "893361dc-9749-4997-93cb-xxx",/"uid": "$body.license.uid",/]
// TESTRESPONSE[s/"expiry_date_in_millis" : 1914278399999/"expiry_date_in_millis" : "$body.license.expiry_date_in_millis"/]
// TESTRESPONSE[s/"version" : "6.0.0-alpha1-SNAPSHOT",/"version": "$body.features.ml.native_code_info.version",/]
// TESTRESPONSE[s/"build_hash" : "d081461967d61a"/"build_hash": "$body.features.ml.native_code_info.build_hash"/]
// TESTRESPONSE[s/"version" : "7.0.0-alpha1-SNAPSHOT",/"version": "$body.features.ml.native_code_info.version",/]
// TESTRESPONSE[s/"build_hash" : "99a07c016d5a73"/"build_hash": "$body.features.ml.native_code_info.build_hash"/]
// So much s/// but at least we test that the layout is close to matching....
The following example only returns the build and features information:

View File

@ -58,7 +58,6 @@ public final class InboundChannelBuffer implements AutoCloseable {
this.pageSupplier = pageSupplier;
this.pages = new ArrayDeque<>();
this.capacity = PAGE_SIZE * pages.size();
ensureCapacity(PAGE_SIZE);
}
public static InboundChannelBuffer allocatingInstance() {

View File

@ -34,16 +34,20 @@ public class InboundChannelBufferTests extends ESTestCase {
new InboundChannelBuffer.Page(ByteBuffer.allocate(BigArrays.BYTE_PAGE_SIZE), () -> {
});
public void testNewBufferHasSinglePage() {
public void testNewBufferNoPages() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
assertEquals(0, channelBuffer.getCapacity());
assertEquals(0, channelBuffer.getRemaining());
assertEquals(0, channelBuffer.getIndex());
}
public void testExpandCapacity() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
assertEquals(0, channelBuffer.getCapacity());
assertEquals(0, channelBuffer.getRemaining());
channelBuffer.ensureCapacity(PAGE_SIZE);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -56,6 +60,7 @@ public class InboundChannelBufferTests extends ESTestCase {
public void testExpandCapacityMultiplePages() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
channelBuffer.ensureCapacity(PAGE_SIZE);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
@ -68,6 +73,7 @@ public class InboundChannelBufferTests extends ESTestCase {
public void testExpandCapacityRespectsOffset() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
channelBuffer.ensureCapacity(PAGE_SIZE);
assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -87,6 +93,7 @@ public class InboundChannelBufferTests extends ESTestCase {
public void testIncrementIndex() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
channelBuffer.ensureCapacity(PAGE_SIZE);
assertEquals(0, channelBuffer.getIndex());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
@ -99,6 +106,7 @@ public class InboundChannelBufferTests extends ESTestCase {
public void testIncrementIndexWithOffset() {
InboundChannelBuffer channelBuffer = new InboundChannelBuffer(defaultPageSupplier);
channelBuffer.ensureCapacity(PAGE_SIZE);
assertEquals(0, channelBuffer.getIndex());
assertEquals(PAGE_SIZE, channelBuffer.getRemaining());

View File

@ -226,6 +226,7 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin {
filters.put("limit", LimitTokenCountFilterFactory::new);
filters.put("lowercase", LowerCaseTokenFilterFactory::new);
filters.put("min_hash", MinHashTokenFilterFactory::new);
filters.put("multiplexer", MultiplexerTokenFilterFactory::new);
filters.put("ngram", NGramTokenFilterFactory::new);
filters.put("nGram", NGramTokenFilterFactory::new);
filters.put("pattern_capture", requriesAnalysisSettings(PatternCaptureGroupTokenFilterFactory::new));

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.analysis.common;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.miscellaneous.ConditionalTokenFilter;
import org.apache.lucene.analysis.miscellaneous.RemoveDuplicatesTokenFilter;
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
import org.elasticsearch.index.analysis.ReferringFilterFactory;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class MultiplexerTokenFilterFactory extends AbstractTokenFilterFactory implements ReferringFilterFactory {
private List<TokenFilterFactory> filters;
private List<String> filterNames;
private final boolean preserveOriginal;
private static final TokenFilterFactory IDENTITY_FACTORY = new TokenFilterFactory() {
@Override
public String name() {
return "identity";
}
@Override
public TokenStream create(TokenStream tokenStream) {
return tokenStream;
}
};
public MultiplexerTokenFilterFactory(IndexSettings indexSettings, Environment env, String name, Settings settings) throws IOException {
super(indexSettings, name, settings);
this.filterNames = settings.getAsList("filters");
this.preserveOriginal = settings.getAsBoolean("preserve_original", true);
}
@Override
public TokenStream create(TokenStream tokenStream) {
List<Function<TokenStream, TokenStream>> functions = new ArrayList<>();
for (TokenFilterFactory tff : filters) {
functions.add(tff::create);
}
return new RemoveDuplicatesTokenFilter(new MultiplexTokenFilter(tokenStream, functions));
}
@Override
public void setReferences(Map<String, TokenFilterFactory> factories) {
filters = new ArrayList<>();
if (preserveOriginal) {
filters.add(IDENTITY_FACTORY);
}
for (String filter : filterNames) {
String[] parts = Strings.tokenizeToStringArray(filter, ",");
if (parts.length == 1) {
filters.add(resolveFilterFactory(factories, parts[0]));
} else {
List<TokenFilterFactory> chain = new ArrayList<>();
for (String subfilter : parts) {
chain.add(resolveFilterFactory(factories, subfilter));
}
filters.add(chainFilters(filter, chain));
}
}
}
private TokenFilterFactory chainFilters(String name, List<TokenFilterFactory> filters) {
return new TokenFilterFactory() {
@Override
public String name() {
return name;
}
@Override
public TokenStream create(TokenStream tokenStream) {
for (TokenFilterFactory tff : filters) {
tokenStream = tff.create(tokenStream);
}
return tokenStream;
}
};
}
private TokenFilterFactory resolveFilterFactory(Map<String, TokenFilterFactory> factories, String name) {
if (factories.containsKey(name) == false) {
throw new IllegalArgumentException("Multiplexing filter [" + name() + "] refers to undefined tokenfilter [" + name + "]");
} else {
return factories.get(name);
}
}
private final class MultiplexTokenFilter extends TokenFilter {
private final TokenStream source;
private final int filterCount;
private int selector;
/**
* Creates a MultiplexTokenFilter on the given input with a set of filters
*/
MultiplexTokenFilter(TokenStream input, List<Function<TokenStream, TokenStream>> filters) {
super(input);
TokenStream source = new MultiplexerFilter(input);
for (int i = 0; i < filters.size(); i++) {
final int slot = i;
source = new ConditionalTokenFilter(source, filters.get(i)) {
@Override
protected boolean shouldFilter() {
return slot == selector;
}
};
}
this.source = source;
this.filterCount = filters.size();
this.selector = filterCount - 1;
}
@Override
public boolean incrementToken() throws IOException {
return source.incrementToken();
}
@Override
public void end() throws IOException {
source.end();
}
@Override
public void reset() throws IOException {
source.reset();
}
private final class MultiplexerFilter extends TokenFilter {
State state;
PositionIncrementAttribute posIncAtt = addAttribute(PositionIncrementAttribute.class);
private MultiplexerFilter(TokenStream input) {
super(input);
}
@Override
public boolean incrementToken() throws IOException {
if (selector >= filterCount - 1) {
selector = 0;
if (input.incrementToken() == false) {
return false;
}
state = captureState();
return true;
}
restoreState(state);
posIncAtt.setPositionIncrement(0);
selector++;
return true;
}
@Override
public void reset() throws IOException {
super.reset();
selector = filterCount - 1;
this.state = null;
}
}
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.analysis.common;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.test.ESTokenStreamTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException;
import java.util.Collections;
public class MultiplexerTokenFilterTests extends ESTokenStreamTestCase {
public void testMultiplexingFilter() throws IOException {
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
Settings indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put("index.analysis.filter.t.type", "truncate")
.put("index.analysis.filter.t.length", "2")
.put("index.analysis.filter.multiplexFilter.type", "multiplexer")
.putList("index.analysis.filter.multiplexFilter.filters", "lowercase, t", "uppercase")
.put("index.analysis.analyzer.myAnalyzer.type", "custom")
.put("index.analysis.analyzer.myAnalyzer.tokenizer", "standard")
.putList("index.analysis.analyzer.myAnalyzer.filter", "multiplexFilter")
.build();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", indexSettings);
IndexAnalyzers indexAnalyzers = new AnalysisModule(TestEnvironment.newEnvironment(settings),
Collections.singletonList(new CommonAnalysisPlugin())).getAnalysisRegistry().build(idxSettings);
try (NamedAnalyzer analyzer = indexAnalyzers.get("myAnalyzer")) {
assertNotNull(analyzer);
assertAnalyzesTo(analyzer, "ONe tHree", new String[]{
"ONe", "on", "ONE", "tHree", "th", "THREE"
}, new int[]{
1, 0, 0, 1, 0, 0
});
// Duplicates are removed
assertAnalyzesTo(analyzer, "ONe THREE", new String[]{
"ONe", "on", "ONE", "THREE", "th"
}, new int[]{
1, 0, 0, 1, 0, 0
});
}
}
public void testMultiplexingNoOriginal() throws IOException {
Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
Settings indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put("index.analysis.filter.t.type", "truncate")
.put("index.analysis.filter.t.length", "2")
.put("index.analysis.filter.multiplexFilter.type", "multiplexer")
.put("index.analysis.filter.multiplexFilter.preserve_original", "false")
.putList("index.analysis.filter.multiplexFilter.filters", "lowercase, t", "uppercase")
.put("index.analysis.analyzer.myAnalyzer.type", "custom")
.put("index.analysis.analyzer.myAnalyzer.tokenizer", "standard")
.putList("index.analysis.analyzer.myAnalyzer.filter", "multiplexFilter")
.build();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", indexSettings);
IndexAnalyzers indexAnalyzers = new AnalysisModule(TestEnvironment.newEnvironment(settings),
Collections.singletonList(new CommonAnalysisPlugin())).getAnalysisRegistry().build(idxSettings);
try (NamedAnalyzer analyzer = indexAnalyzers.get("myAnalyzer")) {
assertNotNull(analyzer);
assertAnalyzesTo(analyzer, "ONe tHree", new String[]{
"on", "ONE", "th", "THREE"
}, new int[]{
1, 0, 1, 0,
});
}
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -116,9 +115,8 @@ public class GrokProcessorGetAction extends Action<GrokProcessorGetAction.Respon
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
ActionFilters actionFilters) {
super(settings, NAME, threadPool, transportService, actionFilters, Request::new);
}
@Override

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -47,11 +46,9 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M
@Inject
public TransportMultiSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver resolver,
ScriptService scriptService, NamedXContentRegistry xContentRegistry,
TransportMultiSearchAction multiSearchAction) {
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, resolver,
MultiSearchTemplateRequest::new);
ActionFilters actionFilters, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, TransportMultiSearchAction multiSearchAction) {
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, MultiSearchTemplateRequest::new);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.multiSearchAction = multiSearchAction;

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -44,6 +43,7 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Supplier;
public class TransportSearchTemplateAction extends HandledTransportAction<SearchTemplateRequest, SearchTemplateResponse> {
@ -55,11 +55,12 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
@Inject
public TransportSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver resolver,
ActionFilters actionFilters,
ScriptService scriptService,
TransportSearchAction searchAction,
NamedXContentRegistry xContentRegistry) {
super(settings, SearchTemplateAction.NAME, threadPool, transportService, actionFilters, resolver, SearchTemplateRequest::new);
super(settings, SearchTemplateAction.NAME, threadPool, transportService, actionFilters,
(Supplier<SearchTemplateRequest>) SearchTemplateRequest::new);
this.scriptService = scriptService;
this.searchAction = searchAction;
this.xContentRegistry = xContentRegistry;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -282,9 +281,8 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ScriptService scriptService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
ActionFilters actionFilters, ScriptService scriptService) {
super(settings, NAME, threadPool, transportService, actionFilters, Request::new);
this.scriptService = scriptService;
}
@Override

View File

@ -27,9 +27,9 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -73,11 +73,11 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
private final NamedXContentRegistry namedXContentRegistry;
@Inject
public TransportRankEvalAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client, TransportService transportService,
ScriptService scriptService, NamedXContentRegistry namedXContentRegistry) {
super(settings, RankEvalAction.NAME, threadPool, transportService, actionFilters, RankEvalRequest::new,
indexNameExpressionResolver);
public TransportRankEvalAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService,
NamedXContentRegistry namedXContentRegistry) {
super(settings, RankEvalAction.NAME, threadPool, transportService, actionFilters,
(Writeable.Reader<RankEvalRequest>) RankEvalRequest::new);
this.scriptService = scriptService;
this.namedXContentRegistry = namedXContentRegistry;
this.client = client;

View File

@ -19,13 +19,14 @@
package org.elasticsearch.index.reindex;
import java.util.function.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -40,10 +41,10 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
private final ClusterService clusterService;
@Inject
public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver resolver, Client client, TransportService transportService,
ScriptService scriptService, ClusterService clusterService) {
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters, resolver, DeleteByQueryRequest::new);
public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters,
(Supplier<DeleteByQueryRequest>) DeleteByQueryRequest::new);
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;

View File

@ -97,18 +97,20 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
private final AutoCreateIndex autoCreateIndex;
private final Client client;
private final CharacterRunAutomaton remoteWhitelist;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService) {
super(settings, ReindexAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
ReindexRequest::new);
super(settings, ReindexAction.NAME, threadPool, transportService, actionFilters,
ReindexRequest::new);
this.clusterService = clusterService;
this.scriptService = scriptService;
this.autoCreateIndex = autoCreateIndex;
this.client = client;
remoteWhitelist = buildRemoteWhitelist(REMOTE_CLUSTER_WHITELIST.get(settings));
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
@Override

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -45,10 +44,9 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
@Inject
public TransportRethrottleAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client) {
super(settings, RethrottleAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
TransportService transportService, ActionFilters actionFilters, Client client) {
super(settings, RethrottleAction.NAME, threadPool, clusterService, transportService, actionFilters,
RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
this.client = client;
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -44,6 +43,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkByScrollResponse> {
private final Client client;
@ -51,11 +51,10 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
private final ClusterService clusterService;
@Inject
public TransportUpdateByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client, TransportService transportService,
ScriptService scriptService, ClusterService clusterService) {
public TransportUpdateByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
super(settings, UpdateByQueryAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, UpdateByQueryRequest::new);
(Supplier<UpdateByQueryRequest>) UpdateByQueryRequest::new);
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.netty4.Netty4Utils;
@ -31,9 +32,23 @@ import java.net.InetSocketAddress;
public class Netty4HttpChannel implements HttpChannel {
private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
Netty4HttpChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}
@Override
@ -65,6 +80,16 @@ public class Netty4HttpChannel implements HttpChannel {
return (InetSocketAddress) channel.remoteAddress();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public void close() {
channel.close();
@ -73,4 +98,12 @@ public class Netty4HttpChannel implements HttpChannel {
public Channel getNettyChannel() {
return channel;
}
@Override
public String toString() {
return "Netty4HttpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + getRemoteAddress() +
'}';
}
}

View File

@ -29,6 +29,8 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;
import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY;
@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
@ -40,7 +42,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
try {
@ -75,7 +77,12 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
serverTransport.exceptionCaught(ctx, cause);
Netty4HttpChannel httpChannel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(httpChannel, new Exception(cause));
} else {
serverTransport.onException(httpChannel, (Exception) cause);
}
}
}

View File

@ -40,15 +40,13 @@ import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -57,6 +55,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
@ -64,7 +63,6 @@ import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.io.IOException;
@ -171,10 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
protected final List<Channel> serverChannels = new ArrayList<>();
// package private for testing
Netty4OpenChannelsHandler serverOpenChannels;
private final Netty4CorsConfig corsConfig;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
@ -216,8 +210,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
protected void doStart() {
boolean success = false;
try {
this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
@ -281,10 +273,9 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
builder.allowCredentials();
}
String[] strMethods = Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_METHODS.get(settings), ",");
HttpMethod[] methods = Arrays.asList(strMethods)
.stream()
HttpMethod[] methods = Arrays.stream(strMethods)
.map(HttpMethod::valueOf)
.toArray(size -> new HttpMethod[size]);
.toArray(HttpMethod[]::new);
return builder.allowedRequestMethods(methods)
.maxAge(SETTING_CORS_MAX_AGE.get(settings))
.allowedRequestHeaders(Strings.tokenizeToStringArray(SETTING_CORS_ALLOW_HEADERS.get(settings), ","))
@ -327,15 +318,21 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
Netty4Utils.closeChannels(serverChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
} finally {
serverChannels.clear();
}
serverChannels.clear();
}
}
if (serverOpenChannels != null) {
serverOpenChannels.close();
serverOpenChannels = null;
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
@ -349,38 +346,18 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
@Override
public HttpStats stats() {
Netty4OpenChannelsHandler channels = serverOpenChannels;
return new HttpStats(channels == null ? 0 : channels.numberOfOpenChannels(), channels == null ? 0 : channels.totalChannels());
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}
public Netty4CorsConfig getCorsConfig() {
return corsConfig;
}
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
@Override
protected void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Read timeout [{}]", ctx.channel().remoteAddress());
logger.trace("Http read timeout {}", channel);
}
ctx.channel().close();
CloseableChannel.closeChannel(channel);;
} else {
if (!lifecycle.started()) {
// ignore
return;
}
if (!NetworkExceptionHelper.isCloseConnectionException(cause)) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
cause);
ctx.channel().close();
} else {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", ctx.channel()),
cause);
ctx.channel().close();
}
super.onException(channel, cause);
}
}
@ -404,9 +381,8 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
@Override
protected void initChannel(Channel ch) throws Exception {
Netty4HttpChannel nettyTcpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
Netty4HttpChannel nettyHttpChannel = new Netty4HttpChannel(ch);
ch.attr(HTTP_CHANNEL_KEY).set(nettyHttpChannel);
ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));
final HttpRequestDecoder decoder = new HttpRequestDecoder(
handlingSettings.getMaxInitialLineLength(),
@ -423,10 +399,11 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
if (handlingSettings.isCorsEnabled()) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
}
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);
}
@Override

View File

@ -1,96 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.CounterMetric;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ChannelHandler.Sharable
public class Netty4OpenChannelsHandler extends ChannelInboundHandlerAdapter implements Releasable {
final Set<Channel> openChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
final CounterMetric openChannelsMetric = new CounterMetric();
final CounterMetric totalChannelsMetric = new CounterMetric();
final Logger logger;
public Netty4OpenChannelsHandler(Logger logger) {
this.logger = logger;
}
final ChannelFutureListener remover = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
boolean removed = openChannels.remove(future.channel());
if (removed) {
openChannelsMetric.dec();
}
if (logger.isTraceEnabled()) {
logger.trace("channel closed: {}", future.channel());
}
}
};
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("channel opened: {}", ctx.channel());
}
final boolean added = openChannels.add(ctx.channel());
if (added) {
openChannelsMetric.inc();
totalChannelsMetric.inc();
ctx.channel().closeFuture().addListener(remover);
}
super.channelActive(ctx);
}
public long numberOfOpenChannels() {
return openChannelsMetric.count();
}
public long totalChannels() {
return totalChannelsMetric.count();
}
@Override
public void close() {
try {
Netty4Utils.closeChannels(openChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
}
openChannels.clear();
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty4;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -91,7 +92,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
final Netty4Transport t = (Netty4Transport) transport;
@SuppressWarnings("unchecked")
final TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
public void testConnectException() throws UnknownHostException {

View File

@ -36,4 +36,17 @@ public class NioHttpChannel extends NioSocketChannel implements HttpChannel {
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
getContext().sendMessage(response, ActionListener.toBiConsumer(listener));
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}
@Override
public String toString() {
return "NioHttpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + getRemoteAddress() +
'}';
}
}

View File

@ -20,22 +20,20 @@
package org.elasticsearch.http.nio;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.timeout.ReadTimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
@ -44,6 +42,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
@ -115,7 +114,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private final int tcpReceiveBufferSize;
private final Set<NioServerSocketChannel> serverChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<NioSocketChannel> socketChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private NioGroup nioGroup;
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;
@ -156,7 +154,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
int workerCount = NIO_HTTP_WORKER_COUNT.get(settings);
nioGroup = new NioGroup(daemonThreadFactory(this.settings, HTTP_SERVER_ACCEPTOR_THREAD_NAME_PREFIX), acceptorCount,
daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount,
(s) -> new EventHandler(this::nonChannelExceptionCaught, s));
(s) -> new EventHandler(this::onNonChannelException, s));
channelFactory = new HttpChannelFactory();
this.boundAddress = createBoundHttpAddress();
@ -187,12 +185,13 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
}
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
closeChannels(new ArrayList<>(socketChannels));
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
socketChannels.clear();
httpChannels.clear();
try {
nioGroup.close();
@ -235,38 +234,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
@Override
public HttpStats stats() {
return new HttpStats(serverChannels.size(), socketChannels.size());
}
protected void exceptionCaught(NioSocketChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Read timeout [{}]", channel.getRemoteAddress());
}
channel.close();
} else {
if (lifecycle.started() == false) {
// ignore
return;
}
if (NetworkExceptionHelper.isCloseConnectionException(cause) == false) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", channel),
cause);
channel.close();
} else {
logger.debug(
(Supplier<?>) () -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", channel),
cause);
channel.close();
}
}
}
protected void nonChannelExceptionCaught(Exception ex) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()), ex);
return new HttpStats(serverChannels.size(), totalChannelsAccepted.get());
}
static NioCorsConfig buildCorsConfig(Settings settings) {
@ -324,7 +292,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
private void acceptChannel(NioSocketChannel socketChannel) {
socketChannels.add(socketChannel);
super.serverAcceptedChannel((HttpChannel) socketChannel);
}
private class HttpChannelFactory extends ChannelFactory<NioServerSocketChannel, NioHttpChannel> {
@ -342,7 +310,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
};
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this,
handlingSettings, corsConfig);
Consumer<Exception> exceptionHandler = (e) -> exceptionCaught(nioChannel, e);
Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline,
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);

View File

@ -28,11 +28,11 @@ import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;
public class TcpNioSocketChannel extends NioSocketChannel implements TcpChannel {
public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
private final String profile;
public TcpNioSocketChannel(String profile, SocketChannel socketChannel) throws IOException {
public NioTcpChannel(String profile, SocketChannel socketChannel) throws IOException {
super(socketChannel);
this.profile = profile;
}

View File

@ -32,11 +32,11 @@ import java.nio.channels.ServerSocketChannel;
* This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpChannel}
* interface. As it is a server socket, setting SO_LINGER and sending messages is not supported.
*/
public class TcpNioServerSocketChannel extends NioServerSocketChannel implements TcpChannel {
public class NioTcpServerChannel extends NioServerSocketChannel implements TcpChannel {
private final String profile;
public TcpNioServerSocketChannel(String profile, ServerSocketChannel socketChannel) throws IOException {
public NioTcpServerChannel(String profile, ServerSocketChannel socketChannel) throws IOException {
super(socketChannel);
this.profile = profile;
}

View File

@ -40,7 +40,6 @@ import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports;
@ -78,14 +77,14 @@ public class NioTransport extends TcpTransport {
}
@Override
protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
protected NioTcpServerChannel bind(String name, InetSocketAddress address) throws IOException {
TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
return nioGroup.bindServerChannel(address, channelFactory);
}
@Override
protected TcpNioSocketChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
TcpNioSocketChannel channel = nioGroup.openChannel(address, clientChannelFactory);
protected NioTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> connectListener) throws IOException {
NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory);
channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
return channel;
}
@ -131,19 +130,15 @@ public class NioTransport extends TcpTransport {
profileToChannelFactory.clear();
}
protected void exceptionCaught(NioSocketChannel channel, Exception exception) {
onException((TcpChannel) channel, exception);
}
protected void acceptChannel(NioSocketChannel channel) {
serverAcceptedChannel((TcpNioSocketChannel) channel);
serverAcceptedChannel((NioTcpChannel) channel);
}
protected TcpChannelFactory channelFactory(ProfileSettings settings, boolean isClient) {
return new TcpChannelFactoryImpl(settings);
}
protected abstract class TcpChannelFactory extends ChannelFactory<TcpNioServerSocketChannel, TcpNioSocketChannel> {
protected abstract class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {
protected TcpChannelFactory(RawChannelFactory rawChannelFactory) {
super(rawChannelFactory);
@ -164,14 +159,14 @@ public class NioTransport extends TcpTransport {
}
@Override
public TcpNioSocketChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
TcpNioSocketChannel nioChannel = new TcpNioSocketChannel(profileName, channel);
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioTcpChannel nioChannel = new NioTcpChannel(profileName, channel);
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
TcpReadWriteHandler readWriteHandler = new TcpReadWriteHandler(nioChannel, NioTransport.this);
Consumer<Exception> exceptionHandler = (e) -> exceptionCaught(nioChannel, e);
Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
BytesChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, readWriteHandler,
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
@ -179,8 +174,8 @@ public class NioTransport extends TcpTransport {
}
@Override
public TcpNioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
TcpNioServerSocketChannel nioChannel = new TcpNioServerSocketChannel(profileName, channel);
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
Consumer<NioSocketChannel> acceptor = NioTransport.this::acceptChannel;

View File

@ -28,10 +28,10 @@ import java.io.IOException;
public class TcpReadWriteHandler extends BytesWriteHandler {
private final TcpNioSocketChannel channel;
private final NioTcpChannel channel;
private final TcpTransport transport;
public TcpReadWriteHandler(TcpNioSocketChannel channel, TcpTransport transport) {
public TcpReadWriteHandler(NioTcpChannel channel, TcpTransport transport) {
this.channel = channel;
this.transport = transport;
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.nio;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -96,7 +97,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
@SuppressWarnings("unchecked")
TcpTransport.NodeChannels channels = (TcpTransport.NodeChannels) connection;
TcpChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
CloseableChannel.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true);
}
public void testConnectException() throws UnknownHostException {

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -43,11 +42,10 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHo
NodeHotThreads> {
@Inject
public TransportNodesHotThreadsAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
public TransportNodesHotThreadsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters) {
super(settings, NodesHotThreadsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesHotThreadsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeHotThreads.class);
NodesHotThreadsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeHotThreads.class);
}
@Override

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -44,12 +43,10 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
private final NodeService nodeService;
@Inject
public TransportNodesInfoAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
public TransportNodesInfoAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, NodeService nodeService, ActionFilters actionFilters) {
super(settings, NodesInfoAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesInfoRequest::new, NodeInfoRequest::new, ThreadPool.Names.MANAGEMENT, NodeInfo.class);
NodesInfoRequest::new, NodeInfoRequest::new, ThreadPool.Names.MANAGEMENT, NodeInfo.class);
this.nodeService = nodeService;
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -55,11 +54,11 @@ public class TransportNodesReloadSecureSettingsAction extends TransportNodesActi
@Inject
public TransportNodesReloadSecureSettingsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Environment environment,
Environment environment,
PluginsService pluginService) {
super(settings, NodesReloadSecureSettingsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesReloadSecureSettingsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC,
NodesReloadSecureSettingsResponse.NodeResponse.class);
NodesReloadSecureSettingsRequest::new, NodeRequest::new, ThreadPool.Names.GENERIC,
NodesReloadSecureSettingsResponse.NodeResponse.class);
this.environment = environment;
this.pluginsService = pluginService;
}

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -46,10 +45,9 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
@Inject
public TransportNodesStatsAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
NodeService nodeService, ActionFilters actionFilters) {
super(settings, NodesStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, NodesStatsRequest::new, NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT, NodeStats.class);
NodesStatsRequest::new, NodeStatsRequest::new, ThreadPool.Names.MANAGEMENT, NodeStats.class);
this.nodeService = nodeService;
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@ -64,11 +63,9 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
@Inject
public TransportCancelTasksAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) {
TransportService transportService, ActionFilters actionFilters) {
super(settings, CancelTasksAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CancelTasksRequest::new, CancelTasksResponse::new,
ThreadPool.Names.MANAGEMENT);
CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new
BanParentRequestHandler());
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -72,9 +71,8 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
@Inject
public TransportGetTaskAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, Client client,
NamedXContentRegistry xContentRegistry) {
super(settings, GetTaskAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetTaskRequest::new);
ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry) {
super(settings, GetTaskAction.NAME, threadPool, transportService, actionFilters, GetTaskRequest::new);
this.clusterService = clusterService;
this.transportService = transportService;
this.client = client;

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -53,9 +52,9 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
@Inject
public TransportListTasksAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ListTasksAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
TransportService transportService, ActionFilters actionFilters) {
super(settings, ListTasksAction.NAME, threadPool, clusterService, transportService, actionFilters,
ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -43,10 +42,9 @@ public class TransportNodesUsageAction
@Inject
public TransportNodesUsageAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, UsageService usageService) {
super(settings, NodesUsageAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
NodesUsageRequest::new, NodeUsageRequest::new, ThreadPool.Names.MANAGEMENT, NodeUsage.class);
TransportService transportService, ActionFilters actionFilters, UsageService usageService) {
super(settings, NodesUsageAction.NAME, threadPool, clusterService, transportService, actionFilters,
NodesUsageRequest::new, NodeUsageRequest::new, ThreadPool.Names.MANAGEMENT, NodeUsage.class);
this.usageService = usageService;
}

View File

@ -19,12 +19,13 @@
package org.elasticsearch.action.admin.cluster.remote;
import java.util.function.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -38,10 +39,9 @@ public final class TransportRemoteInfoAction extends HandledTransportAction<Remo
@Inject
public TransportRemoteInfoAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
SearchTransportService searchTransportService) {
super(settings, RemoteInfoAction.NAME, threadPool, transportService, actionFilters, RemoteInfoRequest::new,
indexNameExpressionResolver);
ActionFilters actionFilters, SearchTransportService searchTransportService) {
super(settings, RemoteInfoAction.NAME, threadPool, transportService, actionFilters,
(Supplier<RemoteInfoRequest>) RemoteInfoRequest::new);
this.remoteClusterService = searchTransportService.getRemoteClusterService();
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.snapshots.delete;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.xcontent.XContentParser;
/**
* Delete snapshot response
@ -32,4 +33,9 @@ public class DeleteSnapshotResponse extends AcknowledgedResponse {
DeleteSnapshotResponse(boolean acknowledged) {
super(acknowledged);
}
public static DeleteSnapshotResponse fromXContent(XContentParser parser) {
return new DeleteSnapshotResponse(parseAcknowledged(parser));
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
@ -63,12 +62,11 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
private final SnapshotShardsService snapshotShardsService;
@Inject
public TransportNodesSnapshotsStatus(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
SnapshotShardsService snapshotShardsService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeSnapshotStatus.class);
public TransportNodesSnapshotsStatus(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, SnapshotShardsService snapshotShardsService,
ActionFilters actionFilters) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeSnapshotStatus.class);
this.snapshotShardsService = snapshotShardsService;
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -58,13 +57,11 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Inject
public TransportClusterStatsAction(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
NodeService nodeService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
public TransportClusterStatsAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, NodeService nodeService, IndicesService indicesService,
ActionFilters actionFilters) {
super(settings, ClusterStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT,
ClusterStatsNodeResponse.class);
ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT, ClusterStatsNodeResponse.class);
this.nodeService = nodeService;
this.indicesService = indicesService;
}

View File

@ -19,10 +19,11 @@
package org.elasticsearch.action.admin.indices.flush;
import java.util.function.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.flush.SyncedFlushService;
@ -37,11 +38,10 @@ public class TransportSyncedFlushAction extends HandledTransportAction<SyncedFlu
SyncedFlushService syncedFlushService;
@Inject
public TransportSyncedFlushAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SyncedFlushService syncedFlushService) {
super(settings, SyncedFlushAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SyncedFlushRequest::new);
public TransportSyncedFlushAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, SyncedFlushService syncedFlushService) {
super(settings, SyncedFlushAction.NAME, threadPool, transportService, actionFilters,
(Supplier<SyncedFlushRequest>) SyncedFlushRequest::new);
this.syncedFlushService = syncedFlushService;
}

View File

@ -41,14 +41,16 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
private final ClusterService clusterService;
private final TransportGetFieldMappingsIndexAction shardAction;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportGetFieldMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, TransportGetFieldMappingsIndexAction shardAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, GetFieldMappingsAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, GetFieldMappingsRequest::new);
super(settings, GetFieldMappingsAction.NAME, threadPool, transportService, actionFilters, GetFieldMappingsRequest::new);
this.clusterService = clusterService;
this.shardAction = shardAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
@Override

View File

@ -91,6 +91,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final TransportCreateIndexAction createIndexAction;
private final LongSupplier relativeTimeProvider;
private final IngestActionForwarder ingestForwarder;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
@ -110,7 +111,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, BulkRequest::new);
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest::new);
Objects.requireNonNull(relativeTimeProvider);
this.clusterService = clusterService;
this.ingestService = ingestService;
@ -119,6 +120,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
this.autoCreateIndex = autoCreateIndex;
this.relativeTimeProvider = relativeTimeProvider;
this.ingestForwarder = new IngestActionForwarder(transportService);
this.indexNameExpressionResolver = indexNameExpressionResolver;
clusterService.addStateApplier(this.ingestForwarder);
}

View File

@ -46,19 +46,18 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final ClusterService clusterService;
private final TransportFieldCapabilitiesIndexAction shardAction;
private final RemoteClusterService remoteClusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportFieldCapabilitiesAction(Settings settings, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool,
TransportFieldCapabilitiesIndexAction shardAction,
ActionFilters actionFilters,
IndexNameExpressionResolver
indexNameExpressionResolver) {
super(settings, FieldCapabilitiesAction.NAME, threadPool, transportService,
actionFilters, indexNameExpressionResolver, FieldCapabilitiesRequest::new);
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, FieldCapabilitiesAction.NAME, threadPool, transportService, actionFilters, FieldCapabilitiesRequest::new);
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.shardAction = shardAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
@Override

View File

@ -40,16 +40,17 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequest, MultiGetResponse> {
private final ClusterService clusterService;
private final TransportShardMultiGetAction shardAction;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportMultiGetAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportShardMultiGetAction shardAction,
ActionFilters actionFilters, IndexNameExpressionResolver resolver) {
super(settings, MultiGetAction.NAME, threadPool, transportService, actionFilters, resolver, MultiGetRequest::new);
super(settings, MultiGetAction.NAME, threadPool, transportService, actionFilters, MultiGetRequest::new);
this.clusterService = clusterService;
this.shardAction = shardAction;
this.indexNameExpressionResolver = resolver;
}
@Override

View File

@ -22,8 +22,8 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore;
@ -39,8 +39,10 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
private final SimulateExecutionService executionService;
@Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, SimulatePipelineRequest::new, indexNameExpressionResolver);
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, NodeService nodeService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters,
(Writeable.Reader<SimulatePipelineRequest>) SimulatePipelineRequest::new);
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
this.executionService = new SimulateExecutionService(threadPool);
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -40,9 +39,8 @@ public class TransportMainAction extends HandledTransportAction<MainRequest, Mai
@Inject
public TransportMainAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService) {
super(settings, MainAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MainRequest::new);
ActionFilters actionFilters, ClusterService clusterService) {
super(settings, MainAction.NAME, threadPool, transportService, actionFilters, MainRequest::new);
this.clusterService = clusterService;
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -37,9 +36,8 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
@Inject
public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchTransportService searchTransportService) {
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters,
ClearScrollRequest::new);
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -49,9 +48,8 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSearchAction searchAction,
ActionFilters actionFilters, IndexNameExpressionResolver resolver) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
@ -60,8 +58,8 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
IndexNameExpressionResolver resolver, int availableProcessors, LongSupplier relativeTimeProvider) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, resolver, MultiSearchRequest::new);
int availableProcessors, LongSupplier relativeTimeProvider) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -74,19 +75,22 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final RemoteClusterService remoteClusterService;
private final SearchPhaseController searchPhaseController;
private final SearchService searchService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, SearchService searchService,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new, indexNameExpressionResolver);
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters,
(Writeable.Reader<SearchRequest>) SearchRequest::new);
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();
SearchTransportService.registerRequestHandler(transportService, searchService);
this.clusterService = clusterService;
this.searchService = searchService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState,

View File

@ -22,9 +22,9 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -43,10 +43,9 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
@Inject
public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, SearchScrollRequest::new,
indexNameExpressionResolver);
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters,
(Writeable.Reader<SearchScrollRequest>) SearchScrollRequest::new);
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
this.searchPhaseController = searchPhaseController;

View File

@ -22,7 +22,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
@ -39,29 +38,28 @@ import java.util.function.Supplier;
public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse>
extends TransportAction<Request, Response> {
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
ActionFilters actionFilters,
Supplier<Request> request) {
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this(settings, actionName, true, threadPool, transportService, actionFilters, request);
}
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
IndexNameExpressionResolver indexNameExpressionResolver) {
this(settings, actionName, true, threadPool, transportService, actionFilters, requestReader, indexNameExpressionResolver);
ActionFilters actionFilters, Writeable.Reader<Request> requestReader) {
this(settings, actionName, true, threadPool, transportService, actionFilters, requestReader);
}
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
Supplier<Request> request) {
super(settings, actionName, threadPool, actionFilters, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker,
new TransportHandler());
}
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<Request> requestReader, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
Writeable.Reader<Request> requestReader) {
super(settings, actionName, threadPool, actionFilters, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
new TransportHandler());
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
@ -39,16 +38,14 @@ public abstract class TransportAction<Request extends ActionRequest, Response ex
protected final ThreadPool threadPool;
protected final String actionName;
private final ActionFilter[] filters;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected final TaskManager taskManager;
protected TransportAction(Settings settings, String actionName, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TaskManager taskManager) {
TaskManager taskManager) {
super(settings);
this.threadPool = threadPool;
this.actionName = actionName;
this.filters = actionFilters.filters();
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.taskManager = taskManager;
}

View File

@ -54,15 +54,17 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
final String transportShardAction;
protected TransportBroadcastAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request, Supplier<ShardRequest> shardRequest, String shardExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.transportShardAction = actionName + "[s]";
transportService.registerRequestHandler(transportShardAction, shardRequest, shardExecutor, new ShardTransportHandler());

View File

@ -81,6 +81,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
private final ClusterService clusterService;
private final TransportService transportService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
final String transportNodeBroadcastAction;
@ -109,11 +110,12 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
Supplier<Request> request,
String executor,
boolean canTripCircuitBreaker) {
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver,
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters,
request);
this.clusterService = clusterService;
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
transportNodeBroadcastAction = actionName + "[n]";

View File

@ -56,6 +56,7 @@ import java.util.function.Supplier;
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extends HandledTransportAction<Request, Response> {
protected final TransportService transportService;
protected final ClusterService clusterService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
final String executor;
@ -74,10 +75,11 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
protected TransportMasterNodeAction(Settings settings, String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver,
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters,
request);
this.transportService = transportService;
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor();
}
@ -85,10 +87,11 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, Writeable.Reader<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, request,
indexNameExpressionResolver);
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, request
);
this.transportService = transportService;
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor();
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -63,11 +62,9 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
protected TransportNodesAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<NodesRequest> request, Supplier<NodeRequest> nodeRequest,
String nodeExecutor,
Supplier<NodesRequest> request, Supplier<NodeRequest> nodeRequest, String nodeExecutor,
Class<NodeResponse> nodeResponseClass) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = Objects.requireNonNull(clusterService);
this.transportService = Objects.requireNonNull(transportService);
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);

View File

@ -56,13 +56,15 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
private final TransportReplicationAction replicatedBroadcastShardAction;
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
public TransportBroadcastReplicationAction(String name, Supplier<Request> request, Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) {
super(settings, name, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
super(settings, name, threadPool, transportService, actionFilters, request);
this.replicatedBroadcastShardAction = replicatedBroadcastShardAction;
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}

View File

@ -104,6 +104,7 @@ public abstract class TransportReplicationAction<
protected final ClusterService clusterService;
protected final ShardStateAction shardStateAction;
protected final IndicesService indicesService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
protected final TransportRequestOptions transportOptions;
protected final String executor;
@ -131,11 +132,12 @@ public abstract class TransportReplicationAction<
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
Supplier<ReplicaRequest> replicaRequest, String executor,
boolean syncGlobalCheckpointAfterOperation) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
super(settings, actionName, threadPool, actionFilters, transportService.getTaskManager());
this.transportService = transportService;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.shardStateAction = shardStateAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor;
this.transportPrimaryAction = actionName + "[p]";

View File

@ -52,6 +52,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
extends HandledTransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
final String executor;
final String shardActionName;
@ -59,9 +60,10 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
super(settings, actionName, threadPool, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor();
this.shardActionName = actionName + "[s]";
transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler());

View File

@ -61,8 +61,8 @@ import static org.elasticsearch.action.support.TransportActions.isShardNotAvaila
public abstract class TransportSingleShardAction<Request extends SingleShardRequest<Request>, Response extends ActionResponse> extends TransportAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;
final String transportShardAction;
final String executor;
@ -70,9 +70,10 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected TransportSingleShardAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request, String executor) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
super(settings, actionName, threadPool, actionFilters, transportService.getTaskManager());
this.clusterService = clusterService;
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.transportShardAction = actionName + "[s]";
this.executor = executor;

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
@ -78,12 +77,10 @@ public abstract class TransportTasksAction<
protected final String transportNodeAction;
protected TransportTasksAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier,
String nodeExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestSupplier);
protected TransportTasksAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, Supplier<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier, String nodeExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, requestSupplier);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportNodeAction = actionName + "[n]";

View File

@ -41,16 +41,17 @@ import java.util.concurrent.atomic.AtomicInteger;
public class TransportMultiTermVectorsAction extends HandledTransportAction<MultiTermVectorsRequest, MultiTermVectorsResponse> {
private final ClusterService clusterService;
private final TransportShardMultiTermsVectorAction shardAction;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportMultiTermVectorsAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportShardMultiTermsVectorAction shardAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, MultiTermVectorsAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiTermVectorsRequest::new);
super(settings, MultiTermVectorsAction.NAME, threadPool, transportService, actionFilters, MultiTermVectorsRequest::new);
this.clusterService = clusterService;
this.shardAction = shardAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
@Override

View File

@ -730,7 +730,7 @@ public class MasterService extends AbstractLifecycleComponent {
return;
}
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.network;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.core.internal.io.IOUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
public interface CloseableChannel extends Closeable {
/**
* Closes the channel. For most implementations, this will be be an asynchronous process. For this
* reason, this method does not throw {@link java.io.IOException} There is no guarantee that the channel
* will be closed when this method returns. Use the {@link #addCloseListener(ActionListener)} method
* to implement logic that depends on knowing when the channel is closed.
*/
@Override
void close();
/**
* Adds a listener that will be executed when the channel is closed. If the channel is still open when
* this listener is added, the listener will be executed by the thread that eventually closes the
* channel. If the channel is already closed when the listener is added the listener will immediately be
* executed by the thread that is attempting to add the listener.
*
* @param listener to be executed
*/
void addCloseListener(ActionListener<Void> listener);
/**
* Indicates whether a channel is currently open
*
* @return boolean indicating if channel is open
*/
boolean isOpen();
/**
* Closes the channel without blocking.
*
* @param channel to close
*/
static <C extends CloseableChannel> void closeChannel(C channel) {
closeChannel(channel, false);
}
/**
* Closes the channel.
*
* @param channel to close
* @param blocking indicates if we should block on channel close
*/
static <C extends CloseableChannel> void closeChannel(C channel, boolean blocking) {
closeChannels(Collections.singletonList(channel), blocking);
}
/**
* Closes the channels.
*
* @param channels to close
* @param blocking indicates if we should block on channel close
*/
static <C extends CloseableChannel> void closeChannels(List<C> channels, boolean blocking) {
try {
IOUtils.close(channels);
} catch (IOException e) {
// The CloseableChannel#close method does not throw IOException, so this should not occur.
throw new UncheckedIOException(e);
}
if (blocking) {
ArrayList<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
for (final C channel : channels) {
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture);
futures.add(closeFuture);
}
blockOnFutures(futures);
}
}
static void blockOnFutures(List<ActionFuture<Void>> futures) {
for (ActionFuture<Void> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
// Ignore as we are only interested in waiting for the close process to complete. Logging
// close exceptions happens elsewhere.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -153,7 +153,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING,
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,

View File

@ -30,7 +30,6 @@ import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
@ -56,12 +55,10 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
private final GatewayMetaState metaState;
@Inject
public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
GatewayMetaState metaState) {
public TransportNodesListGatewayMetaState(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, GatewayMetaState metaState) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
Request::new, NodeRequest::new, ThreadPool.Names.GENERIC, NodeGatewayMetaState.class);
this.metaState = metaState;
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -72,14 +71,11 @@ public class TransportNodesListGatewayStartedShards extends
private final IndicesService indicesService;
@Inject
public TransportNodesListGatewayStartedShards(Settings settings, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
public TransportNodesListGatewayStartedShards(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
NodeEnvironment env, IndicesService indicesService) {
super(settings, ACTION_NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED,
NodeGatewayStartedShards.class);
Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED, NodeGatewayStartedShards.class);
this.nodeEnv = env;
this.indicesService = indicesService;
}

View File

@ -21,12 +21,16 @@ package org.elasticsearch.http;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -41,9 +45,14 @@ import org.elasticsearch.transport.BindTransportException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
@ -60,11 +69,13 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
protected final Dispatcher dispatcher;
private final NamedXContentRegistry xContentRegistry;
protected final String[] bindHosts;
protected final String[] publishHosts;
protected final PortsRange port;
protected final ByteSizeValue maxContentLength;
private final String[] bindHosts;
private final String[] publishHosts;
protected final AtomicLong totalChannelsAccepted = new AtomicLong();
protected final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected volatile BoundTransportAddress boundAddress;
protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
@ -166,6 +177,49 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
return publishPort;
}
protected void onException(HttpChannel channel, Exception e) {
if (lifecycle.started() == false) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
return;
}
if (NetworkExceptionHelper.isCloseConnectionException(e)) {
logger.trace(() -> new ParameterizedMessage(
"close connection exception caught while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
} else if (NetworkExceptionHelper.isConnectException(e)) {
logger.trace(() -> new ParameterizedMessage(
"connect exception caught while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
} else if (e instanceof CancelledKeyException) {
logger.trace(() -> new ParameterizedMessage(
"cancelled key exception caught while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
} else {
logger.warn(() -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
}
}
/**
* Exception handler for exceptions that are not associated with a specific channel.
*
* @param exception the exception
*/
protected void onNonChannelException(Exception exception) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()),
exception);
}
protected void serverAcceptedChannel(HttpChannel httpChannel) {
boolean addedOnThisCall = httpChannels.add(httpChannel);
assert addedOnThisCall : "Channel should only be added to http channel set once";
totalChannelsAccepted.incrementAndGet();
httpChannel.addCloseListener(ActionListener.wrap(() -> httpChannels.remove(httpChannel)));
logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel));
}
/**
* This method handles an incoming http request.
*
@ -181,7 +235,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
*
* @param httpRequest that is incoming
* @param httpChannel that received the http request
* @param exception that was encountered
* @param exception that was encountered
*/
public void incomingRequestError(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
handleIncomingRequest(httpRequest, httpChannel, exception);
@ -219,7 +273,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
innerRestRequest = requestWithoutContentTypeHeader(httpRequest, httpChannel, badRequestCause);
} catch (final RestRequest.BadParameterException e) {
badRequestCause = ExceptionsHelper.useOrSuppress(badRequestCause, e);
innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
innerRestRequest = RestRequest.requestWithoutParameters(xContentRegistry, httpRequest, httpChannel);
}
restRequest = innerRestRequest;
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.AbstractRestChannel;
@ -114,7 +115,7 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
}
if (isCloseConnection()) {
toClose.add(httpChannel);
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}
ActionListener<Void> listener = ActionListener.wrap(() -> Releasables.close(toClose));

Some files were not shown because too many files have changed in this diff Show More