2.x backwards compatibility support for watcher

Basic backwards compatibility support for watcher.

Closes elastic/elasticsearch#3230

Relates to elastic/elasticsearch#3231 - this actually should fix all the failures caused
by fractional time values but it does so by being able to parse them.
Being able to parse them is important for 2.x compatibility but 5.0
watches shouldn't produce fractional time values. This fixes the
particular way of making fractional time values mentioned in elastic/elasticsearch#3231
but I expect there are a half dozen more places to fix. The actual
watcher tests are fairly basic.

Original commit: elastic/x-pack-elasticsearch@328717455c
This commit is contained in:
Nik Everett 2016-08-31 10:19:07 -04:00
parent 119bb67967
commit 203faaf4f4
27 changed files with 645 additions and 220 deletions

View File

@ -16,7 +16,7 @@
# Creates indices with old versions of elasticsearch. These indices are used by x-pack plugins like security # Creates indices with old versions of elasticsearch. These indices are used by x-pack plugins like security
# to test if the import of metadata that is stored in elasticsearch indexes works correctly. # to test if the import of metadata that is stored in elasticsearch indexes works correctly.
# This tool will start a node on port 9200/9300. If a node is already running on that port then the script will fail. # This tool will start a node on port 9200/9300. If a node is already running on that port then the script will fail.
# Currently this script can only deal with versions >=2.3X and < 5.0. Needs more work for versions before or after. # Currently this script can only deal with versions >=2.0.0 and < 5.0. Needs more work for versions before or after.
# #
# Run from x-plugins root directory like so: # Run from x-plugins root directory like so:
# python3 ./elasticsearch/x-dev-tools/create_bwc_indexes.py 2.3.4 # python3 ./elasticsearch/x-dev-tools/create_bwc_indexes.py 2.3.4
@ -50,6 +50,7 @@ try:
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError from elasticsearch.exceptions import ConnectionError
from elasticsearch.exceptions import TransportError from elasticsearch.exceptions import TransportError
from elasticsearch.exceptions import NotFoundError
from elasticsearch.client import IndicesClient from elasticsearch.client import IndicesClient
except ImportError as e: except ImportError as e:
print('Can\'t import elasticsearch please install `sudo pip3 install elasticsearch`') print('Can\'t import elasticsearch please install `sudo pip3 install elasticsearch`')
@ -80,7 +81,10 @@ def start_node(version, release_dir, data_dir):
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def install_plugin(version, release_dir, plugin_name): def install_plugin(version, release_dir, plugin_name):
run_plugin(version, release_dir, 'install', [plugin_name]) args = [plugin_name]
if parse_version(version) >= parse_version('2.2.0'):
args = [plugin_name, '--batch']
run_plugin(version, release_dir, 'install', args)
def remove_plugin(version, release_dir, plugin_name): def remove_plugin(version, release_dir, plugin_name):
run_plugin(version, release_dir, 'remove', [plugin_name]) run_plugin(version, release_dir, 'remove', [plugin_name])
@ -96,9 +100,8 @@ def create_client():
client = Elasticsearch([{'host': 'localhost', 'port': 9200, 'http_auth':'es_admin:0123456789'}]) client = Elasticsearch([{'host': 'localhost', 'port': 9200, 'http_auth':'es_admin:0123456789'}])
health = client.cluster.health(wait_for_nodes=1) health = client.cluster.health(wait_for_nodes=1)
return client return client
except Exception as e: except ConnectionError:
logging.info('got exception while waiting for cluster' + str(e)) logging.info('Not started yet...')
pass
time.sleep(1) time.sleep(1)
assert False, 'Timed out waiting for node for %s seconds' % timeout assert False, 'Timed out waiting for node for %s seconds' % timeout
@ -113,11 +116,17 @@ def generate_security_index(client, version):
"roles" : [ "bwc_test_role" ] "roles" : [ "bwc_test_role" ]
} }
response = requests.put('http://localhost:9200/_shield/user/bwc_test_user', auth=('es_admin', '0123456789'), data=json.dumps(body)) while True:
logging.info('put user reponse: ' + response.text) response = requests.put('http://localhost:9200/_shield/user/bwc_test_user', auth=('es_admin', '0123456789'), data=json.dumps(body))
if (response.status_code != 200) : logging.info('put user reponse: ' + response.text)
if response.status_code == 200:
break
else:
if 'service has not been started' in response.text:
continue
raise Exception('PUT http://localhost:9200/_shield/role/bwc_test_role did not succeed!') raise Exception('PUT http://localhost:9200/_shield/role/bwc_test_role did not succeed!')
# add a role # add a role
body = { body = {
"cluster": ["all"], "cluster": ["all"],
@ -154,6 +163,107 @@ def generate_security_index(client, version):
health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.security') health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.security')
assert health['timed_out'] == False, 'cluster health timed out %s' % health assert health['timed_out'] == False, 'cluster health timed out %s' % health
# this adds a couple of watches and waits for the the watch_history to accumulate some results
def generate_watcher_index(client, version):
logging.info('Adding a watch')
body = {
"trigger" : {
"schedule": {
"interval": "1s"
}
},
"condition" : {
"always" : {}
},
"throttle_period": "1s",
"actions" : {
"index_payload" : {
"transform" : {
"search" : {
"request" : {
"body" : { "size": 1, "query" : { "match_all" : {} }}
}
}
},
"index" : {
"index" : "bwc_watch_index",
"doc_type" : "bwc_watch_type"
}
}
}
}
response = requests.put('http://localhost:9200/_watcher/watch/bwc_watch', auth=('es_admin', '0123456789'), data=json.dumps(body))
logging.info('PUT watch response: ' + response.text)
if (response.status_code != 201) :
raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_watch did not succeed!')
if parse_version(version) < parse_version('2.3.0'):
logging.info('Skipping watch with a funny read timeout because email attachement is not supported by this version')
else:
logging.info('Adding a watch with a funny read timeout')
body = {
"trigger" : {
"schedule": {
"interval": "100s"
}
},
"condition": {
"never": {}
},
"actions": {
"work": {
"email": {
"to": "email@domain.com",
"subject": "Test Kibana PDF report",
"attachments": {
"test_report.pdf": {
"http": {
"content_type": "application/pdf",
"request": {
"read_timeout": "100s",
"scheme": "https",
"host": "example.com",
"path":"{{ctx.metadata.report_url}}",
"port": 8443,
"auth": {
"basic": {
"username": "Aladdin",
"password": "open sesame"
}
}
}
}
}
}
}
}
}
}
response = requests.put('http://localhost:9200/_watcher/watch/bwc_funny_timeout', auth=('es_admin', '0123456789'), data=json.dumps(body))
logging.info('PUT watch response: ' + response.text)
if (response.status_code != 201) :
raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_funny_timeout did not succeed!')
# wait to accumulate some watches
logging.info('Waiting for watch results index to fill up...')
for attempt in range(1, 31):
try:
response = client.search(index="bwc_watch_index", body={"query": {"match_all": {}}})
logging.info('(' + str(attempt) + ') Got ' + str(response['hits']['total']) + ' hits and want 10...')
if response['hits']['total'] >= 10:
break
except NotFoundError:
logging.info('(' + str(attempt) + ') Not found, retrying')
time.sleep(1)
health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.watches')
assert health['timed_out'] == False, 'cluster health timed out %s' % health
health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.watch_history*')
assert health['timed_out'] == False, 'cluster health timed out %s' % health
health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='bwc_watch_index')
assert health['timed_out'] == False, 'cluster health timed out %s' % health
def compress_index(version, tmp_dir, output_dir): def compress_index(version, tmp_dir, output_dir):
compress(tmp_dir, output_dir, 'x-pack-%s.zip' % version, 'data') compress(tmp_dir, output_dir, 'x-pack-%s.zip' % version, 'data')
@ -232,50 +342,52 @@ def main():
logging.getLogger('urllib3').setLevel(logging.WARN) logging.getLogger('urllib3').setLevel(logging.WARN)
cfg = parse_config() cfg = parse_config()
for version in cfg.versions: for version in cfg.versions:
if parse_version(version) < parse_version('2.3.0'): logging.info('--> Creating x-pack index for %s' % version)
logging.info('version is ' + version + ' but shield supports native realm oly from 2.3.0 on. nothing to do.')
continue
else:
logging.info('--> Creating x-pack index for %s' % version)
# setup for starting nodes # setup for starting nodes
release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version) release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version)
if not os.path.exists(release_dir): if not os.path.exists(release_dir):
raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir)) raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir))
tmp_dir = tempfile.mkdtemp() tmp_dir = tempfile.mkdtemp()
data_dir = os.path.join(tmp_dir, 'data') data_dir = os.path.join(tmp_dir, 'data')
logging.info('Temp data dir: %s' % data_dir) logging.info('Temp data dir: %s' % data_dir)
node = None node = None
try: try:
# install plugins # install plugins
remove_plugin(version, release_dir, 'license') remove_plugin(version, release_dir, 'license')
remove_plugin(version, release_dir, 'shield') remove_plugin(version, release_dir, 'shield')
# remove the shield config too before fresh install remove_plugin(version, release_dir, 'watcher')
run('rm -rf %s' %(os.path.join(release_dir, 'config/shield'))) # remove the shield config too before fresh install
install_plugin(version, release_dir, 'license') run('rm -rf %s' %(os.path.join(release_dir, 'config/shield')))
install_plugin(version, release_dir, 'shield') install_plugin(version, release_dir, 'license')
# here we could also install watcher etc install_plugin(version, release_dir, 'shield')
install_plugin(version, release_dir, 'watcher')
# here we could also install watcher etc
# create admin # create admin
run('%s useradd es_admin -r admin -p 0123456789' %(os.path.join(release_dir, 'bin/shield/esusers'))) run('%s useradd es_admin -r admin -p 0123456789' %(os.path.join(release_dir, 'bin/shield/esusers')))
node = start_node(version, release_dir, data_dir) node = start_node(version, release_dir, data_dir)
# create a client that authenticates as es_admin # create a client that authenticates as es_admin
client = create_client() client = create_client()
if parse_version(version) < parse_version('2.3.0'):
logging.info('Version is ' + version + ' but shield supports native realm oly from 2.3.0 on. Nothing to do for Shield.')
else:
generate_security_index(client, version) generate_security_index(client, version)
# here we could also add watches, monitoring etc generate_watcher_index(client, version)
# here we could also add watches, monitoring etc
shutdown_node(node)
node = None
compress_index(version, tmp_dir, cfg.output_dir)
finally:
if node is not None:
# This only happens if we've hit an exception:
shutdown_node(node) shutdown_node(node)
node = None shutil.rmtree(tmp_dir)
compress_index(version, tmp_dir, cfg.output_dir)
finally:
if node is not None:
# This only happens if we've hit an exception:
shutdown_node(node)
shutil.rmtree(tmp_dir)
if __name__ == '__main__': if __name__ == '__main__':
try: try:

