diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 86dce1b3314..e027191d8a5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -22,7 +22,6 @@ public class DataFrameMessages { public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration"; public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings"; public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index"; - public static final String REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS = "dest index [{0}] already exists"; public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist"; public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID = "Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument"; diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java index d278c78842c..26a957ea055 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java @@ -37,9 +37,10 @@ public class DataFrameMetaDataIT extends DataFrameRestTestCase { indicesCreated = true; } - public void testMetaData() throws IOException { + public void testMetaData() throws Exception { long testStarted = System.currentTimeMillis(); createPivotReviewsTransform("test_meta", "pivot_reviews", null); + startAndWaitForTransform("test_meta", "pivot_reviews"); Response mappingResponse = client().performRequest(new Request("GET", "pivot_reviews/_mapping")); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 6ff97e1ed9d..fd89a32a817 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -115,9 +115,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { createDataframeTransformRequest.setJsonEntity(config); Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertTrue(indexExists(dataFrameIndex)); startAndWaitForTransform(transformId, dataFrameIndex); + assertTrue(indexExists(dataFrameIndex)); // we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0 Map indexStats = getAsMap(dataFrameIndex + "/_stats"); @@ -174,9 +174,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { createDataframeTransformRequest.setJsonEntity(config); Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertTrue(indexExists(dataFrameIndex)); startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + assertTrue(indexExists(dataFrameIndex)); // we expect 27 documents as there shall be 27 user_id's Map indexStats = getAsMap(dataFrameIndex + "/_stats"); @@ -228,9 +228,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { createDataframeTransformRequest.setJsonEntity(config); Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertTrue(indexExists(dataFrameIndex)); startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + assertTrue(indexExists(dataFrameIndex)); // we expect 21 documents as there shall be 21 days worth of docs Map indexStats = getAsMap(dataFrameIndex + "/_stats"); @@ -301,9 +301,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { createDataframeTransformRequest.setJsonEntity(config); Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertTrue(indexExists(dataFrameIndex)); startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + assertTrue(indexExists(dataFrameIndex)); // we expect 21 documents as there shall be 21 days worth of docs Map indexStats = getAsMap(dataFrameIndex + "/_stats"); @@ -351,9 +351,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { createDataframeTransformRequest.setJsonEntity(config); Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertTrue(indexExists(dataFrameIndex)); startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + assertTrue(indexExists(dataFrameIndex)); // we expect 27 documents as there shall be 27 user_id's Map indexStats = getAsMap(dataFrameIndex + "/_stats"); diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java index 89bdafbde2b..c0e6c97fd69 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java @@ -164,7 +164,6 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertTrue(indexExists(dataFrameIndex)); } protected void startDataframeTransform(String transformId, boolean force) throws IOException { @@ -195,6 +194,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase { protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception { // start the transform startDataframeTransform(transformId, false, authHeader); + assertTrue(indexExists(dataFrameIndex)); // wait until the dataframe has been created and all data is available waitForDataFrameCheckpoint(transformId); refreshIndex(dataFrameIndex); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 4f689c6a5af..b7e6c235f8e 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -40,7 +40,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction; @@ -49,7 +48,6 @@ import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; -import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction; @@ -60,6 +58,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransform import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction; @@ -85,7 +84,6 @@ import java.util.function.Supplier; import java.util.function.UnaryOperator; import static java.util.Collections.emptyList; -import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlugin { @@ -102,7 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu private final Settings settings; private final boolean transportClientMode; private final SetOnce dataFrameTransformsConfigManager = new SetOnce<>(); - private final SetOnce> dataFrameAuditor = new SetOnce<>(); + private final SetOnce dataFrameAuditor = new SetOnce<>(); private final SetOnce dataFrameTransformsCheckpointService = new SetOnce<>(); private final SetOnce schedulerEngine = new SetOnce<>(); @@ -184,11 +182,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu if (enabled == false || transportClientMode) { return emptyList(); } - dataFrameAuditor.set(new Auditor<>(client, - clusterService.getNodeName(), - DataFrameInternalIndex.AUDIT_INDEX, - DATA_FRAME_ORIGIN, - DataFrameAuditMessage.builder())); + dataFrameAuditor.set(new DataFrameAuditor(client, clusterService.getNodeName())); dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry)); dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client, dataFrameTransformsConfigManager.get())); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index 6f364d91e44..c7f750ecdb2 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -11,8 +11,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -50,7 +48,6 @@ import org.elasticsearch.xpack.core.security.authz.RoleDescriptor; import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges; import org.elasticsearch.xpack.core.security.support.Exceptions; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; -import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.io.IOException; @@ -117,18 +114,6 @@ public class TransportPutDataFrameTransformAction return; } - final String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState, - IndicesOptions.lenientExpandOpen(), - config.getDestination().getIndex()); - - if (dest.length > 0) { - listener.onFailure(new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS, - config.getDestination().getIndex()), - RestStatus.BAD_REQUEST)); - return; - } - for(String src : config.getSource().getIndex()) { if (indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src).length == 0) { listener.onFailure(new ElasticsearchStatusException( @@ -145,9 +130,19 @@ public class TransportPutDataFrameTransformAction .indices(config.getSource().getIndex()) .privileges("read") .build(); + String[] destPrivileges = new String[3]; + destPrivileges[0] = "read"; + destPrivileges[1] = "index"; + // If the destination index does not exist, we can assume that we may have to create it on start. + // We should check that the creating user has the privileges to create the index. + if (indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + config.getDestination().getIndex()).length == 0) { + destPrivileges[2] = "create_index"; + } RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() .indices(config.getDestination().getIndex()) - .privileges("read", "index", "create_index") + .privileges(destPrivileges) .build(); HasPrivilegesRequest privRequest = new HasPrivilegesRequest(); @@ -202,41 +197,12 @@ public class TransportPutDataFrameTransformAction // <5> Return the listener, or clean up destination index on failure. ActionListener putTransformConfigurationListener = ActionListener.wrap( putTransformConfigurationResult -> listener.onResponse(new Response(true)), - putTransformConfigurationException -> - ClientHelper.executeAsyncWithOrigin(client, - ClientHelper.DATA_FRAME_ORIGIN, - DeleteIndexAction.INSTANCE, - new DeleteIndexRequest(config.getDestination().getIndex()), ActionListener.wrap( - deleteIndexResponse -> listener.onFailure(putTransformConfigurationException), - deleteIndexException -> { - String msg = "Failed to delete destination index after creating transform [" + config.getId() + "] failed"; - listener.onFailure( - new ElasticsearchStatusException(msg, - RestStatus.INTERNAL_SERVER_ERROR, - putTransformConfigurationException)); - }) - ) + listener::onFailure ); // <4> Put our transform - ActionListener createDestinationIndexListener = ActionListener.wrap( - createIndexResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener), - createDestinationIndexException -> listener.onFailure( - new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX, - createDestinationIndexException)) - ); - - // <3> Create the destination index - ActionListener> deduceMappingsListener = ActionListener.wrap( - mappings -> DataframeIndex.createDestinationIndex(client, config, mappings, createDestinationIndexListener), - deduceTargetMappingsException -> listener.onFailure( - new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS, - deduceTargetMappingsException)) - ); - - // <2> Deduce our mappings for the destination index ActionListener pivotValidationListener = ActionListener.wrap( - validationResult -> pivot.deduceMappings(client, deduceMappingsListener), + validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener), validationException -> listener.onFailure( new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, validationException)) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index f68e246ed86..0aeb757e562 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -6,10 +6,14 @@ package org.elasticsearch.xpack.dataframe.action; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -35,32 +39,40 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; +import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; +import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.util.Collection; +import java.util.Map; import java.util.function.Consumer; import java.util.function.Predicate; public class TransportStartDataFrameTransformAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportStartDataFrameTransformAction.class); private final XPackLicenseState licenseState; private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; private final PersistentTasksService persistentTasksService; private final Client client; + private final DataFrameAuditor auditor; @Inject public TransportStartDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService, XPackLicenseState licenseState, ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver, DataFrameTransformsConfigManager dataFrameTransformsConfigManager, - PersistentTasksService persistentTasksService, Client client) { + PersistentTasksService persistentTasksService, Client client, + DataFrameAuditor auditor) { super(StartDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, StartDataFrameTransformAction.Request::new, indexNameExpressionResolver); this.licenseState = licenseState; this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; this.persistentTasksService = persistentTasksService; this.client = client; + this.auditor = auditor; } @Override @@ -83,7 +95,7 @@ public class TransportStartDataFrameTransformAction extends } final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool); - // <3> Set the allocated task's state to STARTED + // <4> Set the allocated task's state to STARTED ActionListener> persistentTaskActionListener = ActionListener.wrap( task -> { waitForDataFrameTaskAllocated(task.getId(), @@ -102,16 +114,9 @@ public class TransportStartDataFrameTransformAction extends listener::onFailure ); - // <2> Create the task in cluster state so that it will start executing on the node - ActionListener getTransformListener = ActionListener.wrap( - config -> { - if (config.isValid() == false) { - listener.onFailure(new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()), - RestStatus.BAD_REQUEST - )); - return; - } + // <3> Create the task in cluster state so that it will start executing on the node + ActionListener createOrGetIndexListener = ActionListener.wrap( + unused -> { PersistentTasksCustomMetaData.PersistentTask existingTask = getExistingTask(transformTask.getId(), state); if (existingTask == null) { @@ -123,14 +128,14 @@ public class TransportStartDataFrameTransformAction extends DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState(); if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { listener.onFailure(new ElasticsearchStatusException( - "Unable to start data frame transform [" + config.getId() + + "Unable to start data frame transform [" + request.getId() + "] as it is in a failed state with failure: [" + transformState.getReason() + "]. Use force start to restart data frame transform once error is resolved.", RestStatus.CONFLICT)); } else if (transformState.getTaskState() != DataFrameTransformTaskState.STOPPED && transformState.getTaskState() != DataFrameTransformTaskState.FAILED) { listener.onFailure(new ElasticsearchStatusException( - "Unable to start data frame transform [" + config.getId() + + "Unable to start data frame transform [" + request.getId() + "] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT)); } else { persistentTaskActionListener.onResponse(existingTask); @@ -140,10 +145,80 @@ public class TransportStartDataFrameTransformAction extends listener::onFailure ); + // <2> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it + ActionListener getTransformListener = ActionListener.wrap( + config -> { + if (config.isValid() == false) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()), + RestStatus.BAD_REQUEST + )); + return; + } + final String destinationIndex = config.getDestination().getIndex(); + String[] dest = indexNameExpressionResolver.concreteIndexNames(state, + IndicesOptions.lenientExpandOpen(), + destinationIndex); + + if(dest.length == 0) { + auditor.info(request.getId(), + "Could not find destination index [" + destinationIndex + "]." + + " Creating index with deduced mappings."); + createDestinationIndex(config, createOrGetIndexListener); + } else { + auditor.info(request.getId(), "Destination index [" + destinationIndex + "] already exists."); + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ClientHelper.DATA_FRAME_ORIGIN, + client.admin() + .indices() + .prepareStats(dest) + .clear() + .setDocs(true) + .request(), + ActionListener.wrap( + r -> { + long docTotal = r.getTotal().docs.getCount(); + if (docTotal > 0L) { + auditor.warning(request.getId(), "Non-empty destination index [" + destinationIndex + "]. " + + "Contains [" + docTotal + "] total documents."); + } + createOrGetIndexListener.onResponse(null); + }, + e -> { + String msg = "Unable to determine destination index stats, error: " + e.getMessage(); + logger.error(msg, e); + auditor.warning(request.getId(), msg); + createOrGetIndexListener.onResponse(null); + }), + client.admin().indices()::stats); + } + }, + listener::onFailure + ); + // <1> Get the config to verify it exists and is valid dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), getTransformListener); } + private void createDestinationIndex(final DataFrameTransformConfig config, final ActionListener listener) { + + final Pivot pivot = new Pivot(config.getSource().getIndex(), + config.getSource().getQueryConfig().getQuery(), + config.getPivotConfig()); + + ActionListener> deduceMappingsListener = ActionListener.wrap( + mappings -> DataframeIndex.createDestinationIndex(client, + config, + mappings, + ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)), + deduceTargetMappingsException -> listener.onFailure( + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS, + deduceTargetMappingsException)) + ); + + pivot.deduceMappings(client, deduceMappingsListener); + } + @Override protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java new file mode 100644 index 00000000000..e02954a280b --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/notifications/DataFrameAuditor.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.dataframe.notifications; + +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.common.notifications.Auditor; +import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; + +import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; + +/** + * DataFrameAuditor class that abstracts away generic templating for easier injection + */ +public class DataFrameAuditor extends Auditor { + public DataFrameAuditor(Client client, String nodeName) { + super(client, nodeName, DataFrameInternalIndex.AUDIT_INDEX, DATA_FRAME_ORIGIN, DataFrameAuditMessage.builder()); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index c670f32740c..823ccaff71b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -19,15 +19,14 @@ import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; -import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; -import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.io.IOException; @@ -47,13 +46,13 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer auditor; + protected final DataFrameAuditor auditor; private Pivot pivot; private int pageSize = 0; public DataFrameIndexer(Executor executor, - Auditor auditor, + DataFrameAuditor auditor, AtomicReference initialState, Map initialPosition, DataFrameIndexerTransformStats jobStats) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index e3c27fd21fe..b6f38a5dd23 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -18,15 +18,14 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.DataFrameField; -import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import java.util.Map; @@ -40,13 +39,13 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; - private final Auditor auditor; + private final DataFrameAuditor auditor; public DataFrameTransformPersistentTasksExecutor(Client client, DataFrameTransformsConfigManager transformsConfigManager, DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService, SchedulerEngine schedulerEngine, - Auditor auditor, + DataFrameAuditor auditor, ThreadPool threadPool) { super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME); this.client = client; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index b8ceb2e7bd4..f142fc36179 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -24,13 +24,11 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.common.notifications.Auditor; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; -import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; @@ -40,6 +38,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil; @@ -64,7 +63,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; private final DataFrameIndexer indexer; - private final Auditor auditor; + private final DataFrameAuditor auditor; private final DataFrameIndexerTransformStats previousStats; private final AtomicReference taskState; @@ -77,7 +76,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform, DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager, DataFrameTransformsCheckpointService transformsCheckpointService, - SchedulerEngine schedulerEngine, Auditor auditor, + SchedulerEngine schedulerEngine, DataFrameAuditor auditor, ThreadPool threadPool, Map headers) { super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers); this.transform = transform; @@ -309,7 +308,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager, DataFrameTransformsCheckpointService transformsCheckpointService, AtomicReference initialState, Map initialPosition, Client client, - Auditor auditor) { + DataFrameAuditor auditor) { super(threadPool.executor(ThreadPool.Names.GENERIC), auditor, initialState, initialPosition, new DataFrameIndexerTransformStats(transformId)); this.transformId = transformId; diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index b121e8091c1..89388d82e08 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -21,12 +21,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.common.notifications.Auditor; -import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import org.junit.Before; @@ -66,7 +65,7 @@ public class DataFrameIndexerTests extends ESTestCase { Executor executor, DataFrameTransformConfig transformConfig, Map fieldMappings, - Auditor auditor, + DataFrameAuditor auditor, AtomicReference initialState, Map initialPosition, DataFrameIndexerTransformStats jobStats, @@ -202,8 +201,7 @@ public class DataFrameIndexerTests extends ESTestCase { final ExecutorService executor = Executors.newFixedThreadPool(1); try { - Auditor auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, - DataFrameAuditMessage.builder()); + DataFrameAuditor auditor = new DataFrameAuditor(client, "node_1"); MockedDataFrameIndexer indexer = new MockedDataFrameIndexer(executor, config, Collections.emptyMap(), auditor, state, null, new DataFrameIndexerTransformStats(config.getId()), searchFunction, bulkFunction, failureConsumer); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index 00b09133002..353fc0aa932 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -188,24 +188,3 @@ setup: from: 0 size: 10000 ---- -"Verify put transform creates destination index with appropriate mapping": - - do: - data_frame.put_data_frame_transform: - transform_id: "airline-transform" - body: > - { - "source": { "index": "airline-data" }, - "dest": { "index": "airline-data-by-airline" }, - "pivot": { - "group_by": { "airline": {"terms": {"field": "airline"}}}, - "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}} - } - } - - match: { acknowledged: true } - - do: - indices.get_mapping: - index: airline-data-by-airline - - match: { airline-data-by-airline.mappings.properties.airline.type: keyword } - - match: { airline-data-by-airline.mappings.properties.avg_response.type: double } - - match: { airline-data-by-airline.mappings.properties.time.type: date } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 96f6b6d0a41..f1ac07b7234 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -64,6 +64,31 @@ teardown: transform_id: "airline-transform-start-stop" --- +"Verify start transform creates destination index with appropriate mapping": + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { started: true } + - do: + indices.get_mapping: + index: airline-data-by-airline-start-stop + - match: { airline-data-by-airline-start-stop.mappings.properties.airline.type: keyword } + - match: { airline-data-by-airline-start-stop.mappings.properties.avg_response.type: double } + +--- +"Verify start transform reuses destination index": + - do: + indices.create: + index: airline-data-by-airline-start-stop + - do: + data_frame.start_data_frame_transform: + transform_id: "airline-transform-start-stop" + - match: { started: true } + - do: + indices.get_mapping: + index: airline-data-by-airline-start-stop + - match: { airline-data-by-airline-start-stop.mappings: {} } +--- "Test start/stop/start transform": - do: data_frame.start_data_frame_transform: