# ELASTICSEARCH CONFIDENTIAL # __________________ # # [2014] Elasticsearch Incorporated. All Rights Reserved. # # NOTICE: All information contained herein is, and remains # the property of Elasticsearch Incorporated and its suppliers, # if any. The intellectual and technical concepts contained # herein are proprietary to Elasticsearch Incorporated # and its suppliers and may be covered by U.S. and Foreign Patents, # patents in process, and are protected by trade secret or copyright law. # Dissemination of this information or reproduction of this material # is strictly forbidden unless prior written permission is obtained # from Elasticsearch Incorporated. # Creates indices with old versions of elasticsearch. These indices are used by x-pack plugins like security # to test if the import of metadata that is stored in elasticsearch indexes works correctly. # This tool will start a node on port 9200/9300. If a node is already running on that port then the script will fail. # Currently this script can only deal with versions >=2.0.0 and < 5.0. Needs more work for versions before or after. # # Run from x-plugins root directory like so: # python3 ./dev-tools/create_bwc_indexes.py 2.3.4 # You can get any particular version of elasticsearch with: # python3 ../elasticsearch/dev-tools/get-bwc-version.py 2.3.4 # # import argparse import glob import json import logging import os import random import shutil import subprocess import sys import tempfile import time import requests import socket import signal from cmd import Cmd DEFAULT_TRANSPORT_TCP_PORT = 9300 DEFAULT_HTTP_TCP_PORT = 9200 if sys.version_info[0] < 3: print('%s must use python 3.x (for the ES python client)' % sys.argv[0]) try: from elasticsearch import Elasticsearch from elasticsearch.exceptions import ConnectionError from elasticsearch.exceptions import TransportError from elasticsearch.exceptions import NotFoundError from elasticsearch.client import IndicesClient except ImportError as e: print('Can\'t import elasticsearch please install `sudo pip3 install elasticsearch`') sys.exit(1) def start_node(version, release_dir, data_dir): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(('localhost',DEFAULT_HTTP_TCP_PORT)) if result == 0: raise Exception('Elasticsearch instance already running on port ' + str(DEFAULT_HTTP_TCP_PORT)) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(('localhost',DEFAULT_TRANSPORT_TCP_PORT)) if result == 0: raise Exception('Elasticsearch instance already running on port ' + str(DEFAULT_TRANSPORT_TCP_PORT)) logging.info('Starting node from %s on port %s/%s, data_dir %s' % (release_dir, DEFAULT_TRANSPORT_TCP_PORT , DEFAULT_HTTP_TCP_PORT, data_dir)) cluster_name = 'bwc_index_' + version if parse_version(version) < parse_version("5.0.0-alpha1"): prefix = '-Des.' else: prefix = '-E' cmd = [ os.path.join(release_dir, 'bin/elasticsearch'), '%spath.data=%s' % (prefix, data_dir), '%spath.logs=logs' % prefix, '%scluster.name=%s' % (prefix, cluster_name), '%snetwork.host=localhost' % prefix, '%stransport.tcp.port=%s' % (prefix, DEFAULT_TRANSPORT_TCP_PORT), # not sure if we need to customize ports '%shttp.port=%s' % (prefix, DEFAULT_HTTP_TCP_PORT) ] return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) def install_plugin(version, release_dir, plugin_name): args = [plugin_name] if parse_version(version) >= parse_version('2.2.0'): args = [plugin_name, '--batch'] run_plugin(version, release_dir, 'install', args) def remove_plugin(version, release_dir, plugin_name): # 5.0 doesn't like trying to remove a plugin that isn't installed so we # shouldn't try. if os.path.exists(os.path.join(release_dir, 'plugins', plugin_name)): run_plugin(version, release_dir, 'remove', [plugin_name]) def run_plugin(version, release_dir, plugin_cmd, args): if parse_version(version) < parse_version('5.0.0'): script = 'bin/plugin' else: script = 'bin/elasticsearch-plugin' cmd = [os.path.join(release_dir, script), plugin_cmd] + args subprocess.check_call(cmd) def create_client(): logging.info('Waiting for node to startup') for _ in range(0, 30): try: client = Elasticsearch([{'host': 'localhost', 'port': 9200, 'http_auth':'es_admin:0123456789'}]) health = client.cluster.health(wait_for_nodes=1) return client except ConnectionError: logging.info('Not started yet...') time.sleep(1) assert False, 'Timed out waiting for node for %s seconds' % timeout def wait_for_yellow(version, client, index): logging.info('Waiting for %s to be yellow' % index) # The health call below uses `params` because it the 5.x client doesn't # support wait_for_relocating_shards and the 2.x client doesn't support # wait_for_relocating_shards and we'd like to use the same client for both # versions. if parse_version(version) < parse_version('5.0.0'): health = client.cluster.health(wait_for_status='yellow', index=index, params={'wait_for_relocating_shards':0}) else: health = client.cluster.health(wait_for_status='yellow', index=index, params={'wait_for_no_relocating_shards':'true'}) assert health['timed_out'] == False, 'cluster health timed out %s' % health # this adds a user bwc_test_role/9876543210, a role bwc_test_role and some documents the user has or has not access to def generate_security_index(client, version): logging.info('Add a group') # don't know how to use python client with shield so use curl instead # add a user body = { "password" : "9876543210", "roles" : [ "bwc_test_role" ] } while True: response = requests.put('http://localhost:9200/_shield/user/bwc_test_user', auth=('es_admin', '0123456789'), data=json.dumps(body)) logging.info('put user reponse: ' + response.text) if response.status_code == 200: break else: if 'service has not been started' in response.text: continue raise Exception('PUT http://localhost:9200/_shield/role/bwc_test_role did not succeed!') # add a role body = { "cluster": ["all"], "indices": [ { "names": [ "index1", "index2" ], "privileges": ["all"], "query": "{\"match\": {\"title\": \"foo\"}}" } ], "run_as": [ "other_user" ] } if parse_version(version) < parse_version('5.0.0'): body['indices'][0]['fields'] = [ "title", "body" ] else: body['indices'][0]['field_security'] = { "grant": [ "title", "body" ] } # order of params in put role request is important, see https://github.com/elastic/x-plugins/issues/2606 response = requests.put('http://localhost:9200/_shield/role/bwc_test_role', auth=('es_admin', '0123456789') , data=json.dumps(body, sort_keys=True)) logging.info('put user reponse: ' + response.text) if (response.status_code != 200) : raise Exception('PUT http://localhost:9200/_shield/role/bwc_test_role did not succeed!') client.index(index="index1", doc_type="doc", body={"title": "foo", "body": "bwc_test_user should be able to see this field", "secured_body": "bwc_test_user should not be able to see this field"}) client.index(index="index1", doc_type="doc", body={"title": "bwc_test_user should not be able to see this document"}) client.index(index="index2", doc_type="doc", body={"title": "foo", "body": "bwc_test_user should be able to see this field", "secured_body": "bwc_test_user should not be able to see this field"}) client.index(index="index2", doc_type="doc", body={"title": "bwc_test_user should not be able to see this document"}) client.index(index="index3", doc_type="doc", body={"title": "bwc_test_user should not see this index"}) if parse_version(version) < parse_version('5.1.0'): logging.info("Adding a alias that starts with - so we can test against it") client.indices.put_alias(index='index3', name='-index3') wait_for_yellow(version, client, '.security') # this adds a couple of watches and waits for the the watch_history to accumulate some results def generate_watcher_index(client, version): logging.info('Adding a watch') body = { "trigger" : { "schedule": { "interval": "1s" } }, "input" : { "search" : { "timeout": "100s", "request" : { "indices" : [ ".watches" ], "body" : { "query" : { "match_all" : {}}, "size": 1 }, } } }, "condition" : { "always" : {} }, "throttle_period": "1s", "actions" : { "index_payload" : { "transform" : { "search" : { "request" : { "body" : { "size": 1, "query" : { "match_all" : {} }} }, "timeout": "100s" } }, "index" : { "index" : "bwc_watch_index", "doc_type" : "bwc_watch_type", "timeout": "100s" } } } } response = requests.put('http://localhost:9200/_watcher/watch/bwc_watch', auth=('es_admin', '0123456789'), data=json.dumps(body)) logging.info('PUT watch response: ' + response.text) if (response.status_code != 201) : raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_watch did not succeed!') logging.info('Adding a watch with "fun" throttle periods') body = { "trigger" : { "schedule": { "interval": "1s" } }, "condition" : { "never" : {} }, "throttle_period": "100s", "actions" : { "index_payload" : { "throttle_period": "100s", "transform" : { "search" : { "request" : { "body" : { "size": 1, "query" : { "match_all" : {} }} } } }, "index" : { "index" : "bwc_watch_index", "doc_type" : "bwc_watch_type" } } } } response = requests.put('http://localhost:9200/_watcher/watch/bwc_throttle_period', auth=('es_admin', '0123456789'), data=json.dumps(body)) logging.info('PUT watch response: ' + response.text) if (response.status_code != 201) : raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_throttle_period did not succeed!') if parse_version(version) < parse_version('2.3.0'): logging.info('Skipping watch with a funny read timeout because email attachement is not supported by this version') else: logging.info('Adding a watch with a funny read timeout') body = { "trigger" : { "schedule": { "interval": "100s" } }, "condition": { "never": {} }, "actions": { "work": { "email": { "to": "email@domain.com", "subject": "Test Kibana PDF report", "attachments": { "test_report.pdf": { "http": { "content_type": "application/pdf", "request": { "read_timeout": "100s", "scheme": "https", "host": "example.com", "path":"{{ctx.metadata.report_url}}", "port": 8443, "auth": { "basic": { "username": "Aladdin", "password": "open sesame" } } } } } } } } } } response = requests.put('http://localhost:9200/_watcher/watch/bwc_funny_timeout', auth=('es_admin', '0123456789'), data=json.dumps(body)) logging.info('PUT watch response: ' + response.text) if (response.status_code != 201) : raise Exception('PUT http://localhost:9200/_watcher/watch/bwc_funny_timeout did not succeed!') # wait to accumulate some watches logging.info('Waiting for watch results index to fill up...') wait_for_search(10, lambda: client.search(index="bwc_watch_index", body={"query": {"match_all": {}}})) if parse_version(version) < parse_version('5.0.0'): watcher_history_name = ".watch_history*" else: watcher_history_name = ".watcher-history*" wait_for_search(10, lambda: client.search(index=watcher_history_name, body={"query": {"match_all": {}}})) wait_for_yellow(version, client, '.watches') wait_for_yellow(version, client, watcher_history_name) wait_for_yellow(version, client, 'bwc_watch_index') def wait_for_monitoring_index_to_fill(client, version): if parse_version(version) < parse_version('5.0.0'): monitoring_name = '.marvel-*' else: monitoring_name = '.monitoring-*' def wait_for_monitoring_to_index(doc_type, count): logging.info('Waiting for %s to have cout(%s) = %s...' % (monitoring_name, doc_type, count)) wait_for_search(count, lambda: client.search(index=monitoring_name, doc_type=doc_type, body={"query": {"match_all": {}}})) wait_for_monitoring_to_index('cluster_info', 1) if parse_version(version) >= parse_version('2.1.0'): wait_for_monitoring_to_index('node', 1) wait_for_monitoring_to_index('index_stats', 10) wait_for_monitoring_to_index('shards', 10) wait_for_monitoring_to_index('indices_stats', 3) wait_for_monitoring_to_index('node_stats', 3) wait_for_monitoring_to_index('cluster_state', 3) wait_for_yellow(version, client, monitoring_name) def wait_for_search(required_count, searcher): for attempt in range(1, 31): try: response = searcher() logging.info('(' + str(attempt) + ') Got ' + str(response['hits']['total']) + ' hits and want ' + str(required_count) + '...') if response['hits']['total'] >= required_count: return except NotFoundError: logging.info('(' + str(attempt) + ') Not found, retrying') time.sleep(1) logger.error("Ran out of retries") raise "Ran out of retries" def compress_index(version, tmp_dir, output_dir): compress(tmp_dir, output_dir, 'x-pack-%s.zip' % version, 'data') def compress(tmp_dir, output_dir, zipfile, directory): abs_output_dir = os.path.abspath(output_dir) zipfile = os.path.join(abs_output_dir, zipfile) if os.path.exists(zipfile): os.remove(zipfile) logging.info('Compressing index into %s, tmpDir %s', zipfile, tmp_dir) olddir = os.getcwd() os.chdir(tmp_dir) subprocess.check_call('zip -r %s %s' % (zipfile, directory), shell=True) os.chdir(olddir) def parse_config(): parser = argparse.ArgumentParser(description='Builds an elasticsearch index for backwards compatibility tests') required = parser.add_mutually_exclusive_group(required=True) required.add_argument('versions', metavar='X.Y.Z', nargs='*', default=[], help='The elasticsearch version to build an index for') required.add_argument('--all', action='store_true', default=False, help='Recreate all existing backwards compatibility indexes') parser.add_argument('--releases-dir', '-d', default='backwards', metavar='DIR', help='The directory containing elasticsearch releases') parser.add_argument('--output-dir', '-o', default='elasticsearch/src/test/resources/indices/bwc/', help='The directory to write the zipped index into') cfg = parser.parse_args() if not os.path.exists(cfg.output_dir): parser.error('Output directory does not exist: %s' % cfg.output_dir) if not cfg.versions: # --all for bwc_index in glob.glob(os.path.join(cfg.output_dir, 'x-pack-*.zip')): version = os.path.basename(bwc_index)[len('x-pack-'):-len('.zip')] cfg.versions.append(version) return cfg def shutdown_node(node): logging.info('Shutting down node with pid %d', node.pid) node.terminate() node.wait() def parse_version(version): import re splitted = re.split('[.-]', version) if len(splitted) == 3: splitted = splitted + ['GA'] splitted = [s.lower() for s in splitted] assert len(splitted) == 4; return splitted def run(command, env_vars=None): if env_vars: for key, value in env_vars.items(): os.putenv(key, value) logging.info('*** Running: %s%s%s' % (COLOR_OK, command, COLOR_END)) if os.system(command): raise RuntimeError(' FAILED: %s' % (command)) assert parse_version('1.2.3') < parse_version('2.1.0') assert parse_version('1.2.3') < parse_version('1.2.4') assert parse_version('1.1.0') < parse_version('1.2.0') # console colors COLOR_OK = '\033[92m' COLOR_END = '\033[0m' COLOR_FAIL = '\033[91m' def main(): logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %I:%M:%S %p') logging.getLogger('elasticsearch').setLevel(logging.ERROR) logging.getLogger('urllib3').setLevel(logging.WARN) cfg = parse_config() for version in cfg.versions: logging.info('--> Creating x-pack index for %s' % version) # setup for starting nodes release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version) if not os.path.exists(release_dir): raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir)) tmp_dir = tempfile.mkdtemp() data_dir = os.path.join(tmp_dir, 'data') logging.info('Temp data dir: %s' % data_dir) node = None try: if parse_version(version) < parse_version('5.0.0'): # Remove old plugins just in case any are around remove_plugin(version, release_dir, 'marvel-agent') remove_plugin(version, release_dir, 'watcher') remove_plugin(version, release_dir, 'shield') remove_plugin(version, release_dir, 'license') # Remove the shield config too before fresh install run('rm -rf %s' %(os.path.join(release_dir, 'config/shield'))) # Install plugins we'll need install_plugin(version, release_dir, 'license') install_plugin(version, release_dir, 'shield') install_plugin(version, release_dir, 'watcher') install_plugin(version, release_dir, 'marvel-agent') # define the stuff we need to make the esadmin user users_script = os.path.join(release_dir, 'bin/shield/esusers') esadmin_role = 'admin' else: # Remove old plugins just in case any are around remove_plugin(version, release_dir, 'x-pack') # Remove the x-pack config too before fresh install run('rm -rf %s' %(os.path.join(release_dir, 'config/x-pack'))) # Install plugins we'll need install_plugin(version, release_dir, 'x-pack') # define the stuff we need to make the esadmin user users_script = os.path.join(release_dir, 'bin/x-pack/users') esadmin_role = 'superuser' # create admin run('%s useradd es_admin -r %s -p 0123456789' % (users_script, esadmin_role)) node = start_node(version, release_dir, data_dir) # create a client that authenticates as es_admin client = create_client() if parse_version(version) < parse_version('2.3.0'): logging.info('Version is ' + version + ' but shield supports native realm only from 2.3.0 on. Nothing to do for Shield.') else: generate_security_index(client, version) generate_watcher_index(client, version) wait_for_monitoring_index_to_fill(client, version) shutdown_node(node) node = None compress_index(version, tmp_dir, cfg.output_dir) finally: if node is not None: # This only happens if we've hit an exception: shutdown_node(node) shutil.rmtree(tmp_dir) if __name__ == '__main__': try: main() except KeyboardInterrupt: logging.info('Caught keyboard interrupt, exiting...') sys.exit(signal.SIGTERM) # exit code