View File

@ -0,0 +1,159 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.VersionUtils;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static org.elasticsearch.test.OldIndexUtils.copyIndex;
import static org.elasticsearch.test.OldIndexUtils.loadDataFilesList;
/**
* Base class for tests against clusters coming from old versions of xpack and Elasticsearch.
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) // We'll start the nodes manually
public abstract class AbstractOldXPackIndicesBackwardsCompatibilityTestCase extends SecurityIntegTestCase {
protected List<String> dataFiles;
@Override
protected final boolean ignoreExternalCluster() {
return true;
}
@Before
public final void initIndexesList() throws Exception {
dataFiles = loadDataFilesList("x-pack", getBwcIndicesPath());
}
@Override
public Settings nodeSettings(int ord) {
// speed up recoveries
return Settings.builder()
.put(super.nodeSettings(ord))
.put(ThrottlingAllocationDecider
.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30)
.put(ThrottlingAllocationDecider
.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30)
.build();
}
@Override
protected int maxNumberOfNodes() {
try {
return SecurityIntegTestCase.defaultMaxNumberOfNodes() + loadDataFilesList("x-pack", getBwcIndicesPath()).size();
} catch (IOException e) {
throw new RuntimeException("couldn't enumerate bwc indices", e);
}
}
public void testAllVersionsTested() throws Exception {
SortedSet<String> expectedVersions = new TreeSet<>();
for (Version v : VersionUtils.allVersions()) {
if (false == shouldTestVersion(v)) continue;
if (v.equals(Version.CURRENT)) continue; // the current version is always compatible with itself
if (v.isBeta() == true || v.isAlpha() == true || v.isRC() == true) continue; // don't check alphas etc
expectedVersions.add("x-pack-" + v.toString() + ".zip");
}
expectedVersions.removeAll(dataFiles);
if (expectedVersions.isEmpty() == false) {
StringBuilder msg = new StringBuilder("Old index tests are missing indexes:");
for (String expected : expectedVersions) {
msg.append("\n" + expected);
}
fail(msg.toString());
}
}
public void testOldIndexes() throws Exception {
Collections.shuffle(dataFiles, random());
for (String dataFile : dataFiles) {
Version version = Version.fromString(dataFile.replace("x-pack-", "").replace(".zip", ""));
if (false == shouldTestVersion(version)) continue;
setupCluster(dataFile);
ensureYellow();
long startTime = System.nanoTime();
try {
checkVersion(version);
} catch (Throwable t) {
throw new AssertionError("Failed while checking [" + version + "]", t);
}
logger.info("--> Done testing {}, took {} millis", version, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
}
/**
* Should we test this version at all? Called before loading the data directory. Return false to skip it entirely.
*/
protected boolean shouldTestVersion(Version version) {
return true;
}
/**
* Actually test this version.
*/
protected abstract void checkVersion(Version version) throws Exception;
private void setupCluster(String pathToZipFile) throws Exception {
// shutdown any nodes from previous zip files
while (internalCluster().size() > 0) {
internalCluster().stopRandomNode(s -> true);
}
// first create the data directory and unzip the data there
// we put the whole cluster state and indexes because if we only copy indexes and import them as dangling then
// the native realm services will start because there is no security index and nothing is recovering
// but we want them to not start!
Path dataPath = createTempDir();
Settings.Builder nodeSettings = Settings.builder()
.put("path.data", dataPath.toAbsolutePath());
// unzip data
Path backwardsIndex = getBwcIndicesPath().resolve(pathToZipFile);
// decompress the index
try (InputStream stream = Files.newInputStream(backwardsIndex)) {
logger.info("unzipping {}", backwardsIndex.toString());
TestUtil.unzip(stream, dataPath);
// now we need to copy the whole thing so that it looks like an actual data path
try (Stream<Path> unzippedFiles = Files.list(dataPath.resolve("data"))) {
Path dataDir = unzippedFiles.findFirst().get();
// this is not actually an index but the copy does the job anyway
copyIndex(logger, dataDir.resolve("nodes"), "nodes", dataPath);
// remove the original unzipped directory
}
IOUtils.rm(dataPath.resolve("data"));
}
// check it is unique
assertTrue(Files.exists(dataPath));
Path[] list = FileSystemUtils.files(dataPath);
if (list.length != 1) {
throw new IllegalStateException("Backwards index must contain exactly one node");
}
// start the node
logger.info("--> Data path for importing node: {}", dataPath);
String importingNodeName = internalCluster().startNode(nodeSettings.build());
Path[] nodePaths = internalCluster().getInstance(NodeEnvironment.class, importingNodeName).nodeDataPaths();
assertEquals(1, nodePaths.length);
}
}

