Added watcher smoke tester

Original commit: elastic/x-pack-elasticsearch@1fc0ba79fb
This commit is contained in:
Martijn van Groningen 2015-07-31 13:35:16 +02:00
parent 63a04169a2
commit b1f7724790
1 changed files with 293 additions and 0 deletions

View File

@ -0,0 +1,293 @@
import datetime
import traceback
import json
import os
import shutil
import signal
import subprocess
import tempfile
import threading
import time
from http.client import HTTPConnection
import sys
LOG = os.environ.get('ES_SMOKE_TEST_PLUGINS_LOG', '/tmp/elasticsearch_smoke_test_plugins.log')
print('Logging to %s' % LOG)
if os.path.exists(LOG):
raise RuntimeError('please remove old log %s first' % LOG)
try:
JAVA_HOME = os.environ['JAVA7_HOME']
except KeyError:
try:
JAVA_HOME = os.environ['JAVA_HOME']
except KeyError:
raise RuntimeError("""
Please set JAVA_HOME in the env before running release tool
On OSX use: export JAVA_HOME=`/usr/libexec/java_home -v '1.7*'`""")
JAVA_ENV = 'export JAVA_HOME="%s" PATH="%s/bin:$PATH" JAVACMD="%s/bin/java"' % (JAVA_HOME, JAVA_HOME, JAVA_HOME)
try:
# make sure mvn3 is used if mvn3 is available
# some systems use maven 2 as default
subprocess.check_output('mvn3 --version', shell=True, stderr=subprocess.STDOUT)
MVN = 'mvn3'
except subprocess.CalledProcessError:
MVN = 'mvn'
def log(msg):
f = open(LOG, mode='ab')
f.write(('\n' + msg).encode('utf-8'))
f.close()
def run(command, quiet=False):
log('%s: RUN: %s\n' % (datetime.datetime.now(), command))
if os.system('%s >> %s 2>&1' % (command, LOG)):
msg = ' FAILED: %s [see log %s]' % (command, LOG)
if not quiet:
print(msg)
raise RuntimeError(msg)
def readServerOutput(p, startupEvent, failureEvent):
try:
while True:
line = p.stdout.readline()
if len(line) == 0:
p.poll()
if not startupEvent.isSet():
failureEvent.set()
startupEvent.set()
print('ES: **process exit**\n')
break
line = line.decode('utf-8').rstrip()
if line.endswith('started') and not startupEvent.isSet():
startupEvent.set()
print('ES: %s' % line)
except:
print()
print('Exception reading Elasticsearch output:')
traceback.print_exc()
failureEvent.set()
startupEvent.set()
def send_request(conn, method, path, body=None):
conn.request(method, path, body)
res = conn.getresponse()
if 200 > res.status > 300:
raise RuntimeError('Expected HTTP status code between 200 and 299 but got %s' % res.status)
return json.loads(res.read().decode("utf-8"))
supported_es_version = ['2.0.0-SNAPSHOT']
version_url_matrix = [
{'version': '2.0.0-beta1-SNAPSHOT', 'host' : 'oss.sonatype.org', 'url': 'https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/elasticsearch-tar/2.0.0-beta1-SNAPSHOT/elasticsearch-tar-2.0.0-beta1-20150731.034255-32.tar.gz'}
]
test_watch = """
{
"trigger": {
"schedule": {
"interval": "1s"
}
},
"input": {
"http": {
"request": {
"host": "localhost",
"port": 9200,
"path": "/_cluster/health"
}
}
},
"condition": {
"compare": {
"ctx.payload.status": {
"eq": "yellow"
}
}
},
"actions": {
"log": {
"logging": {
"text": "executed at {{ctx.execution_time}}"
}
}
}
}
"""
watch_history_query = """
{
"query": {
"term": {
"watch_id": {
"value": "cluster_health_watch"
}
}
}
}
"""
if __name__ == '__main__':
if len(sys.argv) != 2:
print('Specify the version of Watcher to be tested as an argument.')
print('For example: `python3 x-dev-tools/smoke_test_watcher.py 2.0.0-beta1-SNAPSHOT`')
exit()
expected_watcher_version = sys.argv[1]
print('Smoke testing Watcher version [%s] with the following es versions [%s]' % (expected_watcher_version, supported_es_version))
print('Building x-plugins (skipping tests)...')
#run('%s; %s clean package -DskipTests' % (JAVA_ENV, MVN))
license_artifact = None
for dirname, dirnames, filenames in os.walk('license/plugin/target/releases/'):
for f in filenames:
print(os.path.join(dirname, f))
if f.endswith('.zip'):
license_artifact = os.path.abspath(os.path.join(dirname, f))
break
if license_artifact is None:
raise RuntimeError('could not find license release under license/plugin/target/releases/')
watcher_artifact = None
for dirname, dirnames, filenames in os.walk('watcher/target/releases/'):
for f in filenames:
print(os.path.join(dirname, f))
if f.endswith('.zip'):
watcher_artifact = os.path.abspath(os.path.join(dirname, f))
break
if watcher_artifact is None:
raise RuntimeError('could not find Watcher release under watcher/target/releases/')
base_tmp_dir = tempfile.mkdtemp() + '/watcher_smoker/'
os.makedirs(base_tmp_dir)
try:
for version_url in version_url_matrix:
es_version = version_url['version']
print('Testing watcher [%s] with elasticsearch [%s]' % (expected_watcher_version, es_version))
version_tmp_dir = '%s%s/' % (base_tmp_dir, es_version)
os.makedirs(version_tmp_dir)
download = version_tmp_dir + 'elasticsearch-%s.tar.gz' % (es_version)
# conn = HTTPConnection(version_url['host'])
# conn.set_debuglevel()
# conn.request('GET', version_url['url'])
# print('downloading %s to %s' % (version_url['url'], download))
# resp = conn.getresponse()
# data = resp.read()
# with open(download, 'wb') as f:
# f.write(data)
# conn.close()
print('Downloading elasticsearch [%s]' % es_version)
run('curl %s -o %s' % (version_url['url'], download))
run('tar -xzf %s -C %s' % (download, version_tmp_dir))
es_dir = version_tmp_dir + 'elasticsearch-%s/' % (es_version)
print('Installing License plugin...')
url = 'file://%s' % license_artifact
run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/license', url))
url = 'file://%s' % watcher_artifact
print('Installing Watcher plugin...')
run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/watcher', url))
p = None
try:
print('Starting Elasticsearch...')
env = os.environ.copy()
env['JAVA_HOME'] = JAVA_HOME
env['PATH'] = '%s/bin:%s' % (JAVA_HOME, env['PATH'])
env['JAVA_CMD'] = '%s/bin/java' % JAVA_HOME
startupEvent = threading.Event()
failureEvent = threading.Event()
p = subprocess.Popen(('%s/bin/elasticsearch' % es_dir,
'-Des.node.name=watcher_smoke_tester',
'-Des.cluster.name=watcher_smoke_tester'
'-Des.discovery.zen.ping.multicast.enabled=false',
'-Des.script.inline=on',
'-Des.script.indexed=on'),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env=env)
thread = threading.Thread(target=readServerOutput, args=(p, startupEvent, failureEvent))
thread.setDaemon(True)
thread.start()
startupEvent.wait(5000)
if failureEvent.isSet():
raise RuntimeError('ES failed to start')
conn = HTTPConnection('127.0.0.1', 9200, 20)
# TODO: fix version in watcher info api!
# print('> Checking watcher info api...')
# version = send_request(conn, 'GET', '/_watcher')['version']['name']
# if version != expected_watcher_version:
# raise RuntimeError('Expected Watcher version %s but got %s' % (expected_watcher_version, version))
# print('> Successful!')
print('> Checking watcher stats api...')
watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state']
# we're too fast, lets wait and retry:
if watcher_state == 'starting':
time.sleep(5)
watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state']
if watcher_state != 'started':
raise RuntimeError('Expected watcher_state started but got %s' % watcher_state)
watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count']
if watch_count != 0:
raise RuntimeError('Expected watcher_count 0 but got %s' % watch_count)
print('> Successful!')
print('> Checking put watch api...')
res_body = send_request(conn, 'PUT', '/_watcher/watch/cluster_health_watch', test_watch)
if res_body['_version'] != 1:
raise RuntimeError('Expected watch _version 1 but got %s' % res_body['_version'])
if res_body['created'] != True:
raise RuntimeError('Expected watch created True but got %s' % res_body['created'])
watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count']
if watch_count != 1:
raise RuntimeError('Expected watcher_count 1 but got %s' % watch_count)
print('> Successful!')
print('> Checking if watch actually fires...')
time.sleep(5)
send_request(conn, 'GET', '/.watch_history-*/_refresh')
hit_count = send_request(conn, 'GET', '/.watch_history-*/_search')['hits']['total']
if hit_count == 0:
raise RuntimeError('Expected hits.total 1 or higher but got %s' % hit_count)
print('> Added test watch triggered %s history records...' % hit_count)
print('> Successful!')
print('> Checking delete watch api...')
found = send_request(conn, 'DELETE', '/_watcher/watch/cluster_health_watch')['found']
if (found == False):
raise RuntimeError('Expected found to be True but got %s' % found)
print('> Successful!')
print('> Smoke tester ran succesful with elastic version [%s]!' % es_version)
finally:
if p is not None:
try:
os.kill(p.pid, signal.SIGKILL)
except ProcessLookupError:
pass
finally:
shutil.rmtree(base_tmp_dir)
print('> Smoke tester has successfully completed')
print('> The following es version were test [%s] with Watcher version [%s]' % (supported_es_version, expected_watcher_version))