Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
84c862b825
|
@ -307,6 +307,12 @@ class BuildPlugin implements Plugin<Project> {
|
|||
/** Adds repositores used by ES dependencies */
|
||||
static void configureRepositories(Project project) {
|
||||
RepositoryHandler repos = project.repositories
|
||||
if (System.getProperty("repos.mavenlocal") != null) {
|
||||
// with -Drepos.mavenlocal=true we can force checking the local .m2 repo which is
|
||||
// useful for development ie. bwc tests where we install stuff in the local repository
|
||||
// such that we don't have to pass hardcoded files to gradle
|
||||
repos.mavenLocal()
|
||||
}
|
||||
repos.mavenCentral()
|
||||
repos.maven {
|
||||
name 'sonatype-snapshots'
|
||||
|
|
|
@ -23,8 +23,6 @@ import org.gradle.api.Project
|
|||
import org.gradle.api.file.FileCollection
|
||||
import org.gradle.api.tasks.Input
|
||||
|
||||
import java.time.LocalDateTime
|
||||
|
||||
/** Configuration for an elasticsearch cluster, used for integration tests. */
|
||||
class ClusterConfiguration {
|
||||
|
||||
|
@ -34,6 +32,12 @@ class ClusterConfiguration {
|
|||
@Input
|
||||
int numNodes = 1
|
||||
|
||||
@Input
|
||||
int numBwcNodes = 0
|
||||
|
||||
@Input
|
||||
String bwcVersion = null
|
||||
|
||||
@Input
|
||||
int httpPort = 0
|
||||
|
||||
|
|
|
@ -53,11 +53,50 @@ class ClusterFormationTasks {
|
|||
// no need to add cluster formation tasks if the task won't run!
|
||||
return
|
||||
}
|
||||
configureDistributionDependency(project, config.distribution)
|
||||
List<Task> startTasks = []
|
||||
File sharedDir = new File(project.buildDir, "cluster/shared")
|
||||
// first we remove everything in the shared cluster directory to ensure there are no leftovers in repos or anything
|
||||
// in theory this should not be necessary but repositories are only deleted in the cluster-state and not on-disk
|
||||
// such that snapshots survive failures / test runs and there is no simple way today to fix that.
|
||||
Task cleanup = project.tasks.create(name: "${task.name}#prepareCluster.cleanShared", type: Delete, dependsOn: task.dependsOn.collect()) {
|
||||
delete sharedDir
|
||||
doLast {
|
||||
sharedDir.mkdirs()
|
||||
}
|
||||
}
|
||||
List<Task> startTasks = [cleanup]
|
||||
List<NodeInfo> nodes = []
|
||||
if (config.numNodes < config.numBwcNodes) {
|
||||
throw new GradleException("numNodes must be >= numBwcNodes [${config.numNodes} < ${config.numBwcNodes}]")
|
||||
}
|
||||
if (config.numBwcNodes > 0 && config.bwcVersion == null) {
|
||||
throw new GradleException("bwcVersion must not be null if numBwcNodes is > 0")
|
||||
}
|
||||
// this is our current version distribution configuration we use for all kinds of REST tests etc.
|
||||
project.configurations {
|
||||
elasticsearchDistro
|
||||
}
|
||||
configureDistributionDependency(project, config.distribution, project.configurations.elasticsearchDistro, VersionProperties.elasticsearch)
|
||||
if (config.bwcVersion != null && config.numBwcNodes > 0) {
|
||||
// if we have a cluster that has a BWC cluster we also need to configure a dependency on the BWC version
|
||||
// this version uses the same distribution etc. and only differs in the version we depend on.
|
||||
// from here on everything else works the same as if it's the current version, we fetch the BWC version
|
||||
// from mirrors using gradles built-in mechanism etc.
|
||||
project.configurations {
|
||||
elasticsearchBwcDistro
|
||||
}
|
||||
configureDistributionDependency(project, config.distribution, project.configurations.elasticsearchBwcDistro, config.bwcVersion)
|
||||
}
|
||||
|
||||
for (int i = 0; i < config.numNodes; ++i) {
|
||||
NodeInfo node = new NodeInfo(config, i, project, task)
|
||||
// we start N nodes and out of these N nodes there might be M bwc nodes.
|
||||
// for each of those nodes we might have a different configuratioon
|
||||
String elasticsearchVersion = VersionProperties.elasticsearch
|
||||
Configuration configuration = project.configurations.elasticsearchDistro
|
||||
if (i < config.numBwcNodes) {
|
||||
elasticsearchVersion = config.bwcVersion
|
||||
configuration = project.configurations.elasticsearchBwcDistro
|
||||
}
|
||||
NodeInfo node = new NodeInfo(config, i, project, task, elasticsearchVersion, sharedDir)
|
||||
if (i == 0) {
|
||||
if (config.seedNodePortsFile != null) {
|
||||
// we might allow this in the future to be set but for now we are the only authority to set this!
|
||||
|
@ -66,7 +105,7 @@ class ClusterFormationTasks {
|
|||
config.seedNodePortsFile = node.transportPortsFile;
|
||||
}
|
||||
nodes.add(node)
|
||||
startTasks.add(configureNode(project, task, node))
|
||||
startTasks.add(configureNode(project, task, cleanup, node, configuration))
|
||||
}
|
||||
|
||||
Task wait = configureWaitTask("${task.name}#wait", project, nodes, startTasks)
|
||||
|
@ -77,20 +116,14 @@ class ClusterFormationTasks {
|
|||
}
|
||||
|
||||
/** Adds a dependency on the given distribution */
|
||||
static void configureDistributionDependency(Project project, String distro) {
|
||||
String elasticsearchVersion = VersionProperties.elasticsearch
|
||||
static void configureDistributionDependency(Project project, String distro, Configuration configuration, String elasticsearchVersion) {
|
||||
String packaging = distro
|
||||
if (distro == 'tar') {
|
||||
packaging = 'tar.gz'
|
||||
} else if (distro == 'integ-test-zip') {
|
||||
packaging = 'zip'
|
||||
}
|
||||
project.configurations {
|
||||
elasticsearchDistro
|
||||
}
|
||||
project.dependencies {
|
||||
elasticsearchDistro "org.elasticsearch.distribution.${distro}:elasticsearch:${elasticsearchVersion}@${packaging}"
|
||||
}
|
||||
project.dependencies.add(configuration.name, "org.elasticsearch.distribution.${distro}:elasticsearch:${elasticsearchVersion}@${packaging}")
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -110,10 +143,10 @@ class ClusterFormationTasks {
|
|||
*
|
||||
* @return a task which starts the node.
|
||||
*/
|
||||
static Task configureNode(Project project, Task task, NodeInfo node) {
|
||||
static Task configureNode(Project project, Task task, Object dependsOn, NodeInfo node, Configuration configuration) {
|
||||
|
||||
// tasks are chained so their execution order is maintained
|
||||
Task setup = project.tasks.create(name: taskName(task, node, 'clean'), type: Delete, dependsOn: task.dependsOn.collect()) {
|
||||
Task setup = project.tasks.create(name: taskName(task, node, 'clean'), type: Delete, dependsOn: dependsOn) {
|
||||
delete node.homeDir
|
||||
delete node.cwd
|
||||
doLast {
|
||||
|
@ -122,7 +155,7 @@ class ClusterFormationTasks {
|
|||
}
|
||||
setup = configureCheckPreviousTask(taskName(task, node, 'checkPrevious'), project, setup, node)
|
||||
setup = configureStopTask(taskName(task, node, 'stopPrevious'), project, setup, node)
|
||||
setup = configureExtractTask(taskName(task, node, 'extract'), project, setup, node)
|
||||
setup = configureExtractTask(taskName(task, node, 'extract'), project, setup, node, configuration)
|
||||
setup = configureWriteConfigTask(taskName(task, node, 'configure'), project, setup, node)
|
||||
setup = configureExtraConfigFilesTask(taskName(task, node, 'extraConfig'), project, setup, node)
|
||||
setup = configureCopyPluginsTask(taskName(task, node, 'copyPlugins'), project, setup, node)
|
||||
|
@ -158,27 +191,28 @@ class ClusterFormationTasks {
|
|||
}
|
||||
|
||||
/** Adds a task to extract the elasticsearch distribution */
|
||||
static Task configureExtractTask(String name, Project project, Task setup, NodeInfo node) {
|
||||
List extractDependsOn = [project.configurations.elasticsearchDistro, setup]
|
||||
/* project.configurations.elasticsearchDistro.singleFile will be an
|
||||
external artifact if this is being run by a plugin not living in the
|
||||
elasticsearch source tree. If this is a plugin built in the
|
||||
elasticsearch source tree or this is a distro in the elasticsearch
|
||||
source tree then this should be the version of elasticsearch built
|
||||
by the source tree. If it isn't then Bad Things(TM) will happen. */
|
||||
static Task configureExtractTask(String name, Project project, Task setup, NodeInfo node, Configuration configuration) {
|
||||
List extractDependsOn = [configuration, setup]
|
||||
/* configuration.singleFile will be an external artifact if this is being run by a plugin not living in the
|
||||
elasticsearch source tree. If this is a plugin built in the elasticsearch source tree or this is a distro in
|
||||
the elasticsearch source tree then this should be the version of elasticsearch built by the source tree.
|
||||
If it isn't then Bad Things(TM) will happen. */
|
||||
Task extract
|
||||
|
||||
switch (node.config.distribution) {
|
||||
case 'integ-test-zip':
|
||||
case 'zip':
|
||||
extract = project.tasks.create(name: name, type: Copy, dependsOn: extractDependsOn) {
|
||||
from { project.zipTree(project.configurations.elasticsearchDistro.singleFile) }
|
||||
from {
|
||||
project.zipTree(configuration.singleFile)
|
||||
}
|
||||
into node.baseDir
|
||||
}
|
||||
break;
|
||||
case 'tar':
|
||||
extract = project.tasks.create(name: name, type: Copy, dependsOn: extractDependsOn) {
|
||||
from {
|
||||
project.tarTree(project.resources.gzip(project.configurations.elasticsearchDistro.singleFile))
|
||||
project.tarTree(project.resources.gzip(configuration.singleFile))
|
||||
}
|
||||
into node.baseDir
|
||||
}
|
||||
|
@ -187,7 +221,7 @@ class ClusterFormationTasks {
|
|||
File rpmDatabase = new File(node.baseDir, 'rpm-database')
|
||||
File rpmExtracted = new File(node.baseDir, 'rpm-extracted')
|
||||
/* Delay reading the location of the rpm file until task execution */
|
||||
Object rpm = "${ -> project.configurations.elasticsearchDistro.singleFile}"
|
||||
Object rpm = "${ -> configuration.singleFile}"
|
||||
extract = project.tasks.create(name: name, type: LoggedExec, dependsOn: extractDependsOn) {
|
||||
commandLine 'rpm', '--badreloc', '--nodeps', '--noscripts', '--notriggers',
|
||||
'--dbpath', rpmDatabase,
|
||||
|
@ -202,7 +236,7 @@ class ClusterFormationTasks {
|
|||
case 'deb':
|
||||
/* Delay reading the location of the deb file until task execution */
|
||||
File debExtracted = new File(node.baseDir, 'deb-extracted')
|
||||
Object deb = "${ -> project.configurations.elasticsearchDistro.singleFile}"
|
||||
Object deb = "${ -> configuration.singleFile}"
|
||||
extract = project.tasks.create(name: name, type: LoggedExec, dependsOn: extractDependsOn) {
|
||||
commandLine 'dpkg-deb', '-x', deb, debExtracted
|
||||
doFirst {
|
||||
|
@ -221,8 +255,8 @@ class ClusterFormationTasks {
|
|||
Map esConfig = [
|
||||
'cluster.name' : node.clusterName,
|
||||
'pidfile' : node.pidFile,
|
||||
'path.repo' : "${node.homeDir}/repo",
|
||||
'path.shared_data' : "${node.homeDir}/../",
|
||||
'path.repo' : "${node.sharedDir}/repo",
|
||||
'path.shared_data' : "${node.sharedDir}/",
|
||||
// Define a node attribute so we can test that it exists
|
||||
'node.testattr' : 'test',
|
||||
'repositories.url.allowed_urls': 'http://snapshot.test*'
|
||||
|
|
|
@ -40,6 +40,9 @@ class NodeInfo {
|
|||
/** root directory all node files and operations happen under */
|
||||
File baseDir
|
||||
|
||||
/** shared data directory all nodes share */
|
||||
File sharedDir
|
||||
|
||||
/** the pid file the node will use */
|
||||
File pidFile
|
||||
|
||||
|
@ -89,14 +92,15 @@ class NodeInfo {
|
|||
ByteArrayOutputStream buffer = new ByteArrayOutputStream()
|
||||
|
||||
/** Creates a node to run as part of a cluster for the given task */
|
||||
NodeInfo(ClusterConfiguration config, int nodeNum, Project project, Task task) {
|
||||
NodeInfo(ClusterConfiguration config, int nodeNum, Project project, Task task, String nodeVersion, File sharedDir) {
|
||||
this.config = config
|
||||
this.nodeNum = nodeNum
|
||||
this.sharedDir = sharedDir
|
||||
clusterName = "${task.path.replace(':', '_').substring(1)}"
|
||||
baseDir = new File(project.buildDir, "cluster/${task.name} node${nodeNum}")
|
||||
pidFile = new File(baseDir, 'es.pid')
|
||||
homeDir = homeDir(baseDir, config.distribution)
|
||||
confDir = confDir(baseDir, config.distribution)
|
||||
homeDir = homeDir(baseDir, config.distribution, nodeVersion)
|
||||
confDir = confDir(baseDir, config.distribution, nodeVersion)
|
||||
configFile = new File(confDir, 'elasticsearch.yml')
|
||||
// even for rpm/deb, the logs are under home because we dont start with real services
|
||||
File logsDir = new File(homeDir, 'logs')
|
||||
|
@ -181,13 +185,13 @@ class NodeInfo {
|
|||
}
|
||||
|
||||
/** Returns the directory elasticsearch home is contained in for the given distribution */
|
||||
static File homeDir(File baseDir, String distro) {
|
||||
static File homeDir(File baseDir, String distro, String nodeVersion) {
|
||||
String path
|
||||
switch (distro) {
|
||||
case 'integ-test-zip':
|
||||
case 'zip':
|
||||
case 'tar':
|
||||
path = "elasticsearch-${VersionProperties.elasticsearch}"
|
||||
path = "elasticsearch-${nodeVersion}"
|
||||
break
|
||||
case 'rpm':
|
||||
case 'deb':
|
||||
|
@ -199,12 +203,12 @@ class NodeInfo {
|
|||
return new File(baseDir, path)
|
||||
}
|
||||
|
||||
static File confDir(File baseDir, String distro) {
|
||||
static File confDir(File baseDir, String distro, String nodeVersion) {
|
||||
switch (distro) {
|
||||
case 'integ-test-zip':
|
||||
case 'zip':
|
||||
case 'tar':
|
||||
return new File(homeDir(baseDir, distro), 'config')
|
||||
return new File(homeDir(baseDir, distro, nodeVersion), 'config')
|
||||
case 'rpm':
|
||||
case 'deb':
|
||||
return new File(baseDir, "${distro}-extracted/etc/elasticsearch")
|
||||
|
|
|
@ -231,8 +231,7 @@ public class NodeInfo extends BaseNodeResponse {
|
|||
plugins.readFrom(in);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
ingest = new IngestInfo();
|
||||
ingest.readFrom(in);
|
||||
ingest = new IngestInfo(in);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -235,7 +235,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
|
|||
breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
|
||||
scriptStats = in.readOptionalStreamable(ScriptStats::new);
|
||||
discoveryStats = in.readOptionalStreamable(() -> new DiscoveryStats(null));
|
||||
ingestStats = in.readOptionalWritable(IngestStats.PROTO::readFrom);
|
||||
ingestStats = in.readOptionalWritable(IngestStats::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -65,6 +65,7 @@ import java.util.HashMap;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.ElasticsearchException.readException;
|
||||
|
|
|
@ -31,12 +31,18 @@ import java.util.Map;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class IngestStats implements Writeable<IngestStats>, ToXContent {
|
||||
|
||||
public final static IngestStats PROTO = new IngestStats(null, null);
|
||||
|
||||
private final Stats totalStats;
|
||||
private final Map<String, Stats> statsPerPipeline;
|
||||
|
||||
public IngestStats(StreamInput in) throws IOException {
|
||||
this.totalStats = new Stats(in);
|
||||
int size = in.readVInt();
|
||||
this.statsPerPipeline = new HashMap<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
statsPerPipeline.put(in.readString(), new Stats(in));
|
||||
}
|
||||
}
|
||||
|
||||
public IngestStats(Stats totalStats, Map<String, Stats> statsPerPipeline) {
|
||||
this.totalStats = totalStats;
|
||||
this.statsPerPipeline = statsPerPipeline;
|
||||
|
@ -58,16 +64,7 @@ public class IngestStats implements Writeable<IngestStats>, ToXContent {
|
|||
|
||||
@Override
|
||||
public IngestStats readFrom(StreamInput in) throws IOException {
|
||||
Stats totalStats = Stats.PROTO.readFrom(in);
|
||||
totalStats.readFrom(in);
|
||||
int size = in.readVInt();
|
||||
Map<String, Stats> statsPerPipeline = new HashMap<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
Stats stats = Stats.PROTO.readFrom(in);
|
||||
statsPerPipeline.put(in.readString(), stats);
|
||||
stats.readFrom(in);
|
||||
}
|
||||
return new IngestStats(totalStats, statsPerPipeline);
|
||||
return new IngestStats(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -99,13 +96,18 @@ public class IngestStats implements Writeable<IngestStats>, ToXContent {
|
|||
|
||||
public static class Stats implements Writeable<Stats>, ToXContent {
|
||||
|
||||
private final static Stats PROTO = new Stats(0, 0, 0, 0);
|
||||
|
||||
private final long ingestCount;
|
||||
private final long ingestTimeInMillis;
|
||||
private final long ingestCurrent;
|
||||
private final long ingestFailedCount;
|
||||
|
||||
public Stats(StreamInput in) throws IOException {
|
||||
ingestCount = in.readVLong();
|
||||
ingestTimeInMillis = in.readVLong();
|
||||
ingestCurrent = in.readVLong();
|
||||
ingestFailedCount = in.readVLong();
|
||||
}
|
||||
|
||||
public Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount) {
|
||||
this.ingestCount = ingestCount;
|
||||
this.ingestTimeInMillis = ingestTimeInMillis;
|
||||
|
@ -144,11 +146,7 @@ public class IngestStats implements Writeable<IngestStats>, ToXContent {
|
|||
|
||||
@Override
|
||||
public Stats readFrom(StreamInput in) throws IOException {
|
||||
long ingestCount = in.readVLong();
|
||||
long ingestTimeInMillis = in.readVLong();
|
||||
long ingestCurrent = in.readVLong();
|
||||
long ingestFailedCount = in.readVLong();
|
||||
return new Stats(ingestCount, ingestTimeInMillis, ingestCurrent, ingestFailedCount);
|
||||
return new Stats(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.ingest.core;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
|
@ -32,17 +33,22 @@ import java.util.LinkedHashSet;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
public class IngestInfo implements Streamable, ToXContent {
|
||||
public class IngestInfo implements Writeable<IngestInfo>, ToXContent {
|
||||
|
||||
private Set<ProcessorInfo> processors;
|
||||
private final Set<ProcessorInfo> processors;
|
||||
|
||||
public IngestInfo() {
|
||||
processors = Collections.emptySet();
|
||||
public IngestInfo(StreamInput in) throws IOException {
|
||||
this(Collections.emptyList());
|
||||
final int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
processors.add(new ProcessorInfo(in));
|
||||
}
|
||||
}
|
||||
|
||||
public IngestInfo(List<ProcessorInfo> processors) {
|
||||
this.processors = new LinkedHashSet<>(processors);
|
||||
this.processors = new TreeSet<>(processors); // we use a treeset here to have a test-able / predictable order
|
||||
}
|
||||
|
||||
public Iterable<ProcessorInfo> getProcessors() {
|
||||
|
@ -54,15 +60,8 @@ public class IngestInfo implements Streamable, ToXContent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
int size = in.readVInt();
|
||||
Set<ProcessorInfo> processors = new LinkedHashSet<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
ProcessorInfo info = new ProcessorInfo();
|
||||
info.readFrom(in);
|
||||
processors.add(info);
|
||||
}
|
||||
this.processors = processors;
|
||||
public IngestInfo readFrom(StreamInput in) throws IOException {
|
||||
return new IngestInfo(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -22,16 +22,18 @@ package org.elasticsearch.ingest.core;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class ProcessorInfo implements Streamable, ToXContent {
|
||||
public class ProcessorInfo implements Writeable<ProcessorInfo>, ToXContent, Comparable<ProcessorInfo> {
|
||||
|
||||
private String type;
|
||||
private final String type;
|
||||
|
||||
ProcessorInfo() {
|
||||
public ProcessorInfo(StreamInput input) throws IOException {
|
||||
type = input.readString();
|
||||
}
|
||||
|
||||
public ProcessorInfo(String type) {
|
||||
|
@ -46,8 +48,8 @@ public class ProcessorInfo implements Streamable, ToXContent {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
this.type = in.readString();
|
||||
public ProcessorInfo readFrom(StreamInput in) throws IOException {
|
||||
return new ProcessorInfo(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -78,4 +80,9 @@ public class ProcessorInfo implements Streamable, ToXContent {
|
|||
public int hashCode() {
|
||||
return type.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(ProcessorInfo o) {
|
||||
return type.compareTo(o.type);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
public class IngestStatsTests extends ESTestCase {
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
IngestStats.Stats total = new IngestStats.Stats(5, 10, 20, 30);
|
||||
IngestStats.Stats foo = new IngestStats.Stats(50, 100, 200, 300);
|
||||
IngestStats ingestStats = new IngestStats(total, Collections.singletonMap("foo", foo));
|
||||
IngestStats serialize = serialize(ingestStats);
|
||||
assertNotSame(serialize, ingestStats);
|
||||
assertNotSame(serialize.getTotalStats(), total);
|
||||
assertEquals(total.getIngestCount(), serialize.getTotalStats().getIngestCount());
|
||||
assertEquals(total.getIngestFailedCount(), serialize.getTotalStats().getIngestFailedCount());
|
||||
assertEquals(total.getIngestTimeInMillis(), serialize.getTotalStats().getIngestTimeInMillis());
|
||||
assertEquals(total.getIngestCurrent(), serialize.getTotalStats().getIngestCurrent());
|
||||
|
||||
assertEquals(ingestStats.getStatsPerPipeline().size(), 1);
|
||||
assertTrue(ingestStats.getStatsPerPipeline().containsKey("foo"));
|
||||
|
||||
Map<String, IngestStats.Stats> left = ingestStats.getStatsPerPipeline();
|
||||
Map<String, IngestStats.Stats> right = serialize.getStatsPerPipeline();
|
||||
|
||||
assertEquals(right.size(), 1);
|
||||
assertTrue(right.containsKey("foo"));
|
||||
assertEquals(left.size(), 1);
|
||||
assertTrue(left.containsKey("foo"));
|
||||
IngestStats.Stats leftStats = left.get("foo");
|
||||
IngestStats.Stats rightStats = right.get("foo");
|
||||
assertEquals(leftStats.getIngestCount(), rightStats.getIngestCount());
|
||||
assertEquals(leftStats.getIngestFailedCount(), rightStats.getIngestFailedCount());
|
||||
assertEquals(leftStats.getIngestTimeInMillis(), rightStats.getIngestTimeInMillis());
|
||||
assertEquals(leftStats.getIngestCurrent(), rightStats.getIngestCurrent());
|
||||
}
|
||||
|
||||
private <T> T serialize(Writeable<T> writeable) throws IOException {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
writeable.writeTo(out);
|
||||
StreamInput in = StreamInput.wrap(out.bytes());
|
||||
return writeable.readFrom(in);
|
||||
}
|
||||
}
|
|
@ -47,6 +47,7 @@ import org.elasticsearch.transport.TransportInfo;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -137,7 +138,7 @@ public class NodeInfoStreamingTests extends ESTestCase {
|
|||
PluginsAndModules plugins = new PluginsAndModules();
|
||||
plugins.addModule(DummyPluginInfo.INSTANCE);
|
||||
plugins.addPlugin(DummyPluginInfo.INSTANCE);
|
||||
IngestInfo ingestInfo = new IngestInfo();
|
||||
IngestInfo ingestInfo = new IngestInfo(Collections.emptyList());
|
||||
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, serviceAttributes, settings, osInfo, process, jvm, threadPoolInfo, transport, htttpInfo, plugins, ingestInfo);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,3 +9,18 @@
|
|||
nodes.info: {}
|
||||
|
||||
- match: { nodes.$master.modules.0.name: ingest-grok }
|
||||
- match: { nodes.$master.ingest.processors.0.type: append }
|
||||
- match: { nodes.$master.ingest.processors.1.type: convert }
|
||||
- match: { nodes.$master.ingest.processors.2.type: date }
|
||||
- match: { nodes.$master.ingest.processors.3.type: fail }
|
||||
- match: { nodes.$master.ingest.processors.4.type: foreach }
|
||||
- match: { nodes.$master.ingest.processors.5.type: grok }
|
||||
- match: { nodes.$master.ingest.processors.6.type: gsub }
|
||||
- match: { nodes.$master.ingest.processors.7.type: join }
|
||||
- match: { nodes.$master.ingest.processors.8.type: lowercase }
|
||||
- match: { nodes.$master.ingest.processors.9.type: remove }
|
||||
- match: { nodes.$master.ingest.processors.10.type: rename }
|
||||
- match: { nodes.$master.ingest.processors.11.type: set }
|
||||
- match: { nodes.$master.ingest.processors.12.type: split }
|
||||
- match: { nodes.$master.ingest.processors.13.type: trim }
|
||||
- match: { nodes.$master.ingest.processors.14.type: uppercase }
|
||||
|
|
|
@ -8,4 +8,19 @@
|
|||
nodes.info: {}
|
||||
|
||||
- match: { nodes.$master.plugins.0.name: ingest-attachment }
|
||||
- match: { nodes.$master.ingest.processors.11.type: attachment }
|
||||
- match: { nodes.$master.ingest.processors.0.type: append }
|
||||
- match: { nodes.$master.ingest.processors.1.type: attachment }
|
||||
- match: { nodes.$master.ingest.processors.2.type: convert }
|
||||
- match: { nodes.$master.ingest.processors.3.type: date }
|
||||
- match: { nodes.$master.ingest.processors.4.type: fail }
|
||||
- match: { nodes.$master.ingest.processors.5.type: foreach }
|
||||
- match: { nodes.$master.ingest.processors.6.type: gsub }
|
||||
- match: { nodes.$master.ingest.processors.7.type: join }
|
||||
- match: { nodes.$master.ingest.processors.8.type: lowercase }
|
||||
- match: { nodes.$master.ingest.processors.9.type: remove }
|
||||
- match: { nodes.$master.ingest.processors.10.type: rename }
|
||||
- match: { nodes.$master.ingest.processors.11.type: set }
|
||||
- match: { nodes.$master.ingest.processors.12.type: split }
|
||||
- match: { nodes.$master.ingest.processors.13.type: trim }
|
||||
- match: { nodes.$master.ingest.processors.14.type: uppercase }
|
||||
|
||||
|
|
|
@ -8,4 +8,18 @@
|
|||
nodes.info: {}
|
||||
|
||||
- match: { nodes.$master.plugins.0.name: ingest-geoip }
|
||||
- match: { nodes.$master.ingest.processors.3.type: geoip }
|
||||
- match: { nodes.$master.ingest.processors.0.type: append }
|
||||
- match: { nodes.$master.ingest.processors.1.type: convert }
|
||||
- match: { nodes.$master.ingest.processors.2.type: date }
|
||||
- match: { nodes.$master.ingest.processors.3.type: fail }
|
||||
- match: { nodes.$master.ingest.processors.4.type: foreach }
|
||||
- match: { nodes.$master.ingest.processors.5.type: geoip }
|
||||
- match: { nodes.$master.ingest.processors.6.type: gsub }
|
||||
- match: { nodes.$master.ingest.processors.7.type: join }
|
||||
- match: { nodes.$master.ingest.processors.8.type: lowercase }
|
||||
- match: { nodes.$master.ingest.processors.9.type: remove }
|
||||
- match: { nodes.$master.ingest.processors.10.type: rename }
|
||||
- match: { nodes.$master.ingest.processors.11.type: set }
|
||||
- match: { nodes.$master.ingest.processors.12.type: split }
|
||||
- match: { nodes.$master.ingest.processors.13.type: trim }
|
||||
- match: { nodes.$master.ingest.processors.14.type: uppercase }
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
apply plugin: 'elasticsearch.rest-test'
|
||||
|
||||
/* This project runs the core REST tests against a 2 node cluster where one of the nodes has a different minor.
|
||||
* Since we don't have a version to test against we currently use the hardcoded snapshot for to bascially run
|
||||
* against ourself. To test that useing a different version works got into distribution/zip and execute:
|
||||
* gradle clean install -Dbuild.snapshot=false
|
||||
*
|
||||
* This installs the release-build into a local .m2 repository, then change this version here to:
|
||||
* bwcVersion = "5.0.0"
|
||||
*
|
||||
* now you can run the bwc tests with:
|
||||
* gradle check -Drepos.mavenlocal=true
|
||||
*
|
||||
* (-Drepos.mavenlocal=true will force gradle to look for the zip distribuiton in the local .m2 repository)
|
||||
*/
|
||||
integTest {
|
||||
includePackaged = true
|
||||
cluster {
|
||||
numNodes = 2
|
||||
numBwcNodes = 1
|
||||
bwcVersion = "5.0.0-SNAPSHOT" // this is the same as the current version until we released the first RC
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.smoketest;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.test.rest.RestTestCandidate;
|
||||
import org.elasticsearch.test.rest.parser.RestTestParseException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class MultiNodeBackwardsIT extends ESRestTestCase {
|
||||
|
||||
public MultiNodeBackwardsIT(RestTestCandidate testCandidate) {
|
||||
super(testCandidate);
|
||||
}
|
||||
|
||||
@ParametersFactory
|
||||
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
|
||||
return createParameters(0, 1);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,28 +1,3 @@
|
|||
---
|
||||
"Check availability of default processors":
|
||||
- do:
|
||||
cluster.state: {}
|
||||
|
||||
- set: {master_node: master}
|
||||
|
||||
- do:
|
||||
nodes.info: {}
|
||||
|
||||
- match: { nodes.$master.ingest.processors.0.type: date }
|
||||
- match: { nodes.$master.ingest.processors.1.type: uppercase }
|
||||
- match: { nodes.$master.ingest.processors.2.type: set }
|
||||
- match: { nodes.$master.ingest.processors.3.type: lowercase }
|
||||
- match: { nodes.$master.ingest.processors.4.type: gsub }
|
||||
- match: { nodes.$master.ingest.processors.5.type: convert }
|
||||
- match: { nodes.$master.ingest.processors.6.type: remove }
|
||||
- match: { nodes.$master.ingest.processors.7.type: fail }
|
||||
- match: { nodes.$master.ingest.processors.8.type: foreach }
|
||||
- match: { nodes.$master.ingest.processors.9.type: split }
|
||||
- match: { nodes.$master.ingest.processors.10.type: trim }
|
||||
- match: { nodes.$master.ingest.processors.11.type: rename }
|
||||
- match: { nodes.$master.ingest.processors.12.type: join }
|
||||
- match: { nodes.$master.ingest.processors.13.type: append }
|
||||
|
||||
---
|
||||
"Test basic pipeline crud":
|
||||
- do:
|
||||
|
|
|
@ -76,11 +76,12 @@ setup:
|
|||
- do:
|
||||
nodes.stats:
|
||||
metric: [ ingest ]
|
||||
- gte: {nodes.$master.ingest.total.count: 1}
|
||||
#we can't assert anything here since we might have more than one node in the cluster
|
||||
- gte: {nodes.$master.ingest.total.count: 0}
|
||||
- gte: {nodes.$master.ingest.total.failed: 0}
|
||||
- gte: {nodes.$master.ingest.total.time_in_millis: 0}
|
||||
- match: {nodes.$master.ingest.total.current: 0}
|
||||
- gte: {nodes.$master.ingest.pipelines.pipeline1.count: 1}
|
||||
- gte: {nodes.$master.ingest.pipelines.pipeline1.count: 0}
|
||||
- match: {nodes.$master.ingest.pipelines.pipeline1.failed: 0}
|
||||
- gte: {nodes.$master.ingest.pipelines.pipeline1.time_in_millis: 0}
|
||||
- match: {nodes.$master.ingest.pipelines.pipeline1.current: 0}
|
||||
|
@ -113,11 +114,12 @@ setup:
|
|||
- do:
|
||||
nodes.stats:
|
||||
metric: [ ingest ]
|
||||
- gte: {nodes.$master.ingest.total.count: 1}
|
||||
#we can't assert anything here since we might have more than one node in the cluster
|
||||
- gte: {nodes.$master.ingest.total.count: 0}
|
||||
- gte: {nodes.$master.ingest.total.failed: 0}
|
||||
- gte: {nodes.$master.ingest.total.time_in_millis: 0}
|
||||
- match: {nodes.$master.ingest.total.current: 0}
|
||||
- match: {nodes.$master.ingest.pipelines.pipeline2.count: 1}
|
||||
- gte: {nodes.$master.ingest.pipelines.pipeline2.count: 0}
|
||||
- match: {nodes.$master.ingest.pipelines.pipeline2.failed: 0}
|
||||
- gte: {nodes.$master.ingest.pipelines.pipeline2.time_in_millis: 0}
|
||||
- match: {nodes.$master.ingest.pipelines.pipeline2.current: 0}
|
||||
|
|
|
@ -39,6 +39,7 @@ List projects = [
|
|||
'plugins:repository-s3',
|
||||
'plugins:jvm-example',
|
||||
'plugins:store-smb',
|
||||
'qa:backwards-5.0',
|
||||
'qa:evil-tests',
|
||||
'qa:smoke-test-client',
|
||||
'qa:smoke-test-multinode',
|
||||
|
|
Loading…
Reference in New Issue