[ML][Data Frame] Moving destination creation to _start (#41416) (#41433)

* [ML][Data Frame] Moving destination creation to _start

* slight refactor of DataFrameAuditor constructor
This commit is contained in:
Benjamin Trent 2019-04-23 09:32:57 -05:00 committed by GitHub
parent 85b9dc18a7
commit e2f8ffdde8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 173 additions and 117 deletions

View File

@ -22,7 +22,6 @@ public class DataFrameMessages {
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index";
public static final String REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS = "dest index [{0}] already exists";
public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";

View File

@ -37,9 +37,10 @@ public class DataFrameMetaDataIT extends DataFrameRestTestCase {
indicesCreated = true;
}
public void testMetaData() throws IOException {
public void testMetaData() throws Exception {
long testStarted = System.currentTimeMillis();
createPivotReviewsTransform("test_meta", "pivot_reviews", null);
startAndWaitForTransform("test_meta", "pivot_reviews");
Response mappingResponse = client().performRequest(new Request("GET", "pivot_reviews/_mapping"));

View File

@ -115,9 +115,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertTrue(indexExists(dataFrameIndex));
startAndWaitForTransform(transformId, dataFrameIndex);
assertTrue(indexExists(dataFrameIndex));
// we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@ -174,9 +174,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertTrue(indexExists(dataFrameIndex));
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
assertTrue(indexExists(dataFrameIndex));
// we expect 27 documents as there shall be 27 user_id's
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@ -228,9 +228,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertTrue(indexExists(dataFrameIndex));
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
assertTrue(indexExists(dataFrameIndex));
// we expect 21 documents as there shall be 21 days worth of docs
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@ -301,9 +301,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertTrue(indexExists(dataFrameIndex));
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
assertTrue(indexExists(dataFrameIndex));
// we expect 21 documents as there shall be 21 days worth of docs
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@ -351,9 +351,9 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
createDataframeTransformRequest.setJsonEntity(config);
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertTrue(indexExists(dataFrameIndex));
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
assertTrue(indexExists(dataFrameIndex));
// we expect 27 documents as there shall be 27 user_id's
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");

View File

@ -164,7 +164,6 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertTrue(indexExists(dataFrameIndex));
}
protected void startDataframeTransform(String transformId, boolean force) throws IOException {
@ -195,6 +194,7 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
// start the transform
startDataframeTransform(transformId, false, authHeader);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForDataFrameCheckpoint(transformId);
refreshIndex(dataFrameIndex);

View File

@ -40,7 +40,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
@ -49,7 +48,6 @@ import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction;
@ -60,6 +58,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransform
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction;
@ -85,7 +84,6 @@ import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlugin {
@ -102,7 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
private final Settings settings;
private final boolean transportClientMode;
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
private final SetOnce<Auditor<DataFrameAuditMessage>> dataFrameAuditor = new SetOnce<>();
private final SetOnce<DataFrameAuditor> dataFrameAuditor = new SetOnce<>();
private final SetOnce<DataFrameTransformsCheckpointService> dataFrameTransformsCheckpointService = new SetOnce<>();
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
@ -184,11 +182,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
if (enabled == false || transportClientMode) {
return emptyList();
}
dataFrameAuditor.set(new Auditor<>(client,
clusterService.getNodeName(),
DataFrameInternalIndex.AUDIT_INDEX,
DATA_FRAME_ORIGIN,
DataFrameAuditMessage.builder()));
dataFrameAuditor.set(new DataFrameAuditor(client, clusterService.getNodeName()));
dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client, dataFrameTransformsConfigManager.get()));

View File

@ -11,8 +11,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -50,7 +48,6 @@ import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
import org.elasticsearch.xpack.core.security.support.Exceptions;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException;
@ -117,18 +114,6 @@ public class TransportPutDataFrameTransformAction
return;
}
final String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
config.getDestination().getIndex());
if (dest.length > 0) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS,
config.getDestination().getIndex()),
RestStatus.BAD_REQUEST));
return;
}
for(String src : config.getSource().getIndex()) {
if (indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src).length == 0) {
listener.onFailure(new ElasticsearchStatusException(
@ -145,9 +130,19 @@ public class TransportPutDataFrameTransformAction
.indices(config.getSource().getIndex())
.privileges("read")
.build();
String[] destPrivileges = new String[3];
destPrivileges[0] = "read";
destPrivileges[1] = "index";
// If the destination index does not exist, we can assume that we may have to create it on start.
// We should check that the creating user has the privileges to create the index.
if (indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
config.getDestination().getIndex()).length == 0) {
destPrivileges[2] = "create_index";
}
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
.indices(config.getDestination().getIndex())
.privileges("read", "index", "create_index")
.privileges(destPrivileges)
.build();
HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
@ -202,41 +197,12 @@ public class TransportPutDataFrameTransformAction
// <5> Return the listener, or clean up destination index on failure.
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
putTransformConfigurationResult -> listener.onResponse(new Response(true)),
putTransformConfigurationException ->
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
DeleteIndexAction.INSTANCE,
new DeleteIndexRequest(config.getDestination().getIndex()), ActionListener.wrap(
deleteIndexResponse -> listener.onFailure(putTransformConfigurationException),
deleteIndexException -> {
String msg = "Failed to delete destination index after creating transform [" + config.getId() + "] failed";
listener.onFailure(
new ElasticsearchStatusException(msg,
RestStatus.INTERNAL_SERVER_ERROR,
putTransformConfigurationException));
})
)
listener::onFailure
);
// <4> Put our transform
ActionListener<Boolean> createDestinationIndexListener = ActionListener.wrap(
createIndexResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
createDestinationIndexException -> listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX,
createDestinationIndexException))
);
// <3> Create the destination index
ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
mappings -> DataframeIndex.createDestinationIndex(client, config, mappings, createDestinationIndexListener),
deduceTargetMappingsException -> listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS,
deduceTargetMappingsException))
);
// <2> Deduce our mappings for the destination index
ActionListener<Boolean> pivotValidationListener = ActionListener.wrap(
validationResult -> pivot.deduceMappings(client, deduceMappingsListener),
validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
validationException -> listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
validationException))

