import datetime import traceback import json import os import shutil import signal import subprocess import tempfile import threading import time from http.client import HTTPConnection import sys LOG = os.environ.get('ES_SMOKE_TEST_PLUGINS_LOG', '/tmp/elasticsearch_smoke_test_plugins.log') print('Logging to %s' % LOG) if os.path.exists(LOG): raise RuntimeError('please remove old log %s first' % LOG) try: JAVA_HOME = os.environ['JAVA7_HOME'] except KeyError: try: JAVA_HOME = os.environ['JAVA_HOME'] except KeyError: raise RuntimeError(""" Please set JAVA_HOME in the env before running release tool On OSX use: export JAVA_HOME=`/usr/libexec/java_home -v '1.7*'`""") JAVA_ENV = 'export JAVA_HOME="%s" PATH="%s/bin:$PATH" JAVACMD="%s/bin/java"' % (JAVA_HOME, JAVA_HOME, JAVA_HOME) try: # make sure mvn3 is used if mvn3 is available # some systems use maven 2 as default subprocess.check_output('mvn3 --version', shell=True, stderr=subprocess.STDOUT) MVN = 'mvn3' except subprocess.CalledProcessError: MVN = 'mvn' def log(msg): f = open(LOG, mode='ab') f.write(('\n' + msg).encode('utf-8')) f.close() def run(command, quiet=False): log('%s: RUN: %s\n' % (datetime.datetime.now(), command)) if os.system('%s >> %s 2>&1' % (command, LOG)): msg = ' FAILED: %s [see log %s]' % (command, LOG) if not quiet: print(msg) raise RuntimeError(msg) def readServerOutput(p, startupEvent, failureEvent): try: while True: line = p.stdout.readline() if len(line) == 0: p.poll() if not startupEvent.isSet(): failureEvent.set() startupEvent.set() print('ES: **process exit**\n') break line = line.decode('utf-8').rstrip() if line.endswith('started') and not startupEvent.isSet(): startupEvent.set() print('ES: %s' % line) except: print() print('Exception reading Elasticsearch output:') traceback.print_exc() failureEvent.set() startupEvent.set() def send_request(conn, method, path, body=None): conn.request(method, path, body) res = conn.getresponse() if 200 > res.status > 300: raise RuntimeError('Expected HTTP status code between 200 and 299 but got %s' % res.status) return json.loads(res.read().decode("utf-8")) supported_es_version = ['2.0.0-SNAPSHOT'] version_url_matrix = [ {'version': '2.0.0-beta1-SNAPSHOT', 'host' : 'oss.sonatype.org', 'url': 'https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/elasticsearch-tar/2.0.0-beta1-SNAPSHOT/elasticsearch-tar-2.0.0-beta1-20150731.034255-32.tar.gz'} ] test_watch = """ { "trigger": { "schedule": { "interval": "1s" } }, "input": { "http": { "request": { "host": "localhost", "port": 9200, "path": "/_cluster/health" } } }, "condition": { "compare": { "ctx.payload.status": { "eq": "yellow" } } }, "actions": { "log": { "logging": { "text": "executed at {{ctx.execution_time}}" } } } } """ watch_history_query = """ { "query": { "term": { "watch_id": { "value": "cluster_health_watch" } } } } """ if __name__ == '__main__': if len(sys.argv) != 2: print('Specify the version of Watcher to be tested as an argument.') print('For example: `python3 x-dev-tools/smoke_test_watcher.py 2.0.0-beta1-SNAPSHOT`') exit() expected_watcher_version = sys.argv[1] print('Smoke testing Watcher version [%s] with the following es versions [%s]' % (expected_watcher_version, supported_es_version)) print('Building x-plugins (skipping tests)...') #run('%s; %s clean package -DskipTests' % (JAVA_ENV, MVN)) license_artifact = None for dirname, dirnames, filenames in os.walk('license/plugin/target/releases/'): for f in filenames: print(os.path.join(dirname, f)) if f.endswith('.zip'): license_artifact = os.path.abspath(os.path.join(dirname, f)) break if license_artifact is None: raise RuntimeError('could not find license release under license/plugin/target/releases/') watcher_artifact = None for dirname, dirnames, filenames in os.walk('watcher/target/releases/'): for f in filenames: print(os.path.join(dirname, f)) if f.endswith('.zip'): watcher_artifact = os.path.abspath(os.path.join(dirname, f)) break if watcher_artifact is None: raise RuntimeError('could not find Watcher release under watcher/target/releases/') base_tmp_dir = tempfile.mkdtemp() + '/watcher_smoker/' os.makedirs(base_tmp_dir) try: for version_url in version_url_matrix: es_version = version_url['version'] print('Testing watcher [%s] with elasticsearch [%s]' % (expected_watcher_version, es_version)) version_tmp_dir = '%s%s/' % (base_tmp_dir, es_version) os.makedirs(version_tmp_dir) download = version_tmp_dir + 'elasticsearch-%s.tar.gz' % (es_version) # conn = HTTPConnection(version_url['host']) # conn.set_debuglevel() # conn.request('GET', version_url['url']) # print('downloading %s to %s' % (version_url['url'], download)) # resp = conn.getresponse() # data = resp.read() # with open(download, 'wb') as f: # f.write(data) # conn.close() print('Downloading elasticsearch [%s]' % es_version) run('curl %s -o %s' % (version_url['url'], download)) run('tar -xzf %s -C %s' % (download, version_tmp_dir)) es_dir = version_tmp_dir + 'elasticsearch-%s/' % (es_version) print('Installing License plugin...') url = 'file://%s' % license_artifact run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/license', url)) url = 'file://%s' % watcher_artifact print('Installing Watcher plugin...') run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/watcher', url)) p = None try: print('Starting Elasticsearch...') env = os.environ.copy() env['JAVA_HOME'] = JAVA_HOME env['PATH'] = '%s/bin:%s' % (JAVA_HOME, env['PATH']) env['JAVA_CMD'] = '%s/bin/java' % JAVA_HOME startupEvent = threading.Event() failureEvent = threading.Event() p = subprocess.Popen(('%s/bin/elasticsearch' % es_dir, '-Des.node.name=watcher_smoke_tester', '-Des.cluster.name=watcher_smoke_tester' '-Des.discovery.zen.ping.multicast.enabled=false', '-Des.script.inline=on', '-Des.script.indexed=on'), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) thread = threading.Thread(target=readServerOutput, args=(p, startupEvent, failureEvent)) thread.setDaemon(True) thread.start() startupEvent.wait(5000) if failureEvent.isSet(): raise RuntimeError('ES failed to start') conn = HTTPConnection('127.0.0.1', 9200, 20) # TODO: fix version in watcher info api! # print('> Checking watcher info api...') # version = send_request(conn, 'GET', '/_watcher')['version']['name'] # if version != expected_watcher_version: # raise RuntimeError('Expected Watcher version %s but got %s' % (expected_watcher_version, version)) # print('> Successful!') print('> Checking watcher stats api...') watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state'] # we're too fast, lets wait and retry: if watcher_state == 'starting': time.sleep(5) watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state'] if watcher_state != 'started': raise RuntimeError('Expected watcher_state started but got %s' % watcher_state) watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count'] if watch_count != 0: raise RuntimeError('Expected watcher_count 0 but got %s' % watch_count) print('> Successful!') print('> Checking put watch api...') res_body = send_request(conn, 'PUT', '/_watcher/watch/cluster_health_watch', test_watch) if res_body['_version'] != 1: raise RuntimeError('Expected watch _version 1 but got %s' % res_body['_version']) if res_body['created'] != True: raise RuntimeError('Expected watch created True but got %s' % res_body['created']) watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count'] if watch_count != 1: raise RuntimeError('Expected watcher_count 1 but got %s' % watch_count) print('> Successful!') print('> Checking if watch actually fires...') time.sleep(5) send_request(conn, 'GET', '/.watch_history-*/_refresh') hit_count = send_request(conn, 'GET', '/.watch_history-*/_search')['hits']['total'] if hit_count == 0: raise RuntimeError('Expected hits.total 1 or higher but got %s' % hit_count) print('> Added test watch triggered %s history records...' % hit_count) print('> Successful!') print('> Checking delete watch api...') found = send_request(conn, 'DELETE', '/_watcher/watch/cluster_health_watch')['found'] if (found == False): raise RuntimeError('Expected found to be True but got %s' % found) print('> Successful!') print('> Smoke tester ran succesful with elastic version [%s]!' % es_version) finally: if p is not None: try: os.kill(p.pid, signal.SIGKILL) except ProcessLookupError: pass finally: shutil.rmtree(base_tmp_dir) print('> Smoke tester has successfully completed') print('> The following es version were test [%s] with Watcher version [%s]' % (supported_es_version, expected_watcher_version))