[ML-DataFrame] Dataframe REST cleanups (#39451) (#39503)

fix a couple of odd behaviors of data frame transforms REST API's:

 -  check if id from body and id from URL match if both are specified
 -  do not allow a body for delete
 -  allow get and stats without specifying an id
This commit is contained in:
Hendrik Muhs 2019-02-28 13:00:37 +01:00 committed by GitHub
parent 5c96b90ed5
commit 30e5c11cc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 206 additions and 37 deletions

View File

@ -27,7 +27,8 @@ public final class DataFrameField {
// common strings
public static final String TASK_NAME = "data_frame/transforms";
public static final String REST_BASE_PATH = "/_data_frame/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH + "transforms/{id}/";
public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/";
// note: this is used to match tasks
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";

View File

@ -24,6 +24,9 @@ public class DataFrameMessages {
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK =
"Failed to start persistent task, configuration has been cleaned up: [{0}]";
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";
public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]";

View File

@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
import java.io.IOException;
import java.util.Map;
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
private static boolean indicesCreated = false;
// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
@Before
public void createIndexes() throws IOException {
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}
createReviewsIndex();
indicesCreated = true;
}
public void testGetAndGetStats() throws Exception {
createPivotReviewsTransform("pivot_1", "pivot_reviews_1", null);
createPivotReviewsTransform("pivot_2", "pivot_reviews_2", null);
startAndWaitForTransform("pivot_1", "pivot_reviews_1");
startAndWaitForTransform("pivot_2", "pivot_reviews_2");
// check all the different ways to retrieve all stats
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all/_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "*/_stats")));
assertEquals(2, XContentMapValues.extractValue("count", stats));
// only pivot_1
stats = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "pivot_1/_stats")));
assertEquals(1, XContentMapValues.extractValue("count", stats));
// check all the different ways to retrieve all transforms
Map<String, Object> transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT)));
assertEquals(2, XContentMapValues.extractValue("count", transforms));
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all")));
assertEquals(2, XContentMapValues.extractValue("count", transforms));
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "*")));
assertEquals(2, XContentMapValues.extractValue("count", transforms));
// only pivot_1
transforms = entityAsMap(client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "pivot_1")));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
}
}

View File

@ -7,7 +7,6 @@
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
@ -17,7 +16,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
@ -255,35 +253,6 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
});
}
private void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));
// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
refreshIndex(dataFrameIndex);
}
private void waitForDataFrameGeneration(String transformId) throws Exception {
assertBusy(() -> {
long generation = getDataFrameGeneration(transformId);
assertEquals(1, generation);
}, 30, TimeUnit.SECONDS);
}
private static int getDataFrameGeneration(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
}
private void refreshIndex(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
}
private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
@ -143,6 +144,28 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
assertTrue(indexExists(dataFrameIndex));
}
protected void startAndWaitForTransform(String transformId, String dataFrameIndex) throws IOException, Exception {
// start the transform
final Request startTransformRequest = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_start");
Map<String, Object> startTransformResponse = entityAsMap(client().performRequest(startTransformRequest));
assertThat(startTransformResponse.get("started"), equalTo(Boolean.TRUE));
// wait until the dataframe has been created and all data is available
waitForDataFrameGeneration(transformId);
refreshIndex(dataFrameIndex);
}
void waitForDataFrameGeneration(String transformId) throws Exception {
assertBusy(() -> {
long generation = getDataFrameGeneration(transformId);
assertEquals(1, generation);
}, 30, TimeUnit.SECONDS);
}
void refreshIndex(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", index + "/_refresh")));
}
@SuppressWarnings("unchecked")
private static List<Map<String, Object>> getDataFrameTransforms() throws IOException {
Response response = adminClient().performRequest(new Request("GET", DATAFRAME_ENDPOINT + "_all"));
@ -221,4 +244,11 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
}
}
}
static int getDataFrameGeneration(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
return (int) XContentMapValues.extractValue("state.generation", transformStatsAsMap);
}
}

View File

@ -27,6 +27,10 @@ public class RestDeleteDataFrameTransformAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
if (restRequest.hasContent()) {
throw new IllegalArgumentException("delete data frame transforms requests can not have a request body");
}
String id = restRequest.param(DataFrameField.ID.getPreferredName());
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);

View File

@ -19,6 +19,7 @@ public class RestGetDataFrameTransformsAction extends BaseRestHandler {
public RestGetDataFrameTransformsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS, this);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS_BY_ID, this);
}

View File

@ -19,6 +19,7 @@ public class RestGetDataFrameTransformsStatsAction extends BaseRestHandler {
public RestGetDataFrameTransformsStatsAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS + "_stats", this);
controller.registerHandler(RestRequest.Method.GET, DataFrameField.REST_BASE_PATH_TRANSFORMS_BY_ID + "_stats", this);
}

View File

@ -62,7 +62,16 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
private static ConstructingObjectParser<DataFrameTransformConfig, String> createParser(boolean lenient) {
ConstructingObjectParser<DataFrameTransformConfig, String> parser = new ConstructingObjectParser<>(NAME, lenient,
(args, optionalId) -> {
String id = args[0] != null ? (String) args[0] : optionalId;
String id = (String) args[0];
// if the id has been specified in the body and the path, they must match
if (id == null) {
id = optionalId;
} else if (optionalId != null && id.equals(optionalId) == false) {
throw new IllegalArgumentException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_INCONSISTENT_ID, id, optionalId));
}
String source = (String) args[1];
String dest = (String) args[2];

View File

@ -68,7 +68,7 @@ public class PutDataFrameTransformActionRequestTests extends AbstractStreamableX
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders();
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId);
return new Request(config);
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.rest.action;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
public class RestDeleteDataFrameTransformActionTests extends ESTestCase {
public void testBodyRejection() throws Exception {
final RestDeleteDataFrameTransformAction handler = new RestDeleteDataFrameTransformAction(Settings.EMPTY,
mock(RestController.class));
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.field("id", "my_id");
}
builder.endObject();
final FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withContent(new BytesArray(builder.toString()), XContentType.JSON)
.build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> handler.prepareRequest(request, mock(NodeClient.class)));
assertThat(e.getMessage(), equalTo("delete data frame transforms requests can not have a request body"));
}
}
}

View File

@ -36,8 +36,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(),
PivotConfigTests.randomPivotConfig());
randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}
public static DataFrameTransformConfig randomDataFrameTransformConfig() {
@ -46,6 +45,16 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
PivotConfigTests.randomPivotConfig());
}
public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) {
return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), null,
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}
public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) {
return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(),
QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig());
}
public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
if (randomBoolean()) {
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
@ -74,7 +83,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
@Override
protected DataFrameTransformConfig createTestInstance() {
return runWithHeaders ? randomDataFrameTransformConfig() : randomDataFrameTransformConfigWithoutHeaders();
return runWithHeaders ? randomDataFrameTransformConfig(transformId) : randomDataFrameTransformConfigWithoutHeaders(transformId);
}
@Override
@ -143,6 +152,33 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
}
public void testSetIdInBody() throws IOException {
String pivotTransform = "{"
+ " \"id\" : \"body_id\","
+ " \"source\" : \"src\","
+ " \"dest\" : \"dest\","
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"id\": {"
+ " \"terms\": {"
+ " \"field\": \"id\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
DataFrameTransformConfig dataFrameTransformConfig = createDataFrameTransformConfigFromString(pivotTransform, "body_id");
assertEquals("body_id", dataFrameTransformConfig.getId());
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> createDataFrameTransformConfigFromString(pivotTransform, "other_id"));
assertEquals("Inconsistent id; 'body_id' specified in the body differs from 'other_id' specified as a URL argument",
ex.getCause().getMessage());
}
private DataFrameTransformConfig createDataFrameTransformConfigFromString(String json, String id) throws IOException {
final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);