diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java index 86db09e9f21..ca6bf9d16e6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java @@ -8,29 +8,28 @@ package org.elasticsearch.xpack.core.dataframe.action; import org.apache.logging.log4j.LogManager; import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.action.AbstractGetResourcesRequest; +import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse; +import org.elasticsearch.xpack.core.action.util.PageParams; +import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; 
-import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; public class GetDataFrameTransformsAction extends Action{ @@ -49,61 +48,45 @@ public class GetDataFrameTransformsAction extends Action MAX_SIZE_RETURN) { + exception = addValidationError("Param [" + PageParams.SIZE.getPreferredName() + + "] has a max acceptable value of [" + MAX_SIZE_RETURN + "]", exception); + } + return exception; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(DataFrameField.ID.getPreferredName(), id); + builder.field(DataFrameField.ID.getPreferredName(), getResourceId()); return builder; } @Override - public int hashCode() { - return Objects.hash(id); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - Request other = (Request) obj; - return Objects.equals(id, other.id); + public String getResourceIdField() { + return DataFrameField.ID.getPreferredName(); } } @@ -114,19 +97,17 @@ public class GetDataFrameTransformsAction extends Action implements Writeable, ToXContentObject { public static final String INVALID_TRANSFORMS_DEPRECATION_WARNING = "Found [{}] invalid transforms"; private static final ParseField INVALID_TRANSFORMS = new ParseField("invalid_transforms"); - private List transformConfigurations; - public Response(List transformConfigs) { - this.transformConfigurations = transformConfigs; + super(new QueryPage<>(transformConfigs, transformConfigs.size(), DataFrameField.TRANSFORMS)); } public Response() { - this.transformConfigurations = Collections.emptyList(); + super(); } public Response(StreamInput in) throws IOException { @@ -134,30 +115,18 @@ public class GetDataFrameTransformsAction extends Action getTransformConfigurations() { - return transformConfigurations; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - 
super.readFrom(in); - transformConfigurations = in.readList(DataFrameTransformConfig::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeList(transformConfigurations); + return getResources().results(); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { List invalidTransforms = new ArrayList<>(); builder.startObject(); - builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size()); + builder.field(DataFrameField.COUNT.getPreferredName(), getResources().count()); // XContentBuilder does not support passing the params object for Iterables builder.field(DataFrameField.TRANSFORMS.getPreferredName()); builder.startArray(); - for (DataFrameTransformConfig configResponse : transformConfigurations) { + for (DataFrameTransformConfig configResponse : getResources().results()) { configResponse.toXContent(builder, params); if (configResponse.isValid() == false) { invalidTransforms.add(configResponse.getId()); @@ -177,27 +146,8 @@ public class GetDataFrameTransformsAction extends Action getReader() { + return DataFrameTransformConfig::new; } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java index 8ca1531cc65..116ad482d01 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import 
org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; import java.util.Objects; @@ -38,6 +39,12 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj DataFrameField.STATS_FIELD); } + public static DataFrameTransformStateAndStats initialStateAndStats(String id) { + return new DataFrameTransformStateAndStats(id, + new DataFrameTransformState(IndexerState.STOPPED, null, 0), + new DataFrameIndexerTransformStats()); + } + public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) { this.id = Objects.requireNonNull(id); this.transformState = Objects.requireNonNull(state); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java index 188f9a02edc..2cb7d594754 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java @@ -17,20 +17,16 @@ import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage; -import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; -import org.elasticsearch.xpack.core.indexing.IndexerState; import java.util.Collections; import 
java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; + public class DataFrameFeatureSet implements XPackFeatureSet { @@ -78,39 +74,24 @@ public class DataFrameFeatureSet implements XPackFeatureSet { return; } - GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL); - client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap( - transforms -> { - GetDataFrameTransformsStatsAction.Request transformStatsRequest = - new GetDataFrameTransformsStatsAction.Request(MetaData.ALL); - client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest, - ActionListener.wrap(transformStatsResponse -> { - listener.onResponse(createUsage(available(), enabled(), transforms.getTransformConfigurations(), - transformStatsResponse.getTransformsStateAndStats())); - }, listener::onFailure)); - }, - listener::onFailure - )); + final GetDataFrameTransformsStatsAction.Request transformStatsRequest = new GetDataFrameTransformsStatsAction.Request(MetaData.ALL); + client.execute(GetDataFrameTransformsStatsAction.INSTANCE, + transformStatsRequest, + ActionListener.wrap(transformStatsResponse -> + listener.onResponse(createUsage(available(), enabled(), transformStatsResponse.getTransformsStateAndStats())), + listener::onFailure)); } - static DataFrameFeatureSetUsage createUsage(boolean available, boolean enabled, - List transforms, + static DataFrameFeatureSetUsage createUsage(boolean available, + boolean enabled, List transformsStateAndStats) { - Set transformIds = transforms.stream() - .map(DataFrameTransformConfig::getId) - .collect(Collectors.toSet()); - Map transformsCountByState = new HashMap<>(); DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats(); - transformsStateAndStats.forEach(singleResult -> { - transformIds.remove(singleResult.getId()); 
transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum); accumulatedStats.merge(singleResult.getTransformStats()); }); - // If there is no state returned, assumed stopped - transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum)); return new DataFrameFeatureSetUsage(available, enabled, transformsCountByState, accumulatedStats); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java index 43372667fd3..bbdd8a6dee8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java @@ -6,35 +6,84 @@ package org.elasticsearch.xpack.dataframe.action; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.action.AbstractTransportGetResourcesAction; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import 
org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Request; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response; -import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; + +import java.io.IOException; + +import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE; -public class TransportGetDataFrameTransformsAction extends HandledTransportAction { - - private final DataFrameTransformsConfigManager transformsConfigManager; +public class TransportGetDataFrameTransformsAction extends AbstractTransportGetResourcesAction { @Inject public TransportGetDataFrameTransformsAction(TransportService transportService, ActionFilters actionFilters, - DataFrameTransformsConfigManager transformsConfigManager) { - super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, () -> new Request()); - this.transformsConfigManager = transformsConfigManager; + Client client, NamedXContentRegistry xContentRegistry) { + super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, Request::new, client, xContentRegistry); } @Override protected void doExecute(Task task, Request request, ActionListener listener) { - //TODO support comma delimited and simple regex IDs - transformsConfigManager.getTransformConfigurations(request.getId(), ActionListener.wrap( - configs -> listener.onResponse(new Response(configs)), + searchResources(request, ActionListener.wrap( + r -> listener.onResponse(new Response(r.results())), listener::onFailure )); } + + @Override + protected ParseField getResultsField() { + return DataFrameField.TRANSFORMS; + } + + @Override + protected String[] getIndices() { + return new 
String[]{DataFrameInternalIndex.INDEX_NAME}; + } + + @Override + protected DataFrameTransformConfig parse(XContentParser parser) throws IOException { + return DataFrameTransformConfig.fromXContent(parser, null, true); + } + + @Override + protected ResourceNotFoundException notFoundException(String resourceId) { + return new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, resourceId)); + } + + @Override + protected String executionOrigin() { + return ClientHelper.DATA_FRAME_ORIGIN; + } + + @Override + protected String extractIdFromResource(DataFrameTransformConfig transformConfig) { + return transformConfig.getId(); + } + + @Override + protected QueryBuilder additionalQuery() { + return QueryBuilders.termQuery(INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME); + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index a3cd220f257..b751279abf2 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -6,31 +6,54 @@ package org.elasticsearch.xpack.dataframe.action; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.client.Client; import 
org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.discovery.MasterNotDiscoveredException; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFramePersistentTaskUtils; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; +import org.elasticsearch.xpack.dataframe.util.BatchedDataIterator; +import 
java.io.IOException; +import java.io.InputStream; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class TransportGetDataFrameTransformsStatsAction extends @@ -39,11 +62,16 @@ public class TransportGetDataFrameTransformsStatsAction extends GetDataFrameTransformsStatsAction.Response, GetDataFrameTransformsStatsAction.Response> { + private final Client client; + private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; @Inject public TransportGetDataFrameTransformsStatsAction(TransportService transportService, ActionFilters actionFilters, - ClusterService clusterService) { + ClusterService clusterService, Client client, + DataFrameTransformsConfigManager dataFrameTransformsConfigManager) { super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new, Response::new, ThreadPool.Names.SAME); + this.client = client; + this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; } @Override @@ -80,11 +108,14 @@ public class TransportGetDataFrameTransformsStatsAction extends if (nodes.isLocalNodeElectedMaster()) { if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) { - super.doExecute(task, request, listener); + ActionListener transformStatsListener = ActionListener.wrap( + response -> collectStatsForTransformsWithoutTasks(request, response, listener), + listener::onFailure + ); + super.doExecute(task, request, transformStatsListener); } else { - // If we couldn't find the transform in the persistent task CS, it means it was deleted prior to this GET - // and we can just send an empty response, no need to go looking for the allocated task - listener.onResponse(new Response(Collections.emptyList())); + // If we don't have any tasks, pass empty collection to this method + collectStatsForTransformsWithoutTasks(request, new 
Response(Collections.emptyList()), listener); } } else { @@ -99,4 +130,110 @@ public class TransportGetDataFrameTransformsStatsAction extends } } } + + // TODO correct when we start storing stats in docs; for now, just return STOPPED and empty stats + private void collectStatsForTransformsWithoutTasks(Request request, + Response response, + ActionListener listener) { + if (request.getId().equals(MetaData.ALL) == false) { + // If we did not find any tasks && this is NOT for ALL, verify that the single config exists, and return it as stopped + // Otherwise (the config is not found) respond with an empty list + if (response.getTransformsStateAndStats().isEmpty()) { + dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap( + config -> + listener.onResponse( + new Response(Collections.singletonList(DataFrameTransformStateAndStats.initialStateAndStats(config.getId())))), + exception -> { + if (exception instanceof ResourceNotFoundException) { + listener.onResponse(new Response(Collections.emptyList())); + } else { + listener.onFailure(exception); + } + } + )); + } else { + // If it was not ALL && we DO have stored stats, simply return those as we found them all, since we only support 1 or all + listener.onResponse(response); + } + return; + } + // We only do this mass collection if we are getting ALL tasks + TransformIdCollector collector = new TransformIdCollector(); + collector.execute(ActionListener.wrap( + allIds -> { + response.getTransformsStateAndStats().forEach( + tsas -> allIds.remove(tsas.getId()) + ); + List statsWithoutTasks = allIds.stream() + .map(DataFrameTransformStateAndStats::initialStateAndStats) + .collect(Collectors.toList()); + statsWithoutTasks.addAll(response.getTransformsStateAndStats()); + statsWithoutTasks.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId)); + listener.onResponse(new Response(statsWithoutTasks)); + }, + listener::onFailure + )); + } + + /** + * This class recursively queries a scroll search over all transform_ids and
puts them in a set + */ + private class TransformIdCollector extends BatchedDataIterator> { + + private final Set ids = new HashSet<>(); + TransformIdCollector() { + super(client, DataFrameInternalIndex.INDEX_NAME); + } + + void execute(final ActionListener> finalListener) { + if (this.hasNext()) { + next(ActionListener.wrap( + setOfIds -> execute(finalListener), + finalListener::onFailure + )); + } else { + finalListener.onResponse(ids); + } + } + + @Override + protected QueryBuilder getQuery() { + return QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME)); + } + + /** + * Reads the transform id field out of the source of the given search hit. + * + * @param hit the search hit whose source is parsed as JSON + * @return the value stored under the transform id field of the hit's source + * @throws ElasticsearchParseException if the source of the hit cannot be read or parsed + */ + @Override + protected String map(SearchHit hit) { + BytesReference source = hit.getSourceRef(); + try (InputStream stream = source.streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, stream)) { + return (String)parser.map().get(DataFrameField.ID.getPreferredName()); + } catch (IOException e) { + throw new ElasticsearchParseException("failed to parse data frame transform id", e); + } + } + + @Override + protected Set getCollection() { + return ids; + } + + @Override + protected SortOrder sortOrder() { + return SortOrder.ASC; + } + + @Override + protected String sortField() { + return DataFrameField.ID.getPreferredName(); + } + + @Override + protected FetchSourceContext getFetchSourceContext() { + return new FetchSourceContext(true, new String[]{DataFrameField.ID.getPreferredName()}, new String[]{}); + } + } + + } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java index 874acdb1ec9..445a7913e99 100644 ---
a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java @@ -20,12 +20,8 @@ import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -36,22 +32,14 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; import static 
org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -59,7 +47,6 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class DataFrameTransformsConfigManager { private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class); - private static final int DEFAULT_SIZE = 100; public static final Map TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true"); @@ -102,6 +89,8 @@ public class DataFrameTransformsConfigManager { } } + // For transforms returned via GET data_frame/transforms, see the TransportGetDataFrameTransformsAction + // This function is only for internal use. public void getTransformConfiguration(String transformId, ActionListener resultListener) { GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { @@ -123,54 +112,6 @@ public class DataFrameTransformsConfigManager { })); } - /** - * Get more than one DataFrameTransformConfig - * - * @param transformId Can be a single transformId, `*`, or `_all` - * @param resultListener Listener to alert when request is completed - */ - // TODO add pagination support - public void getTransformConfigurations(String transformId, - ActionListener> resultListener) { - final boolean isAllOrWildCard = Strings.isAllOrWildcard(new String[]{transformId}); - BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery("doc_type", DataFrameTransformConfig.NAME)); - if (isAllOrWildCard == false) { - queryBuilder.filter(QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId)); - } - - SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - .setTrackTotalHits(true) - 
.addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC) - .setSize(DEFAULT_SIZE) - .setQuery(queryBuilder) - .request(); - - ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request, - ActionListener.wrap( - searchResponse -> { - List configs = new ArrayList<>(searchResponse.getHits().getHits().length); - for (SearchHit hit : searchResponse.getHits().getHits()) { - DataFrameTransformConfig config = parseTransformLenientlyFromSourceSync(hit.getSourceRef(), - resultListener::onFailure); - if (config == null) { - return; - } - configs.add(config); - } - if (configs.isEmpty() && (isAllOrWildCard == false)) { - resultListener.onFailure(new ResourceNotFoundException( - DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); - return; - } - resultListener.onResponse(configs); - }, - resultListener::onFailure) - , client::search); - - } - public void deleteTransformConfiguration(String transformId, ActionListener listener) { DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -193,18 +134,6 @@ public class DataFrameTransformsConfigManager { })); } - private DataFrameTransformConfig parseTransformLenientlyFromSourceSync(BytesReference source, Consumer onFailure) { - try (InputStream stream = source.streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON) - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) { - return DataFrameTransformConfig.fromXContent(parser, null, true); - } catch (Exception e) { - logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CONFIGURATION), e); - onFailure.accept(e); - return null; - } - } - private void parseTransformLenientlyFromSource(BytesReference source, String transformId, ActionListener transformListener) { try 
(InputStream stream = source.streamInput(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsAction.java index 1980a8dd87a..d079b2fd72c 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; @@ -25,8 +26,15 @@ public class RestGetDataFrameTransformsAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { + GetDataFrameTransformsAction.Request request = new GetDataFrameTransformsAction.Request(); + String id = restRequest.param(DataFrameField.ID.getPreferredName()); - GetDataFrameTransformsAction.Request request = new GetDataFrameTransformsAction.Request(id); + request.setResourceId(id); + if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) { + request.setPageParams( + new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM), + restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE))); + } return channel -> client.execute(GetDataFrameTransformsAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git 
a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIterator.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIterator.java new file mode 100644 index 00000000000..56c58252454 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIterator.java @@ -0,0 +1,186 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xpack.core.ClientHelper; + +import java.util.Collection; +import java.util.Collections; +import java.util.NoSuchElementException; +import java.util.Objects; + +/** + * Provides basic tools around scrolling over documents and gathering the data in some Collection + * @param The object type that is being collected + * @param The collection that should be used (i.e. Set, Deque, etc.) 
+ */ +public abstract class BatchedDataIterator> { + private static final Logger LOGGER = LogManager.getLogger(BatchedDataIterator.class); + + private static final String CONTEXT_ALIVE_DURATION = "5m"; + private static final int BATCH_SIZE = 10_000; + + private final Client client; + private final String index; + private volatile long count; + private volatile long totalHits; + private volatile String scrollId; + private volatile boolean isScrollInitialised; + + protected BatchedDataIterator(Client client, String index) { + this.client = Objects.requireNonNull(client); + this.index = Objects.requireNonNull(index); + this.totalHits = 0; + this.count = 0; + } + + /** + * Returns {@code true} if the iteration has more elements. + * (In other words, returns {@code true} if {@link #next} would + * return an element rather than throwing an exception.) + * + * @return {@code true} if the iteration has more elements + */ + public boolean hasNext() { + return !isScrollInitialised || count != totalHits; + } + + /** + * The first time next() is called, the search will be performed and the first + * batch will be given to the listener. Any subsequent call will return the following batches. + *

+ * Note that in some implementations it is possible that when there are no + * results at all. {@link BatchedDataIterator#hasNext()} will return {@code true} the first time it is called but then a call + * to this function returns an empty Collection to the listener. + */ + public void next(ActionListener listener) { + if (!hasNext()) { + listener.onFailure(new NoSuchElementException()); + } + + if (!isScrollInitialised) { + ActionListener wrappedListener = ActionListener.wrap( + searchResponse -> { + isScrollInitialised = true; + totalHits = searchResponse.getHits().getTotalHits().value; + scrollId = searchResponse.getScrollId(); + mapHits(searchResponse, listener); + }, + listener::onFailure + ); + initScroll(wrappedListener); + } else { + ActionListener wrappedListener = ActionListener.wrap( + searchResponse -> { + scrollId = searchResponse.getScrollId(); + mapHits(searchResponse, listener); + }, + listener::onFailure + ); + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION); + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ClientHelper.DATA_FRAME_ORIGIN, + searchScrollRequest, + wrappedListener, + client::searchScroll); + } + } + + private void initScroll(ActionListener listener) { + LOGGER.trace("ES API CALL: search index {}", index); + + SearchRequest searchRequest = new SearchRequest(index); + searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + searchRequest.scroll(CONTEXT_ALIVE_DURATION); + searchRequest.source(new SearchSourceBuilder() + .fetchSource(getFetchSourceContext()) + .size(getBatchSize()) + .query(getQuery()) + .trackTotalHits(true) + .sort(sortField(), sortOrder())); + + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ClientHelper.DATA_FRAME_ORIGIN, + searchRequest, + listener, + client::search); + } + + private void mapHits(SearchResponse searchResponse, ActionListener mappingListener) { + E results = 
getCollection(); + + SearchHit[] hits = searchResponse.getHits().getHits(); + for (SearchHit hit : hits) { + T mapped = map(hit); + if (mapped != null) { + results.add(mapped); + } + } + count += hits.length; + + if (!hasNext() && scrollId != null) { + ClearScrollRequest request = client.prepareClearScroll().setScrollIds(Collections.singletonList(scrollId)).request(); + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ClientHelper.DATA_FRAME_ORIGIN, + request, + ActionListener.wrap( + r -> mappingListener.onResponse(results), + mappingListener::onFailure + ), + client::clearScroll); + } else { + mappingListener.onResponse(results); + } + } + + /** + * Get the query to use for the search + * @return the search query + */ + protected abstract QueryBuilder getQuery(); + + /** + * Maps the search hit to the document type + * @param hit the search hit + * @return The mapped document or {@code null} if the mapping failed + */ + protected abstract T map(SearchHit hit); + + protected abstract E getCollection(); + + protected abstract SortOrder sortOrder(); + + protected abstract String sortField(); + + /** + * Should we fetch the source and what fields specifically. + * + * Defaults to all fields and true. + */ + protected FetchSourceContext getFetchSourceContext() { + return FetchSourceContext.FETCH_SOURCE; + } + + protected int getBatchSize() { + return BATCH_SIZE; + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/TypedChainTaskExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/TypedChainTaskExecutor.java new file mode 100644 index 00000000000..6657a1a81c7 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/TypedChainTaskExecutor.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.dataframe.util; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; + +/** + * A utility that allows chained (serial) execution of a number of tasks + * in async manner. + */ +public class TypedChainTaskExecutor { + + public interface ChainTask { + void run(ActionListener listener); + } + + private final ExecutorService executorService; + private final LinkedList> tasks = new LinkedList<>(); + private final Predicate failureShortCircuitPredicate; + private final Predicate continuationPredicate; + private final List collectedResponses; + + /** + * Creates a new TypedChainTaskExecutor. + * Each chainedTask is executed in order serially and after each execution the continuationPredicate is tested. + * + * On failures the failureShortCircuitPredicate is tested. + * + * @param executorService The service where to execute the tasks + * @param continuationPredicate The predicate to test on whether to execute the next task or not. + * {@code true} means continue on to the next task. + * Must be able to handle null values. + * @param failureShortCircuitPredicate The predicate on whether to short circuit execution on a give exception. + * {@code true} means that no more tasks should execute and the the listener::onFailure should be + * called. 
+ */ + public TypedChainTaskExecutor(ExecutorService executorService, + Predicate continuationPredicate, + Predicate failureShortCircuitPredicate) { + this.executorService = Objects.requireNonNull(executorService); + this.continuationPredicate = continuationPredicate; + this.failureShortCircuitPredicate = failureShortCircuitPredicate; + this.collectedResponses = new ArrayList<>(); + } + + public synchronized void add(ChainTask task) { + tasks.add(task); + } + + private synchronized void execute(T previousValue, ActionListener> listener) { + collectedResponses.add(previousValue); + if (continuationPredicate.test(previousValue)) { + if (tasks.isEmpty()) { + listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses))); + return; + } + ChainTask task = tasks.pop(); + executorService.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (failureShortCircuitPredicate.test(e)) { + listener.onFailure(e); + } else { + execute(null, listener); + } + } + + @Override + protected void doRun() { + task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure)); + } + }); + } else { + listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses))); + } + } + + /** + * Execute all the chained tasks serially, notify listener when completed + * + * @param listener The ActionListener to notify when all executions have been completed, + * or when no further tasks should be executed. + * The resulting list COULD contain null values depending on if execution is continued + * on exceptions or not. 
+ */ + public synchronized void execute(ActionListener> listener) { + if (tasks.isEmpty()) { + listener.onResponse(Collections.emptyList()); + return; + } + collectedResponses.clear(); + ChainTask task = tasks.pop(); + executorService.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (failureShortCircuitPredicate.test(e)) { + listener.onFailure(e); + } else { + execute(null, listener); + } + } + + @Override + protected void doRun() { + task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure)); + } + }); + } + + public synchronized List getCollectedResponses() { + return Collections.unmodifiableList(new ArrayList<>(collectedResponses)); + } +} diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java index 6c314adab74..bea7e5b0148 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests; -import org.elasticsearch.xpack.core.indexing.IndexerState; import org.junit.Before; import java.io.IOException; @@ -83,17 +82,19 @@ public class DataFrameFeatureSetTests extends ESTestCase { DataFrameTransformConfigTests.randomDataFrameTransformConfig("df-" + Integer.toString(uniqueId++))); } - List transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size()); + List transformConfigWithTasks = + new 
ArrayList<>(transformsStateAndStats.size() + transformConfigWithoutTasks.size()); + transformsStateAndStats.forEach(stats -> transformConfigWithTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig(stats.getId()))); - - List allConfigs = new ArrayList<>(transformConfigWithoutTasks.size() + transformConfigWithTasks.size()); - allConfigs.addAll(transformConfigWithoutTasks); - allConfigs.addAll(transformConfigWithTasks); + transformConfigWithoutTasks.forEach(withoutTask -> + transformsStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(withoutTask.getId()))); boolean enabled = randomBoolean(); boolean available = randomBoolean(); - DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available, enabled, allConfigs, transformsStateAndStats); + DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available, + enabled, + transformsStateAndStats); assertEquals(enabled, usage.enabled()); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { @@ -108,13 +109,12 @@ public class DataFrameFeatureSetTests extends ESTestCase { assertEquals(null, XContentMapValues.extractValue("transforms", usageAsMap)); assertEquals(null, XContentMapValues.extractValue("stats", usageAsMap)); } else { - assertEquals(transformsStateAndStats.size() + transformConfigWithoutTasks.size(), - XContentMapValues.extractValue("transforms._all", usageAsMap)); + assertEquals(transformsStateAndStats.size(), XContentMapValues.extractValue("transforms._all", usageAsMap)); Map stateCounts = new HashMap<>(); - transformsStateAndStats.stream().map(x -> x.getTransformState().getIndexerState().value()) - .forEach(x -> stateCounts.merge(x, 1, Integer::sum)); - transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum)); + transformsStateAndStats.stream() + .map(x -> x.getTransformState().getIndexerState().value()) + .forEach(x -> stateCounts.merge(x, 1, Integer::sum)); stateCounts.forEach((k, v) 
-> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap))); // use default constructed stats object for assertions if transformsStateAndStats is empty diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIteratorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIteratorTests.java new file mode 100644 index 00000000000..2a9e99de559 --- /dev/null +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIteratorTests.java @@ -0,0 +1,339 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.dataframe.util; + +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollRequestBuilder; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.internal.SearchContext; +import 
org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; +import org.mockito.Mockito; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class BatchedDataIteratorTests extends ESTestCase { + + private static final String INDEX_NAME = "some_index_name"; + private static final String SCROLL_ID = "someScrollId"; + + private Client client; + private boolean wasScrollCleared; + + private TestIterator testIterator; + + private List searchRequestCaptor = new ArrayList<>(); + private List searchScrollRequestCaptor = new ArrayList<>(); + + @Before + public void setUpMocks() { + ThreadPool pool = mock(ThreadPool.class); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(pool.getThreadContext()).thenReturn(threadContext); + client = Mockito.mock(Client.class); + when(client.threadPool()).thenReturn(pool); + wasScrollCleared = false; + testIterator = new TestIterator(client, INDEX_NAME); + givenClearScrollRequest(); + searchRequestCaptor.clear(); + searchScrollRequestCaptor.clear(); + } + + public void testQueryReturnsNoResults() throws Exception { + new ScrollResponsesMocker().finishMock(); + + assertTrue(testIterator.hasNext()); + PlainActionFuture> future = new PlainActionFuture<>(); + testIterator.next(future); + assertTrue(future.get().isEmpty()); + assertFalse(testIterator.hasNext()); + 
assertTrue(wasScrollCleared); + assertSearchRequest(); + assertSearchScrollRequests(0); + } + + public void testCallingNextWhenHasNextIsFalseThrows() throws Exception { + PlainActionFuture> firstFuture = new PlainActionFuture<>(); + new ScrollResponsesMocker().addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")).finishMock(); + testIterator.next(firstFuture); + firstFuture.get(); + assertFalse(testIterator.hasNext()); + PlainActionFuture> future = new PlainActionFuture<>(); + ExecutionException executionException = ESTestCase.expectThrows(ExecutionException.class, () -> { + testIterator.next(future); + future.get(); + }); + assertNotNull(executionException.getCause()); + assertTrue(executionException.getCause() instanceof NoSuchElementException); + } + + public void testQueryReturnsSingleBatch() throws Exception { + PlainActionFuture> future = new PlainActionFuture<>(); + new ScrollResponsesMocker().addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")).finishMock(); + + assertTrue(testIterator.hasNext()); + testIterator.next(future); + Deque batch = future.get(); + assertEquals(3, batch.size()); + assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")))); + assertFalse(testIterator.hasNext()); + assertTrue(wasScrollCleared); + + assertSearchRequest(); + assertSearchScrollRequests(0); + } + + public void testQueryReturnsThreeBatches() throws Exception { + PlainActionFuture> future = new PlainActionFuture<>(); + new ScrollResponsesMocker() + .addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")) + .addBatch(createJsonDoc("d"), createJsonDoc("e")) + .addBatch(createJsonDoc("f")) + .finishMock(); + + assertTrue(testIterator.hasNext()); + + testIterator.next(future); + Deque batch = future.get(); + assertEquals(3, batch.size()); + assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")))); + + future = new PlainActionFuture<>(); + 
testIterator.next(future); + batch = future.get(); + assertEquals(2, batch.size()); + assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("d"), createJsonDoc("e")))); + + future = new PlainActionFuture<>(); + testIterator.next(future); + batch = future.get(); + assertEquals(1, batch.size()); + assertTrue(batch.containsAll(Collections.singletonList(createJsonDoc("f")))); + + assertFalse(testIterator.hasNext()); + assertTrue(wasScrollCleared); + + assertSearchRequest(); + assertSearchScrollRequests(2); + } + + private String createJsonDoc(String value) { + return "{\"foo\":\"" + value + "\"}"; + } + + @SuppressWarnings("unchecked") + private void givenClearScrollRequest() { + ClearScrollRequestBuilder requestBuilder = mock(ClearScrollRequestBuilder.class); + + when(client.prepareClearScroll()).thenReturn(requestBuilder); + when(requestBuilder.setScrollIds(Collections.singletonList(SCROLL_ID))).thenReturn(requestBuilder); + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(SCROLL_ID); + when(requestBuilder.request()).thenReturn(clearScrollRequest); + doAnswer((answer) -> { + wasScrollCleared = true; + ActionListener scrollListener = + (ActionListener) answer.getArguments()[1]; + scrollListener.onResponse(new ClearScrollResponse(true,0)); + return null; + }).when(client).clearScroll(any(ClearScrollRequest.class), any(ActionListener.class)); + } + + private void assertSearchRequest() { + List searchRequests = searchRequestCaptor; + assertThat(searchRequests.size(), equalTo(1)); + SearchRequest searchRequest = searchRequests.get(0); + assertThat(searchRequest.indices(), equalTo(new String[] {INDEX_NAME})); + assertThat(searchRequest.scroll().keepAlive(), equalTo(TimeValue.timeValueMinutes(5))); + assertThat(searchRequest.types().length, equalTo(0)); + assertThat(searchRequest.source().query(), equalTo(QueryBuilders.matchAllQuery())); + assertThat(searchRequest.source().trackTotalHitsUpTo(), 
is(SearchContext.TRACK_TOTAL_HITS_ACCURATE)); + } + + private void assertSearchScrollRequests(int expectedCount) { + List searchScrollRequests = searchScrollRequestCaptor; + assertThat(searchScrollRequests.size(), equalTo(expectedCount)); + for (SearchScrollRequest request : searchScrollRequests) { + assertThat(request.scrollId(), equalTo(SCROLL_ID)); + assertThat(request.scroll().keepAlive(), equalTo(TimeValue.timeValueMinutes(5))); + } + } + + private class ScrollResponsesMocker { + private List batches = new ArrayList<>(); + private long totalHits = 0; + private List responses = new ArrayList<>(); + + ScrollResponsesMocker addBatch(String... hits) { + totalHits += hits.length; + batches.add(hits); + return this; + } + + @SuppressWarnings("unchecked") + void finishMock() { + if (batches.isEmpty()) { + givenInitialResponse(); + return; + } + givenInitialResponse(batches.get(0)); + for (int i = 1; i < batches.size(); ++i) { + givenNextResponse(batches.get(i)); + } + if (responses.size() > 0) { + SearchResponse first = responses.get(0); + if (responses.size() > 1) { + List rest = new ArrayList<>(responses); + Iterator responseIterator = rest.iterator(); + doAnswer((answer) -> { + SearchScrollRequest request = (SearchScrollRequest)answer.getArguments()[0]; + ActionListener rsp = (ActionListener)answer.getArguments()[1]; + searchScrollRequestCaptor.add(request); + rsp.onResponse(responseIterator.next()); + return null; + }).when(client).searchScroll(any(SearchScrollRequest.class), any(ActionListener.class)); + } else { + doAnswer((answer) -> { + SearchScrollRequest request = (SearchScrollRequest)answer.getArguments()[0]; + ActionListener rsp = (ActionListener)answer.getArguments()[1]; + searchScrollRequestCaptor.add(request); + rsp.onResponse(first); + return null; + }).when(client).searchScroll(any(SearchScrollRequest.class), any(ActionListener.class)); + } + } + } + + @SuppressWarnings("unchecked") + private void givenInitialResponse(String... 
hits) { + SearchResponse searchResponse = createSearchResponseWithHits(hits); + doAnswer((answer) -> { + SearchRequest request = (SearchRequest)answer.getArguments()[0]; + searchRequestCaptor.add(request); + ActionListener rsp = (ActionListener)answer.getArguments()[1]; + rsp.onResponse(searchResponse); + return null; + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); + } + + private void givenNextResponse(String... hits) { + responses.add(createSearchResponseWithHits(hits)); + } + + private SearchResponse createSearchResponseWithHits(String... hits) { + SearchHits searchHits = createHits(hits); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getScrollId()).thenReturn(SCROLL_ID); + when(searchResponse.getHits()).thenReturn(searchHits); + return searchResponse; + } + + private SearchHits createHits(String... values) { + List hits = new ArrayList<>(); + for (String value : values) { + hits.add(new SearchHitBuilder(randomInt()).setSource(value).build()); + } + return new SearchHits(hits.toArray(new SearchHit[hits.size()]), new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 1.0f); + } + } + + private static class TestIterator extends BatchedDataIterator> { + TestIterator(Client client, String jobId) { + super(client, jobId); + } + + @Override + protected QueryBuilder getQuery() { + return QueryBuilders.matchAllQuery(); + } + + @Override + protected String map(SearchHit hit) { + return hit.getSourceAsString(); + } + + @Override + protected Deque getCollection() { + return new ArrayDeque<>(); + } + + @Override + protected SortOrder sortOrder() { + return SortOrder.DESC; + } + + @Override + protected String sortField() { + return "foo"; + } + } + public class SearchHitBuilder { + + private final SearchHit hit; + private final Map fields; + + public SearchHitBuilder(int docId) { + hit = new SearchHit(docId); + fields = new HashMap<>(); + } + + public SearchHitBuilder addField(String name, Object value) { + 
return addField(name, Arrays.asList(value)); + } + + public SearchHitBuilder addField(String name, List values) { + fields.put(name, new DocumentField(name, values)); + return this; + } + + public SearchHitBuilder setSource(String sourceJson) { + hit.sourceRef(new BytesArray(sourceJson)); + return this; + } + + public SearchHit build() { + if (!fields.isEmpty()) { + hit.fields(fields); + } + return hit; + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.get_data_frame_transform.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.get_data_frame_transform.json index e3653fecb98..094ee1a0237 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.get_data_frame_transform.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.get_data_frame_transform.json @@ -4,12 +4,24 @@ "methods": [ "GET" ], "url": { "path": "/_data_frame/transforms/{transform_id}", - "paths": [ "/_data_frame/transforms/{transform_id}"], + "paths": [ "/_data_frame/transforms/{transform_id}", "/_data_frame/transforms"], "parts": { "transform_id": { "type": "string", "required": false, - "description": "The id of the transforms to get, '_all' or '*' implies get all transforms" + "description": "The id or comma delimited list of id expressions of the transforms to get, '_all' or '*' implies get all transforms" + } + }, + "params": { + "from": { + "type": "int", + "required": false, + "description": "skips a number of transform configs, defaults to 0" + }, + "size": { + "type": "int", + "required": false, + "description": "specifies a max number of transforms to get, defaults to 100" } } }, diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index c8e7bc0a6ba..d007a51fe44 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ 
b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -105,3 +105,41 @@ setup: - match: { count: 2 } - match: { transforms.0.id: "airline-transform" } - match: { transforms.1.id: "airline-transform-dos" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "airline-transform,airline-transform-dos" + - match: { count: 2 } + - match: { transforms.0.id: "airline-transform" } + - match: { transforms.1.id: "airline-transform-dos" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "airline-transform*" + - match: { count: 2 } + - match: { transforms.0.id: "airline-transform" } + - match: { transforms.1.id: "airline-transform-dos" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "airline-transform*" + from: 0 + size: 1 + - match: { count: 1 } + - match: { transforms.0.id: "airline-transform" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "airline-transform*" + from: 1 + size: 1 + - match: { count: 1 } + - match: { transforms.0.id: "airline-transform-dos" } +--- +"Test transform with invalid page parameter": + - do: + catch: /Param \[size\] has a max acceptable value of \[1000\]/ + data_frame.get_data_frame_transform: + transform_id: "_all" + from: 0 + size: 10000 diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml index 1c89058ed4a..1b224a692f5 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml @@ -104,3 +104,68 @@ teardown: - do: data_frame.delete_data_frame_transform: transform_id: "airline-transform-stats-dos" + +--- +"Test get multiple transform stats where one does not have a task": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform-stats-dos" + body: > + { + "source": 
"airline-data", + "dest": "airline-data-by-airline-stats-dos", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "*" + - match: { count: 2 } + - match: { transforms.0.id: "airline-transform-stats" } + - match: { transforms.0.state.transform_state: "started" } + - match: { transforms.1.id: "airline-transform-stats-dos" } + - match: { transforms.1.state.transform_state: "stopped" } + + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "_all" + - match: { count: 2 } + - match: { transforms.0.id: "airline-transform-stats" } + - match: { transforms.0.state.transform_state: "started" } + - match: { transforms.1.id: "airline-transform-stats-dos" } + - match: { transforms.1.state.transform_state: "stopped" } + +--- +"Test get single transform stats when it does not have a task": + + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform-stats-dos" + body: > + { + "source": "airline-data", + "dest": "airline-data-by-airline-stats-dos", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + data_frame.get_data_frame_transform_stats: + transform_id: "airline-transform-stats-dos" + - match: { count: 1 } + - match: { transforms.0.id: "airline-transform-stats-dos" } + - match: { transforms.0.state.transform_state: "stopped" } + - match: { transforms.0.state.generation: 0 } + - match: { transforms.0.stats.pages_processed: 0 } + - match: { transforms.0.stats.documents_processed: 0 } + - match: { transforms.0.stats.documents_indexed: 0 } + - match: { transforms.0.stats.trigger_count: 0 } + - match: { transforms.0.stats.index_time_in_ms: 0 } + - match: { transforms.0.stats.index_total: 0 } + - match: { transforms.0.stats.index_failures: 0 } + - match: { 
transforms.0.stats.search_time_in_ms: 0 } + - match: { transforms.0.stats.search_total: 0 } + - match: { transforms.0.stats.search_failures: 0 }