From 1897883adc489fdbad1061e846b903791a2ff44a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 25 Feb 2019 19:08:26 +0100 Subject: [PATCH] [ML-DataFrame] Dataframe access headers (#39289) (#39368) store user headers as part of the config and run transform as user --- .../xpack/core/dataframe/DataFrameField.java | 6 ++ .../TransportPutDataFrameTransformAction.java | 21 ++++-- .../DataFrameTransformsConfigManager.java | 9 +-- .../transforms/DataFrameTransform.java | 6 -- .../transforms/DataFrameTransformConfig.java | 43 ++++++++++-- .../transforms/DataFrameTransformTask.java | 6 +- ...ataFrameTransformsActionResponseTests.java | 23 +++++++ ...wDataFrameTransformActionRequestTests.java | 2 +- ...tDataFrameTransformActionRequestTests.java | 3 +- .../DataFrameTransformConfigTests.java | 66 +++++++++++++++++-- 10 files changed, 150 insertions(+), 35 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 9749cd915b5..b753bf777d8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -41,6 +41,12 @@ public final class DataFrameField { public static final String TRANSFORM = "transform"; public static final String DATA_FRAME_SIGNATURE = "data-frame-transform"; + /** + * Parameter to indicate whether we are serialising to X Content for internal storage. Default the field is invisible (e.g. for get + * API's) + */ + public static final String FOR_INTERNAL_STORAGE = "for_internal_storage"; + private DataFrameField() { } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index c611fb9d3fb..1712db263dd 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; @@ -34,8 +35,12 @@ import org.elasticsearch.xpack.dataframe.action.PutDataFrameTransformAction.Resp import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransform; +import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; +import java.util.Map; +import java.util.stream.Collectors; + public class TransportPutDataFrameTransformAction extends TransportMasterNodeAction { @@ -79,7 +84,15 @@ public class TransportPutDataFrameTransformAction XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); - String transformId = request.getConfig().getId(); + // set headers to run data frame transform as calling user + Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + DataFrameTransformConfig config = request.getConfig(); + config.setHeaders(filteredHeaders); + + String transformId = config.getId(); // quick check whether a transform has already been created under that name if (PersistentTasksCustomMetaData.getTaskWithId(clusterState, transformId) != null) { listener.onFailure(new ResourceAlreadyExistsException( @@ -88,17 +101,17 @@ public class TransportPutDataFrameTransformAction } // create the transform, for now we only have pivot and no support for custom queries - Pivot pivot = new Pivot(request.getConfig().getSource(), new MatchAllQueryBuilder(), request.getConfig().getPivotConfig()); + Pivot pivot = new Pivot(config.getSource(), new MatchAllQueryBuilder(), config.getPivotConfig()); // the non-state creating steps are done first, so we minimize the chance to end up with orphaned state transform validation pivot.validate(client, ActionListener.wrap(validationResult -> { // deduce target mappings pivot.deduceMappings(client, ActionListener.wrap(mappings -> { // create the destination index - DataframeIndex.createDestinationIndex(client, request.getConfig(), mappings, ActionListener.wrap(createIndexResult -> { + DataframeIndex.createDestinationIndex(client, config, mappings, ActionListener.wrap(createIndexResult -> { DataFrameTransform transform = createDataFrameTransform(transformId, threadPool); // create the transform configuration and store it in the internal index - dataFrameTransformsConfigManager.putTransformConfiguration(request.getConfig(), ActionListener.wrap(r -> { + dataFrameTransformsConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> { // finally start the persistent task persistentTasksService.sendStartRequest(transform.getId(), DataFrameTransform.NAME, transform, ActionListener.wrap(persistentTask -> { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java index 2293d2b6319..65f531a9893 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java @@ -32,13 +32,13 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig; import java.io.IOException; import java.io.InputStream; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; @@ -48,12 +48,7 @@ public class DataFrameTransformsConfigManager { private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class); - public static final Map TO_XCONTENT_PARAMS; - static { - Map modifiable = new HashMap<>(); - modifiable.put("for_internal_storage", "true"); - TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable); - } + public static final Map TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"); private final Client client; private final NamedXContentRegistry xContentRegistry; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransform.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransform.java index d2a9e324584..cf2d296b678 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransform.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransform.java @@ -17,8 +17,6 @@ import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import java.io.IOException; -import java.util.Collections; -import java.util.Map; import java.util.Objects; public class DataFrameTransform extends AbstractDiffable implements XPackPlugin.XPackPersistentTaskParams { @@ -92,8 +90,4 @@ public class DataFrameTransform extends AbstractDiffable imp public int hashCode() { return Objects.hash(transformId); } - - public Map getHeaders() { - return Collections.emptyMap(); - } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfig.java index 13403e51dcd..8bb1a2b4008 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfig.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.PivotConfig; import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -36,6 +37,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class DataFrameTransformConfig extends AbstractDiffable implements Writeable, ToXContentObject { public static final String NAME = "data_frame_transforms"; + public static final ParseField HEADERS = new ParseField("headers"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); public static final ParseField QUERY = new ParseField("query"); @@ -50,6 +52,10 @@ public class DataFrameTransformConfig extends AbstractDiffable headers; + private final QueryConfig queryConfig; private final PivotConfig pivotConfig; @@ -60,22 +66,32 @@ public class DataFrameTransformConfig extends AbstractDiffable headers = (Map) args[3]; + // default handling: if the user does not specify a query, we default to match_all QueryConfig queryConfig = null; - if (args[3] == null) { + if (args[4] == null) { queryConfig = new QueryConfig(Collections.singletonMap(MatchAllQueryBuilder.NAME, Collections.emptyMap()), new MatchAllQueryBuilder()); } else { - queryConfig = (QueryConfig) args[3]; + queryConfig = (QueryConfig) args[4]; } - PivotConfig pivotConfig = (PivotConfig) args[4]; - return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig); + PivotConfig pivotConfig = (PivotConfig) args[5]; + return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig); }); parser.declareString(optionalConstructorArg(), DataFrameField.ID); parser.declareString(constructorArg(), SOURCE); parser.declareString(constructorArg(), DESTINATION); + + parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS); parser.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p, lenient), QUERY); parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM); @@ -89,12 +105,14 @@ public class DataFrameTransformConfig extends AbstractDiffable headers, final QueryConfig queryConfig, final PivotConfig pivotConfig) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); this.source = ExceptionsHelper.requireNonNull(source, SOURCE.getPreferredName()); this.dest = ExceptionsHelper.requireNonNull(dest, DESTINATION.getPreferredName()); this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName()); + this.setHeaders(headers == null ? Collections.emptyMap() : headers); this.pivotConfig = pivotConfig; // at least one function must be defined @@ -107,6 +125,7 @@ public class DataFrameTransformConfig extends AbstractDiffable getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } + public PivotConfig getPivotConfig() { return pivotConfig; } @@ -153,6 +180,7 @@ public class DataFrameTransformConfig extends AbstractDiffable nextPhase) { - ClientHelper.executeWithHeadersAsync(transform.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, SearchAction.INSTANCE, - request, nextPhase); + ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, + SearchAction.INSTANCE, request, nextPhase); } @Override protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { - ClientHelper.executeWithHeadersAsync(transform.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, BulkAction.INSTANCE, + ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, BulkAction.INSTANCE, request, nextPhase); } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/GetDataFrameTransformsActionResponseTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/GetDataFrameTransformsActionResponseTests.java index 0872eb3d7bd..f9e714bf3cc 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/GetDataFrameTransformsActionResponseTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/GetDataFrameTransformsActionResponseTests.java @@ -43,4 +43,27 @@ public class GetDataFrameTransformsActionResponseTests extends ESTestCase { assertEquals(expectedInvalidTransforms, XContentMapValues.extractValue("invalid_transforms.transforms", responseAsMap)); assertWarnings(LoggerMessageFormat.format(Response.INVALID_TRANSFORMS_DEPRECATION_WARNING, 2)); } + + public void testNoHeaderInResponse() throws IOException { + List transforms = new ArrayList<>(); + + for (int i = 0; i < randomIntBetween(1, 10); ++i) { + transforms.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig()); + } + + Response r = new Response(transforms); + XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); + r.toXContent(builder, XContent.EMPTY_PARAMS); + Map responseAsMap = createParser(builder).map(); + + @SuppressWarnings("unchecked") + List> transformsResponse = (List>) XContentMapValues.extractValue("transforms", + responseAsMap); + + assertEquals(transforms.size(), transformsResponse.size()); + for (int i = 0; i < transforms.size(); ++i) { + assertEquals(transforms.get(i).getSource(), XContentMapValues.extractValue("source", transformsResponse.get(i))); + assertEquals(null, XContentMapValues.extractValue("headers", transformsResponse.get(i))); + } + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PreviewDataFrameTransformActionRequestTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PreviewDataFrameTransformActionRequestTests.java index 5bce3d13445..a8ee47a5af4 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PreviewDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PreviewDataFrameTransformActionRequestTests.java @@ -63,7 +63,7 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractStreama @Override protected Request createTestInstance() { DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomAlphaOfLength(10), - "unused-transform-preview-index", QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + "unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); return new Request(config); } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PutDataFrameTransformActionRequestTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PutDataFrameTransformActionRequestTests.java index e2dc9edfe54..983222127c9 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PutDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/PutDataFrameTransformActionRequestTests.java @@ -68,8 +68,7 @@ public class PutDataFrameTransformActionRequestTests extends AbstractStreamableX @Override protected Request createTestInstance() { - DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig(); + DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(); return new Request(config); } - } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfigTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfigTests.java index daabe1cccaa..31ba44d73d9 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformConfigTests.java @@ -10,52 +10,71 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.dataframe.transforms.pivot.PivotConfigTests; import org.junit.Before; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.elasticsearch.test.TestMatchers.matchesPattern; public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameTestCase { + private static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams( + Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true")); + private String transformId; + private boolean runWithHeaders; + + public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() { + return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), + randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), + PivotConfigTests.randomPivotConfig()); + } public static DataFrameTransformConfig randomDataFrameTransformConfig() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(), + PivotConfigTests.randomPivotConfig()); } public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() { if (randomBoolean()) { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomInvalidQueryConfig(), PivotConfigTests.randomPivotConfig()); + randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomInvalidQueryConfig(), + PivotConfigTests.randomPivotConfig()); } // else return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomInvalidPivotConfig()); + randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(), + PivotConfigTests.randomInvalidPivotConfig()); } @Before public void setUpOptionalId() { transformId = randomAlphaOfLengthBetween(1, 10); + runWithHeaders = randomBoolean(); } @Override protected DataFrameTransformConfig doParseInstance(XContentParser parser) throws IOException { if (randomBoolean()) { - return DataFrameTransformConfig.fromXContent(parser, transformId, false); + return DataFrameTransformConfig.fromXContent(parser, transformId, runWithHeaders); } else { - return DataFrameTransformConfig.fromXContent(parser, null, false); + return DataFrameTransformConfig.fromXContent(parser, null, runWithHeaders); } } @Override protected DataFrameTransformConfig createTestInstance() { - return randomDataFrameTransformConfig(); + return runWithHeaders ? randomDataFrameTransformConfig() : randomDataFrameTransformConfigWithoutHeaders(); } @Override @@ -63,7 +82,19 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT return DataFrameTransformConfig::new; } - public void testDefaultMatchAll( ) throws IOException { + @Override + protected ToXContent.Params getToXContentParams() { + return TO_XCONTENT_PARAMS; + } + + private static Map randomHeaders() { + Map headers = new HashMap<>(1); + headers.put("key", "value"); + + return headers; + } + + public void testDefaultMatchAll() throws IOException { String pivotTransform = "{" + " \"source\" : \"src\"," + " \"dest\" : \"dest\"," @@ -91,6 +122,27 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT } } + public void testPreventHeaderInjection() throws IOException { + String pivotTransform = "{" + + " \"headers\" : {\"key\" : \"value\" }," + + " \"source\" : \"src\"," + + " \"dest\" : \"dest\"," + + " \"pivot\" : {" + + " \"group_by\": {" + + " \"id\": {" + + " \"terms\": {" + + " \"field\": \"id\"" + + "} } }," + + " \"aggs\": {" + + " \"avg\": {" + + " \"avg\": {" + + " \"field\": \"points\"" + + "} } } } }"; + + expectThrows(IllegalArgumentException.class, + () -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection")); + } + private DataFrameTransformConfig createDataFrameTransformConfigFromString(String json, String id) throws IOException { final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);