mirror of
synced 2025-03-25 09:28:27 +00:00
Merge branch 'master' into feature/http_client
This commit is contained in:
@ -296,7 +296,7 @@ gradle :distribution:integ-test-zip:integTest \
-Dtests.method="test {p0=cat.shards/10_basic/Help}"
`RestNIT` are the executable test classes that runs all the
`RestIT` are the executable test classes that runs all the
yaml suites available within the `rest-api-spec` folder.
The REST tests support all the options provided by the randomized runner, plus the following:
Normal file
Normal file
@ -0,0 +1,62 @@
# Elasticsearch Microbenchmark Suite
This directory contains the microbenchmark suite of Elasticsearch. It relies on [JMH](http://openjdk.java.net/projects/code-tools/jmh/).
## Purpose
We do not want to microbenchmark everything but the kitchen sink and should typically rely on our
[macrobenchmarks](https://elasticsearch-benchmarks.elastic.co/app/kibana#/dashboard/Nightly-Benchmark-Overview) with
[Rally](http://github.com/elastic/rally). Microbenchmarks are intended to spot performance regressions in performance-critical components.
The microbenchmark suite is also handy for ad-hoc microbenchmarks but please remove them again before merging your PR.
## Getting Started
Just run `gradle :benchmarks:jmh` from the project root directory. It will build all microbenchmarks, execute them and print the result.
## Running Microbenchmarks
Benchmarks are always run via Gradle with `gradle :benchmarks:jmh`.
Running via an IDE is not supported as the results are meaningless (we have no control over the JVM running the benchmarks).
If you want to run a specific benchmark class, e.g. `org.elasticsearch.benchmark.MySampleBenchmark` or have special requirements
generate the uberjar with `gradle :benchmarks:jmhJar` and run it directly with:
java -jar benchmarks/build/distributions/elasticsearch-benchmarks-*.jar
JMH supports lots of command line parameters. Add `-h` to the command above to see the available command line options.
## Adding Microbenchmarks
Before adding a new microbenchmark, make yourself familiar with the JMH API. You can check our existing microbenchmarks and also the
[JMH samples](http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/).
In contrast to tests, the actual name of the benchmark class is not relevant to JMH. However, stick to the naming convention and
end the class name of a benchmark with `Benchmark`. To have JMH execute a benchmark, annotate the respective methods with `@Benchmark`.
## Tips and Best Practices
To get realistic results, you should exercise care when running benchmarks. Here are a few tips:
### Do
* Ensure that the system executing your microbenchmarks has as little load as possible. Shutdown every process that can cause unnecessary
runtime jitter. Watch the `Error` column in the benchmark results to see the run-to-run variance.
* Ensure to run enough warmup iterations to get the benchmark into a stable state. If you are unsure, don't change the defaults.
* Avoid CPU migrations by pinning your benchmarks to specific CPU cores. On Linux you can use `taskset`.
* Fix the CPU frequency to avoid Turbo Boost from kicking in and skewing your results. On Linux you can use `cpufreq-set` and the
`performance` CPU governor.
* Vary the problem input size with `@Param`.
* Use the integrated profilers in JMH to dig deeper if benchmark results to not match your hypotheses:
* Run the generated uberjar directly and use `-prof gc` to check whether the garbage collector runs during a microbenchmarks and skews
your results. If so, try to force a GC between runs (`-gc true`) but watch out for the caveats.
* Use `-prof perf` or `-prof perfasm` (both only available on Linux) to see hotspots.
* Have your benchmarks peer-reviewed.
### Don't
* Blindly believe the numbers that your microbenchmark produces but verify them by measuring e.g. with `-prof perfasm`.
* Run more threads than your number of CPU cores (in case you run multi-threaded microbenchmarks).
* Look only at the `Score` column and ignore `Error`. Instead take countermeasures to keep `Error` low / variance explainable.
Normal file
Normal file
@ -0,0 +1,96 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
buildscript {
repositories {
maven {
url 'https://plugins.gradle.org/m2/'
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.3'
apply plugin: 'elasticsearch.build'
// build an uberjar with all benchmarks
apply plugin: 'com.github.johnrengelman.shadow'
// have the shadow plugin provide the runShadow task
apply plugin: 'application'
archivesBaseName = 'elasticsearch-benchmarks'
mainClassName = 'org.openjdk.jmh.Main'
// never try to invoke tests on the benchmark project - there aren't any
// explicitly override the test task too in case somebody invokes 'gradle test' so it won't trip
task test(type: Test, overwrite: true)
dependencies {
compile("org.elasticsearch:elasticsearch:${version}") {
// JMH ships with the conflicting version 4.6 (JMH will not update this dependency as it is Java 6 compatible and joptsimple is one
// of the most recent compatible version). This prevents us from using jopt-simple in benchmarks (which should be ok) but allows us
// to invoke the JMH uberjar as usual.
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
compile "org.openjdk.jmh:jmh-core:$versions.jmh"
compile "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh"
// Dependencies of JMH
runtime 'net.sf.jopt-simple:jopt-simple:4.6'
runtime 'org.apache.commons:commons-math3:3.2'
compileJava.options.compilerArgs << "-Xlint:-cast,-deprecation,-rawtypes,-try,-unchecked"
compileTestJava.options.compilerArgs << "-Xlint:-cast,-deprecation,-rawtypes,-try,-unchecked"
forbiddenApis {
// classes generated by JMH can use all sorts of forbidden APIs but we have no influence at all and cannot exclude these classes
ignoreFailures = true
// No licenses for our benchmark deps (we don't ship benchmarks)
dependencyLicenses.enabled = false
thirdPartyAudit.excludes = [
// these classes intentionally use JDK internal API (and this is ok since the project is maintained by Oracle employees)
shadowJar {
classifier = 'benchmarks'
// alias the shadowJar and runShadow tasks to abstract from the concrete plugin that we are using and provide a more consistent interface
task jmhJar(
dependsOn: shadowJar,
description: 'Generates an uberjar with the microbenchmarks and all dependencies',
group: 'Benchmark'
task jmh(
dependsOn: runShadow,
description: 'Runs all microbenchmarks',
group: 'Benchmark'
@ -0,0 +1,170 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.benchmark.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.Settings;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@Warmup(iterations = 10)
@Measurement(iterations = 10)
@SuppressWarnings("unused") //invoked by benchmarking framework
public class AllocationBenchmark {
// Do NOT make any field final (even if it is not annotated with @Param)! See also
// http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/JMHSample_10_ConstantFold.java
// we cannot use individual @Params as some will lead to invalid combinations which do not let the benchmark terminate. JMH offers no
// support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would
// need its own main method and we cannot execute more than one class with a main method per JAR.
// indices, shards, replicas, nodes
" 10, 1, 0, 1",
" 10, 3, 0, 1",
" 10, 10, 0, 1",
" 100, 1, 0, 1",
" 100, 3, 0, 1",
" 100, 10, 0, 1",
" 10, 1, 0, 10",
" 10, 3, 0, 10",
" 10, 10, 0, 10",
" 100, 1, 0, 10",
" 100, 3, 0, 10",
" 100, 10, 0, 10",
" 10, 1, 1, 10",
" 10, 3, 1, 10",
" 10, 10, 1, 10",
" 100, 1, 1, 10",
" 100, 3, 1, 10",
" 100, 10, 1, 10",
" 10, 1, 2, 10",
" 10, 3, 2, 10",
" 10, 10, 2, 10",
" 100, 1, 2, 10",
" 100, 3, 2, 10",
" 100, 10, 2, 10",
" 10, 1, 0, 50",
" 10, 3, 0, 50",
" 10, 10, 0, 50",
" 100, 1, 0, 50",
" 100, 3, 0, 50",
" 100, 10, 0, 50",
" 10, 1, 1, 50",
" 10, 3, 1, 50",
" 10, 10, 1, 50",
" 100, 1, 1, 50",
" 100, 3, 1, 50",
" 100, 10, 1, 50",
" 10, 1, 2, 50",
" 10, 3, 2, 50",
" 10, 10, 2, 50",
" 100, 1, 2, 50",
" 100, 3, 2, 50",
" 100, 10, 2, 50"
public String indicesShardsReplicasNodes = "10,1,0,1";
public int numTags = 2;
private AllocationService strategy;
private ClusterState initialClusterState;
public void setUp() throws Exception {
final String[] params = indicesShardsReplicasNodes.split(",");
int numIndices = toInt(params[0]);
int numShards = toInt(params[1]);
int numReplicas = toInt(params[2]);
int numNodes = toInt(params[3]);
strategy = Allocators.createAllocationService(Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "tag")
MetaData.Builder mb = MetaData.builder();
for (int i = 1; i <= numIndices; i++) {
mb.put(IndexMetaData.builder("test_" + i)
.settings(Settings.builder().put("index.version.created", Version.CURRENT))
MetaData metaData = mb.build();
RoutingTable.Builder rb = RoutingTable.builder();
for (int i = 1; i <= numIndices; i++) {
rb.addAsNew(metaData.index("test_" + i));
RoutingTable routingTable = rb.build();
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
for (int i = 1; i <= numNodes; i++) {
nb.put(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
initialClusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).nodes
private int toInt(String v) {
return Integer.valueOf(v.trim());
public ClusterState measureAllocation() {
ClusterState clusterState = initialClusterState;
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
RoutingAllocation.Result result = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes()
clusterState = ClusterState.builder(clusterState).routingResult(result).build();
result = strategy.reroute(clusterState, "reroute");
clusterState = ClusterState.builder(clusterState).routingResult(result).build();
return clusterState;
@ -0,0 +1,108 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.benchmark.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public final class Allocators {
private static class NoopGatewayAllocator extends GatewayAllocator {
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();
protected NoopGatewayAllocator() {
super(Settings.EMPTY, null, null);
public void applyStartedShards(StartedRerouteAllocation allocation) {
// noop
public void applyFailedShards(FailedRerouteAllocation allocation) {
// noop
public boolean allocateUnassigned(RoutingAllocation allocation) {
return false;
private Allocators() {
throw new AssertionError("Do not instantiate");
public static AllocationService createAllocationService(Settings settings) throws NoSuchMethodException, InstantiationException,
IllegalAccessException, InvocationTargetException {
return createAllocationService(settings, new ClusterSettings(Settings.Builder.EMPTY_SETTINGS, ClusterSettings
public static AllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings) throws
InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
return new AllocationService(settings,
defaultAllocationDeciders(settings, clusterSettings),
NoopGatewayAllocator.INSTANCE, new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE);
public static AllocationDeciders defaultAllocationDeciders(Settings settings, ClusterSettings clusterSettings) throws
IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchMethodException {
List<AllocationDecider> list = new ArrayList<>();
// Keep a deterministic order of allocation deciders for the benchmark
for (Class<? extends AllocationDecider> deciderClass : ClusterModule.DEFAULT_ALLOCATION_DECIDERS) {
try {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class, ClusterSettings
list.add(constructor.newInstance(settings, clusterSettings));
} catch (NoSuchMethodException e) {
Constructor<? extends AllocationDecider> constructor = deciderClass.getConstructor(Settings.class);
return new AllocationDeciders(settings, list.toArray(new AllocationDecider[0]));
public static DiscoveryNode newNode(String nodeId, Map<String, String> attributes) {
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Sets.newHashSet(DiscoveryNode.Role.MASTER,
DiscoveryNode.Role.DATA), Version.CURRENT);
Normal file
Normal file
@ -0,0 +1,8 @@
# Do not log at all if it is not really critical - we're in a benchmark
log4j.rootLogger=${benchmarks.es.logger.level}, out
log4j.appender.out.layout.conversionPattern=[%d{ISO8601}][%-5p][%-25c] %m%n
@ -753,7 +753,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]AbstractClientHeadersTestCase.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterHealthIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterInfoServiceIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterModuleTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateDiffIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterStateTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]DiskUsageTests.java" checks="LineLength" />
@ -31,3 +31,8 @@ org.apache.lucene.index.IndexReader#getCombinedCoreAndDeletesKey()
@defaultMessage Soon to be removed
@defaultMessage Don't use MethodHandles in slow ways, dont be lenient in tests.
# unfortunately, invoke() cannot be banned, because forbidden apis does not support signature polymorphic methods
@ -92,8 +92,3 @@ org.joda.time.DateTime#<init>(int, int, int, int, int, int)
org.joda.time.DateTime#<init>(int, int, int, int, int, int, int)
@defaultMessage Don't use MethodHandles in slow ways, except in tests.
@ -1,5 +1,5 @@
elasticsearch = 5.0.0-alpha4
lucene = 6.1.0-snapshot-3a57bea
lucene = 6.1.0
# optional dependencies
spatial4j = 0.6
@ -18,4 +18,6 @@ httpcore = 4.4.4
commonslogging = 1.1.3
commonscodec = 1.10
hamcrest = 1.3
securemock = 1.2
securemock = 1.2
# benchmark dependencies
jmh = 1.12
@ -32,7 +32,7 @@ public class ResourceNotFoundException extends ElasticsearchException {
super(msg, args);
protected ResourceNotFoundException(String msg, Throwable cause, Object... args) {
public ResourceNotFoundException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
@ -38,9 +38,9 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.PersistedTaskInfo;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.PersistedTaskInfo;
import org.elasticsearch.tasks.TaskPersistenceService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
@ -51,7 +51,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForTaskCompletion;
* Action to get a single task. If the task isn't running then it'll try to request the status from request index.
@ -148,7 +147,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
threadPool.generic().execute(new AbstractRunnable() {
protected void doRun() throws Exception {
waitForTaskCompletion(taskManager, runningTask, waitForCompletionTimeout(request.getTimeout()));
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
// TODO look up the task's result from the .tasks index now that it is done
new GetTaskResponse(new PersistedTaskInfo(runningTask.taskInfo(clusterService.localNode(), true))));
@ -19,8 +19,6 @@
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
@ -34,7 +32,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -42,26 +39,12 @@ import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
public class TransportListTasksAction extends TransportTasksAction<Task, ListTasksRequest, ListTasksResponse, TaskInfo> {
public static void waitForTaskCompletion(TaskManager taskManager, Task task, long untilInNanos) {
while (System.nanoTime() - untilInNanos < 0) {
if (taskManager.getTask(task.getId()) == null) {
try {
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task);
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
public static long waitForCompletionTimeout(TimeValue timeout) {
if (timeout == null) {
@ -69,7 +52,6 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
return System.nanoTime() + timeout.nanos();
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);
@ -105,7 +87,7 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
// for itself or one of its child tasks
waitForTaskCompletion(taskManager, task, timeoutNanos);
taskManager.waitForTaskCompletion(task, timeoutNanos);
super.processTasks(request, operation);
@ -141,7 +141,7 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.STAGE, getStage());
stats.toXContent(builder, params);
if (getNodeId() != null) {
@ -49,7 +49,7 @@ public class SnapshotIndexStatus implements Iterable<SnapshotIndexShardStatus>,
Map<Integer, SnapshotIndexShardStatus> indexShards = new HashMap<>();
stats = new SnapshotStats();
for (SnapshotIndexShardStatus shard : shards) {
indexShards.put(shard.getShardId(), shard);
indexShards.put(shard.getShardId().getId(), shard);
shardsStats = new SnapshotShardsStats(shards);
@ -18,15 +18,25 @@
package org.elasticsearch.action.admin.indices.shrink;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -36,6 +46,17 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements IndicesRequest {
public static ObjectParser<ShrinkRequest, ParseFieldMatcherSupplier> PARSER =
new ObjectParser<>("shrink_request", null);
static {
PARSER.declareField((parser, request, parseFieldMatcherSupplier) ->
new ParseField("settings"), ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, request, parseFieldMatcherSupplier) ->
new ParseField("aliases"), ObjectParser.ValueType.OBJECT);
private CreateIndexRequest shrinkIndexRequest;
private String sourceIndex;
@ -104,4 +125,17 @@ public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements
public String getSourceIndex() {
return sourceIndex;
public void source(BytesReference source) {
XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) {
try (XContentParser parser = XContentFactory.xContent(xContentType).createParser(source)) {
PARSER.parse(parser, this, () -> ParseFieldMatcher.EMPTY);
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse source for shrink index", e);
} else {
throw new ElasticsearchParseException("failed to parse content type for shrink index source");
@ -44,6 +44,7 @@ public class FieldStatsRequest extends BroadcastRequest<FieldStatsRequest> {
private String[] fields = Strings.EMPTY_ARRAY;
private String level = DEFAULT_LEVEL;
private IndexConstraint[] indexConstraints = new IndexConstraint[0];
private boolean useCache = true;
public String[] getFields() {
return fields;
@ -56,6 +57,14 @@ public class FieldStatsRequest extends BroadcastRequest<FieldStatsRequest> {
this.fields = fields;
public void setUseCache(boolean useCache) {
this.useCache = useCache;
public boolean shouldUseCache() {
return useCache;
public IndexConstraint[] getIndexConstraints() {
return indexConstraints;
@ -184,6 +193,7 @@ public class FieldStatsRequest extends BroadcastRequest<FieldStatsRequest> {
indexConstraints[i] = new IndexConstraint(in);
level = in.readString();
useCache = in.readBoolean();
@ -201,6 +211,7 @@ public class FieldStatsRequest extends BroadcastRequest<FieldStatsRequest> {
@ -45,4 +45,9 @@ public class FieldStatsRequestBuilder extends
return this;
public FieldStatsRequestBuilder setUseCache(boolean useCache) {
return this;
@ -34,6 +34,7 @@ import java.util.Set;
public class FieldStatsShardRequest extends BroadcastShardRequest {
private String[] fields;
private boolean useCache;
public FieldStatsShardRequest() {
@ -46,22 +47,29 @@ public class FieldStatsShardRequest extends BroadcastShardRequest {
this.fields = fields.toArray(new String[fields.size()]);
useCache = request.shouldUseCache();
public String[] getFields() {
return fields;
public boolean shouldUseCache() {
return useCache;
public void readFrom(StreamInput in) throws IOException {
fields = in.readStringArray();
useCache = in.readBoolean();
public void writeTo(StreamOutput out) throws IOException {
@ -46,7 +46,6 @@ public class FieldStatsShardResponse extends BroadcastShardResponse {
return fieldStats;
public void readFrom(StreamInput in) throws IOException {
@ -187,16 +187,15 @@ public class TransportFieldStatsAction extends
ShardId shardId = request.shardId();
Map<String, FieldStats<?>> fieldStats = new HashMap<>();
IndexService indexServices = indicesService.indexServiceSafe(shardId.getIndex());
MapperService mapperService = indexServices.mapperService();
IndexShard shard = indexServices.getShard(shardId.id());
try (Engine.Searcher searcher = shard.acquireSearcher("fieldstats")) {
// Resolve patterns and deduplicate
Set<String> fieldNames = new HashSet<>();
for (String field : request.getFields()) {
for (String field : fieldNames) {
FieldStats<?> stats = indicesService.getFieldStats(shard, searcher, field);
FieldStats<?> stats = indicesService.getFieldStats(shard, searcher, field, request.shouldUseCache());
if (stats != null) {
fieldStats.put(field, stats);
@ -20,6 +20,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
@ -28,6 +29,7 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Arrays;
@ -155,8 +157,10 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
if (item.isFailure()) {
ElasticsearchException.renderThrowable(builder, params, item.getFailure());
builder.field(Fields.STATUS, ExceptionsHelper.status(item.getFailure()).getStatus());
} else {
item.getResponse().toXContent(builder, params);
builder.field(Fields.STATUS, item.getResponse().status().getStatus());
@ -166,6 +170,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
static final class Fields {
static final String RESPONSES = "responses";
static final String STATUS = "status";
static final String ERROR = "error";
static final String ROOT_CAUSE = "root_cause";
@ -45,8 +45,8 @@ public abstract class BroadcastShardResponse extends TransportResponse {
return this.shardId.getIndexName();
public int getShardId() {
return this.shardId.id();
public ShardId getShardId() {
return this.shardId;
@ -37,19 +37,21 @@ import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
@ -134,10 +136,9 @@ public class TransportClient extends AbstractClient {
modules.add(new PluginsModule(pluginsService));
modules.add(new SettingsModule(settings));
modules.add(new NetworkModule(networkService, settings, true, namedWriteableRegistry));
modules.add(new ClusterNameModule(settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
modules.add(new SearchModule(settings, namedWriteableRegistry) {
protected void configure() {
@ -145,9 +146,20 @@ public class TransportClient extends AbstractClient {
modules.add(new ActionModule(false, true));
modules.add(new CircuitBreakerModule(settings));
final List<Setting<?>> additionalSettings = new ArrayList<>();
final List<String> additionalSettingsFilter = new ArrayList<>();
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
modules.add((b -> b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService)));
Injector injector = modules.createInjector();
final TransportService transportService = injector.getInstance(TransportService.class);
@ -40,8 +40,10 @@ public interface ClusterStateTaskExecutor<T> {
* Callback invoked after new cluster state is published. Note that
* this method is not invoked if the cluster state was not updated.
* @param clusterChangedEvent the change event for this cluster state change, containing
* both old and new states
default void clusterStatePublished(ClusterState newClusterState) {
default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
@ -92,7 +94,8 @@ public interface ClusterStateTaskExecutor<T> {
private Builder<T> result(T task, TaskResult executionResult) {
executionResults.put(task, executionResult);
TaskResult existing = executionResults.put(task, executionResult);
assert existing == null : task + " already has result " + existing;
return this;
@ -21,6 +21,7 @@ package org.elasticsearch.cluster.action.shard;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
@ -312,8 +313,8 @@ public class ShardStateAction extends AbstractComponent {
public void clusterStatePublished(ClusterState newClusterState) {
int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size();
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size();
if (numberOfUnassignedShards > 0) {
String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
if (logger.isTraceEnabled()) {
@ -614,5 +614,9 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
public static DiscoveryNodes readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
return PROTO.readFrom(in, localNode);
public boolean isLocalNodeElectedMaster() {
return masterNodeId != null && masterNodeId.equals(localNodeId);
@ -66,7 +66,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
@ -371,34 +373,61 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
public <T> void submitStateUpdateTask(final String source, final T task,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor,
final ClusterStateTaskListener listener
) {
innerSubmitStateUpdateTask(source, task, config, executor, safe(listener, logger));
final ClusterStateTaskListener listener) {
submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
private <T> void innerSubmitStateUpdateTask(final String source, final T task,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor executor,
final SafeClusterStateTaskListener listener) {
* Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together,
* potentially with more tasks of the same executor.
* @param source the source of the cluster state update task
* @param tasks a map of update tasks and their corresponding listeners
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param <T> the type of the cluster state update task state
public <T> void submitStateUpdateTasks(final String source,
final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor) {
if (!lifecycle.started()) {
if (tasks.isEmpty()) {
try {
final UpdateTask<T> updateTask = new UpdateTask<>(source, task, config, executor, listener);
// convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal
final IdentityHashMap<T, ClusterStateTaskListener> tasksIdentity = new IdentityHashMap<>(tasks);
final List<UpdateTask<T>> updateTasks = tasksIdentity.entrySet().stream().map(
entry -> new UpdateTask<>(source, entry.getKey(), config, executor, safe(entry.getValue(), logger))
synchronized (updateTasksPerExecutor) {
updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>()).add(updateTask);
List<UpdateTask> existingTasks = updateTasksPerExecutor.computeIfAbsent(executor, k -> new ArrayList<>());
for (@SuppressWarnings("unchecked") UpdateTask<T> existing : existingTasks) {
if (tasksIdentity.containsKey(existing.task)) {
throw new IllegalArgumentException("task [" + existing.task + "] is already queued");
final UpdateTask<T> firstTask = updateTasks.get(0);
if (config.timeout() != null) {
updateTasksExecutor.execute(updateTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
if (updateTask.processed.getAndSet(true) == false) {
logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout());
listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
updateTasksExecutor.execute(firstTask, threadPool.scheduler(), config.timeout(), () -> threadPool.generic().execute(() -> {
for (UpdateTask<T> task : updateTasks) {
if (task.processed.getAndSet(true) == false) {
logger.debug("cluster state update task [{}] timed out after [{}]", source, config.timeout());
task.listener.onFailure(source, new ProcessClusterEventTimeoutException(config.timeout(), source));
} else {
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
@ -681,7 +710,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
try {
} catch (Exception e) {
logger.error("exception thrown while notifying executor of new cluster state publication [{}]", e, source);
@ -420,6 +420,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
@ -81,6 +81,11 @@ public class Setting<T> extends ToXContentToBytes {
* iff this setting is shared with more than one module ie. can be defined multiple times.
* iff this setting can be dynamically updateable
@ -247,6 +252,13 @@ public class Setting<T> extends ToXContentToBytes {
return properties.contains(Property.Deprecated);
* Returns <code>true</code> if this setting is shared with more than one other module or plugin, otherwise <code>false</code>
public boolean isShared() {
return properties.contains(Property.Shared);
* Returns <code>true</code> iff this setting is a group setting. Group settings represent a set of settings rather than a single value.
* The key, see {@link #getKey()}, in contrast to non-group settings is a prefix like <tt>cluster.store.</tt> that matches all settings
@ -19,7 +19,8 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Binder;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
@ -28,9 +29,11 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.tribe.TribeService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
@ -40,7 +43,7 @@ import java.util.stream.IntStream;
* A module that binds the provided settings to the {@link Settings} interface.
public class SettingsModule extends AbstractModule {
public class SettingsModule implements Module {
private final Settings settings;
private final Set<String> settingsFilterPattern = new HashSet<>();
@ -49,8 +52,14 @@ public class SettingsModule extends AbstractModule {
private static final Predicate<String> TRIBE_CLIENT_NODE_SETTINGS_PREDICATE = (s) -> s.startsWith("tribe.")
&& TribeService.TRIBE_SETTING_KEYS.contains(s) == false;
private final ESLogger logger;
private final IndexScopedSettings indexScopedSettings;
private final ClusterSettings clusterSettings;
public SettingsModule(Settings settings) {
public SettingsModule(Settings settings, Setting<?>... additionalSettings) {
this(settings, Arrays.asList(additionalSettings), Collections.emptyList());
public SettingsModule(Settings settings, List<Setting<?>> additionalSettings, List<String> settingsFilter) {
logger = Loggers.getLogger(getClass(), settings);
this.settings = settings;
for (Setting<?> setting : ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) {
@ -59,12 +68,16 @@ public class SettingsModule extends AbstractModule {
for (Setting<?> setting : IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) {
protected void configure() {
final IndexScopedSettings indexScopedSettings = new IndexScopedSettings(settings, new HashSet<>(this.indexSettings.values()));
final ClusterSettings clusterSettings = new ClusterSettings(settings, new HashSet<>(this.nodeSettings.values()));
for (Setting<?> setting : additionalSettings) {
for (String filter : settingsFilter) {
this.indexScopedSettings = new IndexScopedSettings(settings, new HashSet<>(this.indexSettings.values()));
this.clusterSettings = new ClusterSettings(settings, new HashSet<>(this.nodeSettings.values()));
Settings indexSettings = settings.filter((s) -> (s.startsWith("index.") &&
// special case - we want to get Did you mean indices.query.bool.max_clause_count
// which means we need to by-pass this check for this setting
@ -87,7 +100,7 @@ public class SettingsModule extends AbstractModule {
"In order to upgrade all indices the settings must be updated via the /${index}/_settings API. " +
"Unless all settings are dynamic all indices must be closed in order to apply the upgrade" +
"Indices created in the future should use index templates to set default values."
).split(" ")) {
).split(" ")) {
if (count + word.length() > 85) {
count = 0;
@ -124,19 +137,23 @@ public class SettingsModule extends AbstractModule {
final Predicate<String> acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
validateTribeSettings(settings, clusterSettings);
bind(SettingsFilter.class).toInstance(new SettingsFilter(settings, settingsFilterPattern));
public void configure(Binder binder) {
binder.bind(SettingsFilter.class).toInstance(new SettingsFilter(settings, settingsFilterPattern));
* Registers a new setting. This method should be used by plugins in order to expose any custom settings the plugin defines.
* Unless a setting is registered the setting is unusable. If a setting is never the less specified the node will reject
* the setting during startup.
public void registerSetting(Setting<?> setting) {
private void registerSetting(Setting<?> setting) {
if (setting.isFiltered()) {
if (settingsFilterPattern.contains(setting.getKey()) == false) {
@ -144,13 +161,15 @@ public class SettingsModule extends AbstractModule {
if (setting.hasNodeScope() || setting.hasIndexScope()) {
if (setting.hasNodeScope()) {
if (nodeSettings.containsKey(setting.getKey())) {
Setting<?> existingSetting = nodeSettings.get(setting.getKey());
if (existingSetting != null && (setting.isShared() == false || existingSetting.isShared() == false)) {
throw new IllegalArgumentException("Cannot register setting [" + setting.getKey() + "] twice");
nodeSettings.put(setting.getKey(), setting);
if (setting.hasIndexScope()) {
if (indexSettings.containsKey(setting.getKey())) {
Setting<?> existingSetting = indexSettings.get(setting.getKey());
if (existingSetting != null && (setting.isShared() == false || existingSetting.isShared() == false)) {
throw new IllegalArgumentException("Cannot register setting [" + setting.getKey() + "] twice");
indexSettings.put(setting.getKey(), setting);
@ -164,7 +183,7 @@ public class SettingsModule extends AbstractModule {
* Registers a settings filter pattern that allows to filter out certain settings that for instance contain sensitive information
* or if a setting is for internal purposes only. The given pattern must either be a valid settings key or a simple regexp pattern.
public void registerSettingsFilter(String filter) {
private void registerSettingsFilter(String filter) {
if (SettingsFilter.isValidPattern(filter) == false) {
throw new IllegalArgumentException("filter [" + filter +"] is invalid must be either a key or a regex pattern");
@ -174,19 +193,6 @@ public class SettingsModule extends AbstractModule {
* Check if a setting has already been registered
public boolean exists(Setting<?> setting) {
if (setting.hasNodeScope()) {
return nodeSettings.containsKey(setting.getKey());
if (setting.hasIndexScope()) {
return indexSettings.containsKey(setting.getKey());
throw new IllegalArgumentException("setting scope is unknown. This should never happen!");
private void validateTribeSettings(Settings settings, ClusterSettings clusterSettings) {
Map<String, Settings> groups = settings.filter(TRIBE_CLIENT_NODE_SETTINGS_PREDICATE).getGroups("tribe.", true);
for (Map.Entry<String, Settings> tribeSettings : groups.entrySet()) {
@ -200,4 +206,16 @@ public class SettingsModule extends AbstractModule {
public Settings getSettings() {
return settings;
public IndexScopedSettings getIndexScopedSettings() {
return indexScopedSettings;
public ClusterSettings getClusterSettings() {
return clusterSettings;
@ -18,9 +18,13 @@
package org.elasticsearch.discovery.zen;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -30,21 +34,22 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
* This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes
@ -52,18 +57,17 @@ import java.util.concurrent.atomic.AtomicReference;
public class NodeJoinController extends AbstractComponent {
final ClusterService clusterService;
final RoutingService routingService;
final ElectMasterService electMaster;
final DiscoverySettings discoverySettings;
final AtomicBoolean accumulateJoins = new AtomicBoolean(false);
private final ClusterService clusterService;
private final RoutingService routingService;
private final ElectMasterService electMaster;
private final DiscoverySettings discoverySettings;
private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
// this is site while trying to become a master
final AtomicReference<ElectionContext> electionContext = new AtomicReference<>();
// this is set while trying to become a master
// mutation should be done under lock
private ElectionContext electionContext = null;
protected final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> pendingJoinRequests = new HashMap<>();
public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) {
this.clusterService = clusterService;
@ -75,7 +79,7 @@ public class NodeJoinController extends AbstractComponent {
* waits for enough incoming joins from master eligible nodes to complete the master election
* <p>
* You must start accumulating joins before calling this method. See {@link #startAccumulatingJoins()}
* You must start accumulating joins before calling this method. See {@link #startElectionContext()}
* <p>
* The method will return once the local node has been elected as master or some failure/timeout has happened.
* The exact outcome is communicated via the callback parameter, which is guaranteed to be called.
@ -86,29 +90,32 @@ public class NodeJoinController extends AbstractComponent {
* object
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins";
final CountDownLatch done = new CountDownLatch(1);
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) {
final ElectionCallback wrapperCallback = new ElectionCallback() {
void onClose() {
if (electionContext.compareAndSet(this, null)) {
stopAccumulatingJoins("election closed");
} else {
assert false : "failed to remove current election context";
public void onElectedAsMaster(ClusterState state) {
public void onFailure(Throwable t) {
if (electionContext.compareAndSet(null, newContext) == false) {
// should never happen, but be conservative
failContext(newContext, new IllegalStateException("double waiting for election"));
ElectionContext myElectionContext = null;
try {
// check what we have so far..
// capture the context we add the callback to make sure we fail our own
synchronized (this) {
assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
myElectionContext = electionContext;
electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
try {
if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
@ -119,69 +126,46 @@ public class NodeJoinController extends AbstractComponent {
if (logger.isTraceEnabled()) {
final int pendingNodes;
synchronized (pendingJoinRequests) {
pendingNodes = pendingJoinRequests.size();
logger.trace("timed out waiting to be elected. waited [{}]. pending node joins [{}]", timeValue, pendingNodes);
final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
// callback will clear the context, if it's active
failContext(newContext, new ElasticsearchTimeoutException("timed out waiting to be elected"));
failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
} catch (Throwable t) {
logger.error("unexpected failure while waiting for incoming joins", t);
failContext(newContext, "unexpected failure while waiting for pending joins", t);
if (myElectionContext != null) {
failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + t.getMessage() + "]");
private void failContext(final ElectionContext context, final Throwable throwable) {
failContext(context, throwable.getMessage(), throwable);
/** utility method to fail the given election context under the cluster state thread */
private void failContext(final ElectionContext context, final String reason, final Throwable throwable) {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
public boolean runOnlyOnMaster() {
return false;
public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
public void onFailure(String source, Throwable updateFailure) {
logger.warn("unexpected error while trying to fail election context due to [{}]. original exception [{}]", updateFailure, reason, throwable);
* utility method to fail the given election context under the cluster state thread
private synchronized void failContextIfNeeded(final ElectionContext context, final String reason) {
if (electionContext == context) {
* Accumulates any future incoming join request. Pending join requests will be processed in the final steps of becoming a
* master or when {@link #stopAccumulatingJoins(String)} is called.
* master or when {@link #stopElectionContext(String)} is called.
public void startAccumulatingJoins() {
logger.trace("starting to accumulate joins");
boolean b = accumulateJoins.getAndSet(true);
assert b == false : "double startAccumulatingJoins() calls";
assert electionContext.get() == null : "startAccumulatingJoins() called, but there is an ongoing election context";
public synchronized void startElectionContext() {
logger.trace("starting an election context, will accumulate joins");
assert electionContext == null : "double startElectionContext() calls";
electionContext = new ElectionContext();
/** Stopped accumulating joins. All pending joins will be processed. Future joins will be processed immediately */
public void stopAccumulatingJoins(String reason) {
logger.trace("stopping join accumulation ([{}])", reason);
assert electionContext.get() == null : "stopAccumulatingJoins() called, but there is an ongoing election context";
boolean b = accumulateJoins.getAndSet(false);
assert b : "stopAccumulatingJoins() called but not accumulating";
synchronized (pendingJoinRequests) {
if (pendingJoinRequests.size() > 0) {
processJoins("pending joins after accumulation stop [" + reason + "]");
* Stopped accumulating joins. All pending joins will be processed. Future joins will be processed immediately
public void stopElectionContext(String reason) {
logger.trace("stopping election ([{}])", reason);
synchronized (this) {
assert electionContext != null : "stopElectionContext() called but not accumulating";
electionContext = null;
@ -190,19 +174,14 @@ public class NodeJoinController extends AbstractComponent {
* <p>
* Note: doesn't do any validation. This should have been done before.
public void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
synchronized (pendingJoinRequests) {
List<MembershipAction.JoinCallback> nodeCallbacks = pendingJoinRequests.get(node);
if (nodeCallbacks == null) {
nodeCallbacks = new ArrayList<>();
pendingJoinRequests.put(node, nodeCallbacks);
if (accumulateJoins.get() == false) {
processJoins("join from node[" + node + "]");
} else {
public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
if (electionContext != null) {
electionContext.addIncomingJoin(node, callback);
} else {
clusterService.submitStateUpdateTask("zen-disco-join(node " + node + "])",
node, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor, new JoinTaskListener(callback, logger));
@ -210,86 +189,24 @@ public class NodeJoinController extends AbstractComponent {
* checks if there is an on going request to become master and if it has enough pending joins. If so, the node will
* become master via a ClusterState update task.
private void checkPendingJoinsAndElectIfNeeded() {
assert accumulateJoins.get() : "election check requested but we are not accumulating joins";
final ElectionContext context = electionContext.get();
if (context == null) {
private synchronized void checkPendingJoinsAndElectIfNeeded() {
assert electionContext != null : "election check requested but no active context";
final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
if (logger.isTraceEnabled()) {
logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
} else {
if (logger.isTraceEnabled()) {
logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
electionContext = null; // clear this out so future joins won't be accumulated
int pendingMasterJoins = 0;
synchronized (pendingJoinRequests) {
for (DiscoveryNode node : pendingJoinRequests.keySet()) {
if (node.isMasterNode()) {
if (pendingMasterJoins < context.requiredMasterJoins) {
if (context.pendingSetAsMasterTask.get() == false) {
logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins, context.requiredMasterJoins);
if (context.pendingSetAsMasterTask.getAndSet(true)) {
logger.trace("elected as master task already submitted, ignoring...");
final String source = "zen-disco-join(elected_as_master, [" + pendingMasterJoins + "] joins received)";
clusterService.submitStateUpdateTask(source, new ProcessJoinsTask(Priority.IMMEDIATE) {
public ClusterState execute(ClusterState currentState) {
// Take into account the previous known nodes, if they happen not to be available
// then fault detection will remove these nodes.
if (currentState.nodes().getMasterNode() != null) {
// TODO can we tie break here? we don't have a remote master cluster state version to decide on
logger.trace("join thread elected local node as master, but there is already a master in place: {}", currentState.nodes().getMasterNode());
throw new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request");
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().getLocalNode().getId());
// update the fact that we are the master...
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState, "nodes joined");
if (result.changed()) {
currentState = ClusterState.builder(currentState).routingResult(result).build();
// Add the incoming join requests.
// Note: we only do this now (after the reroute) to avoid assigning shards to these nodes.
return super.execute(currentState);
public boolean runOnlyOnMaster() {
return false;
public void onFailure(String source, Throwable t) {
super.onFailure(source, t);
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
super.clusterStateProcessed(source, oldState, newState);
/** process all pending joins */
private void processJoins(String reason) {
clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", new ProcessJoinsTask(Priority.URGENT));
public interface ElectionCallback {
* called when the local node is successfully elected as master
@ -304,119 +221,143 @@ public class NodeJoinController extends AbstractComponent {
void onFailure(Throwable t);
static abstract class ElectionContext implements ElectionCallback {
private final ElectionCallback callback;
private final int requiredMasterJoins;
class ElectionContext {
private ElectionCallback callback = null;
private int requiredMasterJoins = -1;
private final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> joinRequestAccumulator = new HashMap<>();
/** set to true after enough joins have been seen and a cluster update task is submitted to become master */
final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean();
final AtomicBoolean closed = new AtomicBoolean();
ElectionContext(ElectionCallback callback, int requiredMasterJoins) {
this.callback = callback;
public synchronized void onAttemptToBeElected(int requiredMasterJoins, ElectionCallback callback) {
assert this.requiredMasterJoins < 0;
assert this.callback == null;
this.requiredMasterJoins = requiredMasterJoins;
this.callback = callback;
abstract void onClose();
public synchronized void addIncomingJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
joinRequestAccumulator.computeIfAbsent(node, n -> new ArrayList<>()).add(callback);
public void onElectedAsMaster(ClusterState state) {
assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set";
public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) {
final boolean hasEnough;
if (requiredMasterJoins < 0) {
// requiredMasterNodes is unknown yet, return false and keep on waiting
hasEnough = false;
} else {
assert callback != null : "requiredMasterJoins is set but not the callback";
hasEnough = pendingMasterJoins >= requiredMasterJoins;
return hasEnough;
private Map<DiscoveryNode, ClusterStateTaskListener> getPendingAsTasks() {
Map<DiscoveryNode, ClusterStateTaskListener> tasks = new HashMap<>();
joinRequestAccumulator.entrySet().stream().forEach(e -> tasks.put(e.getKey(), new JoinTaskListener(e.getValue(), logger)));
return tasks;
public synchronized int getPendingMasterJoinsCount() {
int pendingMasterJoins = 0;
for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
if (node.isMasterNode()) {
return pendingMasterJoins;
public synchronized void closeAndBecomeMaster() {
assert callback != null : "becoming a master but the callback is not yet set";
assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master but pending joins of "
+ getPendingMasterJoinsCount() + " are not enough. needs [" + requiredMasterJoins + "];";
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-join(elected_as_master, [" + tasks.size() + "] nodes joined)";
tasks.put(BECOME_MASTER_TASK, joinProcessedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
public synchronized void closeAndProcessPending(String reason) {
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-join(election stopped [" + reason + "] nodes joined";
tasks.put(FINISH_ELECTION_NOT_MASTER_TASK, joinProcessedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
private void innerClose() {
if (closed.getAndSet(true)) {
throw new AlreadyClosedException("election context is already closed");
private void ensureOpen() {
if (closed.get()) {
throw new AlreadyClosedException("election context is already closed");
private synchronized ElectionCallback getCallback() {
return callback;
private void onElectedAsMaster(ClusterState state) {
assert state.nodes().isLocalNodeElectedMaster() : "onElectedAsMaster called but local node is not master";
if (closed.compareAndSet(false, true)) {
try {
} finally {
ElectionCallback callback = getCallback(); // get under lock
if (callback != null) {
public void onFailure(Throwable t) {
private void onFailure(Throwable t) {
if (closed.compareAndSet(false, true)) {
try {
} finally {
ElectionCallback callback = getCallback(); // get under lock
if (callback != null) {
private final ClusterStateTaskListener joinProcessedListener = new ClusterStateTaskListener() {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
assert newState.nodes().isLocalNodeElectedMaster() : "should have become a master but isn't " + newState.prettyPrint();
public void onFailure(String source, Throwable t) {
static class JoinTaskListener implements ClusterStateTaskListener {
final List<MembershipAction.JoinCallback> callbacks;
final private ESLogger logger;
* Processes any pending joins via a ClusterState update task.
* Note: this task automatically fails (and fails all pending joins) if the current node is not marked as master
class ProcessJoinsTask extends ClusterStateUpdateTask {
JoinTaskListener(MembershipAction.JoinCallback callback, ESLogger logger) {
this(Collections.singletonList(callback), logger);
private final List<MembershipAction.JoinCallback> joinCallbacksToRespondTo = new ArrayList<>();
private boolean nodeAdded = false;
public ProcessJoinsTask(Priority priority) {
JoinTaskListener(List<MembershipAction.JoinCallback> callbacks, ESLogger logger) {
this.callbacks = callbacks;
this.logger = logger;
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder nodesBuilder;
synchronized (pendingJoinRequests) {
if (pendingJoinRequests.isEmpty()) {
return currentState;
nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
Iterator<Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>>> iterator = pendingJoinRequests.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>> entry = iterator.next();
final DiscoveryNode node = entry.getKey();
if (currentState.nodes().nodeExists(node.getId())) {
logger.debug("received a join request for an existing node [{}]", node);
} else {
nodeAdded = true;
for (DiscoveryNode existingNode : currentState.nodes()) {
if (node.getAddress().equals(existingNode.getAddress())) {
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
// we must return a new cluster state instance to force publishing. This is important
// for the joining node to finalize it's join and set us as a master
final ClusterState.Builder newState = ClusterState.builder(currentState);
if (nodeAdded) {
return newState.build();
public void onNoLongerMaster(String source) {
// we are rejected, so drain all pending task (execute never run)
synchronized (pendingJoinRequests) {
Iterator<Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>>> iterator = pendingJoinRequests.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>> entry = iterator.next();
Exception e = new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request");
void innerOnFailure(Throwable t) {
for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) {
public void onFailure(String source, Throwable t) {
for (MembershipAction.JoinCallback callback : callbacks) {
try {
} catch (Exception e) {
@ -425,29 +366,111 @@ public class NodeJoinController extends AbstractComponent {
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (nodeAdded) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) {
for (MembershipAction.JoinCallback callback : callbacks) {
try {
} catch (Exception e) {
logger.error("unexpected error during [{}]", e, source);
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(oldState, newState);
// a task indicated that the current node should become master, if no current master is known
private final static DiscoveryNode BECOME_MASTER_TASK = new DiscoveryNode("_BECOME_MASTER_TASK_", DummyTransportAddress.INSTANCE,
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
// a task that is used to process pending joins without explicitly becoming a master and listening to the results
// this task is used when election is stop without the local node becoming a master per se (though it might
private final static DiscoveryNode FINISH_ELECTION_NOT_MASTER_TASK = new DiscoveryNode("_NOT_MASTER_TASK_",
DummyTransportAddress.INSTANCE, Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
final DiscoveryNodes currentNodes = currentState.nodes();
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();
boolean nodesChanged = false;
ClusterState.Builder newState = ClusterState.builder(currentState);
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentNodes);
if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
// use these joins to try and become the master.
// Note that we don't have to do any validation of the amount of joining nodes - the commit
// during the cluster state publishing guarantees that we have enough
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
nodesChanged = true;
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
// Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster
// state is published.
// TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove.
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
nodesBuilder = DiscoveryNodes.builder(tmpState.nodes());
if (nodesBuilder.isLocalNodeElectedMaster() == false) {
logger.trace("processing node joins, but we are not the master. current master: {}", currentNodes.getMasterNode());
throw new NotMasterException("Node [" + currentNodes.getLocalNode() + "] not master for join request");
for (final DiscoveryNode node : joiningNodes) {
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_NOT_MASTER_TASK)) {
// noop
} else if (currentNodes.nodeExists(node.getId())) {
logger.debug("received a join request for an existing node [{}]", node);
} else {
nodesChanged = true;
for (DiscoveryNode existingNode : currentNodes) {
if (node.getAddress().equals(existingNode.getAddress())) {
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
if (nodesChanged) {
// we must return a new cluster state instance to force publishing. This is important
// for the joining node to finalize its join and set us as a master
return results.build(newState.build());
public boolean runOnlyOnMaster() {
// we validate that we are allowed to change the cluster state during cluster state processing
return false;
public void clusterStatePublished(ClusterChangedEvent event) {
if (event.nodesDelta().hasChanges()) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
@ -372,7 +372,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private void innerJoinCluster() {
DiscoveryNode masterNode = null;
final Thread currentThread = Thread.currentThread();
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
masterNode = findMaster();
@ -406,7 +406,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} else {
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopAccumulatingJoins("not master");
nodeJoinController.stopElectionContext("not master");
// send join request
final boolean success = joinElectedMaster(masterNode);
@ -20,6 +20,7 @@
package org.elasticsearch.env;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.threadpool.ThreadPool;
@ -27,13 +28,16 @@ import org.elasticsearch.common.inject.AbstractModule;
public class EnvironmentModule extends AbstractModule {
private final Environment environment;
private final ThreadPool threadPool;
public EnvironmentModule(Environment environment) {
public EnvironmentModule(Environment environment, ThreadPool threadPool) {
this.threadPool = threadPool;
this.environment = environment;
protected void configure() {
@ -108,7 +108,11 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
RestChannel responseChannel = channel;
try {
int contentLength = request.content().length();
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
if (restController.canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
restController.dispatchRequest(request, responseChannel, threadContext);
@ -29,6 +29,7 @@ public interface IndexNumericFieldData extends IndexFieldData<AtomicNumericField
@ -19,6 +19,7 @@
package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.document.HalfFloatPoint;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.LeafReader;
@ -61,6 +62,7 @@ public class SortedNumericDVIndexFieldData extends DocValuesIndexFieldData imple
public org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource comparatorSource(Object missingValue, MultiValueMode sortMode, Nested nested) {
switch (numericType) {
case FLOAT:
return new FloatValuesComparatorSource(this, missingValue, sortMode, nested);
case DOUBLE:
@ -87,6 +89,8 @@ public class SortedNumericDVIndexFieldData extends DocValuesIndexFieldData imple
final String field = fieldName;
switch (numericType) {
return new SortedNumericHalfFloatFieldData(reader, field);
case FLOAT:
return new SortedNumericFloatFieldData(reader, field);
case DOUBLE:
@ -134,6 +138,95 @@ public class SortedNumericDVIndexFieldData extends DocValuesIndexFieldData imple
* FieldData implementation for 16-bit float values.
* <p>
* Order of values within a document is consistent with
* {@link Float#compareTo(Float)}, hence the following reversible
* transformation is applied at both index and search:
* {@code bits ^ (bits >> 15) & 0x7fff}
* <p>
* Although the API is multi-valued, most codecs in Lucene specialize
* for the case where documents have at most one value. In this case
* {@link FieldData#unwrapSingleton(SortedNumericDoubleValues)} will return
* the underlying single-valued NumericDoubleValues representation, and
* {@link FieldData#unwrapSingletonBits(SortedNumericDoubleValues)} will return
* a Bits matching documents that have a real value (as opposed to missing).
static final class SortedNumericHalfFloatFieldData extends AtomicDoubleFieldData {
final LeafReader reader;
final String field;
SortedNumericHalfFloatFieldData(LeafReader reader, String field) {
this.reader = reader;
this.field = field;
public SortedNumericDoubleValues getDoubleValues() {
try {
SortedNumericDocValues raw = DocValues.getSortedNumeric(reader, field);
NumericDocValues single = DocValues.unwrapSingleton(raw);
if (single != null) {
return FieldData.singleton(new SingleHalfFloatValues(single), DocValues.unwrapSingletonBits(raw));
} else {
return new MultiHalfFloatValues(raw);
} catch (IOException e) {
throw new IllegalStateException("Cannot load doc values", e);
public Collection<Accountable> getChildResources() {
return Collections.emptyList();
* Wraps a NumericDocValues and exposes a single 16-bit float per document.
static final class SingleHalfFloatValues extends NumericDoubleValues {
final NumericDocValues in;
SingleHalfFloatValues(NumericDocValues in) {
this.in = in;
public double get(int docID) {
return HalfFloatPoint.sortableShortToHalfFloat((short) in.get(docID));
* Wraps a SortedNumericDocValues and exposes multiple 16-bit floats per document.
static final class MultiHalfFloatValues extends SortedNumericDoubleValues {
final SortedNumericDocValues in;
MultiHalfFloatValues(SortedNumericDocValues in) {
this.in = in;
public void setDocument(int doc) {
public double valueAt(int index) {
return HalfFloatPoint.sortableShortToHalfFloat((short) in.valueAt(index));
public int count() {
return in.count();
* FieldData implementation for 32-bit float values.
* <p>
@ -22,6 +22,7 @@ package org.elasticsearch.index.mapper.core;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.HalfFloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
@ -180,6 +181,86 @@ public class NumberFieldMapper extends FieldMapper implements AllFieldMapper.Inc
public enum NumberType {
HALF_FLOAT("half_float", NumericType.HALF_FLOAT) {
Float parse(Object value) {
return (Float) FLOAT.parse(value);
Float parse(XContentParser parser, boolean coerce) throws IOException {
return parser.floatValue(coerce);
Query termQuery(String field, Object value) {
float v = parse(value);
return HalfFloatPoint.newExactQuery(field, v);
Query termsQuery(String field, List<Object> values) {
float[] v = new float[values.size()];
for (int i = 0; i < values.size(); ++i) {
v[i] = parse(values.get(i));
return HalfFloatPoint.newSetQuery(field, v);
Query rangeQuery(String field, Object lowerTerm, Object upperTerm,
boolean includeLower, boolean includeUpper) {
float l = Float.NEGATIVE_INFINITY;
float u = Float.POSITIVE_INFINITY;
if (lowerTerm != null) {
l = parse(lowerTerm);
if (includeLower) {
l = Math.nextDown(l);
l = HalfFloatPoint.nextUp(l);
if (upperTerm != null) {
u = parse(upperTerm);
if (includeUpper) {
u = Math.nextUp(u);
u = HalfFloatPoint.nextDown(u);
return HalfFloatPoint.newRangeQuery(field, l, u);
public List<Field> createFields(String name, Number value,
boolean indexed, boolean docValued, boolean stored) {
List<Field> fields = new ArrayList<>();
if (indexed) {
fields.add(new HalfFloatPoint(name, value.floatValue()));
if (docValued) {
fields.add(new SortedNumericDocValuesField(name,
if (stored) {
fields.add(new StoredField(name, value.floatValue()));
return fields;
FieldStats.Double stats(IndexReader reader, String fieldName,
boolean isSearchable, boolean isAggregatable) throws IOException {
long size = XPointValues.size(reader, fieldName);
if (size == 0) {
return null;
int docCount = XPointValues.getDocCount(reader, fieldName);
byte[] min = XPointValues.getMinPackedValue(reader, fieldName);
byte[] max = XPointValues.getMaxPackedValue(reader, fieldName);
return new FieldStats.Double(reader.maxDoc(),docCount, -1L, size,
isSearchable, isAggregatable,
HalfFloatPoint.decodeDimension(min, 0), HalfFloatPoint.decodeDimension(max, 0));
FLOAT("float", NumericType.FLOAT) {
Float parse(Object value) {
@ -38,11 +38,4 @@ public interface QueryParser<QB extends QueryBuilder> {
* @return the new QueryBuilder
Optional<QB> fromXContent(QueryParseContext parseContext) throws IOException;
* @return an empty {@link QueryBuilder} instance for this parser that can be used for deserialization
default QB getBuilderPrototype() { // TODO remove this when nothing implements it
throw new UnsupportedOperationException();
@ -1121,12 +1121,16 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService>
* @param shard the shard to use with the cache key
* @param searcher searcher to use to lookup the field stats
* @param field the actual field
* @param useCache should this request use the cache?
public FieldStats<?> getFieldStats(IndexShard shard, Engine.Searcher searcher, String field) throws Exception {
public FieldStats<?> getFieldStats(IndexShard shard, Engine.Searcher searcher, String field, boolean useCache) throws Exception {
MappedFieldType fieldType = shard.mapperService().fullName(field);
if (fieldType == null) {
return null;
if (useCache == false) {
return fieldType.stats(searcher.reader());
BytesReference cacheKey = new BytesArray("fieldstats:" + field);
BytesReference statsRef = cacheShardLevelResult(shard, searcher.getDirectoryReader(), cacheKey, out -> {
@ -54,16 +54,6 @@ import java.util.Map;
* The {@link org.elasticsearch.indices.analysis.AnalysisModule.AnalysisProvider} is only a functional interface that allows to register factory constructors directly like the plugin example below:
* <pre>
* public class MyAnalysisPlugin extends Plugin {
* \@Override
* public String name() {
* return "analysis-my-plugin";
* }
* \@Override
* public String description() {
* return "my very fast and efficient analyzer";
* }
* public void onModule(AnalysisModule module) {
* module.registerAnalyzer("my-analyzer-name", MyAnalyzer::new);
* }
@ -1,48 +0,0 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.indices.breaker;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
public class CircuitBreakerModule extends AbstractModule {
public static final String TYPE_KEY = "indices.breaker.type";
private final Settings settings;
public CircuitBreakerModule(Settings settings) {
this.settings = settings;
protected void configure() {
String type = settings.get(TYPE_KEY);
Class<? extends CircuitBreakerService> impl;
if (type == null || type.equals("hierarchy")) {
impl = HierarchyCircuitBreakerService.class;
} else if (type.equals("none")) {
impl = NoneCircuitBreakerService.class;
} else {
throw new IllegalArgumentException("Unknown circuit breaker type [" + type + "]");
@ -64,4 +64,5 @@ public abstract class CircuitBreakerService extends AbstractLifecycleComponent<C
protected void doClose() {
@ -23,7 +23,6 @@ import org.elasticsearch.common.breaker.ChildMemoryCircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -79,7 +78,6 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
// Tripped count for when redistribution was attempted but wasn't successful
private final AtomicLong parentTripCount = new AtomicLong(0);
public HierarchyCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) {
this.fielddataSettings = new BreakerSettings(CircuitBreaker.FIELDDATA,
@ -19,6 +19,7 @@
package org.elasticsearch.ingest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.core.IngestInfo;
import org.elasticsearch.ingest.core.Processor;
@ -55,8 +56,8 @@ public class IngestService implements Closeable {
return pipelineExecutionService;
public void setScriptService(ScriptService scriptService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService);
public void buildProcessorsFactoryRegistry(ScriptService scriptService, ClusterService clusterService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService, clusterService);
public IngestInfo info() {
@ -66,9 +66,8 @@ public class PipelineStore extends AbstractComponent implements Closeable, Clust
public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService) {
TemplateService templateService = new InternalTemplateService(scriptService);
this.processorRegistry = processorsRegistryBuilder.build(templateService);
public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService, ClusterService clusterService) {
this.processorRegistry = processorsRegistryBuilder.build(scriptService, clusterService);
@ -20,9 +20,10 @@
package org.elasticsearch.ingest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.ProcessorInfo;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.script.ScriptService;
import java.io.Closeable;
import java.io.IOException;
@ -31,21 +32,39 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
public final class ProcessorsRegistry implements Closeable {
private final Map<String, Processor.Factory> processorFactories;
private final TemplateService templateService;
private final ScriptService scriptService;
private final ClusterService clusterService;
private ProcessorsRegistry(TemplateService templateService,
Map<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> providers) {
private ProcessorsRegistry(ScriptService scriptService, ClusterService clusterService,
Map<String, Function<ProcessorsRegistry, Processor.Factory<?>>> providers) {
this.templateService = new InternalTemplateService(scriptService);
this.scriptService = scriptService;
this.clusterService = clusterService;
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (Map.Entry<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(templateService, this));
for (Map.Entry<String, Function<ProcessorsRegistry, Processor.Factory<?>>> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(this));
this.processorFactories = Collections.unmodifiableMap(processorFactories);
public TemplateService getTemplateService() {
return templateService;
public ScriptService getScriptService() {
return scriptService;
public ClusterService getClusterService() {
return clusterService;
public Processor.Factory getProcessorFactory(String name) {
return processorFactories.get(name);
@ -68,20 +87,20 @@ public final class ProcessorsRegistry implements Closeable {
public static final class Builder {
private final Map<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> providers = new HashMap<>();
private final Map<String, Function<ProcessorsRegistry, Processor.Factory<?>>> providers = new HashMap<>();
* Adds a processor factory under a specific name.
public void registerProcessor(String name, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> provider) {
BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> previous = this.providers.putIfAbsent(name, provider);
public void registerProcessor(String name, Function<ProcessorsRegistry, Processor.Factory<?>> provider) {
Function<ProcessorsRegistry, Processor.Factory<?>> previous = this.providers.putIfAbsent(name, provider);
if (previous != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
public ProcessorsRegistry build(TemplateService templateService) {
return new ProcessorsRegistry(templateService, providers);
public ProcessorsRegistry build(ScriptService scriptService, ClusterService clusterService) {
return new ProcessorsRegistry(scriptService, clusterService, providers);
@ -78,7 +78,9 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.ttl.IndicesTTLService;
@ -89,6 +91,7 @@ import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptModule;
@ -100,7 +103,6 @@ import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskPersistenceService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.tribe.TribeModule;
import org.elasticsearch.tribe.TribeService;
@ -132,6 +134,7 @@ import java.util.function.Function;
public class Node implements Closeable {
public static final Setting<Boolean> WRITE_PORTS_FIELD_SETTING =
Setting.boolSetting("node.portsfile", false, Property.NodeScope);
public static final Setting<Boolean> NODE_DATA_SETTING = Setting.boolSetting("node.data", true, Property.NodeScope);
@ -145,6 +148,16 @@ public class Node implements Closeable {
Setting.boolSetting("node.ingest", true, Property.NodeScope);
public static final Setting<String> NODE_NAME_SETTING = Setting.simpleString("node.name", Property.NodeScope);
public static final Setting<Settings> NODE_ATTRIBUTES = Setting.groupSetting("node.attr.", Property.NodeScope);
public static final Setting<String> BREAKER_TYPE_KEY = new Setting<>("indices.breaker.type", "hierarchy", (s) -> {
switch (s) {
case "hierarchy":
case "none":
return s;
throw new IllegalArgumentException("indices.breaker.type must be one of [hierarchy, none] but was: " + s);
}, Setting.Property.NodeScope);
private static final String CLIENT_TYPE = "node";
@ -167,8 +180,9 @@ public class Node implements Closeable {
protected Node(Environment tmpEnv, Version version, Collection<Class<? extends Plugin>> classpathPlugins) {
Settings tmpSettings = Settings.builder().put(tmpEnv.settings())
.put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
tmpSettings = TribeService.processSettings(tmpSettings);
final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
tmpSettings = TribeService.processSettings(tmpSettings);
ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(tmpSettings));
final String displayVersion = version + (Build.CURRENT.isSnapshot() ? "-SNAPSHOT" : "");
final JvmInfo jvmInfo = JvmInfo.jvmInfo();
@ -202,41 +216,48 @@ public class Node implements Closeable {
this.pluginsService = new PluginsService(tmpSettings, tmpEnv.modulesFile(), tmpEnv.pluginsFile(), classpathPlugins);
this.settings = pluginsService.updatedSettings();
// create the environment based on the finalized (processed) view of the settings
this.environment = new Environment(this.settings());
final NodeEnvironment nodeEnvironment;
try {
nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
} catch (IOException ex) {
throw new IllegalStateException("Failed to created node environment", ex);
final NetworkService networkService = new NetworkService(settings);
this.environment = new Environment(this.settings);
final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
boolean success = false;
try {
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final List<Setting<?>> additionalSettings = new ArrayList<>();
final List<String> additionalSettingsFilter = new ArrayList<>();
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
final ScriptModule scriptModule = ScriptModule.create(settings, pluginsService.filterPlugins(ScriptPlugin.class));
// this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool
// so we might be late here already
final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
final NodeEnvironment nodeEnvironment;
try {
nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
} catch (IOException ex) {
throw new IllegalStateException("Failed to created node environment", ex);
final NetworkService networkService = new NetworkService(settings);
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new CircuitBreakerModule(settings));
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
modules.add(new PluginsModule(pluginsService));
SettingsModule settingsModule = new SettingsModule(this.settings);
modules.add(new EnvironmentModule(environment));
modules.add(new EnvironmentModule(environment, threadPool));
modules.add(new NodeModule(this, monitorService));
modules.add(new NetworkModule(networkService, settings, false, namedWriteableRegistry));
ScriptModule scriptModule = new ScriptModule();
modules.add(new NodeEnvironmentModule(nodeEnvironment));
modules.add(new ClusterNameModule(this.settings));
final ThreadPoolModule threadPoolModule = new ThreadPoolModule(threadPool);
modules.add(new DiscoveryModule(this.settings));
modules.add(new ClusterModule(this.settings));
modules.add(new IndicesModule());
@ -248,23 +269,20 @@ public class Node implements Closeable {
modules.add(new RepositoriesModule());
modules.add(new TribeModule());
modules.add(new AnalysisModule(environment));
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
modules.add(b -> b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService));
injector = modules.createInjector();
client = injector.getInstance(Client.class);
success = true;
} catch (IOException ex) {
throw new ElasticsearchException("failed to bind service", ex);
} finally {
if (!success) {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
@ -590,4 +608,19 @@ public class Node implements Closeable {
throw new RuntimeException("Failed to rename ports file", e);
* Creates a new {@link CircuitBreakerService} based on the settings provided.
public static CircuitBreakerService createCircuitBreakerService(Settings settings, ClusterSettings clusterSettings) {
String type = BREAKER_TYPE_KEY.get(settings);
if (type.equals("hierarchy")) {
return new HierarchyCircuitBreakerService(settings, clusterSettings);
} else if (type.equals("none")) {
return new NoneCircuitBreakerService();
} else {
throw new IllegalArgumentException("Unknown circuit breaker type [" + type + "]");
@ -23,11 +23,10 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService;
import java.util.function.BiFunction;
import java.util.function.Function;
@ -71,7 +70,7 @@ public class NodeModule extends AbstractModule {
* Adds a processor factory under a specific type name.
public void registerProcessor(String type, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> provider) {
public void registerProcessor(String type, Function<ProcessorsRegistry, Processor.Factory<?>> provider) {
processorsRegistryBuilder.registerProcessor(type, provider);
@ -62,6 +62,7 @@ public class NodeService extends AbstractComponent implements Closeable {
private final CircuitBreakerService circuitBreakerService;
private final IngestService ingestService;
private final SettingsFilter settingsFilter;
private ClusterService clusterService;
private ScriptService scriptService;
@ -87,6 +88,7 @@ public class NodeService extends AbstractComponent implements Closeable {
this.version = version;
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
this.clusterService = clusterService;
this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder);
this.settingsFilter = settingsFilter;
@ -97,7 +99,7 @@ public class NodeService extends AbstractComponent implements Closeable {
@Inject(optional = true)
public void setScriptService(ScriptService scriptService) {
this.scriptService = scriptService;
this.ingestService.buildProcessorsFactoryRegistry(scriptService, clusterService);
public void setHttpServer(@Nullable HttpServer httpServer) {
@ -237,6 +237,10 @@ class InstallPluginCommand extends SettingCommand {
// fall back to plain old URL
if (pluginId.contains(":/") == false) {
// definitely not a valid url, so assume it is a plugin name
throw new UserError(ExitCodes.USAGE, "Unknown plugin " + pluginId);
terminal.println("-> Downloading " + URLDecoder.decode(pluginId, "UTF-8"));
return downloadZip(terminal, pluginId, tmpDir);
@ -21,10 +21,12 @@ package org.elasticsearch.plugins;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Collections;
@ -38,16 +40,6 @@ import java.util.List;
public abstract class Plugin {
* The name of the plugin.
public abstract String name();
* The description of the plugin.
public abstract String description();
* Node level modules.
@ -76,6 +68,16 @@ public abstract class Plugin {
public void onIndexModule(IndexModule indexModule) {}
* Returns a list of additional {@link Setting} definitions for this plugin.
public List<Setting<?>> getSettings() { return Collections.emptyList(); }
* Returns a list of additional settings filter for this plugin
public List<String> getSettingsFilter() { return Collections.emptyList(); }
* Old-style guice index level extension point.
@ -84,6 +86,23 @@ public abstract class Plugin {
public final void onModule(IndexModule indexModule) {}
* Old-style guice settings extension point.
* @deprecated use #getSettings and #getSettingsFilter instead
public final void onModule(SettingsModule settingsModule) {}
* Old-style guice scripting extension point.
* @deprecated implement {@link ScriptPlugin} instead
public final void onModule(ScriptModule module) {}
* Provides the list of this plugin's custom thread pools, empty if
* none.
@ -39,7 +39,11 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.script.NativeScriptFactory;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngineService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import java.io.IOException;
@ -60,6 +64,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;
@ -78,6 +83,14 @@ public class PluginsService extends AbstractComponent {
private final Map<Plugin, List<OnModuleReference>> onModuleReferences;
public List<Setting<?>> getPluginSettings() {
return plugins.stream().flatMap(p -> p.v2().getSettings().stream()).collect(Collectors.toList());
public List<String> getPluginSettingsFilter() {
return plugins.stream().flatMap(p -> p.v2().getSettingsFilter().stream()).collect(Collectors.toList());
static class OnModuleReference {
public final Class<? extends Module> moduleClass;
public final Method onModuleMethod;
@ -104,7 +117,7 @@ public class PluginsService extends AbstractComponent {
// first we load plugins that are on the classpath. this is for tests and transport clients
for (Class<? extends Plugin> pluginClass : classpathPlugins) {
Plugin plugin = loadPlugin(pluginClass, settings);
PluginInfo pluginInfo = new PluginInfo(plugin.name(), plugin.description(), "NA", pluginClass.getName());
PluginInfo pluginInfo = new PluginInfo(pluginClass.getName(), "classpath plugin", "NA", pluginClass.getName());
if (logger.isTraceEnabled()) {
logger.trace("plugin loaded from classpath [{}]", pluginInfo);
@ -188,12 +201,12 @@ public class PluginsService extends AbstractComponent {
if (method.getParameterTypes().length == 0 || method.getParameterTypes().length > 1) {
logger.warn("Plugin: {} implementing onModule with no parameters or more than one parameter", plugin.name());
logger.warn("Plugin: {} implementing onModule with no parameters or more than one parameter", pluginEntry.v1().getName());
Class moduleClass = method.getParameterTypes()[0];
if (!Module.class.isAssignableFrom(moduleClass)) {
logger.warn("Plugin: {} implementing onModule by the type is not of Module type {}", plugin.name(), moduleClass);
logger.warn("Plugin: {} implementing onModule by the type is not of Module type {}", pluginEntry.v1().getName(), moduleClass);
list.add(new OnModuleReference(moduleClass, method));
@ -225,10 +238,10 @@ public class PluginsService extends AbstractComponent {
try {
reference.onModuleMethod.invoke(plugin.v2(), module);
} catch (IllegalAccessException | InvocationTargetException e) {
logger.warn("plugin {}, failed to invoke custom onModule method", e, plugin.v2().name());
logger.warn("plugin {}, failed to invoke custom onModule method", e, plugin.v1().getName());
throw new ElasticsearchException("failed to invoke onModule", e);
} catch (Exception e) {
logger.warn("plugin {}, failed to invoke custom onModule method", e, plugin.v2().name());
logger.warn("plugin {}, failed to invoke custom onModule method", e, plugin.v1().getName());
throw e;
@ -283,6 +296,7 @@ public class PluginsService extends AbstractComponent {
* Get information about plugins and modules
@ -440,4 +454,9 @@ public class PluginsService extends AbstractComponent {
throw new ElasticsearchException("Failed to load plugin class [" + pluginClass.getName() + "]", e);
public <T> List<T> filterPlugins(Class<T> type) {
return plugins.stream().filter(x -> type.isAssignableFrom(x.v2().getClass()))
.map(p -> ((T)p.v2())).collect(Collectors.toList());
@ -0,0 +1,55 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.plugins;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.NativeScriptFactory;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngineService;
import java.util.Collections;
import java.util.List;
* An additional extension point to {@link Plugin}. Plugins extending the scripting functionality must implement this inteface
* to provide access to script engines or script factories.
public interface ScriptPlugin {
* Returns a {@link ScriptEngineService} instance or <code>null</code> if this plugin doesn't add a new script engine
default ScriptEngineService getScriptEngineService(Settings settings) {
return null;
* Returns a list of {@link NativeScriptFactory} instances.
default List<NativeScriptFactory> getNativeScripts() {
return Collections.emptyList();
* Returns a {@link ScriptContext.Plugin} instance or <code>null</code> if this plugin doesn't add a new script context plugin
default ScriptContext.Plugin getCustomScriptContexts() {
return null;
@ -113,30 +113,14 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
* Registers a rest handler to be execute when the provided method and path match the request.
* Registers a rest handler to be executed when the provided method and path match the request.
public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
switch (method) {
case GET:
getHandlers.insert(path, handler);
case DELETE:
deleteHandlers.insert(path, handler);
case POST:
postHandlers.insert(path, handler);
case PUT:
putHandlers.insert(path, handler);
optionsHandlers.insert(path, handler);
case HEAD:
headHandlers.insert(path, handler);
throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
PathTrie<RestHandler> handlers = getHandlersForMethod(method);
if (handlers != null) {
handlers.insert(path, handler);
} else {
throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
@ -159,6 +143,15 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
return new ControllerFilterChain(executionFilter);
* @param request The current request. Must not be null.
* @return true iff the circuit breaker limit must be enforced for processing this request.
public boolean canTripCircuitBreaker(RestRequest request) {
RestHandler handler = getHandler(request);
return (handler != null) ? handler.canTripCircuitBreaker() : true;
public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) throws Exception {
if (!checkRequestParameters(request, channel)) {
@ -226,19 +219,27 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
private RestHandler getHandler(RestRequest request) {
String path = getPath(request);
RestRequest.Method method = request.method();
PathTrie<RestHandler> handlers = getHandlersForMethod(request.method());
if (handlers != null) {
return handlers.retrieve(path, request.params());
} else {
return null;
private PathTrie<RestHandler> getHandlersForMethod(RestRequest.Method method) {
if (method == RestRequest.Method.GET) {
return getHandlers.retrieve(path, request.params());
return getHandlers;
} else if (method == RestRequest.Method.POST) {
return postHandlers.retrieve(path, request.params());
return postHandlers;
} else if (method == RestRequest.Method.PUT) {
return putHandlers.retrieve(path, request.params());
return putHandlers;
} else if (method == RestRequest.Method.DELETE) {
return deleteHandlers.retrieve(path, request.params());
return deleteHandlers;
} else if (method == RestRequest.Method.HEAD) {
return headHandlers.retrieve(path, request.params());
return headHandlers;
} else if (method == RestRequest.Method.OPTIONS) {
return optionsHandlers.retrieve(path, request.params());
return optionsHandlers;
} else {
return null;
@ -25,4 +25,8 @@ package org.elasticsearch.rest;
public interface RestHandler {
void handleRequest(RestRequest request, RestChannel channel) throws Exception;
default boolean canTripCircuitBreaker() {
return true;
@ -64,4 +64,9 @@ public class RestClusterHealthAction extends BaseRestHandler {
clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
client.admin().cluster().health(clusterHealthRequest, new RestStatusToXContentListener<ClusterHealthResponse>(channel));
public boolean canTripCircuitBreaker() {
return false;
@ -78,4 +78,9 @@ public class RestNodesHotThreadsAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
@ -103,4 +103,9 @@ public class RestNodesInfoAction extends BaseRestHandler {
client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
public boolean canTripCircuitBreaker() {
return false;
@ -115,4 +115,9 @@ public class RestNodesStatsAction extends BaseRestHandler {
client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
public boolean canTripCircuitBreaker() {
return false;
@ -64,4 +64,9 @@ public class RestCancelTasksAction extends BaseRestHandler {
ActionListener<CancelTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
public boolean canTripCircuitBreaker() {
return false;
@ -91,4 +91,9 @@ public class RestListTasksAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
@ -72,6 +72,11 @@ public class RestClusterGetSettingsAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
private XContentBuilder renderResponse(ClusterState state, boolean renderDefaults, XContentBuilder builder, ToXContent.Params params) throws IOException {
@ -76,4 +76,9 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
@ -96,6 +96,11 @@ public class RestClusterStateAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
static final class Fields {
static final String CLUSTER_NAME = "cluster_name";
@ -47,4 +47,9 @@ public class RestClusterStatsAction extends BaseRestHandler {
client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
public boolean canTripCircuitBreaker() {
return false;
@ -52,7 +52,7 @@ public class RestShrinkIndexAction extends BaseRestHandler {
ShrinkRequest shrinkIndexRequest = new ShrinkRequest(request.param("target"), request.param("index"));
if (request.hasContent()) {
shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));
@ -75,6 +75,11 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
public static ClearIndicesCacheRequest fromRequest(final RestRequest request, ClearIndicesCacheRequest clearIndicesCacheRequest, ParseFieldMatcher parseFieldMatcher) {
for (Map.Entry<String, String> entry : request.params().entrySet()) {
@ -117,4 +117,9 @@ public class RestIndicesStatsAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
@ -275,7 +275,7 @@ public class RestActions {
public RestResponse buildResponse(NodesResponse response, XContentBuilder builder) throws Exception {
return RestActions.nodesResponse(builder, ToXContent.EMPTY_PARAMS, response);
return RestActions.nodesResponse(builder, channel.request(), response);
@ -22,13 +22,10 @@ package org.elasticsearch.script;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.lookup.SearchLookup;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static java.util.Collections.unmodifiableMap;
@ -42,7 +39,6 @@ public class NativeScriptEngineService extends AbstractComponent implements Scri
private final Map<String, NativeScriptFactory> scripts;
public NativeScriptEngineService(Settings settings, Map<String, NativeScriptFactory> scripts) {
this.scripts = unmodifiableMap(scripts);
@ -98,4 +94,9 @@ public class NativeScriptEngineService extends AbstractComponent implements Scri
public void scriptRemoved(CompiledScript script) {
// Nothing to do here
public boolean isInlineScriptEnabled() {
return true;
@ -43,8 +43,13 @@ public interface NativeScriptFactory {
* Indicates if document scores may be needed by the produced scripts.
* @return {@code true} if scores are needed.
boolean needsScores();
* Returns the name of the script factory
String getName();
@ -19,6 +19,8 @@
package org.elasticsearch.script;
import org.elasticsearch.common.settings.Settings;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -30,7 +32,8 @@ import static java.util.Collections.unmodifiableSet;
* Registry for operations that use scripts as part of their execution. Can be standard operations of custom defined ones (via plugin).
* Allows plugins to register custom operations that they use scripts for, via {@link ScriptModule#registerScriptContext(org.elasticsearch.script.ScriptContext.Plugin)}.
* Allows plugins to register custom operations that they use scripts for,
* via {@link org.elasticsearch.plugins.ScriptPlugin}
* Scripts can be enabled/disabled via fine-grained settings for each single registered operation.
public final class ScriptContextRegistry {
@ -29,29 +29,29 @@ import org.elasticsearch.common.Strings;
public class ScriptEngineRegistry {
private final Map<Class<? extends ScriptEngineService>, String> registeredScriptEngineServices;
private final Map<String, Class<? extends ScriptEngineService>> registeredLanguages;
private final Map<String, ScriptEngineService> registeredLanguages;
private final Map<String, Boolean> defaultInlineScriptEnableds;
public ScriptEngineRegistry(Iterable<ScriptEngineRegistration> registrations) {
public ScriptEngineRegistry(Iterable<ScriptEngineService> registrations) {
Map<Class<? extends ScriptEngineService>, String> registeredScriptEngineServices = new HashMap<>();
Map<String, Class<? extends ScriptEngineService>> registeredLanguages = new HashMap<>();
Map<String, ScriptEngineService> registeredLanguages = new HashMap<>();
Map<String, Boolean> inlineScriptEnableds = new HashMap<>();
for (ScriptEngineRegistration registration : registrations) {
String oldLanguage = registeredScriptEngineServices.putIfAbsent(registration.getScriptEngineService(),
for (ScriptEngineService service : registrations) {
String oldLanguage = registeredScriptEngineServices.putIfAbsent(service.getClass(),
if (oldLanguage != null) {
throw new IllegalArgumentException("script engine service [" + registration.getScriptEngineService() +
throw new IllegalArgumentException("script engine service [" + service.getClass() +
"] already registered for language [" + oldLanguage + "]");
String language = registration.getScriptEngineLanguage();
Class<? extends ScriptEngineService> scriptEngineServiceClazz =
registeredLanguages.putIfAbsent(language, registration.getScriptEngineService());
if (scriptEngineServiceClazz != null) {
String language = service.getType();
ScriptEngineService scriptEngineService =
registeredLanguages.putIfAbsent(language, service);
if (scriptEngineService != null) {
throw new IllegalArgumentException("scripting language [" + language + "] already registered for script engine service [" +
scriptEngineServiceClazz.getCanonicalName() + "]");
scriptEngineService.getClass().getCanonicalName() + "]");
inlineScriptEnableds.put(language, registration.getDefaultInlineScriptEnabled());
inlineScriptEnableds.put(language, service.isInlineScriptEnabled());
this.registeredScriptEngineServices = Collections.unmodifiableMap(registeredScriptEngineServices);
@ -68,52 +68,12 @@ public class ScriptEngineRegistry {
return registeredScriptEngineServices.get(scriptEngineService);
Map<String, Class<? extends ScriptEngineService>> getRegisteredLanguages() {
public Map<String, ScriptEngineService> getRegisteredLanguages() {
return registeredLanguages;
Map<String, Boolean> getDefaultInlineScriptEnableds() {
public Map<String, Boolean> getDefaultInlineScriptEnableds() {
return this.defaultInlineScriptEnableds;
public static class ScriptEngineRegistration {
private final Class<? extends ScriptEngineService> scriptEngineService;
private final String scriptEngineLanguage;
private final boolean defaultInlineScriptEnabled;
* Register a script engine service with the default of inline scripts disabled
public ScriptEngineRegistration(Class<? extends ScriptEngineService> scriptEngineService, String scriptEngineLanguage) {
this(scriptEngineService, scriptEngineLanguage, false);
* Register a script engine service with the given default mode for inline scripts
public ScriptEngineRegistration(Class<? extends ScriptEngineService> scriptEngineService, String scriptEngineLanguage,
boolean defaultInlineScriptEnabled) {
if (Strings.hasText(scriptEngineLanguage) == false) {
throw new IllegalArgumentException("languages for script engine service [" +
scriptEngineService.getCanonicalName() + "] should be a non-empty string");
this.scriptEngineService = scriptEngineService;
this.scriptEngineLanguage = scriptEngineLanguage;
this.defaultInlineScriptEnabled = defaultInlineScriptEnabled;
Class<? extends ScriptEngineService> getScriptEngineService() {
return scriptEngineService;
String getScriptEngineLanguage() {
return scriptEngineLanguage;
boolean getDefaultInlineScriptEnabled() {
return defaultInlineScriptEnabled;
@ -37,7 +37,7 @@ public interface ScriptEngineService extends Closeable {
* Compiles a script.
* @param scriptName name of the script. {@code null} if it is anonymous (inline).
* @param scriptName name of the script. {@code null} if it is anonymous (inline).
* For a file script, its the file name (with extension).
* For a stored script, its the identifier.
* @param scriptSource actual source of the script
@ -55,4 +55,11 @@ public interface ScriptEngineService extends Closeable {
* The passed script may be null if it has already been garbage collected.
* */
void scriptRemoved(@Nullable CompiledScript script);
* Returns <code>true</code> if this scripting engine can safely accept inline scripts by default. The default is <code>false</code>
default boolean isInlineScriptEnabled() {
return false;
@ -20,16 +20,18 @@
package org.elasticsearch.script;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ScriptPlugin;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
* An {@link org.elasticsearch.common.inject.Module} which manages {@link ScriptEngineService}s, as well
@ -37,73 +39,52 @@ import java.util.Objects;
public class ScriptModule extends AbstractModule {
private final List<ScriptEngineRegistry.ScriptEngineRegistration> scriptEngineRegistrations = new ArrayList<>();
protected final ScriptContextRegistry scriptContextRegistry;
protected final ScriptEngineRegistry scriptEngineRegistry;
protected final ScriptSettings scriptSettings;
scriptEngineRegistrations.add(new ScriptEngineRegistry.ScriptEngineRegistration(NativeScriptEngineService.class,
NativeScriptEngineService.NAME, true));
public ScriptModule(ScriptEngineService... services) {
this(Arrays.asList(services), Collections.emptyList());
private final Map<String, Class<? extends NativeScriptFactory>> scripts = new HashMap<>();
private final List<ScriptContext.Plugin> customScriptContexts = new ArrayList<>();
public void addScriptEngine(ScriptEngineRegistry.ScriptEngineRegistration scriptEngineRegistration) {
public void registerScript(String name, Class<? extends NativeScriptFactory> script) {
scripts.put(name, script);
* Registers a custom script context that can be used by plugins to categorize the different operations that they use scripts for.
* Fine-grained settings allow to enable/disable scripts per context.
public void registerScriptContext(ScriptContext.Plugin scriptContext) {
public ScriptModule(List<ScriptEngineService> scriptEngineServices,
List<ScriptContext.Plugin> customScriptContexts) {
this.scriptContextRegistry = new ScriptContextRegistry(customScriptContexts);
this.scriptEngineRegistry = new ScriptEngineRegistry(scriptEngineServices);
this.scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
* This method is called after all modules have been processed but before we actually validate all settings. This allows the
* script extensions to add all their settings.
public void prepareSettings(SettingsModule settingsModule) {
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customScriptContexts);
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(scriptEngineRegistrations);
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
public List<Setting<?>> getSettings() {
ArrayList<Setting<?>> settings = new ArrayList<>();
return settings;
protected void configure() {
MapBinder<String, NativeScriptFactory> scriptsBinder
= MapBinder.newMapBinder(binder(), String.class, NativeScriptFactory.class);
for (Map.Entry<String, Class<? extends NativeScriptFactory>> entry : scripts.entrySet()) {
Multibinder<ScriptEngineService> multibinder = Multibinder.newSetBinder(binder(), ScriptEngineService.class);
for (ScriptEngineRegistry.ScriptEngineRegistration scriptEngineRegistration : scriptEngineRegistrations) {
ScriptContextRegistry scriptContextRegistry = new ScriptContextRegistry(customScriptContexts);
ScriptEngineRegistry scriptEngineRegistry = new ScriptEngineRegistry(scriptEngineRegistrations);
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
public static ScriptModule create(Settings settings, List<ScriptPlugin> scriptPlugins) {
Map<String, NativeScriptFactory> factoryMap = scriptPlugins.stream().flatMap(x -> x.getNativeScripts().stream())
.collect(Collectors.toMap(NativeScriptFactory::getName, Function.identity()));
NativeScriptEngineService nativeScriptEngineService = new NativeScriptEngineService(settings, factoryMap);
List<ScriptEngineService> scriptEngineServices = scriptPlugins.stream().map(x -> x.getScriptEngineService(settings))
return new ScriptModule(scriptEngineServices, scriptPlugins.stream().map(x -> x.getCustomScriptContexts())
@ -66,6 +66,7 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
@ -91,7 +92,7 @@ public class ScriptService extends AbstractComponent implements Closeable {
private final String defaultLang;
private final Set<ScriptEngineService> scriptEngines;
private final Collection<ScriptEngineService> scriptEngines;
private final Map<String, ScriptEngineService> scriptEnginesByLang;
private final Map<String, ScriptEngineService> scriptEnginesByExt;
@ -132,7 +133,7 @@ public class ScriptService extends AbstractComponent implements Closeable {
public static final ParseField SCRIPT_INLINE = new ParseField("script");
public ScriptService(Settings settings, Environment env, Set<ScriptEngineService> scriptEngines,
public ScriptService(Settings settings, Environment env,
ResourceWatcherService resourceWatcherService, ScriptEngineRegistry scriptEngineRegistry,
ScriptContextRegistry scriptContextRegistry, ScriptSettings scriptSettings) throws IOException {
@ -145,7 +146,7 @@ public class ScriptService extends AbstractComponent implements Closeable {
"Dynamic scripts can be enabled for all languages and all operations by replacing `script.disable_dynamic: false` with `script.inline: true` and `script.stored: true` in elasticsearch.yml");
this.scriptEngines = scriptEngines;
this.scriptEngines = scriptEngineRegistry.getRegisteredLanguages().values();
this.scriptContextRegistry = scriptContextRegistry;
int cacheMaxSize = SCRIPT_CACHE_SIZE_SETTING.get(settings);
@ -166,16 +166,16 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
if (needsScores) {
if (docIt.docID() < doc) {
// aggregations should only be replayed on matching documents
assert docIt.docID() == doc;
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (docIt.docID() < doc) {
// aggregations should only be replayed on matching documents
assert docIt.docID() == doc;
leafCollector.collect(doc, rebasedBucket);
@ -56,7 +56,7 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
private Terms.Order order = Terms.Order.compound(Terms.Order.count(false), Terms.Order.term(true));
private IncludeExclude includeExclude = null;
private String executionHint = null;
private SubAggCollectionMode collectMode = SubAggCollectionMode.DEPTH_FIRST;
private SubAggCollectionMode collectMode = null;
private TermsAggregator.BucketCountThresholds bucketCountThresholds = new TermsAggregator.BucketCountThresholds(
private boolean showTermDocCountError = false;
@ -71,7 +71,7 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
public TermsAggregationBuilder(StreamInput in) throws IOException {
super(in, StringTerms.TYPE, ValuesSourceType.ANY);
bucketCountThresholds = new BucketCountThresholds(in);
collectMode = SubAggCollectionMode.readFromStream(in);
collectMode = in.readOptionalWriteable(SubAggCollectionMode::readFromStream);
executionHint = in.readOptionalString();
includeExclude = in.readOptionalWriteable(IncludeExclude::new);
order = InternalOrder.Streams.readOrder(in);
@ -86,7 +86,7 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
protected void innerWriteTo(StreamOutput out) throws IOException {
InternalOrder.Streams.writeOrder(order, out);
@ -266,7 +266,9 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
order.toXContent(builder, params);
builder.field(SubAggCollectionMode.KEY.getPreferredName(), collectMode.parseField().getPreferredName());
if (collectMode != null) {
builder.field(SubAggCollectionMode.KEY.getPreferredName(), collectMode.parseField().getPreferredName());
if (includeExclude != null) {
includeExclude.toXContent(builder, params);
@ -150,14 +150,22 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
SubAggCollectionMode cm = collectMode;
if (cm == null) {
cm = SubAggCollectionMode.DEPTH_FIRST;
if (factories != AggregatorFactories.EMPTY) {
cm = subAggCollectionMode(bucketCountThresholds.getShardSize(), maxOrd);
DocValueFormat format = config.format();
if ((includeExclude != null) && (includeExclude.isRegexBased()) && format != DocValueFormat.RAW) {
throw new AggregationExecutionException("Aggregation [" + name + "] cannot support regular expression style include/exclude "
+ "settings as they can only be applied to string fields. Use an array of values for include/exclude clauses");
return execution.create(name, factories, valuesSource, order, format, bucketCountThresholds, includeExclude, context, parent,
collectMode, showTermDocCountError, pipelineAggregators, metaData);
return execution.create(name, factories, valuesSource, order, format, bucketCountThresholds, includeExclude, context, parent,
cm, showTermDocCountError, pipelineAggregators, metaData);
if ((includeExclude != null) && (includeExclude.isRegexBased())) {
@ -167,19 +175,27 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
if (valuesSource instanceof ValuesSource.Numeric) {
IncludeExclude.LongFilter longFilter = null;
SubAggCollectionMode cm = collectMode;
if (cm == null) {
if (factories != AggregatorFactories.EMPTY) {
cm = subAggCollectionMode(bucketCountThresholds.getShardSize(), -1);
} else {
cm = SubAggCollectionMode.DEPTH_FIRST;
if (((ValuesSource.Numeric) valuesSource).isFloatingPoint()) {
if (includeExclude != null) {
longFilter = includeExclude.convertToDoubleFilter();
return new DoubleTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, config.format(), order,
bucketCountThresholds, context, parent, collectMode, showTermDocCountError, longFilter,
bucketCountThresholds, context, parent, cm, showTermDocCountError, longFilter,
pipelineAggregators, metaData);
if (includeExclude != null) {
longFilter = includeExclude.convertToLongFilter(config.format());
return new LongTermsAggregator(name, factories, (ValuesSource.Numeric) valuesSource, config.format(), order,
bucketCountThresholds, context, parent, collectMode, showTermDocCountError, longFilter, pipelineAggregators,
bucketCountThresholds, context, parent, cm, showTermDocCountError, longFilter, pipelineAggregators,
@ -187,6 +203,20 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
+ "]. It can only be applied to numeric or string fields.");
// return the SubAggCollectionMode that this aggregation should use based on the expected size
// and the cardinality of the field
static SubAggCollectionMode subAggCollectionMode(int expectedSize, long maxOrd) {
if (expectedSize == Integer.MAX_VALUE) {
// return all buckets
return SubAggCollectionMode.DEPTH_FIRST;
if (maxOrd == -1 || maxOrd > expectedSize) {
// use breadth_first if the cardinality is bigger than the expected size or unknown (-1)
return SubAggCollectionMode.BREADTH_FIRST;
return SubAggCollectionMode.DEPTH_FIRST;
public enum ExecutionMode {
MAP(new ParseField("map")) {
@ -160,16 +160,12 @@ public class FetchPhase implements SearchPhase {
hits[index] = searchHit;
hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());
for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
if (fetchSubPhase.hitExecutionNeeded(context)) {
fetchSubPhase.hitExecute(context, hitContext);
fetchSubPhase.hitExecute(context, hitContext);
for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
if (fetchSubPhase.hitsExecutionNeeded(context)) {
fetchSubPhase.hitsExecute(context, hits);
fetchSubPhase.hitsExecute(context, hits);
context.fetchResult().hits(new InternalSearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
@ -36,7 +36,7 @@ import java.util.Map;
public interface FetchSubPhase {
public static class HitContext {
class HitContext {
private InternalSearchHit hit;
private IndexSearcher searcher;
private LeafReaderContext readerContext;
@ -87,16 +87,13 @@ public interface FetchSubPhase {
return Collections.emptyMap();
boolean hitExecutionNeeded(SearchContext context);
* Executes the hit level phase, with a reader and doc id (note, its a low level reader, and the matching doc).
void hitExecute(SearchContext context, HitContext hitContext);
default void hitExecute(SearchContext context, HitContext hitContext) {}
boolean hitsExecutionNeeded(SearchContext context);
void hitsExecute(SearchContext context, InternalSearchHit[] hits);
default void hitsExecute(SearchContext context, InternalSearchHit[] hits) {}
* This interface is in the fetch phase plugin mechanism.
@ -104,16 +101,16 @@ public interface FetchSubPhase {
* Fetch phases that use the plugin mechanism must provide a ContextFactory to the SearchContext that creates the fetch phase context and also associates them with a name.
* See {@link SearchContext#getFetchSubPhaseContext(FetchSubPhase.ContextFactory)}
public interface ContextFactory<SubPhaseContext extends FetchSubPhaseContext> {
interface ContextFactory<SubPhaseContext extends FetchSubPhaseContext> {
* The name of the context.
public String getName();
String getName();
* Creates a new instance of a FetchSubPhaseContext that holds all information a FetchSubPhase needs to execute on hits.
public SubPhaseContext newContextInstance();
SubPhaseContext newContextInstance();
@ -30,24 +30,13 @@ import java.io.IOException;
public class ExplainFetchSubPhase implements FetchSubPhase {
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
public boolean hitExecutionNeeded(SearchContext context) {
return context.explain();
public final class ExplainFetchSubPhase implements FetchSubPhase {
public void hitExecute(SearchContext context, HitContext hitContext) {
if (context.explain() == false) {
try {
final int topLevelDocId = hitContext.hit().docId();
Explanation explanation = context.searcher().explain(context.query(), topLevelDocId);
@ -18,13 +18,11 @@
package org.elasticsearch.search.fetch.fielddata;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.search.internal.SearchContext;
@ -37,7 +35,7 @@ import java.util.HashMap;
* Specifying {@code "fielddata_fields": ["field1", "field2"]}
public class FieldDataFieldsFetchSubPhase implements FetchSubPhase {
public final class FieldDataFieldsFetchSubPhase implements FetchSubPhase {
public static final String[] NAMES = {"fielddata_fields", "fielddataFields"};
public static final ContextFactory<FieldDataFieldsContext> CONTEXT_FACTORY = new ContextFactory<FieldDataFieldsContext>() {
@ -53,29 +51,14 @@ public class FieldDataFieldsFetchSubPhase implements FetchSubPhase {
public FieldDataFieldsFetchSubPhase() {
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
public boolean hitExecutionNeeded(SearchContext context) {
return context.getFetchSubPhaseContext(CONTEXT_FACTORY).hitExecutionNeeded();
public void hitExecute(SearchContext context, HitContext hitContext) {
if (context.getFetchSubPhaseContext(CONTEXT_FACTORY).hitExecutionNeeded() == false) {
for (FieldDataFieldsContext.FieldDataField field : context.getFetchSubPhaseContext(CONTEXT_FACTORY).fields()) {
if (hitContext.hit().fieldsOrNull() == null) {
hitContext.hit().fields(new HashMap<String, SearchHitField>(2));
hitContext.hit().fields(new HashMap<>(2));
SearchHitField hitField = hitContext.hit().fields().get(field.name());
if (hitField == null) {
@ -23,7 +23,6 @@ import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.FetchSubPhase;
@ -32,13 +31,10 @@ import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class InnerHitsFetchSubPhase implements FetchSubPhase {
public final class InnerHitsFetchSubPhase implements FetchSubPhase {
private final FetchPhase fetchPhase;
@ -46,20 +42,11 @@ public class InnerHitsFetchSubPhase implements FetchSubPhase {
this.fetchPhase = fetchPhase;
public Map<String, ? extends SearchParseElement> parseElements() {
// SearchParse elements needed because everything is parsed by InnerHitBuilder and eventually put
// into the search context.
return Collections.emptyMap();
public boolean hitExecutionNeeded(SearchContext context) {
return context.innerHits() != null && context.innerHits().getInnerHits().size() > 0;
public void hitExecute(SearchContext context, HitContext hitContext) {
if ((context.innerHits() != null && context.innerHits().getInnerHits().size() > 0) == false) {
Map<String, InternalSearchHits> results = new HashMap<>();
for (Map.Entry<String, InnerHitsContext.BaseInnerHits> entry : context.innerHits().getInnerHits().entrySet()) {
InnerHitsContext.BaseInnerHits innerHits = entry.getValue();
@ -92,13 +79,4 @@ public class InnerHitsFetchSubPhase implements FetchSubPhase {
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
@ -18,6 +18,7 @@
package org.elasticsearch.search.fetch.matchedqueries;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.search.Query;
@ -26,7 +27,6 @@ import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.SearchContext;
@ -39,22 +39,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
public class MatchedQueriesFetchSubPhase implements FetchSubPhase {
public Map<String, ? extends SearchParseElement> parseElements() {
return emptyMap();
public boolean hitsExecutionNeeded(SearchContext context) {
return true; // we short-circuit in hitsExecute
public final class MatchedQueriesFetchSubPhase implements FetchSubPhase {
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
@ -82,12 +67,13 @@ public class MatchedQueriesFetchSubPhase implements FetchSubPhase {
int docBase = -1;
Weight weight = context.searcher().createNormalizedWeight(query, false);
Bits matchingDocs = null;
final IndexReader indexReader = context.searcher().getIndexReader();
for (int i = 0; i < hits.length; ++i) {
InternalSearchHit hit = hits[i];
int hitReaderIndex = ReaderUtil.subIndex(hit.docId(), context.searcher().getIndexReader().leaves());
int hitReaderIndex = ReaderUtil.subIndex(hit.docId(), indexReader.leaves());
if (readerIndex != hitReaderIndex) {
readerIndex = hitReaderIndex;
LeafReaderContext ctx = context.searcher().getIndexReader().leaves().get(readerIndex);
LeafReaderContext ctx = indexReader.leaves().get(readerIndex);
docBase = ctx.docBase;
// scorers can be costly to create, so reuse them across docs of the same segment
Scorer scorer = weight.scorer(ctx);
@ -99,7 +85,7 @@ public class MatchedQueriesFetchSubPhase implements FetchSubPhase {
for (int i = 0; i < hits.length; ++i) {
hits[i].matchedQueries(matchedQueries[i].toArray(new String[0]));
hits[i].matchedQueries(matchedQueries[i].toArray(new String[matchedQueries[i].size()]));
} catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e);
@ -107,14 +93,4 @@ public class MatchedQueriesFetchSubPhase implements FetchSubPhase {
public boolean hitExecutionNeeded(SearchContext context) {
return false;
public void hitExecute(SearchContext context, HitContext hitContext) {
// we do everything in hitsExecute
@ -25,10 +25,7 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.innerhits.InnerHitsContext;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.search.internal.SearchContext;
@ -37,17 +34,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ParentFieldSubFetchPhase implements FetchSubPhase {
public Map<String, ? extends SearchParseElement> parseElements() {
return Collections.emptyMap();
public boolean hitExecutionNeeded(SearchContext context) {
return true;
public final class ParentFieldSubFetchPhase implements FetchSubPhase {
public void hitExecute(SearchContext context, HitContext hitContext) {
@ -65,15 +52,6 @@ public class ParentFieldSubFetchPhase implements FetchSubPhase {
fields.put(ParentFieldMapper.NAME, new InternalSearchHitField(ParentFieldMapper.NAME, Collections.singletonList(parentId)));
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
public static String getParentId(ParentFieldMapper fieldMapper, LeafReader reader, int docId) {
try {
SortedDocValues docValues = reader.getSortedDocValues(fieldMapper.name());
@ -21,7 +21,6 @@ package org.elasticsearch.search.fetch.script;
import org.elasticsearch.script.LeafSearchScript;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHitField;
import org.elasticsearch.search.internal.SearchContext;
@ -32,27 +31,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
public class ScriptFieldsFetchSubPhase implements FetchSubPhase {
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
public boolean hitExecutionNeeded(SearchContext context) {
return context.hasScriptFields();
public final class ScriptFieldsFetchSubPhase implements FetchSubPhase {
public void hitExecute(SearchContext context, HitContext hitContext) {
if (context.hasScriptFields() == false) {
for (ScriptFieldsContext.ScriptField scriptField : context.scriptFields().fields()) {
LeafSearchScript leafScript;
try {
@ -62,10 +47,9 @@ public class ScriptFieldsFetchSubPhase implements FetchSubPhase {
Object value;
final Object value;
try {
value = leafScript.run();
value = leafScript.unwrap(value);
value = leafScript.unwrap(leafScript.run());
} catch (RuntimeException e) {
if (scriptField.ignoreException()) {
@ -74,7 +58,7 @@ public class ScriptFieldsFetchSubPhase implements FetchSubPhase {
if (hitContext.hit().fieldsOrNull() == null) {
hitContext.hit().fields(new HashMap<String, SearchHitField>(2));
hitContext.hit().fields(new HashMap<>(2));
SearchHitField hitField = hitContext.hit().fields().get(scriptField.name());
@ -84,7 +68,7 @@ public class ScriptFieldsFetchSubPhase implements FetchSubPhase {
values = Collections.emptyList();
} else if (value instanceof Collection) {
// TODO: use diamond operator once JI-9019884 is fixed
values = new ArrayList<Object>((Collection<?>) value);
values = new ArrayList<>((Collection<?>) value);
} else {
values = Collections.singletonList(value);
@ -23,32 +23,18 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.lookup.SourceLookup;
import java.io.IOException;
public class FetchSourceSubPhase implements FetchSubPhase {
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
public boolean hitExecutionNeeded(SearchContext context) {
return context.sourceRequested();
public final class FetchSourceSubPhase implements FetchSubPhase {
public void hitExecute(SearchContext context, HitContext hitContext) {
if (context.sourceRequested() == false) {
FetchSourceContext fetchSourceContext = context.fetchSourceContext();
assert fetchSourceContext.fetchSource();
if (fetchSourceContext.includes().length == 0 && fetchSourceContext.excludes().length == 0) {
@ -25,36 +25,21 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
public class VersionFetchSubPhase implements FetchSubPhase {
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
public boolean hitExecutionNeeded(SearchContext context) {
return context.version();
public final class VersionFetchSubPhase implements FetchSubPhase {
public void hitExecute(SearchContext context, HitContext hitContext) {
if (context.version() == false) {
// it might make sense to cache the TermDocs on a shared fetch context and just skip here)
// it is going to mean we work on the high level multi reader and not the lower level reader as is
// the case below...
long version;
final long version;
try {
BytesRef uid = Uid.createUidAsBytes(hitContext.hit().type(), hitContext.hit().id());
version = Versions.loadVersion(
@ -64,10 +49,6 @@ public class VersionFetchSubPhase implements FetchSubPhase {
} catch (IOException e) {
throw new ElasticsearchException("Could not query index for _version", e);
if (version < 0) {
version = -1;
hitContext.hit().version(version < 0 ? -1 : version);
@ -21,7 +21,6 @@ package org.elasticsearch.search.highlight;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DocumentMapper;
@ -31,9 +30,7 @@ import org.elasticsearch.index.mapper.core.StringFieldMapper;
import org.elasticsearch.index.mapper.core.TextFieldMapper;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.search.Highlighters;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.SearchContext;
import java.util.Arrays;
@ -43,45 +40,21 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HighlightPhase extends AbstractComponent implements FetchSubPhase {
private static final List<String> STANDARD_HIGHLIGHTERS_BY_PRECEDENCE = Arrays.asList("fvh", "postings", "plain");
private final Highlighters highlighters;
public HighlightPhase(Settings settings, Highlighters highlighters) {
this.highlighters = highlighters;
* highlighters do not have a parse element, they use
* {@link HighlightBuilder#fromXContent(org.elasticsearch.index.query.QueryParseContext)} for parsing instead.
public Map<String, ? extends SearchParseElement> parseElements() {
return Collections.emptyMap();
public boolean hitsExecutionNeeded(SearchContext context) {
return false;
public void hitsExecute(SearchContext context, InternalSearchHit[] hits) {
public boolean hitExecutionNeeded(SearchContext context) {
return context.highlight() != null;
public void hitExecute(SearchContext context, HitContext hitContext) {
if (context.highlight() == null) {
Map<String, HighlightField> highlightFields = new HashMap<>();
for (SearchContextHighlight.Field field : context.highlight().fields()) {
Collection<String> fieldNamesToHighlight;
@ -19,6 +19,8 @@
package org.elasticsearch.tasks;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
@ -28,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.transport.TransportRequest;
@ -43,10 +46,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
* Task Manager service for keeping track of currently running tasks on the nodes
public class TaskManager extends AbstractComponent implements ClusterStateListener {
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
private final ConcurrentMapLong<Task> tasks = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
@ -341,6 +347,23 @@ public class TaskManager extends AbstractComponent implements ClusterStateListen
* Blocks the calling thread, waiting for the task to vanish from the TaskManager.
public void waitForTaskCompletion(Task task, long untilInNanos) {
while (System.nanoTime() - untilInNanos < 0) {
if (getTask(task.getId()) == null) {
try {
} catch (InterruptedException e) {
throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task);
throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
private static class CancellableTaskHolder {
private static final String TASK_FINISHED_MARKER = "task finished";
@ -51,7 +51,7 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
* @return the list of registered settings
abstract List<Setting<?>> getRegisteredSettings();
public abstract List<Setting<?>> getRegisteredSettings();
* Return an executor settings object from the node-level settings.
@ -86,7 +86,7 @@ public final class FixedExecutorBuilder extends ExecutorBuilder<FixedExecutorBui
List<Setting<?>> getRegisteredSettings() {
public List<Setting<?>> getRegisteredSettings() {
return Arrays.asList(sizeSetting, queueSizeSetting);
@ -77,7 +77,7 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecuto
List<Setting<?>> getRegisteredSettings() {
public List<Setting<?>> getRegisteredSettings() {
return Arrays.asList(coreSetting, maxSetting, keepAliveSetting);
@ -1,44 +0,0 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.threadpool;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.SettingsModule;
public class ThreadPoolModule extends AbstractModule {
private final ThreadPool threadPool;
public ThreadPoolModule(final ThreadPool threadPool) {
this.threadPool = threadPool;
public void prepareSettings(SettingsModule settingsModule) {
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
protected void configure() {
@ -31,7 +31,7 @@ grant codeBase "${codebase.securesm-1.0.jar}" {
//// Very special jar permissions:
//// These are dangerous permissions that we don't want to grant to everything.
grant codeBase "${codebase.lucene-core-6.1.0-snapshot-3a57bea.jar}" {
grant codeBase "${codebase.lucene-core-6.1.0.jar}" {
// needed to allow MMapDirectory's "unmap hack" (die unmap hack, die)
// java 8 package
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
@ -42,7 +42,7 @@ grant codeBase "${codebase.lucene-core-6.1.0-snapshot-3a57bea.jar}" {
permission java.lang.RuntimePermission "accessDeclaredMembers";
grant codeBase "${codebase.lucene-misc-6.1.0-snapshot-3a57bea.jar}" {
grant codeBase "${codebase.lucene-misc-6.1.0.jar}" {
// needed to allow shard shrinking to use hard-links if possible via lucenes HardlinkCopyDirectoryWrapper
permission java.nio.file.LinkPermission "hard";
@ -33,7 +33,7 @@ grant codeBase "${codebase.securemock-1.2.jar}" {
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
grant codeBase "${codebase.lucene-test-framework-6.1.0-snapshot-3a57bea.jar}" {
grant codeBase "${codebase.lucene-test-framework-6.1.0.jar}" {
// needed by RamUsageTester
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
// needed for testing hardlinks in StoreRecoveryTests since we install MockFS
@ -60,6 +60,10 @@ public class RecordingTaskManagerListener implements MockTaskManagerListener {
public void waitForTaskCompletion(Task task) {
public synchronized List<Tuple<Boolean, TaskInfo>> getEvents() {
return Collections.unmodifiableList(new ArrayList<>(events));
@ -363,6 +363,10 @@ public class TasksIT extends ESIntegTestCase {
public void waitForTaskCompletion(Task task) {
indexFuture = client().prepareIndex("test", "test").setSource("test", "test").execute();
@ -380,7 +384,7 @@ public class TasksIT extends ESIntegTestCase {
assertEquals(task.getType(), fetchedWithGet.getType());
assertEquals(task.getAction(), fetchedWithGet.getAction());
assertEquals(task.getDescription(), fetchedWithGet.getDescription());
assertEquals(task.getStatus(), fetchedWithGet.getStatus());
// The status won't always be equal - it might change between the list and the get.
assertEquals(task.getStartTime(), fetchedWithGet.getStartTime());
assertThat(fetchedWithGet.getRunningTimeNanos(), greaterThanOrEqualTo(task.getRunningTimeNanos()));
assertEquals(task.isCancellable(), fetchedWithGet.isCancellable());
@ -467,8 +471,33 @@ public class TasksIT extends ESIntegTestCase {
try {
taskId = waitForTestTaskStartOnAllNodes();
// Spin up a request to wait for that task to finish
// Wait for the task to start
assertBusy(() -> client().admin().cluster().prepareGetTask(taskId).get());
// Register listeners so we can be sure the waiting started
CountDownLatch waitForWaitingToStart = new CountDownLatch(1);
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
((MockTaskManager) transportService.getTaskManager()).addListener(new MockTaskManagerListener() {
public void waitForTaskCompletion(Task task) {
public void onTaskRegistered(Task task) {
public void onTaskUnregistered(Task task) {
// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId);
// Wait for the wait to start
} finally {
// Unblock the request so the wait for completion request can finish
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user