Ensure all static BWC indices have some translog entries (#20362)

Due to the way the nodes were shut down, we always flushed
away the translog. This means we never tested upgrades of transaction
logs from older versions. This change regenerates all valid BWC indices
and repositories with transaction logs and adds corresponding changes
to OldIndexBackwardsCompatibilityIT.java.
Simon Willnauer 2016-09-07 16:53:24 +02:00 committed by GitHub
parent 6a7309c09a
commit 8502d2761f
40 changed files with 50 additions and 8 deletions
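The new test logic asserts that, when one of these regenerated indices is recovered from its existing store on upgrade, the primary actually replays operations from the translog. Outside the test suite the same information is exposed by the indices recovery API; a rough sketch of that check with the elasticsearch-py client (host and index name are placeholders, field names assume a 5.x-era response):

# Sketch only, not part of this commit: observe translog replay via the recovery API.
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')           # placeholder host
recovery = es.indices.recovery(index='index-1.7.5',   # placeholder index name
                               detailed=True, active_only=False)

for shard in recovery['index-1.7.5']['shards']:
    # an upgraded primary recovering from its own data directory
    if shard['primary'] and shard['stage'] == 'DONE' and shard['type'] == 'EXISTING_STORE':
        # a non-zero count means documents were replayed from the translog during recovery
        print('shard %s recovered %s translog ops' % (shard['id'], shard['translog']['recovered']))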


@@ -24,6 +24,7 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.segments.IndexSegments;
 import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
 import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
@@ -33,6 +34,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -47,6 +49,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.index.mapper.StringFieldMapperPositionIncrementGapTests;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
@@ -249,15 +252,43 @@ public class OldIndexBackwardsCompatibilityIT extends ESIntegTestCase {
         Version actualVersionCreated = Version.indexCreated(getIndexResponse.getSettings().get(indexName));
         assertEquals(indexCreated, actualVersionCreated);
         ensureYellow(indexName);
-        IndicesSegmentResponse segmentsResponse = client().admin().indices().prepareSegments(indexName).get();
-        IndexSegments segments = segmentsResponse.getIndices().get(indexName);
-        for (IndexShardSegments indexShardSegments : segments) {
-            for (ShardSegments shardSegments : indexShardSegments) {
-                for (Segment segment : shardSegments) {
-                    assertEquals(indexCreated.luceneVersion, segment.version);
-                }
-            }
-        }
+        RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName)
+            .setDetailed(true).setActiveOnly(false).get();
+        boolean foundTranslog = false;
+        for (List<RecoveryState> states : recoveryResponse.shardRecoveryStates().values()) {
+            for (RecoveryState state : states) {
+                if (state.getStage() == RecoveryState.Stage.DONE
+                    && state.getPrimary()
+                    && state.getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE) {
+                    assertFalse("more than one primary recoverd?", foundTranslog);
+                    assertNotEquals(0, state.getTranslog().recoveredOperations());
+                    foundTranslog = true;
+                }
+            }
+        }
+        assertTrue("expected translog but nothing was recovered", foundTranslog);
+        IndicesSegmentResponse segmentsResponse = client().admin().indices().prepareSegments(indexName).get();
+        IndexSegments segments = segmentsResponse.getIndices().get(indexName);
+        int numCurrent = 0;
+        int numBWC = 0;
+        for (IndexShardSegments indexShardSegments : segments) {
+            for (ShardSegments shardSegments : indexShardSegments) {
+                for (Segment segment : shardSegments) {
+                    if (indexCreated.luceneVersion.equals(segment.version)) {
+                        numBWC++;
+                        if (Version.CURRENT.luceneVersion.equals(segment.version)) {
+                            numCurrent++;
+                        }
+                    } else if (Version.CURRENT.luceneVersion.equals(segment.version)) {
+                        numCurrent++;
+                    } else {
+                        fail("unexpected version " + segment.version);
+                    }
+                }
+            }
+        }
+        assertNotEquals("expected at least 1 current segment after translog recovery", 0, numCurrent);
+        assertNotEquals("expected at least 1 old segment", 0, numBWC);
         SearchResponse test = client().prepareSearch(indexName).get();
         assertThat(test.getHits().getTotalHits(), greaterThanOrEqualTo(1L));
     }
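The segment assertions rely on the fact that replaying the translog writes the recovered documents into new segments created with the current Lucene version, while the segments carried over from the old index keep the Lucene version they were originally written with, so both must be present. Per-segment Lucene versions can also be inspected through the segments API; a rough sketch with the elasticsearch-py client (host and index name are placeholders):

# Sketch only, not part of this commit: list the Lucene versions of an index's segments.
from elasticsearch import Elasticsearch

es = Elasticsearch('http://localhost:9200')            # placeholder host
response = es.indices.segments(index='index-1.7.5')    # placeholder index name

versions = set()
for index_data in response['indices'].values():
    for shard_copies in index_data['shards'].values():
        for shard in shard_copies:
            for segment in shard['segments'].values():
                versions.add(segment['version'])  # Lucene version the segment was written with

# after an upgrade plus translog replay this should contain both the old Lucene
# version (pre-existing segments) and the current one (segments from replayed docs)
print(versions)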


@@ -72,6 +72,15 @@ def index_documents(es, index_name, type, num_docs):
   logging.info('Flushing index')
   es.indices.flush(index=index_name)
 
+def reindex_docs(es, index_name, type, num_docs):
+  logging.info('Re-indexing %s docs' % num_docs)
+  # reindex some docs after the flush such that we have something in the translog
+  for id in range(0, num_docs):
+    es.index(index=index_name, doc_type=type, id=id, body={'string': str(random.randint(0, 100)),
+                                                           'long_sort': random.randint(0, 100),
+                                                           'double_sort' : float(random.randint(0, 100)),
+                                                           'bool' : random.choice([True, False])})
+
 def delete_by_query(es, version, index_name, doc_type):
   logging.info('Deleting long_sort:[10..20] docs')
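reindex_docs runs after the explicit flush and is not followed by another flush, so the re-indexed documents stay in the translog until the node is killed. A rough sketch of how that could be verified from this script before shutdown, assuming the translog metric of the indices stats API is available on the version being generated (the helper name is hypothetical):

# Sketch only, not part of this commit: assert the re-indexed docs are still uncommitted.
import logging

def assert_translog_not_empty(es, index_name):
    # request only the translog section of the index stats
    stats = es.indices.stats(index=index_name, metric='translog')
    ops = stats['indices'][index_name]['primaries']['translog']['operations']
    logging.info('translog operations for %s: %s' % (index_name, ops))
    assert ops > 0, 'expected uncommitted operations in the translog'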
@@ -329,6 +338,7 @@ def generate_index(client, version, index_name):
   index_documents(client, index_name, 'doc', num_docs)
   logging.info('Running basic asserts on the data added')
   run_basic_asserts(client, index_name, 'doc', num_docs)
+  return num_docs
 
 def snapshot_index(client, version, repo_dir):
   persistent = {
@@ -438,7 +448,7 @@ def create_bwc_index(cfg, version):
   node = start_node(version, release_dir, data_dir, repo_dir, cfg.tcp_port, cfg.http_port)
   client = create_client(cfg.http_port)
   index_name = 'index-%s' % version.lower()
-  generate_index(client, version, index_name)
+  num_docs = generate_index(client, version, index_name)
   if snapshot_supported:
     snapshot_index(client, version, repo_dir)
@@ -447,6 +457,7 @@ def create_bwc_index(cfg, version):
   # will already have the deletions applied on upgrade.
   if version.startswith('0.') or version.startswith('1.'):
     delete_by_query(client, version, index_name, 'doc')
+  reindex_docs(client, index_name, 'doc', min(100, num_docs))
   shutdown_node(node)
   node = None
@@ -464,7 +475,7 @@ def create_bwc_index(cfg, version):
 def shutdown_node(node):
   logging.info('Shutting down node with pid %d', node.pid)
-  node.terminate()
+  node.kill() # don't use terminate otherwise we flush the translog
   node.wait()
 
 def parse_version(version):
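The terminate()-to-kill() switch matters because Popen.terminate() sends SIGTERM, which lets the Elasticsearch process shut down cleanly and commit away its translog, whereas Popen.kill() sends SIGKILL so the uncommitted operations are still on disk when the index is packaged up. A minimal illustration of the difference (the launch command is a placeholder):

# Sketch only, not part of this commit.
import subprocess

node = subprocess.Popen(['bin/elasticsearch'])  # placeholder launch command

# ... index some documents without flushing ...

# node.terminate()  # SIGTERM: graceful shutdown, the translog gets flushed away
node.kill()          # SIGKILL: the JVM dies immediately, translog files survive for the next start
node.wait()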