diff --git a/dev-tools/create-bwc-index.py b/dev-tools/create-bwc-index.py index ae24fd3b4bc..6cdf9b36fa3 100644 --- a/dev-tools/create-bwc-index.py +++ b/dev-tools/create-bwc-index.py @@ -14,19 +14,18 @@ # either express or implied. See the License for the specific # language governing permissions and limitations under the License. -import random -import os -import tempfile -import shutil -import subprocess -import time import argparse +import glob import logging +import os +import random +import subprocess import sys -import re +import tempfile +import time -if sys.version_info[0] > 2: - print('%s must use python 2.x (for the ES python client)' % sys.argv[0]) +if sys.version_info[0] < 3: + print('%s must use python 3.x (for the ES python client)' % sys.argv[0]) from datetime import datetime try: @@ -34,7 +33,7 @@ try: from elasticsearch.exceptions import ConnectionError from elasticsearch.exceptions import TransportError except ImportError as e: - print('Can\'t import elasticsearch please install `sudo pip install elasticsearch`') + print('Can\'t import elasticsearch please install `sudo pip3 install elasticsearch`') sys.exit(1) # sometimes returns True @@ -80,7 +79,7 @@ def delete_by_query(es, version, index_name, doc_type): {'gte': 10, 'lte': 20}}}} - if version.startswith('0.90.') or version in ('1.0.0.Beta1', '1.0.0.Beta2'): + if version.startswith('0.') or version in ('1.0.0.Beta1', '1.0.0.Beta2'): # TODO #10262: we can't write DBQ into the translog for these old versions until we fix this back-compat bug: # #4074: these versions don't expect to see the top-level 'query' to count/delete_by_query: @@ -157,9 +156,8 @@ def create_client(http_port, timeout=30): time.sleep(1) assert False, 'Timed out waiting for node for %s seconds' % timeout -def generate_index(client, version): - name = 'index-%s' % version.lower() - client.indices.delete(index=name, ignore=404) +def generate_index(client, version, index_name): + client.indices.delete(index=index_name, 
ignore=404) num_shards = random.randint(1, 10) num_replicas = random.randint(0, 1) logging.info('Create single shard test index') @@ -229,7 +227,7 @@ def generate_index(client, version): } } - client.indices.create(index=name, body={ + client.indices.create(index=index_name, body={ 'settings': { 'number_of_shards': 1, 'number_of_replicas': 0 @@ -244,19 +242,19 @@ def generate_index(client, version): # 1.1.0 is buggy and creates lots and lots of segments, so we create a # lighter index for it to keep bw tests reasonable # see https://github.com/elastic/elasticsearch/issues/5817 - num_docs = num_docs / 10 - index_documents(client, name, 'doc', num_docs) + num_docs = int(num_docs / 10) + index_documents(client, index_name, 'doc', num_docs) logging.info('Running basic asserts on the data added') - run_basic_asserts(client, name, 'doc', num_docs) + run_basic_asserts(client, index_name, 'doc', num_docs) -def snapshot_index(client, cfg): +def snapshot_index(client, cfg, version, repo_dir): # Add bogus persistent settings to make sure they can be restored client.cluster.put_settings(body={ 'persistent': { - 'cluster.routing.allocation.exclude.version_attr': cfg.version + 'cluster.routing.allocation.exclude.version_attr': version } }) - client.indices.put_template(name='template_' + cfg.version.lower(), order=0, body={ + client.indices.put_template(name='template_' + version.lower(), order=0, body={ "template": "te*", "settings": { "number_of_shards" : 1 @@ -270,7 +268,7 @@ def snapshot_index(client, cfg): "alias1": {}, "alias2": { "filter": { - "term": {"version" : cfg.version } + "term": {"version" : version } }, "routing": "kimchy" }, @@ -280,7 +278,7 @@ def snapshot_index(client, cfg): client.snapshot.create_repository(repository='test_repo', body={ 'type': 'fs', 'settings': { - 'location': cfg.repo_dir + 'location': repo_dir } }) client.snapshot.create(repository='test_repo', snapshot='test_1', wait_for_completion=True) @@ -306,8 +304,11 @@ def compress(tmp_dir, 
output_dir, zipfile, directory): def parse_config(): parser = argparse.ArgumentParser(description='Builds an elasticsearch index for backwards compatibility tests') - parser.add_argument('version', metavar='X.Y.Z', - help='The elasticsearch version to build an index for') + required = parser.add_mutually_exclusive_group(required=True) + required.add_argument('versions', metavar='X.Y.Z', nargs='*', default=[], + help='The elasticsearch version to build an index for') + required.add_argument('--all', action='store_true', default=False, + help='Recreate all existing backwards compatibility indexes') parser.add_argument('--releases-dir', '-d', default='backwards', metavar='DIR', help='The directory containing elasticsearch releases') parser.add_argument('--output-dir', '-o', default='src/test/resources/org/elasticsearch/bwcompat', @@ -318,48 +319,59 @@ def parse_config(): help='The port to use as the minimum port for HTTP communication') cfg = parser.parse_args() - cfg.release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % cfg.version) - if not os.path.exists(cfg.release_dir): - parser.error('ES version %s does not exist in %s' % (cfg.version, cfg.releases_dir)) - if not os.path.exists(cfg.output_dir): parser.error('Output directory does not exist: %s' % cfg.output_dir) - cfg.tmp_dir = tempfile.mkdtemp() - cfg.data_dir = os.path.join(cfg.tmp_dir, 'data') - cfg.repo_dir = os.path.join(cfg.tmp_dir, 'repo') - logging.info('Temp data dir: %s' % cfg.data_dir) - logging.info('Temp repo dir: %s' % cfg.repo_dir) - cfg.snapshot_supported = not (cfg.version.startswith('0.') or cfg.version == '1.0.0.Beta1') + if not cfg.versions: + # --all + for bwc_index in glob.glob(os.path.join(cfg.output_dir, 'index-*.zip')): + version = os.path.basename(bwc_index)[len('index-'):-len('.zip')] + cfg.versions.append(version) return cfg +def create_bwc_index(cfg, version): + logging.info('--> Creating bwc index for %s' % version) + release_dir = os.path.join(cfg.releases_dir, 
'elasticsearch-%s' % version) + if not os.path.exists(release_dir): + raise RuntimeError('ES version %s does not exist in %s' % (version, cfg.releases_dir)) + snapshot_supported = not (version.startswith('0.') or version == '1.0.0.Beta1') + tmp_dir = tempfile.mkdtemp() + data_dir = os.path.join(tmp_dir, 'data') + repo_dir = os.path.join(tmp_dir, 'repo') + logging.info('Temp data dir: %s' % data_dir) + logging.info('Temp repo dir: %s' % repo_dir) + + try: + node = start_node(version, release_dir, data_dir, cfg.tcp_port, cfg.http_port) + client = create_client(cfg.http_port) + index_name = 'index-%s' % version.lower() + generate_index(client, version, index_name) + if snapshot_supported: + snapshot_index(client, cfg, version, repo_dir) + + # 10067: get a delete-by-query into the translog on upgrade. We must do + # this after the snapshot, because it calls flush. Otherwise the index + # will already have the deletions applied on upgrade. + delete_by_query(client, version, index_name, 'doc') + + finally: + if 'node' in vars(): + logging.info('Shutting down node with pid %d', node.pid) + node.terminate() + time.sleep(1) # some nodes take time to terminate + compress_index(version, tmp_dir, cfg.output_dir) + if snapshot_supported: + compress_repo(version, tmp_dir, cfg.output_dir) + def main(): logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %I:%M:%S %p') logging.getLogger('elasticsearch').setLevel(logging.ERROR) logging.getLogger('urllib3').setLevel(logging.WARN) cfg = parse_config() - try: - node = start_node(cfg.version, cfg.release_dir, cfg.data_dir, cfg.tcp_port, cfg.http_port) - client = create_client(cfg.http_port) - generate_index(client, cfg.version) - if cfg.snapshot_supported: - snapshot_index(client, cfg) - - # 10067: get a delete-by-query into the translog on upgrade. We must do - # this after the snapshot, because it calls flush. 
Otherwise the index - # will already have the deletions applied on upgrade. - delete_by_query(client, cfg.version, 'test', 'doc') - - finally: - if 'node' in vars(): - logging.info('Shutting down node with pid %d', node.pid) - node.terminate() - time.sleep(1) # some nodes take time to terminate - compress_index(cfg.version, cfg.tmp_dir, cfg.output_dir) - if cfg.snapshot_supported: - compress_repo(cfg.version, cfg.tmp_dir, cfg.output_dir) + for version in cfg.versions: + create_bwc_index(cfg, version) if __name__ == '__main__': try: