mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
rename internal indexes of transform plugin - rename audit index and create an alias for accessing it, BWC: add an alias for old indexes to keep them working, kibana UI will switch to use the read alias - rename config index and provide BWC to read from old and new ones
This commit is contained in:
parent
5dd6bd6f49
commit
3da91d5f7a
@ -22,16 +22,20 @@ public final class TransformInternalIndexConstants {
|
||||
*/
|
||||
|
||||
// internal index
|
||||
public static final String INDEX_VERSION = "2";
|
||||
public static final String INDEX_PATTERN = ".data-frame-internal-";
|
||||
|
||||
// version is not a rollover pattern, however padded because sort is string based
|
||||
public static final String INDEX_VERSION = "003";
|
||||
public static final String INDEX_PATTERN = ".transform-internal-";
|
||||
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
|
||||
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;
|
||||
public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*";
|
||||
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";
|
||||
|
||||
// audit index
|
||||
public static final String AUDIT_TEMPLATE_VERSION = "1";
|
||||
public static final String AUDIT_TEMPLATE_VERSION = "000001";
|
||||
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
|
||||
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
|
||||
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
|
||||
public static final String AUDIT_INDEX_PATTERN_DEPRECATED = ".data-frame-notifications-*";
|
||||
|
||||
public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read";
|
||||
|
@ -7,6 +7,8 @@
|
||||
package org.elasticsearch.xpack.transform.integration;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
||||
import org.junit.Before;
|
||||
|
||||
@ -48,7 +50,7 @@ public class TransformAuditorIT extends TransformRestTestCase {
|
||||
createReviewsIndex();
|
||||
indicesCreated = true;
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
|
||||
setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE));
|
||||
setupUser(TEST_USER_NAME, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -67,6 +69,7 @@ public class TransformAuditorIT extends TransformRestTestCase {
|
||||
request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simple_pivot_for_audit\"}}}");
|
||||
assertBusy(() -> {
|
||||
assertTrue(indexExists(TransformInternalIndexConstants.AUDIT_INDEX));
|
||||
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
|
||||
});
|
||||
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
|
||||
// finished the job (as this is a very short DF job), all without the audit being fully written.
|
||||
@ -85,4 +88,16 @@ public class TransformAuditorIT extends TransformRestTestCase {
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
public void testAliasCreatedforBWCIndexes() throws Exception {
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
|
||||
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
|
||||
|
||||
createIndex(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, settings.build());
|
||||
assertBusy(() -> {
|
||||
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED,
|
||||
TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ public class TransformInternalIndexIT extends ESRestTestCase {
|
||||
|
||||
|
||||
private static final String CURRENT_INDEX = TransformInternalIndexConstants.LATEST_INDEX_NAME;
|
||||
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "1";
|
||||
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001";
|
||||
|
||||
|
||||
public void testUpdateDeletesOldTransformConfig() throws Exception {
|
||||
|
@ -232,7 +232,8 @@ public class Transform extends Plugin implements ActionPlugin, PersistentTaskPlu
|
||||
transformConfigManager.get(),
|
||||
transformAuditor.get()));
|
||||
|
||||
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get());
|
||||
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get(),
|
||||
new TransformClusterStateListener(clusterService, client));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
|
||||
class TransformClusterStateListener implements ClusterStateListener {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransformClusterStateListener.class);
|
||||
|
||||
private final Client client;
|
||||
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
|
||||
|
||||
TransformClusterStateListener(ClusterService clusterService, Client client) {
|
||||
this.client = client;
|
||||
clusterService.addListener(this);
|
||||
logger.debug("Created TransformClusterStateListener");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
// Wait until the gateway has recovered from disk.
|
||||
return;
|
||||
}
|
||||
|
||||
// The atomic flag prevents multiple simultaneous attempts to run alias creation
|
||||
// if there is a flurry of cluster state updates in quick succession
|
||||
if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
|
||||
createAuditAliasForDataFrameBWC(event.state(), client, ActionListener.wrap(
|
||||
r -> {
|
||||
isIndexCreationInProgress.set(false);
|
||||
if (r) {
|
||||
logger.info("Created alias for deprecated data frame notifications index");
|
||||
} else {
|
||||
logger.debug("Skipped creating alias for deprecated data frame notifications index");
|
||||
}
|
||||
},
|
||||
e -> {
|
||||
isIndexCreationInProgress.set(false);
|
||||
logger.error("Error creating alias for deprecated data frame notifications index", e);
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
private static void createAuditAliasForDataFrameBWC(ClusterState state, Client client, final ActionListener<Boolean> finalListener) {
|
||||
|
||||
// check if old audit index exists, no need to create the alias if it does not
|
||||
if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED) == false) {
|
||||
finalListener.onResponse(false);
|
||||
return;
|
||||
}
|
||||
|
||||
if (state.getMetaData().getAliasAndIndexLookup().get(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED).getIndices().stream()
|
||||
.anyMatch(metaData -> metaData.getAliases().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))) {
|
||||
finalListener.onResponse(false);
|
||||
return;
|
||||
}
|
||||
|
||||
final IndicesAliasesRequest request = client.admin().indices().prepareAliases()
|
||||
.addAlias(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)
|
||||
.request();
|
||||
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
|
||||
ActionListener.<AcknowledgedResponse>wrap(r -> finalListener.onResponse(r.isAcknowledged()), finalListener::onFailure),
|
||||
client.admin().indices()::aliases);
|
||||
}
|
||||
|
||||
}
|
@ -196,7 +196,10 @@ public class TransformFeatureSet implements XPackFeatureSet {
|
||||
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(),
|
||||
TransformStoredDoc.NAME)));
|
||||
|
||||
SearchRequestBuilder requestBuilder = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
SearchRequestBuilder requestBuilder = client
|
||||
.prepareSearch(
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.setSize(0)
|
||||
.setQuery(queryBuilder);
|
||||
|
||||
|
@ -63,7 +63,8 @@ public class TransportGetTransformAction extends AbstractTransportGetResourcesAc
|
||||
|
||||
@Override
|
||||
protected String[] getIndices() {
|
||||
return new String[]{TransformInternalIndexConstants.INDEX_NAME_PATTERN};
|
||||
return new String[] { TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED };
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -73,8 +73,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
*
|
||||
* Versioned Index:
|
||||
*
|
||||
* We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".data-frame-internal-n" while
|
||||
* n is the _current_ version of the index.
|
||||
* We wrap several indexes under 1 pattern: ".transform-internal-001", ".transform-internal-002", ".transform-internal-n" while
|
||||
* n is the _current_ version of the index. For BWC we also search in ".data-frame-internal-1", ".data-frame-internal-2"
|
||||
*
|
||||
* - all gets/reads and dbq as well are searches on all indexes, while last-one-wins, so the result with the highest version is uses
|
||||
* - all puts and updates go into the _current_ version of the index, in case of updates this can leave dups behind
|
||||
@ -168,7 +168,9 @@ public class TransformConfigManager {
|
||||
* @param listener listener to alert on completion
|
||||
*/
|
||||
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
|
||||
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
|
||||
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))))
|
||||
@ -195,7 +197,8 @@ public class TransformConfigManager {
|
||||
* @param listener listener to alert on completion
|
||||
*/
|
||||
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
|
||||
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
|
||||
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))))
|
||||
@ -261,7 +264,9 @@ public class TransformConfigManager {
|
||||
*/
|
||||
public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<TransformCheckpoint> resultListener) {
|
||||
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformCheckpoint.documentId(transformId, checkpoint));
|
||||
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
SearchRequest searchRequest = client
|
||||
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.setQuery(queryBuilder)
|
||||
// use sort to get the last
|
||||
.addSort("_index", SortOrder.DESC)
|
||||
@ -283,14 +288,16 @@ public class TransformConfigManager {
|
||||
|
||||
/**
|
||||
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
|
||||
* data_frame/transforms, see the @link{TransportGetTransformAction}
|
||||
* _transform, see the @link{TransportGetTransformAction}
|
||||
*
|
||||
* @param transformId the transform id
|
||||
* @param resultListener listener to call after inner request has returned
|
||||
*/
|
||||
public void getTransformConfiguration(String transformId, ActionListener<TransformConfig> resultListener) {
|
||||
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId));
|
||||
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
SearchRequest searchRequest = client
|
||||
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.setQuery(queryBuilder)
|
||||
// use sort to get the last
|
||||
.addSort("_index", SortOrder.DESC)
|
||||
@ -312,7 +319,7 @@ public class TransformConfigManager {
|
||||
|
||||
/**
|
||||
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
|
||||
* data_frame/transforms, see the @link{TransportGetTransformAction}
|
||||
* _transform, see the @link{TransportGetTransformAction}
|
||||
*
|
||||
* @param transformId the transform id
|
||||
* @param configAndVersionListener listener to call after inner request has returned
|
||||
@ -321,7 +328,9 @@ public class TransformConfigManager {
|
||||
ActionListener<Tuple<TransformConfig,
|
||||
SeqNoPrimaryTermAndIndex>> configAndVersionListener) {
|
||||
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId));
|
||||
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
SearchRequest searchRequest = client
|
||||
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.setQuery(queryBuilder)
|
||||
// use sort to get the last
|
||||
.addSort("_index", SortOrder.DESC)
|
||||
@ -362,7 +371,9 @@ public class TransformConfigManager {
|
||||
String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression);
|
||||
QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, TransformConfig.NAME);
|
||||
|
||||
SearchRequest request = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
SearchRequest request = client
|
||||
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
|
||||
.setFrom(pageParams.getFrom())
|
||||
.setTrackTotalHits(true)
|
||||
@ -413,7 +424,7 @@ public class TransformConfigManager {
|
||||
DeleteByQueryRequest request = new DeleteByQueryRequest()
|
||||
.setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously
|
||||
|
||||
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN);
|
||||
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
|
||||
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
|
||||
request.setQuery(query);
|
||||
request.setRefresh(true);
|
||||
@ -472,7 +483,9 @@ public class TransformConfigManager {
|
||||
public void getTransformStoredDoc(String transformId,
|
||||
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener) {
|
||||
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId));
|
||||
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
SearchRequest searchRequest = client
|
||||
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.setQuery(queryBuilder)
|
||||
// use sort to get the last
|
||||
.addSort("_index", SortOrder.DESC)
|
||||
@ -508,7 +521,9 @@ public class TransformConfigManager {
|
||||
.filter(QueryBuilders.termsQuery(TransformField.ID.getPreferredName(), transformIds))
|
||||
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformStoredDoc.NAME)));
|
||||
|
||||
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
|
||||
SearchRequest searchRequest = client
|
||||
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
|
||||
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
|
||||
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
|
||||
.addSort("_index", SortOrder.DESC)
|
||||
.setQuery(builder)
|
||||
|
@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateReque
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
@ -94,6 +95,7 @@ public final class TransformInternalIndex {
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
|
||||
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
|
||||
.putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))
|
||||
.build();
|
||||
return transformTemplate;
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM
|
||||
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED;
|
||||
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_NOTIFICATIONS_INDEX_PREFIX;
|
||||
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED;
|
||||
import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_TASK_NAME;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
@ -64,8 +65,8 @@ import static org.hamcrest.Matchers.oneOf;
|
||||
public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
|
||||
private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));
|
||||
private static final String DATAFRAME_ENDPOINT = "/_transform/";
|
||||
private static final String DATAFRAME_ENDPOINT_DEPRECATED = "/_data_frame/transforms/";
|
||||
private static final String TRANSFORM_ENDPOINT = "/_transform/";
|
||||
private static final String TRANSFORM_ENDPOINT_DEPRECATED = "/_data_frame/transforms/";
|
||||
private static final String CONTINUOUS_TRANSFORM_ID = "continuous-transform-upgrade-job";
|
||||
private static final String CONTINUOUS_TRANSFORM_SOURCE = "transform-upgrade-continuous-source";
|
||||
private static final List<String> ENTITIES = Stream.iterate(1, n -> n + 1)
|
||||
@ -111,8 +112,8 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
});
|
||||
}
|
||||
|
||||
protected static void waitForPendingDataFrameTasks() throws Exception {
|
||||
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("data_frame/transforms") == false);
|
||||
protected static void waitForPendingTransformTasks() throws Exception {
|
||||
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(TRANSFORM_TASK_NAME) == false);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -127,8 +128,8 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
* The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results
|
||||
* index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade
|
||||
*/
|
||||
public void testDataFramesRollingUpgrade() throws Exception {
|
||||
assumeTrue("Continuous data frames time sync not fixed until 7.4", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_4_0));
|
||||
public void testTransformRollingUpgrade() throws Exception {
|
||||
assumeTrue("Continuous transform time sync not fixed until 7.4", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_4_0));
|
||||
Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings");
|
||||
adjustLoggingLevels.setJsonEntity(
|
||||
"{\"transient\": {" +
|
||||
@ -143,7 +144,7 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
switch (CLUSTER_TYPE) {
|
||||
case OLD:
|
||||
client().performRequest(waitForYellow);
|
||||
createAndStartContinuousDataFrame();
|
||||
createAndStartContinuousTransform();
|
||||
break;
|
||||
case MIXED:
|
||||
client().performRequest(waitForYellow);
|
||||
@ -151,11 +152,11 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
if (Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) {
|
||||
lastCheckpoint = 2;
|
||||
}
|
||||
verifyContinuousDataFrameHandlesData(lastCheckpoint);
|
||||
verifyContinuousTransformHandlesData(lastCheckpoint);
|
||||
break;
|
||||
case UPGRADED:
|
||||
client().performRequest(waitForYellow);
|
||||
verifyContinuousDataFrameHandlesData(3);
|
||||
verifyContinuousTransformHandlesData(3);
|
||||
cleanUpTransforms();
|
||||
break;
|
||||
default:
|
||||
@ -166,10 +167,10 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
private void cleanUpTransforms() throws Exception {
|
||||
stopTransform(CONTINUOUS_TRANSFORM_ID);
|
||||
deleteTransform(CONTINUOUS_TRANSFORM_ID);
|
||||
waitForPendingDataFrameTasks();
|
||||
waitForPendingTransformTasks();
|
||||
}
|
||||
|
||||
private void createAndStartContinuousDataFrame() throws Exception {
|
||||
private void createAndStartContinuousTransform() throws Exception {
|
||||
createIndex(CONTINUOUS_TRANSFORM_SOURCE);
|
||||
long totalDocsWrittenSum = 0;
|
||||
for (TimeValue bucket : BUCKETS) {
|
||||
@ -209,9 +210,9 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception {
|
||||
private void verifyContinuousTransformHandlesData(long expectedLastCheckpoint) throws Exception {
|
||||
|
||||
// A continuous data frame should automatically become started when it gets assigned to a node
|
||||
// A continuous transform should automatically become started when it gets assigned to a node
|
||||
// if it was assigned to the node that was removed from the cluster
|
||||
assertBusy(() -> {
|
||||
TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID);
|
||||
@ -255,7 +256,11 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
}
|
||||
|
||||
private void awaitWrittenIndexerState(String id, Consumer<Map<?, ?>> responseAssertion) throws Exception {
|
||||
Request getStatsDocsRequest = new Request("GET", ".data-frame-internal-*/_search");
|
||||
Request getStatsDocsRequest = new Request("GET",
|
||||
TRANSFORM_INTERNAL_INDEX_PREFIX + "*," +
|
||||
TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*" +
|
||||
"/_search");
|
||||
|
||||
getStatsDocsRequest.setJsonEntity("{\n" +
|
||||
" \"query\": {\n" +
|
||||
" \"bool\": {\n" +
|
||||
@ -276,7 +281,8 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
"}");
|
||||
assertBusy(() -> {
|
||||
// Want to make sure we get the latest docs
|
||||
client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh"));
|
||||
client().performRequest(new Request("POST", TRANSFORM_INTERNAL_INDEX_PREFIX + "*/_refresh"));
|
||||
client().performRequest(new Request("POST", TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*/_refresh"));
|
||||
Response response = client().performRequest(getStatsDocsRequest);
|
||||
assertEquals(200, response.getStatusLine().getStatusCode());
|
||||
Map<String, Object> responseBody = entityAsMap(response);
|
||||
@ -295,7 +301,7 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase {
|
||||
}
|
||||
|
||||
private String getTransformEndpoint() {
|
||||
return CLUSTER_TYPE == ClusterType.UPGRADED ? DATAFRAME_ENDPOINT : DATAFRAME_ENDPOINT_DEPRECATED;
|
||||
return CLUSTER_TYPE == ClusterType.UPGRADED ? TRANSFORM_ENDPOINT : TRANSFORM_ENDPOINT_DEPRECATED;
|
||||
}
|
||||
|
||||
private void putTransform(String id, TransformConfig config) throws IOException {
|
||||
|
@ -268,6 +268,6 @@ setup:
|
||||
|
||||
- do:
|
||||
indices.get_mapping:
|
||||
index: .data-frame-internal-2
|
||||
- match: { \.data-frame-internal-2.mappings.dynamic: "false" }
|
||||
- match: { \.data-frame-internal-2.mappings.properties.id.type: "keyword" }
|
||||
index: .transform-internal-003
|
||||
- match: { \.transform-internal-003.mappings.dynamic: "false" }
|
||||
- match: { \.transform-internal-003.mappings.properties.id.type: "keyword" }
|
||||
|
@ -39,6 +39,7 @@ public final class XPackRestTestConstants {
|
||||
CONFIG_INDEX));
|
||||
|
||||
// Transform constants:
|
||||
public static final String TRANSFORM_TASK_NAME = "data_frame/transforms";
|
||||
public static final String TRANSFORM_INTERNAL_INDEX_PREFIX = ".transform-internal-";
|
||||
public static final String TRANSFORM_NOTIFICATIONS_INDEX_PREFIX = ".transform-notifications-";
|
||||
public static final String TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED = ".data-frame-internal-";
|
||||
|
Loading…
x
Reference in New Issue
Block a user