Remove static indices and repos and the scripts that create them.

Two tests were still using the static indices:

* IndexFolderUpgraderTests#testUpgradeRealIndex()
* InternalEngineTests#testUpgradeOldIndex()

These tests were removed as well, because they functionally overlap with the full-cluster-restart qa tests.

Relates to #24939
Parent: 20b7258d41
Commit: 0e5460324c
IndexFolderUpgraderTests.java

@@ -19,9 +19,7 @@
 package org.elasticsearch.common.util;
 
-import org.apache.lucene.util.CollectionUtil;
 import org.apache.lucene.util.LuceneTestCase;
-import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
@@ -32,32 +30,23 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.gateway.MetaDataStateFormat;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardPath;
 import org.elasticsearch.index.shard.ShardStateMetaData;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.OldIndexUtils;
 
 import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 
 @LuceneTestCase.SuppressFileSystems("ExtrasFS")
 public class IndexFolderUpgraderTests extends ESTestCase {
@@ -181,68 +170,6 @@ public class IndexFolderUpgraderTests extends ESTestCase {
         }
     }
 
-    /**
-     * Run upgrade on a real bwc index
-     */
-    public void testUpgradeRealIndex() throws IOException, URISyntaxException {
-        List<Path> indexes = new ArrayList<>();
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(getBwcIndicesPath(), "index-*.zip")) {
-            for (Path path : stream) {
-                indexes.add(path);
-            }
-        }
-        CollectionUtil.introSort(indexes, (o1, o2) -> o1.getFileName().compareTo(o2.getFileName()));
-        final Path path = randomFrom(indexes);
-        final String indexName = path.getFileName().toString().replace(".zip", "").toLowerCase(Locale.ROOT);
-        try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
-            Path unzipDir = createTempDir();
-            Path unzipDataDir = unzipDir.resolve("data");
-            // decompress the index
-            try (InputStream stream = Files.newInputStream(path)) {
-                TestUtil.unzip(stream, unzipDir);
-            }
-            // check it is unique
-            assertTrue(Files.exists(unzipDataDir));
-            Path[] list = FileSystemUtils.files(unzipDataDir);
-            if (list.length != 1) {
-                throw new IllegalStateException("Backwards index must contain exactly one cluster but was " + list.length);
-            }
-            // the bwc scripts packs the indices under this path
-            Path src = OldIndexUtils.getIndexDir(logger, indexName, path.getFileName().toString(), list[0]);
-            assertTrue("[" + path + "] missing index dir: " + src.toString(), Files.exists(src));
-            final Path indicesPath = randomFrom(nodeEnvironment.nodePaths()).indicesPath;
-            logger.info("--> injecting index [{}] into [{}]", indexName, indicesPath);
-            OldIndexUtils.copyIndex(logger, src, src.getFileName().toString(), indicesPath);
-            IndexFolderUpgrader.upgradeIndicesIfNeeded(Settings.EMPTY, nodeEnvironment);
-
-            // ensure old index folder is deleted
-            Set<String> indexFolders = nodeEnvironment.availableIndexFolders();
-            assertEquals(indexFolders.size(), 1);
-
-            // ensure index metadata is moved
-            IndexMetaData indexMetaData = IndexMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
-                nodeEnvironment.resolveIndexFolder(indexFolders.iterator().next()));
-            assertNotNull(indexMetaData);
-            Index index = indexMetaData.getIndex();
-            assertEquals(index.getName(), indexName);
-
-            Set<ShardId> shardIds = nodeEnvironment.findAllShardIds(index);
-            // ensure all shards are moved
-            assertEquals(shardIds.size(), indexMetaData.getNumberOfShards());
-            for (ShardId shardId : shardIds) {
-                final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnvironment, shardId,
-                    new IndexSettings(indexMetaData, Settings.EMPTY));
-                final Path translog = shardPath.resolveTranslog();
-                final Path idx = shardPath.resolveIndex();
-                final Path state = shardPath.getShardStatePath().resolve(MetaDataStateFormat.STATE_DIR_NAME);
-                assertTrue(shardPath.exists());
-                assertTrue(Files.exists(translog));
-                assertTrue(Files.exists(idx));
-                assertTrue(Files.exists(state));
-            }
-        }
-    }
-
     public void testNeedsUpgrade() throws IOException {
         final Index index = new Index("foo", UUIDs.randomBase64UUID());
         IndexMetaData indexState = IndexMetaData.builder(index.getName())
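For readers without the deleted fixtures, the unzip-then-verify pattern the removed test relied on (expand a fixture .zip into a temp directory and require exactly one top-level cluster directory under "data") can be reproduced with the plain JDK. The following is a minimal, illustrative sketch, not Elasticsearch test code; the class name, helper name, and command-line argument are invented for the example.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class UnzipFixtureSketch {

    // Expand a zip archive into targetDir using only java.util.zip.
    static void unzip(Path zip, Path targetDir) throws IOException {
        try (ZipInputStream in = new ZipInputStream(Files.newInputStream(zip))) {
            for (ZipEntry entry; (entry = in.getNextEntry()) != null; ) {
                Path out = targetDir.resolve(entry.getName()).normalize();
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    Files.createDirectories(out.getParent());
                    Files.copy(in, out, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path fixture = Paths.get(args[0]); // e.g. some index-<version>.zip (hypothetical path)
        Path unzipDir = Files.createTempDirectory("bwc");
        unzip(fixture, unzipDir);

        // The fixture is expected to contain a single cluster directory under "data".
        Path dataDir = unzipDir.resolve("data");
        List<Path> clusters = new ArrayList<>();
        try (DirectoryStream<Path> children = Files.newDirectoryStream(dataDir)) {
            children.forEach(clusters::add);
        }
        if (clusters.size() != 1) {
            throw new IllegalStateException("expected exactly one cluster dir but found " + clusters);
        }
        System.out.println("cluster dir: " + clusters.get(0));
    }
}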
InternalEngineTests.java

@@ -67,7 +67,6 @@ import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.IOUtils;
-import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
@@ -83,7 +82,6 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.Versions;
@@ -120,7 +118,6 @@ import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
-import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -138,7 +135,6 @@ import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
-import org.elasticsearch.test.OldIndexUtils;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.MatcherAssert;
@@ -146,10 +142,8 @@ import org.junit.After;
 import org.junit.Before;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.nio.charset.Charset;
-import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -161,7 +155,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
@@ -2592,93 +2585,6 @@ public class InternalEngineTests extends ESTestCase {
         return new Mapping(Version.CURRENT, root, new MetadataFieldMapper[0], emptyMap());
     }
 
-    public void testUpgradeOldIndex() throws IOException {
-        List<Path> indexes = new ArrayList<>();
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(getBwcIndicesPath(), "index-*.zip")) {
-            for (Path path : stream) {
-                indexes.add(path);
-            }
-        }
-        Collections.shuffle(indexes, random());
-        for (Path indexFile : indexes.subList(0, scaledRandomIntBetween(1, indexes.size() / 2))) {
-            final String indexName = indexFile.getFileName().toString().replace(".zip", "").toLowerCase(Locale.ROOT);
-            Path unzipDir = createTempDir();
-            Path unzipDataDir = unzipDir.resolve("data");
-            // decompress the index
-            try (InputStream stream = Files.newInputStream(indexFile)) {
-                TestUtil.unzip(stream, unzipDir);
-            }
-            // check it is unique
-            assertTrue(Files.exists(unzipDataDir));
-            Path[] list = filterExtraFSFiles(FileSystemUtils.files(unzipDataDir));
-
-            if (list.length != 1) {
-                throw new IllegalStateException("Backwards index must contain exactly one cluster but was " + list.length
-                    + " " + Arrays.toString(list));
-            }
-
-            // the bwc scripts packs the indices under this path
-            Path src = OldIndexUtils.getIndexDir(logger, indexName, indexFile.toString(), list[0]);
-            Path translog = src.resolve("0").resolve("translog");
-            assertTrue("[" + indexFile + "] missing translog dir: " + translog.toString(), Files.exists(translog));
-            Path[] tlogFiles = filterExtraFSFiles(FileSystemUtils.files(translog));
-            assertEquals(Arrays.toString(tlogFiles), tlogFiles.length, 2); // ckp & tlog
-            Path tlogFile = tlogFiles[0].getFileName().toString().endsWith("tlog") ? tlogFiles[0] : tlogFiles[1];
-            final long size = Files.size(tlogFile);
-            logger.debug("upgrading index {} file: {} size: {}", indexName, tlogFiles[0].getFileName(), size);
-            Directory directory = newFSDirectory(src.resolve("0").resolve("index"));
-            final IndexMetaData indexMetaData = IndexMetaData.FORMAT.loadLatestState(logger, xContentRegistry(), src);
-            final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
-            final Store store = createStore(indexSettings, directory);
-            final int iters = randomIntBetween(0, 2);
-            int numDocs = -1;
-            for (int i = 0; i < iters; i++) { // make sure we can restart on an upgraded index
-                try (InternalEngine engine = createEngine(indexSettings, store, translog, newMergePolicy())) {
-                    try (Searcher searcher = engine.acquireSearcher("test")) {
-                        if (i > 0) {
-                            assertEquals(numDocs, searcher.reader().numDocs());
-                        }
-                        TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 1);
-                        numDocs = searcher.reader().numDocs();
-                        assertTrue(search.totalHits > 1);
-                    }
-                    CommitStats commitStats = engine.commitStats();
-                    Map<String, String> userData = commitStats.getUserData();
-                    assertTrue("user data doesn't contain uuid", userData.containsKey(Translog.TRANSLOG_UUID_KEY));
-                    assertTrue("user data doesn't contain generation key", userData.containsKey(Translog.TRANSLOG_GENERATION_KEY));
-                    assertFalse("user data contains legacy marker", userData.containsKey("translog_id"));
-                }
-            }
-
-            try (InternalEngine engine = createEngine(indexSettings, store, translog, newMergePolicy())) {
-                if (numDocs == -1) {
-                    try (Searcher searcher = engine.acquireSearcher("test")) {
-                        numDocs = searcher.reader().numDocs();
-                    }
-                }
-                final int numExtraDocs = randomIntBetween(1, 10);
-                for (int i = 0; i < numExtraDocs; i++) {
-                    ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
-                    Term uid;
-                    if (indexMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
-                        uid = new Term(IdFieldMapper.NAME, doc.id());
-                    } else {
-                        uid = new Term(UidFieldMapper.NAME, Uid.createUid(doc.type(), doc.id()));
-                    }
-                    Engine.Index firstIndexRequest = new Engine.Index(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
-                    Engine.IndexResult indexResult = engine.index(firstIndexRequest);
-                    assertThat(indexResult.getVersion(), equalTo(1L));
-                }
-                engine.refresh("test");
-                try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
-                    TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + numExtraDocs));
-                    assertThat(topDocs.totalHits, equalTo((long) numDocs + numExtraDocs));
-                }
-            }
-            IOUtils.close(store, directory);
-        }
-    }
-
     private Path[] filterExtraFSFiles(Path[] files) {
         List<Path> paths = new ArrayList<>();
         for (Path p : files) {
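The translog-related assertions in the removed test inspect the user data of the latest Lucene commit. Below is a minimal sketch of how that metadata can be read with plain Lucene, assuming a local index directory passed on the command line. The literal key strings mirror the constants referenced in the removed code (Translog.TRANSLOG_UUID_KEY, Translog.TRANSLOG_GENERATION_KEY, and the legacy "translog_id" marker) but should be treated as illustrative rather than authoritative.

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class CommitUserDataSketch {
    public static void main(String[] args) throws IOException {
        try (Directory dir = FSDirectory.open(Paths.get(args[0]))) {
            // Read the newest commit point and dump its user data map.
            SegmentInfos infos = SegmentInfos.readLatestCommit(dir);
            Map<String, String> userData = infos.getUserData();
            System.out.println("translog_uuid present:       " + userData.containsKey("translog_uuid"));
            System.out.println("translog_generation present: " + userData.containsKey("translog_generation"));
            System.out.println("legacy translog_id present:  " + userData.containsKey("translog_id"));
        }
    }
}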
36 binary files deleted (the static index and repository .zip fixtures); contents not shown.
@@ -1,585 +0,0 @@
Deleted: the main bwc index generation script (imported by the scripts below as create_bwc_index). In outline, it:

* started a node from an old Elasticsearch release (start_node), passing path, cluster, and port settings with the version-appropriate prefix (-Des. before 5.0.0-alpha1, -E afterwards), ES_JAVA_OPTS='-Dmapper.allow_dots_in_name=true', and -f for pre-1.0 releases that otherwise daemonize;
* created a single-shard, zero-replica test index (generate_index) whose mappings depended on the version's capabilities (warmers before 2.0.0-alpha1, dots in field names from 2.4.0, lenient booleans before 6.0.0-alpha1, legacy type-level analyzer settings per #8874, custom postings_format/doc_values_format, legacy vs. current norms mappings, a stored binary field) and whose settings deliberately omitted units on gc_deletes and merge.policy.max_merged_segment so that unit insertion could be verified on upgrade;
* indexed 2000-3000 random documents (reduced to a tenth for the segment-heavy 1.1.0 release, see https://github.com/elastic/elasticsearch/issues/5817) with occasional refresh/flush, added a '#<index>' alias for versions before 5.1.0, and ran basic count, get, and sort assertions (run_basic_asserts, assert_sort);
* snapshotted the index into a filesystem repository (snapshot_index), first adding bogus persistent cluster settings and an index template so their restoration could be tested, then deleting the repository registration;
* for 0.x and 1.x versions, issued a delete-by-query after the snapshot and re-indexed up to 100 documents so the translog was non-empty on upgrade;
* killed the node without a graceful shutdown (shutdown_node uses kill rather than terminate, to avoid flushing the translog) and zipped the data and repo directories into index-<version>.zip and repo-<version>.zip, by default under core/src/test/resources/indices/bwc (compress_index, compress_repo);
* exposed a CLI (parse_config) accepting explicit versions or --all, plus --releases-dir, --output-dir, --tcp-port, and --http-port, and a parse_version helper (with self-test asserts) for ordering release strings such as 5.0.0-alpha1 < 5.0.0-beta1 < 5.0.0.
@@ -1,93 +0,0 @@
Deleted: a companion script that built the index-conflicting-mappings-1.7.0 fixture. It downloaded Elasticsearch 1.7.0 if necessary (via get-bwc-version.py), started a node through create_bwc_index.start_node, created a single-shard index with two types ('x' and 'y') that mapped the same field 'foo' as string and date respectively (the conflicting-mappings back-compat case from #11857), indexed 2000-3000 documents, ran the basic assertions, and compressed the result into core/src/test/resources/org/elasticsearch/action/admin/indices/upgrade.
@@ -1,124 +0,0 @@
Deleted: a script that built static back-compat indices whose mappings used plugin-provided field types against Elasticsearch 2.0.0. For each case it installed the plugin, started a node, created a single-shard index with the plugin mapping (mapper-murmur3 with a murmur3 'hash' multi-field on a string 'foo', and mapper-size with _size enabled), indexed three small documents, verified the document count, compressed the result into plugins/<plugin>/src/test/resources/indices/bwc, and removed the plugin again.
@@ -1,115 +0,0 @@
Deleted: a script that built a static index containing mixed 0.20 (Lucene 3.x) and 0.90 (Lucene 4.x) segments. It generated and flushed an index with 0.20.6, recorded its segment names, restarted the same data directory with 0.90.6, indexed only ten extra documents (so the random refresh/flush did not merge the ancient segments away), flushed again, verified that at least one 0.20 segment survived and at least one new 0.90 segment was written, and compressed the result into core/src/test/resources/org/elasticsearch/action/admin/indices/upgrade.
@@ -1,76 +0,0 @@
Deleted: a script that produced an unsupported snapshot repository fixture. It generated and flushed a back-compat index with Elasticsearch 0.20.6, restarted the same data directory with 1.1.2, snapshotted the index into a filesystem repository via create_bwc_index.snapshot_index, and compressed the repository into src/test/resources/indices/bwc as unsupportedrepo-0.20.6.zip.