Merge branch 'master' into ccr

* master:
  Add get field mappings to High Level REST API Client ()
  [DOCS] Updates Watcher examples for code testing ()
  TEST: Add bwc recovery tests with synced-flush index
  [DOCS] Move sql to docs ()
  [DOCS] Move monitoring to docs folder ()
  Core: Combine doExecute methods in TransportAction ()
  IndexShard should not return null stats ()
  fix repository update with the same settings but different type ()
  Fix Mockito trying to mock IOException that isn't thrown by method () ()
  Node selector per client rather than per request ()
  Core: Combine messageRecieved methods in TransportRequestHandler ()
  Upgrade to Lucene 7.4.0. ()
  [ML] Add ML filter update API ()
  Allow multiple unicast host providers ()
  Avoid deprecation warning when running the ML datafeed extractor. ()
  REST high-level client: add simulate pipeline API ()
  Get Mapping API to honour allow_no_indices and ignore_unavailable ()
  [PkiRealm] Invalidate cache on role mappings change ()
  [Security] Check auth scheme case insensitively ()
  In NumberFieldType equals and hashCode, make sure that NumberType is taken into account. ()
  [DOCS] Fix REST tests in SQL docs
  [DOCS] Add code snippet testing in more ML APIs ()
  Core: Remove ThreadPool from base TransportAction ()
  [DOCS] Remove fixed file from build.gradle
  Rename createNewTranslog to fileBasedRecovery ()
  Test: Skip assertion on windows
  [DOCS] Creates field and document level security overview ()
  [DOCS] Significantly improve SQL docs
  [DOCS] Move migration APIs to docs ()
  Core: Convert TransportAction.execute uses to client calls ()
  Return transport addresses from UnicastHostsProvider ()
  Ensure local addresses aren't null ()
  Remove unused generic type for client execute method ()
  Introduce http and tcp server channels ()
Commit 08ee9b67c5 by Nhat Nguyen, 2018-06-23 17:27:17 -04:00
484 changed files with 5979 additions and 3024 deletions
buildSrc
client
docs
libs/nio/src
modules
  ingest-common/src/main/java/org/elasticsearch/ingest/common
  lang-expression/licenses
  lang-mustache/src/main/java/org/elasticsearch/script/mustache
  lang-painless/src/main/java/org/elasticsearch/painless
  rank-eval/src/main/java/org/elasticsearch/index/rankeval
  reindex/src
  transport-netty4/src
plugins

@ -1,5 +1,5 @@
elasticsearch = 7.0.0-alpha1
lucene = 7.4.0-snapshot-518d303506
lucene = 7.4.0
# optional dependencies
spatial4j = 0.7

@ -19,8 +19,8 @@
package org.elasticsearch.plugin.noop.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
@ -30,7 +30,7 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
@ -38,13 +38,12 @@ public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest,
new UpdateResponse(new ShardId("mock", "", 1), "mock_type", "1", 1L, DocWriteResponse.Result.CREATED));
@Inject
public TransportNoopBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters) {
super(settings, NoopBulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest::new);
public TransportNoopBulkAction(Settings settings, TransportService transportService, ActionFilters actionFilters) {
super(settings, NoopBulkAction.NAME, transportService, actionFilters, BulkRequest::new);
}
@Override
protected void doExecute(BulkRequest request, ActionListener<BulkResponse> listener) {
protected void doExecute(Task task, BulkRequest request, ActionListener<BulkResponse> listener) {
final int itemCount = request.requests().size();
// simulate at least a realistic amount of data that gets serialized
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemCount];

@ -27,27 +27,25 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
public class TransportNoopSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
@Inject
public TransportNoopSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters) {
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters,
(Writeable.Reader<SearchRequest>) SearchRequest::new);
public TransportNoopSearchAction(Settings settings, TransportService transportService, ActionFilters actionFilters) {
super(settings, NoopSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
}
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
listener.onResponse(new SearchResponse(new InternalSearchResponse(
new SearchHits(
new SearchHit[0], 0L, 0.0f),

@ -37,6 +37,8 @@ import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@ -188,6 +190,35 @@ public final class IndicesClient {
GetMappingsResponse::fromXContent, listener, emptySet());
}
/**
* Retrieves the field mappings on an index or indices using the Get Field Mapping API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html">
* Get Field Mapping API on elastic.co</a>
* @param getFieldMappingsRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetFieldMappingsResponse getFieldMapping(GetFieldMappingsRequest getFieldMappingsRequest,
RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(getFieldMappingsRequest, RequestConverters::getFieldMapping, options,
GetFieldMappingsResponse::fromXContent, emptySet());
}
/**
* Asynchronously retrieves the field mappings on an index or indices using the Get Field Mapping API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html">
* Get Field Mapping API on elastic.co</a>
* @param getFieldMappingsRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void getFieldMappingAsync(GetFieldMappingsRequest getFieldMappingsRequest, RequestOptions options,
ActionListener<GetFieldMappingsResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(getFieldMappingsRequest, RequestConverters::getFieldMapping, options,
GetFieldMappingsResponse::fromXContent, listener, emptySet());
}
/**
* Updates aliases using the Index Aliases API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html">

@ -24,6 +24,8 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import java.io.IOException;
@ -125,4 +127,37 @@ public final class IngestClient {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline, options,
WritePipelineResponse::fromXContent, listener, emptySet());
}
/**
* Simulate a pipeline on a set of documents provided in the request
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
* Simulate Pipeline API on elastic.co</a>
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public SimulatePipelineResponse simulatePipeline(SimulatePipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::simulatePipeline, options,
SimulatePipelineResponse::fromXContent, emptySet());
}
/**
* Asynchronously simulate a pipeline on a set of documents provided in the request
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html">
* Simulate Pipeline API on elastic.co</a>
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void simulatePipelineAsync(SimulatePipelineRequest request,
RequestOptions options,
ActionListener<SimulatePipelineResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::simulatePipeline, options,
SimulatePipelineResponse::fromXContent, listener, emptySet());
}
}

@ -50,6 +50,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@ -71,6 +72,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -229,6 +231,25 @@ final class RequestConverters {
return request;
}
static Request getFieldMapping(GetFieldMappingsRequest getFieldMappingsRequest) throws IOException {
String[] indices = getFieldMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getFieldMappingsRequest.indices();
String[] types = getFieldMappingsRequest.types() == null ? Strings.EMPTY_ARRAY : getFieldMappingsRequest.types();
String[] fields = getFieldMappingsRequest.fields() == null ? Strings.EMPTY_ARRAY : getFieldMappingsRequest.fields();
String endpoint = new EndpointBuilder().addCommaSeparatedPathParts(indices)
.addPathPartAsIs("_mapping").addCommaSeparatedPathParts(types)
.addPathPartAsIs("field").addCommaSeparatedPathParts(fields)
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
Params parameters = new Params(request);
parameters.withIndicesOptions(getFieldMappingsRequest.indicesOptions());
parameters.withIncludeDefaults(getFieldMappingsRequest.includeDefaults());
parameters.withLocal(getFieldMappingsRequest.local());
return request;
}
static Request refresh(RefreshRequest refreshRequest) {
String[] indices = refreshRequest.indices() == null ? Strings.EMPTY_ARRAY : refreshRequest.indices();
Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_refresh"));
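For illustration (names assumed): with indices {"idx"}, types {"_doc"} and fields {"message", "ts"}, the getFieldMapping converter above produces

    GET /idx/_mapping/_doc/field/message,ts

and since the EndpointBuilder skips empty parts, omitting the types simply drops that segment: GET /idx/_mapping/field/message,ts.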
@ -886,6 +907,20 @@ final class RequestConverters {
return request;
}
static Request simulatePipeline(SimulatePipelineRequest simulatePipelineRequest) throws IOException {
EndpointBuilder builder = new EndpointBuilder().addPathPartAsIs("_ingest/pipeline");
if (simulatePipelineRequest.getId() != null && !simulatePipelineRequest.getId().isEmpty()) {
builder.addPathPart(simulatePipelineRequest.getId());
}
builder.addPathPartAsIs("_simulate");
String endpoint = builder.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request);
params.putParam("verbose", Boolean.toString(simulatePipelineRequest.isVerbose()));
request.setEntity(createEntity(simulatePipelineRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
static Request getAlias(GetAliasesRequest getAliasesRequest) {
String[] indices = getAliasesRequest.indices() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.indices();
String[] aliases = getAliasesRequest.aliases() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.aliases();
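Likewise for the simulatePipeline converter (pipeline id assumed): with id "my-pipeline" and verbose set, it produces

    POST /_ingest/pipeline/my-pipeline/_simulate?verbose=true

with the request body attached as the entity; without an id the path is just /_ingest/pipeline/_simulate, and verbose=false is still sent explicitly.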

@ -85,9 +85,7 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
}
}
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
protected static XContentBuilder buildRandomXContentPipeline(XContentBuilder pipelineBuilder) throws IOException {
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
@ -114,6 +112,12 @@ public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase {
return pipelineBuilder;
}
protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
return buildRandomXContentPipeline(pipelineBuilder);
}
protected static void createPipeline(String pipelineId) throws IOException {
XContentBuilder builder = buildRandomXContentPipeline();
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));

@ -43,6 +43,8 @@ import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@ -74,6 +76,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -378,6 +381,41 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
assertThat(mappings, equalTo(expected));
}
public void testGetFieldMapping() throws IOException {
String indexName = "test";
createIndex(indexName, Settings.EMPTY);
PutMappingRequest putMappingRequest = new PutMappingRequest(indexName);
putMappingRequest.type("_doc");
XContentBuilder mappingBuilder = JsonXContent.contentBuilder();
mappingBuilder.startObject().startObject("properties").startObject("field");
mappingBuilder.field("type", "text");
mappingBuilder.endObject().endObject().endObject();
putMappingRequest.source(mappingBuilder);
PutMappingResponse putMappingResponse =
execute(putMappingRequest, highLevelClient().indices()::putMapping, highLevelClient().indices()::putMappingAsync);
assertTrue(putMappingResponse.isAcknowledged());
GetFieldMappingsRequest getFieldMappingsRequest = new GetFieldMappingsRequest()
.indices(indexName)
.types("_doc")
.fields("field");
GetFieldMappingsResponse getFieldMappingsResponse =
execute(getFieldMappingsRequest,
highLevelClient().indices()::getFieldMapping,
highLevelClient().indices()::getFieldMappingAsync);
final Map<String, GetFieldMappingsResponse.FieldMappingMetaData> fieldMappingMap =
getFieldMappingsResponse.mappings().get(indexName).get("_doc");
final GetFieldMappingsResponse.FieldMappingMetaData metaData =
new GetFieldMappingsResponse.FieldMappingMetaData("field",
new BytesArray("{\"field\":{\"type\":\"text\"}}"));
assertThat(fieldMappingMap, equalTo(Collections.singletonMap("field", metaData)));
}
public void testDeleteIndex() throws IOException {
{
// Delete index if exists

@ -23,12 +23,22 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.PipelineConfiguration;
import java.io.IOException;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class IngestClientIT extends ESRestHighLevelClientTestCase {
@ -80,4 +90,93 @@ public class IngestClientIT extends ESRestHighLevelClientTestCase {
execute(request, highLevelClient().ingest()::deletePipeline, highLevelClient().ingest()::deletePipelineAsync);
assertTrue(response.isAcknowledged());
}
public void testSimulatePipeline() throws IOException {
testSimulatePipeline(false, false);
}
public void testSimulatePipelineWithFailure() throws IOException {
testSimulatePipeline(false, true);
}
public void testSimulatePipelineVerbose() throws IOException {
testSimulatePipeline(true, false);
}
public void testSimulatePipelineVerboseWithFailure() throws IOException {
testSimulatePipeline(true, true);
}
private void testSimulatePipeline(boolean isVerbose,
boolean isFailure) throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder builder = XContentBuilder.builder(xContentType.xContent());
String rankValue = isFailure ? "non-int" : Integer.toString(1234);
builder.startObject();
{
builder.field("pipeline");
buildRandomXContentPipeline(builder);
builder.startArray("docs");
{
builder.startObject()
.field("_index", "index")
.field("_type", "doc")
.field("_id", "doc_" + 1)
.startObject("_source").field("foo", "rab_" + 1).field("rank", rankValue).endObject()
.endObject();
}
builder.endArray();
}
builder.endObject();
SimulatePipelineRequest request = new SimulatePipelineRequest(
BytesReference.bytes(builder),
builder.contentType()
);
request.setVerbose(isVerbose);
SimulatePipelineResponse response =
execute(request, highLevelClient().ingest()::simulatePipeline, highLevelClient().ingest()::simulatePipelineAsync);
List<SimulateDocumentResult> results = response.getResults();
assertEquals(1, results.size());
if (isVerbose) {
assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) results.get(0);
assertEquals(2, verboseResult.getProcessorResults().size());
if (isFailure) {
assertNotNull(verboseResult.getProcessorResults().get(1).getFailure());
assertThat(verboseResult.getProcessorResults().get(1).getFailure().getMessage(),
containsString("unable to convert [non-int] to integer"));
} else {
assertEquals(
verboseResult.getProcessorResults().get(0).getIngestDocument()
.getFieldValue("foo", String.class),
"bar"
);
assertEquals(
Integer.valueOf(1234),
verboseResult.getProcessorResults().get(1).getIngestDocument()
.getFieldValue("rank", Integer.class)
);
}
} else {
assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)results.get(0);
if (isFailure) {
assertNotNull(baseResult.getFailure());
assertThat(baseResult.getFailure().getMessage(),
containsString("unable to convert [non-int] to integer"));
} else {
assertNotNull(baseResult.getIngestDocument());
assertEquals(
baseResult.getIngestDocument().getFieldValue("foo", String.class),
"bar"
);
assertEquals(
Integer.valueOf(1234),
baseResult.getIngestDocument()
.getFieldValue("rank", Integer.class)
);
}
}
}
}

@ -52,6 +52,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
@ -74,6 +75,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@ -456,6 +458,61 @@ public class RequestConvertersTests extends ESTestCase {
assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod()));
}
public void testGetFieldMapping() throws IOException {
GetFieldMappingsRequest getFieldMappingsRequest = new GetFieldMappingsRequest();
String[] indices = Strings.EMPTY_ARRAY;
if (randomBoolean()) {
indices = randomIndicesNames(0, 5);
getFieldMappingsRequest.indices(indices);
} else if (randomBoolean()) {
getFieldMappingsRequest.indices((String[]) null);
}
String type = null;
if (randomBoolean()) {
type = randomAlphaOfLengthBetween(3, 10);
getFieldMappingsRequest.types(type);
} else if (randomBoolean()) {
getFieldMappingsRequest.types((String[]) null);
}
String[] fields = null;
if (randomBoolean()) {
fields = new String[randomIntBetween(1, 5)];
for (int i = 0; i < fields.length; i++) {
fields[i] = randomAlphaOfLengthBetween(3, 10);
}
getFieldMappingsRequest.fields(fields);
} else if (randomBoolean()) {
getFieldMappingsRequest.fields((String[]) null);
}
Map<String, String> expectedParams = new HashMap<>();
setRandomIndicesOptions(getFieldMappingsRequest::indicesOptions, getFieldMappingsRequest::indicesOptions, expectedParams);
setRandomLocal(getFieldMappingsRequest::local, expectedParams);
Request request = RequestConverters.getFieldMapping(getFieldMappingsRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
endpoint.add("_mapping");
if (type != null) {
endpoint.add(type);
}
endpoint.add("field");
if (fields != null) {
endpoint.add(String.join(",", fields));
}
assertThat(endpoint.toString(), equalTo(request.getEndpoint()));
assertThat(expectedParams, equalTo(request.getParameters()));
assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod()));
}
public void testDeleteIndex() {
String[] indices = randomIndicesNames(0, 5);
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indices);
@ -1534,6 +1591,34 @@ public class RequestConvertersTests extends ESTestCase {
assertEquals(expectedParams, expectedRequest.getParameters());
}
public void testSimulatePipeline() throws IOException {
String pipelineId = randomBoolean() ? "some_pipeline_id" : null;
boolean verbose = randomBoolean();
String json = "{\"pipeline\":{" +
"\"description\":\"_description\"," +
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]}," +
"\"docs\":[{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}]}";
SimulatePipelineRequest request = new SimulatePipelineRequest(
new BytesArray(json.getBytes(StandardCharsets.UTF_8)),
XContentType.JSON
);
request.setId(pipelineId);
request.setVerbose(verbose);
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("verbose", Boolean.toString(verbose));
Request expectedRequest = RequestConverters.simulatePipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
if (pipelineId != null && !pipelineId.isEmpty())
endpoint.add(pipelineId);
endpoint.add("_simulate");
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
assertToXContentBody(request, expectedRequest.getEntity());
}
public void testClusterHealth() {
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
Map<String, String> expectedParams = new HashMap<>();
@ -2239,16 +2324,20 @@ public class RequestConvertersTests extends ESTestCase {
}
}
private static void setRandomLocal(MasterNodeReadRequest<?> request, Map<String, String> expectedParams) {
private static void setRandomLocal(Consumer<Boolean> setter, Map<String, String> expectedParams) {
if (randomBoolean()) {
boolean local = randomBoolean();
request.local(local);
setter.accept(local);
if (local) {
expectedParams.put("local", String.valueOf(local));
}
}
}
private static void setRandomLocal(MasterNodeReadRequest<?> request, Map<String, String> expectedParams) {
setRandomLocal(request::local, expectedParams);
}
private static void setRandomTimeout(Consumer<String> setter, TimeValue defaultTimeout, Map<String, String> expectedParams) {
if (randomBoolean()) {
String timeout = randomTimeValue();

@ -41,6 +41,8 @@ import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
@ -703,6 +705,110 @@ public class IndicesClientDocumentationIT extends ESRestHighLevelClientTestCase
}
}
public void testGetFieldMapping() throws IOException, InterruptedException {
RestHighLevelClient client = highLevelClient();
{
CreateIndexResponse createIndexResponse = client.indices().create(new CreateIndexRequest("twitter"), RequestOptions.DEFAULT);
assertTrue(createIndexResponse.isAcknowledged());
PutMappingRequest request = new PutMappingRequest("twitter");
request.type("tweet");
request.source(
"{\n" +
" \"properties\": {\n" +
" \"message\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"timestamp\": {\n" +
" \"type\": \"date\"\n" +
" }\n" +
" }\n" +
"}", // <1>
XContentType.JSON);
PutMappingResponse putMappingResponse = client.indices().putMapping(request, RequestOptions.DEFAULT);
assertTrue(putMappingResponse.isAcknowledged());
}
// tag::get-field-mapping-request
GetFieldMappingsRequest request = new GetFieldMappingsRequest(); // <1>
request.indices("twitter"); // <2>
request.types("tweet"); // <3>
request.fields("message", "timestamp"); // <4>
// end::get-field-mapping-request
// tag::get-field-mapping-request-indicesOptions
request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1>
// end::get-field-mapping-request-indicesOptions
// tag::get-field-mapping-request-local
request.local(true); // <1>
// end::get-field-mapping-request-local
{
// tag::get-field-mapping-execute
GetFieldMappingsResponse response =
client.indices().getFieldMapping(request, RequestOptions.DEFAULT);
// end::get-field-mapping-execute
// tag::get-field-mapping-response
final Map<String, Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>>> mappings =
response.mappings();// <1>
final Map<String, GetFieldMappingsResponse.FieldMappingMetaData> typeMappings =
mappings.get("twitter").get("tweet"); // <2>
final GetFieldMappingsResponse.FieldMappingMetaData metaData =
typeMappings.get("message");// <3>
final String fullName = metaData.fullName();// <4>
final Map<String, Object> source = metaData.sourceAsMap(); // <5>
// end::get-field-mapping-response
}
{
// tag::get-field-mapping-execute-listener
ActionListener<GetFieldMappingsResponse> listener =
new ActionListener<GetFieldMappingsResponse>() {
@Override
public void onResponse(GetFieldMappingsResponse putMappingResponse) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::get-field-mapping-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<GetFieldMappingsResponse> latchListener = new LatchedActionListener<>(listener, latch);
listener = ActionListener.wrap(r -> {
final Map<String, Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>>> mappings =
r.mappings();
final Map<String, GetFieldMappingsResponse.FieldMappingMetaData> typeMappings =
mappings.get("twitter").get("tweet");
final GetFieldMappingsResponse.FieldMappingMetaData metaData1 = typeMappings.get("message");
final String fullName = metaData1.fullName();
final Map<String, Object> source = metaData1.sourceAsMap();
latchListener.onResponse(r);
}, e -> {
latchListener.onFailure(e);
fail("should not fail");
});
// tag::get-field-mapping-execute-async
client.indices().getFieldMappingAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::get-field-mapping-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
public void testOpenIndex() throws Exception {
RestHighLevelClient client = highLevelClient();

@ -25,6 +25,12 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
import org.elasticsearch.client.RequestOptions;
@ -277,4 +283,109 @@ public class IngestClientDocumentationIT extends ESRestHighLevelClientTestCase {
}
}
public void testSimulatePipeline() throws IOException {
RestHighLevelClient client = highLevelClient();
{
// tag::simulate-pipeline-request
String source =
"{\"" +
"pipeline\":{" +
"\"description\":\"_description\"," +
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
"}," +
"\"docs\":[" +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
"]" +
"}";
SimulatePipelineRequest request = new SimulatePipelineRequest(
new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <1>
XContentType.JSON // <2>
);
// end::simulate-pipeline-request
// tag::simulate-pipeline-request-pipeline-id
request.setId("my-pipeline-id"); // <1>
// end::simulate-pipeline-request-pipeline-id
// For testing we set this back to null
request.setId(null);
// tag::simulate-pipeline-request-verbose
request.setVerbose(true); // <1>
// end::simulate-pipeline-request-verbose
// tag::simulate-pipeline-execute
SimulatePipelineResponse response = client.ingest().simulatePipeline(request, RequestOptions.DEFAULT); // <1>
// end::simulate-pipeline-execute
// tag::simulate-pipeline-response
for (SimulateDocumentResult result: response.getResults()) { // <1>
if (request.isVerbose()) {
assert result instanceof SimulateDocumentVerboseResult;
SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult)result; // <2>
for (SimulateProcessorResult processorResult: verboseResult.getProcessorResults()) { // <3>
processorResult.getIngestDocument(); // <4>
processorResult.getFailure(); // <5>
}
} else {
assert result instanceof SimulateDocumentBaseResult;
SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)result; // <6>
baseResult.getIngestDocument(); // <7>
baseResult.getFailure(); // <8>
}
}
// end::simulate-pipeline-response
assert(response.getResults().size() > 0);
}
}
public void testSimulatePipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();
{
String source =
"{\"" +
"pipeline\":{" +
"\"description\":\"_description\"," +
"\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" +
"}," +
"\"docs\":[" +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," +
"{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" +
"]" +
"}";
SimulatePipelineRequest request = new SimulatePipelineRequest(
new BytesArray(source.getBytes(StandardCharsets.UTF_8)),
XContentType.JSON
);
// tag::simulate-pipeline-execute-listener
ActionListener<SimulatePipelineResponse> listener =
new ActionListener<SimulatePipelineResponse>() {
@Override
public void onResponse(SimulatePipelineResponse response) {
// <1>
}
@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::simulate-pipeline-execute-listener
// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);
// tag::simulate-pipeline-execute-async
client.ingest().simulatePipelineAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::simulate-pipeline-execute-async
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}

@ -24,7 +24,7 @@ import java.util.Iterator;
/**
* Selects nodes that can receive requests. Used to keep requests away
* from master nodes or to send them to nodes with a particular attribute.
* Use with {@link RequestOptions.Builder#setNodeSelector(NodeSelector)}.
* Use with {@link RestClientBuilder#setNodeSelector(NodeSelector)}.
*/
public interface NodeSelector {
/**
@ -68,7 +68,7 @@ public interface NodeSelector {
* have the {@code master} role OR it has the {@code data}
* role.
*/
NodeSelector NOT_MASTER_ONLY = new NodeSelector() {
NodeSelector SKIP_DEDICATED_MASTERS = new NodeSelector() {
@Override
public void select(Iterable<Node> nodes) {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
@ -84,7 +84,7 @@ public interface NodeSelector {
@Override
public String toString() {
return "NOT_MASTER_ONLY";
return "SKIP_DEDICATED_MASTERS";
}
};
}
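The select(...) body elided above is unchanged by the rename; a hedged sketch of its semantics (not the verbatim implementation): a node is dropped only when it is a dedicated master, i.e. master-eligible with neither the data nor the ingest role.

    for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
        Node.Roles roles = itr.next().getRoles();
        if (roles != null
                && roles.isMasterEligible()
                && false == roles.isData()
                && false == roles.isIngest()) {
            itr.remove(); // dedicated master: skip it
        }
    }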

@ -37,22 +37,18 @@ import java.util.ArrayList;
*/
public final class RequestOptions {
public static final RequestOptions DEFAULT = new Builder(
Collections.<Header>emptyList(), NodeSelector.ANY,
HeapBufferedResponseConsumerFactory.DEFAULT).build();
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT).build();
private final List<Header> headers;
private final NodeSelector nodeSelector;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private RequestOptions(Builder builder) {
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
this.nodeSelector = builder.nodeSelector;
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
}
public Builder toBuilder() {
Builder builder = new Builder(headers, nodeSelector, httpAsyncResponseConsumerFactory);
return builder;
return new Builder(headers, httpAsyncResponseConsumerFactory);
}
/**
@ -62,14 +58,6 @@ public final class RequestOptions {
return headers;
}
/**
* The selector that chooses which nodes are valid destinations for
* {@link Request}s with these options.
*/
public NodeSelector getNodeSelector() {
return nodeSelector;
}
/**
* The {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
@ -93,9 +81,6 @@ public final class RequestOptions {
b.append(headers.get(h).toString());
}
}
if (nodeSelector != NodeSelector.ANY) {
b.append(", nodeSelector=").append(nodeSelector);
}
if (httpAsyncResponseConsumerFactory != HttpAsyncResponseConsumerFactory.DEFAULT) {
b.append(", consumerFactory=").append(httpAsyncResponseConsumerFactory);
}
@ -113,24 +98,20 @@ public final class RequestOptions {
RequestOptions other = (RequestOptions) obj;
return headers.equals(other.headers)
&& nodeSelector.equals(other.nodeSelector)
&& httpAsyncResponseConsumerFactory.equals(other.httpAsyncResponseConsumerFactory);
}
@Override
public int hashCode() {
return Objects.hash(headers, nodeSelector, httpAsyncResponseConsumerFactory);
return Objects.hash(headers, httpAsyncResponseConsumerFactory);
}
public static class Builder {
private final List<Header> headers;
private NodeSelector nodeSelector;
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
private Builder(List<Header> headers, NodeSelector nodeSelector,
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
private Builder(List<Header> headers, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
this.headers = new ArrayList<>(headers);
this.nodeSelector = nodeSelector;
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
}
@ -150,14 +131,6 @@ public final class RequestOptions {
this.headers.add(new ReqHeader(name, value));
}
/**
* Configure the selector that chooses which nodes are valid
* destinations for {@link Request}s with these options
*/
public void setNodeSelector(NodeSelector nodeSelector) {
this.nodeSelector = Objects.requireNonNull(nodeSelector, "nodeSelector cannot be null");
}
/**
* Set the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
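Taken together with the RestClientBuilder change further down, node selection moves from per-request options to client construction; a sketch of the new call site (host and selector chosen for illustration):

    RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http"))
        .setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS)
        .build();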

@ -48,6 +48,7 @@ import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.DeadHostState.TimeSupplier;
import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
@ -74,7 +75,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLHandshakeException;
import static java.util.Collections.singletonList;
@ -108,15 +108,17 @@ public class RestClient implements Closeable {
private final AtomicInteger lastNodeIndex = new AtomicInteger(0);
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
private final FailureListener failureListener;
private final NodeSelector nodeSelector;
private volatile NodeTuple<List<Node>> nodeTuple;
RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
List<Node> nodes, String pathPrefix, FailureListener failureListener) {
List<Node> nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector) {
this.client = client;
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
this.nodeSelector = nodeSelector;
setNodes(nodes);
}
@ -146,7 +148,7 @@ public class RestClient implements Closeable {
/**
* Replaces the hosts with which the client communicates.
*
* @deprecated prefer {@link setNodes} because it allows you
* @deprecated prefer {@link #setNodes(Collection)} because it allows you
* to set metadata for use with {@link NodeSelector}s
*/
@Deprecated
@ -180,8 +182,8 @@ public class RestClient implements Closeable {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = new ArrayList<>(hosts.length);
for (int i = 0; i < hosts.length; i++) {
nodes.add(new Node(hosts[i]));
for (HttpHost host : hosts) {
nodes.add(new Node(host));
}
return nodes;
}
@ -509,7 +511,7 @@ public class RestClient implements Closeable {
setHeaders(httpRequest, request.getOptions().getHeaders());
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
long startTime = System.nanoTime();
performRequestAsync(startTime, nextNode(request.getOptions().getNodeSelector()), httpRequest, ignoreErrorCodes,
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
}
@ -611,7 +613,7 @@ public class RestClient implements Closeable {
* that is closest to being revived.
* @throws IOException if no nodes are available
*/
private NodeTuple<Iterator<Node>> nextNode(NodeSelector nodeSelector) throws IOException {
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
List<Node> hosts = selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);

@ -55,6 +55,7 @@ public final class RestClientBuilder {
private HttpClientConfigCallback httpClientConfigCallback;
private RequestConfigCallback requestConfigCallback;
private String pathPrefix;
private NodeSelector nodeSelector = NodeSelector.ANY;
/**
* Creates a new builder instance and sets the hosts that the client will send requests to.
@ -173,6 +174,16 @@ public final class RestClientBuilder {
return this;
}
/**
* Sets the {@link NodeSelector} to be used for all requests.
* @throws NullPointerException if the provided nodeSelector is null
*/
public RestClientBuilder setNodeSelector(NodeSelector nodeSelector) {
Objects.requireNonNull(nodeSelector, "nodeSelector must not be null");
this.nodeSelector = nodeSelector;
return this;
}
/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
@ -186,7 +197,8 @@ public final class RestClientBuilder {
return createHttpClient();
}
});
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, pathPrefix, failureListener);
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector);
httpClient.start();
return restClient;
}

@ -59,7 +59,7 @@ public class NodeSelectorTests extends RestClientTestCase {
Collections.shuffle(nodes, getRandom());
List<Node> expected = new ArrayList<>(nodes);
expected.remove(masterOnly);
NodeSelector.NOT_MASTER_ONLY.select(nodes);
NodeSelector.SKIP_DEDICATED_MASTERS.select(nodes);
assertEquals(expected, nodes);
}

@ -114,10 +114,6 @@ public class RequestOptionsTests extends RestClientTestCase {
}
}
if (randomBoolean()) {
builder.setNodeSelector(mock(NodeSelector.class));
}
if (randomBoolean()) {
builder.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(1));
}
@ -131,15 +127,12 @@ public class RequestOptionsTests extends RestClientTestCase {
private static RequestOptions mutate(RequestOptions options) {
RequestOptions.Builder mutant = options.toBuilder();
int mutationType = between(0, 2);
int mutationType = between(0, 1);
switch (mutationType) {
case 0:
mutant.addHeader("extra", "m");
return mutant.build();
case 1:
mutant.setNodeSelector(mock(NodeSelector.class));
return mutant.build();
case 2:
mutant.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(5));
return mutant.build();
default:

@ -42,9 +42,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
import static org.elasticsearch.client.RestClientTestUtil.randomOkStatusCode;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -77,14 +75,15 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
httpServers[i] = httpServer;
httpHosts[i] = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
}
restClient = buildRestClient();
restClient = buildRestClient(NodeSelector.ANY);
}
private static RestClient buildRestClient() {
private static RestClient buildRestClient(NodeSelector nodeSelector) {
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
if (pathPrefix.length() > 0) {
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
}
restClientBuilder.setNodeSelector(nodeSelector);
return restClientBuilder.build();
}
@ -201,27 +200,28 @@ public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
* test what happens after calling
*/
public void testNodeSelector() throws IOException {
Request request = new Request("GET", "/200");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setNodeSelector(firstPositionNodeSelector());
request.setOptions(options);
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
* Run the request more than once to verify that the
* NodeSelector overrides the round robin behavior.
*/
if (stoppedFirstHost) {
try {
restClient.performRequest(request);
fail("expected to fail to connect");
} catch (ConnectException e) {
// This is different in windows and linux but this matches both.
assertThat(e.getMessage(), startsWith("Connection refused"));
try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) {
Request request = new Request("GET", "/200");
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
* Run the request more than once to verify that the
* NodeSelector overrides the round robin behavior.
*/
if (stoppedFirstHost) {
try {
restClient.performRequest(request);
fail("expected to fail to connect");
} catch (ConnectException e) {
// Windows isn't consistent here. Sometimes the message is even null!
if (false == System.getProperty("os.name").startsWith("Windows")) {
assertEquals("Connection refused", e.getMessage());
}
}
} else {
Response response = restClient.performRequest(request);
assertEquals(httpHosts[0], response.getHost());
}
} else {
Response response = restClient.performRequest(request);
assertEquals(httpHosts[0], response.getHost());
}
}
}

@ -35,9 +35,7 @@ import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.Node.Roles;
import org.junit.After;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -74,13 +72,11 @@ import static org.mockito.Mockito.when;
public class RestClientMultipleHostsTests extends RestClientTestCase {
private ExecutorService exec = Executors.newFixedThreadPool(1);
private RestClient restClient;
private List<Node> nodes;
private HostsTrackingFailureListener failureListener;
@Before
@SuppressWarnings("unchecked")
public void createRestClient() throws IOException {
public RestClient createRestClient(NodeSelector nodeSelector) {
CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class);
when(httpClient.<HttpResponse>execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class),
any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer<Future<HttpResponse>>() {
@ -119,7 +115,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
nodes = Collections.unmodifiableList(nodes);
failureListener = new HostsTrackingFailureListener();
restClient = new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener);
return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector);
}
/**
@ -131,12 +127,13 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
public void testRoundRobinOkStatusCodes() throws IOException {
RestClient restClient = createRestClient(NodeSelector.ANY);
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
for (int i = 0; i < numIters; i++) {
Set<HttpHost> hostsSet = hostsSet();
for (int j = 0; j < nodes.size(); j++) {
int statusCode = randomOkStatusCode(getRandom());
Response response = restClient.performRequest(randomHttpMethod(getRandom()), "/" + statusCode);
Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
assertEquals(statusCode, response.getStatusLine().getStatusCode());
assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost()));
}
@ -146,6 +143,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
public void testRoundRobinNoRetryErrors() throws IOException {
RestClient restClient = createRestClient(NodeSelector.ANY);
int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5);
for (int i = 0; i < numIters; i++) {
Set<HttpHost> hostsSet = hostsSet();
@ -153,7 +151,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
String method = randomHttpMethod(getRandom());
int statusCode = randomErrorNoRetryStatusCode(getRandom());
try {
Response response = restClient.performRequest(method, "/" + statusCode);
Response response = restClient.performRequest(new Request(method, "/" + statusCode));
if (method.equals("HEAD") && statusCode == 404) {
//no exception gets thrown although we got a 404
assertEquals(404, response.getStatusLine().getStatusCode());
@ -178,9 +176,10 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
}
public void testRoundRobinRetryErrors() throws IOException {
RestClient restClient = createRestClient(NodeSelector.ANY);
String retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
/*
@ -237,7 +236,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
for (int j = 0; j < nodes.size(); j++) {
retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
Response response = e.getResponse();
@ -269,7 +268,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
int statusCode = randomErrorNoRetryStatusCode(getRandom());
Response response;
try {
response = restClient.performRequest(randomHttpMethod(getRandom()), "/" + statusCode);
response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode));
} catch (ResponseException e) {
response = e.getResponse();
}
@ -286,7 +285,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
for (int y = 0; y < i + 1; y++) {
retryEndpoint = randomErrorRetryEndpoint();
try {
restClient.performRequest(randomHttpMethod(getRandom()), retryEndpoint);
restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint));
fail("request should have failed");
} catch (ResponseException e) {
Response response = e.getResponse();
@ -323,6 +322,7 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
assertTrue(found);
}
};
RestClient restClient = createRestClient(firstPositionOnly);
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
@ -330,18 +330,16 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
* NodeSelector overrides the round robin behavior.
*/
Request request = new Request("GET", "/200");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setNodeSelector(firstPositionOnly);
request.setOptions(options);
Response response = restClient.performRequest(request);
assertEquals(nodes.get(0).getHost(), response.getHost());
}
}
public void testSetNodes() throws IOException {
RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS);
List<Node> newNodes = new ArrayList<>(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
Roles roles = i == 0 ? new Roles(false, true, true) : new Roles(true, false, false);
Node.Roles roles = i == 0 ? new Node.Roles(false, true, true) : new Node.Roles(true, false, false);
newNodes.add(new Node(nodes.get(i).getHost(), null, null, null, roles, null));
}
restClient.setNodes(newNodes);
@ -352,9 +350,6 @@ public class RestClientMultipleHostsTests extends RestClientTestCase {
* NodeSelector overrides the round robin behavior.
*/
Request request = new Request("GET", "/200");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setNodeSelector(NodeSelector.NOT_MASTER_ONLY);
request.setOptions(options);
Response response = restClient.performRequest(request);
assertEquals(newNodes.get(0).getHost(), response.getHost());
}

@ -150,7 +150,7 @@ public class RestClientSingleHostTests extends RestClientTestCase {
node = new Node(new HttpHost("localhost", 9200));
failureListener = new HostsTrackingFailureListener();
restClient = new RestClient(httpClient, 10000, defaultHeaders,
singletonList(node), null, failureListener);
singletonList(node), null, failureListener, NodeSelector.ANY);
}
/**

@ -54,7 +54,7 @@ public class RestClientTests extends RestClientTestCase {
public void testCloseIsIdempotent() throws IOException {
List<Node> nodes = singletonList(new Node(new HttpHost("localhost", 9200)));
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null);
RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null);
restClient.close();
verify(closeableHttpAsyncClient, times(1)).close();
restClient.close();
@ -475,7 +475,7 @@ public class RestClientTests extends RestClientTestCase {
private static RestClient createRestClient() {
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000),
new Header[] {}, nodes, null, null);
new Header[] {}, nodes, null, null, null);
}

@ -36,7 +36,6 @@ import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.HasAttributeNodeSelector;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
@ -54,6 +53,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
/**
@ -82,8 +82,7 @@ public class RestClientDocumentation {
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.addHeader("Authorization", "Bearer " + TOKEN); // <1>
builder.setNodeSelector(NodeSelector.NOT_MASTER_ONLY); // <2>
builder.setHttpAsyncResponseConsumerFactory( // <3>
builder.setHttpAsyncResponseConsumerFactory( // <2>
new HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
@ -115,6 +114,45 @@ public class RestClientDocumentation {
builder.setMaxRetryTimeoutMillis(10000); // <1>
//end::rest-client-init-max-retry-timeout
}
{
//tag::rest-client-init-node-selector
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS); // <1>
//end::rest-client-init-node-selector
}
{
//tag::rest-client-init-allocation-aware-selector
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
builder.setNodeSelector(new NodeSelector() { // <1>
@Override
public void select(Iterable<Node> nodes) {
/*
* Prefer any node that belongs to rack_one. If none is around
* we will go to another rack till it's time to try and revive
* some of the nodes that belong to rack_one.
*/
boolean foundOne = false;
for (Node node : nodes) {
String rackId = node.getAttributes().get("rack_id").get(0);
if ("rack_one".equals(rackId)) {
foundOne = true;
break;
}
}
if (foundOne) {
Iterator<Node> nodesIt = nodes.iterator();
while (nodesIt.hasNext()) {
Node node = nodesIt.next();
String rackId = node.getAttributes().get("rack_id").get(0);
if ("rack_one".equals(rackId) == false) {
nodesIt.remove();
}
}
}
}
});
//end::rest-client-init-allocation-aware-selector
}
{
//tag::rest-client-init-failure-listener
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
@ -198,13 +236,6 @@ public class RestClientDocumentation {
request.setOptions(options);
//end::rest-client-options-customize-header
}
{
//tag::rest-client-options-customize-attribute
RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
options.setNodeSelector(new HasAttributeNodeSelector("rack", "c12")); // <1>
request.setOptions(options);
//end::rest-client-options-customize-attribute
}
}
{
HttpEntity[] documents = new HttpEntity[10];

@ -522,3 +522,85 @@ for (int i = 0; i < 5; i++) {
{"index":{}}
{"ip": "12.0.0.$i"}"""
}
// Used by SQL because it looks SQL-ish
buildRestTests.setups['library'] = '''
- do:
indices.create:
index: library
body:
settings:
number_of_shards: 1
number_of_replicas: 1
mappings:
book:
properties:
name:
type: text
fields:
keyword:
type: keyword
author:
type: text
fields:
keyword:
type: keyword
release_date:
type: date
page_count:
type: short
- do:
bulk:
index: library
type: book
refresh: true
body: |
{"index":{"_id": "Leviathan Wakes"}}
{"name": "Leviathan Wakes", "author": "James S.A. Corey", "release_date": "2011-06-02", "page_count": 561}
{"index":{"_id": "Hyperion"}}
{"name": "Hyperion", "author": "Dan Simmons", "release_date": "1989-05-26", "page_count": 482}
{"index":{"_id": "Dune"}}
{"name": "Dune", "author": "Frank Herbert", "release_date": "1965-06-01", "page_count": 604}
{"index":{"_id": "Dune Messiah"}}
{"name": "Dune Messiah", "author": "Frank Herbert", "release_date": "1969-10-15", "page_count": 331}
{"index":{"_id": "Children of Dune"}}
{"name": "Children of Dune", "author": "Frank Herbert", "release_date": "1976-04-21", "page_count": 408}
{"index":{"_id": "God Emperor of Dune"}}
{"name": "God Emperor of Dune", "author": "Frank Herbert", "release_date": "1981-05-28", "page_count": 454}
{"index":{"_id": "Consider Phlebas"}}
{"name": "Consider Phlebas", "author": "Iain M. Banks", "release_date": "1987-04-23", "page_count": 471}
{"index":{"_id": "Pandora's Star"}}
{"name": "Pandora's Star", "author": "Peter F. Hamilton", "release_date": "2004-03-02", "page_count": 768}
{"index":{"_id": "Revelation Space"}}
{"name": "Revelation Space", "author": "Alastair Reynolds", "release_date": "2000-03-15", "page_count": 585}
{"index":{"_id": "A Fire Upon the Deep"}}
{"name": "A Fire Upon the Deep", "author": "Vernor Vinge", "release_date": "1992-06-01", "page_count": 613}
{"index":{"_id": "Ender's Game"}}
{"name": "Ender's Game", "author": "Orson Scott Card", "release_date": "1985-06-01", "page_count": 324}
{"index":{"_id": "1984"}}
{"name": "1984", "author": "George Orwell", "release_date": "1985-06-01", "page_count": 328}
{"index":{"_id": "Fahrenheit 451"}}
{"name": "Fahrenheit 451", "author": "Ray Bradbury", "release_date": "1953-10-15", "page_count": 227}
{"index":{"_id": "Brave New World"}}
{"name": "Brave New World", "author": "Aldous Huxley", "release_date": "1932-06-01", "page_count": 268}
{"index":{"_id": "Foundation"}}
{"name": "Foundation", "author": "Isaac Asimov", "release_date": "1951-06-01", "page_count": 224}
{"index":{"_id": "The Giver"}}
{"name": "The Giver", "author": "Lois Lowry", "release_date": "1993-04-26", "page_count": 208}
{"index":{"_id": "Slaughterhouse-Five"}}
{"name": "Slaughterhouse-Five", "author": "Kurt Vonnegut", "release_date": "1969-06-01", "page_count": 275}
{"index":{"_id": "The Hitchhiker's Guide to the Galaxy"}}
{"name": "The Hitchhiker's Guide to the Galaxy", "author": "Douglas Adams", "release_date": "1979-10-12", "page_count": 180}
{"index":{"_id": "Snow Crash"}}
{"name": "Snow Crash", "author": "Neal Stephenson", "release_date": "1992-06-01", "page_count": 470}
{"index":{"_id": "Neuromancer"}}
{"name": "Neuromancer", "author": "William Gibson", "release_date": "1984-07-01", "page_count": 271}
{"index":{"_id": "The Handmaid's Tale"}}
{"name": "The Handmaid's Tale", "author": "Margaret Atwood", "release_date": "1985-06-01", "page_count": 311}
{"index":{"_id": "Starship Troopers"}}
{"name": "Starship Troopers", "author": "Robert A. Heinlein", "release_date": "1959-12-01", "page_count": 335}
{"index":{"_id": "The Left Hand of Darkness"}}
{"name": "The Left Hand of Darkness", "author": "Ursula K. Le Guin", "release_date": "1969-06-01", "page_count": 304}
{"index":{"_id": "The Moon is a Harsh Mistress"}}
{"name": "The Moon is a Harsh Mistress", "author": "Robert A. Heinlein", "release_date": "1966-04-01", "page_count": 288}
'''

@ -0,0 +1,86 @@
[[java-rest-high-get-field-mappings]]
=== Get Field Mappings API
[[java-rest-high-get-field-mappings-request]]
==== Get Field Mappings Request
A `GetFieldMappingsRequest` can have an optional list of indices, an optional list of types, and a list of fields:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[get-field-mapping-request]
--------------------------------------------------
<1> An empty request
<2> Setting the indices to fetch mapping for
<3> The types to be returned
<4> The fields to be returned
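For illustration, such a request might be put together as follows (a sketch only; the `twitter` index, `_doc` type and field names are made up):
["source","java"]
--------------------------------------------------
GetFieldMappingsRequest request = new GetFieldMappingsRequest(); // an empty request
request.indices("twitter");             // the indices to fetch mappings for
request.types("_doc");                  // the types to be returned
request.fields("message", "timestamp"); // the fields to be returned
--------------------------------------------------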
==== Optional arguments
The following arguments can also optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[get-field-mapping-request-indicesOptions]
--------------------------------------------------
<1> Setting `IndicesOptions` controls how unavailable indices are resolved and
how wildcard expressions are expanded
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[get-field-mapping-request-local]
--------------------------------------------------
<1> The `local` flag (defaults to `false`) controls whether the field mappings are
looked up in the local cluster state or in the cluster state held by
the elected master node
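A sketch of both optional arguments together (`IndicesOptions.lenientExpandOpen()` is just one possible choice):
["source","java"]
--------------------------------------------------
request.indicesOptions(IndicesOptions.lenientExpandOpen()); // ignore unavailable indices, expand wildcards to open indices
request.local(true); // read from the local cluster state rather than the master's
--------------------------------------------------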
[[java-rest-high-get-field-mappings-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[get-field-mapping-execute]
--------------------------------------------------
[[java-rest-high-get-field-mapping-async]]
==== Asynchronous Execution
The asynchronous execution of a get field mappings request requires both the
`GetFieldMappingsRequest` instance and an `ActionListener` instance to be passed to
the asynchronous method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[get-field-mapping-execute-async]
--------------------------------------------------
<1> The `GetFieldMappingsRequest` to execute and the `ActionListener` to use when the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method if
the execution successfully completed or using the `onFailure` method if it
failed.
A typical listener for `GetFieldMappingsResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[get-field-mapping-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument
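Such a listener is typically built as an anonymous class along these lines (a generic sketch):
["source","java"]
--------------------------------------------------
ActionListener<GetFieldMappingsResponse> listener = new ActionListener<GetFieldMappingsResponse>() {
    @Override
    public void onResponse(GetFieldMappingsResponse response) {
        // handle the returned field mappings
    }

    @Override
    public void onFailure(Exception e) {
        // handle the raised exception
    }
};
--------------------------------------------------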
[[java-rest-high-get-field-mapping-response]]
==== Get Field Mappings Response
The returned `GetFieldMappingsResponse` lets you retrieve information about the
executed operation as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[get-field-mapping-response]
--------------------------------------------------
<1> Returning all requested indices' field mappings
<2> Retrieving the mappings for a particular index and type
<3> Getting the mappings metadata for the `message` field
<4> Getting the full name of the field
<5> Getting the mapping source of the field
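Navigating the response might look like the following sketch, assuming the nested-map shape of `GetFieldMappingsResponse#mappings` (index, then type, then field) and the hypothetical names from above:
["source","java"]
--------------------------------------------------
Map<String, Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>>> mappings =
        response.mappings();                                // all requested mappings
GetFieldMappingsResponse.FieldMappingMetaData metaData =
        mappings.get("twitter").get("_doc").get("message"); // one index, type and field
String fullName = metaData.fullName();                      // full name of the field
Map<String, Object> source = metaData.sourceAsMap();        // mapping source of the field
--------------------------------------------------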

@ -0,0 +1,90 @@
[[java-rest-high-ingest-simulate-pipeline]]
=== Simulate Pipeline API
[[java-rest-high-ingest-simulate-pipeline-request]]
==== Simulate Pipeline Request
A `SimulatePipelineRequest` requires a source and an `XContentType`. The source is
the request body. See the https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html[docs]
for more details on the request body.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request]
--------------------------------------------------
<1> The request body as a `BytesArray`.
<2> The `XContentType` for the request body supplied above.
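As a sketch, building such a request from a literal JSON body could look like this (the pipeline and document below are made up):
["source","java"]
--------------------------------------------------
String source =
    "{\"pipeline\":{\"description\":\"test\",\"processors\":[" +
    "{\"set\":{\"field\":\"rank\",\"value\":42}}]}," +
    "\"docs\":[{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\"," +
    "\"_source\":{\"foo\":\"bar\"}}]}";
SimulatePipelineRequest request = new SimulatePipelineRequest(
    new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // the request body
    XContentType.JSON);                                      // the content type of the body
--------------------------------------------------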
==== Optional arguments
The following arguments can optionally be provided:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request-pipeline-id]
--------------------------------------------------
<1> You can either specify an existing pipeline to execute against the provided documents, or supply a
pipeline definition in the body of the request. This option sets the id for an existing pipeline.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request-verbose]
--------------------------------------------------
<1> To see the intermediate results of each processor in the simulate request, you can add the verbose parameter
to the request.
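Assuming the setters on `SimulatePipelineRequest`, both options might be set as follows (the pipeline id is made up):
["source","java"]
--------------------------------------------------
request.setId("my-pipeline-id"); // run an existing pipeline instead of an inline definition
request.setVerbose(true);        // report the intermediate result of every processor
--------------------------------------------------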
[[java-rest-high-ingest-simulate-pipeline-sync]]
==== Synchronous Execution
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute]
--------------------------------------------------
<1> Execute the request and get back the response in a `SimulatePipelineResponse` object.
[[java-rest-high-ingest-simulate-pipeline-async]]
==== Asynchronous Execution
The asynchronous execution of a simulate pipeline request requires both the `SimulatePipelineRequest`
instance and an `ActionListener` instance to be passed to the asynchronous
method:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute-async]
--------------------------------------------------
<1> The `SimulatePipelineRequest` to execute and the `ActionListener` to use when
the execution completes
The asynchronous method does not block and returns immediately. Once it is
completed the `ActionListener` is called back using the `onResponse` method
if the execution successfully completed or using the `onFailure` method if
it failed.
A typical listener for `SimulatePipelineResponse` looks like:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute-listener]
--------------------------------------------------
<1> Called when the execution is successfully completed. The response is
provided as an argument
<2> Called in case of failure. The raised exception is provided as an argument
[[java-rest-high-ingest-simulate-pipeline-response]]
==== Simulate Pipeline Response
The returned `SimulatePipelineResponse` lets you retrieve information about the executed
operation as follows:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-response]
--------------------------------------------------
<1> Get results for each of the documents provided, as an instance of `List<SimulateDocumentResult>`.
<2> If the request was in verbose mode, cast each result to `SimulateDocumentVerboseResult`.
<3> Check the result after each processor is applied.
<4> Get the ingest document for the result obtained in 3.
<5> Or get the failure for the result obtained in 3.
<6> Get the result as `SimulateDocumentBaseResult` if the result was not verbose.
<7> Get the ingest document for the result obtained in 6.
<8> Or get the failure for the result obtained in 6.
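Put together, processing the results might look like the following sketch (the class names are those of the `org.elasticsearch.action.ingest` package at the time of writing):
["source","java"]
--------------------------------------------------
for (SimulateDocumentResult result : response.getResults()) {
    if (request.isVerbose()) {
        SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) result;
        for (SimulateProcessorResult processorResult : verboseResult.getProcessorResults()) {
            IngestDocument document = processorResult.getIngestDocument(); // document after this processor
            Exception failure = processorResult.getFailure();              // or the processor failure
        }
    } else {
        SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult) result;
        IngestDocument document = baseResult.getIngestDocument();          // document after the whole pipeline
        Exception failure = baseResult.getFailure();                       // or the pipeline failure
    }
}
--------------------------------------------------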

@ -77,6 +77,7 @@ Index Management::
Mapping Management::
* <<java-rest-high-put-mapping>>
* <<java-rest-high-get-field-mappings>>
Alias Management::
* <<java-rest-high-update-aliases>>
@ -98,6 +99,7 @@ include::indices/force_merge.asciidoc[]
include::indices/rollover.asciidoc[]
include::indices/put_mapping.asciidoc[]
include::indices/get_mappings.asciidoc[]
include::indices/get_field_mappings.asciidoc[]
include::indices/update_aliases.asciidoc[]
include::indices/exists_alias.asciidoc[]
include::indices/get_alias.asciidoc[]
@ -123,10 +125,12 @@ The Java High Level REST Client supports the following Ingest APIs:
* <<java-rest-high-ingest-put-pipeline>>
* <<java-rest-high-ingest-get-pipeline>>
* <<java-rest-high-ingest-delete-pipeline>>
* <<java-rest-high-ingest-simulate-pipeline>>
include::ingest/put_pipeline.asciidoc[]
include::ingest/get_pipeline.asciidoc[]
include::ingest/delete_pipeline.asciidoc[]
include::ingest/simulate_pipeline.asciidoc[]
== Snapshot APIs

@ -99,3 +99,30 @@ http://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html[`netwo
to your
http://docs.oracle.com/javase/8/docs/technotes/guides/security/PolicyFiles.html[Java
security policy].
=== Node selector
The client sends each request to one of the configured nodes in round-robin
fashion. Nodes can optionally be filtered through a node selector provided
when initializing the client. This is useful when sniffing is enabled, in
case dedicated master nodes should not be hit by HTTP requests.
For each request the client runs the node selector, if one was configured,
to filter the candidate nodes, then selects the next one in the list out of
the remaining ones.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-allocation-aware-selector]
--------------------------------------------------
<1> Set an allocation-aware node selector that picks a node in the local rack
if one is available, and otherwise goes to a node in another rack. It acts as
a preference rather than a strict requirement: if none of the local nodes are
available it goes to another rack instead of returning no nodes, which would
make the client forcibly revive a local node whenever none of the nodes from
the preferred rack is available.
WARNING: Node selectors that do not consistently select the same set of nodes
will make round-robin behaviour unpredictable and possibly unfair. The
preference example above is fine because it reasons about the availability of
nodes, which already affects the predictability of round-robin. Node selection
should not depend on other external factors, or round-robin will not work
properly.
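For instance, a selector keyed on a fixed node attribute, such as the `HasAttributeNodeSelector` shipped with the client, is deterministic and keeps round-robin fair (a sketch; the attribute name and value are made up):
["source","java"]
--------------------------------------------------
RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
builder.setNodeSelector(new HasAttributeNodeSelector("rack", "c12")); // always selects the same set of nodes
--------------------------------------------------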

@ -196,6 +196,16 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failur
<1> Set a listener that gets notified every time a node fails, in case actions
need to be taken. Used internally when sniffing on failure is enabled.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-node-selector]
--------------------------------------------------
<1> Set the node selector used to filter the nodes the client sends requests
to, among those registered with the client itself. This is useful, for
instance, to prevent sending requests to dedicated master nodes when sniffing
is enabled. By default the client sends requests to every configured node.
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-request-config-callback]
@ -283,8 +293,7 @@ instance and share it between all requests:
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-singleton]
--------------------------------------------------
<1> Add any headers needed by all requests.
<2> Set a `NodeSelector`.
<3> Customize the response consumer.
<2> Customize the response consumer.
`addHeader` is for headers that are required for authorization or to work with
a proxy in front of Elasticsearch. There is no need to set the `Content-Type`
@ -315,15 +324,6 @@ adds an extra header:
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-customize-header]
--------------------------------------------------
Or you can send requests to nodes with a particular attribute:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-options-customize-attribute]
--------------------------------------------------
<1> Replace the node selector with one that selects nodes on a particular rack.
==== Multiple parallel asynchronous actions
The client is quite happy to execute many actions in parallel. The following

@ -57,9 +57,9 @@ include::index-modules.asciidoc[]
include::ingest.asciidoc[]
include::{xes-repo-dir}/sql/index.asciidoc[]
include::sql/index.asciidoc[]
include::{xes-repo-dir}/monitoring/index.asciidoc[]
include::monitoring/index.asciidoc[]
include::{xes-repo-dir}/rollup/index.asciidoc[]

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api-assistance]]
=== Migration Assistance API

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api-deprecation]]
=== Deprecation Info APIs

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api-upgrade]]
=== Migration Upgrade API

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api]]
== Migration APIs
@ -8,6 +9,6 @@ The migration APIs simplify upgrading {xpack} indices from one version to anothe
* <<migration-api-upgrade>>
* <<migration-api-deprecation>>
include::migration/assistance.asciidoc[]
include::migration/upgrade.asciidoc[]
include::migration/deprecation.asciidoc[]
include::apis/assistance.asciidoc[]
include::apis/upgrade.asciidoc[]
include::apis/deprecation.asciidoc[]

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[es-monitoring-collectors]]
== Collectors

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[es-monitoring-exporters]]
== Exporters

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[http-exporter]]
=== HTTP Exporters

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[es-monitoring]]
= Monitoring {es}

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[local-exporter]]
=== Local Exporters

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[pause-export]]
== Pausing Data Collection

@ -21,7 +21,7 @@ directly to configure and access {xpack} features.
include::info.asciidoc[]
include::{xes-repo-dir}/rest-api/graph/explore.asciidoc[]
include::{es-repo-dir}/licensing/index.asciidoc[]
include::{xes-repo-dir}/rest-api/migration.asciidoc[]
include::{es-repo-dir}/migration/migration.asciidoc[]
include::{xes-repo-dir}/rest-api/ml-api.asciidoc[]
include::{xes-repo-dir}/rest-api/rollup-api.asciidoc[]
include::{xes-repo-dir}/rest-api/security.asciidoc[]

@ -0,0 +1 @@
include::syntax-reserved.asciidoc[]

@ -1,5 +1,8 @@
[[sql-spec-reserved]]
=== Reserved Keywords
[role="xpack"]
[testenv="basic"]
[appendix]
[[sql-syntax-reserved]]
= Reserved Keywords
The following table lists the reserved keywords that need to be quoted, along with an example to make their usage more obvious.

@ -0,0 +1,65 @@
[role="xpack"]
[testenv="basic"]
[[sql-concepts]]
== Conventions and Terminology
For clarity, it is important to establish the meaning behind certain words, as the same wording might convey different meanings to different readers, depending on one's familiarity with SQL versus {es}.
NOTE: This documentation, while trying to be complete, does assume the reader has a _basic_ understanding of {es} and/or SQL. If that is not the case, please continue reading the documentation, but take notes and pursue the topics that are unclear either through the main {es} documentation or through the plethora of SQL material available in the open (there are simply too many excellent resources to enumerate here).
As a general rule, {es-sql}, as the name indicates, provides a SQL interface to {es}. As such, it follows SQL terminology and conventions first, whenever possible. However, the backing engine itself is {es}, for which {es-sql} was purposely created, hence why features or concepts that are not available, or cannot be mapped correctly, in SQL appear
in {es-sql}.
Last but not least, {es-sql} tries to obey the https://en.wikipedia.org/wiki/Principle_of_least_astonishment[principle of least surprise], though as all things in the world, everything is relative.
=== Mapping concepts across SQL and {es}
While SQL and {es} have different terms for the way the data is organized (and different semantics), essentially their purpose is the same.
So let's start from the bottom; these roughly are:
[cols="1,1,5", options="header"]
|===
|SQL
|{es}
|Description
|`column`
|`field`
|In both cases, at the lowest level, data is stored in _named_ entries, of a variety of <<sql-data-types, data types>>, containing _one_ value. SQL calls such an entry a _column_ while {es} calls it a _field_.
Notice that in {es} a field can contain _multiple_ values of the same type (essentially a list) while in SQL, a _column_ can contain _exactly_ one value of said type.
{es-sql} will do its best to preserve the SQL semantics and, depending on the query, reject those that return fields with more than one value.
|`row`
|`document`
|++Column++s and ++field++s do _not_ exist by themselves; they are part of a `row` or a `document`. The two have slightly different semantics: a `row` tends to be _strict_ (and have more enforcements) while a `document` tends to be a bit more flexible or loose (while still having a structure).
|`table`
|`index`
|The target against which queries are executed, whether in SQL or {es}.
|`schema`
|_implicit_
|In an RDBMS, `schema` is mainly a namespace of tables and is typically used as a security boundary. {es} does not provide an equivalent concept for it. However when security is enabled, {es} automatically applies the security enforcement so that a role sees only the data it is allowed to see (in SQL jargon, its _schema_).
|`catalog` or `database`
|`cluster` instance
|In SQL, `catalog` or `database` are used interchangeably and represent a set of schemas, that is, a number of tables.
In {es} the set of indices available are grouped in a `cluster`. The semantics also differ a bit; a `database` is essentially yet another namespace (which can have some implications on the way data is stored) while an {es} `cluster` is a runtime instance, or rather a set of at least one {es} instance (typically running distributed).
In practice this means that while in SQL one can potentially have multiple catalogs inside an instance, in {es} one is restricted to only _one_.
|`cluster`
|`cluster` (federated)
|Traditionally in SQL, _cluster_ refers to a single RDBMS instance which contains a number of ++catalog++s or ++database++s (see above). The same word can be reused inside {es}, though its semantics are clarified a bit.
While RDBMSs tend to have only one running instance, on a single machine (_not_ distributed), {es} goes the opposite way and by default is distributed and multi-instance.
Furthermore, an {es} `cluster` can be connected to other ++cluster++s in a _federated_ fashion, thus `cluster` means:
single cluster::
Multiple {es} instances typically distributed across machines, running within the same namespace.
multiple clusters::
Multiple clusters, each with its own namespace, connected to each other in a federated setup (see <<modules-cross-cluster-search, Cross cluster Search>>).
|===
As one can see, while the mapping between the concepts is not exactly one-to-one and the semantics are somewhat different, there are more things in common than differences. In fact, thanks to SQL's declarative nature, many concepts carry over to {es} transparently and the terminology of the two is likely to be used interchangeably throughout the rest of the material.

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[sql-cli]]
== SQL CLI
@ -37,18 +38,3 @@ James S.A. Corey |Leviathan Wakes |561 |1306972800000
--------------------------------------------------
// TODO it'd be lovely to be able to assert that this is correct but
// that is probably more work then it is worth right now.
[[sql-cli-permissions]]
[NOTE]
===============================
If you are using Security you need to add a few permissions to
users so they can run SQL. To run SQL using the CLI a user needs
`read`, `indices:admin/get`, and `cluster:monitor/main`. The
following example configures a role that can run SQL in the CLI
for the `test` and `bort` indices:
["source","yaml",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-tests}/security/roles.yml[cli_jdbc]
--------------------------------------------------
===============================

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="platinum"]
[[sql-jdbc]]
== SQL JDBC
@ -36,11 +37,11 @@ from `artifacts.elastic.co/maven` by adding it to the repositories list:
[float]
=== Setup
The driver main class is `org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcDriver`. Note the driver
also implements the JDBC 4.0 +Service Provider+ mechanism meaning it is registerd automatically
The driver main class is `org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcDriver`.
Note the driver implements the JDBC 4.0 +Service Provider+ mechanism, meaning it is registered automatically
as long as it is available in the classpath.
Once registered, the driver expects the following syntax as an URL:
Once registered, the driver understands the following syntax as a URL:
["source","text",subs="attributes"]
----
@ -120,12 +121,12 @@ Query timeout (in seconds). That is the maximum amount of time waiting for a que
To put all of it together, the following URL:
["source","text",subs="attributes"]
["source","text"]
----
jdbc:es://http://server:3456/timezone=UTC&page.size=250
----
Opens up a {es-jdbc} connection to `server` on port `3456`, setting the JDBC timezone to `UTC` and its pagesize to `250` entries.
Opens up a {es-sql} connection to `server` on port `3456`, setting the JDBC connection timezone to `UTC` and its page size to `250` entries.
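For instance, using the plain JDBC API, that URL might be exercised along these lines (a sketch; the `library` index is the one from the examples above, and the `java.sql` imports are elided as in the surrounding snippets):
["source","java"]
--------------------------------------------------
String url = "jdbc:es://http://server:3456/timezone=UTC&page.size=250";
// the driver self-registers through the Service Provider mechanism, no Class.forName needed
try (Connection connection = DriverManager.getConnection(url);
     Statement statement = connection.createStatement();
     ResultSet results = statement.executeQuery("SELECT name, page_count FROM library")) {
    while (results.next()) {
        System.out.println(results.getString("name") + ": " + results.getInt("page_count"));
    }
}
--------------------------------------------------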
=== API usage
@ -175,20 +176,4 @@ connection. For example:
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{jdbc-tests}/SimpleExampleTestCase.java[simple_example]
--------------------------------------------------
[[sql-jdbc-permissions]]
[NOTE]
===============================
If you are using Security you need to add a few permissions to
users so they can run SQL. To run SQL a user needs `read` and
`indices:admin/get`. Some parts of the API require
`cluster:monitor/main`. The following example configures a
role that can run SQL in JDBC querying the `test` and `bort`
indices:
["source","yaml",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-tests}/security/roles.yml[cli_jdbc]
--------------------------------------------------
===============================
--------------------------------------------------

@ -1,3 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[sql-rest]]
== SQL REST API
@ -186,17 +188,3 @@ or fewer results though. `time_zone` is the time zone to use for date
functions and date parsing. `time_zone` defaults to `utc` and can take
any values documented
http://www.joda.org/joda-time/apidocs/org/joda/time/DateTimeZone.html[here].
[[sql-rest-permissions]]
[NOTE]
===============================
If you are using Security you need to add a few permissions to
users so they can run SQL. To run SQL a user needs `read` and
`indices:admin/get`. The following example configures a role
that can run SQL against the `test` and `bort` indices:
["source","yaml",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-tests}/security/roles.yml[rest]
--------------------------------------------------
===============================

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[sql-translate]]
== SQL Translate API
@ -57,18 +58,3 @@ the normal <<search-request-body,search>> API.
The request body accepts all of the <<sql-rest-fields,fields>> that
the <<sql-rest,SQL REST API>> accepts except `cursor`.
[[sql-translate-permissions]]
[NOTE]
===============================
If you are using Security you need to add a few permissions to
users so they can run translate SQL. To translate SQL a user
needs `read` and `indices:admin/get`. The following example
configures a role that can run SQL against the `test` and
`bort` indices:
["source","yaml",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-tests}/security/roles.yml[rest]
--------------------------------------------------
===============================

@ -1,3 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[sql-functions]]
== Functions and Operators
@ -348,6 +350,25 @@ include-tagged::{sql-specs}/datetime.csv-spec[minuteOfHour]
include-tagged::{sql-specs}/datetime.csv-spec[secondOfMinute]
--------------------------------------------------
* Extract
As an alternative, one can use `EXTRACT` to extract fields from datetimes.
You can run any <<sql-functions-datetime,datetime function>>
with `EXTRACT(<datetime_function> FROM <expression>)`. So
["source","sql",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-specs}/datetime.csv-spec[extractDayOfYear]
--------------------------------------------------
is equivalent to
["source","sql",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-specs}/datetime.csv-spec[dayOfYear]
--------------------------------------------------
[[sql-functions-aggregate]]
=== Aggregate Functions

@ -1,3 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[sql-getting-started]]
== Getting Started with SQL

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[xpack-sql]]
= SQL Access
@ -19,7 +20,11 @@ indices and return results in tabular format.
<<sql-overview, Overview>>::
Overview of {es-sql} and its features.
<<sql-getting-started, Getting Started>>::
Start using SQL right away in {es}
Start using SQL right away in {es}.
<<sql-concepts, Concepts and Terminology>>::
Language conventions across SQL and {es}.
<<sql-security,Security>>::
Securing {es-sql} and {es}.
<<sql-rest,REST API>>::
Accepts SQL in a JSON document, executes it, and returns the
results.
@ -31,18 +36,20 @@ indices and return results in tabular format.
SQL and print tabular results.
<<sql-jdbc,JDBC>>::
A JDBC driver for {es}.
<<sql-spec,SQL Language>>::
Overview of the {es-sql} language, such as supported data types, commands and
syntax.
<<sql-functions,Functions and Operators>>::
List of functions and operators supported.
<<sql-spec,SQL Language>>::
Overview of the {es-sql} language, such as data types, syntax and
reserved keywords.
--
include::overview.asciidoc[]
include::getting-started.asciidoc[]
include::concepts.asciidoc[]
include::security.asciidoc[]
include::endpoints/index.asciidoc[]
include::functions/index.asciidoc[]
include::language/index.asciidoc[]
include::functions/index.asciidoc[]
include::appendix/index.asciidoc[]
:jdbc-tests!:

@ -1,5 +1,7 @@
[role="xpack"]
[testenv="basic"]
[[sql-data-types]]
=== Data Types
== Data Types
Most of the {es} <<mapping-types, data types>> are available in {es-sql}, as indicated below:
@ -42,7 +44,7 @@ uses the data type _particularities_ of the former over the latter as ultimately
[[sql-multi-field]]
[float]
==== SQL and multi-fields
=== SQL and multi-fields
A core concept in {es} is that of an `analyzed` field, that is a full-text value that is interpreted in order
to be effectively indexed. These fields are of type <<text, `text`>> and are not used for sorting or aggregations as their actual value depends on the <<analyzer, `analyzer`>> used, hence why {es} also offers the <<keyword, `keyword`>> type for storing the _exact_

@ -0,0 +1,12 @@
[role="xpack"]
[testenv="basic"]
[[sql-spec]]
== SQL Language
This chapter describes the SQL semantics supported in X-Pack, namely:
<<sql-data-types>>:: Data types
<<sql-commands>>:: Commands
include::data-types.asciidoc[]
include::syntax/index.asciidoc[]

@ -0,0 +1,22 @@
[role="xpack"]
[testenv="basic"]
[[sql-syntax-describe-table]]
=== DESCRIBE TABLE
.Synopsis
[source, sql]
----
DESCRIBE table
----
or
[source, sql]
----
DESC table
----
.Description
`DESC` and `DESCRIBE` are aliases for <<sql-syntax-show-columns>>.

@ -0,0 +1,18 @@
[role="xpack"]
[testenv="basic"]
[[sql-commands]]
== SQL Commands
This section contains the list of SQL commands supported by {es-sql} along with their syntax:
<<sql-syntax-describe-table>>:: Describe a table.
<<sql-syntax-select>>:: Retrieve rows from zero or more tables.
<<sql-syntax-show-columns>>:: List the columns in a table.
<<sql-syntax-show-functions>>:: List supported functions.
<<sql-syntax-show-tables>>:: List the tables available.
include::describe-table.asciidoc[]
include::select.asciidoc[]
include::show-columns.asciidoc[]
include::show-functions.asciidoc[]
include::show-tables.asciidoc[]

@ -0,0 +1,286 @@
[role="xpack"]
[testenv="basic"]
[[sql-syntax-select]]
=== SELECT
.Synopsis
[source, sql]
----
SELECT select_expr [, ...]
[ FROM table_name ]
[ WHERE condition ]
[ GROUP BY grouping_element [, ...] ]
[ HAVING condition]
[ ORDER BY expression [ ASC | DESC ] [, ...] ]
[ LIMIT [ count ] ]
----
.Description
Retrieves rows from zero or more tables.
The general execution of `SELECT` is as follows:
. All elements in the `FROM` list are computed (each element can be a base or an alias table). Currently `FROM` supports exactly one table. Do note however that the table name can be a pattern (see <<sql-syntax-from, FROM Clause>> below).
. If the `WHERE` clause is specified, all rows that do not satisfy the condition are eliminated from the output. (See <<sql-syntax-where, WHERE Clause>> below.)
. If the `GROUP BY` clause is specified, or if there are aggregate function calls, the output is combined into groups of rows that match on one or more values, and the results of aggregate functions are computed. If the `HAVING` clause is present, it eliminates groups that do not satisfy the given condition. (See <<sql-syntax-group-by, GROUP BY Clause>> and <<sql-syntax-having, HAVING Clause>> below.)
. The actual output rows are computed using the `SELECT` output expressions for each selected row or row group.
. If the `ORDER BY` clause is specified, the returned rows are sorted in the specified order. If `ORDER BY` is not given, the rows are returned in whatever order the system finds fastest to produce. (See <<sql-syntax-order-by,ORDER BY Clause>> below.)
. If the `LIMIT` is specified, the `SELECT` statement only returns a subset of the result rows. (See <<sql-syntax-limit, LIMIT Clause>> below.)
[[sql-syntax-select-list]]
==== `SELECT` List
The `SELECT` list, namely the expressions between `SELECT` and `FROM`, represents the output rows of the `SELECT` statement.
As with a table, every output column of a `SELECT` has a name, which can either be specified per column through the `AS` keyword:
[source,sql]
----
SELECT column AS c
----
be assigned by {es-sql} if no name is given:
[source,sql]
----
SELECT 1 + 1
----
or, if it is a simple column reference, be the name of the referenced column:
[source,sql]
----
SELECT col FROM table
----
[[sql-syntax-select-wildcard]]
==== Wildcard
To select all the columns in the source, one can use `*`:
["source","sql",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-specs}/select.sql-spec[wildcardWithOrder]
--------------------------------------------------
which essentially returns all columns found.
[[sql-syntax-from]]
[float]
==== FROM Clause
The `FROM` clause specifies one table for the `SELECT` and has the following syntax:
[source, sql]
----
FROM table_name [ [ AS ] alias ]
----
where:
`table_name`::
Represents the name (optionally qualified) of an existing table, either a concrete or base one (an actual index) or an alias.
If the table name contains special SQL characters (such as `.`, `-`, etc.) use double quotes to escape them:
[source, sql]
----
SELECT ... FROM "some-table"
----
The name can be a <<multi-index, pattern>> pointing to multiple indices (likely requiring quoting as mentioned above) with the restriction that *all* resolved concrete tables have the **exact same mapping**.
`alias`::
A substitute name for the `FROM` item containing the alias. An alias is used for brevity or to eliminate ambiguity. When an alias is provided, it completely hides the actual name of the table and must be used in its place.
[[sql-syntax-where]]
[float]
==== WHERE Clause
The optional `WHERE` clause is used to filter rows from the query and has the following syntax:
[source, sql]
----
WHERE condition
----
where:
`condition`::
Represents an expression that evaluates to a `boolean`. Only the rows that match the condition (to `true`) are returned.
[[sql-syntax-group-by]]
[float]
==== GROUP BY
The `GROUP BY` clause is used to divide the results into groups of rows on matching values from the designated columns. It has the following syntax:
[source, sql]
----
GROUP BY grouping_element [, ...]
----
where:
`grouping_element`::
Represents an expression on which rows are grouped. It can be a column name, the name or ordinal number of an output column, or an arbitrary expression of column values.
When a `GROUP BY` clause is used in a `SELECT`, _all_ output expressions must be either aggregate functions or expressions used for grouping (or derivatives thereof); otherwise there would be more than one possible value to return for each ungrouped column.
[[sql-syntax-having]]
[float]
==== HAVING
The `HAVING` clause can be used _only_ along with aggregate functions (and thus `GROUP BY`) to filter which groups are kept and which are not. It has the following syntax:
[source, sql]
----
HAVING condition
----
where:
`condition`::
Represents an expression that evaluates to a `boolean`. Only groups that match the condition (to `true`) are returned.
Both `WHERE` and `HAVING` are used for filtering; however, there are several differences between them:
. `WHERE` works on individual *rows*, `HAVING` works on the *groups* created by ``GROUP BY``
. `WHERE` is evaluated *before* grouping, `HAVING` is evaluated *after* grouping
Note that it is possible to have a `HAVING` clause without a ``GROUP BY``. In this case, an __implicit grouping__ is applied, meaning all selected rows are considered to form a single group and `HAVING` can be applied on any of the aggregate functions specified for this group.
As such a query emits only a single row (as there is only a single group), the `HAVING` condition returns either that one row (the group) or zero rows if the condition fails.
[[sql-syntax-order-by]]
[float]
==== ORDER BY
The `ORDER BY` clause is used to sort the results of `SELECT` by one or more expressions:
[source, sql]
----
ORDER BY expression [ ASC | DESC ] [, ...]
----
where:
`expression`::
Represents an input column, an output column or the ordinal number of the position (starting from one) of an output column. Additionally, ordering can be done based on the result's _score_.
The direction, if not specified, is `ASC` (ascending) by default.
Regardless of the ordering specified, null values are ordered last (at the end).
IMPORTANT: When used alongside `GROUP BY`, the `ORDER BY` expression can point _only_ to the columns used for grouping.
For example, the following query sorts by an arbitrary input field (`page_count`):
[source,js]
--------------------------------------------------
POST /_xpack/sql?format=txt
{
"query": "SELECT * FROM library ORDER BY page_count DESC LIMIT 5"
}
--------------------------------------------------
// CONSOLE
// TEST[setup:library]
which results in something like:
[source,text]
--------------------------------------------------
author | name | page_count | release_date
-----------------+--------------------+---------------+------------------------
Peter F. Hamilton|Pandora's Star |768 |2004-03-02T00:00:00.000Z
Vernor Vinge |A Fire Upon the Deep|613 |1992-06-01T00:00:00.000Z
Frank Herbert |Dune |604 |1965-06-01T00:00:00.000Z
Alastair Reynolds|Revelation Space |585 |2000-03-15T00:00:00.000Z
James S.A. Corey |Leviathan Wakes |561 |2011-06-02T00:00:00.000Z
--------------------------------------------------
// TESTRESPONSE[s/\|/\\|/ s/\+/\\+/]
// TESTRESPONSE[_cat]
[[sql-syntax-order-by-score]]
==== Order By Score
When doing full-text queries in the `WHERE` clause, results can be returned based on their
{defguide}/relevance-intro.html[score] or _relevance_ to the given query.
NOTE: When doing multiple text queries in the `WHERE` clause, their scores will be
combined using the same rules as {es}'s
<<query-dsl-bool-query,bool query>>.
To sort based on the `score`, use the special function `SCORE()`:
[source,js]
--------------------------------------------------
POST /_xpack/sql?format=txt
{
"query": "SELECT SCORE(), * FROM library WHERE match(name, 'dune') ORDER BY SCORE() DESC"
}
--------------------------------------------------
// CONSOLE
// TEST[setup:library]
Which results in something like:
[source,text]
--------------------------------------------------
SCORE() | author | name | page_count | release_date
---------------+---------------+-------------------+---------------+------------------------
2.288635 |Frank Herbert |Dune |604 |1965-06-01T00:00:00.000Z
1.8893257 |Frank Herbert |Dune Messiah |331 |1969-10-15T00:00:00.000Z
1.6086555 |Frank Herbert |Children of Dune |408 |1976-04-21T00:00:00.000Z
1.4005898 |Frank Herbert |God Emperor of Dune|454 |1981-05-28T00:00:00.000Z
--------------------------------------------------
// TESTRESPONSE[s/\|/\\|/ s/\+/\\+/ s/\(/\\\(/ s/\)/\\\)/]
// TESTRESPONSE[_cat]
Note that you can return `SCORE()` by adding it to the `SELECT` list. This
is possible even if you are not sorting by `SCORE()`:
[source,js]
--------------------------------------------------
POST /_xpack/sql?format=txt
{
"query": "SELECT SCORE(), * FROM library WHERE match(name, 'dune') ORDER BY page_count DESC"
}
--------------------------------------------------
// CONSOLE
// TEST[setup:library]
[source,text]
--------------------------------------------------
SCORE() | author | name | page_count | release_date
---------------+---------------+-------------------+---------------+------------------------
2.288635 |Frank Herbert |Dune |604 |1965-06-01T00:00:00.000Z
1.4005898 |Frank Herbert |God Emperor of Dune|454 |1981-05-28T00:00:00.000Z
1.6086555 |Frank Herbert |Children of Dune |408 |1976-04-21T00:00:00.000Z
1.8893257 |Frank Herbert |Dune Messiah |331 |1969-10-15T00:00:00.000Z
--------------------------------------------------
// TESTRESPONSE[s/\|/\\|/ s/\+/\\+/ s/\(/\\\(/ s/\)/\\\)/]
// TESTRESPONSE[_cat]
NOTE:
Trying to return `score` from a non full-text query will return the same value for all results, as
all are equally relevant.
[[sql-syntax-limit]]
[float]
==== LIMIT
The `LIMIT` clause restricts (limits) the number of rows returned, using the format:
[source, sql]
----
LIMIT ( count | ALL )
----
where
count:: is a positive integer or zero indicating the maximum *possible* number of results being returned (as there might be fewer matches than the limit). If `0` is specified, no results are returned.
ALL:: indicates there is no limit and thus all results are returned.

@ -0,0 +1,14 @@
[role="xpack"]
[testenv="basic"]
[[sql-syntax-show-columns]]
=== SHOW COLUMNS
.Synopsis
[source, sql]
----
SHOW COLUMNS [ FROM | IN ] ? table
----
.Description
List the columns in a table along with their data type (and other attributes).

@ -0,0 +1,16 @@
[role="xpack"]
[testenv="basic"]
[[sql-syntax-show-functions]]
=== SHOW FUNCTIONS
.Synopsis
[source, sql]
----
SHOW FUNCTIONS [ LIKE? pattern<1>? ]?
----
<1> SQL match pattern
.Description
List all the SQL functions and their type. The `LIKE` clause can be used to restrict the list of names to the given pattern.

@ -0,0 +1,16 @@
[role="xpack"]
[testenv="basic"]
[[sql-syntax-show-tables]]
=== SHOW TABLES
.Synopsis
[source, sql]
----
SHOW TABLES [ LIKE? pattern<1>? ]?
----
<1> SQL match pattern
.Description
List the tables available to the current user and their type. The `LIKE` clause can be used to restrict the list of names to the given pattern.

@ -1,9 +1,12 @@
[role="xpack"]
[testenv="basic"]
[[sql-overview]]
== Overview
{es-sql} aims to provide a powerful yet lightweight SQL interface to {es}.
[[sql-introduction]]
[float]
=== Introduction
{es-sql} is an X-Pack component that allows SQL-like queries to be executed in real-time against {es}.
@ -12,6 +15,7 @@ _natively_ inside {es}.
One can think of {es-sql} as a _translator_, one that understands both SQL and {es} and makes it easy to read and process data in real time, at scale, by leveraging {es} capabilities.
[[sql-why]]
[float]
=== Why {es-sql}?
Native integration::

@ -0,0 +1,39 @@
[role="xpack"]
[testenv="basic"]
[[sql-security]]
== Security
{es-sql} integrates with security, if this is enabled on your cluster.
In such a scenario, {es-sql} supports both security at the transport layer (by encrypting the communication between the consumer and the server) and authentication (for the access layer).
[float]
==== SSL/TLS configuration
In case of an encrypted transport, the SSL/TLS support needs to be enabled in {es-sql} to properly establish communication with {es}. This is done by setting the `ssl` property to `true` or by using the `https` prefix in the URL. +
Depending on your SSL configuration (whether the certificates are signed by a CA or not, whether they are global at JVM level or just local to one application), you might need to set up the `keystore` and/or `truststore`, that is, where the _credentials_ are stored (`keystore` - which typically stores private keys and certificates) and how to _verify_ them (`truststore` - which typically stores certificates from third parties, also known as CAs - certificate authorities). +
Typically (and again, do note that your environment might differ significantly), if the SSL setup for {es-sql} is not already done at the JVM level, one needs to set up the keystore if the {es-sql} security requires client authentication (PKI - Public Key Infrastructure), and set up the `truststore` if SSL is enabled.
[float]
==== Authentication
The authentication support in {es-sql} is of two types:
Username/Password:: Set these through `user` and `password` properties.
PKI/X.509:: Use X.509 certificates to authenticate {es-sql} to {es}. For this, one would need to set up the `keystore` containing the private key and certificate for the appropriate user (configured in {es}) and the `truststore` with the CA certificate used to sign the SSL/TLS certificates in the {es} cluster. That is, one should set up the key to authenticate {es-sql} and also be able to verify that it is the right one. To do so, one should set the `ssl.keystore.location` and `ssl.truststore.location` properties to indicate the `keystore` and `truststore` to use. It is recommended to have these secured through a password, in which case the `ssl.keystore.pass` and `ssl.truststore.pass` properties are required.
[float]
[[sql-security-permissions]]
==== Permissions (server-side)
Lastly, on the server one needs to add a few permissions to
users so they can run SQL. To run SQL a user needs `read` and
`indices:admin/get` permissions at minimum, while some parts of
the API require `cluster:monitor/main`.
The following example configures a role that can run SQL over JDBC, querying the `test` and `bort`
indices:
["source","yaml",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{sql-tests}/security/roles.yml[cli_jdbc]
--------------------------------------------------

@ -91,8 +91,9 @@ public class DocsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
final RestClient restClient,
final List<HttpHost> hosts,
final Version esVersion,
final Version masterVersion) throws IOException {
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion);
final Version masterVersion) {
return new ClientYamlDocsTestClient(restSpec, restClient, hosts, esVersion, masterVersion,
restClientBuilder -> configureClient(restClientBuilder, restClientSettings()));
}
/**

@ -21,6 +21,7 @@ package org.elasticsearch.nio;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@ -99,6 +100,11 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
Socket channel = createChannel(selector, rawChannel);
assert channel.getContext() != null : "channel context should have been set on channel";
return channel;
} catch (UncheckedIOException e) {
// This can happen if getRemoteAddress throws IOException.
IOException cause = e.getCause();
closeRawChannel(rawChannel, cause);
throw cause;
} catch (Exception e) {
closeRawChannel(rawChannel, e);
throw e;

@ -19,7 +19,6 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.NetworkChannel;
import java.util.function.BiConsumer;
@ -32,20 +31,10 @@ import java.util.function.BiConsumer;
*/
public abstract class NioChannel {
private final InetSocketAddress localAddress;
NioChannel(NetworkChannel socketChannel) throws IOException {
this.localAddress = (InetSocketAddress) socketChannel.getLocalAddress();
}
public boolean isOpen() {
return getContext().isOpen();
}
public InetSocketAddress getLocalAddress() {
return localAddress;
}
/**
* Adds a close listener to the channel. Multiple close listeners can be added. There is no guarantee
* about the order in which close listeners will be executed. If the channel is already closed, the
@ -64,6 +53,8 @@ public abstract class NioChannel {
getContext().closeChannel();
}
public abstract InetSocketAddress getLocalAddress();
public abstract NetworkChannel getRawChannel();
public abstract ChannelContext<?> getContext();

@ -19,19 +19,20 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
public class NioServerSocketChannel extends NioChannel {
private final ServerSocketChannel socketChannel;
private final ServerSocketChannel serverSocketChannel;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
private volatile InetSocketAddress localAddress;
private ServerChannelContext context;
public NioServerSocketChannel(ServerSocketChannel socketChannel) throws IOException {
super(socketChannel);
this.socketChannel = socketChannel;
public NioServerSocketChannel(ServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
attemptToSetLocalAddress();
}
/**
@ -48,9 +49,15 @@ public class NioServerSocketChannel extends NioChannel {
}
}
@Override
public InetSocketAddress getLocalAddress() {
attemptToSetLocalAddress();
return localAddress;
}
@Override
public ServerSocketChannel getRawChannel() {
return socketChannel;
return serverSocketChannel;
}
@Override
@ -64,4 +71,10 @@ public class NioServerSocketChannel extends NioChannel {
"localAddress=" + getLocalAddress() +
'}';
}
private void attemptToSetLocalAddress() {
if (localAddress == null) {
localAddress = (InetSocketAddress) serverSocketChannel.socket().getLocalSocketAddress();
}
}
}

@ -20,6 +20,7 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
@ -27,15 +28,19 @@ import java.util.function.BiConsumer;
public class NioSocketChannel extends NioChannel {
private final InetSocketAddress remoteAddress;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
private final SocketChannel socketChannel;
private final InetSocketAddress remoteAddress;
private volatile InetSocketAddress localAddress;
private SocketChannelContext context;
public NioSocketChannel(SocketChannel socketChannel) throws IOException {
super(socketChannel);
public NioSocketChannel(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
try {
this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
} catch (IOException e) {
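// wrap as unchecked; ChannelFactory catches UncheckedIOException, closes the raw channel and rethrows the original IOException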
throw new UncheckedIOException(e);
}
}
public void setContext(SocketChannelContext context) {
@ -46,6 +51,14 @@ public class NioSocketChannel extends NioChannel {
}
}
@Override
public InetSocketAddress getLocalAddress() {
if (localAddress == null) {
localAddress = (InetSocketAddress) socketChannel.socket().getLocalSocketAddress();
}
return localAddress;
}
@Override
public SocketChannel getRawChannel() {
return socketChannel;

@ -23,6 +23,7 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
@ -69,7 +70,9 @@ public class EventHandlerTests extends ESTestCase {
channel.setContext(context);
handler.handleRegistration(context);
NioServerSocketChannel serverChannel = new NioServerSocketChannel(mock(ServerSocketChannel.class));
ServerSocketChannel serverSocketChannel = mock(ServerSocketChannel.class);
when(serverSocketChannel.socket()).thenReturn(mock(ServerSocket.class));
NioServerSocketChannel serverChannel = new NioServerSocketChannel(serverSocketChannel);
serverContext = new DoNotRegisterServerContext(serverChannel, mock(NioSelector.class), mock(Consumer.class));
serverChannel.setContext(serverContext);
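The extra stubbing above is required because the new NioServerSocketChannel constructor immediately calls serverSocketChannel.socket().getLocalSocketAddress(); on a bare mock, socket() would return null and the constructor would throw a NullPointerException. The stub in isolation (the helper class is hypothetical):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;

final class ServerChannelMockSketch {
    static ServerSocketChannel mockServerSocketChannel() {
        ServerSocketChannel serverSocketChannel = mock(ServerSocketChannel.class);
        // The mocked ServerSocket returns null from getLocalSocketAddress(),
        // which the lazy-caching getter tolerates; a null socket() would not be.
        when(serverSocketChannel.socket()).thenReturn(mock(ServerSocket.class));
        return serverSocketChannel;
    }
}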

@ -41,7 +41,7 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -114,13 +114,12 @@ public class GrokProcessorGetAction extends Action<GrokProcessorGetAction.Respon
public static class TransportAction extends HandledTransportAction<Request, Response> {
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters) {
super(settings, NAME, threadPool, transportService, actionFilters, Request::new);
public TransportAction(Settings settings, TransportService transportService, ActionFilters actionFilters) {
super(settings, NAME, transportService, actionFilters, Request::new);
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
try {
listener.onResponse(new Response(GROK_PATTERNS));
} catch (Exception e) {
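The two signature changes above recur throughout this diff, reflecting the "Remove ThreadPool from base TransportAction" and "Combine doExecute methods" commits from master: the ThreadPool constructor parameter disappears and doExecute gains a Task parameter. A hedged skeleton of the resulting shape, where Request, Response, and NAME are placeholders for an action's own types and constants, not real identifiers:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

public class TransportExampleAction extends HandledTransportAction<Request, Response> {

    @Inject
    public TransportExampleAction(Settings settings, TransportService transportService, ActionFilters actionFilters) {
        // no ThreadPool parameter any more
        super(settings, NAME, transportService, actionFilters, Request::new);
    }

    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        // the Task is supplied by the base class; the Task-less overload is gone
        listener.onResponse(new Response()); // Request/Response/NAME are placeholders
    }
}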

@ -1 +0,0 @@
a57659a275921d8ab3f7ec580e9bf713ce6143b1

@ -0,0 +1 @@
9f0a326f7ec1671ffb07f95b27f1a5812b7dc1c3

@ -23,14 +23,14 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
@ -42,20 +42,20 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M
private final ScriptService scriptService;
private final NamedXContentRegistry xContentRegistry;
private final TransportMultiSearchAction multiSearchAction;
private final NodeClient client;
@Inject
public TransportMultiSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
public TransportMultiSearchTemplateAction(Settings settings, TransportService transportService,
ActionFilters actionFilters, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, TransportMultiSearchAction multiSearchAction) {
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, MultiSearchTemplateRequest::new);
NamedXContentRegistry xContentRegistry, NodeClient client) {
super(settings, MultiSearchTemplateAction.NAME, transportService, actionFilters, MultiSearchTemplateRequest::new);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.multiSearchAction = multiSearchAction;
this.client = client;
}
@Override
protected void doExecute(MultiSearchTemplateRequest request, ActionListener<MultiSearchTemplateResponse> listener) {
protected void doExecute(Task task, MultiSearchTemplateRequest request, ActionListener<MultiSearchTemplateResponse> listener) {
List<Integer> originalSlots = new ArrayList<>();
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
multiSearchRequest.indicesOptions(request.indicesOptions());
@ -81,7 +81,7 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M
}
}
multiSearchAction.execute(multiSearchRequest, ActionListener.wrap(r -> {
client.multiSearch(multiSearchRequest, ActionListener.wrap(r -> {
for (int i = 0; i < r.getResponses().length; i++) {
MultiSearchResponse.Item item = r.getResponses()[i];
int originalSlot = originalSlots.get(i);

@ -22,9 +22,9 @@ package org.elasticsearch.script.mustache;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -38,7 +38,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -50,29 +50,26 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
private static final String TEMPLATE_LANG = MustacheScriptEngine.NAME;
private final ScriptService scriptService;
private final TransportSearchAction searchAction;
private final NamedXContentRegistry xContentRegistry;
private final NodeClient client;
@Inject
public TransportSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters,
ScriptService scriptService,
TransportSearchAction searchAction,
NamedXContentRegistry xContentRegistry) {
super(settings, SearchTemplateAction.NAME, threadPool, transportService, actionFilters,
(Supplier<SearchTemplateRequest>) SearchTemplateRequest::new);
public TransportSearchTemplateAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
ScriptService scriptService, NamedXContentRegistry xContentRegistry, NodeClient client) {
super(settings, SearchTemplateAction.NAME, transportService, actionFilters,
(Supplier<SearchTemplateRequest>) SearchTemplateRequest::new);
this.scriptService = scriptService;
this.searchAction = searchAction;
this.xContentRegistry = xContentRegistry;
this.client = client;
}
@Override
protected void doExecute(SearchTemplateRequest request, ActionListener<SearchTemplateResponse> listener) {
protected void doExecute(Task task, SearchTemplateRequest request, ActionListener<SearchTemplateResponse> listener) {
final SearchTemplateResponse response = new SearchTemplateResponse();
try {
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
if (searchRequest != null) {
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
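Both template actions above swap a directly injected sibling transport action for a NodeClient call, so the inner search is dispatched like any other client request. A minimal sketch of the new call shape (the helper class is hypothetical; the wrap(...) only mirrors the diff, and the listener could equally be passed through directly):

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.client.node.NodeClient;

final class ClientCallSketch {
    static void run(NodeClient client, MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
        client.multiSearch(request, ActionListener.wrap(listener::onResponse, listener::onFailure));
    }
}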

@ -48,7 +48,7 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -280,13 +280,13 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
private final ScriptService scriptService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
public TransportAction(Settings settings, TransportService transportService,
ActionFilters actionFilters, ScriptService scriptService) {
super(settings, NAME, threadPool, transportService, actionFilters, Request::new);
super(settings, NAME, transportService, actionFilters, Request::new);
this.scriptService = scriptService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
switch (request.context) {
case PAINLESS_TEST:
PainlessTestScript.Factory factory = scriptService.compile(request.script, PainlessTestScript.CONTEXT);

@ -40,7 +40,7 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -73,10 +73,10 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
private final NamedXContentRegistry namedXContentRegistry;
@Inject
public TransportRankEvalAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
public TransportRankEvalAction(Settings settings, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService,
NamedXContentRegistry namedXContentRegistry) {
super(settings, RankEvalAction.NAME, threadPool, transportService, actionFilters,
super(settings, RankEvalAction.NAME, transportService, actionFilters,
(Writeable.Reader<RankEvalRequest>) RankEvalRequest::new);
this.scriptService = scriptService;
this.namedXContentRegistry = namedXContentRegistry;
@ -84,7 +84,7 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
}
@Override
protected void doExecute(RankEvalRequest request, ActionListener<RankEvalResponse> listener) {
protected void doExecute(Task task, RankEvalRequest request, ActionListener<RankEvalResponse> listener) {
RankEvalSpec evaluationSpecification = request.getRankEvalSpec();
EvaluationMetric metric = evaluationSpecification.getMetric();

@ -19,8 +19,6 @@
package org.elasticsearch.index.reindex;
import java.util.function.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@ -35,7 +33,11 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.function.Supplier;
public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, BulkByScrollResponse> {
private final ThreadPool threadPool;
private final Client client;
private final ScriptService scriptService;
private final ClusterService clusterService;
@ -43,8 +45,9 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
@Inject
public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters,
super(settings, DeleteByQueryAction.NAME, transportService, actionFilters,
(Supplier<DeleteByQueryRequest>) DeleteByQueryRequest::new);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;
@ -64,9 +67,4 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
}
);
}
@Override
protected void doExecute(DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}
}

@ -92,6 +92,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
public static final Setting<List<String>> REMOTE_CLUSTER_WHITELIST =
Setting.listSetting("reindex.remote.whitelist", emptyList(), Function.identity(), Property.NodeScope);
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final ScriptService scriptService;
private final AutoCreateIndex autoCreateIndex;
@ -103,8 +104,8 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService) {
super(settings, ReindexAction.NAME, threadPool, transportService, actionFilters,
ReindexRequest::new);
super(settings, ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.scriptService = scriptService;
this.autoCreateIndex = autoCreateIndex;
@ -133,11 +134,6 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
);
}
@Override
protected void doExecute(ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}
static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
if (remoteInfo == null) {
return;

@ -43,9 +43,9 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
private final Client client;
@Inject
public TransportRethrottleAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
public TransportRethrottleAction(Settings settings, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, Client client) {
super(settings, RethrottleAction.NAME, threadPool, clusterService, transportService, actionFilters,
super(settings, RethrottleAction.NAME, clusterService, transportService, actionFilters,
RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
this.client = client;
}

@ -46,6 +46,8 @@ import java.util.function.BiFunction;
import java.util.function.Supplier;
public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkByScrollResponse> {
private final ThreadPool threadPool;
private final Client client;
private final ScriptService scriptService;
private final ClusterService clusterService;
@ -53,8 +55,9 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
@Inject
public TransportUpdateByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, Client client,
TransportService transportService, ScriptService scriptService, ClusterService clusterService) {
super(settings, UpdateByQueryAction.NAME, threadPool, transportService, actionFilters,
super(settings, UpdateByQueryAction.NAME, transportService, actionFilters,
(Supplier<UpdateByQueryRequest>) UpdateByQueryRequest::new);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;
@ -75,11 +78,6 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
);
}
@Override
protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}
/**
* Simple implementation of update-by-query using scrolling and bulk.
*/

@ -25,7 +25,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@ -743,9 +742,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
@Override
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>> void doExecute(
Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
if (false == expectedHeaders.equals(threadPool().getThreadContext().getHeaders())) {
listener.onFailure(
new RuntimeException("Expected " + expectedHeaders + " but got " + threadPool().getThreadContext().getHeaders()));

@ -29,8 +29,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;
import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY;
@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
@ -42,7 +40,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
try {
@ -77,12 +75,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
Netty4HttpChannel httpChannel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(httpChannel, new Exception(cause));
serverTransport.onException(channel, new Exception(cause));
} else {
serverTransport.onException(httpChannel, (Exception) cause);
serverTransport.onException(channel, (Exception) cause);
}
}
}

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
public class Netty4HttpServerChannel implements HttpServerChannel {
private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
Netty4HttpServerChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public void close() {
channel.close();
}
@Override
public String toString() {
return "Netty4HttpChannel{localAddress=" + getLocalAddress() + "}";
}
}
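Netty4HttpServerChannel above bridges Netty's close future into Elasticsearch's CompletableContext. A self-contained sketch of the same bridging with a plain CompletableFuture standing in for CompletableContext (in the real code an Error additionally passes through Netty4Utils.maybeDie before being wrapped):

import io.netty.channel.Channel;

import java.util.concurrent.CompletableFuture;

final class CloseFutureBridge {
    static CompletableFuture<Void> onClose(Channel channel) {
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        channel.closeFuture().addListener(f -> {
            if (f.isSuccess()) {
                closeFuture.complete(null);
            } else {
                Throwable cause = f.cause();
                // Errors are wrapped so downstream listeners always see an Exception
                closeFuture.completeExceptionally(cause instanceof Error ? new Exception(cause) : cause);
            }
        });
        return closeFuture;
    }
}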

@ -23,6 +23,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -42,22 +43,19 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
@ -65,14 +63,9 @@ import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
@ -154,12 +147,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final int pipeliningMaxEvents;
private final boolean tcpNoDelay;
private final boolean tcpKeepAlive;
private final boolean reuseAddress;
private final ByteSizeValue tcpSendBufferSize;
private final ByteSizeValue tcpReceiveBufferSize;
private final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis;
@ -167,8 +154,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
protected volatile ServerBootstrap serverBootstrap;
protected final List<Channel> serverChannels = new ArrayList<>();
private final Netty4CorsConfig corsConfig;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
@ -184,11 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
@ -217,6 +197,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
@ -238,10 +219,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
this.boundAddress = createBoundHttpAddress();
if (logger.isInfoEnabled()) {
logger.info("{}", boundAddress);
}
bindServer();
success = true;
} finally {
if (success == false) {
@ -284,78 +262,29 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected TransportAddress bindAddress(final InetAddress hostAddress) {
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = port.iterate(portNumber -> {
try {
synchronized (serverChannels) {
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync();
serverChannels.add(future.channel());
boundSocket.set((InetSocketAddress) future.channel().localAddress());
}
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
});
if (!success) {
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
}
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new TransportAddress(boundSocket.get());
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception {
ChannelFuture future = serverBootstrap.bind(socketAddress).sync();
Channel channel = future.channel();
Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel);
channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel);
return httpServerChannel;
}
@Override
protected void doStop() {
synchronized (serverChannels) {
if (!serverChannels.isEmpty()) {
try {
Netty4Utils.closeChannels(serverChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
} finally {
serverChannels.clear();
}
}
}
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
protected void stopInternal() {
if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
serverBootstrap = null;
}
}
@Override
protected void doClose() {
}
@Override
public HttpStats stats() {
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}
@Override
protected void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Http read timeout {}", channel);
}
CloseableChannel.closeChannel(channel);;
CloseableChannel.closeChannel(channel);
} else {
super.onException(channel, cause);
}
@ -366,6 +295,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel");
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
@ -413,4 +343,24 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
}
@ChannelHandler.Sharable
private static class ServerChannelExceptionHandler extends ChannelHandlerAdapter {
private final Netty4HttpServerTransport transport;
private ServerChannelExceptionHandler(Netty4HttpServerTransport transport) {
this.transport = transport;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
transport.onServerException(httpServerChannel, new Exception(cause));
} else {
transport.onServerException(httpServerChannel, (Exception) cause);
}
}
}
}
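The HTTP_SERVER_CHANNEL_KEY plumbing above follows a standard Netty pattern: the transport attaches its wrapper to the raw channel under an AttributeKey at bind time, and handlers later fetch it back from the ChannelHandlerContext. A minimal standalone sketch (key name and class are hypothetical):

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

final class ChannelAttributeSketch {
    static final AttributeKey<Object> SERVER_CHANNEL_KEY = AttributeKey.newInstance("example-server-channel");

    static void attach(Channel channel, Object wrapper) {
        channel.attr(SERVER_CHANNEL_KEY).set(wrapper); // done once, at bind time
    }

    static Object lookup(Channel channel) {
        return channel.attr(SERVER_CHANNEL_KEY).get(); // done in exception handlers
    }
}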

@ -24,6 +24,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.Transports;
@ -36,11 +38,9 @@ import org.elasticsearch.transport.Transports;
final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private final Netty4Transport transport;
private final String profileName;
Netty4MessageChannelHandler(Netty4Transport transport, String profileName) {
Netty4MessageChannelHandler(Netty4Transport transport) {
this.transport = transport;
this.profileName = profileName;
}
@Override
@ -58,7 +58,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size
BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize);
Attribute<NettyTcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
transport.messageReceived(reference, channelAttribute.get());
} finally {
// Set the expected position of the buffer, no matter what happened
@ -69,7 +69,13 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
transport.exceptionCaught(ctx, cause);
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable newCause = unwrapped != null ? unwrapped : cause;
Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
if (newCause instanceof Error) {
transport.onException(tcpChannel, new Exception(newCause));
} else {
transport.onException(tcpChannel, (Exception) newCause);
}
}
}
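The handler above first tries ExceptionsHelper.unwrap(cause, ElasticsearchException.class) so that an ElasticsearchException buried inside a wrapper (for example a Netty DecoderException) is reported rather than the wrapper itself. A minimal stand-in for that unwrap walk, with a cycle guard; this is an illustration, not the actual ExceptionsHelper implementation:

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

final class UnwrapSketch {
    static Throwable unwrap(Throwable t, Class<? extends Throwable> target) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        while (t != null && seen.add(t)) { // seen guards against cause cycles
            if (target.isInstance(t)) {
                return t;
            }
            t = t.getCause();
        }
        return null; // not found: the caller falls back to the original cause
    }
}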

@ -30,13 +30,13 @@ import org.elasticsearch.transport.TransportException;
import java.net.InetSocketAddress;
public class NettyTcpChannel implements TcpChannel {
public class Netty4TcpChannel implements TcpChannel {
private final Channel channel;
private final String profile;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
NettyTcpChannel(Channel channel, String profile) {
Netty4TcpChannel(Channel channel, String profile) {
this.channel = channel;
this.profile = profile;
this.channel.closeFuture().addListener(f -> {
@ -118,7 +118,7 @@ public class NettyTcpChannel implements TcpChannel {
@Override
public String toString() {
return "NettyTcpChannel{" +
return "Netty4TcpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + channel.remoteAddress() +
'}';

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.TcpServerChannel;
import java.net.InetSocketAddress;
public class Netty4TcpServerChannel implements TcpServerChannel {
private final Channel channel;
private final String profile;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
Netty4TcpServerChannel(Channel channel, String profile) {
this.channel = channel;
this.profile = profile;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}
@Override
public String getProfile() {
return profile;
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
@Override
public void close() {
channel.close();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public String toString() {
return "Netty4TcpChannel{" +
"localAddress=" + getLocalAddress() +
'}';
}
}

@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -37,8 +38,6 @@ import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
@ -196,6 +195,7 @@ public class Netty4Transport extends TcpTransport {
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(getServerChannelInitializer(name));
serverBootstrap.handler(new ServerChannelExceptionHandler());
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
@ -226,17 +226,11 @@ public class Netty4Transport extends TcpTransport {
return new ClientChannelInitializer();
}
static final AttributeKey<NettyTcpChannel> CHANNEL_KEY = AttributeKey.newInstance("es-channel");
protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable t = unwrapped != null ? unwrapped : cause;
Channel channel = ctx.channel();
onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
}
static final AttributeKey<Netty4TcpChannel> CHANNEL_KEY = AttributeKey.newInstance("es-channel");
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");
@Override
protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
ChannelFuture channelFuture = bootstrap.connect(address);
Channel channel = channelFuture.channel();
if (channel == null) {
@ -245,7 +239,7 @@ public class Netty4Transport extends TcpTransport {
}
addClosedExceptionLogger(channel);
NettyTcpChannel nettyChannel = new NettyTcpChannel(channel, "default");
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default");
channel.attr(CHANNEL_KEY).set(nettyChannel);
channelFuture.addListener(f -> {
@ -266,10 +260,10 @@ public class Netty4Transport extends TcpTransport {
}
@Override
protected NettyTcpChannel bind(String name, InetSocketAddress address) {
protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
NettyTcpChannel esChannel = new NettyTcpChannel(channel, name);
channel.attr(CHANNEL_KEY).set(esChannel);
Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel, name);
channel.attr(SERVER_CHANNEL_KEY).set(esChannel);
return esChannel;
}
@ -310,7 +304,7 @@ public class Netty4Transport extends TcpTransport {
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
}
@Override
@ -331,11 +325,11 @@ public class Netty4Transport extends TcpTransport {
@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch, name);
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name);
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
serverAcceptedChannel(nettyTcpChannel);
}
@ -353,4 +347,19 @@ public class Netty4Transport extends TcpTransport {
}
});
}
@ChannelHandler.Sharable
private class ServerChannelExceptionHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
onServerException(serverChannel, new Exception(cause));
} else {
onServerException(serverChannel, (Exception) cause);
}
}
}
}
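Both new ServerChannelExceptionHandler classes in this diff are annotated @ChannelHandler.Sharable because the single instance handed to ServerBootstrap#handler(...) is re-used by every server channel the bootstrap binds, and Netty refuses to add a non-sharable handler to a second pipeline. A standalone illustration (handler name and logging are hypothetical):

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

@ChannelHandler.Sharable
final class LoggingServerExceptionHandler extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // a real transport would route this to onServerException(...)
        System.err.println("server channel failure on " + ctx.channel() + ": " + cause);
    }
}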

@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -91,7 +92,7 @@ public class Netty4ScheduledPingTests extends ESTestCase {
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {

@ -70,7 +70,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
nettyTransport.start();
TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses);
TransportAddress transportAddress = randomFrom(boundAddresses);
port = transportAddress.address().getPort();
host = transportAddress.address().getAddress();
}

@ -1 +0,0 @@
b91a260d8d12ee4b3302a63059c73a34de0ce146

@ -0,0 +1 @@
394e811e9d9bf0b9fba837f7ceca9e8f3e39d1c2

@ -0,0 +1 @@
5cd56acfa16ba20e19b5d21d90b510eada841431

@ -1 +0,0 @@
2fa3662a10a9e085b1c7b87293d727422cbe6224

@ -0,0 +1 @@
db7b56f4cf533ad9022d2312c5ee48331edccca3

@ -0,0 +1 @@
e8dba4d28a595eab2e8fb6095d1ac5f2d3872144

@ -0,0 +1 @@
1243c771ee824c46a3d66ae3e4256d919fc06fbe

Some files were not shown because too many files have changed in this diff.