View File

@ -5,19 +5,9 @@
*/ */
package org.elasticsearch; package org.elasticsearch;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.security.action.role.GetRolesResponse; import org.elasticsearch.xpack.security.action.role.GetRolesResponse;
import org.elasticsearch.xpack.security.action.role.PutRoleResponse; import org.elasticsearch.xpack.security.action.role.PutRoleResponse;
import org.elasticsearch.xpack.security.action.user.GetUsersResponse; import org.elasticsearch.xpack.security.action.user.GetUsersResponse;
@ -28,26 +18,13 @@ import org.elasticsearch.xpack.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.security.authz.store.NativeRolesStore; import org.elasticsearch.xpack.security.authz.store.NativeRolesStore;
import org.elasticsearch.xpack.security.client.SecurityClient; import org.elasticsearch.xpack.security.client.SecurityClient;
import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.User;
import org.junit.AfterClass;
import org.junit.Before;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.OldIndexUtils.copyIndex; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.OldIndexUtils.loadIndexesList;
import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordTokenTests.basicAuthHeaderValue; import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordTokenTests.basicAuthHeaderValue;
import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
/** /**
* Backwards compatibility test that loads some data from a pre-5.0 cluster and attempts to do some basic security stuff with it. It * Backwards compatibility test that loads some data from a pre-5.0 cluster and attempts to do some basic security stuff with it. It
@ -73,126 +50,13 @@ import static org.hamcrest.Matchers.equalTo;
* <li>This document in {@code index3}: {@code {"title": "bwc_test_user should not see this index"}}</li> * <li>This document in {@code index3}: {@code {"title": "bwc_test_user should not see this index"}}</li>
* </ul> * </ul>
**/ **/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) // We'll start the nodes manually public class OldSecurityIndexBackwardsCompatibilityIT extends AbstractOldXPackIndicesBackwardsCompatibilityTestCase {
public class OldSecurityIndexBackwardsCompatibilityIT extends SecurityIntegTestCase {
List<String> indexes;
static String importingNodeName;
static Path dataPath;
@Override @Override
protected boolean ignoreExternalCluster() { protected boolean shouldTestVersion(Version version) {
return true; return version.onOrAfter(Version.V_2_3_0); // native realm only supported from 2.3.0 on
} }
@Before protected void checkVersion(Version version) throws Exception {
public void initIndexesList() throws Exception {
indexes = loadIndexesList("x-pack", getBwcIndicesPath());
}
@AfterClass
public static void tearDownStatics() {
importingNodeName = null;
dataPath = null;
}
@Override
public Settings nodeSettings(int ord) {
Settings settings = super.nodeSettings(ord);
// speed up recoveries
return Settings.builder()
.put(ThrottlingAllocationDecider
.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 30)
.put(ThrottlingAllocationDecider
.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 30)
.put(settings).build();
}
@Override
protected int maxNumberOfNodes() {
try {
return SecurityIntegTestCase.defaultMaxNumberOfNodes() + loadIndexesList("x-pack", getBwcIndicesPath()).size();
} catch (IOException e) {
throw new RuntimeException("couldn't enumerate bwc indices", e);
}
}
void setupCluster(String pathToZipFile) throws Exception {
// shutdown any nodes from previous zip files
while (internalCluster().size() > 0) {
internalCluster().stopRandomNode(s -> true);
}
// first create the data directory and unzip the data there
// we put the whole cluster state and indexes because if we only copy indexes and import them as dangling then
// the native realm services will start because there is no security index and nothing is recovering
// but we want them to not start!
dataPath = createTempDir();
Settings.Builder nodeSettings = Settings.builder()
.put("path.data", dataPath.toAbsolutePath());
// unzip data
Path backwardsIndex = getBwcIndicesPath().resolve(pathToZipFile);
// decompress the index
try (InputStream stream = Files.newInputStream(backwardsIndex)) {
logger.info("unzipping {}", backwardsIndex.toString());
TestUtil.unzip(stream, dataPath);
// now we need to copy the whole thing so that it looks like an actual data path
try (Stream<Path> unzippedFiles = Files.list(dataPath.resolve("data"))) {
Path dataDir = unzippedFiles.findFirst().get();
// this is not actually an index but the copy does the job anyway
copyIndex(logger, dataDir.resolve("nodes"), "nodes", dataPath);
// remove the original unzipped directory
}
IOUtils.rm(dataPath.resolve("data"));
}
// check it is unique
assertTrue(Files.exists(dataPath));
Path[] list = FileSystemUtils.files(dataPath);
if (list.length != 1) {
throw new IllegalStateException("Backwards index must contain exactly one node");
}
// start the node
logger.info("--> Data path for importing node: {}", dataPath);
importingNodeName = internalCluster().startNode(nodeSettings.build());
Path[] nodePaths = internalCluster().getInstance(NodeEnvironment.class, importingNodeName).nodeDataPaths();
assertEquals(1, nodePaths.length);
}
public void testAllVersionsTested() throws Exception {
SortedSet<String> expectedVersions = new TreeSet<>();
for (Version v : VersionUtils.allVersions()) {
if (v.before(Version.V_2_3_0)) continue; // native realm only supported from 2.3.0 on
if (v.equals(Version.CURRENT)) continue; // the current version is always compatible with itself
if (v.isBeta() == true || v.isAlpha() == true || v.isRC() == true) continue; // don't check alphas etc
expectedVersions.add("x-pack-" + v.toString() + ".zip");
}
for (String index : indexes) {
if (expectedVersions.remove(index) == false) {
logger.warn("Old indexes tests contain extra index: {}", index);
}
}
if (expectedVersions.isEmpty() == false) {
StringBuilder msg = new StringBuilder("Old index tests are missing indexes:");
for (String expected : expectedVersions) {
msg.append("\n" + expected);
}
fail(msg.toString());
}
}
public void testOldIndexes() throws Exception {
Collections.shuffle(indexes, random());
for (String index : indexes) {
setupCluster(index);
ensureYellow();
long startTime = System.nanoTime();
assertBasicSecurityWorks();
logger.info("--> Done testing {}, took {} millis", index, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
}
void assertBasicSecurityWorks() throws Exception {
// test that user and roles are there // test that user and roles are there
logger.info("Getting roles..."); logger.info("Getting roles...");
SecurityClient securityClient = new SecurityClient(client()); SecurityClient securityClient = new SecurityClient(client());
@ -225,7 +89,7 @@ public class OldSecurityIndexBackwardsCompatibilityIT extends SecurityIntegTestC
assertEquals("bwc_test_user", user.principal()); assertEquals("bwc_test_user", user.principal());
// check that documents are there // check that documents are there
assertThat(client().prepareSearch().get().getHits().getTotalHits(), equalTo(5L)); assertHitCount(client().prepareSearch("index1", "index2", "index3").get(), 5);
Client bwcTestUserClient = client().filterWithHeader( Client bwcTestUserClient = client().filterWithHeader(
singletonMap(UsernamePasswordToken.BASIC_AUTH_HEADER, basicAuthHeaderValue("bwc_test_user", "9876543210"))); singletonMap(UsernamePasswordToken.BASIC_AUTH_HEADER, basicAuthHeaderValue("bwc_test_user", "9876543210")));

View File

@ -10,7 +10,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -18,10 +17,10 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.xpack.common.http.auth.HttpAuth;
import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry; import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.watcher.support.WatcherUtils; import org.elasticsearch.xpack.watcher.support.WatcherUtils;
import org.elasticsearch.xpack.common.http.auth.HttpAuth;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
@ -159,10 +158,12 @@ public class HttpRequest implements ToXContent {
builder.field(Field.BODY.getPreferredName(), body); builder.field(Field.BODY.getPreferredName(), body);
} }
if (connectionTimeout != null) { if (connectionTimeout != null) {
builder.field(Field.CONNECTION_TIMEOUT.getPreferredName(), connectionTimeout); builder.timeValueField(HttpRequest.Field.CONNECTION_TIMEOUT.getPreferredName(),
HttpRequest.Field.CONNECTION_TIMEOUT_HUMAN.getPreferredName(), connectionTimeout);
} }
if (readTimeout != null) { if (readTimeout != null) {
builder.field(Field.READ_TIMEOUT.getPreferredName(), readTimeout); builder.timeValueField(HttpRequest.Field.READ_TIMEOUT.getPreferredName(),
HttpRequest.Field.READ_TIMEOUT_HUMAN.getPreferredName(), readTimeout);
} }
if (proxy != null) { if (proxy != null) {
builder.field(Field.PROXY.getPreferredName(), proxy); builder.field(Field.PROXY.getPreferredName(), proxy);
@ -269,19 +270,26 @@ public class HttpRequest implements ToXContent {
} }
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.AUTH)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.AUTH)) {
builder.auth(httpAuthRegistry.parse(parser)); builder.auth(httpAuthRegistry.parse(parser));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.CONNECTION_TIMEOUT)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.CONNECTION_TIMEOUT)) {
builder.connectionTimeout(TimeValue.timeValueMillis(parser.longValue()));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.CONNECTION_TIMEOUT_HUMAN)) {
// Users and 2.x specify the timeout this way
try { try {
builder.connectionTimeout(WatcherDateTimeUtils.parseTimeValue(parser, Field.CONNECTION_TIMEOUT.toString())); builder.connectionTimeout(WatcherDateTimeUtils.parseTimeValue(parser,
HttpRequest.Field.CONNECTION_TIMEOUT.toString()));
} catch (ElasticsearchParseException pe) { } catch (ElasticsearchParseException pe) {
throw new ElasticsearchParseException("could not parse http request. invalid time value for [{}] field", pe, throw new ElasticsearchParseException("could not parse http request template. invalid time value for [{}] field",
currentFieldName); pe, currentFieldName);
} }
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.READ_TIMEOUT)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.READ_TIMEOUT)) {
builder.readTimeout(TimeValue.timeValueMillis(parser.longValue()));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.READ_TIMEOUT_HUMAN)) {
// Users and 2.x specify the timeout this way
try { try {
builder.readTimeout(WatcherDateTimeUtils.parseTimeValue(parser, Field.READ_TIMEOUT.toString())); builder.readTimeout(WatcherDateTimeUtils.parseTimeValue(parser, HttpRequest.Field.READ_TIMEOUT.toString()));
} catch (ElasticsearchParseException pe) { } catch (ElasticsearchParseException pe) {
throw new ElasticsearchParseException("could not parse http request. invalid time value for [{}] field", pe, throw new ElasticsearchParseException("could not parse http request template. invalid time value for [{}] field",
currentFieldName); pe, currentFieldName);
} }
} else if (token == XContentParser.Token.START_OBJECT) { } else if (token == XContentParser.Token.START_OBJECT) {
if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.HEADERS)) { if (ParseFieldMatcher.STRICT.match(currentFieldName, Field.HEADERS)) {
@ -482,8 +490,10 @@ public class HttpRequest implements ToXContent {
ParseField HEADERS = new ParseField("headers"); ParseField HEADERS = new ParseField("headers");
ParseField AUTH = new ParseField("auth"); ParseField AUTH = new ParseField("auth");
ParseField BODY = new ParseField("body"); ParseField BODY = new ParseField("body");
ParseField CONNECTION_TIMEOUT = new ParseField("connection_timeout"); ParseField CONNECTION_TIMEOUT = new ParseField("connection_timeout_in_millis");
ParseField READ_TIMEOUT = new ParseField("read_timeout"); ParseField CONNECTION_TIMEOUT_HUMAN = new ParseField("connection_timeout");
ParseField READ_TIMEOUT = new ParseField("read_timeout_millis");
ParseField READ_TIMEOUT_HUMAN = new ParseField("read_timeout");
ParseField PROXY = new ParseField("proxy"); ParseField PROXY = new ParseField("proxy");
ParseField URL = new ParseField("url"); ParseField URL = new ParseField("url");
} }

View File

@ -17,9 +17,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.xpack.common.http.auth.HttpAuth; import org.elasticsearch.xpack.common.http.auth.HttpAuth;
import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry; import org.elasticsearch.xpack.common.http.auth.HttpAuthRegistry;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.common.text.TextTemplateEngine; import org.elasticsearch.xpack.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils; import org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpHeaders;
import java.io.IOException; import java.io.IOException;
@ -32,8 +32,6 @@ import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
/**
*/
public class HttpRequestTemplate implements ToXContent { public class HttpRequestTemplate implements ToXContent {
private final Scheme scheme; private final Scheme scheme;
@ -193,10 +191,12 @@ public class HttpRequestTemplate implements ToXContent {
builder.field(HttpRequest.Field.BODY.getPreferredName(), body, params); builder.field(HttpRequest.Field.BODY.getPreferredName(), body, params);
} }
if (connectionTimeout != null) { if (connectionTimeout != null) {
builder.field(HttpRequest.Field.CONNECTION_TIMEOUT.getPreferredName(), connectionTimeout); builder.timeValueField(HttpRequest.Field.CONNECTION_TIMEOUT.getPreferredName(),
HttpRequest.Field.CONNECTION_TIMEOUT_HUMAN.getPreferredName(), connectionTimeout);
} }
if (readTimeout != null) { if (readTimeout != null) {
builder.field(HttpRequest.Field.READ_TIMEOUT.getPreferredName(), readTimeout); builder.timeValueField(HttpRequest.Field.READ_TIMEOUT.getPreferredName(),
HttpRequest.Field.READ_TIMEOUT_HUMAN.getPreferredName(), readTimeout);
} }
if (proxy != null) { if (proxy != null) {
proxy.toXContent(builder, params); proxy.toXContent(builder, params);
@ -242,6 +242,11 @@ public class HttpRequestTemplate implements ToXContent {
return result; return result;
} }
@Override
public String toString() {
return Strings.toString(this);
}
public static Builder builder(String host, int port) { public static Builder builder(String host, int port) {
return new Builder(host, port); return new Builder(host, port);
} }
@ -280,6 +285,9 @@ public class HttpRequestTemplate implements ToXContent {
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.URL)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.URL)) {
builder.fromUrl(parser.text()); builder.fromUrl(parser.text());
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.CONNECTION_TIMEOUT)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.CONNECTION_TIMEOUT)) {
builder.connectionTimeout(TimeValue.timeValueMillis(parser.longValue()));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.CONNECTION_TIMEOUT_HUMAN)) {
// Users and 2.x specify the timeout this way
try { try {
builder.connectionTimeout(WatcherDateTimeUtils.parseTimeValue(parser, builder.connectionTimeout(WatcherDateTimeUtils.parseTimeValue(parser,
HttpRequest.Field.CONNECTION_TIMEOUT.toString())); HttpRequest.Field.CONNECTION_TIMEOUT.toString()));
@ -288,6 +296,9 @@ public class HttpRequestTemplate implements ToXContent {
pe, currentFieldName); pe, currentFieldName);
} }
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.READ_TIMEOUT)) { } else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.READ_TIMEOUT)) {
builder.readTimeout(TimeValue.timeValueMillis(parser.longValue()));
} else if (ParseFieldMatcher.STRICT.match(currentFieldName, HttpRequest.Field.READ_TIMEOUT_HUMAN)) {
// Users and 2.x specify the timeout this way
try { try {
builder.readTimeout(WatcherDateTimeUtils.parseTimeValue(parser, HttpRequest.Field.READ_TIMEOUT.toString())); builder.readTimeout(WatcherDateTimeUtils.parseTimeValue(parser, HttpRequest.Field.READ_TIMEOUT.toString()));
} catch (ElasticsearchParseException pe) { } catch (ElasticsearchParseException pe) {

View File

@ -31,9 +31,6 @@ import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
/**
*
*/
public class HttpRequestTemplateTests extends ESTestCase { public class HttpRequestTemplateTests extends ESTestCase {
public void testBodyWithXContent() throws Exception { public void testBodyWithXContent() throws Exception {
@ -122,11 +119,11 @@ public class HttpRequestTemplateTests extends ESTestCase {
if (randomBoolean()) { if (randomBoolean()) {
builder.putHeader("_key", TextTemplate.inline("_value")); builder.putHeader("_key", TextTemplate.inline("_value"));
} }
long connectionTimeout = randomBoolean() ? 0 : randomIntBetween(5, 10); long connectionTimeout = randomBoolean() ? 0 : randomIntBetween(5, 100000);
if (connectionTimeout > 0) { if (connectionTimeout > 0) {
builder.connectionTimeout(TimeValue.timeValueSeconds(connectionTimeout)); builder.connectionTimeout(TimeValue.timeValueSeconds(connectionTimeout));
} }
long readTimeout = randomBoolean() ? 0 : randomIntBetween(5, 10); long readTimeout = randomBoolean() ? 0 : randomIntBetween(5, 100000);
if (readTimeout > 0) { if (readTimeout > 0) {
builder.readTimeout(TimeValue.timeValueSeconds(readTimeout)); builder.readTimeout(TimeValue.timeValueSeconds(readTimeout));
} }
@ -146,7 +143,7 @@ public class HttpRequestTemplateTests extends ESTestCase {
xContentParser.nextToken(); xContentParser.nextToken();
HttpRequestTemplate parsed = parser.parse(xContentParser); HttpRequestTemplate parsed = parser.parse(xContentParser);
assertThat(parsed, equalTo(template)); assertEquals(template, parsed);
} }
public void testParsingFromUrl() throws Exception { public void testParsingFromUrl() throws Exception {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.watcher.support; package org.elasticsearch.xpack.watcher.support;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.DateMathParser; import org.elasticsearch.common.joda.DateMathParser;
@ -19,7 +20,10 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import java.io.IOException; import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/** /**
* *
*/ */
@ -140,7 +144,7 @@ public class WatcherDateTimeUtils {
} }
if (token == XContentParser.Token.VALUE_STRING) { if (token == XContentParser.Token.VALUE_STRING) {
try { try {
TimeValue value = TimeValue.parseTimeValue(parser.text(), null, settingName); TimeValue value = parseTimeValueSupportingFractional(parser.text(), settingName);
if (value.millis() < 0) { if (value.millis() < 0) {
throw new ElasticsearchParseException("could not parse time value [{}]. Time value cannot be negative.", parser.text()); throw new ElasticsearchParseException("could not parse time value [{}]. Time value cannot be negative.", parser.text());
} }
@ -154,6 +158,47 @@ public class WatcherDateTimeUtils {
"instead", token); "instead", token);
} }
/**
* Parse a {@link TimeValue} with support for fractional values.
*/
public static TimeValue parseTimeValueSupportingFractional(@Nullable String sValue, String settingName) {
// This code is lifted almost straight from 2.x's TimeValue.java
Objects.requireNonNull(settingName);
if (sValue == null) {
return null;
}
try {
long millis;
String lowerSValue = sValue.toLowerCase(Locale.ROOT).trim();
if (lowerSValue.endsWith("ms")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 2)));
} else if (lowerSValue.endsWith("s")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 1000);
} else if (lowerSValue.endsWith("m")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 60 * 1000);
} else if (lowerSValue.endsWith("h")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 60 * 60 * 1000);
} else if (lowerSValue.endsWith("d")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 24 * 60 * 60 * 1000);
} else if (lowerSValue.endsWith("w")) {
millis = (long) (Double.parseDouble(lowerSValue.substring(0, lowerSValue.length() - 1)) * 7 * 24 * 60 * 60 * 1000);
} else if (lowerSValue.equals("-1")) {
// Allow this special value to be unit-less:
millis = -1;
} else if (lowerSValue.equals("0")) {
// Allow this special value to be unit-less:
millis = 0;
} else {
throw new ElasticsearchParseException(
"Failed to parse setting [{}] with value [{}] as a time value: unit is missing or unrecognized",
settingName, sValue);
}
return new TimeValue(millis, TimeUnit.MILLISECONDS);
} catch (NumberFormatException e) {
throw new ElasticsearchParseException("Failed to parse [{}]", e, sValue);
}
}
private static class ClockNowCallable implements Callable<Long> { private static class ClockNowCallable implements Callable<Long> {
private final Clock clock; private final Clock clock;

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.watcher.support; package org.elasticsearch.xpack.watcher.support;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
@ -180,10 +181,19 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
.build(); .build();
request.settings(updatedSettings); request.settings(updatedSettings);
} }
PutIndexTemplateResponse response = client.putTemplate(request); client.putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
if (response.isAcknowledged() == false) { @Override
logger.error("Error adding watcher template [{}], request was not acknowledged", config.getTemplateName()); public void onResponse(PutIndexTemplateResponse response) {
} if (response.isAcknowledged() == false) {
logger.error("Error adding watcher template [{}], request was not acknowledged", config.getTemplateName());
}
}
@Override
public void onFailure(Exception e) {
logger.error("Error adding watcher template [{}]", e, config.getTemplateName());
}
});
}); });
} }

