From 0fe1b4eab1a6240edeea7bc41f107cc849d90fb6 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 26 Nov 2015 18:12:15 +0100 Subject: [PATCH] PipelineStore no longer is a lifecycle component Client in PipelineStore gets provided via a guice provider Processor and Factory throw Exception instead of IOException Removed PipelineExecutionService.Listener with ActionListener --- .../org/elasticsearch/ingest/Pipeline.java | 4 +- .../ingest/processor/Processor.java | 4 +- .../plugin/ingest/IngestPlugin.java | 9 -- .../ingest/PipelineExecutionService.java | 17 ++-- .../plugin/ingest/PipelineStore.java | 89 ++++++++++--------- .../ingest/transport/IngestActionFilter.java | 12 +-- .../simulate/SimulatePipelineRequest.java | 2 +- .../SimulatePipelineTransportAction.java | 2 +- .../AbstractStringProcessorTestCase.java | 6 +- .../processor/add/AddProcessorTests.java | 6 +- .../convert/ConvertProcessorTests.java | 24 ++--- .../processor/gsub/GsubProcessorTests.java | 6 +- .../processor/join/JoinProcessorTests.java | 8 +- .../remove/RemoveProcessorTests.java | 4 +- .../rename/RenameProcessorTests.java | 6 +- .../processor/split/SplitProcessorTests.java | 6 +- .../ingest/PipelineExecutionServiceTests.java | 43 ++++----- .../plugin/ingest/PipelineStoreTests.java | 7 +- .../transport/IngestActionFilterTests.java | 32 +++---- 19 files changed, 139 insertions(+), 148 deletions(-) diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java index b2ba34cfd54..4fb26bc7ab0 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -44,7 +44,7 @@ public final class Pipeline { /** * Modifies the data of a document to be indexed based on the processor this pipeline holds */ - public void execute(IngestDocument ingestDocument) { + public void execute(IngestDocument ingestDocument) throws Exception { for (Processor processor : processors) { processor.execute(ingestDocument); } @@ -73,7 +73,7 @@ public final class Pipeline { public final static class Factory { - public Pipeline create(String id, Map config, Map processorRegistry) throws IOException { + public Pipeline create(String id, Map config, Map processorRegistry) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); List processors = new ArrayList<>(); @SuppressWarnings("unchecked") diff --git a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java index 67a1cad45a7..1795d2f5c70 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/ingest/processor/Processor.java @@ -36,7 +36,7 @@ public interface Processor { /** * Introspect and potentially modify the incoming data. */ - void execute(IngestDocument ingestDocument); + void execute(IngestDocument ingestDocument) throws Exception; /** * Gets the type of a processor @@ -54,7 +54,7 @@ public interface Processor { * Implementations are responsible for removing the used keys, so that after creating a pipeline ingest can * verify if all configurations settings have been used. */ - P create(Map config) throws IOException; + P create(Map config) throws Exception; /** * Sets the configuration directory when needed to read additional config files diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java index 5405459b28a..77bac7df2bb 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java @@ -82,15 +82,6 @@ public class IngestPlugin extends Plugin { } } - @Override - public Collection> nodeServices() { - if (transportClient) { - return Collections.emptyList(); - } else { - return Collections.singletonList(PipelineStore.class); - } - } - @Override public Settings additionalSettings() { return settingsBuilder() diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java index e66697f02d6..d717174c05d 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineExecutionService.java @@ -19,6 +19,7 @@ package org.elasticsearch.plugin.ingest; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -43,10 +44,10 @@ public class PipelineExecutionService { this.threadPool = threadPool; } - public void execute(IndexRequest indexRequest, String pipelineId, Listener listener) { + public void execute(IndexRequest indexRequest, String pipelineId, ActionListener listener) { Pipeline pipeline = store.get(pipelineId); if (pipeline == null) { - listener.failed(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist")); + listener.onFailure(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist")); return; } @@ -81,21 +82,13 @@ public class PipelineExecutionService { TimeValue timeValue = TimeValue.parseTimeValue(ttlStr, null, "ttl"); indexRequest.ttl(timeValue.millis()); } - listener.executed(ingestDocument); + listener.onResponse(ingestDocument); } catch (Throwable e) { - listener.failed(e); + listener.onFailure(e); } }); } - public interface Listener { - - void executed(IngestDocument ingestDocument); - - void failed(Throwable e); - - } - public static Settings additionalSettings(Settings nodeSettings) { Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); if (!settings.names().isEmpty()) { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java index cbfbf269dd3..133b40b9cc7 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/PipelineStore.java @@ -35,10 +35,10 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.SearchScrollIterator; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -57,15 +57,15 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.*; -public class PipelineStore extends AbstractLifecycleComponent { +public class PipelineStore extends AbstractComponent { public final static String INDEX = ".ingest"; public final static String TYPE = "pipeline"; - private final Injector injector; private final ThreadPool threadPool; private final TimeValue scrollTimeout; private final ClusterService clusterService; + private final Provider clientProvider; private final TimeValue pipelineUpdateInterval; private final Pipeline.Factory factory = new Pipeline.Factory(); private final Map processorFactoryRegistry; @@ -74,11 +74,11 @@ public class PipelineStore extends AbstractLifecycleComponent { private volatile Map pipelines = new HashMap<>(); @Inject - public PipelineStore(Settings settings, Injector injector, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processors) { + public PipelineStore(Settings settings, Provider clientProvider, ThreadPool threadPool, Environment environment, ClusterService clusterService, Map processors) { super(settings); - this.injector = injector; this.threadPool = threadPool; this.clusterService = clusterService; + this.clientProvider = clientProvider; this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30)); this.pipelineUpdateInterval = settings.getAsTime("ingest.pipeline.store.update.interval", TimeValue.timeValueSeconds(1)); for (Processor.Factory factory : processors.values()) { @@ -86,43 +86,43 @@ public class PipelineStore extends AbstractLifecycleComponent { } this.processorFactoryRegistry = Collections.unmodifiableMap(processors); clusterService.add(new PipelineStoreListener()); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void beforeClose() { + // Ideally we would implement Closeable, but when a node is stopped this doesn't get invoked: + try { + IOUtils.close(processorFactoryRegistry.values()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); } - @Override - protected void doStart() { - client = injector.getInstance(Client.class); - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - try { - IOUtils.close(processorFactoryRegistry.values()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - + /** + * Deletes the pipeline specified by id in the request. + */ public void delete(DeletePipelineRequest request, ActionListener listener) { DeleteRequest deleteRequest = new DeleteRequest(request); deleteRequest.index(PipelineStore.INDEX); deleteRequest.type(PipelineStore.TYPE); deleteRequest.id(request.id()); deleteRequest.refresh(true); - client.delete(deleteRequest, listener); + client().delete(deleteRequest, listener); } - public void put(PutPipelineRequest request, ActionListener listener) { - // validates the pipeline and processor configuration: - Map pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2(); + /** + * Stores the specified pipeline definition in the request. + * + * @throws IllegalArgumentException If the pipeline holds incorrect configuration + */ + public void put(PutPipelineRequest request, ActionListener listener) throws IllegalArgumentException { try { + // validates the pipeline and processor configuration: + Map pipelineConfig = XContentHelper.convertToMap(request.source(), false).v2(); constructPipeline(request.id(), pipelineConfig); - } catch (IOException e) { - listener.onFailure(e); - return; + } catch (Exception e) { + throw new IllegalArgumentException("Invalid pipeline configuration", e); } IndexRequest indexRequest = new IndexRequest(request); @@ -131,9 +131,12 @@ public class PipelineStore extends AbstractLifecycleComponent { indexRequest.id(request.id()); indexRequest.source(request.source()); indexRequest.refresh(true); - client.index(indexRequest, listener); + client().index(indexRequest, listener); } + /** + * Returns the pipeline by the specified id + */ public Pipeline get(String id) { PipelineDefinition ref = pipelines.get(id); if (ref != null) { @@ -166,11 +169,11 @@ public class PipelineStore extends AbstractLifecycleComponent { return result; } - Pipeline constructPipeline(String id, Map config) throws IOException { + Pipeline constructPipeline(String id, Map config) throws Exception { return factory.create(id, config, processorFactoryRegistry); } - synchronized void updatePipelines() throws IOException { + synchronized void updatePipelines() throws Exception { // note: this process isn't fast or smart, but the idea is that there will not be many pipelines, // so for that reason the goal is to keep the update logic simple. @@ -208,14 +211,12 @@ public class PipelineStore extends AbstractLifecycleComponent { } void startUpdateWorker() { - if (lifecycleState() == Lifecycle.State.STARTED) { - threadPool.schedule(pipelineUpdateInterval, ThreadPool.Names.GENERIC, new Updater()); - } + threadPool.schedule(pipelineUpdateInterval, ThreadPool.Names.GENERIC, new Updater()); } boolean existPipeline(String pipelineId) { GetRequest request = new GetRequest(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId); - GetResponse response = client.get(request).actionGet(); + GetResponse response = client().get(request).actionGet(); return response.isExists(); } @@ -227,7 +228,15 @@ public class PipelineStore extends AbstractLifecycleComponent { SearchRequest searchRequest = new SearchRequest(PipelineStore.INDEX); searchRequest.source(sourceBuilder); searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - return SearchScrollIterator.createIterator(client, scrollTimeout, searchRequest); + return SearchScrollIterator.createIterator(client(), scrollTimeout, searchRequest); + } + + + private Client client() { + if (client == null) { + client = clientProvider.get(); + } + return client; } class Updater implements Runnable { diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index 910f26341e7..91453270803 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -84,15 +84,15 @@ public final class IngestActionFilter extends AbstractComponent implements Actio chain.proceed(action, indexRequest, listener); return; } - executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() { + executionService.execute(indexRequest, pipelineId, new ActionListener() { @Override - public void executed(IngestDocument ingestDocument) { + public void onResponse(IngestDocument ingestDocument) { indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true); chain.proceed(action, indexRequest, listener); } @Override - public void failed(Throwable e) { + public void onFailure(Throwable e) { logger.error("failed to execute pipeline [{}]", e, pipelineId); listener.onFailure(e); } @@ -121,14 +121,14 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } IndexRequest indexRequest = (IndexRequest) actionRequest; - executionService.execute(indexRequest, pipelineId, new PipelineExecutionService.Listener() { + executionService.execute(indexRequest, pipelineId, new ActionListener() { @Override - public void executed(IngestDocument ingestDocument) { + public void onResponse(IngestDocument ingestDocument) { processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener); } @Override - public void failed(Throwable e) { + public void onFailure(Throwable e) { logger.debug("failed to execute pipeline [{}]", e, pipelineId); bulkRequestModifier.markCurrentItemAsFailed(e); processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java index c1c134c5a1b..8e48a5e057a 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineRequest.java @@ -135,7 +135,7 @@ public class SimulatePipelineRequest extends ActionRequest { return new Parsed(pipeline, ingestDocumentList, verbose); } - static Parsed parse(Map config, boolean verbose, PipelineStore pipelineStore) throws IOException { + static Parsed parse(Map config, boolean verbose, PipelineStore pipelineStore) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE); Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry()); List ingestDocumentList = parseDocs(config); diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java index f3e327c527f..b4036c5bac3 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/simulate/SimulatePipelineTransportAction.java @@ -55,7 +55,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction expected = new HashMap<>(); @@ -57,7 +57,7 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase { } } - public void testNullValue() throws IOException { + public void testNullValue() throws Exception { String fieldName = RandomDocumentPicks.randomFieldName(random()); Processor processor = newProcessor(Collections.singletonList(fieldName)); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); @@ -69,7 +69,7 @@ public abstract class AbstractStringProcessorTestCase extends ESTestCase { } } - public void testNonStringValue() throws IOException { + public void testNonStringValue() throws Exception { String fieldName = RandomDocumentPicks.randomFieldName(random()); Processor processor = newProcessor(Collections.singletonList(fieldName)); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/add/AddProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/add/AddProcessorTests.java index ff32c3e2f05..2ae98b163f0 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/add/AddProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/add/AddProcessorTests.java @@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.equalTo; public class AddProcessorTests extends ESTestCase { - public void testAddExistingFields() throws IOException { + public void testAddExistingFields() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); int numFields = randomIntBetween(1, 5); Map fields = new HashMap<>(); @@ -49,7 +49,7 @@ public class AddProcessorTests extends ESTestCase { } } - public void testAddNewFields() throws IOException { + public void testAddNewFields() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); //used to verify that there are no conflicts between subsequent fields going to be added IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); @@ -68,7 +68,7 @@ public class AddProcessorTests extends ESTestCase { } } - public void testAddFieldsTypeMismatch() throws IOException { + public void testAddFieldsTypeMismatch() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); ingestDocument.setFieldValue("field", "value"); Processor processor = new AddProcessor(Collections.singletonMap("field.inner", "value")); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/convert/ConvertProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/convert/ConvertProcessorTests.java index 56d74c6f12a..3b089f05782 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/convert/ConvertProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/convert/ConvertProcessorTests.java @@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.equalTo; public class ConvertProcessorTests extends ESTestCase { - public void testConvertInt() throws IOException { + public void testConvertInt() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map expectedResult = new HashMap<>(); @@ -50,7 +50,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertIntList() throws IOException { + public void testConvertIntList() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map> expectedResult = new HashMap<>(); @@ -75,7 +75,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertIntError() throws IOException { + public void testConvertIntError() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); String value = "string-" + randomAsciiOfLengthBetween(1, 10); @@ -91,7 +91,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertFloat() throws IOException { + public void testConvertFloat() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map expectedResult = new HashMap<>(); @@ -110,7 +110,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertFloatList() throws IOException { + public void testConvertFloatList() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map> expectedResult = new HashMap<>(); @@ -135,7 +135,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertFloatError() throws IOException { + public void testConvertFloatError() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); String value = "string-" + randomAsciiOfLengthBetween(1, 10); @@ -151,7 +151,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertBoolean() throws IOException { + public void testConvertBoolean() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map expectedResult = new HashMap<>(); @@ -174,7 +174,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertBooleanList() throws IOException { + public void testConvertBooleanList() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map> expectedResult = new HashMap<>(); @@ -203,7 +203,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertBooleanError() throws IOException { + public void testConvertBooleanError() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); String fieldValue; @@ -225,7 +225,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertString() throws IOException { + public void testConvertString() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map expectedResult = new HashMap<>(); @@ -264,7 +264,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertStringList() throws IOException { + public void testConvertStringList() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map> expectedResult = new HashMap<>(); @@ -309,7 +309,7 @@ public class ConvertProcessorTests extends ESTestCase { } } - public void testConvertNullField() throws IOException { + public void testConvertNullField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); Type type = randomFrom(Type.values()); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/gsub/GsubProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/gsub/GsubProcessorTests.java index 7f91a8b3a56..0bcb11f58b5 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/gsub/GsubProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/gsub/GsubProcessorTests.java @@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.equalTo; public class GsubProcessorTests extends ESTestCase { - public void testGsub() throws IOException { + public void testGsub() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); int numFields = randomIntBetween(1, 5); List expressions = new ArrayList<>(); @@ -50,7 +50,7 @@ public class GsubProcessorTests extends ESTestCase { } } - public void testGsubNotAStringValue() throws IOException { + public void testGsubNotAStringValue() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); ingestDocument.setFieldValue(fieldName, 123); @@ -64,7 +64,7 @@ public class GsubProcessorTests extends ESTestCase { } } - public void testGsubNullValue() throws IOException { + public void testGsubNullValue() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); List gsubExpressions = Collections.singletonList(new GsubExpression(fieldName, Pattern.compile("\\."), "-")); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/join/JoinProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/join/JoinProcessorTests.java index 391c9a641ea..a75aac5253a 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/join/JoinProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/join/JoinProcessorTests.java @@ -33,7 +33,7 @@ public class JoinProcessorTests extends ESTestCase { private static final String[] SEPARATORS = new String[]{"-", "_", "."}; - public void testJoinStrings() throws IOException { + public void testJoinStrings() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map expectedResultMap = new HashMap<>(); @@ -62,7 +62,7 @@ public class JoinProcessorTests extends ESTestCase { } } - public void testJoinIntegers() throws IOException { + public void testJoinIntegers() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); Map expectedResultMap = new HashMap<>(); @@ -91,7 +91,7 @@ public class JoinProcessorTests extends ESTestCase { } } - public void testJoinNonListField() throws IOException { + public void testJoinNonListField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); ingestDocument.setFieldValue(fieldName, randomAsciiOfLengthBetween(1, 10)); @@ -104,7 +104,7 @@ public class JoinProcessorTests extends ESTestCase { } } - public void testJoinNonExistingField() throws IOException { + public void testJoinNonExistingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); Processor processor = new JoinProcessor(Collections.singletonMap(fieldName, "-")); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java index 3c110fd7b60..610b4c8a5cb 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/remove/RemoveProcessorTests.java @@ -35,7 +35,7 @@ import static org.hamcrest.Matchers.nullValue; public class RemoveProcessorTests extends ESTestCase { - public void testRemoveFields() throws IOException { + public void testRemoveFields() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); int numFields = randomIntBetween(1, 5); Set fields = new HashSet<>(); @@ -50,7 +50,7 @@ public class RemoveProcessorTests extends ESTestCase { } } - public void testRemoveNonExistingField() throws IOException { + public void testRemoveNonExistingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Processor processor = new RemoveProcessor(Collections.singletonList(RandomDocumentPicks.randomFieldName(random()))); processor.execute(ingestDocument); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/rename/RenameProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/rename/RenameProcessorTests.java index 51ed5da1750..332d3957369 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/rename/RenameProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/rename/RenameProcessorTests.java @@ -32,7 +32,7 @@ import static org.hamcrest.Matchers.nullValue; public class RenameProcessorTests extends ESTestCase { - public void testRename() throws IOException { + public void testRename() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); int numFields = randomIntBetween(1, 5); Map fields = new HashMap<>(); @@ -56,14 +56,14 @@ public class RenameProcessorTests extends ESTestCase { } } - public void testRenameNonExistingField() throws IOException { + public void testRenameNonExistingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Processor processor = new RenameProcessor(Collections.singletonMap(RandomDocumentPicks.randomFieldName(random()), RandomDocumentPicks.randomFieldName(random()))); processor.execute(ingestDocument); assertThat(ingestDocument.getSource().size(), equalTo(0)); } - public void testRenameExistingFieldNullValue() throws IOException { + public void testRenameExistingFieldNullValue() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); ingestDocument.setFieldValue(fieldName, null); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/split/SplitProcessorTests.java b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/split/SplitProcessorTests.java index 14cd6d02a39..799e9c41c33 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/split/SplitProcessorTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/ingest/processor/split/SplitProcessorTests.java @@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.equalTo; public class SplitProcessorTests extends ESTestCase { - public void testSplit() throws IOException { + public void testSplit() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); Map fields = new HashMap<>(); int numFields = randomIntBetween(1, 5); @@ -46,7 +46,7 @@ public class SplitProcessorTests extends ESTestCase { } } - public void testSplitNullValue() throws IOException { + public void testSplitNullValue() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); Map split = Collections.singletonMap(fieldName, "\\."); @@ -59,7 +59,7 @@ public class SplitProcessorTests extends ESTestCase { } } - public void testSplitNonStringValue() throws IOException { + public void testSplitNonStringValue() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); ingestDocument.setFieldValue(fieldName, randomInt()); diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java index 917d4b1815c..62a9bbd60b7 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineExecutionServiceTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.plugin.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -59,10 +60,10 @@ public class PipelineExecutionServiceTests extends ESTestCase { public void testExecute_pipelineDoesNotExist() { when(store.get("_id")).thenReturn(null); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); - PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + ActionListener listener = mock(ActionListener.class); executionService.execute(indexRequest, "_id", listener); - verify(listener).failed(any(IllegalArgumentException.class)); - verify(listener, times(0)).executed(any()); + verify(listener).onFailure(any(IllegalArgumentException.class)); + verify(listener, times(0)).onResponse(any()); } public void testExecuteSuccess() throws Exception { @@ -70,11 +71,11 @@ public class PipelineExecutionServiceTests extends ESTestCase { when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); - PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + ActionListener listener = mock(ActionListener.class); executionService.execute(indexRequest, "_id", listener); verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); - verify(listener).executed(eqID("_index", "_type", "_id", Collections.emptyMap())); - verify(listener, times(0)).failed(any(Exception.class)); + verify(listener).onResponse(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(listener, times(0)).onFailure(any(Exception.class)); } public void testExecutePropagateAllMetaDataUpdates() throws Exception { @@ -94,11 +95,11 @@ public class PipelineExecutionServiceTests extends ESTestCase { when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); - PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + ActionListener listener = mock(ActionListener.class); executionService.execute(indexRequest, "_id", listener); verify(processor).execute(any()); - verify(listener).executed(any()); - verify(listener, times(0)).failed(any(Exception.class)); + verify(listener).onResponse(any()); + verify(listener, times(0)).onFailure(any(Exception.class)); assertThat(indexRequest.index(), equalTo("update_index")); assertThat(indexRequest.type(), equalTo("update_type")); @@ -114,11 +115,11 @@ public class PipelineExecutionServiceTests extends ESTestCase { when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Arrays.asList(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); - PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + ActionListener listener = mock(ActionListener.class); executionService.execute(indexRequest, "_id", listener); verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap())); - verify(listener, times(0)).executed(eqID("_index", "_type", "_id", Collections.emptyMap())); - verify(listener).failed(any(RuntimeException.class)); + verify(listener, times(0)).onResponse(eqID("_index", "_type", "_id", Collections.emptyMap())); + verify(listener).onFailure(any(RuntimeException.class)); } public void testExecuteTTL() throws Exception { @@ -130,12 +131,12 @@ public class PipelineExecutionServiceTests extends ESTestCase { when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); - PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class); + ActionListener listener = mock(ActionListener.class); executionService.execute(indexRequest, "_id", listener); assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl").millis())); - verify(listener, times(1)).executed(any()); - verify(listener, never()).failed(any()); + verify(listener, times(1)).onResponse(any()); + verify(listener, never()).onFailure(any()); // test with invalid ttl metaProcessorFactory = new MetaDataProcessor.Factory(); @@ -145,11 +146,11 @@ public class PipelineExecutionServiceTests extends ESTestCase { when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor))); indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()); - listener = mock(PipelineExecutionService.Listener.class); + listener = mock(ActionListener.class); executionService.execute(indexRequest, "_id", listener); - verify(listener, never()).executed(any()); - verify(listener, times(1)).failed(any(ElasticsearchParseException.class)); + verify(listener, never()).onResponse(any()); + verify(listener, times(1)).onFailure(any(ElasticsearchParseException.class)); // test with provided ttl when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.emptyList())); @@ -157,12 +158,12 @@ public class PipelineExecutionServiceTests extends ESTestCase { indexRequest = new IndexRequest("_index", "_type", "_id") .source(Collections.emptyMap()) .ttl(1000l); - listener = mock(PipelineExecutionService.Listener.class); + listener = mock(ActionListener.class); executionService.execute(indexRequest, "_id", listener); assertThat(indexRequest.ttl(), equalTo(1000l)); - verify(listener, times(1)).executed(any()); - verify(listener, never()).failed(any(Throwable.class)); + verify(listener, times(1)).onResponse(any()); + verify(listener, never()).onFailure(any(Throwable.class)); } private IngestDocument eqID(String index, String type, String id, Map source) { diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java index 1dfc64f65b5..e67262b3df0 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/PipelineStoreTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.StringText; import org.elasticsearch.env.Environment; @@ -65,19 +66,15 @@ public class PipelineStoreTests extends ESTestCase { public void init() { threadPool = new ThreadPool("test"); client = mock(Client.class); - Injector injector = mock(Injector.class); - when(injector.getInstance(Client.class)).thenReturn(client); ClusterService clusterService = mock(ClusterService.class); when(client.searchScroll(any())).thenReturn(expectedSearchReponse(Collections.emptyList())); Environment environment = mock(Environment.class); - store = new PipelineStore(Settings.EMPTY, injector, threadPool, environment, clusterService, Collections.emptyMap()); - store.start(); + store = new PipelineStore(Settings.EMPTY, () -> client, threadPool, environment, clusterService, Collections.emptyMap()); } @After public void cleanup() { - store.stop(); threadPool.shutdown(); } diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index 67ea1b32d3e..534f76ecc72 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -83,7 +83,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); verifyZeroInteractions(actionFilterChain); } @@ -96,7 +96,7 @@ public class IngestActionFilterTests extends ESTestCase { filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); verifyZeroInteractions(actionFilterChain); } @@ -122,14 +122,14 @@ public class IngestActionFilterTests extends ESTestCase { ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); Answer answer = invocationOnMock -> { - PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; - listener.executed(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.sourceAsMap())); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new IngestDocument(indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.sourceAsMap())); return null; }; - doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); verify(actionFilterChain).proceed("_action", indexRequest, actionListener); verifyZeroInteractions(actionListener); } @@ -145,15 +145,15 @@ public class IngestActionFilterTests extends ESTestCase { Answer answer = new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; - listener.failed(exception); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onFailure(exception); return null; } }; - doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); filter.apply("_action", indexRequest, actionListener, actionFilterChain); - verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); verify(actionListener).onFailure(exception); verifyZeroInteractions(actionFilterChain); } @@ -242,11 +242,11 @@ public class IngestActionFilterTests extends ESTestCase { RuntimeException exception = new RuntimeException(); Answer answer = (invocationOnMock) -> { - PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; - listener.failed(exception); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onFailure(exception); return null; }; - doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); CaptureActionListener actionListener = new CaptureActionListener(); RecordRequestAFC actionFilterChain = new RecordRequestAFC(); @@ -287,11 +287,11 @@ public class IngestActionFilterTests extends ESTestCase { RuntimeException exception = new RuntimeException(); Answer answer = (invocationOnMock) -> { - PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2]; - listener.failed(exception); + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onFailure(exception); return null; }; - doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(PipelineExecutionService.Listener.class)); + doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class)); ActionListener actionListener = mock(ActionListener.class); RecordRequestAFC actionFilterChain = new RecordRequestAFC();