[ML] Refactor GET Transforms API (#40015) (#40269)

* [Data Frame] Refactor GET Transforms API:

* Add pagination
* comma delimited list expression support GET transforms
* Flag troublesome internal code for future refactor

* Removing `allow_no_transforms` param, ratcheting down pageparam option

* Changing  DataFrameFeatureSet#usage to not get all configs

* Intermediate commit

* Writing test for batch data gatherer

* Removing unused import

* removing bad println used for debugging

* Updating BatchedDataIterator comments and query

* addressing pr comments

* disallow null scrollId to cause stackoverflow
This commit is contained in:
Benjamin Trent 2019-03-20 19:14:50 -05:00 committed by GitHub
parent f37f2b5d39
commit 5ae43855fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1038 additions and 212 deletions

View File

@ -8,29 +8,28 @@ package org.elasticsearch.xpack.core.dataframe.action;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.action.AbstractGetResourcesRequest;
import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsAction.Response>{
@ -49,61 +48,45 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
return new Response();
}
public static class Request extends ActionRequest implements ToXContent {
private String id;
public static class Request extends AbstractGetResourcesRequest implements ToXContent {
private static final int MAX_SIZE_RETURN = 1000;
public Request(String id) {
if (Strings.isNullOrEmpty(id) || id.equals("*")) {
this.id = MetaData.ALL;
} else {
this.id = id;
}
super(id, PageParams.defaultParams(), true);
}
public Request() {
super(null, PageParams.defaultParams(), true);
}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
readFrom(in);
}
public String getId() {
return id;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
return getResourceId();
}
@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException exception = null;
if (getPageParams() != null && getPageParams().getSize() > MAX_SIZE_RETURN) {
exception = addValidationError("Param [" + PageParams.SIZE.getPreferredName() +
"] has a max acceptable value of [" + MAX_SIZE_RETURN + "]", exception);
}
return exception;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(DataFrameField.ID.getPreferredName(), getResourceId());
return builder;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
public String getResourceIdField() {
return DataFrameField.ID.getPreferredName();
}
}
@ -114,19 +97,17 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
}
}
public static class Response extends ActionResponse implements Writeable, ToXContentObject {
public static class Response extends AbstractGetResourcesResponse<DataFrameTransformConfig> implements Writeable, ToXContentObject {
public static final String INVALID_TRANSFORMS_DEPRECATION_WARNING = "Found [{}] invalid transforms";
private static final ParseField INVALID_TRANSFORMS = new ParseField("invalid_transforms");
private List<DataFrameTransformConfig> transformConfigurations;
public Response(List<DataFrameTransformConfig> transformConfigs) {
this.transformConfigurations = transformConfigs;
super(new QueryPage<>(transformConfigs, transformConfigs.size(), DataFrameField.TRANSFORMS));
}
public Response() {
this.transformConfigurations = Collections.emptyList();
super();
}
public Response(StreamInput in) throws IOException {
@ -134,30 +115,18 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
}
public List<DataFrameTransformConfig> getTransformConfigurations() {
return transformConfigurations;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
transformConfigurations = in.readList(DataFrameTransformConfig::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(transformConfigurations);
return getResources().results();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
List<String> invalidTransforms = new ArrayList<>();
builder.startObject();
builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size());
builder.field(DataFrameField.COUNT.getPreferredName(), getResources().count());
// XContentBuilder does not support passing the params object for Iterables
builder.field(DataFrameField.TRANSFORMS.getPreferredName());
builder.startArray();
for (DataFrameTransformConfig configResponse : transformConfigurations) {
for (DataFrameTransformConfig configResponse : getResources().results()) {
configResponse.toXContent(builder, params);
if (configResponse.isValid() == false) {
invalidTransforms.add(configResponse.getId());
@ -177,27 +146,8 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
}
@Override
public int hashCode() {
return Objects.hash(transformConfigurations);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
final Response that = (Response) other;
return Objects.equals(this.transformConfigurations, that.transformConfigurations);
}
@Override
public final String toString() {
return Strings.toString(this);
protected Reader<DataFrameTransformConfig> getReader() {
return DataFrameTransformConfig::new;
}
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.io.IOException;
import java.util.Objects;
@ -38,6 +39,12 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
DataFrameField.STATS_FIELD);
}
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
return new DataFrameTransformStateAndStats(id,
new DataFrameTransformState(IndexerState.STOPPED, null, 0),
new DataFrameIndexerTransformStats());
}
public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
this.id = Objects.requireNonNull(id);
this.transformState = Objects.requireNonNull(state);

View File

@ -17,20 +17,16 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class DataFrameFeatureSet implements XPackFeatureSet {
@ -78,39 +74,24 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
return;
}
GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap(
transforms -> {
GetDataFrameTransformsStatsAction.Request transformStatsRequest =
new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest,
ActionListener.wrap(transformStatsResponse -> {
listener.onResponse(createUsage(available(), enabled(), transforms.getTransformConfigurations(),
transformStatsResponse.getTransformsStateAndStats()));
}, listener::onFailure));
},
listener::onFailure
));
final GetDataFrameTransformsStatsAction.Request transformStatsRequest = new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
client.execute(GetDataFrameTransformsStatsAction.INSTANCE,
transformStatsRequest,
ActionListener.wrap(transformStatsResponse ->
listener.onResponse(createUsage(available(), enabled(), transformStatsResponse.getTransformsStateAndStats())),
listener::onFailure));
}
static DataFrameFeatureSetUsage createUsage(boolean available, boolean enabled,
List<DataFrameTransformConfig> transforms,
static DataFrameFeatureSetUsage createUsage(boolean available,
boolean enabled,
List<DataFrameTransformStateAndStats> transformsStateAndStats) {
Set<String> transformIds = transforms.stream()
.map(DataFrameTransformConfig::getId)
.collect(Collectors.toSet());
Map<String, Long> transformsCountByState = new HashMap<>();
DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();
transformsStateAndStats.forEach(singleResult -> {
transformIds.remove(singleResult.getId());
transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
accumulatedStats.merge(singleResult.getTransformStats());
});
// If there is no state returned, assumed stopped
transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum));
return new DataFrameFeatureSetUsage(available, enabled, transformsCountByState, accumulatedStats);
}