View File

@ -6,10 +6,14 @@
package org.elasticsearch.xpack.dataframe.action;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@ -35,32 +39,40 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class TransportStartDataFrameTransformAction extends
TransportMasterNodeAction<StartDataFrameTransformAction.Request, StartDataFrameTransformAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportStartDataFrameTransformAction.class);
private final XPackLicenseState licenseState;
private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
private final PersistentTasksService persistentTasksService;
private final Client client;
private final DataFrameAuditor auditor;
@Inject
public TransportStartDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, XPackLicenseState licenseState,
ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
PersistentTasksService persistentTasksService, Client client) {
PersistentTasksService persistentTasksService, Client client,
DataFrameAuditor auditor) {
super(StartDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
StartDataFrameTransformAction.Request::new, indexNameExpressionResolver);
this.licenseState = licenseState;
this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.auditor = auditor;
}
@Override
@ -83,7 +95,7 @@ public class TransportStartDataFrameTransformAction extends
}
final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool);
// <3> Set the allocated task's state to STARTED
// <4> Set the allocated task's state to STARTED
ActionListener<PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>> persistentTaskActionListener = ActionListener.wrap(
task -> {
waitForDataFrameTaskAllocated(task.getId(),
@ -102,16 +114,9 @@ public class TransportStartDataFrameTransformAction extends
listener::onFailure
);
// <2> Create the task in cluster state so that it will start executing on the node
ActionListener<DataFrameTransformConfig> getTransformListener = ActionListener.wrap(
config -> {
if (config.isValid() == false) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()),
RestStatus.BAD_REQUEST
));
return;
}
// <3> Create the task in cluster state so that it will start executing on the node
ActionListener<Void> createOrGetIndexListener = ActionListener.wrap(
unused -> {
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> existingTask =
getExistingTask(transformTask.getId(), state);
if (existingTask == null) {
@ -123,14 +128,14 @@ public class TransportStartDataFrameTransformAction extends
DataFrameTransformState transformState = (DataFrameTransformState)existingTask.getState();
if(transformState.getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(new ElasticsearchStatusException(
"Unable to start data frame transform [" + config.getId() +
"Unable to start data frame transform [" + request.getId() +
"] as it is in a failed state with failure: [" + transformState.getReason() +
"]. Use force start to restart data frame transform once error is resolved.",
RestStatus.CONFLICT));
} else if (transformState.getTaskState() != DataFrameTransformTaskState.STOPPED &&
transformState.getTaskState() != DataFrameTransformTaskState.FAILED) {
listener.onFailure(new ElasticsearchStatusException(
"Unable to start data frame transform [" + config.getId() +
"Unable to start data frame transform [" + request.getId() +
"] as it is in state [" + transformState.getTaskState() + "]", RestStatus.CONFLICT));
} else {
persistentTaskActionListener.onResponse(existingTask);
@ -140,10 +145,80 @@ public class TransportStartDataFrameTransformAction extends
listener::onFailure
);
// <2> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it
ActionListener<DataFrameTransformConfig> getTransformListener = ActionListener.wrap(
config -> {
if (config.isValid() == false) {
listener.onFailure(new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()),
RestStatus.BAD_REQUEST
));
return;
}
final String destinationIndex = config.getDestination().getIndex();
String[] dest = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.lenientExpandOpen(),
destinationIndex);
if(dest.length == 0) {
auditor.info(request.getId(),
"Could not find destination index [" + destinationIndex + "]." +
" Creating index with deduced mappings.");
createDestinationIndex(config, createOrGetIndexListener);
} else {
auditor.info(request.getId(), "Destination index [" + destinationIndex + "] already exists.");
ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ClientHelper.DATA_FRAME_ORIGIN,
client.admin()
.indices()
.prepareStats(dest)
.clear()
.setDocs(true)
.request(),
ActionListener.<IndicesStatsResponse>wrap(
r -> {
long docTotal = r.getTotal().docs.getCount();
if (docTotal > 0L) {
auditor.warning(request.getId(), "Non-empty destination index [" + destinationIndex + "]. " +
"Contains [" + docTotal + "] total documents.");
}
createOrGetIndexListener.onResponse(null);
},
e -> {
String msg = "Unable to determine destination index stats, error: " + e.getMessage();
logger.error(msg, e);
auditor.warning(request.getId(), msg);
createOrGetIndexListener.onResponse(null);
}),
client.admin().indices()::stats);
}
},
listener::onFailure
);
// <1> Get the config to verify it exists and is valid
dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
}
private void createDestinationIndex(final DataFrameTransformConfig config, final ActionListener<Void> listener) {
final Pivot pivot = new Pivot(config.getSource().getIndex(),
config.getSource().getQueryConfig().getQuery(),
config.getPivotConfig());
ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
mappings -> DataframeIndex.createDestinationIndex(client,
config,
mappings,
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)),
deduceTargetMappingsException -> listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS,
deduceTargetMappingsException))
);
pivot.deduceMappings(client, deduceMappingsListener);
}
@Override
protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.notifications;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
/**
* DataFrameAuditor class that abstracts away generic templating for easier injection
*/
public class DataFrameAuditor extends Auditor<DataFrameAuditMessage> {
public DataFrameAuditor(Client client, String nodeName) {
super(client, nodeName, DataFrameInternalIndex.AUDIT_INDEX, DATA_FRAME_ORIGIN, DataFrameAuditMessage.builder());
}
}

View File

@ -19,15 +19,14 @@ import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import java.io.IOException;
@ -47,13 +46,13 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
public static final String COMPOSITE_AGGREGATION_NAME = "_data_frame";
private static final Logger logger = LogManager.getLogger(DataFrameIndexer.class);
protected final Auditor<DataFrameAuditMessage> auditor;
protected final DataFrameAuditor auditor;
private Pivot pivot;
private int pageSize = 0;
public DataFrameIndexer(Executor executor,
Auditor<DataFrameAuditMessage> auditor,
DataFrameAuditor auditor,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerTransformStats jobStats) {

View File

@ -18,15 +18,14 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.DataFrame;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import java.util.Map;
@ -40,13 +39,13 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final Auditor<DataFrameAuditMessage> auditor;
private final DataFrameAuditor auditor;
public DataFrameTransformPersistentTasksExecutor(Client client,
DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService,
SchedulerEngine schedulerEngine,
Auditor<DataFrameAuditMessage> auditor,
DataFrameAuditor auditor,
ThreadPool threadPool) {
super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME);
this.client = client;

View File

@ -24,13 +24,11 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@ -40,6 +38,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil;
@ -64,7 +63,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final DataFrameIndexer indexer;
private final Auditor<DataFrameAuditMessage> auditor;
private final DataFrameAuditor auditor;
private final DataFrameIndexerTransformStats previousStats;
private final AtomicReference<DataFrameTransformTaskState> taskState;
@ -77,7 +76,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService,
SchedulerEngine schedulerEngine, Auditor<DataFrameAuditMessage> auditor,
SchedulerEngine schedulerEngine, DataFrameAuditor auditor,
ThreadPool threadPool, Map<String, String> headers) {
super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers);
this.transform = transform;
@ -309,7 +308,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService,
AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition, Client client,
Auditor<DataFrameAuditMessage> auditor) {
DataFrameAuditor auditor) {
super(threadPool.executor(ThreadPool.Names.GENERIC), auditor, initialState, initialPosition,
new DataFrameIndexerTransformStats(transformId));
this.transformId = transformId;

View File

@ -21,12 +21,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
import org.junit.Before;
@ -66,7 +65,7 @@ public class DataFrameIndexerTests extends ESTestCase {
Executor executor,
DataFrameTransformConfig transformConfig,
Map<String, String> fieldMappings,
Auditor<DataFrameAuditMessage> auditor,
DataFrameAuditor auditor,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerTransformStats jobStats,
@ -202,8 +201,7 @@ public class DataFrameIndexerTests extends ESTestCase {
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
Auditor<DataFrameAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN,
DataFrameAuditMessage.builder());
DataFrameAuditor auditor = new DataFrameAuditor(client, "node_1");
MockedDataFrameIndexer indexer = new MockedDataFrameIndexer(executor, config, Collections.emptyMap(), auditor, state, null,
new DataFrameIndexerTransformStats(config.getId()), searchFunction, bulkFunction, failureConsumer);

View File

@ -188,24 +188,3 @@ setup:
from: 0
size: 10000
---
"Verify put transform creates destination index with appropriate mapping":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}}
}
}
- match: { acknowledged: true }
- do:
indices.get_mapping:
index: airline-data-by-airline
- match: { airline-data-by-airline.mappings.properties.airline.type: keyword }
- match: { airline-data-by-airline.mappings.properties.avg_response.type: double }
- match: { airline-data-by-airline.mappings.properties.time.type: date }

View File

@ -64,6 +64,31 @@ teardown:
transform_id: "airline-transform-start-stop"
---
"Verify start transform creates destination index with appropriate mapping":
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { started: true }
- do:
indices.get_mapping:
index: airline-data-by-airline-start-stop
- match: { airline-data-by-airline-start-stop.mappings.properties.airline.type: keyword }
- match: { airline-data-by-airline-start-stop.mappings.properties.avg_response.type: double }
---
"Verify start transform reuses destination index":
- do:
indices.create:
index: airline-data-by-airline-start-stop
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { started: true }
- do:
indices.get_mapping:
index: airline-data-by-airline-start-stop
- match: { airline-data-by-airline-start-stop.mappings: {} }
---
"Test start/stop/start transform":
- do:
data_frame.start_data_frame_transform: