PipelineStore no longer is a lifecycle component

Client in PipelineStore gets provided via a guice provider
Processor and Factory throw Exception instead of IOException
Removed PipelineExecutionService.Listener with ActionListener
This commit is contained in:
Martijn van Groningen 2015-11-26 18:12:15 +01:00
parent 9aff8c6352
commit 0fe1b4eab1
19 changed files with 139 additions and 148 deletions

View File

@ -44,7 +44,7 @@ public final class Pipeline {
/**
* Modifies the data of a document to be indexed based on the processor this pipeline holds
*/
public void execute(IngestDocument ingestDocument) {
public void execute(IngestDocument ingestDocument) throws Exception {
for (Processor processor : processors) {
processor.execute(ingestDocument);
}
@ -73,7 +73,7 @@ public final class Pipeline {
public final static class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws IOException {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(config, "description");
List<Processor> processors = new ArrayList<>();
@SuppressWarnings("unchecked")

View File

@ -36,7 +36,7 @@ public interface Processor {
/**
* Introspect and potentially modify the incoming data.
*/
void execute(IngestDocument ingestDocument);
void execute(IngestDocument ingestDocument) throws Exception;
/**
* Gets the type of a processor
@ -54,7 +54,7 @@ public interface Processor {
* Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can
* verify if all configurations settings have been used.
*/
P create(Map<String, Object> config) throws IOException;
P create(Map<String, Object> config) throws Exception;
/**
* Sets the configuration directory when needed to read additional config files

View File

@ -82,15 +82,6 @@ public class IngestPlugin extends Plugin {
}
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (transportClient) {
return Collections.emptyList();
} else {
return Collections.singletonList(PipelineStore.class);
}
}
@Override
public Settings additionalSettings() {
return settingsBuilder()

View File

@ -19,6 +19,7 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -43,10 +44,10 @@ public class PipelineExecutionService {
this.threadPool = threadPool;
}
public void execute(IndexRequest indexRequest, String pipelineId, Listener listener) {
public void execute(IndexRequest indexRequest, String pipelineId, ActionListener<IngestDocument> listener) {
Pipeline pipeline = store.get(pipelineId);
if (pipeline == null) {
listener.failed(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"));
listener.onFailure(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"));
return;
}
@ -81,21 +82,13 @@ public class PipelineExecutionService {
TimeValue timeValue = TimeValue.parseTimeValue(ttlStr, null, "ttl");
indexRequest.ttl(timeValue.millis());
}
listener.executed(ingestDocument);
listener.onResponse(ingestDocument);
} catch (Throwable e) {
listener.failed(e);
listener.onFailure(e);
}
});
}
public interface Listener {
void executed(IngestDocument ingestDocument);
void failed(Throwable e);
}
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {

View File

@ -35,10 +35,10 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.SearchScrollIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -57,15 +57,15 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.*;
public class PipelineStore extends AbstractLifecycleComponent {
public class PipelineStore extends AbstractComponent {
public final static String INDEX = ".ingest";
public final static String TYPE = "pipeline";
private final Injector injector;
private final ThreadPool threadPool;
private final TimeValue scrollTimeout;
private final ClusterService clusterService;
private final Provider<Client> clientProvider;
private final TimeValue pipelineUpdateInterval;
private final Pipeline.Factory factory = new Pipeline.Factory();
private final Map<String, Processor.Factory> processorFactoryRegistry;
@ -74,11 +74,11 @@ public class PipelineStore extends AbstractLifecycleComponent {
private volatile Map<String, PipelineDefinition> pipelines = new HashMap<>();
@Inject
public PipelineStore(Settings settings, Injector injector, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map<String, Processor.Factory> processors) {
public PipelineStore(Settings settings, Provider<Client> clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map<String, Processor.Factory> processors) {
super(settings);
this.injector = injector;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.clientProvider = clientProvider;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1));
for (Processor.Factory factory : processors.values()) {
@ -86,43 +86,43 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
this.processorFactoryRegistry = Collections.unmodifiableMap(processors);
clusterService.add(new PipelineStoreListener());
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeClose() {
// Ideally we would implement Closeable, but when a node is stopped this doesn't get invoked:
try {
IOUtils.close(processorFactoryRegistry.values());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
@Override
protected void doStart() {
client = injector.getInstance(Client.class);
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
try {
IOUtils.close(processorFactoryRegistry.values());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Deletes the pipeline specified by id in the request.
*/
public void delete(DeletePipelineRequest request, ActionListener<DeleteResponse> listener) {
DeleteRequest deleteRequest = new DeleteRequest(request);
deleteRequest.index(PipelineStore.INDEX);
deleteRequest.type(PipelineStore.TYPE);
deleteRequest.id(request.id());
deleteRequest.refresh(true);
client.delete(deleteRequest, listener);
client().delete(deleteRequest, listener);
}
public void put(PutPipelineRequest request, ActionListener<IndexResponse> listener) {
// validates the pipeline and processor configuration:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2();
/**
* Stores the specified pipeline definition in the request.
*
* @throws IllegalArgumentException If the pipeline holds incorrect configuration
*/
public void put(PutPipelineRequest request, ActionListener<IndexResponse> listener) throws IllegalArgumentException {
try {
// validates the pipeline and processor configuration:
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2();
constructPipeline(request.id(), pipelineConfig);
} catch (IOException e) {
listener.onFailure(e);
return;
} catch (Exception e) {
throw new IllegalArgumentException("Invalid pipeline configuration", e);
}
IndexRequest indexRequest = new IndexRequest(request);
@ -131,9 +131,12 @@ public class PipelineStore extends AbstractLifecycleComponent {
indexRequest.id(request.id());
indexRequest.source(request.source());
indexRequest.refresh(true);
client.index(indexRequest, listener);
client().index(indexRequest, listener);
}
/**
* Returns the pipeline by the specified id
*/
public Pipeline get(String id) {
PipelineDefinition ref = pipelines.get(id);
if (ref != null) {
@ -166,11 +169,11 @@ public class PipelineStore extends AbstractLifecycleComponent {
return result;
}
Pipeline constructPipeline(String id, Map<String, Object> config) throws IOException {
Pipeline constructPipeline(String id, Map<String, Object> config) throws Exception {
return factory.create(id, config, processorFactoryRegistry);
}
synchronized void updatePipelines() throws IOException {
synchronized void updatePipelines() throws Exception {
// note: this process isn't fast or smart, but the idea is that there will not be many pipelines,
// so for that reason the goal is to keep the update logic simple.
@ -208,14 +211,12 @@ public class PipelineStore extends AbstractLifecycleComponent {
}
void startUpdateWorker() {
if (lifecycleState() == Lifecycle.State.STARTED) {
threadPool.schedule(pipelineUpdateInterval, ThreadPool.Names.GENERIC, new Updater());
}
threadPool.schedule(pipelineUpdateInterval, ThreadPool.Names.GENERIC, new Updater());
}
boolean existPipeline(String pipelineId) {
GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId);
GetResponse response = client.get(request).actionGet();
GetResponse response = client().get(request).actionGet();
return response.isExists();
}
@ -227,7 +228,15 @@ public class PipelineStore extends AbstractLifecycleComponent {
SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX);
searchRequest.source(sourceBuilder);
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest);
return SearchScrollIterator.createIterator(client(), scrollTimeout, searchRequest);
}
private Client client() {
if (client == null) {
client = clientProvider.get();
}
return client;
}
class Updater implements Runnable {

View File

@ -84,15 +84,15 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
chain.proceed(action, indexRequest, listener);
return;
}
executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() {
executionService.execute(indexRequest, pipelineId, new ActionListener<IngestDocument>() {
@Override
public void executed(IngestDocument ingestDocument) {
public void onResponse(IngestDocument ingestDocument) {
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(action, indexRequest, listener);
}
@Override
public void failed(Throwable e) {
public void onFailure(Throwable e) {
logger.error("failed to execute pipeline [{}]", e, pipelineId);
listener.onFailure(e);
}
@ -121,14 +121,14 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
}
IndexRequest indexRequest = (IndexRequest) actionRequest;
executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() {
executionService.execute(indexRequest, pipelineId, new ActionListener<IngestDocument>() {
@Override
public void executed(IngestDocument ingestDocument) {
public void onResponse(IngestDocument ingestDocument) {
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
}
@Override
public void failed(Throwable e) {
public void onFailure(Throwable e) {
logger.debug("failed to execute pipeline [{}]", e, pipelineId);
bulkRequestModifier.markCurrentItemAsFailed(e);
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);

View File

@ -135,7 +135,7 @@ public class SimulatePipelineRequest extends ActionRequest {
return new Parsed(pipeline, ingestDocumentList, verbose);
}
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
List<IngestDocument> ingestDocumentList = parseDocs(config);

View File

@ -55,7 +55,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
} else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
}
} catch (IOException e) {
} catch (Exception e) {
listener.onFailure(e);
return;
}

View File

@ -41,7 +41,7 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase {
protected abstract String expectedResult(String input);
public void testProcessor() throws IOException {
public void testProcessor() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Map<String, String> expected = new HashMap<>();
@ -57,7 +57,7 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase {
}
}
public void testNullValue() throws IOException {
public void testNullValue() throws Exception {
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = newProcessor(Collections.singletonList(fieldName));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
@ -69,7 +69,7 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase {
}
}
public void testNonStringValue() throws IOException {
public void testNonStringValue() throws Exception {
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = newProcessor(Collections.singletonList(fieldName));
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());

View File

@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.equalTo;
public class AddProcessorTests extends ESTestCase {
public void testAddExistingFields() throws IOException {
public void testAddExistingFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Map<String, Object> fields = new HashMap<>();
@ -49,7 +49,7 @@ public class AddProcessorTests extends ESTestCase {
}
}
public void testAddNewFields() throws IOException {
public void testAddNewFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
//used to verify that there are no conflicts between subsequent fields going to be added
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
@ -68,7 +68,7 @@ public class AddProcessorTests extends ESTestCase {
}
}
public void testAddFieldsTypeMismatch() throws IOException {
public void testAddFieldsTypeMismatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setFieldValue("field", "value");
Processor processor = new AddProcessor(Collections.singletonMap("field.inner", "value"));

View File

@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.equalTo;
public class ConvertProcessorTests extends ESTestCase {
public void testConvertInt() throws IOException {
public void testConvertInt() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, ConvertProcessor.Type> fields = new HashMap<>();
Map<String, Integer> expectedResult = new HashMap<>();
@ -50,7 +50,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertIntList() throws IOException {
public void testConvertIntList() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, ConvertProcessor.Type> fields = new HashMap<>();
Map<String, List<Integer>> expectedResult = new HashMap<>();
@ -75,7 +75,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertIntError() throws IOException {
public void testConvertIntError() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
String value = "string-" + randomAsciiOfLengthBetween(1, 10);
@ -91,7 +91,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertFloat() throws IOException {
public void testConvertFloat() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, Float> expectedResult = new HashMap<>();
@ -110,7 +110,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertFloatList() throws IOException {
public void testConvertFloatList() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, List<Float>> expectedResult = new HashMap<>();
@ -135,7 +135,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertFloatError() throws IOException {
public void testConvertFloatError() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
String value = "string-" + randomAsciiOfLengthBetween(1, 10);
@ -151,7 +151,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertBoolean() throws IOException {
public void testConvertBoolean() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, Boolean> expectedResult = new HashMap<>();
@ -174,7 +174,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertBooleanList() throws IOException {
public void testConvertBooleanList() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, List<Boolean>> expectedResult = new HashMap<>();
@ -203,7 +203,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertBooleanError() throws IOException {
public void testConvertBooleanError() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
String fieldValue;
@ -225,7 +225,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertString() throws IOException {
public void testConvertString() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, String> expectedResult = new HashMap<>();
@ -264,7 +264,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertStringList() throws IOException {
public void testConvertStringList() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, Type> fields = new HashMap<>();
Map<String, List<String>> expectedResult = new HashMap<>();
@ -309,7 +309,7 @@ public class ConvertProcessorTests extends ESTestCase {
}
}
public void testConvertNullField() throws IOException {
public void testConvertNullField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Type type = randomFrom(Type.values());

View File

@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.equalTo;
public class GsubProcessorTests extends ESTestCase {
public void testGsub() throws IOException {
public void testGsub() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
List<GsubExpression> expressions = new ArrayList<>();
@ -50,7 +50,7 @@ public class GsubProcessorTests extends ESTestCase {
}
}
public void testGsubNotAStringValue() throws IOException {
public void testGsubNotAStringValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, 123);
@ -64,7 +64,7 @@ public class GsubProcessorTests extends ESTestCase {
}
}
public void testGsubNullValue() throws IOException {
public void testGsubNullValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
List<GsubExpression> gsubExpressions = Collections.singletonList(new GsubExpression(fieldName, Pattern.compile("\\."), "-"));

View File

@ -33,7 +33,7 @@ public class JoinProcessorTests extends ESTestCase {
private static final String[] SEPARATORS = new String[]{"-", "_", "."};
public void testJoinStrings() throws IOException {
public void testJoinStrings() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, String> fields = new HashMap<>();
Map<String, String> expectedResultMap = new HashMap<>();
@ -62,7 +62,7 @@ public class JoinProcessorTests extends ESTestCase {
}
}
public void testJoinIntegers() throws IOException {
public void testJoinIntegers() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, String> fields = new HashMap<>();
Map<String, String> expectedResultMap = new HashMap<>();
@ -91,7 +91,7 @@ public class JoinProcessorTests extends ESTestCase {
}
}
public void testJoinNonListField() throws IOException {
public void testJoinNonListField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, randomAsciiOfLengthBetween(1, 10));
@ -104,7 +104,7 @@ public class JoinProcessorTests extends ESTestCase {
}
}
public void testJoinNonExistingField() throws IOException {
public void testJoinNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = new JoinProcessor(Collections.singletonMap(fieldName, "-"));

View File

@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.nullValue;
public class RemoveProcessorTests extends ESTestCase {
public void testRemoveFields() throws IOException {
public void testRemoveFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Set<String> fields = new HashSet<>();
@ -50,7 +50,7 @@ public class RemoveProcessorTests extends ESTestCase {
}
}
public void testRemoveNonExistingField() throws IOException {
public void testRemoveNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Processor processor = new RemoveProcessor(Collections.singletonList(RandomDocumentPicks.randomFieldName(random())));
processor.execute(ingestDocument);

View File

@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.nullValue;
public class RenameProcessorTests extends ESTestCase {
public void testRename() throws IOException {
public void testRename() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int numFields = randomIntBetween(1, 5);
Map<String, String> fields = new HashMap<>();
@ -56,14 +56,14 @@ public class RenameProcessorTests extends ESTestCase {
}
}
public void testRenameNonExistingField() throws IOException {
public void testRenameNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Processor processor = new RenameProcessor(Collections.singletonMap(RandomDocumentPicks.randomFieldName(random()), RandomDocumentPicks.randomFieldName(random())));
processor.execute(ingestDocument);
assertThat(ingestDocument.getSource().size(), equalTo(0));
}
public void testRenameExistingFieldNullValue() throws IOException {
public void testRenameExistingFieldNullValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, null);

View File

@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.equalTo;
public class SplitProcessorTests extends ESTestCase {
public void testSplit() throws IOException {
public void testSplit() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Map<String, String> fields = new HashMap<>();
int numFields = randomIntBetween(1, 5);
@ -46,7 +46,7 @@ public class SplitProcessorTests extends ESTestCase {
}
}
public void testSplitNullValue() throws IOException {
public void testSplitNullValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Map<String, String> split = Collections.singletonMap(fieldName, "\\.");
@ -59,7 +59,7 @@ public class SplitProcessorTests extends ESTestCase {
}
}
public void testSplitNonStringValue() throws IOException {
public void testSplitNonStringValue() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
ingestDocument.setFieldValue(fieldName, randomInt());

View File

@ -20,6 +20,7 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -59,10 +60,10 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecute_pipelineDoesNotExist() {
when(store.get("_id")).thenReturn(null);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
ActionListener listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
verify(listener).failed(any(IllegalArgumentException.class));
verify(listener, times(0)).executed(any());
verify(listener).onFailure(any(IllegalArgumentException.class));
verify(listener, times(0)).onResponse(any());
}
public void testExecuteSuccess() throws Exception {
@ -70,11 +71,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
ActionListener listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener).executed(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener, times(0)).failed(any(Exception.class));
verify(listener).onResponse(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener, times(0)).onFailure(any(Exception.class));
}
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
@ -94,11 +95,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
ActionListener listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
verify(processor).execute(any());
verify(listener).executed(any());
verify(listener, times(0)).failed(any(Exception.class));
verify(listener).onResponse(any());
verify(listener, times(0)).onFailure(any(Exception.class));
assertThat(indexRequest.index(), equalTo("update_index"));
assertThat(indexRequest.type(), equalTo("update_type"));
@ -114,11 +115,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
ActionListener listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener, times(0)).executed(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener).failed(any(RuntimeException.class));
verify(listener, times(0)).onResponse(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener).onFailure(any(RuntimeException.class));
}
public void testExecuteTTL() throws Exception {
@ -130,12 +131,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
ActionListener listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl").millis()));
verify(listener, times(1)).executed(any());
verify(listener, never()).failed(any());
verify(listener, times(1)).onResponse(any());
verify(listener, never()).onFailure(any());
// test with invalid ttl
metaProcessorFactory = new MetaDataProcessor.Factory();
@ -145,11 +146,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
listener = mock(PipelineExecutionService.Listener.class);
listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
verify(listener, never()).executed(any());
verify(listener, times(1)).failed(any(ElasticsearchParseException.class));
verify(listener, never()).onResponse(any());
verify(listener, times(1)).onFailure(any(ElasticsearchParseException.class));
// test with provided ttl
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.emptyList()));
@ -157,12 +158,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
indexRequest = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap())
.ttl(1000l);
listener = mock(PipelineExecutionService.Listener.class);
listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
assertThat(indexRequest.ttl(), equalTo(1000l));
verify(listener, times(1)).executed(any());
verify(listener, never()).failed(any(Throwable.class));
verify(listener, times(1)).onResponse(any());
verify(listener, never()).onFailure(any(Throwable.class));
}
private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.env.Environment;
@ -65,19 +66,15 @@ public class PipelineStoreTests extends ESTestCase {
public void init() {
threadPool = new ThreadPool("test");
client = mock(Client.class);
Injector injector = mock(Injector.class);
when(injector.getInstance(Client.class)).thenReturn(client);
ClusterService clusterService = mock(ClusterService.class);
when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList()));
Environment environment = mock(Environment.class);
store = new PipelineStore(Settings.EMPTY, injector, threadPool, environment, clusterService, Collections.emptyMap());
store.start();
store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, Collections.emptyMap());
}
@After
public void cleanup() {
store.stop();
threadPool.shutdown();
}

View File

@ -83,7 +83,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verifyZeroInteractions(actionFilterChain);
}
@ -96,7 +96,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verifyZeroInteractions(actionFilterChain);
}
@ -122,14 +122,14 @@ public class IngestActionFilterTests extends ESTestCase {
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
Answer answer = invocationOnMock -> {
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
listener.executed(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.sourceAsMap()));
ActionListener<IngestDocument> listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.sourceAsMap()));
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
verifyZeroInteractions(actionListener);
}
@ -145,15 +145,15 @@ public class IngestActionFilterTests extends ESTestCase {
Answer answer = new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
listener.failed(exception);
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onFailure(exception);
return null;
}
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verify(actionListener).onFailure(exception);
verifyZeroInteractions(actionFilterChain);
}
@ -242,11 +242,11 @@ public class IngestActionFilterTests extends ESTestCase {
RuntimeException exception = new RuntimeException();
Answer answer = (invocationOnMock) -> {
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
listener.failed(exception);
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onFailure(exception);
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
CaptureActionListener actionListener = new CaptureActionListener();
RecordRequestAFC actionFilterChain = new RecordRequestAFC();
@ -287,11 +287,11 @@ public class IngestActionFilterTests extends ESTestCase {
RuntimeException exception = new RuntimeException();
Answer answer = (invocationOnMock) -> {
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
listener.failed(exception);
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onFailure(exception);
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
ActionListener actionListener = mock(ActionListener.class);
RecordRequestAFC actionFilterChain = new RecordRequestAFC();