@ -303,15 +303,16 @@ comma separated list of nodes to connect to (e.g. localhost:9300). A transport c
be created based on that and used for all the before|after test operations, and to extract
the http addresses of the nodes so that REST requests can be sent to them.
== Testing scripts
== Testing packaging
The simplest way to test scripts and the packaged distributions is to use
Vagrant. You can get started by following these five easy steps:
The packaging tests use Vagrant virtual machines to verify that installing
and running elasticsearch distributions works correctly on supported operating systems.
These tests should really only be run in vagrant vms because they're destructive.
. Install Virtual Box and Vagrant.
. (Optional) Install vagrant-cachier to squeeze a bit more performance out of
the process:
. (Optional) Install https://github.com/fgrehm/vagrant-cachier[vagrant-cachier] to squeeze
a bit more performance out of the process:
vagrant plugin install vagrant-cachier
@ -325,26 +326,39 @@ vagrant plugin install vagrant-cachier
. Download and smoke test the VMs with `./gradlew vagrantSmokeTest` or
`./gradlew -Pvagrant.boxes=all vagrantSmokeTest`. The first time you run this it will
download the base images and provision the boxes and immediately quit. If you
run this again it'll skip the download step.
download the base images and provision the boxes and immediately quit. Downloading all
the images may take a long time. After the images are already on your machine, they won't
be downloaded again unless they have been updated to a new version.
. Run the tests with `./gradlew packagingTest`. This will cause Gradle to build
the tar, zip, and deb packages and all the plugins. It will then run the tests
on ubuntu-1404 and centos-7. We chose those two distributions as the default
because they cover deb and rpm packaging and SysVinit and systemd.
You can run on all the VMs by running `./gradlew -Pvagrant.boxes=all
packagingTest`. You can run a particular VM with a command like `./gradlew
-Pvagrant.boxes=oel-7 packagingTest`. See `./gradlew tasks` for a complete list
of available vagrant boxes for testing. It's important to know that if you
interrupt any of these Gradle commands then the boxes will remain running and
you'll have to terminate them with `./gradlew stop`.
You can choose which boxes to test by setting the `-Pvagrant.boxes` project property. All of
the valid options for this property are:
* `sample` - The default, only chooses ubuntu-1404 and centos-7
* List of box names, comma separated (e.g. `oel-7,fedora-26`) - Chooses exactly the boxes listed.
* `linux-all` - All linux boxes.
* `windows-all` - All Windows boxes. If there are any Windows boxes which do not
have images available when this value is provided, the build will fail.
* `all` - All boxes we test. If there are any boxes (e.g. Windows) which do not have images
available when this value is provided, the build will fail.
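The options above can be read as a small lookup. The following is a minimal Java sketch of that selection logic, not the plugin's Groovy code: the class name `BoxSelectionSketch` is hypothetical, and the box lists are shortened, illustrative subsets rather than the full lists the plugin defines.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: models how a vagrant.boxes value maps to a list of boxes.
class BoxSelectionSketch {
    // Illustrative subsets only; the real lists are defined by the build plugin.
    static final List<String> LINUX_BOXES = Arrays.asList("centos-7", "debian-9", "oel-7", "ubuntu-1404");
    static final List<String> WINDOWS_BOXES = Arrays.asList("windows-2012r2", "windows-2016");
    static final List<String> SAMPLE = Arrays.asList("ubuntu-1404", "centos-7");

    static List<String> select(String value) {
        switch (value) {
            case "sample":
                return SAMPLE;
            case "linux-all":
                return LINUX_BOXES;
            case "windows-all":
                return WINDOWS_BOXES;
            case "all": {
                // "all" is the Linux boxes followed by the Windows boxes
                List<String> all = new ArrayList<>(LINUX_BOXES);
                all.addAll(WINDOWS_BOXES);
                return all;
            }
            case "":
                return Collections.emptyList();
            default:
                // anything else is treated as a comma separated list of box names
                return Arrays.asList(value.split(","));
        }
    }

    public static void main(String[] args) {
        System.out.println(select("sample"));
        System.out.println(select("oel-7,fedora-26"));
    }
}
```

Note that the sketch only chooses names. Whether an image is actually available for a chosen box is a separate check, which is why `windows-all` and `all` can fail the build.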
For a complete list of boxes on which tests can be run, run `./gradlew :qa:vagrant:listAllBoxes`.
For a list of boxes that have images available from your configuration, run
`./gradlew :qa:vagrant:listAvailableBoxes`.
Note that if you interrupt gradle in the middle of running these tasks, any boxes started
will remain running and you'll have to stop them manually with `./gradlew stop` or
`vagrant halt`.
All the regular vagrant commands should just work so you can get a shell in a
VM running trusty by running
`vagrant up ubuntu-1404 --provider virtualbox && vagrant ssh ubuntu-1404`.
These are the linux flavors the Vagrantfile currently supports:
These are the linux flavors supported, all of which we provide images for:
* ubuntu-1404 aka trusty
* ubuntu-1604 aka xenial
@ -364,9 +378,42 @@ quality boxes available in vagrant atlas:
* sles-11
We're missing the following because our tests are very linux/bash centric:
=== Testing packaging on Windows
* Windows Server 2012
The packaging tests also support Windows Server 2012R2 and Windows Server 2016.
Unfortunately we're not able to provide boxes for them in open source use
because of licensing issues. Any Virtualbox image that has WinRM and Powershell
enabled for remote users should work.
Testing on Windows requires the https://github.com/criteo/vagrant-winrm[vagrant-winrm] plugin.
vagrant plugin install vagrant-winrm
Specify the image IDs of the Windows boxes to gradle with the project
properties `vagrant.windows-2012r2.id` and `vagrant.windows-2016.id`. They can be
set in `~/.gradle/gradle.properties` or passed on the command line like
`-Pvagrant.windows-2012r2.id=my-image-id`.
These properties are required for Windows support in all gradle tasks that
handle packaging tests. Either or both may be specified. Remember that to run tests
on these boxes, the project property `vagrant.boxes` still needs to be set to a
value that will include them.
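The build plugin in this change passes each supplied Windows image ID to vagrant through an environment variable that the Vagrantfile reads. A minimal Java sketch of that mapping follows. The class and method names are hypothetical, and only the property and variable names are taken from the change itself.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: turns Windows image project properties into vagrant env vars.
class WindowsBoxEnvSketch {
    static Map<String, String> envVarsFor(Map<String, String> projectProperties) {
        Map<String, String> env = new LinkedHashMap<>();
        copyIfSet(projectProperties, "vagrant.windows-2012r2.id", env, "VAGRANT_WINDOWS_2012R2_BOX");
        copyIfSet(projectProperties, "vagrant.windows-2016.id", env, "VAGRANT_WINDOWS_2016_BOX");
        return env;
    }

    // A missing or empty property means no image was supplied for that box.
    private static void copyIfSet(Map<String, String> props, String property,
                                  Map<String, String> env, String envVar) {
        String value = props.get(property);
        if (value != null && value.isEmpty() == false) {
            env.put(envVar, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("vagrant.windows-2012r2.id", "my-image-id");
        System.out.println(envVarsFor(props));
    }
}
```

With only the 2012R2 property set, the result has a single entry, so only that Windows box would be treated as available.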
If you're running vagrant commands outside of gradle, specify the Windows boxes
with the environment variables `VAGRANT_WINDOWS_2012R2_BOX` and
`VAGRANT_WINDOWS_2016_BOX`.
=== Testing VMs are disposable
It's important to think of VMs like cattle. If they become lame you just shoot
them and let vagrant reprovision them. Say you've hosed your precise VM:
@ -399,54 +446,62 @@ vagrant destroy -f
`vagrant up` would normally start all the VMs but we've prevented that because
that'd consume a ton of ram.
== Testing scripts more directly
=== Iterating on packaging tests
In general it's best to stick to testing in vagrant because the bats scripts are
destructive. When working with a single package it's generally faster to run its
tests in a tighter loop than Gradle provides. In one window:
Running the packaging tests through gradle can take a while because it will start
and stop the VM each time. You can iterate faster by keeping the VM up and running
the tests directly.
./gradlew :distribution:packages:rpm:assemble
The packaging tests use a random seed to determine which past version to use for
testing upgrades. To use a single past version fix the test seed when running
the commands below (see <<Seed and repetitions.>>).
and in another window:
First build the packaging tests and their dependencies
vagrant up centos-7 --provider virtualbox && vagrant ssh centos-7
./gradlew :qa:vagrant:setupPackagingTest
Then choose the VM you want to test on and bring it up. For example, to bring
up Debian 9 use the gradle command below. Bringing the box up with vagrant directly
may not mount the packaging test project in the right place. Once the VM is up, ssh
into it
./gradlew :qa:vagrant:vagrantDebian9#up
vagrant ssh debian-9
Now inside the VM, to run the https://github.com/sstephenson/bats[bats] packaging tests
sudo -E bats $BATS_TESTS/*rpm*.bats
If you wanted to retest all the release artifacts on a single VM you could:
# runs all bats tests
sudo bats $BATS_TESTS/*.bats
./gradlew setupPackagingTest
cd qa/vagrant; vagrant up ubuntu-1404 --provider virtualbox && vagrant ssh ubuntu-1404
sudo -E bats $BATS_TESTS/*.bats
# you can also pass specific test files
sudo bats $BATS_TESTS/20_tar_package.bats $BATS_TESTS/25_tar_plugins.bats
You can also use Gradle to prepare the test environment and then start a single VM:
To run the Java packaging tests, again inside the VM
./gradlew vagrantFedora27#up
bash $PACKAGING_TESTS/run-tests.sh
Or any of vagrantCentos6#up, vagrantCentos7#up, vagrantDebian8#up,
vagrantDebian9#up, vagrantFedora26#up, vagrantFedora27#up, vagrantOel6#up, vagrantOel7#up,
vagrantOpensuse42#up, vagrantSles12#up, vagrantUbuntu1404#up, vagrantUbuntu1604#up.
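The task names follow directly from the box names: the plugin capitalizes the first letter of the box name and drops the dashes. A minimal Java sketch of that rule, assuming a hypothetical helper named `upTaskFor` (the plugin itself does this inline in Groovy):

```java
// Hypothetical sketch: derives the "up" task name from a vagrant box name.
class BoxTaskNameSketch {
    static String upTaskFor(String box) {
        // Upper-case only the first character, as Groovy's capitalize() does
        String capitalized = box.isEmpty()
            ? box
            : Character.toUpperCase(box.charAt(0)) + box.substring(1);
        return "vagrant" + capitalized.replace("-", "") + "#up";
    }

    public static void main(String[] args) {
        System.out.println(upTaskFor("debian-9"));
        System.out.println(upTaskFor("opensuse-42"));
    }
}
```

So a box named `debian-9` is brought up with `vagrantDebian9#up`, while `vagrant ssh` still takes the plain box name.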
or on Windows
Once up, you can then connect to the VM using SSH from the elasticsearch directory:
powershell -File $Env:PACKAGING_TESTS/run-tests.ps1
vagrant ssh fedora-27
When you've made changes you want to test, keep the VM up and reload the tests and
distributions inside by running (on the host)
Or from another directory:
VAGRANT_CWD=/path/to/elasticsearch vagrant ssh fedora-27
./gradlew :qa:vagrant:clean :qa:vagrant:setupPackagingTest
Note: Starting a vagrant VM outside of the elasticsearch folder requires you to
indicate the folder that contains the Vagrantfile using the VAGRANT_CWD
@ -121,6 +121,26 @@ Vagrant.configure(2) do |config|
sles_common config, box
windows_2012r2_box = ENV['VAGRANT_WINDOWS_2012R2_BOX']
if windows_2012r2_box && windows_2012r2_box.empty? == false
'windows-2012r2'.tap do |box|
config.vm.define box, define_opts do |config|
config.vm.box = windows_2012r2_box
windows_common config, box
windows_2016_box = ENV['VAGRANT_WINDOWS_2016_BOX']
if windows_2016_box && windows_2016_box.empty? == false
'windows-2016'.tap do |box|
config.vm.define box, define_opts do |config|
config.vm.box = windows_2016_box
windows_common config, box
def deb_common(config, name, extra: '')
@ -353,3 +373,22 @@ SUDOERS_VARS
chmod 0440 /etc/sudoers.d/elasticsearch_vars
def windows_common(config, name)
config.vm.provision 'markerfile', type: 'shell', inline: <<-SHELL
$ErrorActionPreference = "Stop"
New-Item C:/is_vagrant_vm -ItemType file -Force | Out-Null
config.vm.provision 'set prompt', type: 'shell', inline: <<-SHELL
$ErrorActionPreference = "Stop"
$ps_prompt = 'function Prompt { "#{name}:$($ExecutionContext.SessionState.Path.CurrentLocation)>" }'
$ps_prompt | Out-File $PsHome/Microsoft.PowerShell_profile.ps1
config.vm.provision 'set env variables', type: 'shell', inline: <<-SHELL
$ErrorActionPreference = "Stop"
[Environment]::SetEnvironmentVariable("PACKAGING_ARCHIVES", "C:/project/build/packaging/archives", "Machine")
[Environment]::SetEnvironmentVariable("PACKAGING_TESTS", "C:/project/build/packaging/tests", "Machine")
@ -13,10 +13,12 @@ import org.gradle.api.tasks.Delete
import org.gradle.api.tasks.Exec
import org.gradle.api.tasks.TaskState
import static java.util.Collections.unmodifiableList
class VagrantTestPlugin implements Plugin<Project> {
/** All available boxes **/
static List<String> BOXES = [
/** All Linux boxes that we test. These are all always supplied **/
static final List<String> LINUX_BOXES = unmodifiableList([
@ -29,26 +31,35 @@ class VagrantTestPlugin implements Plugin<Project> {
/** All Windows boxes that we test, which may or may not be supplied **/
static final List<String> WINDOWS_BOXES = unmodifiableList([
/** All boxes that we test, some of which may not be supplied **/
static final List<String> ALL_BOXES = unmodifiableList(LINUX_BOXES + WINDOWS_BOXES)
/** Boxes used when sampling the tests **/
static List<String> SAMPLE = [
static final List<String> SAMPLE = unmodifiableList([
/** All distributions to bring into test VM, whether or not they are used **/
static List<String> DISTRIBUTIONS = [
static final List<String> DISTRIBUTIONS = unmodifiableList([
/** Packages onboarded for upgrade tests **/
static List<String> UPGRADE_FROM_ARCHIVES = ['rpm', 'deb']
static final List<String> UPGRADE_FROM_ARCHIVES = unmodifiableList(['rpm', 'deb'])
private static final PACKAGING_CONFIGURATION = 'packaging'
private static final PACKAGING_TEST_CONFIGURATION = 'packagingTest'
@ -56,11 +67,19 @@ class VagrantTestPlugin implements Plugin<Project> {
private static final String BATS_TEST_COMMAND ="cd \$PACKAGING_ARCHIVES && sudo bats --tap \$BATS_TESTS/*.$BATS"
private static final String PLATFORM_TEST_COMMAND ="rm -rf ~/elasticsearch && rsync -r /elasticsearch/ ~/elasticsearch && cd ~/elasticsearch && ./gradlew test integTest"
/** Boxes that have been supplied and are available for testing **/
List<String> availableBoxes = []
/** extra env vars to pass to vagrant for box configuration **/
Map<String, String> vagrantBoxEnvVars = [:]
void apply(Project project) {
// Creates the Vagrant extension for the project
project.extensions.create('esvagrant', VagrantPropertiesExtension, listVagrantBoxes(project))
project.extensions.create('esvagrant', VagrantPropertiesExtension, listSelectedBoxes(project))
// Add required repositories for packaging tests
@ -73,12 +92,17 @@ class VagrantTestPlugin implements Plugin<Project> {
if (project.extensions.esvagrant.boxes == null || project.extensions.esvagrant.boxes.size() == 0) {
throw new InvalidUserDataException('Vagrant boxes cannot be null or empty for esvagrant')
throw new InvalidUserDataException('Must specify at least one vagrant box')
for (String box : project.extensions.esvagrant.boxes) {
if (BOXES.contains(box) == false) {
throw new InvalidUserDataException("Vagrant box [${box}] not found, available virtual machines are ${BOXES}")
if (ALL_BOXES.contains(box) == false) {
throw new InvalidUserDataException("Vagrant box [${box}] is unknown to this plugin. Valid boxes are ${ALL_BOXES}")
if (availableBoxes.contains(box) == false) {
throw new InvalidUserDataException("Vagrant box [${box}] is not available because an image is not supplied for it. " +
"Available boxes with supplied images are ${availableBoxes}")
@ -86,14 +110,45 @@ class VagrantTestPlugin implements Plugin<Project> {
private List<String> listVagrantBoxes(Project project) {
* Enumerate all the boxes that we know about and could possibly choose to test
private void collectAvailableBoxes(Project project) {
// these images are hardcoded in the Vagrantfile and are always available
// these images need to be provided at runtime
String windows_2012r2_box = project.getProperties().get('vagrant.windows-2012r2.id')
if (windows_2012r2_box != null && windows_2012r2_box.isEmpty() == false) {
vagrantBoxEnvVars['VAGRANT_WINDOWS_2012R2_BOX'] = windows_2012r2_box
String windows_2016_box = project.getProperties().get('vagrant.windows-2016.id')
if (windows_2016_box != null && windows_2016_box.isEmpty() == false) {
vagrantBoxEnvVars['VAGRANT_WINDOWS_2016_BOX'] = windows_2016_box
* Enumerate all the boxes that we have chosen to test
private static List<String> listSelectedBoxes(Project project) {
String vagrantBoxes = project.getProperties().get('vagrant.boxes', 'sample')
if (vagrantBoxes == 'sample') {
return SAMPLE
} else if (vagrantBoxes == 'all') {
return BOXES
} else {
return vagrantBoxes.split(',')
switch (vagrantBoxes) {
case 'sample':
return SAMPLE
case 'linux-all':
return LINUX_BOXES
case 'windows-all':
return WINDOWS_BOXES
case 'all':
return ALL_BOXES
case '':
return []
default:
return vagrantBoxes.split(',')
@ -184,11 +239,19 @@ class VagrantTestPlugin implements Plugin<Project> {
from project.configurations[PACKAGING_TEST_CONFIGURATION]
Task createTestRunnerScript = project.tasks.create('createTestRunnerScript', FileContentsTask) {
Task createLinuxRunnerScript = project.tasks.create('createLinuxRunnerScript', FileContentsTask) {
dependsOn copyPackagingTests
file "${testsDir}/run-tests.sh"
contents "java -cp \"\$PACKAGING_TESTS/*\" org.junit.runner.JUnitCore ${-> project.extensions.esvagrant.testClass}"
Task createWindowsRunnerScript = project.tasks.create('createWindowsRunnerScript', FileContentsTask) {
dependsOn copyPackagingTests
file "${testsDir}/run-tests.ps1"
contents """\
java -cp "\$Env:PACKAGING_TESTS/*" org.junit.runner.JUnitCore ${-> project.extensions.esvagrant.testClass}
Task createVersionFile = project.tasks.create('createVersionFile', FileContentsTask) {
dependsOn copyPackagingArchives
@ -249,20 +312,24 @@ class VagrantTestPlugin implements Plugin<Project> {
Task vagrantSetUpTask = project.tasks.create('setupPackagingTest')
vagrantSetUpTask.dependsOn 'vagrantCheckVersion'
vagrantSetUpTask.dependsOn copyPackagingArchives, copyPackagingTests, createTestRunnerScript
vagrantSetUpTask.dependsOn createVersionFile, createUpgradeFromFile, createUpgradeIsOssFile
vagrantSetUpTask.dependsOn copyBatsTests, copyBatsUtils
private static void createPackagingTestTask(Project project) {
project.tasks.create('packagingTest') {
group 'Verification'
description "Tests yum/apt packages using vagrant and bats.\n" +
" Specify the vagrant boxes to test using the gradle property 'vagrant.boxes'.\n" +
" 'sample' can be used to test a single yum and apt box. 'all' can be used to\n" +
" test all available boxes. The available boxes are: \n" +
" ${BOXES}"
description "Tests distribution installation on different platforms using vagrant. See TESTING.asciidoc for details."
dependsOn 'vagrantCheckVersion'
@ -270,24 +337,49 @@ class VagrantTestPlugin implements Plugin<Project> {
private static void createPlatformTestTask(Project project) {
project.tasks.create('platformTest') {
group 'Verification'
description "Test unit and integ tests on different platforms using vagrant.\n" +
" Specify the vagrant boxes to test using the gradle property 'vagrant.boxes'.\n" +
" 'all' can be used to test all available boxes. The available boxes are: \n" +
" ${BOXES}"
description "Test unit and integ tests on different platforms using vagrant. See TESTING.asciidoc for details. This test " +
"is unmaintained."
dependsOn 'vagrantCheckVersion'
private static void createVagrantTasks(Project project) {
private void createBoxListTasks(Project project) {
project.tasks.create('listAllBoxes') {
group 'Verification'
description 'List all vagrant boxes which can be tested by this plugin'
doLast {
println("All vagrant boxes supported by ${project.path}")
for (String box : ALL_BOXES) {
dependsOn 'vagrantCheckVersion'
project.tasks.create('listAvailableBoxes') {
group 'Verification'
description 'List all vagrant boxes which are available for testing'
doLast {
println("All vagrant boxes available to ${project.path}")
for (String box : availableBoxes) {
dependsOn 'vagrantCheckVersion'
private void createVagrantTasks(Project project) {
private static void createVagrantBoxesTasks(Project project) {
private void createVagrantBoxesTasks(Project project) {
assert project.extensions.esvagrant.boxes != null
assert project.tasks.stop != null
@ -320,9 +412,10 @@ class VagrantTestPlugin implements Plugin<Project> {
'VAGRANT_VAGRANTFILE' : 'Vagrantfile',
'VAGRANT_PROJECT_DIR' : "${project.projectDir.absolutePath}"
// Each box gets it own set of tasks
for (String box : BOXES) {
for (String box : availableBoxes) {
String boxTask = box.capitalize().replace('-', '')
// always add a halt task for all boxes, so clean makes sure they are all shutdown
@ -363,6 +456,7 @@ class VagrantTestPlugin implements Plugin<Project> {
final Task destroy = project.tasks.create("vagrant${boxTask}#destroy", LoggedExec) {
commandLine "bash", "-c", "vagrant status ${box} | grep -q \"${box}\\s\\+not created\" || vagrant destroy ${box} --force"
workingDir project.rootProject.rootDir
environment vagrantEnvVars
destroy.onlyIf { vagrantDestroy }
@ -386,37 +480,42 @@ class VagrantTestPlugin implements Plugin<Project> {
environment vagrantEnvVars
dependsOn up
finalizedBy halt
commandLine 'vagrant', 'ssh', box, '--command',
"set -o pipefail && echo 'Hello from ${project.path}' | sed -ue 's/^/ ${box}: /'"
Task batsPackagingTest = project.tasks.create("vagrant${boxTask}#batsPackagingTest", BatsOverVagrantTask) {
boxName box
environmentVars vagrantEnvVars
dependsOn up, setupPackagingTest
finalizedBy halt
if (LINUX_BOXES.contains(box)) {
smoke.commandLine = ['vagrant', 'ssh', box, '--command',
"set -o pipefail && echo 'Hello from ${project.path}' | sed -ue 's/^/ ${box}: /'"]
} else {
smoke.commandLine = ['vagrant', 'winrm', box, '--command',
"Write-Host ' ${box}: Hello from ${project.path}'"]
TaskExecutionAdapter batsPackagingReproListener = createReproListener(project, batsPackagingTest.path)
batsPackagingTest.doFirst {
batsPackagingTest.doLast {
if (project.extensions.esvagrant.boxes.contains(box)) {
if (LINUX_BOXES.contains(box)) {
Task batsPackagingTest = project.tasks.create("vagrant${boxTask}#batsPackagingTest", BatsOverVagrantTask) {
boxName box
environmentVars vagrantEnvVars
dependsOn up, setupPackagingTest
finalizedBy halt
TaskExecutionAdapter batsPackagingReproListener = createReproListener(project, batsPackagingTest.path)
batsPackagingTest.doFirst {
batsPackagingTest.doLast {
if (project.extensions.esvagrant.boxes.contains(box)) {
Task javaPackagingTest = project.tasks.create("vagrant${boxTask}#javaPackagingTest", VagrantCommandTask) {
command 'ssh'
boxName box
environmentVars vagrantEnvVars
dependsOn up, setupPackagingTest
finalizedBy halt
args '--command', "bash \"\$PACKAGING_TESTS/run-tests.sh\""
// todo remove this onlyIf after all packaging tests are consolidated
@ -424,6 +523,14 @@ class VagrantTestPlugin implements Plugin<Project> {
project.extensions.esvagrant.testClass != null
if (LINUX_BOXES.contains(box)) {
javaPackagingTest.command = 'ssh'
javaPackagingTest.args = ['--command', 'bash "$PACKAGING_TESTS/run-tests.sh"']
} else {
javaPackagingTest.command = 'winrm'
javaPackagingTest.args = ['--command', 'powershell -File "$Env:PACKAGING_TESTS/run-tests.ps1"']
TaskExecutionAdapter javaPackagingReproListener = createReproListener(project, javaPackagingTest.path)
javaPackagingTest.doFirst {
@ -435,23 +542,29 @@ class VagrantTestPlugin implements Plugin<Project> {
Task platform = project.tasks.create("vagrant${boxTask}#platformTest", VagrantCommandTask) {
command 'ssh'
boxName box
environmentVars vagrantEnvVars
dependsOn up
finalizedBy halt
args '--command', PLATFORM_TEST_COMMAND + " -Dtests.seed=${-> project.testSeed}"
TaskExecutionAdapter platformReproListener = createReproListener(project, platform.path)
platform.doFirst {
platform.doLast {
if (project.extensions.esvagrant.boxes.contains(box)) {
* This test is unmaintained and was created to run on Linux. We won't allow it to run on Windows
* until it's been brought back into maintenance
if (LINUX_BOXES.contains(box)) {
Task platform = project.tasks.create("vagrant${boxTask}#platformTest", VagrantCommandTask) {
command 'ssh'
boxName box
environmentVars vagrantEnvVars
dependsOn up
finalizedBy halt
args '--command', PLATFORM_TEST_COMMAND + " -Dtests.seed=${-> project.testSeed}"
TaskExecutionAdapter platformReproListener = createReproListener(project, platform.path)
platform.doFirst {
platform.doLast {
if (project.extensions.esvagrant.boxes.contains(box)) {
@ -0,0 +1,59 @@
package org.elasticsearch.client;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScheme;
import org.apache.http.impl.client.TargetAuthenticationStrategy;
import org.apache.http.protocol.HttpContext;
* An {@link org.apache.http.client.AuthenticationStrategy} implementation that does <em>not</em> perform
* any special handling if authentication fails.
* The default handler in Apache HTTP client mimics standard browser behaviour of clearing authentication
* credentials if it receives a 401 response from the server. While this can be useful for browsers, it is
* rarely the desired behaviour with the Elasticsearch REST API.
* If the code using the REST client has configured credentials for the REST API, then we can and should
* assume that this is intentional, and those credentials represent the best possible authentication
* mechanism to the Elasticsearch node.
* If we receive a 401 status, a probable cause is that the authentication mechanism in place was unable
* to perform the requisite password checks (the node has not yet recovered its state, or an external
* authentication provider was unavailable).
* If this occurs, then the desired behaviour is for the Rest client to retry with the same credentials
* (rather than trying with no credentials, or expecting the calling code to provide alternate credentials).
final class PersistentCredentialsAuthenticationStrategy extends TargetAuthenticationStrategy {
private final Log logger = LogFactory.getLog(PersistentCredentialsAuthenticationStrategy.class);
public void authFailed(HttpHost host, AuthScheme authScheme, HttpContext context) {
if (logger.isDebugEnabled()) {
logger.debug("Authentication to " + host + " failed (scheme: " + authScheme.getSchemeName()
+ "). Preserving credentials for next request");
// Do nothing.
// The superclass implementation of method will clear the credentials from the cache, but we don't
@ -204,7 +204,8 @@ public final class RestClientBuilder {
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
//default settings for connection pooling may be too constraining
.setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy());
if (httpClientConfigCallback != null) {
httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
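To make the difference between the two strategies concrete, here is a toy Java model of a credential cache after a 401. It is a sketch under stated assumptions and does not use the Apache HttpClient API: `FailureStrategy`, `CLEARING`, `PERSISTENT`, and `headerAfter401` are hypothetical names, and the header value is a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of what happens to cached credentials when authentication fails.
class CredentialCacheSketch {
    /** Stand-in for an authentication strategy; not the Apache HttpClient interface. */
    interface FailureStrategy {
        void authFailed(Map<String, String> cache, String host);
    }

    // Browser-like behaviour: forget the credentials for the host on failure.
    static final FailureStrategy CLEARING = (cache, host) -> cache.remove(host);

    // Persistent behaviour: do nothing, so the next request retries with the same credentials.
    static final FailureStrategy PERSISTENT = (cache, host) -> { };

    /** Returns the Authorization value available for the request after a 401, or null if none. */
    static String headerAfter401(FailureStrategy strategy) {
        Map<String, String> cache = new HashMap<>();
        cache.put("localhost:9200", "Basic dXNlcjpwYXNz"); // placeholder credentials
        strategy.authFailed(cache, "localhost:9200");
        return cache.get("localhost:9200");
    }

    public static void main(String[] args) {
        System.out.println("clearing:   " + headerAfter401(CLEARING));
        System.out.println("persistent: " + headerAfter401(PERSISTENT));
    }
}
```

In this model the clearing strategy leaves nothing to send on the retry, while the persistent one still has the original value, which is the behaviour the new test for 401 responses checks.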
@ -31,14 +31,14 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.TargetAuthenticationStrategy;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.io.InputStreamReader;
@ -147,6 +147,8 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
if (usePreemptiveAuth == false) {
// disable preemptive auth by ignoring any authcache
// don't use the "persistent credentials strategy"
httpClientBuilder.setTargetAuthenticationStrategy(new TargetAuthenticationStrategy());
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
@ -193,7 +195,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
assertTrue("timeout waiting for requests to be sent", latch.await(10, TimeUnit.SECONDS));
if (exceptions.isEmpty() == false) {
AssertionError error = new AssertionError("expected no failures but got some. see suppressed for first 10 of ["
+ exceptions.size() + "] failures");
+ exceptions.size() + "] failures");
for (Exception exception : exceptions.subList(0, Math.min(10, exceptions.size()))) {
@ -217,7 +219,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
Response esResponse;
try {
esResponse = restClient.performRequest(method, "/" + statusCode, Collections.<String, String>emptyMap(), requestHeaders);
} catch(ResponseException e) {
} catch (ResponseException e) {
esResponse = e.getResponse();
@ -291,8 +293,8 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
* Verify that credentials are sent on the first request with preemptive auth enabled (default when provided with credentials).
public void testPreemptiveAuthEnabled() throws IOException {
final String[] methods = { "POST", "PUT", "GET", "DELETE" };
public void testPreemptiveAuthEnabled() throws IOException {
final String[] methods = {"POST", "PUT", "GET", "DELETE"};
try (RestClient restClient = createRestClient(true, true)) {
for (final String method : methods) {
@ -306,8 +308,8 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
* Verify that credentials are <em>not</em> sent on the first request with preemptive auth disabled.
public void testPreemptiveAuthDisabled() throws IOException {
final String[] methods = { "POST", "PUT", "GET", "DELETE" };
public void testPreemptiveAuthDisabled() throws IOException {
final String[] methods = {"POST", "PUT", "GET", "DELETE"};
try (RestClient restClient = createRestClient(true, false)) {
for (final String method : methods) {
@ -318,12 +320,31 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
* Verify that credentials continue to be sent even if a 401 (Unauthorized) response is received
public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws IOException {
final String[] methods = {"POST", "PUT", "GET", "DELETE"};
try (RestClient restClient = createRestClient(true, true)) {
for (final String method : methods) {
Header realmHeader = new BasicHeader("WWW-Authenticate", "Basic realm=\"test\"");
final Response response401 = bodyTest(restClient, method, 401, new Header[]{realmHeader});
assertThat(response401.getHeader("Authorization"), startsWith("Basic"));
final Response response200 = bodyTest(restClient, method, 200, new Header[0]);
assertThat(response200.getHeader("Authorization"), startsWith("Basic"));
public void testUrlWithoutLeadingSlash() throws Exception {
if (pathPrefix.length() == 0) {
try {
restClient.performRequest("GET", "200");
fail("request should have failed");
} catch(ResponseException e) {
} catch (ResponseException e) {
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());
} else {
@ -335,8 +356,8 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
//pathPrefix is not required to start with '/', will be added automatically
try (RestClient restClient = RestClient.builder(
new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()))
.setPathPrefix(pathPrefix.substring(1)).build()) {
new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()))
.setPathPrefix(pathPrefix.substring(1)).build()) {
Response response = restClient.performRequest("GET", "200");
//a trailing slash gets automatically added if a pathPrefix is configured
assertEquals(200, response.getStatusLine().getStatusCode());
@ -350,10 +371,15 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase {
private Response bodyTest(final RestClient restClient, final String method) throws IOException {
String requestBody = "{ \"field\": \"value\" }";
int statusCode = randomStatusCode(getRandom());
return bodyTest(restClient, method, statusCode, new Header[0]);
private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws IOException {
String requestBody = "{ \"field\": \"value\" }";
Request request = new Request(method, "/" + statusCode);
Response esResponse;
try {
esResponse = restClient.performRequest(request);
@ -9,7 +9,7 @@ When using the `has_parent` query it is important to use the `PreBuiltTransportC
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(InetAddresses.forString(""), 9300)));
client.addTransportAddress(new TransportAddress(new InetSocketAddress(InetAddresses.forString(""), 9300)));
Otherwise the parent-join module doesn't get loaded and the `has_parent` query can't be used from the transport client.
@ -9,7 +9,7 @@ See:
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(InetAddresses.forString(""), 9300)));
client.addTransportAddress(new TransportAddress(new InetSocketAddress(InetAddresses.forString(""), 9300)));
Before the `percolate` query can be used a `percolator` mapping should be added and
@ -124,7 +124,7 @@ The `other_bucket` parameter can be set to add a bucket to the response which wi
not match any of the given filters. The value of this parameter can be as follows:
`false`:: Does not compute the `other` bucket
`true`:: Returns the `other` bucket bucket either in a bucket (named `_other_` by default) if named filters are being used,
`true`:: Returns the `other` bucket either in a bucket (named `_other_` by default) if named filters are being used,
or as the last bucket if anonymous filters are being used
The `other_bucket_key` parameter can be used to set the key for the `other` bucket to a value other than the default `_other_`. Setting
@ -28,8 +28,8 @@ The top_hits aggregation returns regular search hits, because of this many per h
==== Example
In the following example we group the questions by tag and per tag we show the last active question. For each question
only the title field is being included in the source.
In the following example we group the sales by type and per type we show the last sale.
For each sale only the date and price fields are being included in the source.
@ -237,9 +237,14 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin {
filters.add(PreConfiguredTokenFilter.singleton("dutch_stem", false, input -> new SnowballFilter(input, new DutchStemmer())));
filters.add(PreConfiguredTokenFilter.singleton("edge_ngram", false, input ->
new EdgeNGramTokenFilter(input, EdgeNGramTokenFilter.DEFAULT_MIN_GRAM_SIZE, EdgeNGramTokenFilter.DEFAULT_MAX_GRAM_SIZE)));
// TODO deprecate edgeNGram
filters.add(PreConfiguredTokenFilter.singleton("edgeNGram", false, input ->
new EdgeNGramTokenFilter(input, EdgeNGramTokenFilter.DEFAULT_MIN_GRAM_SIZE, EdgeNGramTokenFilter.DEFAULT_MAX_GRAM_SIZE)));
filters.add(PreConfiguredTokenFilter.singletonWithVersion("edgeNGram", false, (reader, version) -> {
if (version.onOrAfter(org.elasticsearch.Version.V_6_4_0)) {
"The [edgeNGram] token filter name is deprecated and will be removed in a future version. "
+ "Please change the filter name to [edge_ngram] instead.");
return new EdgeNGramTokenFilter(reader, EdgeNGramTokenFilter.DEFAULT_MIN_GRAM_SIZE, EdgeNGramTokenFilter.DEFAULT_MAX_GRAM_SIZE);
filters.add(PreConfiguredTokenFilter.singleton("elision", true,
input -> new ElisionFilter(input, FrenchAnalyzer.DEFAULT_ARTICLES)));
filters.add(PreConfiguredTokenFilter.singleton("french_stem", false, input -> new SnowballFilter(input, new FrenchStemmer())));
@ -256,8 +261,14 @@ public class CommonAnalysisPlugin extends Plugin implements AnalysisPlugin {
filters.add(PreConfiguredTokenFilter.singleton("ngram", false, NGramTokenFilter::new));
// TODO deprecate nGram
filters.add(PreConfiguredTokenFilter.singleton("nGram", false, NGramTokenFilter::new));
filters.add(PreConfiguredTokenFilter.singletonWithVersion("nGram", false, (reader, version) -> {
if (version.onOrAfter(org.elasticsearch.Version.V_6_4_0)) {
"The [nGram] token filter name is deprecated and will be removed in a future version. "
+ "Please change the filter name to [ngram] instead.");
return new NGramTokenFilter(reader);
filters.add(PreConfiguredTokenFilter.singleton("persian_normalization", true, PersianNormalizationFilter::new));
filters.add(PreConfiguredTokenFilter.singleton("porter_stem", false, PorterStemFilter::new));
filters.add(PreConfiguredTokenFilter.singleton("reverse", false, ReverseStringFilter::new));
@ -0,0 +1,119 @@
package org.elasticsearch.analysis.common;
import org.apache.lucene.analysis.MockTokenizer;
import org.apache.lucene.analysis.Tokenizer;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.TokenFilterFactory;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
public class CommonAnalysisPluginTests extends ESTestCase {
* Check that the deprecated name "nGram" issues a deprecation warning for indices created since 6.4.0
public void testNGramDeprecationWarning() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, Version.CURRENT))
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings);
try (CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) {
Map<String, TokenFilterFactory> tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter;
TokenFilterFactory tokenFilterFactory = tokenFilters.get("nGram");
Tokenizer tokenizer = new MockTokenizer();
tokenizer.setReader(new StringReader("foo bar"));
"The [nGram] token filter name is deprecated and will be removed in a future version. "
+ "Please change the filter name to [ngram] instead.");
* Check that the deprecated name "nGram" does NOT issue a deprecation warning for indices created before 6.4.0
public void testNGramNoDeprecationWarningPre6_4() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
VersionUtils.randomVersionBetween(random(), Version.V_5_0_0, Version.V_6_3_0))
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings);
try (CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) {
Map<String, TokenFilterFactory> tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter;
TokenFilterFactory tokenFilterFactory = tokenFilters.get("nGram");
Tokenizer tokenizer = new MockTokenizer();
tokenizer.setReader(new StringReader("foo bar"));
* Check that the deprecated name "edgeNGram" issues a deprecation warning for indices created since 6.4.0
public void testEdgeNGramDeprecationWarning() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, Version.CURRENT))
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings);
try (CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) {
Map<String, TokenFilterFactory> tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter;
TokenFilterFactory tokenFilterFactory = tokenFilters.get("edgeNGram");
Tokenizer tokenizer = new MockTokenizer();
tokenizer.setReader(new StringReader("foo bar"));
"The [edgeNGram] token filter name is deprecated and will be removed in a future version. "
+ "Please change the filter name to [edge_ngram] instead.");
* Check that the deprecated name "edgeNGram" does NOT issue a deprecation warning for indices created before 6.4.0
public void testEdgeNGramNoDeprecationWarningPre6_4() throws IOException {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
VersionUtils.randomVersionBetween(random(), Version.V_5_0_0, Version.V_6_3_0))
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings);
try (CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) {
Map<String, TokenFilterFactory> tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter;
TokenFilterFactory tokenFilterFactory = tokenFilters.get("edgeNGram");
Tokenizer tokenizer = new MockTokenizer();
tokenizer.setReader(new StringReader("foo bar"));
@ -32,15 +32,11 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTokenStreamTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static com.carrotsearch.randomizedtesting.RandomizedTest.scaledRandomIntBetween;
import static org.hamcrest.Matchers.instanceOf;
@ -129,7 +125,7 @@ public class NGramTokenizerFactoryTests extends ESTokenStreamTestCase {
for (int i = 0; i < iters; i++) {
final Index index = new Index("test", "_na_");
final String name = "ngr";
Version v = randomVersion(random());
Version v = VersionUtils.randomVersion(random());
Builder builder = newAnalysisSettingsBuilder().put("min_gram", 2).put("max_gram", 3);
boolean reverse = random().nextBoolean();
if (reverse) {
@ -150,7 +146,6 @@ public class NGramTokenizerFactoryTests extends ESTokenStreamTestCase {
* test that throws an error when trying to get a NGramTokenizer where difference between max_gram and min_gram
* is greater than the allowed value of max_ngram_diff
@ -175,16 +170,4 @@ public class NGramTokenizerFactoryTests extends ESTokenStreamTestCase {
+ IndexSettings.MAX_NGRAM_DIFF_SETTING.getKey() + "] index level setting.",
private Version randomVersion(Random random) throws IllegalArgumentException, IllegalAccessException {
Field[] declaredFields = Version.class.getFields();
List<Field> versionFields = new ArrayList<>();
for (Field field : declaredFields) {
if ((field.getModifiers() & Modifier.STATIC) != 0 && field.getName().startsWith("V_") && field.getType() == Version.class) {
return (Version) versionFields.get(random.nextInt(versionFields.size())).get(Version.class);
@ -66,9 +66,6 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
if (blobExists(blobName)) {
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
@ -28,6 +28,7 @@ import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
@ -47,12 +48,15 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore {
// The recommended maximum size of a blob that should be uploaded in a single
@ -204,24 +208,32 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
* @param inputStream the stream containing the blob data
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(() -> storage.writer(blobInfo));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
public boolean isOpen() {
return writeChannel.isOpen();
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
() -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
public boolean isOpen() {
return writeChannel.isOpen();
public void close() throws IOException {
public void close() throws IOException {
@SuppressForbidden(reason = "Channel is based on a socket, not a file")
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
@SuppressForbidden(reason = "Channel is based on a socket, not a file")
public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
throw se;
@ -238,7 +250,17 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
SocketAccess.doPrivilegedVoidIOException(() -> storage.create(blobInfo, baos.toByteArray()));
() -> {
try {
storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist());
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
throw se;
@ -295,8 +317,8 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
* Moves a blob within the same bucket
* @param sourceBlob name of the blob to move
* @param targetBlob new name of the blob in the same bucket
* @param sourceBlobName name of the blob to move
* @param targetBlobName new name of the blob in the same bucket
void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);
@ -56,6 +56,7 @@ import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
@ -113,7 +114,14 @@ class MockStorage implements Storage {
if (bucketName.equals(blobInfo.getBucket()) == false) {
throw new StorageException(404, "Bucket not found");
blobs.put(blobInfo.getName(), content);
if (Stream.of(options).anyMatch(option -> option.equals(BlobTargetOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), content);
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
} else {
blobs.put(blobInfo.getName(), content);
return get(BlobId.of(blobInfo.getBucket(), blobInfo.getName()));
@ -243,9 +251,16 @@ class MockStorage implements Storage {
public void close() throws IOException {
public void close() {
blobs.put(blobInfo.getName(), output.toByteArray());
if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
if (existingBytes != null) {
throw new StorageException(412, "Blob already exists");
} else {
blobs.put(blobInfo.getName(), output.toByteArray());
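The `MockStorage` change above emulates the `doesNotExist()` precondition with an atomic `putIfAbsent`. A minimal, self-contained sketch of that idea follows. The class name `WriteOnceBlobMapSketch`, the `failIfExists` flag, and the direct use of `FileAlreadyExistsException` are assumptions made for illustration; the real mock throws a `StorageException` with code 412, which the blob store then translates.

```java
import java.nio.file.FileAlreadyExistsException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a blob store backed by an in-memory map.
final class WriteOnceBlobMapSketch {
    private final ConcurrentMap<String, byte[]> blobs = new ConcurrentHashMap<>();

    // When failIfExists is set, the write succeeds only if no blob has that name yet.
    void write(String name, byte[] content, boolean failIfExists) throws FileAlreadyExistsException {
        if (failIfExists) {
            // putIfAbsent is atomic: it returns the previous value, or null if the key was free.
            byte[] existing = blobs.putIfAbsent(name, content);
            if (existing != null) {
                throw new FileAlreadyExistsException(name, null, "Blob already exists");
            }
        } else {
            // Unconditional write: silently overwrites any existing blob.
            blobs.put(name, content);
        }
    }

    byte[] read(String name) {
        return blobs.get(name);
    }

    public static void main(String[] args) throws Exception {
        WriteOnceBlobMapSketch store = new WriteOnceBlobMapSketch();
        store.write("snap-1", new byte[] {1}, true);
        try {
            store.write("snap-1", new byte[] {2}, true);
        } catch (FileAlreadyExistsException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Checking and writing in a single atomic call is what removes the race that a separate `blobExists` check followed by a write would leave open.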
@ -163,12 +163,24 @@ setup:
version: " - 6.3.99"
reason: "cluster state including cluster_uuid at the top level is new in v6.4.0 and higher"
# Get the current cluster_uuid
- do:
cluster.state: {}
- set: { metadata.cluster_uuid : cluster_uuid }
- do:
metric: [ master_node, version, metadata ]
metric: [ master_node, version ]
- is_true: cluster_uuid
- match: { cluster_uuid: $cluster_uuid }
- is_true: master_node
- is_true: version
- is_true: state_uuid
- is_true: metadata
- do:
metric: [ routing_table ]
index: testidx
- match: { cluster_uuid: $cluster_uuid }
- is_true: routing_table
@ -0,0 +1,39 @@
"Score should match explanation in rescore":
- skip:
version: " - 6.99.99"
reason: Explanation for rescoring was corrected after these versions
- do:
refresh: true
- '{"index": {"_index": "test_index", "_type": "_doc", "_id": "1"}}'
- '{"f1": "1"}'
- '{"index": {"_index": "test_index", "_type": "_doc", "_id": "2"}}'
- '{"f1": "2"}'
- '{"index": {"_index": "test_index", "_type": "_doc", "_id": "3"}}'
- '{"f1": "3"}'
- do:
index: test_index
explain: true
match_all: {}
window_size: 2
match_all: {}
query_weight: 5
rescore_query_weight: 10
- match: { hits.hits.0._score: 15 }
- match: { hits.hits.0._explanation.value: 15 }
- match: { hits.hits.1._score: 15 }
- match: { hits.hits.1._explanation.value: 15 }
- match: { hits.hits.2._score: 5 }
- match: { hits.hits.2._explanation.value: 5 }
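The expected values in the test above can be reproduced by hand. The sketch below assumes that `match_all` scores every document 1.0 and that the score mode is `total` (a weighted sum). The class and method names are hypothetical, and the real rescorer works on Lucene `TopDocs` rather than plain arrays.

```java
// Hypothetical arithmetic-only model of window-limited query rescoring.
final class RescoreWindowSketch {

    // firstPass and secondPass hold per-document scores, already ordered by first-pass rank.
    static float[] rescore(float[] firstPass, float[] secondPass, int windowSize,
                           float queryWeight, float rescoreQueryWeight) {
        float[] combined = new float[firstPass.length];
        for (int i = 0; i < firstPass.length; i++) {
            float primary = firstPass[i] * queryWeight;
            if (i < windowSize) {
                // Inside the window: weighted first-pass score plus weighted rescore score.
                combined[i] = primary + secondPass[i] * rescoreQueryWeight;
            } else {
                // Outside the window: only the weighted first-pass score applies.
                combined[i] = primary;
            }
        }
        return combined;
    }

    public static void main(String[] args) {
        float[] scores = rescore(new float[] {1f, 1f, 1f}, new float[] {1f, 1f, 1f}, 2, 5f, 10f);
        for (float s : scores) {
            System.out.println(s);
        }
    }
}
```

With three documents, `window_size: 2`, `query_weight: 5` and `rescore_query_weight: 10`, the first two hits combine to 15 and the third stays at 5, which is what the test asserts for both `_score` and `_explanation.value`.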
@ -98,14 +98,11 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
if (request.blocks()) {
if (request.metaData()) {
MetaData.Builder mdBuilder;
if (request.indices().length == 0) {
mdBuilder = MetaData.builder(currentState.metaData());
} else {
mdBuilder = MetaData.builder();
MetaData.Builder mdBuilder = MetaData.builder();
if (request.metaData()) {
if (request.indices().length > 0) {
String[] indices = indexNameExpressionResolver.concreteIndexNames(currentState, request);
for (String filteredIndex : indices) {
@ -114,17 +111,19 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
mdBuilder.put(indexMetaData, false);
} else {
mdBuilder = MetaData.builder(currentState.metaData());
// Filter out metadata that shouldn't be returned by the API
for(ObjectObjectCursor<String, Custom> custom : currentState.metaData().customs()) {
for(ObjectObjectCursor<String, Custom> custom : currentState.metaData().customs()) {
if(!custom.value.context().contains(MetaData.XContentContext.API)) {
if (request.customs()) {
for (ObjectObjectCursor<String, ClusterState.Custom> custom : currentState.customs()) {
if (custom.value.isPrivate() == false) {
@ -41,6 +41,15 @@ public final class PreConfiguredTokenFilter extends PreConfiguredAnalysisCompone
(tokenStream, version) -> create.apply(tokenStream));
* Create a pre-configured token filter that may not vary at all.
public static PreConfiguredTokenFilter singletonWithVersion(String name, boolean useFilterForMultitermQueries,
BiFunction<TokenStream, Version, TokenStream> create) {
return new PreConfiguredTokenFilter(name, useFilterForMultitermQueries, CachingStrategy.ONE,
(tokenStream, version) -> create.apply(tokenStream, version));
* Create a pre-configured token filter that may vary based on the Lucene version.
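The `singletonWithVersion` factory above lets a filter inspect the index-created version, which is how the `nGram` and `edgeNGram` aliases warn only for newer indices. The sketch below shows that pattern in isolation. The integer version encoding, the `WARNINGS` list standing in for a deprecation logger, and the pass-through "filter" are all assumptions made for illustration, not Elasticsearch APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of a version-gated deprecation warning for an aliased filter name.
final class VersionGatedDeprecationSketch {
    // Assumed integer encoding of version 6.4.0; the real code compares Version objects.
    static final int V_6_4_0 = 60400;

    // Stand-in for a deprecation logger so the behavior can be observed.
    static final List<String> WARNINGS = new ArrayList<>();

    // Returns a factory taking (input, indexCreatedVersion), mirroring the BiFunction in the diff.
    static BiFunction<String, Integer, String> deprecatedAlias(String oldName, String newName) {
        return (input, indexCreatedVersion) -> {
            if (indexCreatedVersion >= V_6_4_0) {
                WARNINGS.add("The [" + oldName + "] token filter name is deprecated and will be removed "
                    + "in a future version. Please change the filter name to [" + newName + "] instead.");
            }
            // A real implementation would wrap the token stream here; this sketch passes it through.
            return input;
        };
    }

    public static void main(String[] args) {
        BiFunction<String, Integer, String> nGram = deprecatedAlias("nGram", "ngram");
        nGram.apply("foo bar", 60300); // index created before 6.4.0: no warning recorded
        nGram.apply("foo bar", 60400); // index created on 6.4.0: warning recorded
        System.out.println(WARNINGS);
    }
}
```

Gating on the index-created version keeps indices created before the cutoff quiet while nudging newer indices toward the new name.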
@ -1147,6 +1147,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
final Collection<String> fileNames;
try {
logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadata = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
@ -1242,9 +1243,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
* Snapshot individual file
* <p>
* This is asynchronous method. Upon completion of the operation latch is getting counted down and any failures are
* added to the {@code failures} list
* @param fileInfo file to be snapshotted
@ -30,6 +30,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Set;
import java.util.Collections;
import static java.util.stream.Collectors.toSet;
public final class QueryRescorer implements Rescorer {
@ -61,6 +63,11 @@ public final class QueryRescorer implements Rescorer {
// First take top slice of incoming docs, to be rescored:
TopDocs topNFirstPass = topN(topDocs, rescoreContext.getWindowSize());
// Save doc IDs for which rescoring was applied to be used in score explanation
Set<Integer> topNDocIDs = Collections.unmodifiableSet(
Arrays.stream(topNFirstPass.scoreDocs).map(scoreDoc -> scoreDoc.doc).collect(toSet()));
// Rescore them:
TopDocs rescored = rescorer.rescore(searcher, topNFirstPass, rescoreContext.getWindowSize());
@ -71,16 +78,12 @@ public final class QueryRescorer implements Rescorer {
public Explanation explain(int topLevelDocId, IndexSearcher searcher, RescoreContext rescoreContext,
Explanation sourceExplanation) throws IOException {
QueryRescoreContext rescore = (QueryRescoreContext) rescoreContext;
if (sourceExplanation == null) {
// this should not happen but just in case
return Explanation.noMatch("nothing matched");
// TODO: this isn't right? I.e., we are incorrectly pretending all first pass hits were rescored? If the requested docID was
// beyond the top rescoreContext.window() in the first pass hits, we don't rescore it now?
Explanation rescoreExplain = searcher.explain(rescore.query(), topLevelDocId);
QueryRescoreContext rescore = (QueryRescoreContext) rescoreContext;
float primaryWeight = rescore.queryWeight();
Explanation prim;
if (sourceExplanation.isMatch()) {
prim = Explanation.match(
@ -89,23 +92,24 @@ public final class QueryRescorer implements Rescorer {
} else {
prim = Explanation.noMatch("First pass did not match", sourceExplanation);
// NOTE: we don't use Lucene's Rescorer.explain because we want to insert our own description with which ScoreMode was used. Maybe
// we should add QueryRescorer.explainCombine to Lucene?
if (rescoreExplain != null && rescoreExplain.isMatch()) {
float secondaryWeight = rescore.rescoreQueryWeight();
Explanation sec = Explanation.match(
if (rescoreContext.isRescored(topLevelDocId)){
Explanation rescoreExplain = searcher.explain(rescore.query(), topLevelDocId);
// NOTE: we don't use Lucene's Rescorer.explain because we want to insert our own description with which ScoreMode was used.
// Maybe we should add QueryRescorer.explainCombine to Lucene?
if (rescoreExplain != null && rescoreExplain.isMatch()) {
float secondaryWeight = rescore.rescoreQueryWeight();
Explanation sec = Explanation.match(
rescoreExplain.getValue() * secondaryWeight,
"product of:",
rescoreExplain, Explanation.match(secondaryWeight, "secondaryWeight"));
QueryRescoreMode scoreMode = rescore.scoreMode();
return Explanation.match(
QueryRescoreMode scoreMode = rescore.scoreMode();
return Explanation.match(
scoreMode.combine(prim.getValue(), sec.getValue()),
scoreMode + " of:",
prim, sec);
} else {
return prim;
return prim;
private static final Comparator<ScoreDoc> SCORE_DOC_COMPARATOR = new Comparator<ScoreDoc>() {
@ -19,6 +19,8 @@
package org.elasticsearch.search.rescore;
import java.util.Set;
* Context available to the rescore while it is running. Rescore
* implementations should extend this with any additional resources that
@ -27,6 +29,7 @@ package org.elasticsearch.search.rescore;
public class RescoreContext {
private final int windowSize;
private final Rescorer rescorer;
private Set<Integer> rescoredDocs; // doc IDs for which rescoring was applied
* Build the context.
@ -50,4 +53,12 @@ public class RescoreContext {
public int getWindowSize() {
return windowSize;
public void setRescoredDocs(Set<Integer> docIds) {
rescoredDocs = docIds;
public boolean isRescored(int docId) {
return rescoredDocs.contains(docId);
@ -470,7 +470,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
* TODO once checksum verification on snapshotting is implemented this test needs to be fixed or split into several
* parts... We should also corrupt files on the actual snapshot and check that we don't restore the corrupted shard.
public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
int numDocs = scaledRandomIntBetween(100, 1000);
@ -494,6 +494,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
assertHitCount(countResponse, numDocs);
ShardRouting shardRouting = corruptRandomPrimaryFile(false);
logger.info("--> shard {} has a corrupted file", shardRouting);
// we don't corrupt segments.gen since S/R doesn't snapshot this file
// the other problem here why we can't corrupt segments.X files is that the snapshot flushes again before
// it snapshots and that will write a new segments.X+1 file
@ -504,9 +505,12 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test").get();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.PARTIAL));
logger.info("failed during snapshot -- maybe SI file got corrupted");
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
final SnapshotState snapshotState = createSnapshotResponse.getSnapshotInfo().state();
logger.info("--> snapshot terminated with state " + snapshotState);
final List<Path> files = listShardFiles(shardRouting);
Path corruptedFile = null;
for (Path file : files) {
@ -515,6 +519,11 @@ public class CorruptedFileIT extends ESIntegTestCase {
if (snapshotState != SnapshotState.PARTIAL) {
logger.info("--> listing shard files for investigation");
files.forEach(f -> logger.info("path: {}", f.toAbsolutePath()));
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.PARTIAL));
assertThat(corruptedFile, notNullValue());
@ -95,7 +95,7 @@ TransportClient client = new PreBuiltXPackTransportClient(Settings.builder()
.put("cluster.name", "myClusterName")
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
XPackClient xpackClient = new XPackClient(client);
WatcherClient watcherClient = xpackClient.watcher();
@ -37,6 +37,16 @@ public class CategorizerState {
return jobId + "#";
* Given the id of a categorizer state document it extracts the job id
* @param docId the categorizer state document id
* @return the job id or {@code null} if the id is not valid
public static final String extractJobId(String docId) {
int suffixIndex = docId.lastIndexOf("_" + TYPE);
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
private CategorizerState() {
@ -29,6 +29,16 @@ public class ModelState {
return jobId + "-" + snapshotId + "#" + docNum;
* Given the id of a state document it extracts the job id
* @param docId the state document id
* @return the job id or {@code null} if the id is not valid
public static final String extractJobId(String docId) {
int suffixIndex = docId.lastIndexOf("_" + TYPE + "_");
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
private ModelState() {
@ -60,6 +60,16 @@ public class Quantiles implements ToXContentObject, Writeable {
return jobId + "-" + TYPE;
* Given the id of a quantiles document it extracts the job id
* @param docId the quantiles document id
* @return the job id or {@code null} if the id is not valid
public static final String extractJobId(String docId) {
int suffixIndex = docId.lastIndexOf("_" + TYPE);
return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
private final String jobId;
private final Date timestamp;
private final String quantileState;
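The three `extractJobId` helpers above share one pattern: find the last occurrence of a type suffix and keep everything before it. The sketch below generalizes that pattern. Passing the suffix as a parameter is an assumption made for illustration (the real methods build it from each class's `TYPE` constant), and the suffix strings are inferred from the unit tests in this change.

```java
// Hypothetical generalization of the extractJobId helpers added in this change.
final class ExtractJobIdSketch {

    // Returns the job id that precedes the last occurrence of suffix, or null if the id is not valid.
    static String extractJobId(String docId, String suffix) {
        int suffixIndex = docId.lastIndexOf(suffix);
        // An index of -1 means the suffix is missing; 0 means the job id would be empty.
        return suffixIndex <= 0 ? null : docId.substring(0, suffixIndex);
    }

    public static void main(String[] args) {
        System.out.println(extractJobId("foo_quantiles", "_quantiles"));
        System.out.println(extractJobId("foo_bar_model_state_blah_blah", "_model_state_"));
        System.out.println(extractJobId("_quantiles", "_quantiles"));
    }
}
```

Using `lastIndexOf` rather than `indexOf` keeps a job id that itself contains the suffix text intact, as in the `_quantiles_quantiles` test case.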
@ -12,8 +12,9 @@ public final class WatcherIndexTemplateRegistryField {
// version 3: include watch status in history
// version 6: upgrade to ES 6, removal of _status field
// version 7: add full exception stack traces for better debugging
// version 8: fix slack attachment property not to be dynamic, causing field type issues
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final String INDEX_TEMPLATE_VERSION = "7";
public static final String INDEX_TEMPLATE_VERSION = "8";
public static final String HISTORY_TEMPLATE_NAME = ".watch-history-" + INDEX_TEMPLATE_VERSION;
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
public static final String WATCHES_TEMPLATE_NAME = ".watches";
@ -507,6 +507,13 @@
"properties" : {
"color" : {
"type" : "keyword"
"fields" : {
"properties" : {
"value" : {
"type" : "text"
@ -0,0 +1,29 @@
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
public class CategorizerStateTests extends ESTestCase {
public void testExtractJobId_GivenValidDocId() {
assertThat(CategorizerState.extractJobId("foo_categorizer_state#1"), equalTo("foo"));
assertThat(CategorizerState.extractJobId("bar_categorizer_state#2"), equalTo("bar"));
assertThat(CategorizerState.extractJobId("foo_bar_categorizer_state#3"), equalTo("foo_bar"));
assertThat(CategorizerState.extractJobId("_categorizer_state_categorizer_state#3"), equalTo("_categorizer_state"));
public void testExtractJobId_GivenInvalidDocId() {
assertThat(CategorizerState.extractJobId(""), is(nullValue()));
assertThat(CategorizerState.extractJobId("foo"), is(nullValue()));
assertThat(CategorizerState.extractJobId("_categorizer_state"), is(nullValue()));
assertThat(CategorizerState.extractJobId("foo_model_state_3141341341"), is(nullValue()));
@ -0,0 +1,31 @@
package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
public class ModelStateTests extends ESTestCase {
public void testExtractJobId_GivenValidDocId() {
assertThat(ModelState.extractJobId("foo_model_state_3151373783#1"), equalTo("foo"));
assertThat(ModelState.extractJobId("bar_model_state_451515#3"), equalTo("bar"));
assertThat(ModelState.extractJobId("foo_bar_model_state_blah_blah"), equalTo("foo_bar"));
assertThat(ModelState.extractJobId("_model_state_model_state_11111"), equalTo("_model_state"));
public void testExtractJobId_GivenInvalidDocId() {
assertThat(ModelState.extractJobId(""), is(nullValue()));
assertThat(ModelState.extractJobId("foo"), is(nullValue()));
assertThat(ModelState.extractJobId("_model_3141341341"), is(nullValue()));
assertThat(ModelState.extractJobId("_state_3141341341"), is(nullValue()));
assertThat(ModelState.extractJobId("_model_state_3141341341"), is(nullValue()));
assertThat(ModelState.extractJobId("foo_quantiles"), is(nullValue()));
@ -15,9 +15,26 @@ import java.io.IOException;
import java.util.Date;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
public class QuantilesTests extends AbstractSerializingTestCase<Quantiles> {
public void testExtractJobId_GivenValidDocId() {
assertThat(Quantiles.extractJobId("foo_quantiles"), equalTo("foo"));
assertThat(Quantiles.extractJobId("bar_quantiles"), equalTo("bar"));
assertThat(Quantiles.extractJobId("foo_bar_quantiles"), equalTo("foo_bar"));
assertThat(Quantiles.extractJobId("_quantiles_quantiles"), equalTo("_quantiles"));
public void testExtractJobId_GivenInvalidDocId() {
assertThat(Quantiles.extractJobId(""), is(nullValue()));
assertThat(Quantiles.extractJobId("foo"), is(nullValue()));
assertThat(Quantiles.extractJobId("_quantiles"), is(nullValue()));
assertThat(Quantiles.extractJobId("foo_model_state_3141341341"), is(nullValue()));
public void testEquals_GivenSameObject() {
Quantiles quantiles = new Quantiles("foo", new Date(0L), "foo");
@ -22,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.retention.ExpiredForecastsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredModelSnapshotsRemover;
import org.elasticsearch.xpack.ml.job.retention.ExpiredResultsRemover;
import org.elasticsearch.xpack.ml.job.retention.MlDataRemover;
import org.elasticsearch.xpack.ml.job.retention.UnusedStateRemover;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.VolatileCursorIterator;
@ -56,7 +57,8 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
List<MlDataRemover> dataRemovers = Arrays.asList(
new ExpiredResultsRemover(client, clusterService, auditor),
new ExpiredForecastsRemover(client),
new ExpiredModelSnapshotsRemover(client, clusterService)
new ExpiredModelSnapshotsRemover(client, clusterService),
new UnusedStateRemover(client, clusterService)
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener);
@ -97,6 +97,7 @@ public abstract class BatchedDocumentsIterator<T> {
searchRequest.source(new SearchSourceBuilder()
SearchResponse searchResponse = client.search(searchRequest).actionGet();
@ -123,6 +124,14 @@ public abstract class BatchedDocumentsIterator<T> {
return results;
* Should fetch source? Defaults to {@code true}
* @return whether the source should be fetched
protected boolean shouldFetchSource() {
return true;
* Get the query to use for the search
* @return the search query
@ -0,0 +1,36 @@
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
* Iterates through the state doc ids
public class BatchedStateDocIdsIterator extends BatchedDocumentsIterator<String> {
public BatchedStateDocIdsIterator(Client client, String index) {
super(client, index);
protected boolean shouldFetchSource() {
return false;
protected QueryBuilder getQuery() {
return QueryBuilders.matchAllQuery();
protected String map(SearchHit hit) {
return hit.getId();
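The iterator added above returns only document ids and hands them out in batches. The consumption pattern can be sketched in memory as follows. This is a minimal illustration only: `BatchedIdsSketch` is a hypothetical stand-in that slices a list, whereas the real class pages through search results, and none of the names below come from the change itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a search-backed iterator: serves ids in fixed-size batches.
class BatchedIdsSketch {
    private final List<String> ids;
    private final int batchSize;
    private int position = 0;

    BatchedIdsSketch(List<String> ids, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.ids = ids;
        this.batchSize = batchSize;
    }

    // True while there are ids that have not been handed out yet.
    boolean hasNext() {
        return position < ids.size();
    }

    // Returns the next batch of at most batchSize ids, in order.
    Deque<String> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        int end = Math.min(position + batchSize, ids.size());
        Deque<String> batch = new ArrayDeque<>(ids.subList(position, end));
        position = end;
        return batch;
    }
}
```

A caller loops with `while (it.hasNext())` and processes each `Deque<String>` in turn, which is the shape `findUnusedStateDocs()` relies on.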
@ -0,0 +1,134 @@
package org.elasticsearch.xpack.ml.job.retention;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.persistence.BatchedStateDocIdsIterator;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
* If for any reason a job is deleted but some of its state documents
* are left behind, this class deletes any unused documents stored
* in the .ml-state index.
public class UnusedStateRemover implements MlDataRemover {
private static final Logger LOGGER = Loggers.getLogger(UnusedStateRemover.class);
private final Client client;
private final ClusterService clusterService;
public UnusedStateRemover(Client client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
public void remove(ActionListener<Boolean> listener) {
try {
BulkRequestBuilder deleteUnusedStateRequestBuilder = findUnusedStateDocs();
if (deleteUnusedStateRequestBuilder.numberOfActions() > 0) {
executeDeleteUnusedStateDocs(deleteUnusedStateRequestBuilder, listener);
} else {
} catch (Exception e) {
private BulkRequestBuilder findUnusedStateDocs() {
Set<String> jobIds = getJobIds();
BulkRequestBuilder deleteUnusedStateRequestBuilder = client.prepareBulk();
BatchedStateDocIdsIterator stateDocIdsIterator = new BatchedStateDocIdsIterator(client, AnomalyDetectorsIndex.jobStateIndexName());
while (stateDocIdsIterator.hasNext()) {
Deque<String> stateDocIds = stateDocIdsIterator.next();
for (String stateDocId : stateDocIds) {
String jobId = JobIdExtractor.extractJobId(stateDocId);
if (jobId == null) {
// not a managed state document id
if (jobIds.contains(jobId) == false) {
deleteUnusedStateRequestBuilder.add(new DeleteRequest(
AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, stateDocId));
return deleteUnusedStateRequestBuilder;
private Set<String> getJobIds() {
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = clusterState.getMetaData().custom(MLMetadataField.TYPE);
if (mlMetadata != null) {
return mlMetadata.getJobs().keySet();
return Collections.emptySet();
private void executeDeleteUnusedStateDocs(BulkRequestBuilder deleteUnusedStateRequestBuilder, ActionListener<Boolean> listener) {
LOGGER.info("Found [{}] unused state documents; attempting to delete",
deleteUnusedStateRequestBuilder.execute(new ActionListener<BulkResponse>() {
public void onResponse(BulkResponse bulkItemResponses) {
if (bulkItemResponses.hasFailures()) {
LOGGER.error("Some unused state documents could not be deleted due to failures: {}",
} else {
LOGGER.info("Successfully deleted all unused state documents");
public void onFailure(Exception e) {
LOGGER.error("Error deleting unused model state documents: ", e);
private static class JobIdExtractor {
private static List<Function<String, String>> extractors = Arrays.asList(
ModelState::extractJobId, Quantiles::extractJobId, CategorizerState::extractJobId);
private static String extractJobId(String docId) {
String jobId;
for (Function<String, String> extractor : extractors) {
jobId = extractor.apply(docId);
if (jobId != null) {
return jobId;
return null;
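The remover above combines two ideas: a chain of extractors where the first non-null result wins, and a filter that keeps only documents whose job id is no longer known. The sketch below shows both in isolation. It is an illustration under stated assumptions: the id layouts are inferred from the test ids used in this change (`<job>_model_state_<n>#<n>`, `<job>_quantiles`, `<job>_categorizer_state#<n>`), not taken from `ModelState`, `Quantiles`, or `CategorizerState`, and `UnusedStateSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class UnusedStateSketch {
    // ASSUMPTION: these id layouts are guessed from the test ids in this change.
    private static final Pattern MODEL_STATE = Pattern.compile("(.+)_model_state_\\d+#\\d+");
    private static final Pattern QUANTILES = Pattern.compile("(.+)_quantiles");
    private static final Pattern CATEGORIZER_STATE = Pattern.compile("(.+)_categorizer_state#\\d+");

    // Builds an extractor that returns the job id, or null if the id does not match.
    private static Function<String, String> extractor(Pattern pattern) {
        return docId -> {
            Matcher m = pattern.matcher(docId);
            return m.matches() ? m.group(1) : null;
        };
    }

    private static final List<Function<String, String>> EXTRACTORS = Arrays.asList(
        extractor(MODEL_STATE), extractor(QUANTILES), extractor(CATEGORIZER_STATE));

    // Tries each extractor in order; the first non-null job id wins.
    static String extractJobId(String docId) {
        for (Function<String, String> extractor : EXTRACTORS) {
            String jobId = extractor.apply(docId);
            if (jobId != null) {
                return jobId;
            }
        }
        return null;
    }

    // Keeps ids that look like state documents but belong to no known job.
    static List<String> findUnused(Iterable<String> docIds, Set<String> jobIds) {
        List<String> unused = new ArrayList<>();
        for (String docId : docIds) {
            String jobId = extractJobId(docId);
            if (jobId == null) {
                continue; // not a recognised state document id, leave it alone
            }
            if (!jobIds.contains(jobId)) {
                unused.add(docId);
            }
        }
        return unused;
    }
}
```

Skipping ids that no extractor recognises mirrors the "not a managed state document id" branch above, so documents with an unknown layout are never deleted by mistake.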
@ -450,7 +450,7 @@ public class TokenServiceTests extends ESTestCase {
final TimeValue defaultExpiration = TokenService.TOKEN_EXPIRATION.get(Settings.EMPTY);
final int fastForwardAmount = randomIntBetween(1, Math.toIntExact(defaultExpiration.getSeconds()));
final int fastForwardAmount = randomIntBetween(1, Math.toIntExact(defaultExpiration.getSeconds()) - 5);
try (ThreadContext.StoredContext ignore = requestContext.newStoredContext(true)) {
// move the clock forward but don't go to expiry
@ -134,7 +134,7 @@ public class AutodetectMemoryLimitIT extends MlNativeAutodetectIntegTestCase {
assertThat(modelSizeStats.getModelBytes(), lessThan(36000000L));
assertThat(modelSizeStats.getModelBytes(), greaterThan(30000000L));
assertThat(modelSizeStats.getTotalByFieldCount(), lessThan(1900L));
assertThat(modelSizeStats.getTotalByFieldCount(), greaterThan(1600L));
assertThat(modelSizeStats.getTotalByFieldCount(), greaterThan(1500L));
assertThat(modelSizeStats.getMemoryStatus(), equalTo(ModelSizeStats.MemoryStatus.HARD_LIMIT));
@ -8,12 +8,15 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@ -21,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
@ -31,13 +35,16 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
@ -78,11 +85,16 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
public void tearDownData() throws Exception {
public void tearDownData() {
public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
// Tests that nothing goes wrong when there's nothing to delete
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
public void testDeleteExpiredData() throws Exception {
@ -166,6 +178,18 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount()));
// Index some unused state documents (more than 10K to test scrolling works)
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId);
assertThat(bulkRequestBuilder.get().status(), equalTo(RestStatus.OK));
// Now call the action under test
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
// We need to refresh to ensure the deletion is visible
@ -216,6 +240,16 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L));
// Verify .ml-state doesn't contain unused state documents
SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexName())
assertThat(stateDocsResponse.getHits().getTotalHits(), lessThan(10000L));
for (SearchHit hit : stateDocsResponse.getHits().getHits()) {
assertThat(hit.getId().startsWith("non_existing_job"), is(false));
private static Job.Builder newJobBuilder(String id) {
@ -71,6 +71,6 @@ public class IndexAuditUpgradeIT extends AbstractUpgradeTestCase {
List<Map<String, Object>> buckets = (List<Map<String, Object>>) nodesAgg.get("buckets");
assertEquals(numBuckets, buckets.size());
assertEquals("Found node buckets " + buckets, numBuckets, buckets.size());
@ -16,7 +16,13 @@
"input": {
"simple": {
"foo" : "something from input"
"foo" : "something from input",
"hits" : {
"hits" : [
{ "_source" : { "name" : "first", "value" : "2018-04-26T11:45:12.518Z" } },
{ "_source" : { "name" : "second", "value" : "anything" } }
"actions": {
@ -49,7 +55,20 @@
"dynamic_attachments" : {
"list_path" : "ctx.payload.hits.hits",
"attachment_template" : {
"title": "Title",
"fields" : [
"title" : "Field title {{_source.name}}",
"value" : "{{_source.value}}",
"short" : true