View File

@ -24,11 +24,10 @@ import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.common.init.proxy.ClientProxy; import org.elasticsearch.xpack.common.init.proxy.ClientProxy;
import org.elasticsearch.xpack.security.InternalClient;
/** /**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client * A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
@ -107,8 +106,8 @@ public class WatcherClientProxy extends ClientProxy {
return client.admin().indices().refresh(preProcess(request)).actionGet(defaultSearchTimeout); return client.admin().indices().refresh(preProcess(request)).actionGet(defaultSearchTimeout);
} }
public PutIndexTemplateResponse putTemplate(PutIndexTemplateRequest request) { public void putTemplate(PutIndexTemplateRequest request, ActionListener<PutIndexTemplateResponse> listener) {
preProcess(request); preProcess(request);
return client.admin().indices().putTemplate(request).actionGet(defaultIndexTimeout); client.admin().indices().putTemplate(request, listener);
} }
} }

View File

@ -0,0 +1,164 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher;
import org.elasticsearch.AbstractOldXPackIndicesBackwardsCompatibilityTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.actions.logging.LoggingAction;
import org.elasticsearch.xpack.watcher.client.WatchSourceBuilder;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.always.AlwaysCondition;
import org.elasticsearch.xpack.watcher.transport.actions.get.GetWatchResponse;
import org.elasticsearch.xpack.watcher.transport.actions.put.PutWatchResponse;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule;
import org.elasticsearch.xpack.watcher.trigger.schedule.IntervalSchedule.Interval;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
/**
* Tests for watcher indexes created before 5.0.
*/
@TestLogging("_root:INFO")
public class OldWatcherIndicesBackwardsCompatibilityIT extends AbstractOldXPackIndicesBackwardsCompatibilityTestCase {
@Override
public Settings nodeSettings(int ord) {
return Settings.builder()
.put(super.nodeSettings(ord))
.put(XPackSettings.WATCHER_ENABLED.getKey(), true)
.build();
}
public void testAllVersionsTested() throws Exception {
SortedSet<String> expectedVersions = new TreeSet<>();
for (Version v : VersionUtils.allVersions()) {
if (v.before(Version.V_2_0_0)) continue; // unsupported indexes
if (v.equals(Version.CURRENT)) continue; // the current version is always compatible with itself
if (v.isBeta() == true || v.isAlpha() == true || v.isRC() == true) continue; // don't check alphas etc
expectedVersions.add("x-pack-" + v.toString() + ".zip");
}
for (String index : dataFiles) {
if (expectedVersions.remove(index) == false) {
logger.warn("Old indexes tests contain extra index: {}", index);
}
}
if (expectedVersions.isEmpty() == false) {
StringBuilder msg = new StringBuilder("Old index tests are missing indexes:");
for (String expected : expectedVersions) {
msg.append("\n" + expected);
}
fail(msg.toString());
}
}
@Override
public void testOldIndexes() throws Exception {
super.testOldIndexes();
// Wait for watcher to fully start before shutting down
assertBusy(() -> {
assertEquals(WatcherState.STARTED, internalCluster().getInstance(WatcherService.class).state());
});
// Shutdown watcher on the last node so that the test can shutdown cleanly
internalCluster().getInstance(WatcherLifeCycleService.class).stop();
}
@Override
protected void checkVersion(Version version) throws Exception {
// Wait for watcher to actually start....
assertBusy(() -> {
assertEquals(WatcherState.STARTED, internalCluster().getInstance(WatcherService.class).state());
});
assertWatchIndexContentsWork(version);
assertBasicWatchInteractions();
}
void assertWatchIndexContentsWork(Version version) throws Exception {
WatcherClient watcherClient = new WatcherClient(client());
// Fetch a basic watch
GetWatchResponse bwcWatch = watcherClient.prepareGetWatch("bwc_watch").get();
assertTrue(bwcWatch.isFound());
assertNotNull(bwcWatch.getSource());
Map<String, Object> source = bwcWatch.getSource().getAsMap();
Map<?, ?> actions = (Map<?, ?>) source.get("actions");
Map<?, ?> indexPayload = (Map<?, ?>) actions.get("index_payload");
Map<?, ?> index = (Map<?, ?>) indexPayload.get("index");
assertEquals("bwc_watch_index", index.get("index"));
assertEquals("bwc_watch_type", index.get("doc_type"));
if (version.onOrAfter(Version.V_2_3_0)) {
/* Fetch a watch with a funny timeout to verify loading fractional time values. This watch is only built in >= 2.3 because
* email attachments aren't supported before that. */
bwcWatch = watcherClient.prepareGetWatch("bwc_funny_timeout").get();
assertTrue(bwcWatch.isFound());
assertNotNull(bwcWatch.getSource());
source = bwcWatch.getSource().getAsMap();
actions = (Map<?, ?>) source.get("actions");
Map<?, ?> work = (Map<?, ?>) actions.get("work");
Map<?, ?> email = (Map<?, ?>) work.get("email");
Map<?, ?> attachments = (Map<?, ?>) email.get("attachments");
Map<?, ?> attachment = (Map<?, ?>) attachments.get("test_report.pdf");
Map<?, ?> http = (Map<?, ?>) attachment.get("http");
Map<?, ?> request = (Map<?, ?>) http.get("request");
assertEquals(96000, request.get("read_timeout_millis"));
assertEquals("https", request.get("scheme"));
assertEquals("example.com", request.get("host"));
assertEquals("{{ctx.metadata.report_url}}", request.get("path"));
assertEquals(8443, request.get("port"));
Map<?, ?> auth = (Map<?, ?>) request.get("auth");
Map<?, ?> basic = (Map<?, ?>) auth.get("basic");
assertThat(basic, hasEntry("username", "Aladdin"));
// password doesn't come back because it is hidden
assertThat(basic, not(hasKey("password")));
}
SearchResponse history = client().prepareSearch(".watch_history*").get();
assertThat(history.getHits().totalHits(), greaterThanOrEqualTo(10L));
}
void assertBasicWatchInteractions() throws Exception {
WatcherClient watcherClient = new WatcherClient(client());
PutWatchResponse put = watcherClient.preparePutWatch("new_watch").setSource(new WatchSourceBuilder()
.condition(AlwaysCondition.INSTANCE)
.trigger(ScheduleTrigger.builder(new IntervalSchedule(Interval.seconds(1))))
.addAction("awesome", LoggingAction.builder(new TextTemplate("test")))).get();
assertTrue(put.isCreated());
assertEquals(1, put.getVersion());
put = watcherClient.preparePutWatch("new_watch").setSource(new WatchSourceBuilder()
.condition(AlwaysCondition.INSTANCE)
.trigger(ScheduleTrigger.builder(new IntervalSchedule(Interval.seconds(1))))
.addAction("awesome", LoggingAction.builder(new TextTemplate("test")))).get();
assertFalse(put.isCreated());
assertEquals(2, put.getVersion());
GetWatchResponse get = watcherClient.prepareGetWatch(put.getId()).get();
assertTrue(get.isFound());
{
Map<?, ?> source = get.getSource().getAsMap();
Map<?, ?> actions = (Map<?, ?>) source.get("actions");
Map<?, ?> awesome = (Map<?, ?>) actions.get("awesome");
Map<?, ?> logging = (Map<?, ?>) awesome.get("logging");
assertEquals("info", logging.get("level"));
assertEquals("test", logging.get("text"));
}
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.watcher.support;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
@ -22,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.watcher.support.WatcherDateTimeUtils.parseTimeValueSupportingFractional;
import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.xContentParser; import static org.elasticsearch.xpack.watcher.test.WatcherTestUtils.xContentParser;
import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.either;
@ -122,4 +124,56 @@ public class WatcherDateTimeUtilsTests extends ESTestCase {
TimeValue parsed = WatcherDateTimeUtils.parseTimeValue(parser, "test"); TimeValue parsed = WatcherDateTimeUtils.parseTimeValue(parser, "test");
assertThat(parsed, nullValue()); assertThat(parsed, nullValue());
} }
public void testParseTimeValueWithFractional() {
assertEquals("This function exists so 5.x can be compatible with 2.x indices. It should be removed with 6.x", 5,
Version.CURRENT.major);
// This code is lifted strait from 2.x's TimeValueTests.java
assertEquals(new TimeValue(10, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("10 ms", "test"));
assertEquals(new TimeValue(10, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("10ms", "test"));
assertEquals(new TimeValue(10, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("10 MS", "test"));
assertEquals(new TimeValue(10, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("10MS", "test"));
assertEquals(new TimeValue(10, TimeUnit.SECONDS), parseTimeValueSupportingFractional("10 s", "test"));
assertEquals(new TimeValue(10, TimeUnit.SECONDS), parseTimeValueSupportingFractional("10s", "test"));
assertEquals(new TimeValue(10, TimeUnit.SECONDS), parseTimeValueSupportingFractional("10 S", "test"));
assertEquals(new TimeValue(10, TimeUnit.SECONDS), parseTimeValueSupportingFractional("10S", "test"));
assertEquals(new TimeValue(100, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("0.1s", "test"));
assertEquals(new TimeValue(10, TimeUnit.MINUTES), parseTimeValueSupportingFractional("10 m", "test"));
assertEquals(new TimeValue(10, TimeUnit.MINUTES), parseTimeValueSupportingFractional("10m", "test"));
assertEquals(new TimeValue(10, TimeUnit.MINUTES), parseTimeValueSupportingFractional("10 M", "test"));
assertEquals(new TimeValue(10, TimeUnit.MINUTES), parseTimeValueSupportingFractional("10M", "test"));
assertEquals(new TimeValue(10, TimeUnit.HOURS), parseTimeValueSupportingFractional("10 h", "test"));
assertEquals(new TimeValue(10, TimeUnit.HOURS), parseTimeValueSupportingFractional("10h", "test"));
assertEquals(new TimeValue(10, TimeUnit.HOURS), parseTimeValueSupportingFractional("10 H", "test"));
assertEquals(new TimeValue(10, TimeUnit.HOURS), parseTimeValueSupportingFractional("10H", "test"));
assertEquals(new TimeValue(10, TimeUnit.DAYS), parseTimeValueSupportingFractional("10 d", "test"));
assertEquals(new TimeValue(10, TimeUnit.DAYS), parseTimeValueSupportingFractional("10d", "test"));
assertEquals(new TimeValue(10, TimeUnit.DAYS), parseTimeValueSupportingFractional("10 D", "test"));
assertEquals(new TimeValue(10, TimeUnit.DAYS), parseTimeValueSupportingFractional("10D", "test"));
assertEquals(new TimeValue(70, TimeUnit.DAYS), parseTimeValueSupportingFractional("10 w", "test"));
assertEquals(new TimeValue(70, TimeUnit.DAYS), parseTimeValueSupportingFractional("10w", "test"));
assertEquals(new TimeValue(70, TimeUnit.DAYS), parseTimeValueSupportingFractional("10 W", "test"));
assertEquals(new TimeValue(70, TimeUnit.DAYS), parseTimeValueSupportingFractional("10W", "test"));
// Extra fractional tests just because that is the point
assertEquals(new TimeValue(100, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("0.1s", "test"));
assertEquals(new TimeValue(6, TimeUnit.SECONDS), parseTimeValueSupportingFractional("0.1m", "test"));
assertEquals(new TimeValue(6, TimeUnit.MINUTES), parseTimeValueSupportingFractional("0.1h", "test"));
assertEquals(new TimeValue(144, TimeUnit.MINUTES), parseTimeValueSupportingFractional("0.1d", "test"));
assertEquals(new TimeValue(1008, TimeUnit.MINUTES), parseTimeValueSupportingFractional("0.1w", "test"));
// And some crazy fractions just for fun
assertEquals(new TimeValue(1700, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("1.7s", "test"));
assertEquals(new TimeValue(162, TimeUnit.SECONDS), parseTimeValueSupportingFractional("2.7m", "test"));
assertEquals(new TimeValue(5988, TimeUnit.MINUTES), parseTimeValueSupportingFractional("99.8h", "test"));
assertEquals(new TimeValue(1057968, TimeUnit.SECONDS), parseTimeValueSupportingFractional("12.245d", "test"));
assertEquals(new TimeValue(7258204799L, TimeUnit.MILLISECONDS), parseTimeValueSupportingFractional("12.001w", "test"));
}
} }