View File

@ -6,35 +6,84 @@
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.action.AbstractTransportGetResourcesAction;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import java.io.IOException;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
public class TransportGetDataFrameTransformsAction extends HandledTransportAction<Request, Response> {
private final DataFrameTransformsConfigManager transformsConfigManager;
public class TransportGetDataFrameTransformsAction extends AbstractTransportGetResourcesAction<DataFrameTransformConfig,
Request,
Response> {
@Inject
public TransportGetDataFrameTransformsAction(TransportService transportService, ActionFilters actionFilters,
DataFrameTransformsConfigManager transformsConfigManager) {
super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, () -> new Request());
this.transformsConfigManager = transformsConfigManager;
Client client, NamedXContentRegistry xContentRegistry) {
super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, Request::new, client, xContentRegistry);
}
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
//TODO support comma delimited and simple regex IDs
transformsConfigManager.getTransformConfigurations(request.getId(), ActionListener.wrap(
configs -> listener.onResponse(new Response(configs)),
searchResources(request, ActionListener.wrap(
r -> listener.onResponse(new Response(r.results())),
listener::onFailure
));
}
@Override
protected ParseField getResultsField() {
return DataFrameField.TRANSFORMS;
}
@Override
protected String[] getIndices() {
return new String[]{DataFrameInternalIndex.INDEX_NAME};
}
@Override
protected DataFrameTransformConfig parse(XContentParser parser) throws IOException {
return DataFrameTransformConfig.fromXContent(parser, null, true);
}
@Override
protected ResourceNotFoundException notFoundException(String resourceId) {
return new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, resourceId));
}
@Override
protected String executionOrigin() {
return ClientHelper.DATA_FRAME_ORIGIN;
}
@Override
protected String extractIdFromResource(DataFrameTransformConfig transformConfig) {
return transformConfig.getId();
}
@Override
protected QueryBuilder additionalQuery() {
return QueryBuilders.termQuery(INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME);
}
}

View File

