[7.x][Transform] improve update API (#57685)

rewrite config on update if either version is outdated, credentials change,
the update changes the config or deprecated settings are found. Deprecated
settings get migrated to the new format. The upgrade can be easily extended to
do any necessary re-writes.

fixes #56499
backport #57648
This commit is contained in:
Hendrik Muhs 2020-06-05 08:48:47 +02:00 committed by GitHub
parent f4a3d969ad
commit c1c8817eae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 391 additions and 8 deletions

View File

@ -423,6 +423,41 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return lenient ? LENIENT_PARSER.apply(parser, optionalTransformId) : STRICT_PARSER.apply(parser, optionalTransformId);
}
/**
* Rewrites the transform config according to the latest format, for example moving deprecated
* settings to its new place.
*
* @param transformConfig original config
* @return a rewritten transform config if a rewrite was necessary, otherwise the given transformConfig
*/
public static TransformConfig rewriteForUpdate(final TransformConfig transformConfig) {
// quick checks for deprecated features, if none found just return the original
if (transformConfig.getPivotConfig() == null || transformConfig.getPivotConfig().getMaxPageSearchSize() == null) {
return transformConfig;
}
Builder builder = new Builder(transformConfig);
if (transformConfig.getPivotConfig() != null && transformConfig.getPivotConfig().getMaxPageSearchSize() != null) {
// create a new pivot config but set maxPageSearchSize to null
PivotConfig newPivotConfig = new PivotConfig(
transformConfig.getPivotConfig().getGroupConfig(),
transformConfig.getPivotConfig().getAggregationConfig(),
null
);
builder.setPivotConfig(newPivotConfig);
Integer maxPageSearchSizeDeprecated = transformConfig.getPivotConfig().getMaxPageSearchSize();
Integer maxPageSearchSize = transformConfig.getSettings().getMaxPageSearchSize() != null
? transformConfig.getSettings().getMaxPageSearchSize()
: maxPageSearchSizeDeprecated;
builder.setSettings(new SettingsConfig(maxPageSearchSize, transformConfig.getSettings().getDocsPerSecond()));
}
return builder.setVersion(Version.CURRENT).build();
}
public static class Builder {
private String id;
private SourceConfig source;

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.core.transform.transforms.persistence;
import org.elasticsearch.Version;
public final class TransformInternalIndexConstants {
/* Constants for internal indexes of the transform plugin
@ -24,6 +26,7 @@ public final class TransformInternalIndexConstants {
// internal index
// version is not a rollover pattern, however padded because sort is string based
public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_7_0;
public static final String INDEX_VERSION = "005";
public static final String INDEX_PATTERN = ".transform-internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
@ -42,7 +45,6 @@ public final class TransformInternalIndexConstants {
public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read";
public static final String AUDIT_INDEX = AUDIT_INDEX_PREFIX + AUDIT_TEMPLATE_VERSION;
private TransformInternalIndexConstants() {
}
private TransformInternalIndexConstants() {}
}

View File

@ -313,9 +313,81 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
);
}
public void testRewriteForUpdate() throws IOException {
String pivotTransform = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } },"
+ " \"max_page_search_size\" : 111"
+ "},"
+ " \"version\" : \""
+ Version.V_7_6_0.toString()
+ "\""
+ "}";
TransformConfig transformConfig = createTransformConfigFromString(pivotTransform, "body_id", true);
TransformConfig transformConfigRewritten = TransformConfig.rewriteForUpdate(transformConfig);
assertNull(transformConfigRewritten.getPivotConfig().getMaxPageSearchSize());
assertNotNull(transformConfigRewritten.getSettings().getMaxPageSearchSize());
assertEquals(111L, transformConfigRewritten.getSettings().getMaxPageSearchSize().longValue());
assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
assertEquals(Version.CURRENT, transformConfigRewritten.getVersion());
}
public void testRewriteForUpdateConflicting() throws IOException {
String pivotTransform = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } },"
+ " \"max_page_search_size\": 111"
+ "},"
+ " \"settings\" : { \"max_page_search_size\": 555"
+ "},"
+ " \"version\" : \""
+ Version.V_7_5_0.toString()
+ "\""
+ "}";
TransformConfig transformConfig = createTransformConfigFromString(pivotTransform, "body_id", true);
TransformConfig transformConfigRewritten = TransformConfig.rewriteForUpdate(transformConfig);
assertNull(transformConfigRewritten.getPivotConfig().getMaxPageSearchSize());
assertNotNull(transformConfigRewritten.getSettings().getMaxPageSearchSize());
assertEquals(555L, transformConfigRewritten.getSettings().getMaxPageSearchSize().longValue());
assertEquals(Version.CURRENT, transformConfigRewritten.getVersion());
assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
}
private TransformConfig createTransformConfigFromString(String json, String id) throws IOException {
return createTransformConfigFromString(json, id, false);
}
private TransformConfig createTransformConfigFromString(String json, String id, boolean lenient) throws IOException {
final XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
return TransformConfig.fromXContent(parser, id, false);
return TransformConfig.fromXContent(parser, id, lenient);
}
}

View File

@ -279,7 +279,11 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected void stopTransform(String transformId, boolean force, boolean waitForCheckpoint) throws Exception {
final Request stopTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_stop", null);
stopTransform(transformId, null, force, false);
}
protected void stopTransform(String transformId, String authHeader, boolean force, boolean waitForCheckpoint) throws Exception {
final Request stopTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_stop", authHeader);
stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(force));
stopTransformRequest.addParameter(TransformField.WAIT_FOR_COMPLETION.getPreferredName(), Boolean.toString(true));
stopTransformRequest.addParameter(TransformField.WAIT_FOR_CHECKPOINT.getPreferredName(), Boolean.toString(waitForCheckpoint));

View File

@ -0,0 +1,261 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.integration;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
public class TransformUpdateIT extends TransformRestTestCase {
private static final String TEST_USER_NAME = "transform_user";
private static final String BASIC_AUTH_VALUE_TRANSFORM_USER = basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
private static final String TEST_ADMIN_USER_NAME_1 = "transform_admin_1";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 = basicAuthHeaderValue(
TEST_ADMIN_USER_NAME_1,
TEST_PASSWORD_SECURE_STRING
);
private static final String TEST_ADMIN_USER_NAME_2 = "transform_admin_2";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2 = basicAuthHeaderValue(
TEST_ADMIN_USER_NAME_2,
TEST_PASSWORD_SECURE_STRING
);
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static final String DATA_ACCESS_ROLE_2 = "test_data_access_2";
private static boolean indicesCreated = false;
// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
@Override
protected boolean enableWarningsCheck() {
return false;
}
@Override
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
RestClientBuilder builder = RestClient.builder(hosts);
configureClient(builder, settings);
builder.setStrictDeprecationMode(false);
return builder.build();
}
@Before
public void createIndexes() throws IOException {
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}
createReviewsIndex();
indicesCreated = true;
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
setupDataAccessRole(DATA_ACCESS_ROLE_2, REVIEWS_INDEX_NAME);
setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE));
setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
setupUser(TEST_ADMIN_USER_NAME_2, Arrays.asList("transform_admin", DATA_ACCESS_ROLE_2));
}
@SuppressWarnings("unchecked")
public void testUpdateDeprecatedSettings() throws Exception {
String transformId = "old_transform";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
String config = "{ \"dest\": {\"index\":\""
+ transformDest
+ "\"},"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } },"
+ " \"max_page_search_size\": 555"
+ " }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_USER);
Map<String, Object> transforms = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
Map<String, Object> transform = ((List<Map<String, Object>>) XContentMapValues.extractValue("transforms", transforms)).get(0);
assertThat(XContentMapValues.extractValue("pivot.max_page_search_size", transform), equalTo(555));
final Request updateRequest = createRequestWithAuth(
"POST",
getTransformEndpoint() + transformId + "/_update",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
updateRequest.setJsonEntity("{}");
Map<String, Object> updateResponse = entityAsMap(client().performRequest(updateRequest));
assertNull(XContentMapValues.extractValue("pivot.max_page_search_size", updateResponse));
assertThat(XContentMapValues.extractValue("settings.max_page_search_size", updateResponse), equalTo(555));
getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_USER);
transforms = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
transform = ((List<Map<String, Object>>) XContentMapValues.extractValue("transforms", transforms)).get(0);
assertNull(XContentMapValues.extractValue("pivot.max_page_search_size", transform));
assertThat(XContentMapValues.extractValue("settings.max_page_search_size", transform), equalTo(555));
}
@SuppressWarnings("unchecked")
public void testUpdateTransferRights() throws Exception {
String transformId = "transform1";
// Note: Due to a bug the transform does not fail to start after deleting the user and role, therefore invalidating
// the credentials stored with the config. As a workaround we use a 2nd transform that uses the same config
// once the bug is fixed, delete this 2nd transform
String transformIdCloned = "transform2";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
setupDataAccessRole(DATA_ACCESS_ROLE_2, REVIEWS_INDEX_NAME, transformDest);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2
);
final Request createTransformRequest_2 = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformIdCloned,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2
);
String config = "{ \"dest\": {\"index\":\""
+ transformDest
+ "\"},"
+ " \"source\": {\"index\":\""
+ REVIEWS_INDEX_NAME
+ "\"},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } }"
+ " }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_2);
Map<String, Object> transforms = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
// create a 2nd, identical one
createTransformRequest_2.setJsonEntity(config);
createTransformResponse = entityAsMap(client().performRequest(createTransformRequest_2));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// delete the user _and_ the role to access the data
deleteUser(TEST_ADMIN_USER_NAME_2);
deleteDataAccessRole(DATA_ACCESS_ROLE_2);
// getting the transform with the just deleted admin 2 user should fail
try {
client().performRequest(getRequest);
fail("request should have failed");
} catch (ResponseException e) {
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(401));
}
// get the transform with admin 1
getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
transforms = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
// start using admin 1, but as the header is still admin 2
// BUG: this should fail, because the transform can not access the source index any longer
startAndWaitForTransform(transformId, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
assertBusy(() -> {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
assertThat(XContentMapValues.extractValue("stats.documents_indexed", transformStatsAsMap), equalTo(0));
}, 3, TimeUnit.SECONDS);
// update the transform with an empty body, the credentials (headers) should change
final Request updateRequest = createRequestWithAuth(
"POST",
getTransformEndpoint() + transformIdCloned + "/_update",
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
updateRequest.setJsonEntity("{}");
assertOK(client().performRequest(updateRequest));
// get should still work
getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformIdCloned, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
transforms = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
// start with updated configuration should succeed
startAndWaitForTransform(transformIdCloned, transformDest, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1);
assertBusy(() -> {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformIdCloned);
assertThat(XContentMapValues.extractValue("stats.documents_indexed", transformStatsAsMap), equalTo(27));
}, 15, TimeUnit.SECONDS);
}
private void deleteUser(String user) throws IOException {
Request request = new Request("DELETE", "/_security/user/" + user);
client().performRequest(request);
}
protected void deleteDataAccessRole(String role) throws IOException {
Request request = new Request("DELETE", "/_security/role/" + role);
client().performRequest(request);
}
}

View File

@ -56,6 +56,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
@ -193,9 +194,18 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
// GET transform and attempt to update
// We don't want the update to complete if the config changed between GET and INDEX
transformConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap(configAndVersion -> {
final TransformConfig config = configAndVersion.v1();
final TransformConfig oldConfig = configAndVersion.v1();
final TransformConfig config = TransformConfig.rewriteForUpdate(oldConfig);
// If it is a noop don't bother even writing the doc, save the cycles, just return here.
if (update.isNoop(config)) {
// skip when:
// - config is in the latest index
// - rewrite did not change the config
// - update is not making any changes
if (config.getVersion() != null
&& config.getVersion().onOrAfter(TransformInternalIndexConstants.INDEX_VERSION_LAST_CHANGED)
&& config.equals(oldConfig)
&& update.isNoop(config)) {
listener.onResponse(new Response(config));
return;
}
@ -213,8 +223,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
if (transformTask != null
&& transformTask.getState() instanceof TransformState
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)
) {
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) {
request.setNodes(transformTask.getExecutorNode());
updateListener = ActionListener.wrap(updateResponse -> {
request.setConfig(updateResponse.getConfig());