allow reads of native users and roles when template version hasn't been updated

This change allows reads of our native users and roles when the template version has not been updated to
match the current version. This is useful for rolling upgrades where the nodes are also being actively
queried and/or indexed into. Without this, we can wreak havoc on a cluster by causing exceptions during
replication, which leads to shard failures. On nodes that match the version defined in the template,
write operations are allowed since we know that we are backwards compatible in terms of format but we
may have added new fields and shouldn't index them until the mappings and template have been updated.

As part of this, the rolling upgrade tests from core were used as the basis for a very basic set of tests
for doing a rolling upgrade with x-pack.

Closes elastic/elasticsearch#4126

Original commit: elastic/x-pack-elasticsearch@9be518ef00
This commit is contained in:
Jay Modi 2016-11-22 12:00:09 -05:00 committed by GitHub
parent a9f3619b5a
commit 4239ba5415
10 changed files with 360 additions and 33 deletions

View File

@ -36,6 +36,7 @@ import org.elasticsearch.xpack.template.TemplateUtils;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
/**
@ -48,6 +49,7 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
public static final String SECURITY_TEMPLATE_NAME = "security-index-template";
private static final String SECURITY_VERSION_STRING = "security-version";
static final String SECURITY_INDEX_TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}");
static final Version MIN_READ_VERSION = Version.V_5_0_0;
private final InternalClient client;
final AtomicBoolean templateCreationPending = new AtomicBoolean(false);
@ -178,6 +180,10 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
}
static boolean securityIndexMappingUpToDate(ClusterState clusterState, Logger logger) {
return securityIndexMappingVersionMatches(clusterState, logger, Version.CURRENT::equals);
}
static boolean securityIndexMappingVersionMatches(ClusterState clusterState, Logger logger, Predicate<Version> predicate) {
IndexMetaData indexMetaData = clusterState.metaData().getIndices().get(SECURITY_INDEX_NAME);
if (indexMetaData != null) {
for (Object object : indexMetaData.getMappings().values().toArray()) {
@ -186,7 +192,7 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
continue;
}
try {
if (containsCorrectVersion(mappingMetaData.sourceAsMap()) == false) {
if (containsCorrectVersion(mappingMetaData.sourceAsMap(), predicate) == false) {
return false;
}
} catch (IOException e) {
@ -202,6 +208,10 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
}
static boolean securityTemplateExistsAndIsUpToDate(ClusterState state, Logger logger) {
return securityTemplateExistsAndVersionMatches(state, logger, Version.CURRENT::equals);
}
static boolean securityTemplateExistsAndVersionMatches(ClusterState state, Logger logger, Predicate<Version> predicate) {
IndexTemplateMetaData templateMeta = state.metaData().templates().get(SECURITY_TEMPLATE_NAME);
if (templateMeta == null) {
return false;
@ -220,7 +230,7 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
// get the actual mapping entries
@SuppressWarnings("unchecked")
Map<String, Object> mappingMap = (Map<String, Object>) typeMappingMap.get(key);
if (containsCorrectVersion(mappingMap) == false) {
if (containsCorrectVersion(mappingMap, predicate) == false) {
return false;
}
} catch (IOException e) {
@ -231,22 +241,18 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
return true;
}
private static boolean containsCorrectVersion(Map<String, Object> typeMappingMap) {
private static boolean containsCorrectVersion(Map<String, Object> typeMappingMap, Predicate<Version> predicate) {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) typeMappingMap.get("_meta");
if (meta == null) {
// pre 5.0, cannot be up to date
return false;
}
if (Version.CURRENT.toString().equals(meta.get(SECURITY_VERSION_STRING)) == false) {
// wrong version
return false;
}
return true;
return predicate.test(Version.fromString((String) meta.get(SECURITY_VERSION_STRING)));
}
public static boolean securityIndexMappingAndTemplateUpToDate(ClusterState clusterState, Logger logger) {
if (SecurityTemplateService.securityTemplateExistsAndIsUpToDate(clusterState, logger) == false) {
if (securityTemplateExistsAndIsUpToDate(clusterState, logger) == false) {
logger.debug("security template [{}] does not exist or is not up to date, so service cannot start",
SecurityTemplateService.SECURITY_TEMPLATE_NAME);
return false;
@ -257,4 +263,17 @@ public class SecurityTemplateService extends AbstractComponent implements Cluste
}
return true;
}
public static boolean securityIndexMappingAndTemplateSufficientToRead(ClusterState clusterState, Logger logger) {
if (securityTemplateExistsAndVersionMatches(clusterState, logger, MIN_READ_VERSION::onOrBefore) == false) {
logger.debug("security template [{}] does not exist or is not up to date, so service cannot start",
SecurityTemplateService.SECURITY_TEMPLATE_NAME);
return false;
}
if (securityIndexMappingVersionMatches(clusterState, logger, MIN_READ_VERSION::onOrBefore) == false) {
logger.debug("mapping for security index not up to date, so service cannot start");
return false;
}
return true;
}
}

View File

@ -67,6 +67,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateSufficientToRead;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateUpToDate;
/**
@ -96,6 +97,7 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
private final boolean isTribeNode;
private volatile boolean securityIndexExists = false;
private volatile boolean canWrite = false;
public NativeUsersStore(Settings settings, InternalClient client) {
super(settings);
@ -239,9 +241,16 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
public void changePassword(final ChangePasswordRequest request, final ActionListener<Void> listener) {
final String username = request.username();
assert SystemUser.NAME.equals(username) == false && XPackUser.NAME.equals(username) == false : username + "is internal!";
if (isTribeNode) {
if (state() != State.STARTED) {
listener.onFailure(new IllegalStateException("password cannot be changed as user service has not been started"));
return;
} else if (isTribeNode) {
listener.onFailure(new UnsupportedOperationException("users may not be created or modified using a tribe node"));
return;
} else if (canWrite == false) {
listener.onFailure(new IllegalStateException("password cannot be changed as user service cannot write until template and " +
"mappings are up to date"));
return;
}
final String docType;
@ -314,6 +323,10 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
} else if (isTribeNode) {
listener.onFailure(new UnsupportedOperationException("users may not be created or modified using a tribe node"));
return;
} else if (canWrite == false) {
listener.onFailure(new IllegalStateException("user cannot be created or changed as the user service cannot write until " +
"template and mappings are up to date"));
return;
}
try {
@ -403,6 +416,10 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
} else if (isTribeNode) {
listener.onFailure(new UnsupportedOperationException("users may not be created or modified using a tribe node"));
return;
} else if (canWrite == false) {
listener.onFailure(new IllegalStateException("enabled status cannot be changed as user service cannot write until template " +
"and mappings are up to date"));
return;
}
if (ReservedRealm.isReserved(username, settings)) {
@ -477,6 +494,10 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
} else if (isTribeNode) {
listener.onFailure(new UnsupportedOperationException("users may not be deleted using a tribe node"));
return;
} else if (canWrite == false) {
listener.onFailure(new IllegalStateException("user cannot be deleted as user service cannot write until template and " +
"mappings are up to date"));
return;
}
try {
@ -519,7 +540,12 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
return true;
}
if (securityIndexMappingAndTemplateUpToDate(clusterState, logger) == false) {
if (securityIndexMappingAndTemplateUpToDate(clusterState, logger)) {
canWrite = true;
} else if (securityIndexMappingAndTemplateSufficientToRead(clusterState, logger)) {
canWrite = false;
} else {
canWrite = false;
return false;
}
@ -721,16 +747,8 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
@Override
public void clusterChanged(ClusterChangedEvent event) {
final boolean exists = event.state().metaData().indices().get(SecurityTemplateService.SECURITY_INDEX_NAME) != null;
// make sure all the primaries are active
if (exists && event.state().routingTable().index(SecurityTemplateService.SECURITY_INDEX_NAME).allPrimaryShardsActive()) {
logger.debug("security index [{}] all primary shards started, so polling can start",
SecurityTemplateService.SECURITY_INDEX_NAME);
securityIndexExists = true;
} else {
// always set the value - it may have changed...
securityIndexExists = false;
}
securityIndexExists = event.state().metaData().indices().get(SecurityTemplateService.SECURITY_INDEX_NAME) != null;
canWrite = securityIndexMappingAndTemplateUpToDate(event.state(), logger);
}
public State state() {
@ -744,6 +762,7 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
throw new IllegalStateException("can only reset if stopped!!!");
}
this.securityIndexExists = false;
this.canWrite = false;
this.state.set(State.INITIALIZED);
}

View File

@ -70,6 +70,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
import static org.elasticsearch.xpack.security.Security.setting;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateSufficientToRead;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingAndTemplateUpToDate;
/**
@ -120,6 +121,7 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
private final AtomicLong numInvalidation = new AtomicLong(0);
private volatile boolean securityIndexExists = false;
private volatile boolean canWrite = false;
public NativeRolesStore(Settings settings, InternalClient client) {
super(settings);
@ -148,7 +150,12 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
return true;
}
if (securityIndexMappingAndTemplateUpToDate(clusterState, logger) == false) {
if (securityIndexMappingAndTemplateUpToDate(clusterState, logger)) {
canWrite = true;
} else if (securityIndexMappingAndTemplateSufficientToRead(clusterState, logger)) {
canWrite = false;
} else {
canWrite = false;
return false;
}
@ -229,6 +236,10 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
} else if (isTribeNode) {
listener.onFailure(new UnsupportedOperationException("roles may not be deleted using a tribe node"));
return;
} else if (canWrite == false) {
listener.onFailure(new IllegalStateException("role cannot be deleted as service cannot write until template and " +
"mappings are up to date"));
return;
}
try {
@ -263,6 +274,10 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
} else if (isTribeNode) {
listener.onFailure(new UnsupportedOperationException("roles may not be created or modified using a tribe node"));
return;
} else if (canWrite == false) {
listener.onFailure(new IllegalStateException("role cannot be created or modified as service cannot write until template and " +
"mappings are up to date"));
return;
}
try {
@ -474,6 +489,7 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
}
invalidateAll();
this.securityIndexExists = false;
this.canWrite = false;
this.state.set(State.INITIALIZED);
}
@ -514,16 +530,8 @@ public class NativeRolesStore extends AbstractComponent implements ClusterStateL
// TODO abstract this code rather than duplicating...
@Override
public void clusterChanged(ClusterChangedEvent event) {
final boolean exists = event.state().metaData().indices().get(SecurityTemplateService.SECURITY_INDEX_NAME) != null;
// make sure all the primaries are active
if (exists && event.state().routingTable().index(SecurityTemplateService.SECURITY_INDEX_NAME).allPrimaryShardsActive()) {
logger.debug(
"security index [{}] all primary shards started, so polling can start", SecurityTemplateService.SECURITY_INDEX_NAME);
securityIndexExists = true;
} else {
// always set the value - it may have changed...
securityIndexExists = false;
}
securityIndexExists = event.state().metaData().indices().get(SecurityTemplateService.SECURITY_INDEX_NAME) != null;
canWrite = securityIndexMappingAndTemplateUpToDate(event.state(), logger);
}
public State state() {

View File

@ -43,6 +43,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.xpack.security.SecurityTemplateService.SECURITY_INDEX_NAME;
import static org.elasticsearch.xpack.security.SecurityTemplateService.SECURITY_INDEX_TEMPLATE_VERSION_PATTERN;
import static org.elasticsearch.xpack.security.SecurityTemplateService.SECURITY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityIndexMappingVersionMatches;
import static org.elasticsearch.xpack.security.SecurityTemplateService.securityTemplateExistsAndVersionMatches;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -109,6 +111,13 @@ public class SecurityTemplateServiceTests extends ESTestCase {
checkTemplateUpdateWorkCorrectly(clusterStateBuilder);
}
public void testIndexTemplateVersionMatching() throws Exception {
String templateString = "/" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithTemplate(templateString);
assertTrue(securityTemplateExistsAndVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::before));
assertFalse(securityTemplateExistsAndVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::after));
}
private void checkTemplateUpdateWorkCorrectly(ClusterState.Builder clusterStateBuilder) throws IOException {
securityTemplateService.clusterChanged(new ClusterChangedEvent("test-event", clusterStateBuilder.build()
, EMPTY_CLUSTER_STATE));
@ -234,6 +243,13 @@ public class SecurityTemplateServiceTests extends ESTestCase {
assertThat(listeners.size(), equalTo(0));
}
public void testMappingVersionMatching() throws IOException {
String templateString = "/" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithMapping(templateString);
assertTrue(securityIndexMappingVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::before));
assertFalse(securityIndexMappingVersionMatches(clusterStateBuilder.build(), logger, Version.V_5_0_0::after));
}
public void testMissingVersionMappingIsIdentifiedAsNotUpToDate() throws IOException {
String templateString = "/missing-version-" + SECURITY_TEMPLATE_NAME + ".json";
ClusterState.Builder clusterStateBuilder = createClusterStateWithMapping(templateString);

View File

@ -33,7 +33,7 @@
"mappings" : {
"user" : {
"_meta": {
"security-version": "4.0.0-alpha5"
"security-version": "4.0.0"
},
"dynamic" : "strict",
"properties" : {

View File

@ -0,0 +1,112 @@
import org.elasticsearch.gradle.test.NodeInfo
import org.elasticsearch.gradle.test.RestIntegTestTask
import java.nio.charset.StandardCharsets
apply plugin: 'elasticsearch.standalone-test'
Closure waitWithAuth = { NodeInfo node, AntBuilder ant ->
File tmpFile = new File(node.cwd, 'wait.success')
// wait up to twenty seconds
final long stopTime = System.currentTimeMillis() + 20000L;
Exception lastException = null;
while (System.currentTimeMillis() < stopTime) {
lastException = null;
// we use custom wait logic here as the elastic user is not available immediately and ant.get will fail when a 401 is returned
HttpURLConnection httpURLConnection = null;
try {
// TODO this sucks having to hardcode number of nodes, but node.config.numNodes isn't necessarily accurate for rolling
httpURLConnection = (HttpURLConnection) new URL("http://${node.httpUri()}/_cluster/health?wait_for_nodes=2").openConnection();
httpURLConnection.setRequestProperty("Authorization", "Basic " +
Base64.getEncoder().encodeToString("elastic:changeme".getBytes(StandardCharsets.UTF_8)));
httpURLConnection.setRequestMethod("GET");
httpURLConnection.setConnectTimeout(1000);
httpURLConnection.setReadTimeout(30000); // read needs to wait for nodes!
httpURLConnection.connect();
if (httpURLConnection.getResponseCode() == 200) {
tmpFile.withWriter StandardCharsets.UTF_8.name(), {
it.write(httpURLConnection.getInputStream().getText(StandardCharsets.UTF_8.name()))
}
break;
}
} catch (Exception e) {
logger.debug("failed to call cluster health", e)
lastException = e
} finally {
if (httpURLConnection != null) {
httpURLConnection.disconnect();
}
}
// did not start, so wait a bit before trying again
Thread.sleep(500L);
}
if (tmpFile.exists() == false && lastException != null) {
logger.error("final attempt of calling cluster health failed", lastException)
}
return tmpFile.exists()
}
task oldClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
cluster {
plugin ':x-plugins:elasticsearch'
distribution = 'zip'
bwcVersion = '6.0.0-alpha1-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop
numBwcNodes = 1
numNodes = 2
clusterName = 'rolling-upgrade'
waitCondition = waitWithAuth
systemProperty 'es.logger.org.elasticsearch.xpack.security', 'TRACE'
}
systemProperty 'tests.rest.suite', 'old_cluster'
}
task mixedClusterTest(type: RestIntegTestTask) {
dependsOn(oldClusterTest, 'oldClusterTest#node1.stop')
cluster {
plugin ':x-plugins:elasticsearch'
distribution = 'zip'
clusterName = 'rolling-upgrade'
unicastTransportUri = { seedNode, node, ant -> oldClusterTest.nodes.get(0).transportUri() }
dataDir = "${-> oldClusterTest.nodes[1].dataDir}"
waitCondition = waitWithAuth
}
systemProperty 'tests.rest.suite', 'mixed_cluster'
finalizedBy 'oldClusterTest#node0.stop'
}
task upgradedClusterTest(type: RestIntegTestTask) {
dependsOn(mixedClusterTest, 'oldClusterTest#node0.stop')
cluster {
plugin ':x-plugins:elasticsearch'
distribution = 'zip'
clusterName = 'rolling-upgrade'
unicastTransportUri = { seedNode, node, ant -> mixedClusterTest.nodes.get(0).transportUri() }
dataDir = "${-> oldClusterTest.nodes[0].dataDir}"
waitCondition = waitWithAuth
}
systemProperty 'tests.rest.suite', 'upgraded_cluster'
// only need to kill the mixed cluster tests node here because we explicitly told it to not stop nodes upon completion
finalizedBy 'mixedClusterTest#stop'
}
task integTest {
dependsOn = [upgradedClusterTest]
}
test.enabled = false // no unit tests for rolling upgrades, only the rest integration test
check.dependsOn(integTest)
repositories {
maven {
url "https://oss.sonatype.org/content/repositories/snapshots/"
}
maven {
url "https://artifacts.elastic.co/maven"
}
maven {
url "https://snapshots.elastic.co/maven"
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.upgrades;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.elasticsearch.test.rest.yaml.parser.ClientYamlTestParseException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) // to account for slow as hell VMs
public class UpgradeClusterClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
public UpgradeClusterClientYamlTestSuiteIT(ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws IOException, ClientYamlTestParseException {
return createParameters();
}
@Override
protected Settings restClientSettings() {
String token = "Basic " + Base64.getEncoder().encodeToString("elastic:changeme".getBytes(StandardCharsets.UTF_8));
return Settings.builder()
.put(ThreadContext.PREFIX + ".Authorization", token)
.build();
}
}

View File

@ -0,0 +1,37 @@
---
"Index data and search on the mixed cluster":
- do:
cluster.health:
wait_for_status: green
wait_for_nodes: 2
- do:
search:
index: test_index
- match: { hits.total: 5 } # no new indexed data, so expect the original 5 documents from the old cluster
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v1_mixed", "f2": 5}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v2_mixed", "f2": 6}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v3_mixed", "f2": 7}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v4_mixed", "f2": 8}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v5_mixed", "f2": 9}'
- do:
indices.flush:
index: test_index
- do:
search:
index: test_index
- match: { hits.total: 10 } # 5 docs from old cluster, 5 docs from mixed cluster

View File

@ -0,0 +1,34 @@
---
"Index data and search on the old cluster":
- do:
indices.create:
index: test_index
body:
settings:
index:
number_of_replicas: 0
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v1_old", "f2": 0}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v2_old", "f2": 1}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v3_old", "f2": 2}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v4_old", "f2": 3}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v5_old", "f2": 4}'
- do:
indices.flush:
index: test_index
- do:
search:
index: test_index
- match: { hits.total: 5 }

View File

@ -0,0 +1,37 @@
---
"Index data and search on the upgraded cluster":
- do:
cluster.health:
wait_for_status: green
wait_for_nodes: 2
- do:
search:
index: test_index
- match: { hits.total: 10 } # no new indexed data, so expect the original 10 documents from the old and mixed clusters
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v1_upgraded", "f2": 10}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v2_upgraded", "f2": 11}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v3_upgraded", "f2": 12}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v4_upgraded", "f2": 13}'
- '{"index": {"_index": "test_index", "_type": "test_type"}}'
- '{"f1": "v5_upgraded", "f2": 14}'
- do:
indices.flush:
index: test_index
- do:
search:
index: test_index
- match: { hits.total: 15 } # 10 docs from previous clusters plus 5 new docs