@ -6,31 +6,54 @@
package org.elasticsearch.xpack.dataframe.action;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFramePersistentTaskUtils;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
import org.elasticsearch.xpack.dataframe.util.BatchedDataIterator;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class TransportGetDataFrameTransformsStatsAction extends
@ -39,11 +62,16 @@ public class TransportGetDataFrameTransformsStatsAction extends
GetDataFrameTransformsStatsAction.Response,
GetDataFrameTransformsStatsAction.Response> {
private final Client client;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
@Inject
public TransportGetDataFrameTransformsStatsAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService) {
ClusterService clusterService, Client client,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
Response::new, ThreadPool.Names.SAME);
this.client = client;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
}
@Override
@ -80,11 +108,14 @@ public class TransportGetDataFrameTransformsStatsAction extends
if (nodes.isLocalNodeElectedMaster()) {
if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) {
super.doExecute(task, request, listener);
ActionListener<Response> transformStatsListener = ActionListener.wrap(
response -> collectStatsForTransformsWithoutTasks(request, response, listener),
listener::onFailure
);
super.doExecute(task, request, transformStatsListener);
} else {
// If we couldn't find the transform in the persistent task CS, it means it was deleted prior to this GET
// and we can just send an empty response, no need to go looking for the allocated task
listener.onResponse(new Response(Collections.emptyList()));
// If we don't have any tasks, pass empty collection to this method
collectStatsForTransformsWithoutTasks(request, new Response(Collections.emptyList()), listener);
}
} else {
@ -99,4 +130,110 @@ public class TransportGetDataFrameTransformsStatsAction extends
}
}
}
// TODO correct when we start storing stats in docs, right now, just return STOPPED and empty stats
private void collectStatsForTransformsWithoutTasks(Request request,
Response response,
ActionListener<Response> listener) {
if (request.getId().equals(MetaData.ALL) == false) {
// If we did not find any tasks && this is NOT for ALL, verify that the single config exists, and return as stopped
// Empty other wise
if (response.getTransformsStateAndStats().isEmpty()) {
dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap(
config ->
listener.onResponse(
new Response(Collections.singletonList(DataFrameTransformStateAndStats.initialStateAndStats(config.getId())))),
exception -> {
if (exception instanceof ResourceNotFoundException) {
listener.onResponse(new Response(Collections.emptyList()));
} else {
listener.onFailure(exception);
}
}
));
} else {
// If it was not ALL && we DO have stored stats, simply return those as we found them all, since we only support 1 or all
listener.onResponse(response);
}
return;
}
// We only do this mass collection if we are getting ALL tasks
TransformIdCollector collector = new TransformIdCollector();
collector.execute(ActionListener.wrap(
allIds -> {
response.getTransformsStateAndStats().forEach(
tsas -> allIds.remove(tsas.getId())
);
List<DataFrameTransformStateAndStats> statsWithoutTasks = allIds.stream()
.map(DataFrameTransformStateAndStats::initialStateAndStats)
.collect(Collectors.toList());
statsWithoutTasks.addAll(response.getTransformsStateAndStats());
statsWithoutTasks.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
listener.onResponse(new Response(statsWithoutTasks));
},
listener::onFailure
));
}
/**
* This class recursively queries a scroll search over all transform_ids and puts them in a set
*/
private class TransformIdCollector extends BatchedDataIterator<String, Set<String>> {
private final Set<String> ids = new HashSet<>();
TransformIdCollector() {
super(client, DataFrameInternalIndex.INDEX_NAME);
}
void execute(final ActionListener<Set<String>> finalListener) {
if (this.hasNext()) {
next(ActionListener.wrap(
setOfIds -> execute(finalListener),
finalListener::onFailure
));
} else {
finalListener.onResponse(ids);
}
}
@Override
protected QueryBuilder getQuery() {
return QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME));
}
@Override
protected String map(SearchHit hit) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, stream)) {
return (String)parser.map().get(DataFrameField.ID.getPreferredName());
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse bucket", e);
}
}
@Override
protected Set<String> getCollection() {
return ids;
}
@Override
protected SortOrder sortOrder() {
return SortOrder.ASC;
}
@Override
protected String sortField() {
return DataFrameField.ID.getPreferredName();
}
@Override
protected FetchSourceContext getFetchSourceContext() {
return new FetchSourceContext(true, new String[]{DataFrameField.ID.getPreferredName()}, new String[]{});
}
}
}

View File

