[7.x] [ML][Transforms] fixing rolling upgrade continuous transform test (#45823) (#46347) (#46337)

* [ML][Transforms] fixing rolling upgrade continuous transform test (#45823)

* [ML][Transforms] fixing rolling upgrade continuous transform test

* adjusting wait assert logic

* adjusting wait conditions

* [ML][Transforms] allow executor to call start on started task (#46347)

* making sure we only upgrade from 7.4.0 in test
Benjamin Trent authored on 2019-09-05 10:31:11 -05:00; committed by GitHub
parent bac5cb1c7a
commit caf3e4d654
3 changed files with 100 additions and 31 deletions
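
To make the behavioural change easier to follow before reading the diffs, here is a minimal, hypothetical sketch of the start-overload pattern that the executor change (#46347) introduces. The enum, class, and Consumer-based listener below are simplified stand-ins, not the real DataFrameTransformTask or ActionListener API:

// Hypothetical stand-ins; not the real Elasticsearch classes.
import java.util.function.Consumer;

enum TaskState { STOPPED, STARTED, FAILED }

class TransformTaskSketch {
    private TaskState state = TaskState.STOPPED;

    /** Public entry point (e.g. the REST start action): starting an already-started task is a conflict. */
    public synchronized void start(Long startingCheckpoint, boolean force, Consumer<String> listener) {
        start(startingCheckpoint, force, true, listener);
    }

    /**
     * Internal entry point: the persistent-tasks executor passes failOnConflict=false so that
     * re-starting a task that is already STARTED (e.g. after node re-assignment) is not an error.
     */
    synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, Consumer<String> listener) {
        if (state == TaskState.FAILED && force == false) {
            listener.accept("failure: cannot start a FAILED task without force");
            return;
        }
        if (state == TaskState.STARTED && failOnConflict) {
            listener.accept("failure: task is already STARTED (conflict)");
            return;
        }
        state = TaskState.STARTED;
        listener.accept("started at checkpoint " + startingCheckpoint);
    }

    public static void main(String[] args) {
        TransformTaskSketch task = new TransformTaskSketch();
        task.start(0L, false, false, System.out::println);  // executor path: starts the task
        task.start(0L, false, false, System.out::println);  // executor path again: no conflict reported
        task.start(0L, false, System.out::println);         // user-facing path: reports the conflict
    }
}

The effect mirrored in the diffs below: the persistent-tasks executor can call start again when a task is reassigned to a node without tripping the CONFLICT path, while the user-facing start still reports the conflict.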


@@ -305,7 +305,8 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
                                    ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
         buildTask.initializeIndexer(indexerBuilder);
         // DataFrameTransformTask#start will fail if the task state is FAILED
-        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener);
+        // Will continue to attempt to start the indexer, even if the state is STARTED
+        buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, false, listener);
     }
 
     private void setNumFailureRetries(int numFailureRetries) {


@@ -219,13 +219,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         ));
     }
 
-    /**
-     * Start the background indexer and set the task's state to started
-     * @param startingCheckpoint Set the current checkpoint to this value. If null the
-     *                           current checkpoint is not set
-     * @param listener Started listener
-     */
-    public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
+    // Here `failOnConflict` is usually true, except when the initial start is called when the task is assigned to the node
+    synchronized void start(Long startingCheckpoint, boolean force, boolean failOnConflict, ActionListener<Response> listener) {
         logger.debug("[{}] start called with force [{}] and state [{}].", getTransformId(), force, getState());
         if (taskState.get() == DataFrameTransformTaskState.FAILED && force == false) {
             listener.onFailure(new ElasticsearchStatusException(
@@ -249,7 +244,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             return;
         }
         // If we are already in a `STARTED` state, we should not attempt to call `.start` on the indexer again.
-        if (taskState.get() == DataFrameTransformTaskState.STARTED) {
+        if (taskState.get() == DataFrameTransformTaskState.STARTED && failOnConflict) {
             listener.onFailure(new ElasticsearchStatusException(
                 "Cannot start transform [{}] as it is already STARTED.",
                 RestStatus.CONFLICT,
@@ -260,7 +255,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         final IndexerState newState = getIndexer().start();
         if (Arrays.stream(RUNNING_STATES).noneMatch(newState::equals)) {
             listener.onFailure(new ElasticsearchException("Cannot start task for data frame transform [{}], because state was [{}]",
-                    transform.getId(), newState));
+                transform.getId(), newState));
             return;
         }
         stateReason.set(null);
@@ -298,10 +293,20 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                 logger.error(new ParameterizedMessage("[{}] failed updating state to [{}].", getTransformId(), state), exc);
                 getIndexer().stop();
                 listener.onFailure(new ElasticsearchException("Error while updating state for data frame transform ["
-                        + transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
+                    + transform.getId() + "] to [" + state.getIndexerState() + "].", exc));
             }
         ));
     }
 
+    /**
+     * Start the background indexer and set the task's state to started
+     * @param startingCheckpoint Set the current checkpoint to this value. If null the
+     *                           current checkpoint is not set
+     * @param force Whether to force start a failed task or not
+     * @param listener Started listener
+     */
+    public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
+        start(startingCheckpoint, force, true, listener);
+    }
+
     public synchronized void stop(boolean force) {
         logger.debug("[{}] stop called with force [{}] and state [{}]", getTransformId(), force, getState());


@@ -7,10 +7,10 @@ package org.elasticsearch.upgrades;
 
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.StringEntity;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
+import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats;
@@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.xpack.test.rest.XPackRestTestConstants;
@@ -37,7 +38,9 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -48,7 +51,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.oneOf;
 
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/43662")
 public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
 
     private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
@@ -79,12 +81,19 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
      * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
      */
     public void testDataFramesRollingUpgrade() throws Exception {
-        assumeTrue("Continuous data frames not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0));
+        assumeTrue("Continuous data frames time sync not fixed until 7.4", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_4_0));
+        Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
+        adjustLoggingLevels.setJsonEntity(
+            "{\"transient\": {" +
+                "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
+                "\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
+        client().performRequest(adjustLoggingLevels);
         Request waitForYellow = new Request("GET", "/_cluster/health");
         waitForYellow.addParameter("wait_for_nodes", "3");
         waitForYellow.addParameter("wait_for_status", "yellow");
         switch (CLUSTER_TYPE) {
             case OLD:
+                client().performRequest(waitForYellow);
                 createAndStartContinuousDataFrame();
                 break;
             case MIXED:
@@ -113,15 +122,15 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
     private void createAndStartContinuousDataFrame() throws Exception {
         createIndex(CONTINUOUS_DATA_FRAME_SOURCE);
-        long totalDocsWritten = 0;
+        long totalDocsWrittenSum = 0;
         for (TimeValue bucket : BUCKETS) {
             int docs = randomIntBetween(1, 25);
             putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, bucket, ENTITIES);
-            totalDocsWritten += docs * ENTITIES.size();
+            totalDocsWrittenSum += docs * ENTITIES.size();
         }
+        long totalDocsWritten = totalDocsWrittenSum;
 
         DataFrameTransformConfig config = DataFrameTransformConfig.builder()
-            .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(30)))
+            .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
             .setPivotConfig(PivotConfig.builder()
                 .setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("stars").field("stars")))
                 .setGroups(GroupConfig.builder().groupBy("user_id", TermsGroupSource.builder().setField("user_id").build()).build())
@@ -129,19 +138,28 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
             .setDest(DestConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_ID + "_idx").build())
             .setSource(SourceConfig.builder().setIndex(CONTINUOUS_DATA_FRAME_SOURCE).build())
             .setId(CONTINUOUS_DATA_FRAME_ID)
+            .setFrequency(TimeValue.timeValueSeconds(1))
             .build();
         putTransform(CONTINUOUS_DATA_FRAME_ID, config);
 
         startTransform(CONTINUOUS_DATA_FRAME_ID);
         waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, 0L);
 
-        DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
+        assertBusy(() -> {
+            DataFrameTransformStats stateAndStats = getTransformStats(CONTINUOUS_DATA_FRAME_ID);
+            assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
+            assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
+            // Even if we get back to started, we may periodically get set back to `indexing` when triggered.
+            // Though short lived due to no changes on the source indices, it could result in flaky test behavior
+            assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
+        }, 120, TimeUnit.SECONDS);
 
-        assertThat(stateAndStats.getIndexerStats().getOutputDocuments(), equalTo((long)ENTITIES.size()));
-        assertThat(stateAndStats.getIndexerStats().getNumDocuments(), equalTo(totalDocsWritten));
-        assertThat(stateAndStats.getState(), oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
+        // We want to make sure our latest state is written before we turn the node off, this makes the testing more reliable
+        awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, IndexerState.STARTED.value());
     }
 
+    @SuppressWarnings("unchecked")
     private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception {
         // A continuous data frame should automatically become started when it gets assigned to a node
@@ -161,9 +179,9 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
         List<String> entities = new ArrayList<>(1);
         entities.add("user_" + ENTITIES.size() + expectedLastCheckpoint);
         int docs = 5;
-        // Index the data very recently in the past so that the transform sync delay can catch up to reading it in our spin
-        // wait later.
-        putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(1), entities);
+        // Index the data
+        // The frequency and delay should see the data once its indexed
+        putData(CONTINUOUS_DATA_FRAME_SOURCE, docs, TimeValue.timeValueSeconds(0), entities);
 
         waitUntilAfterCheckpoint(CONTINUOUS_DATA_FRAME_ID, expectedLastCheckpoint);
@@ -176,10 +194,55 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
         assertThat(stateAndStats.getState(),
             oneOf(DataFrameTransformStats.State.STARTED, DataFrameTransformStats.State.INDEXING));
         assertThat(stateAndStats.getIndexerStats().getOutputDocuments(),
             greaterThan(previousStateAndStats.getIndexerStats().getOutputDocuments()));
         assertThat(stateAndStats.getIndexerStats().getNumDocuments(),
             greaterThanOrEqualTo(docs + previousStateAndStats.getIndexerStats().getNumDocuments()));
+
+        awaitWrittenIndexerState(CONTINUOUS_DATA_FRAME_ID, (responseBody) -> {
+            Map<String, Object> indexerStats = (Map<String,Object>)((List<?>)XContentMapValues.extractValue("hits.hits._source.stats",
+                responseBody))
+                .get(0);
+            assertThat((Integer)indexerStats.get("documents_indexed"),
+                greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getOutputDocuments()).intValue()));
+            assertThat((Integer)indexerStats.get("documents_processed"),
+                greaterThan(Long.valueOf(previousStateAndStats.getIndexerStats().getNumDocuments()).intValue()));
+        });
     }
 
+    private void awaitWrittenIndexerState(String id, Consumer<Map<?, ?>> responseAssertion) throws Exception {
+        Request getStatsDocsRequest = new Request("GET", ".data-frame-internal-*/_search");
+        getStatsDocsRequest.setJsonEntity("{\n" +
+            "   \"query\": {\n" +
+            "       \"bool\": {\n" +
+            "           \"filter\": \n" +
+            "               {\"term\": {\n" +
+            "                   \"_id\": \"data_frame_transform_state_and_stats-" + id + "\"\n" +
+            "               }}\n" +
+            "       }\n" +
+            "   },\n" +
+            "   \"sort\": [\n" +
+            "       {\n" +
+            "           \"_index\": {\n" +
+            "               \"order\": \"desc\"\n" +
+            "           }\n" +
+            "       }\n" +
+            "   ],\n" +
+            "   \"size\": 1\n" +
+            "}");
+        assertBusy(() -> {
+            // Want to make sure we get the latest docs
+            client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh"));
+            Response response = client().performRequest(getStatsDocsRequest);
+            assertEquals(200, response.getStatusLine().getStatusCode());
+            Map<String, Object> responseBody = entityAsMap(response);
+            assertEquals(1, XContentMapValues.extractValue("hits.total.value", responseBody));
+            responseAssertion.accept(responseBody);
+        }, 60, TimeUnit.SECONDS);
+    }
+
+    private void awaitWrittenIndexerState(String id, String indexerState) throws Exception {
+        awaitWrittenIndexerState(id, (responseBody) -> {
+            String storedState = ((List<?>)XContentMapValues.extractValue("hits.hits._source.state.indexer_state", responseBody))
+                .get(0)
+                .toString();
+            assertThat(storedState, equalTo(indexerState));
+        });
+    }
+
     private void putTransform(String id, DataFrameTransformConfig config) throws IOException {
@@ -222,7 +285,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
     }
 
     private void waitUntilAfterCheckpoint(String id, long currentCheckpoint) throws Exception {
-        assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getNext().getCheckpoint(), greaterThan(currentCheckpoint)),
+        assertBusy(() -> assertThat(getTransformStats(id).getCheckpointingInfo().getLast().getCheckpoint(), greaterThan(currentCheckpoint)),
             60, TimeUnit.SECONDS);
     }
@@ -249,7 +312,7 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase {
         final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
         Request req = new Request("PUT", indexName);
         req.setEntity(entity);
-        client().performRequest(req);
+        assertThat(client().performRequest(req).getStatusLine().getStatusCode(), equalTo(200));
     }
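
The new awaitWrittenIndexerState helpers above lean on the test framework's assertBusy spin-wait to confirm that the latest indexer state/stats document has actually been persisted before a node is restarted. As a rough, hypothetical illustration of that retry pattern only (a self-contained helper, not the ESRestTestCase utilities the test uses), the loop amounts to: run a check that refreshes and searches the internal index, then keep retrying until the assertion passes or the timeout expires:

// Hypothetical helper illustrating the assertBusy-style retry; not the real test framework API.
import java.util.function.Supplier;

final class AwaitPersistedState {

    /** Retry a check (which throws AssertionError until it passes) up to a timeout. */
    static void pollUntil(Runnable check, long timeoutMillis, long intervalMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        AssertionError last;
        do {
            try {
                check.run();    // e.g. refresh the internal index, search for the state doc, assert on it
                return;         // the persisted document matches the expected state
            } catch (AssertionError e) {
                last = e;       // not written yet; wait and retry
                Thread.sleep(intervalMillis);
            }
        } while (System.currentTimeMillis() < deadline);
        throw last;             // surface the last failed assertion once the timeout expires
    }

    public static void main(String[] args) throws InterruptedException {
        Supplier<String> storedIndexerState = () -> "started";   // stand-in for the _search lookup
        pollUntil(() -> {
            if (!"started".equals(storedIndexerState.get())) {
                throw new AssertionError("indexer state has not been persisted as [started] yet");
            }
        }, 60_000, 500);
        System.out.println("persisted indexer state observed");
    }
}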
}