Merge branch 'master' into feature/rank-eval

Conflicts:
	core/src/main/java/org/elasticsearch/script/Script.java
Christoph Büscher 2016-11-15 14:56:36 +01:00
commit d6738fa650
526 changed files with 7132 additions and 3610 deletions

.gitignore

@@ -33,6 +33,7 @@ dependency-reduced-pom.xml
# testing stuff
**/.local*
.vagrant/
/logs/
# osx stuff
.DS_Store


@@ -123,7 +123,7 @@ There are many more options to perform search, after all, it's a search product
h3. Multi Tenant - Indices and Types
Maan, that twitter index might get big (in this case, index size == valuation). Let's see if we can structure our twitter system a bit differently in order to support such large amounts of data.
Man, that twitter index might get big (in this case, index size == valuation). Let's see if we can structure our twitter system a bit differently in order to support such large amounts of data.
Elasticsearch supports multiple indices, as well as multiple types per index. In the previous example we used an index called @twitter@, with two types, @user@ and @tweet@.


@@ -325,7 +325,7 @@ vagrant plugin install vagrant-cachier
. Validate your installed dependencies:
-------------------------------------
gradle :qa:vagrant:checkVagrantVersion
gradle :qa:vagrant:vagrantCheckVersion
-------------------------------------
. Download and smoke test the VMs with `gradle vagrantSmokeTest` or
@@ -417,17 +417,26 @@ and in another window:
----------------------------------------------------
vagrant up centos-7 --provider virtualbox && vagrant ssh centos-7
cd $TESTROOT
sudo bats $BATS/*rpm*.bats
cd $BATS_ARCHIVES
sudo -E bats $BATS_TESTS/*rpm*.bats
----------------------------------------------------
If you wanted to retest all the release artifacts on a single VM you could:
-------------------------------------------------
gradle prepareTestRoot
gradle vagrantSetUp
vagrant up ubuntu-1404 --provider virtualbox && vagrant ssh ubuntu-1404
cd $TESTROOT
sudo bats $BATS/*.bats
cd $BATS_ARCHIVES
sudo -E bats $BATS_TESTS/*.bats
-------------------------------------------------
Note: Starting a vagrant VM outside of the elasticsearch folder requires
indicating the folder that contains the Vagrantfile using the VAGRANT_CWD
environment variable:
-------------------------------------------------
gradle vagrantSetUp
VAGRANT_CWD=/path/to/elasticsearch vagrant up centos-7 --provider virtualbox
-------------------------------------------------
== Coverage analysis

Vagrantfile

@@ -77,6 +77,9 @@ Vagrant.configure(2) do |config|
# the elasticsearch project called vagrant....
config.vm.synced_folder ".", "/vagrant", disabled: true
config.vm.synced_folder ".", "/elasticsearch"
# Expose project directory
PROJECT_DIR = ENV['VAGRANT_PROJECT_DIR'] || Dir.pwd
config.vm.synced_folder PROJECT_DIR, "/project"
config.vm.provider "virtualbox" do |v|
# Give the boxes 3GB because Elasticsearch defaults to using 2GB
v.memory = 3072
@@ -272,8 +275,10 @@ export ZIP=/elasticsearch/distribution/zip/build/distributions
export TAR=/elasticsearch/distribution/tar/build/distributions
export RPM=/elasticsearch/distribution/rpm/build/distributions
export DEB=/elasticsearch/distribution/deb/build/distributions
export TESTROOT=/elasticsearch/qa/vagrant/build/testroot
export BATS=/elasticsearch/qa/vagrant/src/test/resources/packaging/scripts
export BATS=/project/build/bats
export BATS_UTILS=/project/build/bats/utils
export BATS_TESTS=/project/build/bats/tests
export BATS_ARCHIVES=/project/build/bats/archives
VARS
SHELL
end


@@ -495,6 +495,8 @@ class BuildPlugin implements Plugin<Project> {
systemProperty 'tests.artifact', project.name
systemProperty 'tests.task', path
systemProperty 'tests.security.manager', 'true'
// Breaking change in JDK-9, revert to JDK-8 behavior for now, see https://github.com/elastic/elasticsearch/issues/21534
systemProperty 'jdk.io.permissionsUseCanonicalPath', 'true'
systemProperty 'jna.nosys', 'true'
// default test sysprop values
systemProperty 'tests.ifNoTests', 'fail'


@@ -143,6 +143,10 @@ public class ThirdPartyAuditTask extends AntTask {
if (m.matches()) {
missingClasses.add(m.group(1).replace('.', '/') + ".class");
}
// Reset the priority of the event to DEBUG, so it doesn't
// pollute the build output
event.setMessage(event.getMessage(), Project.MSG_DEBUG);
} else if (event.getPriority() == Project.MSG_ERR) {
Matcher m = VIOLATION_PATTERN.matcher(event.getMessage());
if (m.matches()) {


@@ -104,11 +104,13 @@ class ClusterConfiguration {
@Input
Closure waitCondition = { NodeInfo node, AntBuilder ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.echo("==> [${new Date()}] checking health: http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow")
String waitUrl = "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow"
ant.echo(message: "==> [${new Date()}] checking health: ${waitUrl}",
level: 'info')
// checking here for wait_for_nodes to be >= the number of nodes because it's possible
// this cluster is attempting to connect to nodes created by another task (same cluster name),
// so there will be more nodes in that case in the cluster state
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
ant.get(src: waitUrl,
dest: tmpFile.toString(),
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
retries: 10)
@@ -121,7 +123,7 @@ class ClusterConfiguration {
Map<String, String> systemProperties = new HashMap<>()
Map<String, String> settings = new HashMap<>()
Map<String, Object> settings = new HashMap<>()
// map from destination path, to source file
Map<String, Object> extraConfigFiles = new HashMap<>()
@@ -138,7 +140,7 @@
}
@Input
void setting(String name, String value) {
void setting(String name, Object value) {
settings.put(name, value)
}


@@ -18,14 +18,7 @@
*/
package org.elasticsearch.gradle.vagrant
import org.gradle.api.DefaultTask
import org.gradle.api.tasks.Input
import org.gradle.api.tasks.TaskAction
import org.gradle.logging.ProgressLoggerFactory
import org.gradle.process.internal.ExecAction
import org.gradle.process.internal.ExecActionFactory
import javax.inject.Inject
/**
* Runs bats over vagrant. Pretty much like running it using Exec but with a


@@ -34,11 +34,18 @@ public class VagrantCommandTask extends LoggedExec {
@Input
String boxName
@Input
Map<String, String> environmentVars
public VagrantCommandTask() {
executable = 'vagrant'
project.afterEvaluate {
// It'd be nice if --machine-readable were, well, nice
standardOutput = new TeeOutputStream(standardOutput, createLoggerOutputStream())
if (environmentVars != null) {
environment environmentVars
}
}
}


@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gradle.vagrant
import org.gradle.api.tasks.Input
class VagrantPropertiesExtension {
@Input
List<String> boxes
@Input
Long testSeed
@Input
String formattedTestSeed
@Input
String upgradeFromVersion
@Input
List<String> upgradeFromVersions
@Input
String batsDir
@Input
Boolean inheritTests
@Input
Boolean inheritTestArchives
@Input
Boolean inheritTestUtils
VagrantPropertiesExtension(List<String> availableBoxes) {
this.boxes = availableBoxes
this.batsDir = 'src/test/resources/packaging'
}
void boxes(String... boxes) {
this.boxes = Arrays.asList(boxes)
}
void setBatsDir(String batsDir) {
this.batsDir = batsDir
}
void setInheritTests(Boolean inheritTests) {
this.inheritTests = inheritTests
}
void setInheritTestArchives(Boolean inheritTestArchives) {
this.inheritTestArchives = inheritTestArchives
}
void setInheritTestUtils(Boolean inheritTestUtils) {
this.inheritTestUtils = inheritTestUtils
}
}


@@ -0,0 +1,457 @@
package org.elasticsearch.gradle.vagrant
import org.elasticsearch.gradle.FileContentsTask
import org.gradle.BuildAdapter
import org.gradle.BuildResult
import org.gradle.api.*
import org.gradle.api.artifacts.dsl.RepositoryHandler
import org.gradle.api.internal.artifacts.dependencies.DefaultProjectDependency
import org.gradle.api.tasks.Copy
import org.gradle.api.tasks.Delete
import org.gradle.api.tasks.Exec
class VagrantTestPlugin implements Plugin<Project> {
/** All available boxes **/
static List<String> BOXES = [
'centos-6',
'centos-7',
'debian-8',
'fedora-24',
'oel-6',
'oel-7',
'opensuse-13',
'sles-12',
'ubuntu-1204',
'ubuntu-1404',
'ubuntu-1604'
]
/** Boxes used when sampling the tests **/
static List<String> SAMPLE = [
'centos-7',
'ubuntu-1404',
]
/** All onboarded archives by default, available for Bats tests even if not used **/
static List<String> DISTRIBUTION_ARCHIVES = ['tar', 'rpm', 'deb']
/** Packages onboarded for upgrade tests **/
static List<String> UPGRADE_FROM_ARCHIVES = ['rpm', 'deb']
private static final BATS = 'bats'
private static final String BATS_TEST_COMMAND ="cd \$BATS_ARCHIVES && sudo -E bats --tap \$BATS_TESTS/*.$BATS"
@Override
void apply(Project project) {
// Creates the Vagrant extension for the project
project.extensions.create('esvagrant', VagrantPropertiesExtension, listVagrantBoxes(project))
// Add required repositories for Bats tests
configureBatsRepositories(project)
// Creates custom configurations for Bats testing files (and associated scripts and archives)
createBatsConfiguration(project)
// Creates all the main Vagrant tasks
createVagrantTasks(project)
if (project.extensions.esvagrant.boxes == null || project.extensions.esvagrant.boxes.size() == 0) {
throw new InvalidUserDataException('Vagrant boxes cannot be null or empty for esvagrant')
}
for (String box : project.extensions.esvagrant.boxes) {
if (BOXES.contains(box) == false) {
throw new InvalidUserDataException("Vagrant box [${box}] not found, available virtual machines are ${BOXES}")
}
}
// Creates all tasks related to the Vagrant boxes
createVagrantBoxesTasks(project)
}
private List<String> listVagrantBoxes(Project project) {
String vagrantBoxes = project.getProperties().get('vagrant.boxes', 'sample')
if (vagrantBoxes == 'sample') {
return SAMPLE
} else if (vagrantBoxes == 'all') {
return BOXES
} else {
return vagrantBoxes.split(',')
}
}
private static Set<String> listVersions(Project project) {
Node xml
new URL('https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch/maven-metadata.xml').openStream().withStream { s ->
xml = new XmlParser().parse(s)
}
Set<String> versions = new TreeSet<>(xml.versioning.versions.version.collect { it.text() }.findAll { it ==~ /[5]\.\d\.\d/ })
if (versions.isEmpty() == false) {
return versions;
}
// If no version is found, we run the tests with the current version
return Collections.singleton(project.version);
}
private static File getVersionsFile(Project project) {
File versions = new File(project.projectDir, 'versions');
if (versions.exists() == false) {
// Use the elasticsearch's versions file from project :qa:vagrant
versions = project.project(":qa:vagrant").file('versions')
}
return versions
}
private static void configureBatsRepositories(Project project) {
RepositoryHandler repos = project.repositories
// Try maven central first, it'll have releases before 5.0.0
repos.mavenCentral()
/* Setup a repository that tries to download from
https://artifacts.elastic.co/downloads/elasticsearch/[module]-[revision].[ext]
which should work for 5.0.0+. This isn't a real ivy repository but gradle
is fine with that */
repos.ivy {
artifactPattern "https://artifacts.elastic.co/downloads/elasticsearch/[module]-[revision].[ext]"
}
}
private static void createBatsConfiguration(Project project) {
project.configurations.create(BATS)
Long seed
String formattedSeed = null
String[] upgradeFromVersions
String maybeTestsSeed = System.getProperty("tests.seed", null);
if (maybeTestsSeed != null) {
List<String> seeds = maybeTestsSeed.tokenize(':')
if (seeds.size() != 0) {
String masterSeed = seeds.get(0)
seed = new BigInteger(masterSeed, 16).longValue()
formattedSeed = maybeTestsSeed
}
}
if (formattedSeed == null) {
seed = new Random().nextLong()
formattedSeed = String.format("%016X", seed)
}
String maybeUpgradeFromVersions = System.getProperty("tests.packaging.upgrade.from.versions", null)
if (maybeUpgradeFromVersions != null) {
upgradeFromVersions = maybeUpgradeFromVersions.split(",")
} else {
upgradeFromVersions = getVersionsFile(project).readLines()
}
String upgradeFromVersion = upgradeFromVersions[new Random(seed).nextInt(upgradeFromVersions.length)]
DISTRIBUTION_ARCHIVES.each {
// Adds a dependency for the current version
project.dependencies.add(BATS, project.dependencies.project(path: ":distribution:${it}", configuration: 'archives'))
}
UPGRADE_FROM_ARCHIVES.each {
// The version of elasticsearch that we upgrade *from*
project.dependencies.add(BATS, "org.elasticsearch.distribution.${it}:elasticsearch:${upgradeFromVersion}@${it}")
}
project.extensions.esvagrant.testSeed = seed
project.extensions.esvagrant.formattedTestSeed = formattedSeed
project.extensions.esvagrant.upgradeFromVersion = upgradeFromVersion
project.extensions.esvagrant.upgradeFromVersions = upgradeFromVersions
}
private static void createCleanTask(Project project) {
project.tasks.create('clean', Delete.class) {
description 'Clean the project build directory'
group 'Build'
delete project.buildDir
}
}
private static void createStopTask(Project project) {
project.tasks.create('stop') {
description 'Stop any tasks from tests that still may be running'
group 'Verification'
}
}
private static void createSmokeTestTask(Project project) {
project.tasks.create('vagrantSmokeTest') {
description 'Smoke test the specified vagrant boxes'
group 'Verification'
}
}
private static void createPrepareVagrantTestEnvTask(Project project) {
File batsDir = new File("${project.buildDir}/${BATS}")
Task createBatsDirsTask = project.tasks.create('createBatsDirs')
createBatsDirsTask.outputs.dir batsDir
createBatsDirsTask.dependsOn project.tasks.vagrantVerifyVersions
createBatsDirsTask.doLast {
batsDir.mkdirs()
}
Copy copyBatsArchives = project.tasks.create('copyBatsArchives', Copy) {
dependsOn createBatsDirsTask
into "${batsDir}/archives"
from project.configurations[BATS]
}
Copy copyBatsTests = project.tasks.create('copyBatsTests', Copy) {
dependsOn createBatsDirsTask
into "${batsDir}/tests"
from {
"${project.extensions.esvagrant.batsDir}/tests"
}
}
Copy copyBatsUtils = project.tasks.create('copyBatsUtils', Copy) {
dependsOn createBatsDirsTask
into "${batsDir}/utils"
from {
"${project.extensions.esvagrant.batsDir}/utils"
}
}
// Now we iterate over dependencies of the bats configuration. When a project dependency is found,
// we bring back its own archives, test files or test utils.
project.afterEvaluate {
project.configurations.bats.dependencies.findAll {it.configuration == BATS }.each { d ->
if (d instanceof DefaultProjectDependency) {
DefaultProjectDependency externalBatsDependency = (DefaultProjectDependency) d
Project externalBatsProject = externalBatsDependency.dependencyProject
String externalBatsDir = externalBatsProject.extensions.esvagrant.batsDir
if (project.extensions.esvagrant.inheritTests) {
copyBatsTests.from(externalBatsProject.files("${externalBatsDir}/tests"))
}
if (project.extensions.esvagrant.inheritTestArchives) {
copyBatsArchives.from(externalBatsDependency.projectConfiguration.files)
}
if (project.extensions.esvagrant.inheritTestUtils) {
copyBatsUtils.from(externalBatsProject.files("${externalBatsDir}/utils"))
}
}
}
}
Task createVersionFile = project.tasks.create('createVersionFile', FileContentsTask) {
dependsOn createBatsDirsTask
file "${batsDir}/archives/version"
contents project.version
}
Task createUpgradeFromFile = project.tasks.create('createUpgradeFromFile', FileContentsTask) {
dependsOn createBatsDirsTask
file "${batsDir}/archives/upgrade_from_version"
contents project.extensions.esvagrant.upgradeFromVersion
}
Task vagrantSetUpTask = project.tasks.create('vagrantSetUp')
vagrantSetUpTask.dependsOn 'vagrantCheckVersion'
vagrantSetUpTask.dependsOn copyBatsTests, copyBatsUtils, copyBatsArchives, createVersionFile, createUpgradeFromFile
vagrantSetUpTask.doFirst {
project.gradle.addBuildListener new BuildAdapter() {
@Override
void buildFinished(BuildResult result) {
if (result.failure) {
println "Reproduce with: gradle packagingTest " +
"-Pvagrant.boxes=${project.extensions.esvagrant.boxes} " +
"-Dtests.seed=${project.extensions.esvagrant.formattedTestSeed} " +
"-Dtests.packaging.upgrade.from.versions=${project.extensions.esvagrant.upgradeFromVersions.join(",")}"
}
}
}
}
}
private static void createUpdateVersionsTask(Project project) {
project.tasks.create('vagrantUpdateVersions') {
description 'Update file containing options for the\n "starting" version in the "upgrade from" packaging tests.'
group 'Verification'
doLast {
File versions = getVersionsFile(project)
versions.text = listVersions(project).join('\n') + '\n'
}
}
}
private static void createVerifyVersionsTask(Project project) {
project.tasks.create('vagrantVerifyVersions') {
description 'Verify the file containing options for the\n "starting" version in the "upgrade from" packaging tests is up-to-date.'
group 'Verification'
doLast {
String maybeUpdateFromVersions = System.getProperty("tests.packaging.upgrade.from.versions", null)
if (maybeUpdateFromVersions == null) {
Set<String> versions = listVersions(project)
Set<String> actualVersions = new TreeSet<>(project.extensions.esvagrant.upgradeFromVersions)
if (!versions.equals(actualVersions)) {
throw new GradleException("out-of-date versions " + actualVersions +
", expected " + versions + "; run gradle vagrantUpdateVersions")
}
}
}
}
}
private static void createCheckVagrantVersionTask(Project project) {
project.tasks.create('vagrantCheckVersion', Exec) {
description 'Check the Vagrant version'
group 'Verification'
commandLine 'vagrant', '--version'
standardOutput = new ByteArrayOutputStream()
doLast {
String version = standardOutput.toString().trim()
if ((version ==~ /Vagrant 1\.(8\.[6-9]|9\.[0-9])+/) == false) {
throw new InvalidUserDataException("Illegal version of vagrant [${version}]. Need [Vagrant 1.8.6+]")
}
}
}
}
private static void createCheckVirtualBoxVersionTask(Project project) {
project.tasks.create('virtualboxCheckVersion', Exec) {
description 'Check the Virtualbox version'
group 'Verification'
commandLine 'vboxmanage', '--version'
standardOutput = new ByteArrayOutputStream()
doLast {
String version = standardOutput.toString().trim()
try {
String[] versions = version.split('\\.')
int major = Integer.parseInt(versions[0])
int minor = Integer.parseInt(versions[1])
if ((major < 5) || (major == 5 && minor < 1)) {
throw new InvalidUserDataException("Illegal version of virtualbox [${version}]. Need [5.1+]")
}
} catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
throw new InvalidUserDataException("Unable to parse version of virtualbox [${version}]. Required [5.1+]", e)
}
}
}
}
private static void createPackagingTestTask(Project project) {
project.tasks.create('packagingTest') {
group 'Verification'
description "Tests yum/apt packages using vagrant and bats.\n" +
" Specify the vagrant boxes to test using the gradle property 'vagrant.boxes'.\n" +
" 'sample' can be used to test a single yum and apt box. 'all' can be used to\n" +
" test all available boxes. The available boxes are: \n" +
" ${BOXES}"
dependsOn 'vagrantCheckVersion'
}
}
private static void createVagrantTasks(Project project) {
createCleanTask(project)
createStopTask(project)
createSmokeTestTask(project)
createUpdateVersionsTask(project)
createVerifyVersionsTask(project)
createCheckVagrantVersionTask(project)
createCheckVirtualBoxVersionTask(project)
createPrepareVagrantTestEnvTask(project)
createPackagingTestTask(project)
}
private static void createVagrantBoxesTasks(Project project) {
assert project.extensions.esvagrant.boxes != null
assert project.tasks.stop != null
Task stop = project.tasks.stop
assert project.tasks.vagrantSmokeTest != null
Task vagrantSmokeTest = project.tasks.vagrantSmokeTest
assert project.tasks.vagrantCheckVersion != null
Task vagrantCheckVersion = project.tasks.vagrantCheckVersion
assert project.tasks.virtualboxCheckVersion != null
Task virtualboxCheckVersion = project.tasks.virtualboxCheckVersion
assert project.tasks.vagrantSetUp != null
Task vagrantSetUp = project.tasks.vagrantSetUp
assert project.tasks.packagingTest != null
Task packagingTest = project.tasks.packagingTest
/*
* We always use the main project.rootDir as Vagrant's current working directory (VAGRANT_CWD)
* so that boxes are not duplicated for every Gradle project that uses this VagrantTestPlugin.
*/
def vagrantEnvVars = [
'VAGRANT_CWD' : "${project.rootDir.absolutePath}",
'VAGRANT_VAGRANTFILE' : 'Vagrantfile',
'VAGRANT_PROJECT_DIR' : "${project.projectDir.absolutePath}"
]
// Each box gets its own set of tasks
for (String box : BOXES) {
String boxTask = box.capitalize().replace('-', '')
// always add a halt task for all boxes, so clean makes sure they are all shutdown
Task halt = project.tasks.create("vagrant${boxTask}#halt", VagrantCommandTask) {
boxName box
environmentVars vagrantEnvVars
args 'halt', box
}
stop.dependsOn(halt)
if (project.extensions.esvagrant.boxes.contains(box) == false) {
// we only need a halt task if this box was not specified
continue;
}
Task update = project.tasks.create("vagrant${boxTask}#update", VagrantCommandTask) {
boxName box
environmentVars vagrantEnvVars
args 'box', 'update', box
dependsOn vagrantCheckVersion, virtualboxCheckVersion, vagrantSetUp
}
Task up = project.tasks.create("vagrant${boxTask}#up", VagrantCommandTask) {
boxName box
environmentVars vagrantEnvVars
/* It's important that we try to reprovision the box even if it already
exists. That way updates to the vagrant configuration take effect
automatically. That isn't to say that the updates will always be
compatible. It's ok to just destroy the boxes if they get busted but that
is a manual step because it's slow-ish. */
/* We lock the provider to virtualbox because the Vagrantfile specifies
lots of boxes that only work properly in virtualbox. Virtualbox is
vagrant's default but it's possible to change that default and folks do.
But the boxes that we use are unlikely to work properly with other
virtualization providers. Thus the lock. */
args 'up', box, '--provision', '--provider', 'virtualbox'
/* It'd be possible to check if the box is already up here and output
SKIPPED but that would require running vagrant status which is slow! */
dependsOn update
}
Task smoke = project.tasks.create("vagrant${boxTask}#smoketest", Exec) {
environment vagrantEnvVars
dependsOn up
finalizedBy halt
commandLine 'vagrant', 'ssh', box, '--command',
"set -o pipefail && echo 'Hello from ${project.path}' | sed -ue 's/^/ ${box}: /'"
}
vagrantSmokeTest.dependsOn(smoke)
Task packaging = project.tasks.create("vagrant${boxTask}#packagingtest", BatsOverVagrantTask) {
boxName box
environmentVars vagrantEnvVars
dependsOn up
finalizedBy halt
command BATS_TEST_COMMAND
}
packagingTest.dependsOn(packaging)
}
}
}


@@ -0,0 +1 @@
implementation-class=org.elasticsearch.gradle.vagrant.VagrantTestPlugin


@@ -840,7 +840,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryStateTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryStatusTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]settings[/\\]UpdateNumberOfReplicasIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]settings[/\\]UpdateSettingsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]CloseIndexDisableCloseAllIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]OpenCloseIndexIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]RareClusterStateIT.java" checks="LineLength" />


@@ -1,11 +1,12 @@
elasticsearch = 6.0.0-alpha1
lucene = 6.3.0-snapshot-a66a445
lucene = 6.3.0
# optional dependencies
spatial4j = 0.6
jts = 1.13
jackson = 2.8.1
snakeyaml = 1.15
# When updating log4j, please update also docs/java-api/index.asciidoc
log4j = 2.7
slf4j = 1.6.2
jna = 4.2.2


@@ -35,7 +35,7 @@ import java.util.List;
public class NoopPlugin extends Plugin implements ActionPlugin {
@Override
public List<ActionHandler<? extends ActionRequest<?>, ? extends ActionResponse>> getActions() {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(NoopBulkAction.INSTANCE, TransportNoopBulkAction.class),
new ActionHandler<>(NoopSearchAction.INSTANCE, TransportNoopSearchAction.class)


@@ -38,25 +38,15 @@ import java.io.IOException;
/**
* Default implementation of {@link org.apache.http.nio.protocol.HttpAsyncResponseConsumer}. Buffers the whole
* response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response.
* Limits the size of responses that can be read to {@link #DEFAULT_BUFFER_LIMIT} by default, configurable value.
* Throws an exception in case the entity is longer than the configured buffer limit.
* Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer
* than the configured buffer limit.
*/
public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseConsumer<HttpResponse> {
//default buffer limit is 10MB
public static final int DEFAULT_BUFFER_LIMIT = 10 * 1024 * 1024;
private final int bufferLimitBytes;
private volatile HttpResponse response;
private volatile SimpleInputBuffer buf;
/**
* Creates a new instance of this consumer with a buffer limit of {@link #DEFAULT_BUFFER_LIMIT}
*/
public HeapBufferedAsyncResponseConsumer() {
this.bufferLimitBytes = DEFAULT_BUFFER_LIMIT;
}
/**
* Creates a new instance of this consumer with the provided buffer limit
*/
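
With the no-arg constructor removed, callers now pass the buffer limit explicitly. A minimal
sketch of the new usage, assuming a 10MB cap (the value the old default used):

-------------------------------------------------
// Buffers at most 10MB of response body on the heap; the consumer throws
// once the content grows past this limit.
HeapBufferedAsyncResponseConsumer consumer =
    new HeapBufferedAsyncResponseConsumer(10 * 1024 * 1024);
-------------------------------------------------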


@@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpResponse;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import static org.elasticsearch.client.HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory.DEFAULT_BUFFER_LIMIT;
/**
* Factory used to create instances of {@link HttpAsyncResponseConsumer}. Each request retry needs its own instance of the
* consumer object. Users can implement this interface and pass their own instance to the specialized
* performRequest methods that accept an {@link HttpAsyncResponseConsumerFactory} instance as argument.
*/
interface HttpAsyncResponseConsumerFactory {
/**
* Creates the default type of {@link HttpAsyncResponseConsumer}, based on heap buffering with a buffer limit of 100MB.
*/
HttpAsyncResponseConsumerFactory DEFAULT = new HeapBufferedResponseConsumerFactory(DEFAULT_BUFFER_LIMIT);
/**
* Creates the {@link HttpAsyncResponseConsumer}, called once per request attempt.
*/
HttpAsyncResponseConsumer<HttpResponse> createHttpAsyncResponseConsumer();
/**
* Default factory used to create instances of {@link HttpAsyncResponseConsumer}.
* Creates one instance of {@link HeapBufferedAsyncResponseConsumer} for each request attempt, with a configurable
* buffer limit which defaults to 100MB.
*/
class HeapBufferedResponseConsumerFactory implements HttpAsyncResponseConsumerFactory {
//default buffer limit is 100MB
static final int DEFAULT_BUFFER_LIMIT = 100 * 1024 * 1024;
private final int bufferLimit;
public HeapBufferedResponseConsumerFactory(int bufferLimitBytes) {
this.bufferLimit = bufferLimitBytes;
}
@Override
public HttpAsyncResponseConsumer<HttpResponse> createHttpAsyncResponseConsumer() {
return new HeapBufferedAsyncResponseConsumer(bufferLimit);
}
}
}
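
As a usage sketch: a caller that wants a smaller per-request buffer than the 100MB default can
instantiate the built-in HeapBufferedResponseConsumerFactory with its own limit and hand it to
the performRequest variants that accept a factory. The restClient instance, endpoint, and 1MB
limit below are illustrative, and this assumes the factory type is visible to the calling code:

-------------------------------------------------
// Hypothetical caller code: cap each response body at 1MB; one consumer
// is created per request attempt (including retries).
HttpAsyncResponseConsumerFactory smallBuffers =
    new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(1024 * 1024);

Response response = restClient.performRequest("GET", "/_cluster/health",
    Collections.<String, String>emptyMap(), null, smallBuffers);
-------------------------------------------------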


@@ -143,7 +143,7 @@ public class RestClient implements Closeable {
* @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/
public Response performRequest(String method, String endpoint, Header... headers) throws IOException {
return performRequest(method, endpoint, Collections.<String, String>emptyMap(), (HttpEntity)null, headers);
return performRequest(method, endpoint, Collections.<String, String>emptyMap(), null, headers);
}
/**
@@ -165,9 +165,9 @@
/**
* Sends a request to the Elasticsearch cluster that the client points to and waits for the corresponding response
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer}
* will be used to consume the response body.
* to be returned. Shortcut to {@link #performRequest(String, String, Map, HttpEntity, HttpAsyncResponseConsumerFactory, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumerFactory} instance,
* {@link HttpAsyncResponseConsumerFactory} will be used to create the needed instances of {@link HttpAsyncResponseConsumer}.
*
* @param method the http method
* @param endpoint the path of the request (without host and port)
@@ -181,8 +181,7 @@
*/
public Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, Header... headers) throws IOException {
HttpAsyncResponseConsumer<HttpResponse> responseConsumer = new HeapBufferedAsyncResponseConsumer();
return performRequest(method, endpoint, params, entity, responseConsumer, headers);
return performRequest(method, endpoint, params, entity, HttpAsyncResponseConsumerFactory.DEFAULT, headers);
}
/**
@@ -196,8 +195,9 @@
* @param endpoint the path of the request (without host and port)
* @param params the query_string parameters
* @param entity the body of the request, null if not applicable
* @param responseConsumer the {@link HttpAsyncResponseConsumer} callback. Controls how the response
* body gets streamed from a non-blocking HTTP connection on the client side.
* @param httpAsyncResponseConsumerFactory the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the response body gets streamed from a non-blocking HTTP
* connection on the client side.
* @param headers the optional request headers
* @return the response returned by Elasticsearch
* @throws IOException in case of a problem or the connection was aborted
@@ -205,10 +205,10 @@
* @throws ResponseException in case Elasticsearch responded with a status code that indicated an error
*/
public Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
Header... headers) throws IOException {
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
performRequestAsync(method, endpoint, params, entity, responseConsumer, listener, headers);
performRequestAsync(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, listener, headers);
return listener.get();
}
@@ -245,9 +245,9 @@
/**
* Sends a request to the Elasticsearch cluster that the client points to. Doesn't wait for the response, instead
* the provided {@link ResponseListener} will be notified upon completion or failure.
* Shortcut to {@link #performRequestAsync(String, String, Map, HttpEntity, HttpAsyncResponseConsumer, ResponseListener, Header...)}
* which doesn't require specifying an {@link HttpAsyncResponseConsumer} instance, {@link HeapBufferedAsyncResponseConsumer}
* will be used to consume the response body.
* Shortcut to {@link #performRequestAsync(String, String, Map, HttpEntity, HttpAsyncResponseConsumerFactory, ResponseListener,
* Header...)} which doesn't require specifying an {@link HttpAsyncResponseConsumerFactory} instance,
* {@link HttpAsyncResponseConsumerFactory} will be used to create the needed instances of {@link HttpAsyncResponseConsumer}.
*
* @param method the http method
* @param endpoint the path of the request (without host and port)
@@ -258,8 +258,7 @@
*/
public void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, ResponseListener responseListener, Header... headers) {
HttpAsyncResponseConsumer<HttpResponse> responseConsumer = new HeapBufferedAsyncResponseConsumer();
performRequestAsync(method, endpoint, params, entity, responseConsumer, responseListener, headers);
performRequestAsync(method, endpoint, params, entity, HttpAsyncResponseConsumerFactory.DEFAULT, responseListener, headers);
}
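
A minimal sketch of calling this shortcut, assuming a restClient already built via
RestClient.builder (the endpoint and the handling inside the callbacks are illustrative):

-------------------------------------------------
// Hypothetical caller code: fire an async GET; the DEFAULT factory
// creates a heap-buffering consumer for the attempt.
restClient.performRequestAsync("GET", "/_cluster/health",
    Collections.<String, String>emptyMap(), null,
    new ResponseListener() {
        @Override
        public void onSuccess(Response response) {
            System.out.println("received " + response.getStatusLine());
        }

        @Override
        public void onFailure(Exception exception) {
            exception.printStackTrace();
        }
    });
-------------------------------------------------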
/**
@@ -274,29 +273,31 @@
* @param endpoint the path of the request (without host and port)
* @param params the query_string parameters
* @param entity the body of the request, null if not applicable
* @param responseConsumer the {@link HttpAsyncResponseConsumer} callback. Controls how the response
* body gets streamed from a non-blocking HTTP connection on the client side.
* @param httpAsyncResponseConsumerFactory the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the response body gets streamed from a non-blocking HTTP
* connection on the client side.
* @param responseListener the {@link ResponseListener} to notify when the request is completed or fails
* @param headers the optional request headers
*/
public void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
ResponseListener responseListener, Header... headers) {
URI uri = buildUri(pathPrefix, endpoint, params);
HttpRequestBase request = createHttpRequest(method, uri, entity);
setHeaders(request, headers);
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
long startTime = System.nanoTime();
performRequestAsync(startTime, nextHost().iterator(), request, responseConsumer, failureTrackingResponseListener);
performRequestAsync(startTime, nextHost().iterator(), request, httpAsyncResponseConsumerFactory, failureTrackingResponseListener);
}
private void performRequestAsync(final long startTime, final Iterator<HttpHost> hosts, final HttpRequestBase request,
final HttpAsyncResponseConsumer<HttpResponse> responseConsumer,
final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
final FailureTrackingResponseListener listener) {
final HttpHost host = hosts.next();
//we stream the request body if the entity allows for it
HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(host, request);
client.execute(requestProducer, responseConsumer, new FutureCallback<HttpResponse>() {
HttpAsyncResponseConsumer<HttpResponse> asyncResponseConsumer = httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
client.execute(requestProducer, asyncResponseConsumer, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
@@ -346,7 +347,7 @@
} else {
listener.trackFailure(exception);
request.reset();
performRequestAsync(startTime, hosts, request, responseConsumer, listener);
performRequestAsync(startTime, hosts, request, httpAsyncResponseConsumerFactory, listener);
}
} else {
listener.onDefinitiveFailure(exception);


@@ -32,7 +32,6 @@ import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.protocol.HttpContext;
import static org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.DEFAULT_BUFFER_LIMIT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -45,13 +44,14 @@ public class HeapBufferedAsyncResponseConsumerTests extends RestClientTestCase {
//maximum buffer that this test ends up allocating is 50MB
private static final int MAX_TEST_BUFFER_SIZE = 50 * 1024 * 1024;
private static final int TEST_BUFFER_LIMIT = 10 * 1024 * 1024;
public void testResponseProcessing() throws Exception {
ContentDecoder contentDecoder = mock(ContentDecoder.class);
IOControl ioControl = mock(IOControl.class);
HttpContext httpContext = mock(HttpContext.class);
HeapBufferedAsyncResponseConsumer consumer = spy(new HeapBufferedAsyncResponseConsumer());
HeapBufferedAsyncResponseConsumer consumer = spy(new HeapBufferedAsyncResponseConsumer(TEST_BUFFER_LIMIT));
ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1);
StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK");
@@ -74,8 +74,8 @@ public class HeapBufferedAsyncResponseConsumerTests extends RestClientTestCase {
}
public void testDefaultBufferLimit() throws Exception {
HeapBufferedAsyncResponseConsumer consumer = new HeapBufferedAsyncResponseConsumer();
bufferLimitTest(consumer, DEFAULT_BUFFER_LIMIT);
HeapBufferedAsyncResponseConsumer consumer = new HeapBufferedAsyncResponseConsumer(TEST_BUFFER_LIMIT);
bufferLimitTest(consumer, TEST_BUFFER_LIMIT);
}
public void testConfiguredBufferLimit() throws Exception {


@@ -0,0 +1,210 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpHost;
import org.codehaus.mojo.animal_sniffer.IgnoreJRERequirement;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
import static org.elasticsearch.client.RestClientTestUtil.randomOkStatusCode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Integration test to check interaction between {@link RestClient} and {@link org.apache.http.client.HttpClient}.
* Works against real http servers, multiple hosts. Also tests failover by randomly shutting down hosts.
*/
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@IgnoreJRERequirement
public class RestClientMultipleHostsIntegTests extends RestClientTestCase {
private static HttpServer[] httpServers;
private static RestClient restClient;
private static String pathPrefix;
@BeforeClass
public static void startHttpServer() throws Exception {
String pathPrefixWithoutLeadingSlash;
if (randomBoolean()) {
pathPrefixWithoutLeadingSlash = "testPathPrefix/" + randomAsciiOfLengthBetween(1, 5);
pathPrefix = "/" + pathPrefixWithoutLeadingSlash;
} else {
pathPrefix = pathPrefixWithoutLeadingSlash = "";
}
int numHttpServers = randomIntBetween(2, 4);
httpServers = new HttpServer[numHttpServers];
HttpHost[] httpHosts = new HttpHost[numHttpServers];
for (int i = 0; i < numHttpServers; i++) {
HttpServer httpServer = createHttpServer();
httpServers[i] = httpServer;
httpHosts[i] = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
}
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
if (pathPrefix.length() > 0) {
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
}
restClient = restClientBuilder.build();
}
private static HttpServer createHttpServer() throws Exception {
HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
//returns a different status code depending on the path
for (int statusCode : getAllStatusCodes()) {
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
}
return httpServer;
}
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@IgnoreJRERequirement
private static class ResponseHandler implements HttpHandler {
private final int statusCode;
ResponseHandler(int statusCode) {
this.statusCode = statusCode;
}
@Override
public void handle(HttpExchange httpExchange) throws IOException {
httpExchange.getRequestBody().close();
httpExchange.sendResponseHeaders(statusCode, -1);
httpExchange.close();
}
}
@AfterClass
public static void stopHttpServers() throws IOException {
restClient.close();
restClient = null;
for (HttpServer httpServer : httpServers) {
httpServer.stop(0);
}
httpServers = null;
}
@Before
public void stopRandomHost() {
//verify that shutting down some hosts doesn't matter as long as one working host is left behind
if (httpServers.length > 1 && randomBoolean()) {
List<HttpServer> updatedHttpServers = new ArrayList<>(httpServers.length - 1);
int nodeIndex = randomInt(httpServers.length - 1);
for (int i = 0; i < httpServers.length; i++) {
HttpServer httpServer = httpServers[i];
if (i == nodeIndex) {
httpServer.stop(0);
} else {
updatedHttpServers.add(httpServer);
}
}
httpServers = updatedHttpServers.toArray(new HttpServer[updatedHttpServers.size()]);
}
}
public void testSyncRequests() throws IOException {
int numRequests = randomIntBetween(5, 20);
for (int i = 0; i < numRequests; i++) {
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
//we don't test status codes that are subject to retries as they interfere with hosts being stopped
final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom());
Response response;
try {
response = restClient.performRequest(method, "/" + statusCode);
} catch(ResponseException responseException) {
response = responseException.getResponse();
}
assertEquals(method, response.getRequestLine().getMethod());
assertEquals(statusCode, response.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + statusCode, response.getRequestLine().getUri());
}
}
public void testAsyncRequests() throws Exception {
int numRequests = randomIntBetween(5, 20);
final CountDownLatch latch = new CountDownLatch(numRequests);
final List<TestResponse> responses = new CopyOnWriteArrayList<>();
for (int i = 0; i < numRequests; i++) {
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
//we don't test status codes that are subject to retries as they interfere with hosts being stopped
final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom());
restClient.performRequestAsync(method, "/" + statusCode, new ResponseListener() {
@Override
public void onSuccess(Response response) {
responses.add(new TestResponse(method, statusCode, response));
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
responses.add(new TestResponse(method, statusCode, exception));
latch.countDown();
}
});
}
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(numRequests, responses.size());
for (TestResponse testResponse : responses) {
Response response = testResponse.getResponse();
assertEquals(testResponse.method, response.getRequestLine().getMethod());
assertEquals(testResponse.statusCode, response.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + testResponse.statusCode,
response.getRequestLine().getUri());
}
}
private static class TestResponse {
private final String method;
private final int statusCode;
private final Object response;
TestResponse(String method, int statusCode, Object response) {
this.method = method;
this.statusCode = statusCode;
this.response = response;
}
Response getResponse() {
if (response instanceof Response) {
return (Response) response;
}
if (response instanceof ResponseException) {
return ((ResponseException) response).getResponse();
}
throw new AssertionError("unexpected response " + response.getClass());
}
}
}


@@ -20,7 +20,6 @@
package org.elasticsearch.client;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
@@ -45,19 +44,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods;
import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Integration test to check interaction between {@link RestClient} and {@link org.apache.http.client.HttpClient}.
@@ -65,28 +58,42 @@ import static org.junit.Assert.fail;
*/
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@IgnoreJRERequirement
public class RestClientIntegTests extends RestClientTestCase {
public class RestClientSingleHostIntegTests extends RestClientTestCase {
private static HttpServer httpServer;
private static RestClient restClient;
private static String pathPrefix;
private static Header[] defaultHeaders;
@BeforeClass
public static void startHttpServer() throws Exception {
httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
String pathPrefixWithoutLeadingSlash;
if (randomBoolean()) {
pathPrefixWithoutLeadingSlash = "testPathPrefix/" + randomAsciiOfLengthBetween(1, 5);
pathPrefix = "/" + pathPrefixWithoutLeadingSlash;
} else {
pathPrefix = pathPrefixWithoutLeadingSlash = "";
}
httpServer = createHttpServer();
int numHeaders = randomIntBetween(0, 5);
defaultHeaders = generateHeaders("Header-default", "Header-array", numHeaders);
RestClientBuilder restClientBuilder = RestClient.builder(
new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort())).setDefaultHeaders(defaultHeaders);
if (pathPrefix.length() > 0) {
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
}
restClient = restClientBuilder.build();
}
private static HttpServer createHttpServer() throws Exception {
HttpServer httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
//returns a different status code depending on the path
for (int statusCode : getAllStatusCodes()) {
createStatusCodeContext(httpServer, statusCode);
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
}
int numHeaders = randomIntBetween(0, 5);
defaultHeaders = generateHeaders("Header-default", "Header-array", numHeaders);
restClient = RestClient.builder(new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()))
.setDefaultHeaders(defaultHeaders).build();
}
private static void createStatusCodeContext(HttpServer httpServer, final int statusCode) {
httpServer.createContext("/" + statusCode, new ResponseHandler(statusCode));
return httpServer;
}
//animal-sniffer doesn't like our usage of com.sun.net.httpserver.* classes
@@ -157,7 +164,11 @@ public class RestClientIntegTests extends RestClientTestCase {
} catch(ResponseException e) {
esResponse = e.getResponse();
}
assertThat(esResponse.getStatusLine().getStatusCode(), equalTo(statusCode));
assertEquals(method, esResponse.getRequestLine().getMethod());
assertEquals(statusCode, esResponse.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + statusCode, esResponse.getRequestLine().getUri());
for (final Header responseHeader : esResponse.getHeaders()) {
final String name = responseHeader.getName();
final String value = responseHeader.getValue();
@@ -197,49 +208,6 @@ public class RestClientIntegTests extends RestClientTestCase {
bodyTest("GET");
}
/**
* Ensure that pathPrefix works as expected.
*/
public void testPathPrefix() throws IOException {
// guarantee no other test setup collides with this one and lets it sneak through
final String uniqueContextSuffix = "/testPathPrefix";
final String pathPrefix = "base/" + randomAsciiOfLengthBetween(1, 5) + "/";
final int statusCode = randomStatusCode(getRandom());
final HttpContext context =
httpServer.createContext("/" + pathPrefix + statusCode + uniqueContextSuffix, new ResponseHandler(statusCode));
try (final RestClient client =
RestClient.builder(new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()))
.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefix).build()) {
for (final String method : getHttpMethods()) {
Response esResponse;
try {
esResponse = client.performRequest(method, "/" + statusCode + uniqueContextSuffix);
} catch(ResponseException e) {
esResponse = e.getResponse();
}
assertThat(esResponse.getRequestLine().getUri(), equalTo("/" + pathPrefix + statusCode + uniqueContextSuffix));
assertThat(esResponse.getStatusLine().getStatusCode(), equalTo(statusCode));
}
} finally {
httpServer.removeContext(context);
}
}
public void testPath() throws IOException {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}
private void bodyTest(String method) throws IOException {
String requestBody = "{ \"field\": \"value\" }";
StringEntity entity = new StringEntity(requestBody);
@@ -250,60 +218,9 @@ public class RestClientIntegTests extends RestClientTestCase {
} catch(ResponseException e) {
esResponse = e.getResponse();
}
assertEquals(method, esResponse.getRequestLine().getMethod());
assertEquals(statusCode, esResponse.getStatusLine().getStatusCode());
assertEquals((pathPrefix.length() > 0 ? pathPrefix : "") + "/" + statusCode, esResponse.getRequestLine().getUri());
assertEquals(requestBody, EntityUtils.toString(esResponse.getEntity()));
}
public void testAsyncRequests() throws Exception {
int numRequests = randomIntBetween(5, 20);
final CountDownLatch latch = new CountDownLatch(numRequests);
final List<TestResponse> responses = new CopyOnWriteArrayList<>();
for (int i = 0; i < numRequests; i++) {
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
final int statusCode = randomStatusCode(getRandom());
restClient.performRequestAsync(method, "/" + statusCode, new ResponseListener() {
@Override
public void onSuccess(Response response) {
responses.add(new TestResponse(method, statusCode, response));
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
responses.add(new TestResponse(method, statusCode, exception));
latch.countDown();
}
});
}
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(numRequests, responses.size());
for (TestResponse response : responses) {
assertEquals(response.method, response.getResponse().getRequestLine().getMethod());
assertEquals(response.statusCode, response.getResponse().getStatusLine().getStatusCode());
}
}
private static class TestResponse {
private final String method;
private final int statusCode;
private final Object response;
TestResponse(String method, int statusCode, Object response) {
this.method = method;
this.statusCode = statusCode;
this.response = response;
}
Response getResponse() {
if (response instanceof Response) {
return (Response) response;
}
if (response instanceof ResponseException) {
return ((ResponseException) response).getResponse();
}
throw new AssertionError("unexpected response " + response.getClass());
}
}
}


@@ -139,6 +139,17 @@ public class RestClientSingleHostTests extends RestClientTestCase {
restClient = new RestClient(httpClient, 10000, defaultHeaders, new HttpHost[]{httpHost}, null, failureListener);
}
public void testNullPath() throws IOException {
for (String method : getHttpMethods()) {
try {
restClient.performRequest(method, null);
fail("path set to null should fail!");
} catch (NullPointerException e) {
assertEquals("path must not be null", e.getMessage());
}
}
}
/**
* Verifies the content of the {@link HttpRequest} that's internally created and passed through to the http client
*/

View File

@@ -62,10 +62,7 @@ dependencies {
compile 'com.carrotsearch:hppc:0.7.1'
// time handling, remove with java 8 time
compile 'joda-time:joda-time:2.9.4'
// joda 2.0 moved to using volatile fields for datetime
// When updating to a new version, make sure to update our copy of BaseDateTime
compile 'org.joda:joda-convert:1.2'
compile 'joda-time:joda-time:2.9.5'
// json and yaml
compile "org.yaml:snakeyaml:${versions.snakeyaml}"


@@ -1 +0,0 @@
35ec554f0cd00c956cc69051514d9488b1374dec


@@ -1,202 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,5 +0,0 @@
=============================================================================
= NOTICE file corresponding to section 4d of the Apache License Version 2.0 =
=============================================================================
This product includes software developed by
Joda.org (http://www.joda.org/).

View File

@ -1 +0,0 @@
1c295b462f16702ebe720bbb08f62e1ba80da41b

View File

@ -0,0 +1 @@
5f01da7306363fad2028b916f3eab926262de928

View File

@ -1 +0,0 @@
61aacb657e44a9beabf95834e106bbb96373a703

View File

@ -0,0 +1 @@
494aed699af238c3872a6b65e17939e9cb7ddbe0

View File

@ -1 +0,0 @@
600de75a81e259cab0384e546d9a1d527ddba6d6

View File

@ -0,0 +1 @@
77dede7dff1b833ca2e92d8ab137edb209354d9b

View File

@ -1 +0,0 @@
188774468a56a8731ca639527d721060d26ffebd

View File

@ -0,0 +1 @@
d3c87ea89e2f83e401f9cc7f14e4c43945f7f1e1

View File

@ -1 +0,0 @@
5afd9271e3d8f645440f48ff2487545ae5573e7e

View File

@ -0,0 +1 @@
2c96d59e318ea66838aeb9c5cfb8b4d27b40953c

View File

@ -1 +0,0 @@
0f575175e26d4d3b1095f6300cbefbbb3ee994cd

View File

@ -0,0 +1 @@
4f154d8badfe47fe45503c18fb30f2177f758794

View File

@ -1 +0,0 @@
ee898c3d318681c9f29c56e6d9b52876be96d814

View File

@ -0,0 +1 @@
79b898117dcfde2981ec6806e420ff218842eca8

View File

@ -1 +0,0 @@
ea6defd322456711394b4dabcda70a217e3caacd

View File

@ -0,0 +1 @@
89edeb404e507d640cb13903acff6953199704a2

View File

@ -1 +0,0 @@
ea2de7f9753a8e19a1ec9f25a3ea65d7ce909a0e

View File

@ -0,0 +1 @@
02d0e1f5a9df15ac911ad495bad5ea253ab50a9f

View File

@ -1 +0,0 @@
0b15c6f29bfb9ec14a4615013a94bfa43a63793d

View File

@ -0,0 +1 @@
eb7938233c8103223069c7b5b5f785b4d20ddafa

View File

@ -1 +0,0 @@
d89d9fa1036c38144e0b8db079ae959353847c86

View File

@ -0,0 +1 @@
e979fb02155cbe81a8d335d6dc41d2ef06be68b6

View File

@ -1 +0,0 @@
c003c1ab0a19a02b30156ce13372cff1001d6a7d

View File

@ -0,0 +1 @@
257387c45c6fa2b77fd6931751f93fdcd798ced4

View File

@ -1 +0,0 @@
a3c570bf588d7c9ca43d074db9ce9c9b8408b930

View File

@ -0,0 +1 @@
3cf5fe5402b5e34b240b73501c9e97a82428259e

View File

@ -1 +0,0 @@
de54ca61f5892cf2c88ac083b3332a827beca7ff

View File

@ -0,0 +1 @@
1b77ef3740dc885c62d5966fbe9aea1199d344fb

View File

@ -1 +0,0 @@
cacdf81b324acd335be63798d5a3dd16e7dff9a3

View File

@ -0,0 +1 @@
aa94b4a8636b3633008640cc5155ad354aebcea5

View File

@ -1 +0,0 @@
a5cb3723bc8e0db185fc43e57b648145de27fde8

View File

@ -0,0 +1 @@
ed5d8ee5cd7edcad5d4ffca2b4540ccc844e9bb0

View File

@ -1,130 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.index;
import org.apache.lucene.util.StringHelper;
import java.io.IOException;
/**
* Forked utility methods from Lucene's PointValues until LUCENE-7257 is released.
*/
public class XPointValues {
/** Return the cumulated number of points across all leaves of the given
* {@link IndexReader}. Leaves that do not have points for the given field
* are ignored.
* @see PointValues#size(String) */
public static long size(IndexReader reader, String field) throws IOException {
long size = 0;
for (LeafReaderContext ctx : reader.leaves()) {
FieldInfo info = ctx.reader().getFieldInfos().fieldInfo(field);
if (info == null || info.getPointDimensionCount() == 0) {
continue;
}
PointValues values = ctx.reader().getPointValues();
size += values.size(field);
}
return size;
}
/** Return the cumulated number of docs that have points across all leaves
* of the given {@link IndexReader}. Leaves that do not have points for the
* given field are ignored.
* @see PointValues#getDocCount(String) */
public static int getDocCount(IndexReader reader, String field) throws IOException {
int count = 0;
for (LeafReaderContext ctx : reader.leaves()) {
FieldInfo info = ctx.reader().getFieldInfos().fieldInfo(field);
if (info == null || info.getPointDimensionCount() == 0) {
continue;
}
PointValues values = ctx.reader().getPointValues();
count += values.getDocCount(field);
}
return count;
}
/** Return the minimum packed values across all leaves of the given
* {@link IndexReader}. Leaves that do not have points for the given field
* are ignored.
* @see PointValues#getMinPackedValue(String) */
public static byte[] getMinPackedValue(IndexReader reader, String field) throws IOException {
byte[] minValue = null;
for (LeafReaderContext ctx : reader.leaves()) {
FieldInfo info = ctx.reader().getFieldInfos().fieldInfo(field);
if (info == null || info.getPointDimensionCount() == 0) {
continue;
}
PointValues values = ctx.reader().getPointValues();
byte[] leafMinValue = values.getMinPackedValue(field);
if (leafMinValue == null) {
continue;
}
if (minValue == null) {
minValue = leafMinValue.clone();
} else {
final int numDimensions = values.getNumDimensions(field);
final int numBytesPerDimension = values.getBytesPerDimension(field);
for (int i = 0; i < numDimensions; ++i) {
int offset = i * numBytesPerDimension;
if (StringHelper.compare(numBytesPerDimension, leafMinValue, offset, minValue, offset) < 0) {
System.arraycopy(leafMinValue, offset, minValue, offset, numBytesPerDimension);
}
}
}
}
return minValue;
}
/** Return the maximum packed values across all leaves of the given
* {@link IndexReader}. Leaves that do not have points for the given field
* are ignored.
* @see PointValues#getMaxPackedValue(String) */
public static byte[] getMaxPackedValue(IndexReader reader, String field) throws IOException {
byte[] maxValue = null;
for (LeafReaderContext ctx : reader.leaves()) {
FieldInfo info = ctx.reader().getFieldInfos().fieldInfo(field);
if (info == null || info.getPointDimensionCount() == 0) {
continue;
}
PointValues values = ctx.reader().getPointValues();
byte[] leafMaxValue = values.getMaxPackedValue(field);
if (leafMaxValue == null) {
continue;
}
if (maxValue == null) {
maxValue = leafMaxValue.clone();
} else {
final int numDimensions = values.getNumDimensions(field);
final int numBytesPerDimension = values.getBytesPerDimension(field);
for (int i = 0; i < numDimensions; ++i) {
int offset = i * numBytesPerDimension;
if (StringHelper.compare(numBytesPerDimension, leafMaxValue, offset, maxValue, offset) > 0) {
System.arraycopy(leafMaxValue, offset, maxValue, offset, numBytesPerDimension);
}
}
}
}
return maxValue;
}
/** Default constructor */
private XPointValues() {
}
}
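Deleting this fork is possible because LUCENE-7257 promoted the same aggregations to static helpers on Lucene's own PointValues. A sketch of the one-to-one replacement, assuming a Lucene release that contains that change:

-------------------------------------------------
import java.io.IOException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PointValues;

// sketch: the XPointValues methods above map directly onto PointValues statics
static void summarizePoints(IndexReader reader, String field) throws IOException {
    long size = PointValues.size(reader, field);               // was XPointValues.size
    int docCount = PointValues.getDocCount(reader, field);     // was XPointValues.getDocCount
    byte[] min = PointValues.getMinPackedValue(reader, field); // was XPointValues.getMinPackedValue
    byte[] max = PointValues.getMaxPackedValue(reader, field); // was XPointValues.getMaxPackedValue
    System.out.println(field + ": " + size + " points in " + docCount + " docs");
}
-------------------------------------------------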

View File

@ -523,16 +523,14 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.index.shard.IndexShardRelocatedException::new, 45),
NODE_SHOULD_NOT_CONNECT_EXCEPTION(org.elasticsearch.transport.NodeShouldNotConnectException.class,
org.elasticsearch.transport.NodeShouldNotConnectException::new, 46),
INDEX_TEMPLATE_ALREADY_EXISTS_EXCEPTION(org.elasticsearch.indices.IndexTemplateAlreadyExistsException.class,
org.elasticsearch.indices.IndexTemplateAlreadyExistsException::new, 47),
// 47 used to be for IndexTemplateAlreadyExistsException, which was deprecated in 5.1 and removed in 6.0
TRANSLOG_CORRUPTED_EXCEPTION(org.elasticsearch.index.translog.TranslogCorruptedException.class,
org.elasticsearch.index.translog.TranslogCorruptedException::new, 48),
CLUSTER_BLOCK_EXCEPTION(org.elasticsearch.cluster.block.ClusterBlockException.class,
org.elasticsearch.cluster.block.ClusterBlockException::new, 49),
FETCH_PHASE_EXECUTION_EXCEPTION(org.elasticsearch.search.fetch.FetchPhaseExecutionException.class,
org.elasticsearch.search.fetch.FetchPhaseExecutionException::new, 50),
INDEX_SHARD_ALREADY_EXISTS_EXCEPTION(org.elasticsearch.index.IndexShardAlreadyExistsException.class,
org.elasticsearch.index.IndexShardAlreadyExistsException::new, 51),
// 51 used to be for IndexShardAlreadyExistsException, which was deprecated in 5.1 and removed in 6.0
VERSION_CONFLICT_ENGINE_EXCEPTION(org.elasticsearch.index.engine.VersionConflictEngineException.class,
org.elasticsearch.index.engine.VersionConflictEngineException::new, 52),
ENGINE_EXCEPTION(org.elasticsearch.index.engine.EngineException.class, org.elasticsearch.index.engine.EngineException::new, 53),
@ -553,7 +551,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper::new, 62),
ALIAS_FILTER_PARSING_EXCEPTION(org.elasticsearch.indices.AliasFilterParsingException.class,
org.elasticsearch.indices.AliasFilterParsingException::new, 63),
// 64 was DeleteByQueryFailedEngineException, which was removed in 3.0
// 64 was DeleteByQueryFailedEngineException, which was removed in 5.0
GATEWAY_EXCEPTION(org.elasticsearch.gateway.GatewayException.class, org.elasticsearch.gateway.GatewayException::new, 65),
INDEX_SHARD_NOT_RECOVERING_EXCEPTION(org.elasticsearch.index.shard.IndexShardNotRecoveringException.class,
org.elasticsearch.index.shard.IndexShardNotRecoveringException::new, 66),

View File

@ -19,6 +19,7 @@
package org.elasticsearch;
import org.apache.lucene.util.MathUtil;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
@ -298,7 +299,27 @@ public class Version {
* is a beta or RC release then the version itself is returned.
*/
public Version minimumCompatibilityVersion() {
return Version.smallest(this, fromId(major * 1000000 + 99));
final int bwcMajor;
final int bwcMinor;
if (this.onOrAfter(Version.V_6_0_0_alpha1)) {
bwcMajor = major - 1;
bwcMinor = 0; // TODO we have to move this to the latest released minor of the last major, but for now we just keep it at 0
} else {
bwcMajor = major;
bwcMinor = 0;
}
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
}
/**
* Returns <code>true</code> iff both versions are compatible. Otherwise <code>false</code>.
*/
public boolean isCompatible(Version version) {
boolean compatible = onOrAfter(version.minimumCompatibilityVersion())
&& version.onOrAfter(minimumCompatibilityVersion());
assert compatible == false || Math.max(major, version.major) - Math.min(major, version.major) <= 1;
return compatible;
}
@SuppressForbidden(reason = "System.out.*")
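To make the arithmetic above concrete: ids appear to follow major * 1000000 + minor * 10000 + revision * 100 + build (an assumption inferred from the fromId calls), so a 6.0.0-alpha1 node computes bwcMajor = 5, bwcMinor = 0 and arrives at 5.0.0 as its minimum compatible version. A sketch:

-------------------------------------------------
// sketch: both directions of the wire-compatibility check above
Version six = Version.fromId(6000001);  // 6.0.0-alpha1 under the assumed id scheme
Version five = Version.fromId(5000099); // 5.0.0 (build 99 marks a release)

assert six.minimumCompatibilityVersion().equals(five); // bwcMajor = 5, bwcMinor = 0
assert six.isCompatible(five) && five.isCompatible(six);
-------------------------------------------------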

View File

@ -356,7 +356,7 @@ public class ActionModule extends AbstractModule {
register(handler.getAction().name(), handler);
}
public <Request extends ActionRequest<Request>, Response extends ActionResponse> void register(
public <Request extends ActionRequest, Response extends ActionResponse> void register(
GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction,
Class<?>... supportTransportActions) {
register(new ActionHandler<>(action, transportAction, supportTransportActions));

View File

@ -25,7 +25,7 @@ import org.elasticsearch.transport.TransportRequest;
import java.io.IOException;
public abstract class ActionRequest<Request extends ActionRequest<Request>> extends TransportRequest {
public abstract class ActionRequest extends TransportRequest {
public ActionRequest() {
super();

View File

@ -34,6 +34,8 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.cluster.service.ClusterStateStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -140,7 +142,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
assert waitFor >= 0;
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
final ClusterState state = observer.observedState();
final ClusterServiceState observedState = observer.observedState();
final ClusterState state = observedState.getClusterState();
if (request.timeout().millis() == 0) {
listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));
return;
@ -148,8 +151,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
final int concreteWaitFor = waitFor;
final ClusterStateObserver.ChangePredicate validationPredicate = new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
return newState.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, newState, concreteWaitFor);
protected boolean validate(ClusterServiceState newState) {
return newState.getClusterStateStatus() == ClusterStateStatus.APPLIED && validateRequest(request, newState.getClusterState(), concreteWaitFor);
}
};
@ -171,7 +174,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
listener.onResponse(response);
}
};
if (state.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, state, concreteWaitFor)) {
if (observedState.getClusterStateStatus() == ClusterStateStatus.APPLIED && validateRequest(request, state, concreteWaitFor)) {
stateListener.onNewClusterState(state);
} else {
observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
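The predicate rewrite is an instance of the general waiting pattern: accept only states whose status is APPLIED, answer immediately if the current state qualifies, otherwise register for the next change. A sketch of that pattern, reusing the observer and request above (listener callback names assumed from the ClusterStateObserver API):

-------------------------------------------------
// sketch: wait until an APPLIED cluster state arrives
ClusterStateObserver.ChangePredicate appliedPredicate = new ClusterStateObserver.ValidationPredicate() {
    @Override
    protected boolean validate(ClusterServiceState newState) {
        return newState.getClusterStateStatus() == ClusterStateStatus.APPLIED;
    }
};
observer.waitForNextChange(new ClusterStateObserver.Listener() {
    @Override
    public void onNewClusterState(ClusterState state) { /* proceed with the applied state */ }

    @Override
    public void onClusterServiceClose() { /* the node is shutting down */ }

    @Override
    public void onTimeout(TimeValue timeout) { /* give up waiting */ }
}, appliedPredicate, request.timeout());
-------------------------------------------------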

View File

@ -25,7 +25,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
* Transport level private response for the transport handler registered under
* {@value org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction#NAME}
*/
public final class LivenessRequest extends ActionRequest<LivenessRequest> {
public final class LivenessRequest extends ActionRequest {
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
@ -46,6 +47,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -118,12 +120,44 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
Set<String> childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
if (childNodes != null) {
if (childNodes.isEmpty()) {
// The task has no child tasks, so we can return immediately
logger.trace("cancelling task {} with no children", cancellableTask.getId());
listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false));
} else {
// The task has some child tasks, so we need to wait until the ban is set on all nodes
logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes);
setBanOnNodes(request.getReason(), cancellableTask, childNodes, banLock);
listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false));
String nodeId = clusterService.localNode().getId();
AtomicInteger responses = new AtomicInteger(childNodes.size());
List<Exception> failures = new ArrayList<>();
setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
processResponse();
}
@Override
public void onFailure(Exception e) {
synchronized (failures) {
failures.add(e);
}
processResponse();
}
private void processResponse() {
banLock.onBanSet();
if (responses.decrementAndGet() == 0) {
if (failures.isEmpty() == false) {
IllegalStateException exception = new IllegalStateException("failed to cancel children of the task [" +
cancellableTask.getId() + "]");
failures.forEach(exception::addSuppressed);
listener.onFailure(exception);
} else {
listener.onResponse(cancellableTask.taskInfo(nodeId, false));
}
}
}
});
}
} else {
logger.trace("task {} is already cancelled", cancellableTask.getId());
@ -136,10 +170,10 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
return true;
}
private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, BanLock banLock) {
private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, ActionListener<Void> listener) {
sendSetBanRequest(nodes,
BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason),
banLock);
listener);
}
private void removeBanOnNodes(CancellableTask task, Set<String> nodes) {
@ -147,28 +181,29 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId())));
}
private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, BanLock banLock) {
private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, ActionListener<Void> listener) {
ClusterState clusterState = clusterService.state();
for (String node : nodes) {
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
if (discoveryNode != null) {
// Check if the node is still in the cluster
logger.debug("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node,
logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node,
request.ban);
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
banLock.onBanSet();
listener.onResponse(null);
}
@Override
public void handleException(TransportException exp) {
banLock.onBanSet();
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
listener.onFailure(exp);
}
});
} else {
banLock.onBanSet();
listener.onResponse(null);
logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster",
request.parentTaskId, node);
}
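Stripped of the ban-specific details, this change is a fan-out/fan-in: one callback per child node, failures collected as suppressed exceptions, and the final listener fired exactly once when the last response arrives. A generic sketch of that aggregation (hypothetical helper built on the same ActionListener interface):

-------------------------------------------------
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.elasticsearch.action.ActionListener;

// sketch: complete `done` once `expected` callbacks have come back,
// attaching any failures as suppressed exceptions
static <T> ActionListener<T> groupedListener(int expected, ActionListener<Void> done) {
    final AtomicInteger pending = new AtomicInteger(expected);
    final List<Exception> failures = new CopyOnWriteArrayList<>();
    return new ActionListener<T>() {
        @Override
        public void onResponse(T response) {
            complete();
        }

        @Override
        public void onFailure(Exception e) {
            failures.add(e);
            complete();
        }

        private void complete() {
            if (pending.decrementAndGet() == 0) {
                if (failures.isEmpty()) {
                    done.onResponse(null);
                } else {
                    Exception root = new IllegalStateException("one or more child requests failed");
                    failures.forEach(root::addSuppressed);
                    done.onFailure(root);
                }
            }
        }
    };
}
-------------------------------------------------

The code above inlines the same logic rather than factoring it out, but the counting and failure-collection shape is identical.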

View File

@ -33,7 +33,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A request to get node tasks
*/
public class GetTaskRequest extends ActionRequest<GetTaskRequest> {
public class GetTaskRequest extends ActionRequest {
private TaskId taskId = TaskId.EMPTY_TASK_ID;
private boolean waitForCompletion = false;
private TimeValue timeout = null;

View File

@ -30,7 +30,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/** Request the mappings of specific fields */
public class GetFieldMappingsRequest extends ActionRequest<GetFieldMappingsRequest> implements IndicesRequest.Replaceable {
public class GetFieldMappingsRequest extends ActionRequest implements IndicesRequest.Replaceable {
protected boolean local = false;

View File

@ -32,7 +32,7 @@ public class PutMappingClusterStateUpdateRequest extends IndicesClusterStateUpda
private boolean updateAllTypes = false;
PutMappingClusterStateUpdateRequest() {
public PutMappingClusterStateUpdateRequest() {
}

View File

@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.template.put;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
@ -32,6 +33,8 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -41,10 +44,13 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
@ -56,11 +62,13 @@ import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
*/
public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateRequest> implements IndicesRequest {
private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(PutIndexTemplateRequest.class));
private String name;
private String cause = "";
private String template;
private List<String> indexPatterns;
private int order;
@ -92,8 +100,8 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
if (name == null) {
validationException = addValidationError("name is missing", validationException);
}
if (template == null) {
validationException = addValidationError("template is missing", validationException);
if (indexPatterns == null || indexPatterns.size() == 0) {
validationException = addValidationError("pattern is missing", validationException);
}
return validationException;
}
@ -113,13 +121,13 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
return this.name;
}
public PutIndexTemplateRequest template(String template) {
this.template = template;
public PutIndexTemplateRequest patterns(List<String> indexPatterns) {
this.indexPatterns = indexPatterns;
return this;
}
public String template() {
return this.template;
public List<String> patterns() {
return this.indexPatterns;
}
public PutIndexTemplateRequest order(int order) {
@ -142,7 +150,7 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
/**
* Set to <tt>true</tt> to force only creation, not an update of an index template. If it already
* exists, it will fail with an {@link org.elasticsearch.indices.IndexTemplateAlreadyExistsException}.
* exists, it will fail with an {@link IllegalArgumentException}.
*/
public PutIndexTemplateRequest create(boolean create) {
this.create = create;
@ -286,7 +294,20 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
for (Map.Entry<String, Object> entry : source.entrySet()) {
String name = entry.getKey();
if (name.equals("template")) {
template(entry.getValue().toString());
// This is needed to allow for bwc (beats, logstash) with pre-5.0 templates (#21009)
if (entry.getValue() instanceof String) {
DEPRECATION_LOGGER.deprecated("Deprecated field [template] used, replaced by [index_patterns]");
patterns(Collections.singletonList((String) entry.getValue()));
}
} else if (name.equals("index_patterns")) {
if (entry.getValue() instanceof String) {
patterns(Collections.singletonList((String) entry.getValue()));
} else if (entry.getValue() instanceof List) {
List<String> elements = ((List<?>) entry.getValue()).stream().map(Object::toString).collect(Collectors.toList());
patterns(elements);
} else {
throw new IllegalArgumentException("Malformed [index_patterns] value, should be a string or a list of strings");
}
} else if (name.equals("order")) {
order(XContentMapValues.nodeIntegerValue(entry.getValue(), order()));
} else if ("version".equals(name)) {
@ -295,7 +316,7 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
}
version((Integer)entry.getValue());
} else if (name.equals("settings")) {
if (!(entry.getValue() instanceof Map)) {
if ((entry.getValue() instanceof Map) == false) {
throw new IllegalArgumentException("Malformed [settings] section, should include an inner object");
}
settings((Map<String, Object>) entry.getValue());
@ -436,7 +457,7 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
@Override
public String[] indices() {
return new String[]{template};
return indexPatterns.toArray(new String[indexPatterns.size()]);
}
@Override
@ -449,7 +470,12 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
super.readFrom(in);
cause = in.readString();
name = in.readString();
template = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
indexPatterns = in.readList(StreamInput::readString);
} else {
indexPatterns = Collections.singletonList(in.readString());
}
order = in.readInt();
create = in.readBoolean();
settings = readSettingsFromStream(in);
@ -475,7 +501,11 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
super.writeTo(out);
out.writeString(cause);
out.writeString(name);
out.writeString(template);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeStringList(indexPatterns);
} else {
out.writeString(indexPatterns.size() > 0 ? indexPatterns.get(0) : "");
}
out.writeInt(order);
out.writeBoolean(create);
writeSettingsToStream(settings, out);
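On the caller side, the rename means moving from the single template string to a list of index_patterns. A minimal sketch against the new request API (template name and patterns hypothetical):

-------------------------------------------------
import java.util.Arrays;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;

// sketch: one template matching several index patterns
PutIndexTemplateRequest request = new PutIndexTemplateRequest()
    .name("logs-template")                          // hypothetical name
    .patterns(Arrays.asList("logs-*", "metrics-*")) // replaces the old template("logs-*")
    .order(0);
-------------------------------------------------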

View File

@ -25,6 +25,8 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class PutIndexTemplateRequestBuilder
@ -39,10 +41,20 @@ public class PutIndexTemplateRequestBuilder
}
/**
* Sets the template match expression that will be used to match on indices created.
* Sets the match expression that will be used to match on indices created.
*
* @deprecated Replaced by {@link #setPatterns(List)}
*/
public PutIndexTemplateRequestBuilder setTemplate(String template) {
request.template(template);
@Deprecated
public PutIndexTemplateRequestBuilder setTemplate(String indexPattern) {
return setPatterns(Collections.singletonList(indexPattern));
}
/**
* Sets the match expression that will be used to match on indices created.
*/
public PutIndexTemplateRequestBuilder setPatterns(List<String> indexPatterns) {
request.patterns(indexPatterns);
return this;
}
@ -64,7 +76,7 @@ public class PutIndexTemplateRequestBuilder
/**
* Set to <tt>true</tt> to force only creation, not an update of an index template. If it already
* exists, it will fail with an {@link org.elasticsearch.indices.IndexTemplateAlreadyExistsException}.
* exists, it will fail with an {@link IllegalArgumentException}.
*/
public PutIndexTemplateRequestBuilder setCreate(boolean create) {
request.create(create);

View File

@ -79,7 +79,7 @@ public class TransportPutIndexTemplateAction extends TransportMasterNodeAction<P
templateSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
indexScopedSettings.validate(templateSettingsBuilder);
indexTemplateService.putTemplate(new MetaDataIndexTemplateService.PutRequest(cause, request.name())
.template(request.template())
.patterns(request.patterns())
.order(request.order())
.settings(templateSettingsBuilder.build())
.mappings(request.mappings())

View File

@ -61,7 +61,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* Note that we only support refresh on the bulk request, not per item.
* @see org.elasticsearch.client.Client#bulk(BulkRequest)
*/
public class BulkRequest extends ActionRequest<BulkRequest> implements CompositeIndicesRequest, WriteRequest<BulkRequest> {
public class BulkRequest extends ActionRequest implements CompositeIndicesRequest, WriteRequest<BulkRequest> {
private static final DeprecationLogger DEPRECATION_LOGGER =
new DeprecationLogger(Loggers.getLogger(BulkRequest.class));
@ -170,7 +170,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
sizeInBytes += request.upsertRequest().source().length();
}
if (request.script() != null) {
sizeInBytes += request.script().getScript().length() * 2;
sizeInBytes += request.script().getIdOrCode().length() * 2;
}
return this;
}

View File

@ -142,10 +142,18 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
case INDEX:
final IndexRequest indexRequest = (IndexRequest) itemRequest;
Engine.IndexResult indexResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
operationResult = indexResult;
response = indexResult.hasFailure() ? null
: new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
if (indexResult.hasFailure()) {
response = null;
} else {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getVersion(), indexResult.isCreated());
}
operationResult = indexResult;
replicaRequest = request.items()[requestIndex];
break;
case UPDATE:
@ -158,10 +166,17 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
case DELETE:
final DeleteRequest deleteRequest = (DeleteRequest) itemRequest;
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
operationResult = deleteResult;
response = deleteResult.hasFailure() ? null :
new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
if (deleteResult.hasFailure()) {
response = null;
} else {
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getVersion(), deleteResult.isFound());
}
operationResult = deleteResult;
replicaRequest = request.items()[requestIndex];
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
@ -261,9 +276,23 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, allowIdGeneration, request.index());
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdatedAction);
if (updateOperationResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
case DELETED:
updateOperationResult = executeDeleteRequestOnPrimary(translate.action(), primary);
DeleteRequest deleteRequest = translate.action();
updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
if (updateOperationResult.hasFailure() == false) {
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
case NOOP:
primary.noopUpdate(updateRequest.type());

View File

@ -124,8 +124,16 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
@Override
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response = result.hasFailure() ? null :
new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
final DeleteResponse response;
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
assert request.versionType().validateVersionForWrites(request.version());
response = new DeleteResponse(primary.shardId(), request.type(), request.id(), result.getVersion(), result.isFound());
} else {
response = null;
}
return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
}
@ -136,19 +144,12 @@ public class TransportDeleteAction extends TransportWriteAction<DeleteRequest, D
}
public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) {
Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
Engine.DeleteResult result = primary.delete(delete);
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
assert request.versionType().validateVersionForWrites(request.version());
}
return result;
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
return primary.delete(delete);
}
public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
return replica.delete(delete);
}
}

View File

@ -48,7 +48,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
public class MultiGetRequest extends ActionRequest<MultiGetRequest> implements Iterable<MultiGetRequest.Item>, CompositeIndicesRequest, RealtimeRequest {
public class MultiGetRequest extends ActionRequest implements Iterable<MultiGetRequest.Item>, CompositeIndicesRequest, RealtimeRequest {
/**
* A single get item.

View File

@ -142,9 +142,18 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
@Override
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response = indexResult.hasFailure() ? null :
new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
indexResult.isCreated());
final IndexResponse response;
if (indexResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(),
indexResult.isCreated());
} else {
response = null;
}
return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
}
@ -213,15 +222,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
"Dynamic mappings are not available on the node that holds the primary yet");
}
}
Engine.IndexResult result = primary.index(operation);
if (result.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = result.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersionForWrites(request.version());
}
return result;
return primary.index(operation);
}
}

View File

@ -59,7 +59,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
@Override
public <Request extends ActionRequest<Request>, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
switch (action) {
case IndexAction.NAME:
IndexRequest indexRequest = (IndexRequest) request;

View File

@ -54,7 +54,7 @@ public final class IngestProxyActionFilter implements ActionFilter {
}
@Override
public <Request extends ActionRequest<Request>, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
public <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request, ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
Action ingestAction;
switch (action) {
case IndexAction.NAME:

View File

@ -37,7 +37,7 @@ import java.util.Map;
import static org.elasticsearch.ingest.IngestDocument.MetaData;
public class SimulatePipelineRequest extends ActionRequest<SimulatePipelineRequest> {
public class SimulatePipelineRequest extends ActionRequest {
private String id;
private boolean verbose;

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.main;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
public class MainRequest extends ActionRequest<MainRequest> {
public class MainRequest extends ActionRequest {
@Override
public ActionRequestValidationException validate() {

View File

@ -31,7 +31,7 @@ import java.util.List;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class ClearScrollRequest extends ActionRequest<ClearScrollRequest> {
public class ClearScrollRequest extends ActionRequest {
private List<String> scrollIds;

View File

@ -36,7 +36,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* A multi search API request.
*/
public class MultiSearchRequest extends ActionRequest<MultiSearchRequest> implements CompositeIndicesRequest {
public class MultiSearchRequest extends ActionRequest implements CompositeIndicesRequest {
private int maxConcurrentSearchRequests = 0;
private List<SearchRequest> requests = new ArrayList<>();

View File

@ -30,10 +30,8 @@ import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermStatistics;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.HppcMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
@ -72,7 +70,7 @@ import java.util.stream.StreamSupport;
public class SearchPhaseController extends AbstractComponent {
public static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = (o1, o2) -> {
private static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = (o1, o2) -> {
int i = o1.value.shardTarget().index().compareTo(o2.value.shardTarget().index());
if (i == 0) {
i = o1.value.shardTarget().shardId().id() - o2.value.shardTarget().shardId().id();
@ -80,17 +78,15 @@ public class SearchPhaseController extends AbstractComponent {
return i;
};
public static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final ClusterService clusterService;
SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService, ClusterService clusterService) {
SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) {
super(settings);
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.clusterService = clusterService;
}
public AggregatedDfs aggregateDfs(AtomicArray<DfsSearchResult> results) {
@ -486,7 +482,7 @@ public class SearchPhaseController extends AbstractComponent {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
}
ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state());
ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService);
aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
List<SiblingPipelineAggregator> pipelineAggregators = firstResult.pipelineAggregators();
if (pipelineAggregators != null) {

View File

@ -49,7 +49,7 @@ import java.util.Objects;
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public final class SearchRequest extends ActionRequest<SearchRequest> implements IndicesRequest.Replaceable {
public final class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
private SearchType searchType = SearchType.DEFAULT;

View File

@ -33,7 +33,7 @@ import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
public class SearchScrollRequest extends ActionRequest {
private String scrollId;
private Scroll scroll;

View File

@ -69,7 +69,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService, clusterService);;
this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService);
this.searchTransportService = new SearchTransportService(settings, transportService);
SearchTransportService.registerRequestHandler(transportService, searchService);
this.clusterService = clusterService;

View File

@ -50,10 +50,9 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
SearchScrollRequest::new);
this.clusterService = clusterService;
this.searchTransportService = new SearchTransportService(settings, transportService);
this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService, clusterService);
this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService);
}
@Override
protected final void doExecute(SearchScrollRequest request, ActionListener<SearchResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");

View File

@ -40,7 +40,7 @@ public interface ActionFilter {
* Enables filtering the execution of an action on the request side, either by sending a response through the
* {@link ActionListener} or by continuing the execution through the given {@link ActionFilterChain chain}
*/
<Request extends ActionRequest<Request>, Response extends ActionResponse> void apply(Task task, String action, Request request,
<Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request,
ActionListener<Response> listener, ActionFilterChain<Request, Response> chain);
/**
@ -62,7 +62,7 @@ public interface ActionFilter {
}
@Override
public final <Request extends ActionRequest<Request>, Response extends ActionResponse> void apply(Task task, String action, Request request,
public final <Request extends ActionRequest, Response extends ActionResponse> void apply(Task task, String action, Request request,
ActionListener<Response> listener, ActionFilterChain<Request, Response> chain) {
if (apply(action, request, listener)) {
chain.proceed(task, action, request, listener);
@ -73,7 +73,7 @@ public interface ActionFilter {
* Applies this filter and returns {@code true} if the execution chain should proceed, or {@code false}
* if it should be aborted since the filter already handled the request and called the given listener.
*/
protected abstract boolean apply(String action, ActionRequest<?> request, ActionListener<?> listener);
protected abstract boolean apply(String action, ActionRequest request, ActionListener<?> listener);
@Override
public final <Response extends ActionResponse> void apply(String action, Response response, ActionListener<Response> listener,

View File

@ -27,7 +27,7 @@ import org.elasticsearch.tasks.Task;
/**
* A filter chain allowing the transport action request to continue and be processed
*/
public interface ActionFilterChain<Request extends ActionRequest<Request>, Response extends ActionResponse> {
public interface ActionFilterChain<Request extends ActionRequest, Response extends ActionResponse> {
/**
* Continue processing the request. Should only be called if a response has not been sent through

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -69,14 +70,14 @@ public class ActiveShardsObserver extends AbstractComponent {
}
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
if (activeShardCount.enoughShardsActive(observer.observedState(), indexName)) {
if (activeShardCount.enoughShardsActive(observer.observedState().getClusterState(), indexName)) {
onResult.accept(true);
} else {
final ClusterStateObserver.ChangePredicate shardsAllocatedPredicate =
new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(final ClusterState newState) {
return activeShardCount.enoughShardsActive(newState, indexName);
protected boolean validate(final ClusterServiceState newState) {
return activeShardCount.enoughShardsActive(newState.getClusterState(), indexName);
}
};
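Every ClusterStateObserver.ValidationPredicate migrates the same way: validate() now receives the ClusterServiceState wrapper and must unwrap the ClusterState explicitly. A generic sketch of the pattern (the condition being checked is illustrative only):

    ClusterStateObserver.ChangePredicate predicate = new ClusterStateObserver.ValidationPredicate() {
        @Override
        protected boolean validate(final ClusterServiceState newState) {
            // unwrap the ClusterState from the wrapper before inspecting it
            final ClusterState state = newState.getClusterState();
            return state.blocks().global().isEmpty(); // illustrative condition
        }
    };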

View File

@ -35,7 +35,7 @@ import java.util.function.Supplier;
/**
* A TransportAction that self registers a handler into the transport service
*/
public abstract class HandledTransportAction<Request extends ActionRequest<Request>, Response extends ActionResponse>
public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse>
extends TransportAction<Request, Response> {
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
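For concrete request classes the migration is mechanical: they stop passing themselves as a type argument to ActionRequest, and nothing else changes. A hypothetical before/after (MyRequest is not a real class):

    import org.elasticsearch.action.ActionRequest;
    import org.elasticsearch.action.ActionRequestValidationException;

    // before: public class MyRequest extends ActionRequest<MyRequest> { ... }

    // after: ActionRequest is no longer generic
    public class MyRequest extends ActionRequest {
        @Override
        public ActionRequestValidationException validate() {
            return null; // nothing to validate in this hypothetical request
        }
    }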

View File

@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
public abstract class TransportAction<Request extends ActionRequest<Request>, Response extends ActionResponse> extends AbstractComponent {
public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
protected final ThreadPool threadPool;
protected final String actionName;
@ -148,7 +148,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
protected abstract void doExecute(Request request, ActionListener<Response> listener);
private static class RequestFilterChain<Request extends ActionRequest<Request>, Response extends ActionResponse>
private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse>
implements ActionFilterChain<Request, Response> {
private final TransportAction<Request, Response> action;
@ -184,7 +184,7 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
}
}
private static class ResponseFilterChain<Request extends ActionRequest<Request>, Response extends ActionResponse>
private static class ResponseFilterChain<Request extends ActionRequest, Response extends ActionResponse>
implements ActionFilterChain<Request, Response> {
private final ActionFilter[] filters;

View File

@ -28,7 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends ActionRequest<Request> implements IndicesRequest.Replaceable {
public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends ActionRequest implements IndicesRequest.Replaceable {
protected String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();

View File

@ -29,7 +29,7 @@ import java.io.IOException;
/**
* A base request for master-based operations.
*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest<Request> {
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {
public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterServiceState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
@ -112,8 +113,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
private final ClusterStateObserver.ChangePredicate retryableOrNoBlockPredicate = new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
protected boolean validate(ClusterServiceState newState) {
ClusterBlockException blockException = checkBlock(request, newState.getClusterState());
return (blockException == null || !blockException.retryable());
}
};
@ -133,7 +134,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
}
protected void doStart() {
final ClusterState clusterState = observer.observedState();
final ClusterState clusterState = observer.observedState().getClusterState();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally

View File

@ -29,7 +29,7 @@ import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>> extends ActionRequest<Request> {
public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>> extends ActionRequest {
/**
* the list of nodesIds that will be used to resolve this request and {@link #concreteNodes}

View File

@ -106,17 +106,18 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
final List<NodeResponse> responses = new ArrayList<>();
final List<FailedNodeException> failures = new ArrayList<>();
final boolean accumulateExceptions = accumulateExceptions();
for (int i = 0; i < nodesResponses.length(); ++i) {
Object response = nodesResponses.get(i);
if (nodeResponseClass.isInstance(response)) {
responses.add(nodeResponseClass.cast(response));
} else if (response instanceof FailedNodeException) {
failures.add((FailedNodeException)response);
if (response instanceof FailedNodeException) {
if (accumulateExceptions) {
failures.add((FailedNodeException)response);
} else {
logger.warn("not accumulating exceptions, excluding exception from response", (FailedNodeException)response);
}
} else {
logger.warn("ignoring unexpected response [{}] of type [{}], expected [{}] or [{}]",
response, response != null ? response.getClass().getName() : null,
nodeResponseClass.getSimpleName(), FailedNodeException.class.getSimpleName());
responses.add(nodeResponseClass.cast(response));
}
}
@ -243,9 +244,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
(org.apache.logging.log4j.util.Supplier<?>)
() -> new ParameterizedMessage("failed to execute on node [{}]", nodeId), t);
}
if (accumulateExceptions()) {
responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t));
}
responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t));
if (counter.incrementAndGet() == responses.length()) {
finishHim();
}
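Taken together, these two hunks move the accumulateExceptions() decision out of the per-node failure callback and into the response-building loop: a FailedNodeException is now always stored in the responses array, and is either collected or logged and dropped when the final response is assembled. Untangling the interleaved lines above, the rewritten loop reads roughly:

    for (int i = 0; i < nodesResponses.length(); ++i) {
        Object response = nodesResponses.get(i);
        if (response instanceof FailedNodeException) {
            if (accumulateExceptions) {
                failures.add((FailedNodeException) response);
            } else {
                // the failure is excluded from the response but still logged
                logger.warn("not accumulating exceptions, excluding exception from response",
                    (FailedNodeException) response);
            }
        } else {
            responses.add(nodeResponseClass.cast(response));
        }
    }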

View File

@ -43,7 +43,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
* Requests that are run against a particular shard, first on the primary and then on the replicas, like {@link IndexRequest} or
* {@link TransportShardRefreshAction}.
*/
public abstract class ReplicationRequest<Request extends ReplicationRequest<Request>> extends ActionRequest<Request>
public abstract class ReplicationRequest<Request extends ReplicationRequest<Request>> extends ActionRequest
implements IndicesRequest {
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);

View File

@ -622,7 +622,7 @@ public abstract class TransportReplicationAction<
@Override
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.observedState();
final ClusterState state = observer.observedState().getClusterState();
if (handleBlockExceptions(state)) {
return;
}

Some files were not shown because too many files have changed in this diff.