@ -20,12 +20,8 @@ import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -36,22 +32,14 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@ -59,7 +47,6 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class DataFrameTransformsConfigManager {
private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class);
private static final int DEFAULT_SIZE = 100;
public static final Map<String, String> TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true");
@ -102,6 +89,8 @@ public class DataFrameTransformsConfigManager {
}
}
// For transforms returned via GET data_frame/transforms, see the TransportGetDataFrameTransformsAction
// This function is only for internal use.
public void getTransformConfiguration(String transformId, ActionListener<DataFrameTransformConfig> resultListener) {
GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
@ -123,54 +112,6 @@ public class DataFrameTransformsConfigManager {
}));
}
/**
* Get more than one DataFrameTransformConfig
*
* @param transformId Can be a single transformId, `*`, or `_all`
* @param resultListener Listener to alert when request is completed
*/
// TODO add pagination support
public void getTransformConfigurations(String transformId,
ActionListener<List<DataFrameTransformConfig>> resultListener) {
final boolean isAllOrWildCard = Strings.isAllOrWildcard(new String[]{transformId});
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("doc_type", DataFrameTransformConfig.NAME));
if (isAllOrWildCard == false) {
queryBuilder.filter(QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId));
}
SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.setTrackTotalHits(true)
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
.setSize(DEFAULT_SIZE)
.setQuery(queryBuilder)
.request();
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
ActionListener.<SearchResponse>wrap(
searchResponse -> {
List<DataFrameTransformConfig> configs = new ArrayList<>(searchResponse.getHits().getHits().length);
for (SearchHit hit : searchResponse.getHits().getHits()) {
DataFrameTransformConfig config = parseTransformLenientlyFromSourceSync(hit.getSourceRef(),
resultListener::onFailure);
if (config == null) {
return;
}
configs.add(config);
}
if (configs.isEmpty() && (isAllOrWildCard == false)) {
resultListener.onFailure(new ResourceNotFoundException(
DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
return;
}
resultListener.onResponse(configs);
},
resultListener::onFailure)
, client::search);
}
public void deleteTransformConfiguration(String transformId, ActionListener<Boolean> listener) {
DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@ -193,18 +134,6 @@ public class DataFrameTransformsConfigManager {
}));
}
private DataFrameTransformConfig parseTransformLenientlyFromSourceSync(BytesReference source, Consumer<Exception> onFailure) {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
return DataFrameTransformConfig.fromXContent(parser, null, true);
} catch (Exception e) {
logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CONFIGURATION), e);
onFailure.accept(e);
return null;
}
}
private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
ActionListener<DataFrameTransformConfig> transformListener) {
try (InputStream stream = source.streamInput();

View File

@ -12,6 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
@ -25,8 +26,15 @@ public class RestGetDataFrameTransformsAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
GetDataFrameTransformsAction.Request request = new GetDataFrameTransformsAction.Request();
String id = restRequest.param(DataFrameField.ID.getPreferredName());
GetDataFrameTransformsAction.Request request = new GetDataFrameTransformsAction.Request(id);
request.setResourceId(id);
if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
request.setPageParams(
new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
}
return channel -> client.execute(GetDataFrameTransformsAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View File

@ -0,0 +1,186 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import java.util.Collection;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Objects;
/**
* Provides basic tools around scrolling over documents and gathering the data in some Collection
* @param <T> The object type that is being collected
* @param <E> The collection that should be used (i.e. Set, Deque, etc.)
*/
public abstract class BatchedDataIterator<T, E extends Collection<T>> {
private static final Logger LOGGER = LogManager.getLogger(BatchedDataIterator.class);
private static final String CONTEXT_ALIVE_DURATION = "5m";
private static final int BATCH_SIZE = 10_000;
private final Client client;
private final String index;
private volatile long count;
private volatile long totalHits;
private volatile String scrollId;
private volatile boolean isScrollInitialised;
protected BatchedDataIterator(Client client, String index) {
this.client = Objects.requireNonNull(client);
this.index = Objects.requireNonNull(index);
this.totalHits = 0;
this.count = 0;
}
/**
* Returns {@code true} if the iteration has more elements.
* (In other words, returns {@code true} if {@link #next} would
* return an element rather than throwing an exception.)
*
* @return {@code true} if the iteration has more elements
*/
public boolean hasNext() {
return !isScrollInitialised || count != totalHits;
}
/**
* The first time next() is called, the search will be performed and the first
* batch will be given to the listener. Any subsequent call will return the following batches.
* <p>
* Note that in some implementations it is possible that when there are no
* results at all. {@link BatchedDataIterator#hasNext()} will return {@code true} the first time it is called but then a call
* to this function returns an empty Collection to the listener.
*/
public void next(ActionListener<E> listener) {
if (!hasNext()) {
listener.onFailure(new NoSuchElementException());
}
if (!isScrollInitialised) {
ActionListener<SearchResponse> wrappedListener = ActionListener.wrap(
searchResponse -> {
isScrollInitialised = true;
totalHits = searchResponse.getHits().getTotalHits().value;
scrollId = searchResponse.getScrollId();
mapHits(searchResponse, listener);
},
listener::onFailure
);
initScroll(wrappedListener);
} else {
ActionListener<SearchResponse> wrappedListener = ActionListener.wrap(
searchResponse -> {
scrollId = searchResponse.getScrollId();
mapHits(searchResponse, listener);
},
listener::onFailure
);
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION);
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
searchScrollRequest,
wrappedListener,
client::searchScroll);
}
}
private void initScroll(ActionListener<SearchResponse> listener) {
LOGGER.trace("ES API CALL: search index {}", index);
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.scroll(CONTEXT_ALIVE_DURATION);
searchRequest.source(new SearchSourceBuilder()
.fetchSource(getFetchSourceContext())
.size(getBatchSize())
.query(getQuery())
.trackTotalHits(true)
.sort(sortField(), sortOrder()));
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
searchRequest,
listener,
client::search);
}
private void mapHits(SearchResponse searchResponse, ActionListener<E> mappingListener) {
E results = getCollection();
SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
T mapped = map(hit);
if (mapped != null) {
results.add(mapped);
}
}
count += hits.length;
if (!hasNext() && scrollId != null) {
ClearScrollRequest request = client.prepareClearScroll().setScrollIds(Collections.singletonList(scrollId)).request();
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
request,
ActionListener.<ClearScrollResponse>wrap(
r -> mappingListener.onResponse(results),
mappingListener::onFailure
),
client::clearScroll);
} else {
mappingListener.onResponse(results);
}
}
/**
* Get the query to use for the search
* @return the search query
*/
protected abstract QueryBuilder getQuery();
/**
* Maps the search hit to the document type
* @param hit the search hit
* @return The mapped document or {@code null} if the mapping failed
*/
protected abstract T map(SearchHit hit);
protected abstract E getCollection();
protected abstract SortOrder sortOrder();
protected abstract String sortField();
/**
* Should we fetch the source and what fields specifically.
*
* Defaults to all fields and true.
*/
protected FetchSourceContext getFetchSourceContext() {
return FetchSourceContext.FETCH_SOURCE;
}
protected int getBatchSize() {
return BATCH_SIZE;
}
}

