[ML] Periodically persist data-frame running statistics to internal index (#40650) (#40729)

* [ML] Add mappings, serialization, and hooks to persist stats

* Adding tests for transforms without tasks having stats persisted

* Intermediate commit

* Adjusting usage stats to account for stored stats docs

* Adding tests for id expander

* Addressing PR comments

* Removing unused import

* Adding shard failures to the task response
Benjamin Trent 2019-04-02 14:16:55 -05:00 committed by GitHub
parent abbfc75052
commit 945e7ca01e
24 changed files with 1030 additions and 284 deletions
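At its core, the change writes each transform's running statistics to the data frame internal index as a separate document with a deterministic id, so the numbers survive cancellation of the transform's task. A minimal sketch using names from the diff below (the GetRequest wiring is illustrative and not part of this commit; imports as in the changed files):

    GetRequest get = new GetRequest(DataFrameInternalIndex.INDEX_NAME)
            .id(DataFrameIndexerTransformStats.documentId("pivot_1"));
    // resolves to document id "data_frame_indexer_transform_stats-pivot_1"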

View File: DataFrameField.java

@@ -33,7 +33,7 @@ public final class DataFrameField {
public static final String REST_BASE_PATH = "/_data_frame/";
public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/";
public static final String DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD = "transform_id";
public static final String TRANSFORM_ID = "transform_id";
// note: this is used to match tasks
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";

View File: DataFrameMessages.java

@@ -29,12 +29,16 @@ public class DataFrameMessages {
public static final String DATA_FRAME_CONFIG_INVALID = "Data frame transform configuration is invalid [{0}]";
public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String DATA_FRAME_FAILED_TO_PERSIST_STATS = "Failed to persist data frame statistics for transform [{0}]";
public static final String DATA_FRAME_UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
"Failed to load data frame transform configuration for transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION =
"Failed to parse transform configuration for data frame transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION =
"Failed to parse transform statistics for data frame transform [{0}]";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_NO_TRANSFORM =
"Data frame transform configuration must specify exactly 1 function";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY =

View File: GetDataFrameTransformsStatsAction.java

@@ -6,9 +6,9 @@
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
@@ -20,14 +20,18 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransformsStatsAction.Response> {
public static final GetDataFrameTransformsStatsAction INSTANCE = new GetDataFrameTransformsStatsAction();
@@ -43,6 +47,11 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
public static class Request extends BaseTasksRequest<Request> {
private String id;
private PageParams pageParams = PageParams.defaultParams();
public static final int MAX_SIZE_RETURN = 1000;
// used internally to expand the queried id expression
private List<String> expandedIds = Collections.emptyList();
public Request(String id) {
if (Strings.isNullOrEmpty(id) || id.equals("*")) {
@@ -55,36 +64,58 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
expandedIds = in.readList(StreamInput::readString);
pageParams = in.readOptionalWriteable(PageParams::new);
}
@Override
public boolean match(Task task) {
// If we are retrieving all the transforms, the task description does not contain the id
if (id.equals(MetaData.ALL)) {
return task.getDescription().startsWith(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX);
}
// Otherwise find the task by ID
return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
// Only get tasks that we have expanded to
return expandedIds.stream()
.anyMatch(transformId -> task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transformId));
}
public String getId() {
return id;
}
public List<String> getExpandedIds() {
return expandedIds;
}
public void setExpandedIds(List<String> expandedIds) {
this.expandedIds = Collections.unmodifiableList(new ArrayList<>(expandedIds));
}
public final void setPageParams(PageParams pageParams) {
this.pageParams = pageParams;
}
public final PageParams getPageParams() {
return pageParams;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeStringCollection(expandedIds);
out.writeOptionalWriteable(pageParams);
}
@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException exception = null;
if (getPageParams() != null && getPageParams().getSize() > MAX_SIZE_RETURN) {
exception = addValidationError("Param [" + PageParams.SIZE.getPreferredName() +
"] has a max acceptable value of [" + MAX_SIZE_RETURN + "]", exception);
}
return exception;
}
@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, pageParams);
}
@Override
@@ -96,7 +127,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && Objects.equals(pageParams, other.pageParams);
}
}
@@ -109,7 +140,7 @@ public class GetDataFrameTransformsStatsAction extends Action<GetDataFrameTransf
}
public Response(List<DataFrameTransformStateAndStats> transformsStateAndStats, List<TaskOperationFailure> taskFailures,
List<? extends FailedNodeException> nodeFailures) {
List<? extends ElasticsearchException> nodeFailures) {
super(taskFailures, nodeFailures);
this.transformsStateAndStats = transformsStateAndStats;
}
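A hedged usage sketch of the request class above; the ids are hypothetical, and setExpandedIds is normally invoked by the transport action after expanding the queried id expression:

    GetDataFrameTransformsStatsAction.Request request = new GetDataFrameTransformsStatsAction.Request("pivot_*");
    request.setPageParams(new PageParams(0, 100));               // validate() rejects a size above MAX_SIZE_RETURN (1000)
    request.setExpandedIds(Arrays.asList("pivot_1", "pivot_2")); // normally supplied by expandTransformIds(...)
    assert request.validate() == null;                           // size is within limits, so no validation error
    // match() then only selects tasks whose description equals PERSISTENT_TASK_DESCRIPTION_PREFIX + an expanded id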

View File: DataFrameAuditMessage.java

@@ -11,17 +11,17 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
import java.util.Date;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD;
public class DataFrameAuditMessage extends AbstractAuditMessage {
private static final ParseField TRANSFORM_ID = new ParseField(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD);
private static final ParseField TRANSFORM_ID = new ParseField(DataFrameField.TRANSFORM_ID);
public static final ConstructingObjectParser<DataFrameAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_audit_message",
true,

View File: DataFrameIndexerTransformStats.java

@@ -6,35 +6,43 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerJobStats;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameIndexerTransformStats extends IndexerJobStats {
private static final String NAME = "data_frame_indexer_transform_stats";
private static ParseField NUM_PAGES = new ParseField("pages_processed");
private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed");
private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
private static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
private static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
private static ParseField INDEX_TOTAL = new ParseField("index_total");
private static ParseField SEARCH_TOTAL = new ParseField("search_total");
private static ParseField SEARCH_FAILURES = new ParseField("search_failures");
private static ParseField INDEX_FAILURES = new ParseField("index_failures");
private static final String DEFAULT_TRANSFORM_ID = "_all";
public static final String NAME = "data_frame_indexer_transform_stats";
public static ParseField NUM_PAGES = new ParseField("pages_processed");
public static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
public static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed");
public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
public static ParseField INDEX_TOTAL = new ParseField("index_total");
public static ParseField SEARCH_TOTAL = new ParseField("search_total");
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");
public static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> PARSER = new ConstructingObjectParser<>(
NAME, args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
NAME, args -> new DataFrameIndexerTransformStats((String) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9], (long) args[10]));
static {
PARSER.declareString(optionalConstructorArg(), DataFrameField.ID);
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
@@ -45,20 +53,72 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
PARSER.declareString(optionalConstructorArg(), DataFrameField.INDEX_DOC_TYPE);
}
public DataFrameIndexerTransformStats() {
private final String transformId;
/**
* Certain situations call for a default transform ID, e.g. when merging many different transforms for statistics gathering.
*
* The returned stats object cannot be stored in the index, as the transformId does not refer to a real transform configuration.
*
* @return new DataFrameIndexerTransformStats with empty stats and a default transform ID
*/
public static DataFrameIndexerTransformStats withDefaultTransformId() {
return new DataFrameIndexerTransformStats(DEFAULT_TRANSFORM_ID);
}
public static DataFrameIndexerTransformStats withDefaultTransformId(long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime,
long indexTotal, long searchTotal, long indexFailures,
long searchFailures) {
return new DataFrameIndexerTransformStats(DEFAULT_TRANSFORM_ID, numPages, numInputDocuments,
numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
}
public DataFrameIndexerTransformStats(String transformId) {
super();
this.transformId = Objects.requireNonNull(transformId, "parameter transformId must not be null");
}
public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal, indexFailures,
searchFailures);
public DataFrameIndexerTransformStats(String transformId, long numPages, long numInputDocuments, long numOutputDocuments,
long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
long indexFailures, long searchFailures) {
super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
indexFailures, searchFailures);
this.transformId = Objects.requireNonNull(transformId, "parameter transformId must not be null");
}
public DataFrameIndexerTransformStats(DataFrameIndexerTransformStats other) {
this(other.transformId, other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures);
}
public DataFrameIndexerTransformStats(StreamInput in) throws IOException {
super(in);
transformId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(transformId);
}
/**
* Get the persisted stats document id from the data frame transform id.
*
* @return The id of the document where the transform stats are persisted
*/
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}
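// Illustrative: documentId("pivot_1") resolves to "data_frame_indexer_transform_stats-pivot_1".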
@Nullable
public String getTransformId() {
return transformId;
}
@Override
@@ -74,11 +134,22 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
// If we are storing something, it should have a valid transform ID.
if (transformId.equals(DEFAULT_TRANSFORM_ID)) {
throw new IllegalArgumentException("when storing transform statistics, a valid transform id must be provided");
}
builder.field(DataFrameField.ID.getPreferredName(), transformId);
builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
}
builder.endObject();
return builder;
}
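// A minimal serialization sketch (hypothetical id "pivot_1"; XContentFactory and Collections imports assumed):
//   DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats("pivot_1");
//   ToXContent.Params params = new ToXContent.MapParams(
//       Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));
//   XContentBuilder builder = stats.toXContent(XContentFactory.jsonBuilder(), params);
// With the param set, the id and doc type fields are written as above; attempting this with the
// default "_all" transform id throws IllegalArgumentException.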
public DataFrameIndexerTransformStats merge(DataFrameIndexerTransformStats other) {
// We should probably not merge two sets of stats unless one is an accumulation object (i.e. with the default transform id)
// or the stats are referencing the same transform
assert transformId.equals(DEFAULT_TRANSFORM_ID) || this.transformId.equals(other.transformId);
numPages += other.numPages;
numInputDocuments += other.numInputDocuments;
numOuputDocuments += other.numOuputDocuments;
@@ -93,6 +164,37 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
return this;
}
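// Illustrative accumulation pattern permitted by the assert above: a stats object created via
// withDefaultTransformId() serves as the accumulator when summing stats across transforms, e.g.
//   DataFrameIndexerTransformStats total = DataFrameIndexerTransformStats.withDefaultTransformId();
//   statsPerTransform.forEach(total::merge);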
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
DataFrameIndexerTransformStats that = (DataFrameIndexerTransformStats) other;
return Objects.equals(this.transformId, that.transformId)
&& Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal);
}
@Override
public int hashCode() {
return Objects.hash(transformId, numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal);
}
public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) {
try {
return PARSER.parse(parser, null);

View File: DataFrameTransformStateAndStats.java

@@ -40,9 +40,13 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
}
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
return initialStateAndStats(id, new DataFrameIndexerTransformStats(id));
}
public static DataFrameTransformStateAndStats initialStateAndStats(String id, DataFrameIndexerTransformStats indexerTransformStats) {
return new DataFrameTransformStateAndStats(id,
new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null),
new DataFrameIndexerTransformStats());
indexerTransformStats);
}
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
@@ -62,7 +66,7 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(STATE_FIELD.getPreferredName(), transformState);
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats);
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
builder.endObject();
return builder;
}

View File: DataFrameIndexerTransformStatsTests.java

@@ -7,12 +7,19 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.io.IOException;
import java.util.Collections;
public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTestCase<DataFrameIndexerTransformStats> {
protected static ToXContent.Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));
@Override
protected DataFrameIndexerTransformStats createTestInstance() {
return randomStats();
@@ -29,21 +36,32 @@ public class DataFrameIndexerTransformStatsTests extends AbstractSerializingTest
}
public static DataFrameIndexerTransformStats randomStats() {
return new DataFrameIndexerTransformStats(randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
return randomStats(randomAlphaOfLength(10));
}
public static DataFrameIndexerTransformStats randomStats(String transformId) {
return new DataFrameIndexerTransformStats(transformId, randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L));
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L));
}
@Override
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}
public void testMerge() throws IOException {
DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats();
DataFrameIndexerTransformStats randomStats = randomStats();
String transformId = randomAlphaOfLength(10);
DataFrameIndexerTransformStats emptyStats = new DataFrameIndexerTransformStats(transformId);
DataFrameIndexerTransformStats randomStats = randomStats(transformId);
assertEquals(randomStats, emptyStats.merge(randomStats));
assertEquals(randomStats, randomStats.merge(emptyStats));
DataFrameIndexerTransformStats randomStatsClone = copyInstance(randomStats);
DataFrameIndexerTransformStats tripleRandomStats = new DataFrameIndexerTransformStats(3 * randomStats.getNumPages(),
DataFrameIndexerTransformStats tripleRandomStats = new DataFrameIndexerTransformStats(transformId, 3 * randomStats.getNumPages(),
3 * randomStats.getNumDocuments(), 3 * randomStats.getOutputDocuments(), 3 * randomStats.getNumInvocations(),
3 * randomStats.getIndexTime(), 3 * randomStats.getSearchTime(), 3 * randomStats.getIndexTotal(),
3 * randomStats.getSearchTotal(), 3 * randomStats.getIndexFailures(), 3 * randomStats.getSearchFailures());

View File: DataFrameTransformStateAndStatsTests.java

@@ -7,22 +7,26 @@
package org.elasticsearch.xpack.core.dataframe.transforms;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.io.IOException;
import java.util.Collections;
public class DataFrameTransformStateAndStatsTests extends AbstractSerializingDataFrameTestCase<DataFrameTransformStateAndStats> {
protected static ToXContent.Params TO_XCONTENT_PARAMS = new ToXContent.MapParams(
Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"));
public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats(String id) {
return new DataFrameTransformStateAndStats(id,
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats());
DataFrameIndexerTransformStatsTests.randomStats(id));
}
public static DataFrameTransformStateAndStats randomDataFrameTransformStateAndStats() {
return new DataFrameTransformStateAndStats(randomAlphaOfLengthBetween(1, 10),
DataFrameTransformStateTests.randomDataFrameTransformState(),
DataFrameIndexerTransformStatsTests.randomStats());
return randomDataFrameTransformStateAndStats(randomAlphaOfLengthBetween(1, 10));
}
@Override
@@ -30,6 +34,13 @@ public class DataFrameTransformStateAndStatsTests extends AbstractSerializingDat
return DataFrameTransformStateAndStats.PARSER.apply(parser, null);
}
@Override
// Setting params for internal storage so that we can check XContent equivalence as
// DataFrameIndexerTransformStats does not write the ID to the XContentObject unless it is for internal storage
protected ToXContent.Params getToXContentParams() {
return TO_XCONTENT_PARAMS;
}
@Override
protected DataFrameTransformStateAndStats createTestInstance() {
return randomDataFrameTransformStateAndStats();

View File: DataFrameGetAndGetStatsIT.java

@@ -8,13 +8,17 @@ package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.greaterThan;
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
@@ -47,6 +51,11 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
setupUser(TEST_ADMIN_USER_NAME, Collections.singletonList("data_frame_transforms_admin"));
}
@After
public void clearOutTransforms() throws Exception {
wipeDataFrameTransforms();
}
public void testGetAndGetStats() throws Exception {
createPivotReviewsTransform("pivot_1", "pivot_reviews_1", null);
createPivotReviewsTransform("pivot_2", "pivot_reviews_2", null);
@@ -67,6 +76,12 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "*/_stats", authHeader);
stats = entityAsMap(client().performRequest(getRequest));
assertEquals(2, XContentMapValues.extractValue("count", stats));
getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "pivot_1,pivot_2/_stats", authHeader);
stats = entityAsMap(client().performRequest(getRequest));
assertEquals(2, XContentMapValues.extractValue("count", stats));
getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "pivot_*/_stats", authHeader);
stats = entityAsMap(client().performRequest(getRequest));
assertEquals(2, XContentMapValues.extractValue("count", stats));
// only pivot_1
getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "pivot_1/_stats", authHeader);
@@ -89,4 +104,35 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
transforms = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
}
@SuppressWarnings("unchecked")
public void testGetPersistedStatsWithoutTask() throws Exception {
createPivotReviewsTransform("pivot_stats_1", "pivot_reviews_stats_1", null);
startAndWaitForTransform("pivot_stats_1", "pivot_reviews_stats_1");
stopDataFrameTransform("pivot_stats_1", false);
// Get rid of the first transform task, but keep the configuration
client().performRequest(new Request("POST", "_tasks/_cancel?actions="+DataFrameField.TASK_NAME+"*"));
// Verify that the task is gone
Map<String, Object> tasks =
entityAsMap(client().performRequest(new Request("GET", "_tasks?actions="+DataFrameField.TASK_NAME+"*")));
assertTrue(((Map<?, ?>)XContentMapValues.extractValue("nodes", tasks)).isEmpty());
createPivotReviewsTransform("pivot_stats_2", "pivot_reviews_stats_2", null);
startAndWaitForTransform("pivot_stats_2", "pivot_reviews_stats_2");
Request getRequest = createRequestWithAuth("GET", DATAFRAME_ENDPOINT + "_stats", BASIC_AUTH_VALUE_DATA_FRAME_ADMIN);
Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
assertEquals(2, XContentMapValues.extractValue("count", stats));
List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
// Verify that both transforms, the one with the task and the one without, have statistics
for (Map<String, Object> transformStats : transformsStats) {
Map<String, Object> stat = (Map<String, Object>)transformStats.get("stats");
assertThat(((Integer)stat.get("documents_processed")), greaterThan(0));
assertThat(((Integer)stat.get("search_time_in_ms")), greaterThan(0));
assertThat(((Integer)stat.get("search_total")), greaterThan(0));
assertThat(((Integer)stat.get("pages_processed")), greaterThan(0));
}
}
}

View File: DataFrameUsageIT.java

@@ -9,11 +9,19 @@ package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS;
public class DataFrameUsageIT extends DataFrameRestTestCase {
private boolean indicesCreated = false;
@@ -45,22 +53,63 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
assertEquals(null, XContentMapValues.extractValue("data_frame.transforms", usageAsMap));
assertEquals(null, XContentMapValues.extractValue("data_frame.stats", usageAsMap));
// create a transform
// create transforms
createPivotReviewsTransform("test_usage", "pivot_reviews", null);
createPivotReviewsTransform("test_usage_no_task", "pivot_reviews_no_task", null);
createPivotReviewsTransform("test_usage_no_stats_or_task", "pivot_reviews_no_stats_or_task", null);
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
startAndWaitForTransform("test_usage_no_task", "pivot_reviews_no_task");
stopDataFrameTransform("test_usage_no_task", false);
// Remove the task; we should still have the transform and its stats doc
client().performRequest(new Request("POST", "_tasks/_cancel?actions="+ DataFrameField.TASK_NAME+"*"));
// TODO remove as soon as stats are stored in an index instead of ClusterState with the task
startAndWaitForTransform("test_usage", "pivot_reviews");
Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameIndexerTransformStats.NAME);
// Verify that we have our two stats documents
assertBusy(() -> {
Map<String, Object> hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest));
assertEquals(2, XContentMapValues.extractValue("hits.total.value", hasStatsMap));
});
Request getRequest = new Request("GET", DATAFRAME_ENDPOINT + "test_usage/_stats");
Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
Map<String, Integer> expectedStats = new HashMap<>();
for(String statName : PROVIDED_STATS) {
@SuppressWarnings("unchecked")
List<Integer> specificStatistic = ((List<Integer>)XContentMapValues.extractValue("transforms.stats." + statName, stats));
assertNotNull(specificStatistic);
Integer statistic = (specificStatistic).get(0);
expectedStats.put(statName, statistic);
}
getRequest = new Request("GET", DATAFRAME_ENDPOINT + "test_usage_no_task/_stats");
stats = entityAsMap(client().performRequest(getRequest));
for(String statName : PROVIDED_STATS) {
@SuppressWarnings("unchecked")
List<Integer> specificStatistic = ((List<Integer>)XContentMapValues.extractValue("transforms.stats." + statName, stats));
assertNotNull(specificStatistic);
Integer statistic = (specificStatistic).get(0);
expectedStats.merge(statName, statistic, Integer::sum);
}
usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
usageAsMap = entityAsMap(usageResponse);
// we should see some stats
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
assertEquals(3, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
assertEquals(0, XContentMapValues.extractValue("data_frame.stats.index_failures", usageAsMap));
assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
for(String statName : PROVIDED_STATS) {
assertEquals(expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats."+statName, usageAsMap));
}
}
}

View File: DataFrameFeatureSet.java

@@ -6,21 +6,41 @@
package org.elasticsearch.xpack.dataframe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,11 +53,28 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
private final boolean enabled;
private final Client client;
private final XPackLicenseState licenseState;
private final ClusterService clusterService;
private static final Logger logger = LogManager.getLogger(DataFrameFeatureSet.class);
public static final String[] PROVIDED_STATS = new String[] {
DataFrameIndexerTransformStats.NUM_PAGES.getPreferredName(),
DataFrameIndexerTransformStats.NUM_INPUT_DOCUMENTS.getPreferredName(),
DataFrameIndexerTransformStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(),
DataFrameIndexerTransformStats.NUM_INVOCATIONS.getPreferredName(),
DataFrameIndexerTransformStats.INDEX_TIME_IN_MS.getPreferredName(),
DataFrameIndexerTransformStats.SEARCH_TIME_IN_MS.getPreferredName(),
DataFrameIndexerTransformStats.INDEX_TOTAL.getPreferredName(),
DataFrameIndexerTransformStats.SEARCH_TOTAL.getPreferredName(),
DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName(),
DataFrameIndexerTransformStats.SEARCH_FAILURES.getPreferredName(),
};
@Inject
public DataFrameFeatureSet(Settings settings, Client client, @Nullable XPackLicenseState licenseState) {
public DataFrameFeatureSet(Settings settings, ClusterService clusterService, Client client, @Nullable XPackLicenseState licenseState) {
this.enabled = XPackSettings.DATA_FRAME_ENABLED.get(settings);
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.licenseState = licenseState;
}
@@ -69,30 +106,127 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
@Override
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
if (enabled == false) {
listener.onResponse(
new DataFrameFeatureSetUsage(available(), enabled(), Collections.emptyMap(), new DataFrameIndexerTransformStats()));
listener.onResponse(new DataFrameFeatureSetUsage(available(),
enabled(),
Collections.emptyMap(),
DataFrameIndexerTransformStats.withDefaultTransformId()));
return;
}
final GetDataFrameTransformsStatsAction.Request transformStatsRequest = new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsStatsAction.INSTANCE,
transformStatsRequest,
ActionListener.wrap(transformStatsResponse ->
listener.onResponse(createUsage(available(), enabled(), transformStatsResponse.getTransformsStateAndStats())),
listener::onFailure));
PersistentTasksCustomMetaData taskMetadata = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(clusterService.state());
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> dataFrameTasks = taskMetadata == null ?
Collections.emptyList() :
taskMetadata.findTasks(DataFrameTransform.NAME, (t) -> true);
final int taskCount = dataFrameTasks.size();
final Map<String, Long> transformsCountByState = new HashMap<>();
for(PersistentTasksCustomMetaData.PersistentTask<?> dataFrameTask : dataFrameTasks) {
DataFrameTransformState state = (DataFrameTransformState)dataFrameTask.getState();
transformsCountByState.merge(state.getTaskState().value(), 1L, Long::sum);
}
static DataFrameFeatureSetUsage createUsage(boolean available,
boolean enabled,
List<DataFrameTransformStateAndStats> transformsStateAndStats) {
ActionListener<DataFrameIndexerTransformStats> totalStatsListener = ActionListener.wrap(
statSummations -> listener.onResponse(new DataFrameFeatureSetUsage(available(),
enabled(),
transformsCountByState,
statSummations)),
listener::onFailure
);
Map<String, Long> transformsCountByState = new HashMap<>();
DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();
transformsStateAndStats.forEach(singleResult -> {
transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
accumulatedStats.merge(singleResult.getTransformStats());
});
return new DataFrameFeatureSetUsage(available, enabled, transformsCountByState, accumulatedStats);
ActionListener<SearchResponse> totalTransformCountListener = ActionListener.wrap(
transformCountSuccess -> {
if (transformCountSuccess.getShardFailures().length > 0) {
logger.error("total transform count search returned shard failures: {}",
Arrays.toString(transformCountSuccess.getShardFailures()));
}
long totalTransforms = transformCountSuccess.getHits().getTotalHits().value;
if (totalTransforms == 0) {
listener.onResponse(new DataFrameFeatureSetUsage(available(),
enabled(),
transformsCountByState,
DataFrameIndexerTransformStats.withDefaultTransformId()));
return;
}
transformsCountByState.merge(DataFrameTransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum);
getStatisticSummations(client, totalStatsListener);
},
transformCountFailure -> {
if (transformCountFailure instanceof ResourceNotFoundException) {
getStatisticSummations(client, totalStatsListener);
} else {
listener.onFailure(transformCountFailure);
}
}
);
SearchRequest totalTransformCount = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.setTrackTotalHits(true)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME))))
.request();
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
totalTransformCount,
totalTransformCountListener,
client::search);
}
static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchResponse) {
List<Long> statisticsList = new ArrayList<>(PROVIDED_STATS.length);
for(String statName : PROVIDED_STATS) {
Aggregation agg = searchResponse.getAggregations().get(statName);
if (agg instanceof NumericMetricsAggregation.SingleValue) {
statisticsList.add((long)((NumericMetricsAggregation.SingleValue)agg).value());
} else {
statisticsList.add(0L);
}
}
return DataFrameIndexerTransformStats.withDefaultTransformId(statisticsList.get(0), // numPages
statisticsList.get(1), // numInputDocuments
statisticsList.get(2), // numOutputDocuments
statisticsList.get(3), // numInvocations
statisticsList.get(4), // indexTime
statisticsList.get(5), // searchTime
statisticsList.get(6), // indexTotal
statisticsList.get(7), // searchTotal
statisticsList.get(8), // indexFailures
statisticsList.get(9)); // searchFailures
}
static void getStatisticSummations(Client client, ActionListener<DataFrameIndexerTransformStats> statsListener) {
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameIndexerTransformStats.NAME)));
SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.setSize(0)
.setQuery(queryBuilder);
for(String statName : PROVIDED_STATS) {
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(statName));
}
ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(
searchResponse -> {
if (searchResponse.getShardFailures().length > 0) {
logger.error("statistics summations search returned shard failures: {}",
Arrays.toString(searchResponse.getShardFailures()));
}
statsListener.onResponse(parseSearchAggs(searchResponse));
},
failure -> {
if (failure instanceof ResourceNotFoundException) {
statsListener.onResponse(DataFrameIndexerTransformStats.withDefaultTransformId());
} else {
statsListener.onFailure(failure);
}
}
);
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
requestBuilder.request(),
getStatisticSummationsListener,
client::search);
}
}
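A hypothetical caller sketch for getStatisticSummations above (the log messages are illustrative): the helper sums every field in PROVIDED_STATS across all persisted stats documents and answers with a single accumulated stats object under the default transform id.

    DataFrameFeatureSet.getStatisticSummations(client, ActionListener.wrap(
            totals -> logger.info("documents processed across all transforms [{}]", totals.getNumDocuments()),
            e -> logger.warn("failed to summarize data frame statistics", e)));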

View File: TransportGetDataFrameTransformsStatsAction.java

@@ -6,17 +6,21 @@
package org.elasticsearch.xpack.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
@@ -27,28 +31,29 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFramePersistentTaskUtils;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import org.elasticsearch.xpack.dataframe.util.BatchedDataIterator;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -62,6 +67,8 @@ public class TransportGetDataFrameTransformsStatsAction extends
GetDataFrameTransformsStatsAction.Response,
GetDataFrameTransformsStatsAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportGetDataFrameTransformsStatsAction.class);
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
@Inject
@@ -88,8 +95,6 @@ public class TransportGetDataFrameTransformsStatsAction extends
protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener<Response> listener) {
List<DataFrameTransformStateAndStats> transformsStateAndStats = Collections.emptyList();
assert task.getTransformId().equals(request.getId()) || request.getId().equals(MetaData.ALL);
// Little extra insurance: make sure we only return transforms that aren't cancelled
if (task.isCancelled() == false) {
DataFrameTransformStateAndStats transformStateAndStats = new DataFrameTransformStateAndStats(task.getTransformId(),
@@ -101,139 +106,115 @@ public class TransportGetDataFrameTransformsStatsAction extends
}
@Override
// TODO gather stats from docs when moved out of allocated task
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> finalListener) {
final ClusterState state = clusterService.state();
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster()) {
if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) {
ActionListener<Response> transformStatsListener = ActionListener.wrap(
response -> collectStatsForTransformsWithoutTasks(request, response, listener),
listener::onFailure
);
super.doExecute(task, request, transformStatsListener);
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), request.getPageParams(), ActionListener.wrap(
ids -> {
request.setExpandedIds(ids);
super.doExecute(task, request, ActionListener.wrap(
response -> collectStatsForTransformsWithoutTasks(request, response, finalListener),
finalListener::onFailure
));
},
e -> {
// If the index to search, or the individual config, is not there, just return empty
if (e instanceof ResourceNotFoundException) {
finalListener.onResponse(new Response(Collections.emptyList()));
} else {
// If we don't have any tasks, pass empty collection to this method
collectStatsForTransformsWithoutTasks(request, new Response(Collections.emptyList()), listener);
finalListener.onFailure(e);
}
}
));
} else {
// Delegates GetTransforms to the elected master node, so it becomes the coordinating node.
// Non-master nodes may have a stale cluster state that shows transforms which are cancelled
// on the master, which makes testing difficult.
if (nodes.getMasterNode() == null) {
listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
finalListener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
} else {
transportService.sendRequest(nodes.getMasterNode(), actionName, request,
new ActionListenerResponseHandler<>(listener, Response::new));
new ActionListenerResponseHandler<>(finalListener, Response::new));
}
}
}
// TODO correct when we start storing stats in docs, right now, just return STOPPED and empty stats
private void collectStatsForTransformsWithoutTasks(Request request,
Response response,
ActionListener<Response> listener) {
if (request.getId().equals(MetaData.ALL) == false) {
// If we did not find any tasks && this is NOT for ALL, verify that the single config exists, and return as stopped
// Empty otherwise
if (response.getTransformsStateAndStats().isEmpty()) {
dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap(
config ->
listener.onResponse(
new Response(Collections.singletonList(DataFrameTransformStateAndStats.initialStateAndStats(config.getId())))),
exception -> {
if (exception instanceof ResourceNotFoundException) {
listener.onResponse(new Response(Collections.emptyList()));
} else {
listener.onFailure(exception);
}
}
));
} else {
// If it was not ALL && we DO have stored stats, simply return those as we found them all, since we only support 1 or all
// We gathered all there is, no need to continue
if (request.getExpandedIds().size() == response.getTransformsStateAndStats().size()) {
listener.onResponse(response);
}
return;
}
// We only do this mass collection if we are getting ALL tasks
TransformIdCollector collector = new TransformIdCollector();
collector.execute(ActionListener.wrap(
allIds -> {
response.getTransformsStateAndStats().forEach(
tsas -> allIds.remove(tsas.getId())
);
List<DataFrameTransformStateAndStats> statsWithoutTasks = allIds.stream()
.map(DataFrameTransformStateAndStats::initialStateAndStats)
.collect(Collectors.toList());
statsWithoutTasks.addAll(response.getTransformsStateAndStats());
statsWithoutTasks.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
listener.onResponse(new Response(statsWithoutTasks));
},
listener::onFailure
));
}
/**
* This class recursively pages through a scrolled search over all transform_ids and collects them into a set
*/
private class TransformIdCollector extends BatchedDataIterator<String, Set<String>> {
Set<String> transformsWithoutTasks = new HashSet<>(request.getExpandedIds());
transformsWithoutTasks.removeAll(response.getTransformsStateAndStats().stream().map(DataFrameTransformStateAndStats::getId)
.collect(Collectors.toList()));
private final Set<String> ids = new HashSet<>();
TransformIdCollector() {
super(client, DataFrameInternalIndex.INDEX_NAME);
}
// Small assurance that we are below the max. A terms query has a hard limit of 10k terms, so we should at least be below that.
assert transformsWithoutTasks.size() <= Request.MAX_SIZE_RETURN;
void execute(final ActionListener<Set<String>> finalListener) {
if (this.hasNext()) {
next(ActionListener.wrap(
setOfIds -> execute(finalListener),
finalListener::onFailure
));
} else {
finalListener.onResponse(ids);
ActionListener<SearchResponse> searchStatsListener = ActionListener.wrap(
searchResponse -> {
List<ElasticsearchException> nodeFailures = new ArrayList<>(response.getNodeFailures());
if (searchResponse.getShardFailures().length > 0) {
String msg = "transform statistics document search returned shard failures: " +
Arrays.toString(searchResponse.getShardFailures());
logger.error(msg);
nodeFailures.add(new ElasticsearchException(msg));
}
}
@Override
protected QueryBuilder getQuery() {
return QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME));
}
@Override
protected String map(SearchHit hit) {
List<DataFrameTransformStateAndStats> allStateAndStats = response.getTransformsStateAndStats();
for(SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, stream)) {
return (String)parser.map().get(DataFrameField.ID.getPreferredName());
try {
DataFrameIndexerTransformStats stats = parseFromSource(source);
allStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(stats.getTransformId(), stats));
transformsWithoutTasks.remove(stats.getTransformId());
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse bucket", e);
listener.onFailure(new ElasticsearchParseException("Could not parse data frame transform stats", e));
return;
}
}
transformsWithoutTasks.forEach(transformId ->
allStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(transformId)));
// Any transform in the collection could lack a task, so even though the list is initially sorted,
// it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs
allStateAndStats.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
listener.onResponse(new Response(allStateAndStats, response.getTaskFailures(), nodeFailures));
},
e -> {
if (e instanceof IndexNotFoundException) {
listener.onResponse(response);
} else {
listener.onFailure(e);
}
}
);
QueryBuilder builder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), transformsWithoutTasks))
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameIndexerTransformStats.NAME)));
SearchRequest searchRequest = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setQuery(builder)
.request();
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
searchRequest,
searchStatsListener, client::search);
}
@Override
protected Set<String> getCollection() {
return ids;
}
@Override
protected SortOrder sortOrder() {
return SortOrder.ASC;
}
@Override
protected String sortField() {
return DataFrameField.ID.getPreferredName();
}
@Override
protected FetchSourceContext getFetchSourceContext() {
return new FetchSourceContext(true, new String[]{DataFrameField.ID.getPreferredName()}, new String[]{});
private static DataFrameIndexerTransformStats parseFromSource(BytesReference source) throws IOException {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
return DataFrameIndexerTransformStats.PARSER.apply(parser, null);
}
}
}
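In sum, the rewritten action resolves stats in three steps; the comment-only outline below is a reading aid, not code from the commit:

    // 1. expandTransformIds(id expression, page params) -> request.setExpandedIds(...)
    // 2. super.doExecute(...) -> live stats from running persistent tasks (match() filters on the expanded ids)
    // 3. for expanded ids without a task: search the internal index for persisted stats documents and
    //    fall back to initialStateAndStats(id) where none exists, then sort the combined list by id.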

View File: TransportPreviewDataFrameTransformAction.java

@@ -81,7 +81,7 @@ public class TransportPreviewDataFrameTransformAction extends
ActionListener.wrap(
r -> {
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
// remove all internal fields
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
.map(record -> {

View File: DataFrameInternalIndex.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DestConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.SourceConfig;
@@ -23,7 +24,7 @@ import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.TRANSFORM_ID;
public final class DataFrameInternalIndex {
@@ -49,6 +50,7 @@ public final class DataFrameInternalIndex {
// data types
public static final String DOUBLE = "double";
public static final String LONG = "long";
public static final String KEYWORD = "keyword";
public static IndexTemplateMetaData getIndexTemplateMetaData() throws IOException {
@@ -84,7 +86,7 @@ public final class DataFrameInternalIndex {
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD)
.startObject(TRANSFORM_ID)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
@@ -126,7 +128,8 @@ public final class DataFrameInternalIndex {
builder.startObject(DataFrameField.INDEX_DOC_TYPE.getPreferredName()).field(TYPE, KEYWORD).endObject();
// add the schema for transform configurations
addDataFrameTransformsConfigMappings(builder);
// add the schema for transform stats
addDataFrameTransformsStatsMappings(builder);
// end type
builder.endObject();
// end properties
@ -136,6 +139,41 @@ public final class DataFrameInternalIndex {
return builder;
}
private static XContentBuilder addDataFrameTransformsStatsMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(DataFrameIndexerTransformStats.NUM_PAGES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_OUTPUT_DOCUMENTS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.NUM_INVOCATIONS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TIME_IN_MS.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_TOTAL.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.SEARCH_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DataFrameIndexerTransformStats.INDEX_FAILURES.getPreferredName())
.field(TYPE, LONG)
.endObject();
}
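Since every stats field above maps to the same long type, an equivalent loop-based form is possible. A sketch only, not the committed code; it assumes the stats constants are ParseField instances (their getPreferredName() accessor suggests so) plus imports of java.util.Arrays and org.elasticsearch.common.ParseField:
private static XContentBuilder addStatsMappingsCompact(XContentBuilder builder) throws IOException {
    // every stats counter is indexed as a plain long
    for (ParseField field : Arrays.asList(
            DataFrameIndexerTransformStats.NUM_PAGES,
            DataFrameIndexerTransformStats.NUM_INPUT_DOCUMENTS,
            DataFrameIndexerTransformStats.NUM_OUTPUT_DOCUMENTS,
            DataFrameIndexerTransformStats.NUM_INVOCATIONS,
            DataFrameIndexerTransformStats.INDEX_TIME_IN_MS,
            DataFrameIndexerTransformStats.SEARCH_TIME_IN_MS,
            DataFrameIndexerTransformStats.INDEX_TOTAL,
            DataFrameIndexerTransformStats.SEARCH_TOTAL,
            DataFrameIndexerTransformStats.SEARCH_FAILURES,
            DataFrameIndexerTransformStats.INDEX_FAILURES)) {
        builder.startObject(field.getPreferredName()).field(TYPE, LONG).endObject();
    }
    return builder;
}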
private static XContentBuilder addDataFrameTransformsConfigMappings(XContentBuilder builder) throws IOException {
return builder
.startObject(DataFrameField.ID.getPreferredName())

View File

@ -17,9 +17,13 @@ import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
@ -29,18 +33,26 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
@ -172,6 +184,61 @@ public class DataFrameTransformsConfigManager {
}));
}
/**
* Given a comma-delimited string of id expressions, this queries the internal index
* for the transform IDs that match the expression.
*
* The results are sorted in ascending order.
*
* @param transformIdsExpression The id expression. Can be _all, *, or a comma-delimited list of simple regex strings
* @param pageParams The paging params
* @param foundIdsListener The listener to signal success or failure
*/
public void expandTransformIds(String transformIdsExpression, PageParams pageParams, ActionListener<List<String>> foundIdsListener) {
String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression);
QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, DataFrameTransformConfig.NAME);
SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setFrom(pageParams.getFrom())
.setSize(pageParams.getSize())
.setQuery(queryBuilder)
// We only care about the `id` field, small optimization
.setFetchSource(DataFrameField.ID.getPreferredName(), "")
.request();
final ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(idTokens, true);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
List<String> ids = new ArrayList<>(searchResponse.getHits().getHits().length);
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, stream)) {
ids.add((String) parser.map().get(DataFrameField.ID.getPreferredName()));
} catch (IOException e) {
foundIdsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
return;
}
}
requiredMatches.filterMatchedIds(ids);
if (requiredMatches.hasUnmatchedIds()) {
// some required Ids were not found
foundIdsListener.onFailure(
new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM,
requiredMatches.unmatchedIdsString())));
return;
}
foundIdsListener.onResponse(ids);
},
foundIdsListener::onFailure
), client::search);
}
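For orientation, a minimal caller sketch (hypothetical names throughout; assumes an existing DataFrameTransformsConfigManager instance called configManager):
// expand a wildcard plus an explicit id; matching ids come back sorted ascending
configManager.expandTransformIds("airline-*,other-transform",
    PageParams.defaultParams(),
    ActionListener.wrap(
        ids -> logger.info("matched transform ids: " + ids),
        // fails with ResourceNotFoundException when an explicitly named id has no match
        e -> logger.warn("id expansion failed", e)));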
/**
* This deletes the configuration and all other documents corresponding to the transform id (e.g. checkpoints).
*
@ -206,6 +273,58 @@ public class DataFrameTransformsConfigManager {
}));
}
public void putOrUpdateTransformStats(DataFrameIndexerTransformStats stats, ActionListener<Boolean> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = stats.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(DataFrameIndexerTransformStats.documentId(stats.getTransformId()))
.source(source);
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
r -> listener.onResponse(true),
e -> listener.onFailure(new RuntimeException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_FAILED_TO_PERSIST_STATS, stats.getTransformId()),
e))
));
} catch (IOException e) {
// not expected to happen but for the sake of completeness
listener.onFailure(new ElasticsearchParseException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_FAILED_TO_PERSIST_STATS, stats.getTransformId()),
e));
}
}
public void getTransformStats(String transformId, ActionListener<DataFrameIndexerTransformStats> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameIndexerTransformStats.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
if (getResponse.isExists() == false) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId)));
return;
}
BytesReference source = getResponse.getSourceAsBytesRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
resultListener.onResponse(DataFrameIndexerTransformStats.fromXContent(parser));
} catch (Exception e) {
logger.error(
DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_STATISTICS_CONFIGURATION, transformId), e);
resultListener.onFailure(e);
}
}, e -> {
if (e instanceof ResourceNotFoundException) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNKNOWN_TRANSFORM_STATS, transformId)));
} else {
resultListener.onFailure(e);
}
}));
}
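A sketch of the persist/fetch round trip these two methods provide (hypothetical caller; assumes a configManager instance and the transform id "my_transform"):
// persist an empty stats doc, then read it back
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats("my_transform");
configManager.putOrUpdateTransformStats(stats, ActionListener.wrap(
    ok -> configManager.getTransformStats("my_transform", ActionListener.wrap(
        fetched -> logger.info("round-tripped stats for [" + fetched.getTransformId() + "]"),
        // surfaces ResourceNotFoundException when no stats doc exists
        e -> logger.warn("stats doc not found", e))),
    e -> logger.warn("persisting stats failed", e)));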
private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
ActionListener<DataFrameTransformConfig> transformListener) {
try (InputStream stream = source.streamInput();
@ -229,4 +348,28 @@ public class DataFrameTransformsConfigManager {
transformListener.onFailure(e);
}
}
private QueryBuilder buildQueryFromTokenizedIds(String[] idTokens, String resourceName) {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), resourceName));
if (Strings.isAllOrWildcard(idTokens) == false) {
List<String> terms = new ArrayList<>();
BoolQueryBuilder shouldQueries = new BoolQueryBuilder();
for (String token : idTokens) {
if (Regex.isSimpleMatchPattern(token)) {
shouldQueries.should(QueryBuilders.wildcardQuery(DataFrameField.ID.getPreferredName(), token));
} else {
terms.add(token);
}
}
if (terms.isEmpty() == false) {
shouldQueries.should(QueryBuilders.termsQuery(DataFrameField.ID.getPreferredName(), terms));
}
if (shouldQueries.should().isEmpty() == false) {
queryBuilder.filter(shouldQueries);
}
}
return QueryBuilders.constantScoreQuery(queryBuilder);
}
}
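To make the resulting query shape concrete, a sketch of what the builder yields for a mixed token set (illustrative only; callable as written only from inside the class, and Strings.toString is used merely to render the query):
// for {"transform1*", "transform2_expand"}: a constant_score query whose filter pairs the
// index_doc_type term with a bool of should clauses (a wildcard query for the pattern,
// a terms query for the literal id)
QueryBuilder query = buildQueryFromTokenizedIds(
    new String[] {"transform1*", "transform2_expand"}, DataFrameTransformConfig.NAME);
logger.trace("expanded id query: " + Strings.toString(query));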

View File

@ -12,6 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
@ -27,6 +28,11 @@ public class RestGetDataFrameTransformsStatsAction extends BaseRestHandler {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
String id = restRequest.param(DataFrameField.ID.getPreferredName());
GetDataFrameTransformsStatsAction.Request request = new GetDataFrameTransformsStatsAction.Request(id);
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
request.setPageParams(
new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
return channel -> client.execute(GetDataFrameTransformsStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
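In effect this lets callers page through stats, presumably via requests like the following (path hedged from the transforms REST base path; the YAML test further down exercises the same params):
GET /_data_frame/transforms/_all/_stats?from=1&size=2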

View File

@ -39,8 +39,11 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
private Pivot pivot;
public DataFrameIndexer(Executor executor, AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition) {
super(executor, initialState, initialPosition, new DataFrameIndexerTransformStats());
public DataFrameIndexer(Executor executor,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerTransformStats jobStats) {
super(executor, initialState, initialPosition, jobStats);
}
protected abstract DataFrameTransformConfig getConfig();

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.dataframe.transforms;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.persistent.AllocatedPersistentTask;
@ -60,18 +62,33 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
DataFrameTransformTask buildTask = (DataFrameTransformTask) task;
SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(
DataFrameTransformTask.SCHEDULE_NAME + "_" + params.getId(), next());
DataFrameTransformState transformState = (DataFrameTransformState) state;
if (transformState != null && transformState.getTaskState() == DataFrameTransformTaskState.FAILED) {
logger.warn("Tried to start failed transform [" + params.getId() + "] failure reason: " + transformState.getReason());
return;
}
transformsConfigManager.getTransformStats(params.getId(), ActionListener.wrap(
stats -> {
// Initialize with the previously recorded stats
buildTask.initializePreviousStats(stats);
scheduleTask(buildTask, schedulerJob, params.getId());
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
scheduleTask(buildTask, schedulerJob, params.getId());
}
));
}
private void scheduleTask(DataFrameTransformTask buildTask, SchedulerEngine.Job schedulerJob, String id) {
// Note that while the task is added to the scheduler here, the internal state will prevent
// it from doing any work until the task is "started" via the StartTransform API
schedulerEngine.register(buildTask);
schedulerEngine.add(schedulerJob);
logger.info("Data frame transform [" + params.getId() + "] created.");
logger.info("Data frame transform [" + id + "] created.");
}
static SchedulerEngine.Schedule next() {

View File

@ -64,6 +64,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final ThreadPool threadPool;
private final DataFrameIndexer indexer;
private final Auditor<DataFrameAuditMessage> auditor;
private final DataFrameIndexerTransformStats previousStats;
private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
@ -110,6 +111,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
new AtomicReference<>(initialState), initialPosition, client, auditor);
this.generation = new AtomicReference<>(initialGeneration);
this.previousStats = new DataFrameIndexerTransformStats(transform.getId());
this.taskState = new AtomicReference<>(initialTaskState);
this.stateReason = new AtomicReference<>(initialReason);
this.failureCount = new AtomicInteger(0);
@ -131,8 +133,12 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
return new DataFrameTransformState(taskState.get(), indexer.getState(), indexer.getPosition(), generation.get(), stateReason.get());
}
void initializePreviousStats(DataFrameIndexerTransformStats stats) {
previousStats.merge(stats);
}
public DataFrameIndexerTransformStats getStats() {
return indexer.getStats();
return new DataFrameIndexerTransformStats(previousStats).merge(indexer.getStats());
}
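A short sketch of what this implies for callers (hypothetical task variable; assumes, as the usage here suggests, that merge adds the live indexer counters onto the persisted baseline):
// total = stats persisted before the task was (re)allocated + stats from the live indexer
DataFrameIndexerTransformStats total = task.getStats();
long processed = total.getNumDocuments(); // survives task restarts thanks to initializePreviousStats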
public long getGeneration() {
@ -297,6 +303,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final Auditor<DataFrameAuditMessage> auditor;
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
private Map<String, String> fieldMappings = null;
@ -307,7 +314,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
DataFrameTransformsCheckpointService transformsCheckpointService,
AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition, Client client,
Auditor<DataFrameAuditMessage> auditor) {
super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition);
super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition,
new DataFrameIndexerTransformStats(transformId));
this.transformId = transformId;
this.transformsConfigManager = transformsConfigManager;
this.transformsCheckpointService = transformsCheckpointService;
@ -422,7 +430,39 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
generation.get(),
stateReason.get());
logger.info("Updating persistent state of transform [" + transform.getId() + "] to [" + state.toString() + "]");
persistStateToClusterState(state, ActionListener.wrap(t -> next.run(), e -> next.run()));
// Persisting stats when we call `doSaveState` should be OK, as we only call it on a state transition and
// only every so often during the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for the current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
// Make a copy of the previousStats so that they are not constantly updated when `merge` is called
DataFrameIndexerTransformStats tempStats = new DataFrameIndexerTransformStats(previousStats).merge(getStats());
// Only persist the stats if something has actually changed
if (previouslyPersistedStats == null || previouslyPersistedStats.equals(tempStats) == false) {
transformsConfigManager.putOrUpdateTransformStats(tempStats,
ActionListener.wrap(
r -> {
previouslyPersistedStats = tempStats;
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transform.getId() + "] failed", statsExc);
next.run();
}
));
// The stats that we previously wrote to the doc are the same as they are now; no need to update
} else {
next.run();
}
},
exc -> {
logger.error("Updating persistent state of transform [" + transform.getId() + "] failed", exc);
next.run();
}
);
persistStateToClusterState(state, updateClusterStateListener);
}
@Override

View File

@ -6,8 +6,10 @@
package org.elasticsearch.xpack.dataframe;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -15,25 +17,24 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static java.lang.Math.toIntExact;
import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -47,7 +48,10 @@ public class DataFrameFeatureSetTests extends ESTestCase {
}
public void testAvailable() {
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(Settings.EMPTY, mock(Client.class), licenseState);
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(Settings.EMPTY,
mock(ClusterService.class),
mock(Client.class),
licenseState);
boolean available = randomBoolean();
when(licenseState.isDataFrameAllowed()).thenReturn(available);
assertThat(featureSet.available(), is(available));
@ -57,89 +61,67 @@ public class DataFrameFeatureSetTests extends ESTestCase {
boolean enabled = randomBoolean();
Settings.Builder settings = Settings.builder();
settings.put("xpack.data_frame.enabled", enabled);
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(settings.build(), mock(Client.class), licenseState);
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(settings.build(),
mock(ClusterService.class),
mock(Client.class),
licenseState);
assertThat(featureSet.enabled(), is(enabled));
}
public void testEnabledDefault() {
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(Settings.EMPTY, mock(Client.class), licenseState);
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(Settings.EMPTY,
mock(ClusterService.class),
mock(Client.class),
licenseState);
assertTrue(featureSet.enabled());
}
public void testUsage() throws IOException {
List<DataFrameTransformStateAndStats> transformsStateAndStats = new ArrayList<>();
int count = randomIntBetween(0, 10);
int uniqueId = 0;
for (int i = 0; i < count; ++i) {
transformsStateAndStats.add(
DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats("df-" + Integer.toString(uniqueId++)));
public void testParseSearchAggs() {
Aggregations emptyAggs = new Aggregations(Collections.emptyList());
SearchResponse withEmptyAggs = mock(SearchResponse.class);
when(withEmptyAggs.getAggregations()).thenReturn(emptyAggs);
assertThat(DataFrameFeatureSet.parseSearchAggs(withEmptyAggs), equalTo(DataFrameIndexerTransformStats.withDefaultTransformId()));
DataFrameIndexerTransformStats expectedStats = new DataFrameIndexerTransformStats("_all",
1, // numPages
2, // numInputDocuments
3, // numOutputDocuments
4, // numInvocations
5, // indexTime
6, // searchTime
7, // indexTotal
8, // searchTotal
9, // indexFailures
10); // searchFailures
int currentStat = 1;
List<Aggregation> aggs = new ArrayList<>(PROVIDED_STATS.length);
for (String statName : PROVIDED_STATS) {
aggs.add(buildAgg(statName, (double) currentStat++));
}
Aggregations aggregations = new Aggregations(aggs);
SearchResponse withAggs = mock(SearchResponse.class);
when(withAggs.getAggregations()).thenReturn(aggregations);
assertThat(DataFrameFeatureSet.parseSearchAggs(withAggs), equalTo(expectedStats));
}
count = randomIntBetween(0, 10);
List<DataFrameTransformConfig> transformConfigWithoutTasks = new ArrayList<>();
for (int i = 0; i < count; ++i) {
transformConfigWithoutTasks.add(
DataFrameTransformConfigTests.randomDataFrameTransformConfig("df-" + Integer.toString(uniqueId++)));
}
List<DataFrameTransformConfig> transformConfigWithTasks =
new ArrayList<>(transformsStateAndStats.size() + transformConfigWithoutTasks.size());
transformsStateAndStats.forEach(stats ->
transformConfigWithTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig(stats.getId())));
transformConfigWithoutTasks.forEach(withoutTask ->
transformsStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(withoutTask.getId())));
boolean enabled = randomBoolean();
boolean available = randomBoolean();
DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available,
enabled,
transformsStateAndStats);
assertEquals(enabled, usage.enabled());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
usage.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = createParser(builder);
Map<String, Object> usageAsMap = parser.map();
assertEquals(available, (boolean) XContentMapValues.extractValue("available", usageAsMap));
if (transformsStateAndStats.isEmpty() && transformConfigWithoutTasks.isEmpty()) {
// no transforms, no stats
assertEquals(null, XContentMapValues.extractValue("transforms", usageAsMap));
assertEquals(null, XContentMapValues.extractValue("stats", usageAsMap));
} else {
assertEquals(transformsStateAndStats.size(), XContentMapValues.extractValue("transforms._all", usageAsMap));
Map<String, Integer> stateCounts = new HashMap<>();
transformsStateAndStats.stream()
.map(x -> x.getTransformState().getIndexerState().value())
.forEach(x -> stateCounts.merge(x, 1, Integer::sum));
stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap)));
// use default constructed stats object for assertions if transformsStateAndStats is empty
DataFrameIndexerTransformStats combinedStats = new DataFrameIndexerTransformStats();
if (transformsStateAndStats.isEmpty() == false) {
combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats()).reduce((l, r) -> l.merge(r)).get();
}
assertEquals(toIntExact(combinedStats.getIndexFailures()),
XContentMapValues.extractValue("stats.index_failures", usageAsMap));
assertEquals(toIntExact(combinedStats.getIndexTotal()),
XContentMapValues.extractValue("stats.index_total", usageAsMap));
assertEquals(toIntExact(combinedStats.getSearchTime()),
XContentMapValues.extractValue("stats.search_time_in_ms", usageAsMap));
assertEquals(toIntExact(combinedStats.getNumDocuments()),
XContentMapValues.extractValue("stats.documents_processed", usageAsMap));
}
}
private static Aggregation buildAgg(String name, double value) {
NumericMetricsAggregation.SingleValue agg = mock(NumericMetricsAggregation.SingleValue.class);
when(agg.getName()).thenReturn(name);
when(agg.value()).thenReturn(value);
return agg;
}
public void testUsageDisabled() throws IOException, InterruptedException, ExecutionException {
when(licenseState.isDataFrameAllowed()).thenReturn(true);
Settings.Builder settings = Settings.builder();
settings.put("xpack.data_frame.enabled", false);
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(settings.build(), mock(Client.class), licenseState);
DataFrameFeatureSet featureSet = new DataFrameFeatureSet(settings.build(),
mock(ClusterService.class),
mock(Client.class),
licenseState);
PlainActionFuture<Usage> future = new PlainActionFuture<>();
featureSet.usage(future);
XPackFeatureSet.Usage usage = future.get();

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.persistence;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointTests;
@ -15,6 +16,13 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTestCase {
private DataFrameTransformsConfigManager transformsConfigManager;
@ -128,4 +136,84 @@ public class DataFrameTransformsConfigManagerTests extends DataFrameSingleNodeTe
assertAsync(listener -> transformsConfigManager.getTransformCheckpoint(checkpoint.getTransformId(), checkpoint.getCheckpoint(),
listener), DataFrameTransformCheckpoint.EMPTY, null, null);
}
public void testExpandIds() throws Exception {
DataFrameTransformConfig transformConfig1 = DataFrameTransformConfigTests.randomDataFrameTransformConfig("transform1_expand");
DataFrameTransformConfig transformConfig2 = DataFrameTransformConfigTests.randomDataFrameTransformConfig("transform2_expand");
DataFrameTransformConfig transformConfig3 = DataFrameTransformConfigTests.randomDataFrameTransformConfig("transform3_expand");
// create transforms
assertAsync(listener -> transformsConfigManager.putTransformConfiguration(transformConfig1, listener), true, null, null);
assertAsync(listener -> transformsConfigManager.putTransformConfiguration(transformConfig2, listener), true, null, null);
assertAsync(listener -> transformsConfigManager.putTransformConfiguration(transformConfig3, listener), true, null, null);
// expand 1 id
assertAsync(listener ->
transformsConfigManager.expandTransformIds(transformConfig1.getId(),
PageParams.defaultParams(),
listener),
Collections.singletonList("transform1_expand"),
null,
null);
// expand 2 ids explicitly
assertAsync(listener ->
transformsConfigManager.expandTransformIds("transform1_expand,transform2_expand",
PageParams.defaultParams(),
listener),
Arrays.asList("transform1_expand", "transform2_expand"),
null,
null);
// expand 3 ids wildcard and explicit
assertAsync(listener ->
transformsConfigManager.expandTransformIds("transform1*,transform2_expand,transform3_expand",
PageParams.defaultParams(),
listener),
Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
null,
null);
// expand 3 ids _all
assertAsync(listener ->
transformsConfigManager.expandTransformIds("_all",
PageParams.defaultParams(),
listener),
Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
null,
null);
// expand 1 id _all with pagination
assertAsync(listener ->
transformsConfigManager.expandTransformIds("_all",
new PageParams(0, 1),
listener),
Collections.singletonList("transform1_expand"),
null,
null);
// expand 2 later ids _all with pagination
assertAsync(listener ->
transformsConfigManager.expandTransformIds("_all",
new PageParams(1, 2),
listener),
Arrays.asList("transform2_expand", "transform3_expand"),
null,
null);
// expand 1 id explicitly that does not exist
assertAsync(listener ->
transformsConfigManager.expandTransformIds("unknown,unknown2",
new PageParams(1, 2),
listener),
(List<String>)null,
null,
e -> {
assertThat(e, instanceOf(ResourceNotFoundException.class));
assertThat(e.getMessage(),
equalTo(DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "unknown,unknown2")));
});
}
}

View File

@ -501,7 +501,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
"value", 122.55),
DOC_COUNT, 44)
));
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
Map<String, String> fieldTypeMap = asStringMap(
aggName, "double",
@ -534,7 +534,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
private void executeTest(GroupConfig groups, Collection<AggregationBuilder> aggregationBuilders, Map<String, Object> input,
Map<String, String> fieldTypeMap, List<Map<String, Object>> expected, long expectedDocCounts) throws IOException {
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
DataFrameIndexerTransformStats stats = DataFrameIndexerTransformStats.withDefaultTransformId();
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
builder.map(input);

View File

@ -11,6 +11,18 @@
"required": false,
"description": "The id of the transform for which to get stats. '_all' or '*' implies all transforms"
}
},
"params": {
"from": {
"type": "number",
"required": false,
"description": "skips a number of transform stats, defaults to 0"
},
"size": {
"type": "number",
"required": false,
"description": "specifies a max number of transform stats to get, defaults to 100"
}
}
},
"body": null

View File

@ -83,29 +83,66 @@ teardown:
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stats-dos"
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stats-the-third"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-stats-the-third" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { count: 3 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.2.id: "airline-transform-stats-the-third" }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "_all"
- match: { count: 2 }
- match: { count: 3 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.2.id: "airline-transform-stats-the-third" }
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-stats-dos"
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-stats-dos,airline-transform-stats-the*"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats-dos" }
- match: { transforms.1.id: "airline-transform-stats-the-third" }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "_all"
from: 0
size: 1
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats" }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "_all"
from: 1
size: 2
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats-dos" }
- match: { transforms.1.id: "airline-transform-stats-the-third" }
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stats-dos"
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stats-the-third"
---
"Test get multiple transform stats where one does not have a task":
- do: