OpenSearch/x-dev-tools/smoke_test_watcher.py

303 lines
11 KiB
Python

import datetime
import traceback
import json
import os
import shutil
import signal
import subprocess
import tempfile
import threading
import time
import xml.dom.minidom
from http.client import HTTPSConnection
from http.client import HTTPConnection
import sys
# Path of the log file that every shell command run by this script appends to.
# Overridable through the ES_SMOKE_TEST_PLUGINS_LOG environment variable.
LOG = os.environ.get('ES_SMOKE_TEST_PLUGINS_LOG', '/tmp/elasticsearch_smoke_test_plugins.log')
print('Logging to %s' % LOG)
# Refuse to start when a log from an earlier run is still present.
if os.path.exists(LOG):
    raise RuntimeError('please remove old log %s first' % LOG)

# JAVA7_HOME takes precedence over JAVA_HOME; at least one of them must be set.
JAVA_HOME = os.environ.get('JAVA7_HOME', os.environ.get('JAVA_HOME'))
if JAVA_HOME is None:
    raise RuntimeError("""
Please set JAVA_HOME in the env before running release tool
On OSX use: export JAVA_HOME=`/usr/libexec/java_home -v '1.7*'`""")

# Shell prefix that points a spawned command at the selected JDK.
JAVA_ENV = 'export JAVA_HOME="%s" PATH="%s/bin:$PATH" JAVACMD="%s/bin/java"' % ((JAVA_HOME,) * 3)

# make sure mvn3 is used if mvn3 is available
# some systems use maven 2 as default
try:
    subprocess.check_output('mvn3 --version', shell=True, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
    MVN = 'mvn'
else:
    MVN = 'mvn3'
def log(msg):
    """Append ``msg`` to the log file at ``LOG``.

    The message is written UTF-8 encoded and preceded by a newline. The file
    is opened in binary append mode for each call and is always closed again,
    even when the write fails.
    """
    with open(LOG, mode='ab') as f:
        f.write(('\n' + msg).encode('utf-8'))
def run(command, quiet=False):
    """Run ``command`` through the shell, appending its output to ``LOG``.

    The command line is recorded in the log (with a timestamp) before it is
    executed; stdout and stderr of the command are both redirected to the log.

    Args:
        command: shell command line to execute.
        quiet: when true, the failure message is not printed (it is still
            carried by the raised exception).

    Raises:
        RuntimeError: if ``os.system`` reports a non-zero status.
    """
    log('%s: RUN: %s\n' % (datetime.datetime.now(), command))
    status = os.system('%s >> %s 2>&1' % (command, LOG))
    if status == 0:
        return
    msg = ' FAILED: %s [see log %s]' % (command, LOG)
    if not quiet:
        print(msg)
    raise RuntimeError(msg)
def readServerOutput(p, startupEvent, failureEvent):
    """Echo the server process output and signal startup or failure.

    Reads ``p.stdout`` line by line until end of stream, printing every line
    prefixed with ``ES: ``. The first line that ends with ``started`` sets
    ``startupEvent``. If the stream ends (or reading fails) before that,
    ``failureEvent`` is set as well, and ``startupEvent`` is still set so that
    a waiter blocked on it wakes up and can inspect ``failureEvent``.

    Args:
        p: process object exposing a binary ``stdout`` stream and ``poll()``.
        startupEvent: ``threading.Event`` set once the outcome is known.
        failureEvent: ``threading.Event`` set when startup did not succeed.
    """
    try:
        while True:
            line = p.stdout.readline()
            if not line:
                # End of stream: the process closed its output.
                p.poll()
                if not startupEvent.is_set():
                    failureEvent.set()
                    startupEvent.set()
                print('ES: **process exit**\n')
                break
            # Undecodable bytes in the server output must not abort the reader
            # (and thereby report a bogus startup failure), so replace them.
            line = line.decode('utf-8', errors='replace').rstrip()
            if line.endswith('started') and not startupEvent.is_set():
                startupEvent.set()
            print('ES: %s' % line)
    except Exception:
        print()
        print('Exception reading Elasticsearch output:')
        traceback.print_exc()
        failureEvent.set()
        startupEvent.set()
def send_request(conn, method, path, body=None):
    """Send one HTTP request on ``conn`` and return the decoded JSON response.

    Args:
        conn: connection object providing ``request(method, path, body)`` and
            ``getresponse()`` (e.g. ``http.client.HTTPConnection``).
        method: HTTP method, such as ``'GET'`` or ``'PUT'``.
        path: request path.
        body: optional request body.

    Returns:
        The response body parsed as UTF-8 encoded JSON.

    Raises:
        RuntimeError: if the response status is outside the 200-299 range.
    """
    conn.request(method, path, body)
    res = conn.getresponse()
    # The previous check (200 > status > 300) could never be true, so error
    # responses were silently accepted.
    if not 200 <= res.status < 300:
        raise RuntimeError('Expected HTTP status code between 200 and 299 but got %s' % res.status)
    return json.loads(res.read().decode("utf-8"))
# Elasticsearch versions named in the progress messages printed by this script.
supported_es_version = ['2.0.0-SNAPSHOT']

# Look up the concrete snapshot build: the text of the first "value" element
# of the repository's maven-metadata.xml is used as the version part of the
# tarball file name.
conn = HTTPSConnection('oss.sonatype.org')
try:
    conn.request('GET', 'https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/elasticsearch-tar/2.0.0-beta1-SNAPSHOT/maven-metadata.xml')
    res = conn.getresponse()
    # Fail with a clear message instead of an XML parse / index error when the
    # metadata could not be fetched.
    if res.status != 200:
        raise RuntimeError('Expected HTTP status code 200 for the snapshot metadata but got %s' % res.status)
    dom = xml.dom.minidom.parseString(res.read().decode("utf-8"))
finally:
    # Release the connection even when the request or the parsing fails.
    conn.close()
snapshotVersion = dom.getElementsByTagName("value")[0].firstChild.nodeValue
snapshot_url = 'https://oss.sonatype.org/content/repositories/snapshots/org/elasticsearch/distribution/elasticsearch-tar/2.0.0-beta1-SNAPSHOT/elasticsearch-tar-%s.tar.gz' % snapshotVersion
print("Using the following url for the snapshot version: %s" % snapshot_url)

# One entry per Elasticsearch distribution to test against: the version label,
# the host to download from and the download URL.
version_url_matrix = [
    {'version': '2.0.0-beta1-SNAPSHOT', 'host': 'oss.sonatype.org', 'url': snapshot_url},
]
# JSON body of the watch registered by the smoke test (sent with the PUT to
# /_watcher/watch/cluster_health_watch). The watch is scheduled on a 1s
# interval, reads /_cluster/health from localhost:9200 through an http input,
# compares ctx.payload.status against "yellow" and has a single logging action.
test_watch = """
{
"trigger": {
"schedule": {
"interval": "1s"
}
},
"input": {
"http": {
"request": {
"host": "localhost",
"port": 9200,
"path": "/_cluster/health"
}
}
},
"condition": {
"compare": {
"ctx.payload.status": {
"eq": "yellow"
}
}
},
"actions": {
"log": {
"logging": {
"text": "executed at {{ctx.execution_time}}"
}
}
}
}
"""
# JSON search body with a term query on watch_id == "cluster_health_watch".
# NOTE(review): not referenced anywhere in the visible part of this file; the
# watch-history search in the __main__ section is sent without a body --
# confirm whether this query was meant to be passed there.
watch_history_query = """
{
"query": {
"term": {
"watch_id": {
"value": "cluster_health_watch"
}
}
}
}
"""
def find_release_artifact(releases_dir):
    """Return the absolute path of the first ``.zip`` file under ``releases_dir``.

    The directory tree is walked with ``os.walk`` and every file visited up to
    and including the match is printed.

    Args:
        releases_dir: directory to search (recursively).

    Returns:
        Absolute path of the first file whose name ends with ``.zip``, or
        ``None`` when there is no such file (or the directory does not exist).
    """
    for dirname, _dirnames, filenames in os.walk(releases_dir):
        for f in filenames:
            print(os.path.join(dirname, f))
            if f.endswith('.zip'):
                return os.path.abspath(os.path.join(dirname, f))
    return None


# Script entry point: installs the locally built License and Watcher plugins
# into each Elasticsearch distribution of version_url_matrix, starts a node
# and exercises the watcher stats / put watch / watch history / delete watch
# REST endpoints against it.
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print('Specify the version of Watcher to be tested as an argument.')
        print('For example: `python3 x-dev-tools/smoke_test_watcher.py 2.0.0-beta1-SNAPSHOT`')
        # A usage error must not look like a successful run to the caller.
        sys.exit(1)

    expected_watcher_version = sys.argv[1]
    print('Smoke testing Watcher version [%s] with the following es versions [%s]' % (expected_watcher_version, supported_es_version))
    print('Building x-plugins (skipping tests)...')
    #run('%s; %s clean package -DskipTests' % (JAVA_ENV, MVN))

    license_artifact = find_release_artifact('license/plugin/target/releases/')
    if license_artifact is None:
        raise RuntimeError('could not find license release under license/plugin/target/releases/')

    watcher_artifact = find_release_artifact('watcher/target/releases/')
    if watcher_artifact is None:
        raise RuntimeError('could not find Watcher release under watcher/target/releases/')

    # Keep a handle on the directory created by mkdtemp() so that the whole
    # tree (not only the watcher_smoker/ sub-directory) is removed at the end.
    tmp_root = tempfile.mkdtemp()
    base_tmp_dir = tmp_root + '/watcher_smoker/'
    os.makedirs(base_tmp_dir)
    try:
        for version_url in version_url_matrix:
            es_version = version_url['version']
            print('Testing watcher [%s] with elasticsearch [%s]' % (expected_watcher_version, es_version))
            version_tmp_dir = '%s%s/' % (base_tmp_dir, es_version)
            os.makedirs(version_tmp_dir)

            # Fetch the distribution tarball.
            download = version_tmp_dir + 'elasticsearch-%s.tar.gz' % (es_version)
            conn = HTTPSConnection(version_url['host'])
            try:
                conn.request('GET', version_url['url'])
                print('downloading %s to %s' % (version_url['url'], download))
                resp = conn.getresponse()
                # Without this check an error page would be saved as the
                # tarball and only fail later, inside tar.
                if resp.status != 200:
                    raise RuntimeError('Expected HTTP status code 200 for %s but got %s' % (version_url['url'], resp.status))
                data = resp.read()
                with open(download, 'wb') as f:
                    f.write(data)
            finally:
                conn.close()

            # NOTE(review): the message says "Downloading" although this step
            # unpacks the tarball that was just downloaded.
            print('Downloading elasticsearch [%s]' % es_version)
            run('tar -xzf %s -C %s' % (download, version_tmp_dir))
            es_dir = version_tmp_dir + 'elasticsearch-%s/' % (es_version)

            print('Installing License plugin...')
            url = 'file://%s' % license_artifact
            run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/license', url))
            url = 'file://%s' % watcher_artifact
            print('Installing Watcher plugin...')
            run('%s; %s install %s --url %s' % (JAVA_ENV, es_dir + 'bin/plugin', 'elasticsearch/watcher', url))

            p = None
            conn = None
            try:
                print('Starting Elasticsearch...')
                env = os.environ.copy()
                env['JAVA_HOME'] = JAVA_HOME
                env['PATH'] = '%s/bin:%s' % (JAVA_HOME, env['PATH'])
                env['JAVA_CMD'] = '%s/bin/java' % JAVA_HOME

                startupEvent = threading.Event()
                failureEvent = threading.Event()
                # Each option must be its own argument; a missing comma here
                # used to fuse the cluster name and the multicast setting
                # into a single, bogus option.
                p = subprocess.Popen(('%s/bin/elasticsearch' % es_dir,
                                      '-Des.node.name=watcher_smoke_tester',
                                      '-Des.cluster.name=watcher_smoke_tester',
                                      '-Des.discovery.zen.ping.multicast.enabled=false',
                                      '-Des.script.inline=on',
                                      '-Des.script.indexed=on'),
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.STDOUT,
                                     env=env)
                # Background reader that echoes the server output and flips
                # the two events.
                thread = threading.Thread(target=readServerOutput,
                                          args=(p, startupEvent, failureEvent),
                                          daemon=True)
                thread.start()

                startupEvent.wait(5000)
                if failureEvent.is_set():
                    raise RuntimeError('ES failed to start')

                conn = HTTPConnection('127.0.0.1', 9200, 20)

                # TODO: fix version in watcher info api!
                # print('> Checking watcher info api...')
                # version = send_request(conn, 'GET', '/_watcher')['version']['name']
                # if version != expected_watcher_version:
                # raise RuntimeError('Expected Watcher version %s but got %s' % (expected_watcher_version, version))
                # print('> Successful!')

                print('> Checking watcher stats api...')
                watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state']
                # we're too fast, lets wait and retry:
                if watcher_state == 'starting':
                    time.sleep(5)
                    watcher_state = send_request(conn, 'GET', '/_watcher/stats')['watcher_state']
                if watcher_state != 'started':
                    raise RuntimeError('Expected watcher_state started but got %s' % watcher_state)
                watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count']
                if watch_count != 0:
                    raise RuntimeError('Expected watcher_count 0 but got %s' % watch_count)
                print('> Successful!')

                print('> Checking put watch api...')
                res_body = send_request(conn, 'PUT', '/_watcher/watch/cluster_health_watch', test_watch)
                if res_body['_version'] != 1:
                    raise RuntimeError('Expected watch _version 1 but got %s' % res_body['_version'])
                if res_body['created'] is not True:
                    raise RuntimeError('Expected watch created True but got %s' % res_body['created'])
                watch_count = send_request(conn, 'GET', '/_watcher/stats')['watch_count']
                if watch_count != 1:
                    raise RuntimeError('Expected watcher_count 1 but got %s' % watch_count)
                print('> Successful!')

                print('> Checking if watch actually fires...')
                # The watch runs on a 1s interval; give it time to execute.
                time.sleep(5)
                send_request(conn, 'GET', '/.watch_history-*/_refresh')
                hit_count = send_request(conn, 'GET', '/.watch_history-*/_search')['hits']['total']
                if hit_count == 0:
                    raise RuntimeError('Expected hits.total 1 or higher but got %s' % hit_count)
                print('> Added test watch triggered %s history records...' % hit_count)
                print('> Successful!')

                print('> Checking delete watch api...')
                found = send_request(conn, 'DELETE', '/_watcher/watch/cluster_health_watch')['found']
                if not found:
                    raise RuntimeError('Expected found to be True but got %s' % found)
                print('> Successful!')
                print('> Smoke tester ran succesful with elastic version [%s]!' % es_version)
            finally:
                # Always release the HTTP connection and stop the node, also
                # when one of the checks above failed.
                if conn is not None:
                    conn.close()
                if p is not None:
                    try:
                        os.kill(p.pid, signal.SIGKILL)
                    except ProcessLookupError:
                        # The process is already gone.
                        pass
                    # Reap the child so it does not linger as a zombie.
                    p.wait()
    finally:
        shutil.rmtree(tmp_root)

    print('> Smoke tester has successfully completed')
    print('> The following es version were test [%s] with Watcher version [%s]' % (supported_es_version, expected_watcher_version))