View File

@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.util;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
/**
* A utility that allows chained (serial) execution of a number of tasks
* in async manner.
*/
public class TypedChainTaskExecutor<T> {
public interface ChainTask <T> {
void run(ActionListener<T> listener);
}
private final ExecutorService executorService;
private final LinkedList<ChainTask<T>> tasks = new LinkedList<>();
private final Predicate<Exception> failureShortCircuitPredicate;
private final Predicate<T> continuationPredicate;
private final List<T> collectedResponses;
/**
* Creates a new TypedChainTaskExecutor.
* Each chainedTask is executed in order serially and after each execution the continuationPredicate is tested.
*
* On failures the failureShortCircuitPredicate is tested.
*
* @param executorService The service where to execute the tasks
* @param continuationPredicate The predicate to test on whether to execute the next task or not.
* {@code true} means continue on to the next task.
* Must be able to handle null values.
* @param failureShortCircuitPredicate The predicate on whether to short circuit execution on a give exception.
* {@code true} means that no more tasks should execute and the the listener::onFailure should be
* called.
*/
public TypedChainTaskExecutor(ExecutorService executorService,
Predicate<T> continuationPredicate,
Predicate<Exception> failureShortCircuitPredicate) {
this.executorService = Objects.requireNonNull(executorService);
this.continuationPredicate = continuationPredicate;
this.failureShortCircuitPredicate = failureShortCircuitPredicate;
this.collectedResponses = new ArrayList<>();
}
public synchronized void add(ChainTask<T> task) {
tasks.add(task);
}
private synchronized void execute(T previousValue, ActionListener<List<T>> listener) {
collectedResponses.add(previousValue);
if (continuationPredicate.test(previousValue)) {
if (tasks.isEmpty()) {
listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses)));
return;
}
ChainTask<T> task = tasks.pop();
executorService.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (failureShortCircuitPredicate.test(e)) {
listener.onFailure(e);
} else {
execute(null, listener);
}
}
@Override
protected void doRun() {
task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure));
}
});
} else {
listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses)));
}
}
/**
* Execute all the chained tasks serially, notify listener when completed
*
* @param listener The ActionListener to notify when all executions have been completed,
* or when no further tasks should be executed.
* The resulting list COULD contain null values depending on if execution is continued
* on exceptions or not.
*/
public synchronized void execute(ActionListener<List<T>> listener) {
if (tasks.isEmpty()) {
listener.onResponse(Collections.emptyList());
return;
}
collectedResponses.clear();
ChainTask<T> task = tasks.pop();
executorService.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (failureShortCircuitPredicate.test(e)) {
listener.onFailure(e);
} else {
execute(null, listener);
}
}
@Override
protected void doRun() {
task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure));
}
});
}
public synchronized List<T> getCollectedResponses() {
return Collections.unmodifiableList(new ArrayList<>(collectedResponses));
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.junit.Before;
import java.io.IOException;
@ -83,17 +82,19 @@ public class DataFrameFeatureSetTests extends ESTestCase {
DataFrameTransformConfigTests.randomDataFrameTransformConfig("df-" + Integer.toString(uniqueId++)));
}
List<DataFrameTransformConfig> transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size());
List<DataFrameTransformConfig> transformConfigWithTasks =
new ArrayList<>(transformsStateAndStats.size() + transformConfigWithoutTasks.size());
transformsStateAndStats.forEach(stats ->
transformConfigWithTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig(stats.getId())));
List<DataFrameTransformConfig> allConfigs = new ArrayList<>(transformConfigWithoutTasks.size() + transformConfigWithTasks.size());
allConfigs.addAll(transformConfigWithoutTasks);
allConfigs.addAll(transformConfigWithTasks);
transformConfigWithoutTasks.forEach(withoutTask ->
transformsStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(withoutTask.getId())));
boolean enabled = randomBoolean();
boolean available = randomBoolean();
DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available, enabled, allConfigs, transformsStateAndStats);
DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available,
enabled,
transformsStateAndStats);
assertEquals(enabled, usage.enabled());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
@ -108,13 +109,12 @@ public class DataFrameFeatureSetTests extends ESTestCase {
assertEquals(null, XContentMapValues.extractValue("transforms", usageAsMap));
assertEquals(null, XContentMapValues.extractValue("stats", usageAsMap));
} else {
assertEquals(transformsStateAndStats.size() + transformConfigWithoutTasks.size(),
XContentMapValues.extractValue("transforms._all", usageAsMap));
assertEquals(transformsStateAndStats.size(), XContentMapValues.extractValue("transforms._all", usageAsMap));
Map<String, Integer> stateCounts = new HashMap<>();
transformsStateAndStats.stream().map(x -> x.getTransformState().getIndexerState().value())
.forEach(x -> stateCounts.merge(x, 1, Integer::sum));
transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum));
transformsStateAndStats.stream()
.map(x -> x.getTransformState().getIndexerState().value())
.forEach(x -> stateCounts.merge(x, 1, Integer::sum));
stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap)));
// use default constructed stats object for assertions if transformsStateAndStats is empty

