[ML-DataFrame] Dataframe access headers (#39289) (#39368)

store user headers as part of the config and run transform as user
This commit is contained in:
Hendrik Muhs 2019-02-25 19:08:26 +01:00 committed by GitHub
parent bf058d6e4d
commit 1897883adc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 150 additions and 35 deletions

View File

@ -41,6 +41,12 @@ public final class DataFrameField {
public static final String TRANSFORM = "transform";
public static final String DATA_FRAME_SIGNATURE = "data-frame-transform";
/**
* Parameter to indicate whether we are serialising to XContent for internal storage. By default the field is invisible (e.g. for get
* APIs).
*/
public static final String FOR_INTERNAL_STORAGE = "for_internal_storage";
private DataFrameField() {
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
@ -34,8 +35,12 @@ import org.elasticsearch.xpack.dataframe.action.PutDataFrameTransformAction.Resp
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.util.Map;
import java.util.stream.Collectors;
public class TransportPutDataFrameTransformAction
extends TransportMasterNodeAction<PutDataFrameTransformAction.Request, PutDataFrameTransformAction.Response> {
@ -79,7 +84,15 @@ public class TransportPutDataFrameTransformAction
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
String transformId = request.getConfig().getId();
// set headers to run data frame transform as calling user
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
DataFrameTransformConfig config = request.getConfig();
config.setHeaders(filteredHeaders);
String transformId = config.getId();
// quick check whether a transform has already been created under that name
if (PersistentTasksCustomMetaData.getTaskWithId(clusterState, transformId) != null) {
listener.onFailure(new ResourceAlreadyExistsException(
@ -88,17 +101,17 @@ public class TransportPutDataFrameTransformAction
}
// create the transform, for now we only have pivot and no support for custom queries
Pivot pivot = new Pivot(request.getConfig().getSource(), new MatchAllQueryBuilder(), request.getConfig().getPivotConfig());
Pivot pivot = new Pivot(config.getSource(), new MatchAllQueryBuilder(), config.getPivotConfig());
// the non-state creating steps are done first, so we minimize the chance to end up with orphaned state transform validation
pivot.validate(client, ActionListener.wrap(validationResult -> {
// deduce target mappings
pivot.deduceMappings(client, ActionListener.wrap(mappings -> {
// create the destination index
DataframeIndex.createDestinationIndex(client, request.getConfig(), mappings, ActionListener.wrap(createIndexResult -> {
DataframeIndex.createDestinationIndex(client, config, mappings, ActionListener.wrap(createIndexResult -> {
DataFrameTransform transform = createDataFrameTransform(transformId, threadPool);
// create the transform configuration and store it in the internal index
dataFrameTransformsConfigManager.putTransformConfiguration(request.getConfig(), ActionListener.wrap(r -> {
dataFrameTransformsConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {
// finally start the persistent task
persistentTasksService.sendStartRequest(transform.getId(), DataFrameTransform.NAME, transform,
ActionListener.wrap(persistentTask -> {

View File

@ -32,13 +32,13 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformConfig;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
@ -48,12 +48,7 @@ public class DataFrameTransformsConfigManager {
private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class);
public static final Map<String, String> TO_XCONTENT_PARAMS;
static {
Map<String, String> modifiable = new HashMap<>();
modifiable.put("for_internal_storage", "true");
TO_XCONTENT_PARAMS = Collections.unmodifiableMap(modifiable);
}
public static final Map<String, String> TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true");
private final Client client;
private final NamedXContentRegistry xContentRegistry;

View File

@ -17,8 +17,6 @@ import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> implements XPackPlugin.XPackPersistentTaskParams {
@ -92,8 +90,4 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
public int hashCode() {
return Objects.hash(transformId);
}
public Map<String, String> getHeaders() {
return Collections.emptyMap();
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.xpack.dataframe.transforms.pivot.PivotConfig;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@ -36,6 +37,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransformConfig> implements Writeable, ToXContentObject {
public static final String NAME = "data_frame_transforms";
public static final ParseField HEADERS = new ParseField("headers");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField QUERY = new ParseField("query");
@ -50,6 +52,10 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
private final String source;
private final String dest;
// headers store the user context from the creating user, which allows us to run the transform as this user
// the headers only contain the name, groups and other context, but no authorization keys
private Map<String, String> headers;
private final QueryConfig queryConfig;
private final PivotConfig pivotConfig;
@ -60,22 +66,32 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
String source = (String) args[1];
String dest = (String) args[2];
// on strict parsing do not allow injection of headers
if (lenient == false && args[3] != null) {
throw new IllegalArgumentException("Found [headers], not allowed for strict parsing");
}
@SuppressWarnings("unchecked")
Map<String, String> headers = (Map<String, String>) args[3];
// default handling: if the user does not specify a query, we default to match_all
QueryConfig queryConfig = null;
if (args[3] == null) {
if (args[4] == null) {
queryConfig = new QueryConfig(Collections.singletonMap(MatchAllQueryBuilder.NAME, Collections.emptyMap()),
new MatchAllQueryBuilder());
} else {
queryConfig = (QueryConfig) args[3];
queryConfig = (QueryConfig) args[4];
}
PivotConfig pivotConfig = (PivotConfig) args[4];
return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig);
PivotConfig pivotConfig = (PivotConfig) args[5];
return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig);
});
parser.declareString(optionalConstructorArg(), DataFrameField.ID);
parser.declareString(constructorArg(), SOURCE);
parser.declareString(constructorArg(), DESTINATION);
parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS);
parser.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p, lenient), QUERY);
parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM);
@ -89,12 +105,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
public DataFrameTransformConfig(final String id,
final String source,
final String dest,
final Map<String, String> headers,
final QueryConfig queryConfig,
final PivotConfig pivotConfig) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.source = ExceptionsHelper.requireNonNull(source, SOURCE.getPreferredName());
this.dest = ExceptionsHelper.requireNonNull(dest, DESTINATION.getPreferredName());
this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName());
this.setHeaders(headers == null ? Collections.emptyMap() : headers);
this.pivotConfig = pivotConfig;
// at least one function must be defined
@ -107,6 +125,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
id = in.readString();
source = in.readString();
dest = in.readString();
setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
queryConfig = in.readOptionalWriteable(QueryConfig::new);
pivotConfig = in.readOptionalWriteable(PivotConfig::new);
}
@ -127,6 +146,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
return dest;
}
public Map<String, String> getHeaders() {
return headers;
}
public void setHeaders(Map<String, String> headers) {
this.headers = headers;
}
public PivotConfig getPivotConfig() {
return pivotConfig;
}
@ -153,6 +180,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
out.writeString(id);
out.writeString(source);
out.writeString(dest);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
out.writeOptionalWriteable(queryConfig);
out.writeOptionalWriteable(pivotConfig);
}
@ -169,6 +197,10 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
if (pivotConfig != null) {
builder.field(PIVOT_TRANSFORM.getPreferredName(), pivotConfig);
}
if (headers.isEmpty() == false && params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false) == true) {
builder.field(HEADERS.getPreferredName(), headers);
}
builder.endObject();
return builder;
}
@ -188,13 +220,14 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
return Objects.equals(this.id, that.id)
&& Objects.equals(this.source, that.source)
&& Objects.equals(this.dest, that.dest)
&& Objects.equals(this.headers, that.headers)
&& Objects.equals(this.queryConfig, that.queryConfig)
&& Objects.equals(this.pivotConfig, that.pivotConfig);
}
@Override
public int hashCode() {
return Objects.hash(id, source, dest, queryConfig, pivotConfig);
return Objects.hash(id, source, dest, headers, queryConfig, pivotConfig);
}
@Override

View File

@ -277,13 +277,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
ClientHelper.executeWithHeadersAsync(transform.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, SearchAction.INSTANCE,
request, nextPhase);
ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client,
SearchAction.INSTANCE, request, nextPhase);
}
@Override
protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
ClientHelper.executeWithHeadersAsync(transform.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, BulkAction.INSTANCE,
ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, BulkAction.INSTANCE,
request, nextPhase);
}

View File

@ -43,4 +43,27 @@ public class GetDataFrameTransformsActionResponseTests extends ESTestCase {
assertEquals(expectedInvalidTransforms, XContentMapValues.extractValue("invalid_transforms.transforms", responseAsMap));
assertWarnings(LoggerMessageFormat.format(Response.INVALID_TRANSFORMS_DEPRECATION_WARNING, 2));
}
public void testNoHeaderInResponse() throws IOException {
List<DataFrameTransformConfig> transforms = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 10); ++i) {
transforms.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
}
Response r = new Response(transforms);
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
r.toXContent(builder, XContent.EMPTY_PARAMS);
Map<String, Object> responseAsMap = createParser(builder).map();
@SuppressWarnings("unchecked")
List<Map<String, Object>> transformsResponse = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms",
responseAsMap);
assertEquals(transforms.size(), transformsResponse.size());
for (int i = 0; i < transforms.size(); ++i) {
assertEquals(transforms.get(i).getSource(), XContentMapValues.extractValue("source", transformsResponse.get(i)));
assertEquals(null, XContentMapValues.extractValue("headers", transformsResponse.get(i)));
}
}
}

View File

@ -63,7 +63,7 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractStreama
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomAlphaOfLength(10),
"unused-transform-preview-index", QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
"unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
return new Request(config);
}

View File

@ -68,8 +68,7 @@ public class PutDataFrameTransformActionRequestTests extends AbstractStreamableX
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders();
return new Request(config);
}
}

View File

@ -10,52 +10,71 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.dataframe.transforms.pivot.PivotConfigTests;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.test.TestMatchers.matchesPattern;
public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformConfig> {
private static Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));
private String transformId;
private boolean runWithHeaders;
public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(),
PivotConfigTests.randomPivotConfig());
}
public static DataFrameTransformConfig randomDataFrameTransformConfig() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(),
PivotConfigTests.randomPivotConfig());
}
public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
if (randomBoolean()) {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomInvalidQueryConfig(), PivotConfigTests.randomPivotConfig());
randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomInvalidQueryConfig(),
PivotConfigTests.randomPivotConfig());
} // else
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomInvalidPivotConfig());
randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(),
PivotConfigTests.randomInvalidPivotConfig());
}
@Before
public void setUpOptionalId() {
transformId = randomAlphaOfLengthBetween(1, 10);
runWithHeaders = randomBoolean();
}
@Override
protected DataFrameTransformConfig doParseInstance(XContentParser parser) throws IOException {
if (randomBoolean()) {
return DataFrameTransformConfig.fromXContent(parser, transformId, false);
return DataFrameTransformConfig.fromXContent(parser, transformId, runWithHeaders);
} else {
return DataFrameTransformConfig.fromXContent(parser, null, false);
return DataFrameTransformConfig.fromXContent(parser, null, runWithHeaders);
}
}
@Override
protected DataFrameTransformConfig createTestInstance() {
return randomDataFrameTransformConfig();
return runWithHeaders ? randomDataFrameTransformConfig() : randomDataFrameTransformConfigWithoutHeaders();
}
@Override
@ -63,7 +82,19 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
return DataFrameTransformConfig::new;
}
public void testDefaultMatchAll( ) throws IOException {
@Override
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}
private static Map<String, String> randomHeaders() {
Map<String, String> headers = new HashMap<>(1);
headers.put("key", "value");
return headers;
}
public void testDefaultMatchAll() throws IOException {
String pivotTransform = "{"
+ " \"source\" : \"src\","
+ " \"dest\" : \"dest\","
@ -91,6 +122,27 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
}
}
public void testPreventHeaderInjection() throws IOException {
String pivotTransform = "{"
+ " \"headers\" : {\"key\" : \"value\" },"
+ " \"source\" : \"src\","
+ " \"dest\" : \"dest\","
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
}
private DataFrameTransformConfig createDataFrameTransformConfigFromString(String json, String id) throws IOException {
final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);