Remove JobDataDeleterFactory and OldDataDeleter (elastic/elasticsearch#759)

Original commit: elastic/x-pack-elasticsearch@ac5b75eb58
This commit is contained in:
David Kyle 2017-01-20 09:49:24 +00:00 committed by GitHub
parent 9665368755
commit 2eb0499454
6 changed files with 21 additions and 96 deletions

View File

@ -64,7 +64,6 @@ import org.elasticsearch.xpack.ml.job.manager.JobManager;
import org.elasticsearch.xpack.ml.job.metadata.MlInitializationService;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
@ -207,7 +206,6 @@ public class MlPlugin extends Plugin implements ActionPlugin {
return Arrays.asList(
jobProvider,
jobManager,
new JobDataDeleterFactory(client), //NORELEASE: this should use Delete-by-query
dataProcessor,
new MlInitializationService(settings, threadPool, clusterService, jobProvider),
jobDataCountsPersister,

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
@ -29,7 +30,6 @@ import org.elasticsearch.xpack.ml.job.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.manager.JobManager;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.QueryPage;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -129,21 +129,21 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final JobProvider jobProvider;
private final JobManager jobManager;
private final ClusterService clusterService;
private final JobDataDeleterFactory bulkDeleterFactory;
@Inject
public TransportAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
JobProvider jobProvider, JobManager jobManager, ClusterService clusterService,
JobDataDeleterFactory bulkDeleterFactory) {
Client client) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.jobProvider = jobProvider;
this.jobManager = jobManager;
this.clusterService = clusterService;
this.bulkDeleterFactory = bulkDeleterFactory;
}
@Override
@ -178,7 +178,7 @@ public class DeleteModelSnapshotAction extends Action<DeleteModelSnapshotAction.
}
// Delete the snapshot and any associated state files
JobDataDeleter deleter = bulkDeleterFactory.apply(request.getJobId());
JobDataDeleter deleter = new JobDataDeleter(client, request.getJobId());
deleter.deleteModelSnapshot(deleteCandidate);
deleter.commit(new ActionListener<BulkResponse>() {
@Override

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -42,9 +43,8 @@ import org.elasticsearch.xpack.ml.job.manager.JobManager;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.metadata.Allocation;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleterFactory;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.OldDataRemover;
import org.elasticsearch.xpack.ml.job.persistence.QueryPage;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
@ -52,6 +52,7 @@ import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
public class RevertModelSnapshotAction
@ -267,20 +268,19 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
public static class TransportAction extends TransportMasterNodeAction<Request, Response> {
private final Client client;
private final JobManager jobManager;
private final JobProvider jobProvider;
private final JobDataDeleterFactory bulkDeleterFactory;
private final JobDataCountsPersister jobDataCountsPersister;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, JobProvider jobProvider,
ClusterService clusterService, JobDataDeleterFactory bulkDeleterFactory,
JobDataCountsPersister jobDataCountsPersister) {
ClusterService clusterService, Client client, JobDataCountsPersister jobDataCountsPersister) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.jobManager = jobManager;
this.jobProvider = jobProvider;
this.bulkDeleterFactory = bulkDeleterFactory;
this.jobDataCountsPersister = jobDataCountsPersister;
}
@ -351,21 +351,22 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
logger.info("Deleting results after '" + deleteAfter + "'");
// NORELEASE: OldDataRemover is basically delete-by-query.
// We should replace this
// whole abstraction with DBQ eventually
OldDataRemover remover = new OldDataRemover(bulkDeleterFactory);
remover.deleteResultsAfter(new ActionListener<BulkResponse>() {
// NORELEASE: JobDataDeleter is basically delete-by-query.
// We should replace this whole abstraction with DBQ eventually
JobDataDeleter dataDeleter = new JobDataDeleter(client, jobId);
dataDeleter.deleteResultsFromTime(deleteAfter.getTime() + 1, new ActionListener<Boolean>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
listener.onResponse(response);
public void onResponse(Boolean success) {
dataDeleter.commit(ActionListener.wrap(
bulkItemResponses -> {listener.onResponse(response);},
listener::onFailure));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, jobId, deleteAfter.getTime() + 1);
});
}
}, listener::onFailure);
}

View File

@ -213,8 +213,7 @@ public class JobDataDeleter {
totalDeletedCount.addAndGet(searchResponse.getHits().hits().length);
if (totalDeletedCount.get() < searchResponse.getHits().totalHits()) {
client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION)
.execute(this);
client.prepareSearchScroll(searchResponse.getScrollId()).setScroll(SCROLL_CONTEXT_DURATION).execute(this);
}
else {
scrollFinishedListener.onResponse(true);

View File

@ -1,28 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.client.Client;
import java.util.function.Function;
/**
* TODO This is all just silly static typing shenanigans because Guice can't inject
* anonymous lambdas. This can all be removed once Guice goes away.
*/
public class JobDataDeleterFactory implements Function<String, JobDataDeleter> {
private final Client client;
public JobDataDeleterFactory(Client client) {
this.client = client;
}
@Override
public JobDataDeleter apply(String jobId) {
return new JobDataDeleter(client, jobId);
}
}

View File

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import java.util.Objects;
import java.util.function.Function;
/**
* A class that removes results from all the jobs that
* have expired their respected retention time.
*/
public class OldDataRemover {
private final Function<String, JobDataDeleter> dataDeleterFactory;
public OldDataRemover(Function<String, JobDataDeleter> dataDeleterFactory) {
this.dataDeleterFactory = Objects.requireNonNull(dataDeleterFactory);
}
/**
* Removes results between the time given and the current time
*/
public void deleteResultsAfter(ActionListener<BulkResponse> listener, String jobId, long cutoffEpochMs) {
JobDataDeleter deleter = dataDeleterFactory.apply(jobId);
deleter.deleteResultsFromTime(cutoffEpochMs, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
if (success) {
deleter.commit(listener);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}