View File

@ -0,0 +1,339 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.util;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.mockito.Mockito;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class BatchedDataIteratorTests extends ESTestCase {
private static final String INDEX_NAME = "some_index_name";
private static final String SCROLL_ID = "someScrollId";
private Client client;
private boolean wasScrollCleared;
private TestIterator testIterator;
private List<SearchRequest> searchRequestCaptor = new ArrayList<>();
private List<SearchScrollRequest> searchScrollRequestCaptor = new ArrayList<>();
@Before
public void setUpMocks() {
ThreadPool pool = mock(ThreadPool.class);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(pool.getThreadContext()).thenReturn(threadContext);
client = Mockito.mock(Client.class);
when(client.threadPool()).thenReturn(pool);
wasScrollCleared = false;
testIterator = new TestIterator(client, INDEX_NAME);
givenClearScrollRequest();
searchRequestCaptor.clear();
searchScrollRequestCaptor.clear();
}
public void testQueryReturnsNoResults() throws Exception {
new ScrollResponsesMocker().finishMock();
assertTrue(testIterator.hasNext());
PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
testIterator.next(future);
assertTrue(future.get().isEmpty());
assertFalse(testIterator.hasNext());
assertTrue(wasScrollCleared);
assertSearchRequest();
assertSearchScrollRequests(0);
}
public void testCallingNextWhenHasNextIsFalseThrows() throws Exception {
PlainActionFuture<Deque<String>> firstFuture = new PlainActionFuture<>();
new ScrollResponsesMocker().addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")).finishMock();
testIterator.next(firstFuture);
firstFuture.get();
assertFalse(testIterator.hasNext());
PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
ExecutionException executionException = ESTestCase.expectThrows(ExecutionException.class, () -> {
testIterator.next(future);
future.get();
});
assertNotNull(executionException.getCause());
assertTrue(executionException.getCause() instanceof NoSuchElementException);
}
public void testQueryReturnsSingleBatch() throws Exception {
PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
new ScrollResponsesMocker().addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")).finishMock();
assertTrue(testIterator.hasNext());
testIterator.next(future);
Deque<String> batch = future.get();
assertEquals(3, batch.size());
assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c"))));
assertFalse(testIterator.hasNext());
assertTrue(wasScrollCleared);
assertSearchRequest();
assertSearchScrollRequests(0);
}
public void testQueryReturnsThreeBatches() throws Exception {
PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
new ScrollResponsesMocker()
.addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c"))
.addBatch(createJsonDoc("d"), createJsonDoc("e"))
.addBatch(createJsonDoc("f"))
.finishMock();
assertTrue(testIterator.hasNext());
testIterator.next(future);
Deque<String> batch = future.get();
assertEquals(3, batch.size());
assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c"))));
future = new PlainActionFuture<>();
testIterator.next(future);
batch = future.get();
assertEquals(2, batch.size());
assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("d"), createJsonDoc("e"))));
future = new PlainActionFuture<>();
testIterator.next(future);
batch = future.get();
assertEquals(1, batch.size());
assertTrue(batch.containsAll(Collections.singletonList(createJsonDoc("f"))));
assertFalse(testIterator.hasNext());
assertTrue(wasScrollCleared);
assertSearchRequest();
assertSearchScrollRequests(2);
}
private String createJsonDoc(String value) {
return "{\"foo\":\"" + value + "\"}";
}
@SuppressWarnings("unchecked")
private void givenClearScrollRequest() {
ClearScrollRequestBuilder requestBuilder = mock(ClearScrollRequestBuilder.class);
when(client.prepareClearScroll()).thenReturn(requestBuilder);
when(requestBuilder.setScrollIds(Collections.singletonList(SCROLL_ID))).thenReturn(requestBuilder);
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(SCROLL_ID);
when(requestBuilder.request()).thenReturn(clearScrollRequest);
doAnswer((answer) -> {
wasScrollCleared = true;
ActionListener<ClearScrollResponse> scrollListener =
(ActionListener<ClearScrollResponse>) answer.getArguments()[1];
scrollListener.onResponse(new ClearScrollResponse(true,0));
return null;
}).when(client).clearScroll(any(ClearScrollRequest.class), any(ActionListener.class));
}
private void assertSearchRequest() {
List<SearchRequest> searchRequests = searchRequestCaptor;
assertThat(searchRequests.size(), equalTo(1));
SearchRequest searchRequest = searchRequests.get(0);
assertThat(searchRequest.indices(), equalTo(new String[] {INDEX_NAME}));
assertThat(searchRequest.scroll().keepAlive(), equalTo(TimeValue.timeValueMinutes(5)));
assertThat(searchRequest.types().length, equalTo(0));
assertThat(searchRequest.source().query(), equalTo(QueryBuilders.matchAllQuery()));
assertThat(searchRequest.source().trackTotalHitsUpTo(), is(SearchContext.TRACK_TOTAL_HITS_ACCURATE));
}
private void assertSearchScrollRequests(int expectedCount) {
List<SearchScrollRequest> searchScrollRequests = searchScrollRequestCaptor;
assertThat(searchScrollRequests.size(), equalTo(expectedCount));
for (SearchScrollRequest request : searchScrollRequests) {
assertThat(request.scrollId(), equalTo(SCROLL_ID));
assertThat(request.scroll().keepAlive(), equalTo(TimeValue.timeValueMinutes(5)));
}
}
private class ScrollResponsesMocker {
private List<String[]> batches = new ArrayList<>();
private long totalHits = 0;
private List<SearchResponse> responses = new ArrayList<>();
ScrollResponsesMocker addBatch(String... hits) {
totalHits += hits.length;
batches.add(hits);
return this;
}
@SuppressWarnings("unchecked")
void finishMock() {
if (batches.isEmpty()) {
givenInitialResponse();
return;
}
givenInitialResponse(batches.get(0));
for (int i = 1; i < batches.size(); ++i) {
givenNextResponse(batches.get(i));
}
if (responses.size() > 0) {
SearchResponse first = responses.get(0);
if (responses.size() > 1) {
List<SearchResponse> rest = new ArrayList<>(responses);
Iterator<SearchResponse> responseIterator = rest.iterator();
doAnswer((answer) -> {
SearchScrollRequest request = (SearchScrollRequest)answer.getArguments()[0];
ActionListener<SearchResponse> rsp = (ActionListener<SearchResponse>)answer.getArguments()[1];
searchScrollRequestCaptor.add(request);
rsp.onResponse(responseIterator.next());
return null;
}).when(client).searchScroll(any(SearchScrollRequest.class), any(ActionListener.class));
} else {
doAnswer((answer) -> {
SearchScrollRequest request = (SearchScrollRequest)answer.getArguments()[0];
ActionListener<SearchResponse> rsp = (ActionListener<SearchResponse>)answer.getArguments()[1];
searchScrollRequestCaptor.add(request);
rsp.onResponse(first);
return null;
}).when(client).searchScroll(any(SearchScrollRequest.class), any(ActionListener.class));
}
}
}
@SuppressWarnings("unchecked")
private void givenInitialResponse(String... hits) {
SearchResponse searchResponse = createSearchResponseWithHits(hits);
doAnswer((answer) -> {
SearchRequest request = (SearchRequest)answer.getArguments()[0];
searchRequestCaptor.add(request);
ActionListener<SearchResponse> rsp = (ActionListener<SearchResponse>)answer.getArguments()[1];
rsp.onResponse(searchResponse);
return null;
}).when(client).search(any(SearchRequest.class), any(ActionListener.class));
}
private void givenNextResponse(String... hits) {
responses.add(createSearchResponseWithHits(hits));
}
private SearchResponse createSearchResponseWithHits(String... hits) {
SearchHits searchHits = createHits(hits);
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getScrollId()).thenReturn(SCROLL_ID);
when(searchResponse.getHits()).thenReturn(searchHits);
return searchResponse;
}
private SearchHits createHits(String... values) {
List<SearchHit> hits = new ArrayList<>();
for (String value : values) {
hits.add(new SearchHitBuilder(randomInt()).setSource(value).build());
}
return new SearchHits(hits.toArray(new SearchHit[hits.size()]), new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 1.0f);
}
}
private static class TestIterator extends BatchedDataIterator<String, Deque<String>> {
TestIterator(Client client, String jobId) {
super(client, jobId);
}
@Override
protected QueryBuilder getQuery() {
return QueryBuilders.matchAllQuery();
}
@Override
protected String map(SearchHit hit) {
return hit.getSourceAsString();
}
@Override
protected Deque<String> getCollection() {
return new ArrayDeque<>();
}
@Override
protected SortOrder sortOrder() {
return SortOrder.DESC;
}
@Override
protected String sortField() {
return "foo";
}
}
public class SearchHitBuilder {
private final SearchHit hit;
private final Map<String, DocumentField> fields;
public SearchHitBuilder(int docId) {
hit = new SearchHit(docId);
fields = new HashMap<>();
}
public SearchHitBuilder addField(String name, Object value) {
return addField(name, Arrays.asList(value));
}
public SearchHitBuilder addField(String name, List<Object> values) {
fields.put(name, new DocumentField(name, values));
return this;
}
public SearchHitBuilder setSource(String sourceJson) {
hit.sourceRef(new BytesArray(sourceJson));
return this;
}
public SearchHit build() {
if (!fields.isEmpty()) {
hit.fields(fields);
}
return hit;
}
}
}

