parent
8745d38dc8
commit
d5ca72740e
|
@ -482,8 +482,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
|
|
||||||
static class ClientDataFrameIndexer extends DataFrameIndexer {
|
static class ClientDataFrameIndexer extends DataFrameIndexer {
|
||||||
|
|
||||||
private static final int ON_FINISH_AUDIT_FREQUENCY = 1000;
|
private long logEvery = 1;
|
||||||
|
private long logCount = 0;
|
||||||
private final Client client;
|
private final Client client;
|
||||||
private final DataFrameTransformsConfigManager transformsConfigManager;
|
private final DataFrameTransformsConfigManager transformsConfigManager;
|
||||||
private final DataFrameTransformsCheckpointService transformsCheckpointService;
|
private final DataFrameTransformsCheckpointService transformsCheckpointService;
|
||||||
|
@ -711,7 +711,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
nextCheckpoint = null;
|
nextCheckpoint = null;
|
||||||
// Reset our failure count as we have finished and may start again with a new checkpoint
|
// Reset our failure count as we have finished and may start again with a new checkpoint
|
||||||
failureCount.set(0);
|
failureCount.set(0);
|
||||||
if (checkpoint % ON_FINISH_AUDIT_FREQUENCY == 0) {
|
if (shouldAuditOnFinish(checkpoint)) {
|
||||||
auditor.info(transformTask.getTransformId(),
|
auditor.info(transformTask.getTransformId(),
|
||||||
"Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
|
"Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
|
||||||
}
|
}
|
||||||
|
@ -724,6 +724,26 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates if an audit message should be written when onFinish is called for the given checkpoint
|
||||||
|
* We audit the first checkpoint, and then every 10 checkpoints until completedCheckpoint == 99
|
||||||
|
* Then we audit every 100, until completedCheckpoint == 999
|
||||||
|
*
|
||||||
|
* Then we always audit every 1_000 checkpoints
|
||||||
|
*
|
||||||
|
* @param completedCheckpoint The checkpoint that was just completed
|
||||||
|
* @return {@code true} if an audit message should be written
|
||||||
|
*/
|
||||||
|
protected boolean shouldAuditOnFinish(long completedCheckpoint) {
|
||||||
|
if (++logCount % logEvery != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
int log10Checkpoint = (int) Math.floor(Math.log10(completedCheckpoint + 1));
|
||||||
|
logEvery = log10Checkpoint >= 3 ? 1_000 : (int)Math.pow(10.0, log10Checkpoint);
|
||||||
|
logCount = 0;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void onStop() {
|
protected void onStop() {
|
||||||
auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
|
auditor.info(transformConfig.getId(), "Data frame transform has stopped.");
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.dataframe.transforms;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.tasks.TaskId;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
|
||||||
|
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
|
||||||
|
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
|
||||||
|
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
|
||||||
|
import org.elasticsearch.xpack.core.indexing.IndexerState;
|
||||||
|
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
|
||||||
|
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
|
||||||
|
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
|
||||||
|
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class ClientDataFrameIndexerTests extends ESTestCase {
|
||||||
|
|
||||||
|
public void testAudiOnFinishFrequency() {
|
||||||
|
ThreadPool threadPool = mock(ThreadPool.class);
|
||||||
|
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
|
||||||
|
DataFrameTransformTask parentTask = new DataFrameTransformTask(1,
|
||||||
|
"dataframe",
|
||||||
|
"ptask",
|
||||||
|
new TaskId("dataframe:1"),
|
||||||
|
mock(DataFrameTransform.class),
|
||||||
|
null,
|
||||||
|
mock(SchedulerEngine.class),
|
||||||
|
mock(DataFrameAuditor.class),
|
||||||
|
threadPool,
|
||||||
|
Collections.emptyMap());
|
||||||
|
DataFrameTransformTask.ClientDataFrameIndexer indexer = new DataFrameTransformTask.ClientDataFrameIndexer(randomAlphaOfLength(10),
|
||||||
|
mock(DataFrameTransformsConfigManager.class),
|
||||||
|
mock(DataFrameTransformsCheckpointService.class),
|
||||||
|
new AtomicReference<>(IndexerState.STOPPED),
|
||||||
|
null,
|
||||||
|
mock(Client.class),
|
||||||
|
mock(DataFrameAuditor.class),
|
||||||
|
mock(DataFrameIndexerTransformStats.class),
|
||||||
|
mock(DataFrameTransformConfig.class),
|
||||||
|
Collections.emptyMap(),
|
||||||
|
null,
|
||||||
|
new DataFrameTransformCheckpoint("transform",
|
||||||
|
Instant.now().toEpochMilli(),
|
||||||
|
0L,
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Instant.now().toEpochMilli()),
|
||||||
|
new DataFrameTransformCheckpoint("transform",
|
||||||
|
Instant.now().toEpochMilli(),
|
||||||
|
2L,
|
||||||
|
Collections.emptyMap(),
|
||||||
|
Instant.now().toEpochMilli()),
|
||||||
|
parentTask);
|
||||||
|
|
||||||
|
List<Boolean> shouldAudit = IntStream.range(0, 100_000).boxed().map(indexer::shouldAuditOnFinish).collect(Collectors.toList());
|
||||||
|
|
||||||
|
// Audit every checkpoint for the first 10
|
||||||
|
assertTrue(shouldAudit.get(0));
|
||||||
|
assertTrue(shouldAudit.get(1));
|
||||||
|
assertTrue(shouldAudit.get(9));
|
||||||
|
|
||||||
|
// Then audit every 10 while < 100
|
||||||
|
assertFalse(shouldAudit.get(10));
|
||||||
|
assertFalse(shouldAudit.get(11));
|
||||||
|
assertTrue(shouldAudit.get(19));
|
||||||
|
assertTrue(shouldAudit.get(29));
|
||||||
|
assertFalse(shouldAudit.get(30));
|
||||||
|
assertTrue(shouldAudit.get(99));
|
||||||
|
|
||||||
|
// Then audit every 100 < 1000
|
||||||
|
assertFalse(shouldAudit.get(100));
|
||||||
|
assertFalse(shouldAudit.get(109));
|
||||||
|
assertFalse(shouldAudit.get(110));
|
||||||
|
assertTrue(shouldAudit.get(199));
|
||||||
|
|
||||||
|
// Then audit every 1000 for the rest of time
|
||||||
|
assertTrue(shouldAudit.get(1999));
|
||||||
|
assertFalse(shouldAudit.get(2199));
|
||||||
|
assertTrue(shouldAudit.get(2999));
|
||||||
|
assertTrue(shouldAudit.get(9999));
|
||||||
|
assertTrue(shouldAudit.get(10_999));
|
||||||
|
assertFalse(shouldAudit.get(11_000));
|
||||||
|
assertTrue(shouldAudit.get(11_999));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue