#!/usr/bin/env python # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import argparse import base64 import json import re import sys import time import urllib.request, urllib.error, urllib.parse import urllib.parse # Read a file. Returns string. def read_task_file(args): with open(args.file, 'r') as f: contents = f.read() # We don't use the parsed data, but we want to throw early if it's invalid try: json.loads(contents) except Exception as e: sys.stderr.write('Invalid JSON in task file "{0}": {1}\n'.format(args.file, repr(e))) sys.exit(1) return contents def add_basic_auth_header(args, req): if (args.user is not None): basic_auth_encoded = base64.b64encode('%s:%s' % (args.user, args.password)) req.add_header("Authorization", "Basic %s" % basic_auth_encoded) # Keep trying until timeout_at, maybe die then. Returns bytes. def post_task(args, task_json, timeout_at): try: url = args.url.rstrip("/") + "/druid/indexer/v1/task" req = urllib.request.Request(url, task_json, {'Content-Type' : 'application/json'}) add_basic_auth_header(args, req) timeleft = timeout_at - time.time() response_timeout = min(max(timeleft, 5), 10) response = urllib.request.urlopen(req, None, response_timeout) return response.read().rstrip() except urllib.error.URLError as e: if isinstance(e, urllib.error.HTTPError) and e.code >= 400 and e.code <= 500: # 4xx (problem with the request) or 500 (something wrong on the server) raise_friendly_error(e) elif time.time() >= timeout_at: # No futher retries raise_friendly_error(e) elif isinstance(e, urllib.error.HTTPError) and e.code in [301, 302, 303, 305, 307] and \ e.info().getheader("Location") is not None: # Set the new location in args.url so it can be used by await_task_completion and re-issue the request location = urllib.parse.urlparse(e.info().getheader("Location")) args.url = "{0}://{1}".format(location.scheme, location.netloc) sys.stderr.write("Redirect response received, setting url to [{0}]\n".format(args.url)) return post_task(args, task_json, timeout_at) else: # If at first you don't succeed, try, try again! sleep_time = 5 if not args.quiet: extra = '' if hasattr(e, 'read'): extra = e.read().rstrip() sys.stderr.write("Waiting up to {0}s for indexing service [{1}] to become available. [Got: {2} {3}]".format(max(sleep_time, int(timeout_at - time.time())), args.url, str(e), extra).rstrip()) sys.stderr.write("\n") time.sleep(sleep_time) return post_task(args, task_json, timeout_at) # Keep trying until timeout_at, maybe die then def await_task_completion(args, task_id, timeout_at): while True: url = args.url.rstrip("/") + "/druid/indexer/v1/task/{0}/status".format(task_id) req = urllib.request.Request(url) add_basic_auth_header(args, req) timeleft = timeout_at - time.time() response_timeout = min(max(timeleft, 5), 10) response = urllib.request.urlopen(req, None, response_timeout) response_obj = json.loads(response.read().decode('utf-8')) response_status_code = response_obj["status"]["statusCode"] if response_status_code in ['SUCCESS', 'FAILED']: return response_status_code else: if time.time() < timeout_at: if not args.quiet: sys.stderr.write("Task {0} still running...\n".format(task_id)) timeleft = timeout_at - time.time() time.sleep(min(5, timeleft)) else: raise Exception("Task {0} did not finish in time!".format(task_id)) def raise_friendly_error(e): if isinstance(e, urllib.error.HTTPError): text = e.read().strip() reresult = re.search(r'
(.*?)
', text, re.DOTALL) if reresult: text = reresult.group(1).strip() raise Exception("HTTP Error {0}: {1}, check overlord log for more details.\n{2}".format(e.code, e.reason, text)) raise e def await_load_completion(args, datasource, timeout_at): while True: url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus" req = urllib.request.Request(url) add_basic_auth_header(args, req) timeleft = timeout_at - time.time() response_timeout = min(max(timeleft, 5), 10) response = urllib.request.urlopen(req, None, response_timeout) response_obj = json.loads(response.read().decode('utf-8')) load_status = response_obj.get(datasource, 0.0) if load_status >= 100.0: sys.stderr.write("{0} loading complete! You may now query your data\n".format(datasource)) return else: if time.time() < timeout_at: if not args.quiet: sys.stderr.write("{0} is {1}% finished loading...\n".format(datasource, load_status)) timeleft = timeout_at - time.time() time.sleep(min(5, timeleft)) else: raise Exception("{0} was not loaded in time!".format(datasource)) def main(): parser = argparse.ArgumentParser(description='Post Druid indexing tasks.') parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8090/', help='Druid Overlord url') parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator url') parser.add_argument('--file', '-f', type=str, required=True, help='Query JSON file') parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks') parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks') parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load') parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors') parser.add_argument('--user', type=str, default=None, help='Basic auth username') parser.add_argument('--password', type=str, default=None, help='Basic auth password') args = parser.parse_args() submit_timeout_at = time.time() + args.submit_timeout complete_timeout_at = time.time() + args.complete_timeout task_contents = read_task_file(args) task_json = json.loads(task_contents) if task_json['type'] == "compact": datasource = task_json['dataSource'] else: datasource = task_json["spec"]["dataSchema"]["dataSource"] sys.stderr.write("Beginning indexing data for {0}\n".format(datasource)) task_id = json.loads(post_task(args, task_contents.encode(), submit_timeout_at).decode('utf-8'))["task"] sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id)) sys.stderr.write('\033[1m' + "Task log: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id)) sys.stderr.write('\033[1m' + "Task status: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"),task_id)) task_status = await_task_completion(args, task_id, complete_timeout_at) sys.stderr.write("Task finished with status: {0}\n".format(task_status)) if task_status != 'SUCCESS': sys.exit(1) sys.stderr.write("Completed indexing data for {0}. Now loading indexed data onto the cluster...\n".format(datasource)) load_timeout_at = time.time() + args.load_timeout await_load_completion(args, datasource, load_timeout_at) try: main() except KeyboardInterrupt: sys.exit(1)