View File

@ -4,12 +4,24 @@
"methods": [ "GET" ],
"url": {
"path": "/_data_frame/transforms/{transform_id}",
"paths": [ "/_data_frame/transforms/{transform_id}"],
"paths": [ "/_data_frame/transforms/{transform_id}", "/_data_frame/transforms"],
"parts": {
"transform_id": {
"type": "string",
"required": false,
"description": "The id of the transforms to get, '_all' or '*' implies get all transforms"
"description": "The id or comma delimited list of id expressions of the transforms to get, '_all' or '*' implies get all transforms"
}
},
"params": {
"from": {
"type": "int",
"required": false,
"description": "skips a number of transform configs, defaults to 0"
},
"size": {
"type": "int",
"required": false,
"description": "specifies a max number of transforms to get, defaults to 100"
}
}
},

View File

@ -105,3 +105,41 @@ setup:
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform" }
- match: { transforms.1.id: "airline-transform-dos" }
- do:
data_frame.get_data_frame_transform:
transform_id: "airline-transform,airline-transform-dos"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform" }
- match: { transforms.1.id: "airline-transform-dos" }
- do:
data_frame.get_data_frame_transform:
transform_id: "airline-transform*"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform" }
- match: { transforms.1.id: "airline-transform-dos" }
- do:
data_frame.get_data_frame_transform:
transform_id: "airline-transform*"
from: 0
size: 1
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform" }
- do:
data_frame.get_data_frame_transform:
transform_id: "airline-transform*"
from: 1
size: 1
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-dos" }
---
"Test transform with invalid page parameter":
- do:
catch: /Param \[size\] has a max acceptable value of \[1000\]/
data_frame.get_data_frame_transform:
transform_id: "_all"
from: 0
size: 10000

