Tests: Add --all flag to create-bwc script to regenerate all indexes
It is currently a pain to regenerate all the bwc indexes when making a change to the generation script. This makes it a single command.
commit e575c4ce53
parent eb52febf0c
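With the new flag, regenerating every bwc index becomes a single command instead of one run per version. A hypothetical invocation (the script filename here is an assumption; substitute the actual path of the create-bwc script in your checkout):

  python3 create_bwc_index.py 1.2.0   # build the index for one version, as before
  python3 create_bwc_index.py --all   # rebuild every index-*.zip in the output directory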
@@ -14,19 +14,18 @@
 # either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
 
-import random
-import os
-import tempfile
-import shutil
-import subprocess
-import time
 import argparse
+import glob
 import logging
+import os
+import random
+import subprocess
 import sys
-import re
+import tempfile
+import time
 
-if sys.version_info[0] > 2:
-  print('%s must use python 2.x (for the ES python client)' % sys.argv[0])
+if sys.version_info[0] < 3:
+  print('%s must use python 3.x (for the ES python client)' % sys.argv[0])
 
 from datetime import datetime
 try:
@@ -34,7 +33,7 @@ try:
   from elasticsearch.exceptions import ConnectionError
   from elasticsearch.exceptions import TransportError
 except ImportError as e:
-  print('Can\'t import elasticsearch please install `sudo pip install elasticsearch`')
+  print('Can\'t import elasticsearch please install `sudo pip3 install elasticsearch`')
   sys.exit(1)
 
 # sometimes returns True
@@ -80,7 +79,7 @@ def delete_by_query(es, version, index_name, doc_type):
                    {'gte': 10,
                     'lte': 20}}}}
 
-  if version.startswith('0.90.') or version in ('1.0.0.Beta1', '1.0.0.Beta2'):
+  if version.startswith('0.') or version in ('1.0.0.Beta1', '1.0.0.Beta2'):
     # TODO #10262: we can't write DBQ into the translog for these old versions until we fix this back-compat bug:
 
     # #4074: these versions don't expect to see the top-level 'query' to count/delete_by_query:
@@ -157,9 +156,8 @@ def create_client(http_port, timeout=30):
     time.sleep(1)
   assert False, 'Timed out waiting for node for %s seconds' % timeout
 
-def generate_index(client, version):
-  name = 'index-%s' % version.lower()
-  client.indices.delete(index=name, ignore=404)
+def generate_index(client, version, index_name):
+  client.indices.delete(index=index_name, ignore=404)
   num_shards = random.randint(1, 10)
   num_replicas = random.randint(0, 1)
   logging.info('Create single shard test index')
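The new generate_index signature moves index naming up to the caller; the delete-then-create idiom it keeps is worth seeing in isolation. A minimal sketch, assuming a local node on port 9200 and the pre-8.x elasticsearch-py client this script targets (ignore=404 swallows the "index missing" error so reruns are idempotent):

  from elasticsearch import Elasticsearch

  client = Elasticsearch([{'host': 'localhost', 'port': 9200}])
  index_name = 'index-1.2.0'  # illustrative name, matching the script's scheme
  client.indices.delete(index=index_name, ignore=404)  # no-op if the index is absent
  client.indices.create(index=index_name, body={
    'settings': {'number_of_shards': 1, 'number_of_replicas': 0}
  })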
@@ -229,7 +227,7 @@ def generate_index(client, version):
     }
   }
 
-  client.indices.create(index=name, body={
+  client.indices.create(index=index_name, body={
    'settings': {
      'number_of_shards': 1,
      'number_of_replicas': 0
@@ -244,19 +242,19 @@ def generate_index(client, version):
     # 1.1.0 is buggy and creates lots and lots of segments, so we create a
     # lighter index for it to keep bw tests reasonable
     # see https://github.com/elastic/elasticsearch/issues/5817
-    num_docs = num_docs / 10
-  index_documents(client, name, 'doc', num_docs)
+    num_docs = int(num_docs / 10)
+  index_documents(client, index_name, 'doc', num_docs)
   logging.info('Running basic asserts on the data added')
-  run_basic_asserts(client, name, 'doc', num_docs)
+  run_basic_asserts(client, index_name, 'doc', num_docs)
 
-def snapshot_index(client, cfg):
+def snapshot_index(client, cfg, version, repo_dir):
   # Add bogus persistent settings to make sure they can be restored
   client.cluster.put_settings(body={
     'persistent': {
-      'cluster.routing.allocation.exclude.version_attr': cfg.version
+      'cluster.routing.allocation.exclude.version_attr': version
     }
   })
-  client.indices.put_template(name='template_' + cfg.version.lower(), order=0, body={
+  client.indices.put_template(name='template_' + version.lower(), order=0, body={
     "template": "te*",
     "settings": {
       "number_of_shards" : 1
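The int() wrapper above is a Python 3 port fix rather than a behavior change: / is true division in Python 3 and returns a float even for two ints, which would break later integer uses of num_docs. A quick illustration:

  num_docs = 2547
  print(num_docs / 10)       # 254.7 -- true division under Python 3
  print(int(num_docs / 10))  # 254   -- the cast the script now applies
  print(num_docs // 10)      # 254   -- floor division, equivalent here for positive counts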
@@ -270,7 +268,7 @@ def snapshot_index(client, cfg):
       "alias1": {},
       "alias2": {
         "filter": {
-          "term": {"version" : cfg.version }
+          "term": {"version" : version }
         },
         "routing": "kimchy"
       },
@@ -280,7 +278,7 @@ def snapshot_index(client, cfg):
   client.snapshot.create_repository(repository='test_repo', body={
     'type': 'fs',
     'settings': {
-      'location': cfg.repo_dir
+      'location': repo_dir
     }
   })
   client.snapshot.create(repository='test_repo', snapshot='test_1', wait_for_completion=True)
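For reference, the repository registration above in isolation: a minimal sketch assuming the same pre-8.x client and a local node; repo_dir is an assumed temp directory, and note that newer Elasticsearch releases additionally require the location to be whitelisted via path.repo in the node settings (the 1.x-era nodes this script drives did not):

  import tempfile
  from elasticsearch import Elasticsearch

  client = Elasticsearch([{'host': 'localhost', 'port': 9200}])
  repo_dir = tempfile.mkdtemp()  # the script derives this from its own temp dir
  client.snapshot.create_repository(repository='test_repo', body={
    'type': 'fs',
    'settings': {'location': repo_dir}
  })
  client.snapshot.create(repository='test_repo', snapshot='test_1', wait_for_completion=True)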
@@ -306,8 +304,11 @@ def compress(tmp_dir, output_dir, zipfile, directory):
 
 def parse_config():
   parser = argparse.ArgumentParser(description='Builds an elasticsearch index for backwards compatibility tests')
-  parser.add_argument('version', metavar='X.Y.Z',
-                      help='The elasticsearch version to build an index for')
+  required = parser.add_mutually_exclusive_group(required=True)
+  required.add_argument('versions', metavar='X.Y.Z', nargs='*', default=[],
+                        help='The elasticsearch version to build an index for')
+  required.add_argument('--all', action='store_true', default=False,
+                        help='Recreate all existing backwards compatibility indexes')
   parser.add_argument('--releases-dir', '-d', default='backwards', metavar='DIR',
                       help='The directory containing elasticsearch releases')
   parser.add_argument('--output-dir', '-o', default='src/test/resources/org/elasticsearch/bwcompat',
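The argparse change is the subtle part of this hunk: a positional argument may live in a mutually exclusive group only when it is optional, which is what nargs='*' plus default=[] achieves, so the parser enforces "explicit versions or --all, but not both and not neither". A standalone sketch of the behavior (a hypothetical demo, not part of the commit):

  import argparse

  parser = argparse.ArgumentParser(description='versions-vs---all parsing demo')
  required = parser.add_mutually_exclusive_group(required=True)
  required.add_argument('versions', metavar='X.Y.Z', nargs='*', default=[],
                        help='explicit versions to build')
  required.add_argument('--all', action='store_true', default=False,
                        help='rebuild all existing indexes')

  print(parser.parse_args(['1.2.0', '1.3.0']))  # Namespace(all=False, versions=['1.2.0', '1.3.0'])
  print(parser.parse_args(['--all']))           # Namespace(all=True, versions=[])
  # parser.parse_args([]) exits: one of the arguments X.Y.Z --all is required
  # parser.parse_args(['--all', '1.2.0']) exits: X.Y.Z not allowed with argument --all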
@@ -318,48 +319,59 @@ def parse_config():
                       help='The port to use as the minimum port for HTTP communication')
   cfg = parser.parse_args()
 
-  cfg.release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % cfg.version)
-  if not os.path.exists(cfg.release_dir):
-    parser.error('ES version %s does not exist in %s' % (cfg.version, cfg.releases_dir))
-
   if not os.path.exists(cfg.output_dir):
     parser.error('Output directory does not exist: %s' % cfg.output_dir)
 
-  cfg.tmp_dir = tempfile.mkdtemp()
-  cfg.data_dir = os.path.join(cfg.tmp_dir, 'data')
-  cfg.repo_dir = os.path.join(cfg.tmp_dir, 'repo')
-  logging.info('Temp data dir: %s' % cfg.data_dir)
-  logging.info('Temp repo dir: %s' % cfg.repo_dir)
-  cfg.snapshot_supported = not (cfg.version.startswith('0.') or cfg.version == '1.0.0.Beta1')
+  if not cfg.versions:
+    # --all
+    for bwc_index in glob.glob(os.path.join(cfg.output_dir, 'index-*.zip')):
+      version = os.path.basename(bwc_index)[len('index-'):-len('.zip')]
+      cfg.versions.append(version)
 
   return cfg
 
+def create_bwc_index(cfg, version):
+  logging.info('--> Creating bwc index for %s' % version)
+  release_dir = os.path.join(cfg.releases_dir, 'elasticsearch-%s' % version)
+  if not os.path.exists(release_dir):
+    parser.error('ES version %s does not exist in %s' % (version, cfg.releases_dir))
+  snapshot_supported = not (version.startswith('0.') or version == '1.0.0.Beta1')
+  tmp_dir = tempfile.mkdtemp()
+  data_dir = os.path.join(tmp_dir, 'data')
+  repo_dir = os.path.join(tmp_dir, 'repo')
+  logging.info('Temp data dir: %s' % data_dir)
+  logging.info('Temp repo dir: %s' % repo_dir)
+
+  try:
+    node = start_node(version, release_dir, data_dir, cfg.tcp_port, cfg.http_port)
+    client = create_client(cfg.http_port)
+    index_name = 'index-%s' % version.lower()
+    generate_index(client, version, index_name)
+    if snapshot_supported:
+      snapshot_index(client, cfg, version, repo_dir)
+
+    # 10067: get a delete-by-query into the translog on upgrade. We must do
+    # this after the snapshot, because it calls flush. Otherwise the index
+    # will already have the deletions applied on upgrade.
+    delete_by_query(client, version, index_name, 'doc')
+
+  finally:
+    if 'node' in vars():
+      logging.info('Shutting down node with pid %d', node.pid)
+      node.terminate()
+      time.sleep(1) # some nodes take time to terminate
+  compress_index(version, tmp_dir, cfg.output_dir)
+  if snapshot_supported:
+    compress_repo(version, tmp_dir, cfg.output_dir)
+
 def main():
   logging.basicConfig(format='[%(levelname)s] [%(asctime)s] %(message)s', level=logging.INFO,
                       datefmt='%Y-%m-%d %I:%M:%S %p')
   logging.getLogger('elasticsearch').setLevel(logging.ERROR)
   logging.getLogger('urllib3').setLevel(logging.WARN)
   cfg = parse_config()
-  try:
-    node = start_node(cfg.version, cfg.release_dir, cfg.data_dir, cfg.tcp_port, cfg.http_port)
-    client = create_client(cfg.http_port)
-    generate_index(client, cfg.version)
-    if cfg.snapshot_supported:
-      snapshot_index(client, cfg)
-
-    # 10067: get a delete-by-query into the translog on upgrade. We must do
-    # this after the snapshot, because it calls flush. Otherwise the index
-    # will already have the deletions applied on upgrade.
-    delete_by_query(client, cfg.version, 'test', 'doc')
-
-  finally:
-    if 'node' in vars():
-      logging.info('Shutting down node with pid %d', node.pid)
-      node.terminate()
-      time.sleep(1) # some nodes take time to terminate
-  compress_index(cfg.version, cfg.tmp_dir, cfg.output_dir)
-  if cfg.snapshot_supported:
-    compress_repo(cfg.version, cfg.tmp_dir, cfg.output_dir)
+  for version in cfg.versions:
+    create_bwc_index(cfg, version)
 
 if __name__ == '__main__':
   try:
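When --all is given (so no explicit versions), parse_config recovers the version list from the archives already sitting in the output directory. The filename-to-version slicing is compact enough to miss; a hypothetical standalone rendering of the same logic:

  import glob
  import os

  output_dir = 'src/test/resources/org/elasticsearch/bwcompat'  # the script's default
  for bwc_index in glob.glob(os.path.join(output_dir, 'index-*.zip')):
    name = os.path.basename(bwc_index)          # e.g. 'index-1.2.0.zip'
    version = name[len('index-'):-len('.zip')]  # -> '1.2.0'
    print(version)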