Make persist DataCounts a non-blocking operation (elastic/elasticsearch#447)

* Make persist DataCounts a non-blocking operation

* Add trace and debug level logging to the persist data counts action listener.

Remove dead code from test

Original commit: elastic/x-pack-elasticsearch@84bbfa880a
This commit is contained in:
David Kyle 2016-12-02 17:44:53 +00:00 committed by GitHub
parent f10c4818e7
commit 850e43028b
7 changed files with 80 additions and 53 deletions

View File

@ -157,7 +157,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
ElasticsearchJobProvider jobProvider = new ElasticsearchJobProvider(client, 0, parseFieldMatcherSupplier.getParseFieldMatcher());
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, jobDataCountsPersister, clusterService);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
AutodetectProcessFactory processFactory;
if (USE_NATIVE_PROCESS_OPTION.get(settings)) {
try {
@ -182,7 +182,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
new JobLifeCycleService(settings, client, clusterService, scheduledJobService, dataProcessor, threadPool.generic()),
new ElasticsearchBulkDeleterFactory(client), //NORELEASE: this should use Delete-by-query
dataProcessor,
new PrelertInitializationService(settings, threadPool, clusterService, jobProvider)
new PrelertInitializationService(settings, threadPool, clusterService, jobProvider),
jobDataCountsPersister
);
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobStatus;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
@ -44,11 +45,11 @@ import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchBulkDeleterFactory;
import org.elasticsearch.xpack.prelert.job.persistence.ElasticsearchJobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.OldDataRemover;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.utils.ExceptionsHelper;
import org.elasticsearch.xpack.prelert.utils.SingleDocument;
import java.io.IOException;
import java.util.Date;
@ -311,15 +312,18 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
private final JobManager jobManager;
private final JobProvider jobProvider;
private final ElasticsearchBulkDeleterFactory bulkDeleterFactory;
private final JobDataCountsPersister jobDataCountsPersister;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, JobManager jobManager, ElasticsearchJobProvider jobProvider,
ClusterService clusterService, ElasticsearchBulkDeleterFactory bulkDeleterFactory) {
ClusterService clusterService, ElasticsearchBulkDeleterFactory bulkDeleterFactory,
JobDataCountsPersister jobDataCountsPersister) {
super(settings, NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, Request::new);
this.jobManager = jobManager;
this.jobProvider = jobProvider;
this.bulkDeleterFactory = bulkDeleterFactory;
this.jobDataCountsPersister = jobDataCountsPersister;
}
@Override
@ -351,7 +355,8 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
ModelSnapshot modelSnapshot = getModelSnapshot(request, jobProvider);
if (request.getDeleteInterveningResults()) {
listener = wrapListener(listener, modelSnapshot, request.getJobId());
listener = wrapDeleteOldDataListener(listener, modelSnapshot, request.getJobId());
listener = wrapRevertDataCountsListener(listener, modelSnapshot, request.getJobId());
}
jobManager.revertSnapshot(request, listener, modelSnapshot);
}
@ -374,12 +379,12 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
return modelSnapshot;
}
private ActionListener<RevertModelSnapshotAction.Response> wrapListener(ActionListener<RevertModelSnapshotAction.Response> listener,
private ActionListener<RevertModelSnapshotAction.Response> wrapDeleteOldDataListener(
ActionListener<RevertModelSnapshotAction.Response> listener,
ModelSnapshot modelSnapshot, String jobId) {
// If we need to delete buckets that occurred after the snapshot, we
// wrap
// the listener with one that invokes the OldDataRemover on
// wrap the listener with one that invokes the OldDataRemover on
// acknowledged responses
return ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
@ -408,6 +413,30 @@ extends Action<RevertModelSnapshotAction.Request, RevertModelSnapshotAction.Resp
}, listener::onFailure);
}
/**
 * Wraps {@code listener} so that, once the revert is acknowledged, the job's
 * DataCounts {@code latestRecordTimeStamp} is rolled back to match the
 * reverted model snapshot before the original listener is notified. The
 * counts document is persisted asynchronously via the
 * {@link JobDataCountsPersister} listener-based API.
 *
 * @param listener      the listener to notify once the counts are persisted
 * @param modelSnapshot the snapshot being reverted to; supplies the timestamp
 * @param jobId         the job whose counts are updated
 * @return the wrapping listener
 */
private ActionListener<RevertModelSnapshotAction.Response> wrapRevertDataCountsListener(
        ActionListener<RevertModelSnapshotAction.Response> listener,
        ModelSnapshot modelSnapshot, String jobId) {
    return ActionListener.wrap(response -> {
        if (response.isAcknowledged()) {
            // NOTE(review): jobProvider.dataCounts() is a blocking read inside an
            // async callback — confirm this is acceptable on this thread.
            DataCounts counts = jobProvider.dataCounts(jobId);
            counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
            jobDataCountsPersister.persistDataCounts(jobId, counts, new ActionListener<Boolean>() {
                @Override
                public void onResponse(Boolean persisted) {
                    // Propagate the original (acknowledged) revert response.
                    listener.onResponse(response);
                }

                @Override
                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        } else {
            // BUG FIX: an unacknowledged response was previously dropped without
            // notifying the listener, leaving the caller waiting indefinitely.
            // Pass it through unchanged instead.
            listener.onResponse(response);
        }
    }, listener::onFailure);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.prelert.action.StartJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.StopJobSchedulerAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobSchedulerStatusAction;
import org.elasticsearch.xpack.prelert.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.IgnoreDowntime;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.job.JobSchedulerStatus;
@ -35,7 +34,6 @@ import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import org.elasticsearch.xpack.prelert.job.messages.Messages;
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
@ -71,19 +69,17 @@ public class JobManager extends AbstractComponent {
private final JobProvider jobProvider;
private final ClusterService clusterService;
private final JobResultsPersister jobResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;
/**
* Create a JobManager
*/
public JobManager(Settings settings, JobProvider jobProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister, ClusterService clusterService) {
ClusterService clusterService) {
super(settings);
this.jobProvider = Objects.requireNonNull(jobProvider);
this.clusterService = clusterService;
this.jobResultsPersister = jobResultsPersister;
this.jobDataCountsPersister = jobDataCountsPersister;
}
/**
@ -441,11 +437,6 @@ public class JobManager extends AbstractComponent {
builder.setModelSnapshotId(modelSnapshot.getSnapshotId());
if (request.getDeleteInterveningResults()) {
builder.setIgnoreDowntime(IgnoreDowntime.NEVER);
DataCounts counts = jobProvider.dataCounts(request.getJobId());
counts.setLatestRecordTimeStamp(modelSnapshot.getLatestRecordTimeStamp());
// NORELEASE This update should be async. See #127
jobDataCountsPersister.persistDataCounts(request.getJobId(), counts);
} else {
builder.setIgnoreDowntime(IgnoreDowntime.ONCE);
}

View File

@ -10,6 +10,8 @@ import java.util.Locale;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@ -44,18 +46,26 @@ public class JobDataCountsPersister extends AbstractComponent {
*
* @param jobId Job to update
* @param counts The counts
* @param listener Action response listener
*/
public void persistDataCounts(String jobId, DataCounts counts) {
public void persistDataCounts(String jobId, DataCounts counts, ActionListener<Boolean> listener) {
try {
XContentBuilder content = serialiseCounts(counts);
client.prepareIndex(getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(),
jobId + DataCounts.DOCUMENT_SUFFIX)
.setSource(content).execute().actionGet();
client.prepareIndex(getJobIndexName(jobId), DataCounts.TYPE.getPreferredName(), jobId + DataCounts.DOCUMENT_SUFFIX)
.setSource(content).execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
listener.onResponse(true);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} catch (IOException ioe) {
logger.warn((Supplier<?>)() -> new ParameterizedMessage("[{}] Error serialising DataCounts stats", jobId), ioe);
} catch (IndexNotFoundException e) {
String msg = String.format(Locale.ROOT, "[%s] Error writing status stats.", jobId);
logger.warn(msg, e);
}
}
}

View File

@ -5,13 +5,14 @@
*/
package org.elasticsearch.xpack.prelert.job.status;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.prelert.PrelertPlugin;
import org.elasticsearch.xpack.prelert.job.DataCounts;
import org.elasticsearch.xpack.prelert.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.prelert.job.usage.UsageReporter;
@ -78,7 +79,6 @@ public class StatusReporter extends AbstractComponent implements Closeable {
private volatile boolean persistDataCountsOnNextRecord;
private final ThreadPool.Cancellable persistDataCountsScheduledAction;
private final ThreadPool threadPool;
public StatusReporter(ThreadPool threadPool, Settings settings, String jobId, DataCounts counts, UsageReporter usageReporter,
JobDataCountsPersister dataCountsPersister) {
@ -97,7 +97,6 @@ public class StatusReporter extends AbstractComponent implements Closeable {
reportingBoundaryFunction = this::reportEvery100Records;
this.threadPool = threadPool;
persistDataCountsScheduledAction = threadPool.scheduleWithFixedDelay(() -> persistDataCountsOnNextRecord = true,
PERSIST_INTERVAL, ThreadPool.Names.GENERIC);
}
@ -139,7 +138,7 @@ public class StatusReporter extends AbstractComponent implements Closeable {
if (persistDataCountsOnNextRecord) {
DataCounts copy = new DataCounts(runningTotalStats());
threadPool.generic().submit(() -> dataCountsPersister.persistDataCounts(jobId, copy));
dataCountsPersister.persistDataCounts(jobId, copy, new LoggingActionListener());
persistDataCountsOnNextRecord = false;
}
}
@ -268,7 +267,7 @@ public class StatusReporter extends AbstractComponent implements Closeable {
*/
public void finishReporting() {
usageReporter.reportUsage();
dataCountsPersister.persistDataCounts(jobId, runningTotalStats());
dataCountsPersister.persistDataCounts(jobId, runningTotalStats(), new LoggingActionListener());
}
/**
@ -356,4 +355,19 @@ public class StatusReporter extends AbstractComponent implements Closeable {
public void close() {
persistDataCountsScheduledAction.cancel();
}
/**
 * Log success/error
 */
// Fire-and-forget listener for best-effort DataCounts persistence: outcomes
// are only logged, never propagated to the caller.
private class LoggingActionListener implements ActionListener<Boolean> {
@Override
public void onResponse(Boolean aBoolean) {
// Success path fires on every persist, so keep it at trace level.
logger.trace("[{}] Persisted DataCounts", jobId);
}
@Override
public void onFailure(Exception e) {
// NOTE(review): failures are logged at debug (per this commit's intent of
// "trace and debug level logging") — confirm warn isn't wanted instead.
logger.debug(new ParameterizedMessage("[{}] Error persisting DataCounts stats", jobId), e);
}
}
}

View File

@ -217,7 +217,7 @@ public class JobManagerTests extends ESTestCase {
private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);
return new JobManager(settings, jobProvider, jobResultsPersister, jobDataCountsPersister, clusterService);
return new JobManager(settings, jobProvider, jobResultsPersister, clusterService);
}
private ClusterState createClusterState() {

View File

@ -18,16 +18,10 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -157,7 +151,7 @@ public class StatusReporterTests extends ESTestCase {
assertEquals(statusReporter.incrementalStats(), statusReporter.runningTotalStats());
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class));
verify(jobDataCountsPersister, never()).persistDataCounts(anyString(), any(DataCounts.class), any());
}
}
@ -262,7 +256,7 @@ public class StatusReporterTests extends ESTestCase {
statusReporter.finishReporting();
Mockito.verify(usageReporter, Mockito.times(1)).reportUsage();
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc));
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc), any());
assertEquals(dc, statusReporter.incrementalStats());
}
}
@ -284,12 +278,6 @@ public class StatusReporterTests extends ESTestCase {
}
});
ExecutorService executorService = mock(ExecutorService.class);
ArgumentCaptor<Runnable> persistTaskCapture = ArgumentCaptor.forClass(Runnable.class);
when(executorService.submit(persistTaskCapture.capture())).thenReturn(null);
when(mockThreadPool.generic()).thenReturn(executorService);
try (StatusReporter statusReporter = new StatusReporter(mockThreadPool, settings, JOB_ID, new DataCounts(JOB_ID), usageReporter,
jobDataCountsPersister)) {
@ -298,16 +286,10 @@ public class StatusReporterTests extends ESTestCase {
statusReporter.reportRecordWritten(5, 2000);
statusReporter.reportRecordWritten(5, 3000);
Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("SR"), any());
Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("SR"), any(), any());
argumentCaptor.getValue().run();
statusReporter.reportRecordWritten(5, 4000);
DataCounts dc = new DataCounts(JOB_ID, 2L, 6L, 0L, 10L, 0L, 0L, 0L, new Date(2000), new Date(4000));
// verify threadpool executor service to do the persistence is launched
Mockito.verify(mockThreadPool, Mockito.times(1)).generic();
// run the captured persist task
persistTaskCapture.getValue().run();
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), any());
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), any(), any());
}
}