simplify indexer by moving members to base class (#41741)

simplify indexer by moving members to base class
This commit is contained in:
Hendrik Muhs 2019-05-02 16:06:31 +02:00
parent 728fe2d409
commit be7ec5a47a
3 changed files with 34 additions and 62 deletions

View File

@ -8,13 +8,11 @@ package org.elasticsearch.xpack.dataframe.transforms;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -25,6 +23,7 @@ import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.indexing.IterationResult;
@ -50,31 +49,46 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
protected final DataFrameAuditor auditor;
protected final DataFrameTransformConfig transformConfig;
protected volatile DataFrameTransformProgress progress;
private final Map<String, String> fieldMappings;
private Pivot pivot;
private int pageSize = 0;
public DataFrameIndexer(Executor executor,
DataFrameAuditor auditor,
DataFrameTransformConfig transformConfig,
Map<String, String> fieldMappings,
AtomicReference<IndexerState> initialState,
Map<String, Object> initialPosition,
DataFrameIndexerTransformStats jobStats) {
DataFrameIndexerTransformStats jobStats,
DataFrameTransformProgress transformProgress) {
super(executor, initialState, initialPosition, jobStats);
this.auditor = Objects.requireNonNull(auditor);
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
}
protected abstract DataFrameTransformConfig getConfig();
protected abstract Map<String, String> getFieldMappings();
@Nullable
protected abstract DataFrameTransformProgress getProgress();
protected abstract void failIndexer(String message);
public int getPageSize() {
return pageSize;
}
public DataFrameTransformConfig getConfig() {
return transformConfig;
}
public Map<String, String> getFieldMappings() {
return fieldMappings;
}
public DataFrameTransformProgress getProgress() {
return progress;
}
/**
* Request a checkpoint
*/
@ -119,8 +133,8 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
IterationResult<Map<String, Object>> result = new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()),
agg.afterKey(),
agg.getBuckets().isEmpty());
if (getProgress() != null) {
getProgress().docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
if (progress != null) {
progress.docsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
}
return result;
}
@ -215,14 +229,14 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
*/
private static CircuitBreakingException getCircuitBreakingException(Exception e) {
// circuit breaking exceptions are at the bottom
Throwable unwrappedThrowable = ExceptionsHelper.unwrapCause(e);
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(e);
if (unwrappedThrowable instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedThrowable;
} else if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e;
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
Throwable unwrappedShardFailure = ExceptionsHelper.unwrapCause(shardFailure.getCause());
Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());
if (unwrappedShardFailure instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedShardFailure;

View File

@ -444,11 +444,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final DataFrameAuditor auditor;
private final DataFrameTransformTask transformTask;
private final Map<String, String> fieldMappings;
private final DataFrameTransformConfig transformConfig;
private volatile DataFrameTransformProgress progress;
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
private final AtomicInteger failureCount;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
@ -470,19 +466,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
.threadPool
.executor(ThreadPool.Names.GENERIC),
ExceptionsHelper.requireNonNull(auditor, "auditor"),
transformConfig,
fieldMappings,
ExceptionsHelper.requireNonNull(initialState, "initialState"),
initialPosition,
initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats);
initialStats == null ? new DataFrameIndexerTransformStats(transformId) : initialStats,
transformProgress);
this.transformId = ExceptionsHelper.requireNonNull(transformId, "transformId");
this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager");
this.transformsCheckpointService = ExceptionsHelper.requireNonNull(transformsCheckpointService,
"transformsCheckpointService");
this.client = ExceptionsHelper.requireNonNull(client, "client");
this.auditor = auditor;
this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig");
this.transformTask = parentTask;
this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings");
this.progress = transformProgress;
this.failureCount = new AtomicInteger(0);
}
@ -510,21 +505,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
}
@Override
protected DataFrameTransformConfig getConfig() {
return transformConfig;
}
@Override
protected Map<String, String> getFieldMappings() {
return fieldMappings;
}
@Override
protected DataFrameTransformProgress getProgress() {
return progress;
}
@Override
protected String getJobId() {
return transformId;

View File

@ -24,7 +24,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
@ -32,7 +31,6 @@ import org.junit.Before;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@ -48,13 +46,9 @@ import static org.mockito.Mockito.when;
public class DataFrameIndexerTests extends ESTestCase {
private Client client;
private static final String TEST_ORIGIN = "test_origin";
private static final String TEST_INDEX = "test_index";
class MockedDataFrameIndexer extends DataFrameIndexer {
private final DataFrameTransformConfig transformConfig;
private final Map<String, String> fieldMappings;
private final Function<SearchRequest, SearchResponse> searchFunction;
private final Function<BulkRequest, BulkResponse> bulkFunction;
private final Consumer<Exception> failureConsumer;
@ -73,9 +67,8 @@ public class DataFrameIndexerTests extends ESTestCase {
Function<SearchRequest, SearchResponse> searchFunction,
Function<BulkRequest, BulkResponse> bulkFunction,
Consumer<Exception> failureConsumer) {
super(executor, auditor, initialState, initialPosition, jobStats);
this.transformConfig = Objects.requireNonNull(transformConfig);
this.fieldMappings = Objects.requireNonNull(fieldMappings);
super(executor, auditor, transformConfig, fieldMappings, initialState, initialPosition, jobStats,
/* DataFrameTransformProgress */ null);
this.searchFunction = searchFunction;
this.bulkFunction = bulkFunction;
this.failureConsumer = failureConsumer;
@ -85,21 +78,6 @@ public class DataFrameIndexerTests extends ESTestCase {
return latch = new CountDownLatch(count);
}
@Override
protected DataFrameTransformConfig getConfig() {
return transformConfig;
}
@Override
protected Map<String, String> getFieldMappings() {
return fieldMappings;
}
@Override
protected DataFrameTransformProgress getProgress() {
return null;
}
@Override
protected void createCheckpoint(ActionListener<Void> listener) {
listener.onResponse(null);