diff --git a/x-dev-tools/smoke_test_watcher.py b/x-dev-tools/smoke_test_watcher.py new file mode 100644 index 00000000000..5c6aa1f20d7 --- /dev/null +++ b/x-dev-tools/smoke_test_watcher.py @@ -0,0 +1,293 @@ +import datetime +import traceback +import json +import os +import shutil +import signal +import subprocess +import tempfile +import threading +import time + +from http.client import HTTPConnection +import sys + + +LOG = os.environ.get('ES_SMOKE_TEST_PLUGINS_LOG', '/tmp/elasticsearch_smoke_test_plugins.log') + +print('Logging to %s' % LOG) + +if os.path.exists(LOG): + raise RuntimeError('please remove old log %s first' % LOG) + +try: + JAVA_HOME = os.environ['JAVA7_HOME'] +except KeyError: + try: + JAVA_HOME = os.environ['JAVA_HOME'] + except KeyError: + raise RuntimeError(""" + Please set JAVA_HOME in the env before running release tool + On OSX use: export JAVA_HOME=`/usr/libexec/java_home -v '1.7*'`""") + +JAVA_ENV = 'export JAVA_HOME="%s" PATH="%s/bin:$PATH" JAVACMD="%s/bin/java"' % (JAVA_HOME, JAVA_HOME, JAVA_HOME) + +try: + # make sure mvn3 is used if mvn3 is available + # some systems use maven 2 as default + subprocess.check_output('mvn3 --version', shell=True, stderr=subprocess.STDOUT) + MVN = 'mvn3' +except subprocess.CalledProcessError: + MVN = 'mvn' + + +def log(msg): + f = open(LOG, mode='ab') + f.write(('\n' + msg).encode('utf-8')) + f.close() + + +def run(command, quiet=False): + log('%s: RUN: %s\n' % (datetime.datetime.now(), command)) + if os.system('%s >> %s 2>&1' % (command, LOG)): + msg = ' FAILED: %s [see log %s]' % (command, LOG) + if not quiet: + print(msg) + raise RuntimeError(msg) + + +def readServerOutput(p, startupEvent, failureEvent): + try: + while True: + line = p.stdout.readline() + if len(line) == 0: + p.poll() + if not startupEvent.isSet(): + failureEvent.set() + startupEvent.set() + print('ES: **process exit**\n') + break + line = line.decode('utf-8').rstrip() + if line.endswith('started') and not startupEvent.isSet(): + startupEvent.set() + print('ES: %s' % line) + except: + print() + print('Exception reading Elasticsearch output:') + traceback.print_exc() + failureEvent.set() + startupEvent.set() + +def send_request(conn, method, path, body=None): + conn.request(method, path, body) + res = conn.getresponse() + if 200 > res.status > 300: + raise RuntimeError('Expected HTTP status code between 200 and 299 but got %s' % res.status) + return json.loads(res.read().decode("utf-8")) + +supported_es_version = ['2.0.0-SNAPSHOT'] + +version_url_matrix = [ + {'version': '2.0.0-beta1-SNAPSHOT', 'host' : 'oss.sonatype.org', 'url': 'https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/elasticsearch-tar/2.0.0-beta1-SNAPSHOT/elasticsearch-tar-2.0.0-beta1-20150731.034255-32.tar.gz'} +] + +test_watch = """ +{ + "trigger": { + "schedule": { + "interval": "1s" + } + }, + "input": { + "http": { + "request": { + "host": "localhost", + "port": 9200, + "path": "/_cluster/health" + } + } + }, + "condition": { + "compare": { + "ctx.payload.status": { + "eq": "yellow" + } + } + }, + "actions": { + "log": { + "logging": { + "text": "executed at {{ctx.execution_time}}" + } + } + } +} +""" + +watch_history_query = """ +{ + "query": { + "term": { + "watch_id": { + "value": "cluster_health_watch" + } + } + } +} +""" + +if __name__ == '__main__': + if len(sys.argv) != 2: + print('Specify the version of Watcher to be tested as an argument.') + print('For example: `python3 x-dev-tools/smoke_test_watcher.py 2.0.0-beta1-SNAPSHOT`') + exit() + + expected_watcher_version = sys.argv[1] + print('Smoke testing Watcher version [%s] with the following es versions [%s]' % (expected_watcher_version, supported_es_version)) + + print('Building x-plugins (skipping tests)...') + #run('%s; %s clean package -DskipTests' % (JAVA_ENV, MVN)) + + license_artifact = None + for dirname, dirnames, filenames in os.walk('license/plugin/target/releases/'): + for f in filenames: + print(os.path.join(dirname, f)) + if f.endswith('.zip'): + license_artifact = os.path.abspath(os.path.join(dirname, f)) + break + + if license_artifact is None: + raise RuntimeError('could not find license release under license/plugin/target/releases/') + + watcher_artifact = None + for dirname, dirnames, filenames in os.walk('watcher/target/releases/'): + for f in filenames: + print(os.path.join(dirname, f)) + if f.endswith('.zip'): + watcher_artifact = os.path.abspath(os.path.join(dirname, f)) + break + + if watcher_artifact is None: + raise RuntimeError('could not find Watcher release under watcher/target/releases/') + + base_tmp_dir = tempfile.mkdtemp() + '/watcher_smoker/' + os.makedirs(base_tmp_dir) + try: + for version_url in version_url_matrix: + es_version = version_url['version'] + print('Testing watcher [%s] with elasticsearch [%s]' % (expected_watcher_version, es_version)) + + version_tmp_dir = '%s%s/' % (base_tmp_dir, es_version) + os.makedirs(version_tmp_dir) + download = version_tmp_dir + 'elasticsearch-%s.tar.gz' % (es_version) + + # conn = HTTPConnection(version_url['host']) + # conn.set_debuglevel() + # conn.request('GET', version_url['url']) + # print('downloading %s to %s' % (version_url['url'], download)) + # resp = conn.getresponse() + # data = resp.read() + # with open(download, 'wb') as f: + # f.write(data) + # conn.close() + print('Downloading elasticsearch [%s]' % es_version) + run('curl %s -o %s' % (version_url['url'], download)) + run('tar -xzf %s -C %s' % (download, version_tmp_dir)) + + es_dir = version_tmp_dir + 'elasticsearch-%s/' % (es_version) + print('Installing License plugin...') + url = 'file://%s' % license_artifact + run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/license', url)) + + url = 'file://%s' % watcher_artifact + print('Installing Watcher plugin...') + run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/watcher', url)) + + p = None + try: + print('Starting Elasticsearch...') + + env = os.environ.copy() + env['JAVA_HOME'] = JAVA_HOME + env['PATH'] = '%s/bin:%s' % (JAVA_HOME, env['PATH']) + env['JAVA_CMD'] = '%s/bin/java' % JAVA_HOME + + startupEvent = threading.Event() + failureEvent = threading.Event() + p = subprocess.Popen(('%s/bin/elasticsearch' % es_dir, + '-Des.node.name=watcher_smoke_tester', + '-Des.cluster.name=watcher_smoke_tester' + '-Des.discovery.zen.ping.multicast.enabled=false', + '-Des.script.inline=on', + '-Des.script.indexed=on'), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=env) + thread = threading.Thread(target=readServerOutput, args=(p, startupEvent, failureEvent)) + thread.setDaemon(True) + thread.start() + + startupEvent.wait(5000) + if failureEvent.isSet(): + raise RuntimeError('ES failed to start') + + conn = HTTPConnection('127.0.0.1', 9200, 20) + # TODO: fix version in watcher info api! + # print('> Checking watcher info api...') + # version = send_request(conn, 'GET', '/_watcher')['version']['name'] + # if version != expected_watcher_version: + # raise RuntimeError('Expected Watcher version %s but got %s' % (expected_watcher_version, version)) + # print('> Successful!') + + print('> Checking watcher stats api...') + watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state'] + # we're too fast, lets wait and retry: + if watcher_state == 'starting': + time.sleep(5) + watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state'] + if watcher_state != 'started': + raise RuntimeError('Expected watcher_state started but got %s' % watcher_state) + + watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count'] + if watch_count != 0: + raise RuntimeError('Expected watcher_count 0 but got %s' % watch_count) + print('> Successful!') + + print('> Checking put watch api...') + res_body = send_request(conn, 'PUT', '/_watcher/watch/cluster_health_watch', test_watch) + if res_body['_version'] != 1: + raise RuntimeError('Expected watch _version 1 but got %s' % res_body['_version']) + if res_body['created'] != True: + raise RuntimeError('Expected watch created True but got %s' % res_body['created']) + + watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count'] + if watch_count != 1: + raise RuntimeError('Expected watcher_count 1 but got %s' % watch_count) + print('> Successful!') + + print('> Checking if watch actually fires...') + time.sleep(5) + send_request(conn, 'GET', '/.watch_history-*/_refresh') + hit_count = send_request(conn, 'GET', '/.watch_history-*/_search')['hits']['total'] + if hit_count == 0: + raise RuntimeError('Expected hits.total 1 or higher but got %s' % hit_count) + print('> Added test watch triggered %s history records...' % hit_count) + print('> Successful!') + + print('> Checking delete watch api...') + found = send_request(conn, 'DELETE', '/_watcher/watch/cluster_health_watch')['found'] + if (found == False): + raise RuntimeError('Expected found to be True but got %s' % found) + print('> Successful!') + print('> Smoke tester ran succesful with elastic version [%s]!' % es_version) + finally: + if p is not None: + try: + os.kill(p.pid, signal.SIGKILL) + except ProcessLookupError: + pass + finally: + shutil.rmtree(base_tmp_dir) + + print('> Smoke tester has successfully completed') + print('> The following es version were test [%s] with Watcher version [%s]' % (supported_es_version, expected_watcher_version)) \ No newline at end of file