Fix BWC index generation and tests for 5.0.0

Fixes the create_bwc_indexes script to build the bwc indices for
either 5.0.0 or 2.x.y.
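
The fix boils down to one pattern, applied throughout the script: compare the
target version against 5.0.0 with parse_version and pick the 2.x or 5.x
behavior. A minimal sketch of that pattern, assuming parse_version comes from
pkg_resources; the helper names here are illustrative, not part of the script:

  from pkg_resources import parse_version  # assumed source of parse_version

  def plugin_script(version):
    # 2.x ships bin/plugin; 5.0 renamed it to bin/elasticsearch-plugin
    if parse_version(version) < parse_version('5.0.0'):
      return 'bin/plugin'
    return 'bin/elasticsearch-plugin'

  def setting_flag(version, key, value):
    # 2.x passes settings as -Des.<key>=<value>; 5.0 uses -E<key>=<value>
    prefix = '-Des.' if parse_version(version) < parse_version('5.0.0-alpha1') else '-E'
    return '%s%s=%s' % (prefix, key, value)

  # e.g. setting_flag('2.4.0', 'path.data', '/tmp/d') -> '-Des.path.data=/tmp/d'
  #      setting_flag('5.0.0', 'path.data', '/tmp/d') -> '-Epath.data=/tmp/d'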

Closes elastic/elasticsearch#3908

Original commit: elastic/x-pack-elasticsearch@f857647bb3
Nik Everett 2016-11-01 20:03:19 -04:00
parent 04969bd0cd
commit e63580459c
6 changed files with 113 additions and 68 deletions


@@ -68,14 +68,18 @@ def start_node(version, release_dir, data_dir):
   logging.info('Starting node from %s on port %s/%s, data_dir %s' % (release_dir, DEFAULT_TRANSPORT_TCP_PORT
     , DEFAULT_HTTP_TCP_PORT, data_dir))
   cluster_name = 'bwc_index_' + version
+  if parse_version(version) < parse_version("5.0.0-alpha1"):
+    prefix = '-Des.'
+  else:
+    prefix = '-E'
   cmd = [
     os.path.join(release_dir, 'bin/elasticsearch'),
-    '-Des.path.data=%s' %(data_dir),
-    '-Des.path.logs=logs',
-    '-Des.cluster.name=%s' %(cluster_name),
-    '-Des.network.host=localhost',
-    '-Des.transport.tcp.port=%s' %(DEFAULT_TRANSPORT_TCP_PORT), # not sure if we need to customize ports
-    '-Des.http.port=%s' %(DEFAULT_HTTP_TCP_PORT)
+    '%spath.data=%s' % (prefix, data_dir),
+    '%spath.logs=logs' % prefix,
+    '%scluster.name=%s' % (prefix, cluster_name),
+    '%snetwork.host=localhost' % prefix,
+    '%stransport.tcp.port=%s' % (prefix, DEFAULT_TRANSPORT_TCP_PORT), # not sure if we need to customize ports
+    '%shttp.port=%s' % (prefix, DEFAULT_HTTP_TCP_PORT)
   ]
   return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -87,10 +91,17 @@ def install_plugin(version, release_dir, plugin_name):
   run_plugin(version, release_dir, 'install', args)

 def remove_plugin(version, release_dir, plugin_name):
-  run_plugin(version, release_dir, 'remove', [plugin_name])
+  # 5.0 doesn't like trying to remove a plugin that isn't installed so we
+  # shouldn't try.
+  if os.path.exists(os.path.join(release_dir, 'plugins', plugin_name)):
+    run_plugin(version, release_dir, 'remove', [plugin_name])

 def run_plugin(version, release_dir, plugin_cmd, args):
-  cmd = [os.path.join(release_dir, 'bin/plugin'), plugin_cmd] + args
+  if parse_version(version) < parse_version('5.0.0'):
+    script = 'bin/plugin'
+  else:
+    script = 'bin/elasticsearch-plugin'
+  cmd = [os.path.join(release_dir, script), plugin_cmd] + args
   subprocess.check_call(cmd)

 def create_client():
@@ -105,6 +116,18 @@ def create_client():
     time.sleep(1)
   assert False, 'Timed out waiting for node for %s seconds' % timeout

+def wait_for_yellow(version, client, index):
+  logging.info('Waiting for %s to be yellow' % index)
+  # The health call below uses `params` because the 5.x client doesn't
+  # support wait_for_relocating_shards and the 2.x client doesn't support
+  # wait_for_no_relocating_shards, and we'd like to use the same client
+  # for both versions.
+  if parse_version(version) < parse_version('5.0.0'):
+    health = client.cluster.health(wait_for_status='yellow', index=index, params={'wait_for_relocating_shards':0})
+  else:
+    health = client.cluster.health(wait_for_status='yellow', index=index, params={'wait_for_no_relocating_shards':'true'})
+  assert health['timed_out'] == False, 'cluster health timed out %s' % health
+
 # this adds a user bwc_test_user/9876543210, a role bwc_test_role and some documents the user has or has not access to
 def generate_security_index(client, version):
@@ -134,12 +157,15 @@ def generate_security_index(client, version):
       {
         "names": [ "index1", "index2" ],
         "privileges": ["all"],
-        "fields": [ "title", "body" ],
         "query": "{\"match\": {\"title\": \"foo\"}}"
       }
     ],
     "run_as": [ "other_user" ]
   }
+  if parse_version(version) < parse_version('5.0.0'):
+    body['indices'][0]['fields'] = [ "title", "body" ]
+  else:
+    body['indices'][0]['field_security'] = { "grant": [ "title", "body" ] }
   # order of params in put role request is important, see https://github.com/elastic/x-plugins/issues/2606
   response = requests.put('http://localhost:9200/_shield/role/bwc_test_role', auth=('es_admin', '0123456789')
     , data=json.dumps(body, sort_keys=True))
@@ -159,9 +185,7 @@ def generate_security_index(client, version):
   client.index(index="index3", doc_type="doc", body={"title": "bwc_test_user should not see this index"})

-  logging.info('Waiting for yellow')
-  health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.security')
-  assert health['timed_out'] == False, 'cluster health timed out %s' % health
+  wait_for_yellow(version, client, '.security')

 # this adds a couple of watches and waits for the watch_history to accumulate some results
 def generate_watcher_index(client, version):
@@ -294,30 +318,36 @@ def generate_watcher_index(client, version):
   # wait to accumulate some watches
   logging.info('Waiting for watch results index to fill up...')
   wait_for_search(10, lambda: client.search(index="bwc_watch_index", body={"query": {"match_all": {}}}))
-  health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.watches')
-  assert health['timed_out'] == False, 'cluster health timed out %s' % health
-  health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='.watch_history*')
-  assert health['timed_out'] == False, 'cluster health timed out %s' % health
-  health = client.cluster.health(wait_for_status='yellow', wait_for_relocating_shards=0, index='bwc_watch_index')
-  assert health['timed_out'] == False, 'cluster health timed out %s' % health
+  if parse_version(version) < parse_version('5.0.0'):
+    watcher_history_name = ".watch_history*"
+  else:
+    watcher_history_name = ".watcher-history*"
+  wait_for_search(10, lambda: client.search(index=watcher_history_name, body={"query": {"match_all": {}}}))
+  wait_for_yellow(version, client, '.watches')
+  wait_for_yellow(version, client, watcher_history_name)
+  wait_for_yellow(version, client, 'bwc_watch_index')

 def wait_for_monitoring_index_to_fill(client, version):
-  logging.info('Waiting for marvel to index the cluster_info...')
-  wait_for_search(1, lambda: client.search(index=".marvel-es-*", doc_type="cluster_info", body={"query": {"match_all": {}}}))
+  if parse_version(version) < parse_version('5.0.0'):
+    monitoring_name = '.marvel-*'
+  else:
+    monitoring_name = '.monitoring-*'
+  def wait_for_monitoring_to_index(doc_type, count):
+    logging.info('Waiting for %s to have count(%s) = %s...' % (monitoring_name, doc_type, count))
+    wait_for_search(count, lambda:
+      client.search(index=monitoring_name, doc_type=doc_type, body={"query": {"match_all": {}}}))
+  wait_for_monitoring_to_index('cluster_info', 1)
   if parse_version(version) >= parse_version('2.1.0'):
-    logging.info('Waiting for marvel to index the node information...')
-    wait_for_search(1, lambda: client.search(index=".marvel-es-*", doc_type="node", body={"query": {"match_all": {}}}))
-  logging.info('Waiting for marvel index to get enough index_stats...')
-  wait_for_search(10, lambda: client.search(index=".marvel-es-*", doc_type="index_stats", body={"query": {"match_all": {}}}))
-  logging.info('Waiting for marvel index to get enough shards...')
-  wait_for_search(10, lambda: client.search(index=".marvel-es-*", doc_type="shards", body={"query": {"match_all": {}}}))
-  logging.info('Waiting for marvel index to get enough indices_stats...')
-  wait_for_search(3, lambda: client.search(index=".marvel-es-*", doc_type="indices_stats", body={"query": {"match_all": {}}}))
-  logging.info('Waiting for marvel index to get enough node_stats...')
-  wait_for_search(3, lambda: client.search(index=".marvel-es-*", doc_type="node_stats", body={"query": {"match_all": {}}}))
-  logging.info('Waiting for marvel index to get enough cluster_state...')
-  wait_for_search(3, lambda: client.search(index=".marvel-es-*", doc_type="cluster_state", body={"query": {"match_all": {}}}))
+    wait_for_monitoring_to_index('node', 1)
+  wait_for_monitoring_to_index('index_stats', 10)
+  wait_for_monitoring_to_index('shards', 10)
+  wait_for_monitoring_to_index('indices_stats', 3)
+  wait_for_monitoring_to_index('node_stats', 3)
+  wait_for_monitoring_to_index('cluster_state', 3)
+
+  wait_for_yellow(version, client, monitoring_name)

 def wait_for_search(required_count, searcher):
   for attempt in range(1, 31):
@@ -422,28 +452,42 @@ def main():
   node = None
   try:
+    if parse_version(version) < parse_version('5.0.0'):
       # Remove old plugins just in case any are around
       remove_plugin(version, release_dir, 'marvel-agent')
       remove_plugin(version, release_dir, 'watcher')
       remove_plugin(version, release_dir, 'shield')
       remove_plugin(version, release_dir, 'license')
       # Remove the shield config too before fresh install
       run('rm -rf %s' %(os.path.join(release_dir, 'config/shield')))
-      # Install the plugins we'll need
+      # Install plugins we'll need
       install_plugin(version, release_dir, 'license')
       install_plugin(version, release_dir, 'shield')
       install_plugin(version, release_dir, 'watcher')
       install_plugin(version, release_dir, 'marvel-agent')
+      # define the stuff we need to make the esadmin user
+      users_script = os.path.join(release_dir, 'bin/shield/esusers')
+      esadmin_role = 'admin'
+    else:
+      # Remove old plugins just in case any are around
+      remove_plugin(version, release_dir, 'x-pack')
+      # Remove the x-pack config too before fresh install
+      run('rm -rf %s' %(os.path.join(release_dir, 'config/x-pack')))
+      # Install plugins we'll need
+      install_plugin(version, release_dir, 'x-pack')
+      # define the stuff we need to make the esadmin user
+      users_script = os.path.join(release_dir, 'bin/x-pack/users')
+      esadmin_role = 'superuser'
     # create admin
-    run('%s useradd es_admin -r admin -p 0123456789' %(os.path.join(release_dir, 'bin/shield/esusers')))
+    run('%s useradd es_admin -r %s -p 0123456789' %
+        (users_script, esadmin_role))
     node = start_node(version, release_dir, data_dir)
     # create a client that authenticates as es_admin
     client = create_client()
     if parse_version(version) < parse_version('2.3.0'):
-      logging.info('Version is ' + version + ' but shield supports native realm oly from 2.3.0 on. Nothing to do for Shield.')
+      logging.info('Version is ' + version + ' but shield supports native realm only from 2.3.0 on. Nothing to do for Shield.')
     else:
       generate_security_index(client, version)
     generate_watcher_index(client, version)


@@ -96,7 +96,6 @@ public abstract class AbstractOldXPackIndicesBackwardsCompatibilityTestCase exte
         }
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/3908")
     public void testAllVersionsTested() throws Exception {
         SortedSet<String> expectedVersions = new TreeSet<>();
         for (Version v : VersionUtils.allVersions()) {
@@ -115,7 +114,6 @@ public abstract class AbstractOldXPackIndicesBackwardsCompatibilityTestCase exte
         }
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/3908")
     public void testOldIndexes() throws Exception {
         Collections.shuffle(dataFiles, random());
         for (String dataFile : dataFiles) {
@@ -136,10 +134,6 @@ public abstract class AbstractOldXPackIndicesBackwardsCompatibilityTestCase exte
         }
     }

-    public void testEmpty() {
-        // empty test so test suite does not fail for no tests
-    }
-
     /**
      * Should we test this version at all? Called before loading the data directory. Return false to skip it entirely.
      */
@@ -174,7 +168,13 @@ public abstract class AbstractOldXPackIndicesBackwardsCompatibilityTestCase exte
         try (Stream<Path> unzippedFiles = Files.list(dataPath.resolve("data"))) {
             Path dataDir = unzippedFiles.findFirst().get();
             // this is not actually an index but the copy does the job anyway
-            copyIndex(logger, dataDir.resolve("nodes"), "nodes", dataPath);
+            int zipIndex = pathToZipFile.indexOf(".zip");
+            Version version = Version.fromString(pathToZipFile.substring("x-pack-".length(), zipIndex));
+            if (version.before(Version.V_5_0_0_alpha1)) {
+                // the bwc scripts pack the indices under this path before 5.0
+                dataDir = dataDir.resolve("nodes");
+            }
+            copyIndex(logger, dataDir, "nodes", dataPath);
             // remove the original unzipped directory
         }
         IOUtils.rm(dataPath.resolve("data"));


@@ -41,8 +41,8 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;

 /**
- * Backwards compatibility test that loads some data from a pre-5.0 cluster and attempts to do some basic security stuff with it. It
- * contains:
+ * Backwards compatibility test that loads some data from a pre-Version.CURRENT cluster and attempts to do some basic security stuff with
+ * it. It contains:
  * <ul>
  * <li>This user: {@code {"username": "bwc_test_user", "roles" : [ "bwc_test_role" ], "password" : "9876543210"}}</li>
  * <li>This role: {@code {"name": "bwc_test_role", "cluster": ["all"]}, "run_as": [ "other_user" ], "indices": [{


@@ -167,14 +167,13 @@ public class OldMonitoringIndicesBackwardsCompatibilityIT extends AbstractOldXPa
         assertThat((Integer) docs.get("count"), greaterThanOrEqualTo(0));
     }

-    @SuppressWarnings("unchecked")
     private void checkNodeStats(final Version version, final String masterNodeId, Map<String, Object> nodeStats) {
         checkMonitoringElement(nodeStats);
         checkSourceNode(version, nodeStats);
         Map<?, ?> stats = (Map<?, ?>) nodeStats.get("node_stats");

         // Those fields are expected in every node stats documents
-        Set<String> mandatoryKeys = new HashSet();
+        Set<String> mandatoryKeys = new HashSet<>();
         mandatoryKeys.add("node_id");
         mandatoryKeys.add("node_master");
         mandatoryKeys.add("mlockall");
@@ -200,7 +199,7 @@ public class OldMonitoringIndicesBackwardsCompatibilityIT extends AbstractOldXPa
             assertThat("Expecting [" + key + "] to be present for bwc index in version [" + version + "]", stats, hasKey(key));
         }

-        Set<String> keys = new HashSet(stats.keySet());
+        Set<?> keys = new HashSet<>(stats.keySet());
         keys.removeAll(mandatoryKeys);
         assertTrue("Found unexpected fields [" + Strings.collectionToCommaDelimitedString(keys) + "] " +
                 "for bwc index in version [" + version + "]", keys.isEmpty());


@@ -23,6 +23,7 @@ import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTrigger;
 import java.util.Map;

+import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasKey;
@@ -70,28 +71,28 @@ public class OldWatcherIndicesBackwardsCompatibilityIT extends AbstractOldXPackI
         assertEquals(1000, source.get("throttle_period_in_millis"));
         Map<?, ?> input = (Map<?, ?>) source.get("input");
         Map<?, ?> search = (Map<?, ?>) input.get("search");
-        assertEquals(96000, search.get("timeout_in_millis")); // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
+        // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
+        int timeout = (int) (version.onOrAfter(Version.V_5_0_0_alpha1) ? timeValueSeconds(100).millis() : timeValueSeconds(96).millis());
+        assertEquals(timeout, search.get("timeout_in_millis"));
         Map<?, ?> actions = (Map<?, ?>) source.get("actions");
         Map<?, ?> indexPayload = (Map<?, ?>) actions.get("index_payload");
         Map<?, ?> transform = (Map<?, ?>) indexPayload.get("transform");
         search = (Map<?, ?>) transform.get("search");
-        assertEquals(96000, search.get("timeout_in_millis")); // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
+        assertEquals(timeout, search.get("timeout_in_millis"));
         Map<?, ?> index = (Map<?, ?>) indexPayload.get("index");
         assertEquals("bwc_watch_index", index.get("index"));
         assertEquals("bwc_watch_type", index.get("doc_type"));
-        assertEquals(96000, index.get("timeout_in_millis")); // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
+        assertEquals(timeout, index.get("timeout_in_millis"));

         // Fetch a watch with "fun" throttle periods
         bwcWatch = watcherClient.prepareGetWatch("bwc_throttle_period").get();
         assertTrue(bwcWatch.isFound());
         assertNotNull(bwcWatch.getSource());
         source = bwcWatch.getSource().getAsMap();
-        // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
-        assertEquals(96000, source.get("throttle_period_in_millis"));
+        assertEquals(timeout, source.get("throttle_period_in_millis"));
         actions = (Map<?, ?>) source.get("actions");
         indexPayload = (Map<?, ?>) actions.get("index_payload");
-        // We asked for 100s but 2.x converted that to 1.6m which is actually 96s...
-        assertEquals(96000, indexPayload.get("throttle_period_in_millis"));
+        assertEquals(timeout, indexPayload.get("throttle_period_in_millis"));

         if (version.onOrAfter(Version.V_2_3_0)) {
             /* Fetch a watch with a funny timeout to verify loading fractional time values. This watch is only built in >= 2.3 because
@@ -107,7 +108,7 @@ public class OldWatcherIndicesBackwardsCompatibilityIT extends AbstractOldXPackI
             Map<?, ?> attachment = (Map<?, ?>) attachments.get("test_report.pdf");
             Map<?, ?> http = (Map<?, ?>) attachment.get("http");
             Map<?, ?> request = (Map<?, ?>) http.get("request");
-            assertEquals(96000, request.get("read_timeout_millis"));
+            assertEquals(timeout, request.get("read_timeout_millis"));
             assertEquals("https", request.get("scheme"));
             assertEquals("example.com", request.get("host"));
             assertEquals("{{ctx.metadata.report_url}}", request.get("path"));
@@ -119,7 +120,8 @@ public class OldWatcherIndicesBackwardsCompatibilityIT extends AbstractOldXPackI
             assertThat(basic, not(hasKey("password")));
         }

-        SearchResponse history = client().prepareSearch(".watch_history*").get();
+        String watchHistoryPattern = version.onOrAfter(Version.V_5_0_0_alpha1) ? ".watcher-history*" : ".watch_history*";
+        SearchResponse history = client().prepareSearch(watchHistoryPattern).get();
         assertThat(history.getHits().totalHits(), greaterThanOrEqualTo(10L));
     }