View File

@ -104,3 +104,68 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stats-dos"
---
"Test get multiple transform stats where one does not have a task":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stats-dos"
body: >
{
"source": "airline-data",
"dest": "airline-data-by-airline-stats-dos",
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.1.state.transform_state: "stopped" }
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "_all"
- match: { count: 2 }
- match: { transforms.0.id: "airline-transform-stats" }
- match: { transforms.0.state.transform_state: "started" }
- match: { transforms.1.id: "airline-transform-stats-dos" }
- match: { transforms.1.state.transform_state: "stopped" }
---
"Test get single transform stats when it does not have a task":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stats-dos"
body: >
{
"source": "airline-data",
"dest": "airline-data-by-airline-stats-dos",
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-stats-dos"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-stats-dos" }
- match: { transforms.0.state.transform_state: "stopped" }
- match: { transforms.0.state.generation: 0 }
- match: { transforms.0.stats.pages_processed: 0 }
- match: { transforms.0.stats.documents_processed: 0 }
- match: { transforms.0.stats.documents_indexed: 0 }
- match: { transforms.0.stats.trigger_count: 0 }
- match: { transforms.0.stats.index_time_in_ms: 0 }
- match: { transforms.0.stats.index_total: 0 }
- match: { transforms.0.stats.index_failures: 0 }
- match: { transforms.0.stats.search_time_in_ms: 0 }
- match: { transforms.0.stats.search_total: 0 }
- match: { transforms.0.stats.search_failures: 0 }