From 4d38a47eb50435030807bfdf980bae2a12f64633 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 12 Jan 2016 12:06:14 +0100 Subject: [PATCH] Review feedback and several cleanups --- .../ingest/SimulateDocumentVerboseResult.java | 8 +--- .../ingest/SimulateExecutionService.java | 14 ++++--- .../SimulatePipelineTransportAction.java | 4 +- .../ingest/SimulateProcessorResult.java | 42 ++++++++----------- .../ingest/WriteableIngestDocument.java | 29 +++++-------- .../ingest/IngestBootstrapper.java | 4 ++ .../ingest/core/CompoundProcessor.java | 4 +- .../elasticsearch/ingest/core/Pipeline.java | 2 +- .../ingest/core/ValueSource.java | 2 +- .../elasticsearch/threadpool/ThreadPool.java | 2 +- .../ingest/SimulateProcessorResultTests.java | 2 +- .../ingest/grok/GrokProcessor.java | 3 ++ .../ingest/geoip/GeoIpProcessor.java | 2 + 13 files changed, 56 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java index 2b119afb9d5..8a8a1b00325 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java @@ -51,7 +51,7 @@ public class SimulateDocumentVerboseResult implements SimulateDocumentResult processorResults = new ArrayList<>(); for (int i = 0; i < size; i++) { - processorResults.add(SimulateProcessorResult.readSimulateProcessorResultFrom(in)); + processorResults.add(new SimulateProcessorResult(in)); } return new SimulateDocumentVerboseResult(processorResults); } @@ -67,7 +67,7 @@ public class SimulateDocumentVerboseResult implements SimulateDocumentResult listener) { - threadPool.executor(THREAD_POOL_NAME).execute(() -> { - List responses = new ArrayList<>(); - for (IngestDocument ingestDocument : request.getDocuments()) { - responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose())); + threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable(listener) { + @Override + protected void doRun() throws Exception { + List responses = new ArrayList<>(); + for (IngestDocument ingestDocument : request.getDocuments()) { + responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose())); + } + listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); } - listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); }); } } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 3d5e02a9332..89764b0ff7f 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -47,9 +47,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction listener) { - Map source = XContentHelper.convertToMap(request.getSource(), false).v2(); + final Map source = XContentHelper.convertToMap(request.getSource(), false).v2(); - SimulatePipelineRequest.Parsed simulateRequest; + final SimulatePipelineRequest.Parsed simulateRequest; try { if (request.getId() != null) { simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore); diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java index afa85b4c219..25680152049 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java @@ -31,26 +31,31 @@ import java.io.IOException; import java.util.Collections; public class SimulateProcessorResult implements Writeable, ToXContent { + private final String processorId; + private final WriteableIngestDocument ingestDocument; + private final Exception failure; - private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult("_na", new WriteableIngestDocument(new IngestDocument(Collections.emptyMap(), Collections.emptyMap()))); - - private String processorId; - private WriteableIngestDocument ingestDocument; - private Exception failure; + public SimulateProcessorResult(StreamInput in) throws IOException { + this.processorId = in.readString(); + if (in.readBoolean()) { + this.failure = in.readThrowable(); + this.ingestDocument = null; + } else { + this.ingestDocument = new WriteableIngestDocument(in); + this.failure = null; + } + } public SimulateProcessorResult(String processorId, IngestDocument ingestDocument) { this.processorId = processorId; this.ingestDocument = new WriteableIngestDocument(ingestDocument); - } - - private SimulateProcessorResult(String processorId, WriteableIngestDocument ingestDocument) { - this.processorId = processorId; - this.ingestDocument = ingestDocument; + this.failure = null; } public SimulateProcessorResult(String processorId, Exception failure) { this.processorId = processorId; this.failure = failure; + this.ingestDocument = null; } public IngestDocument getIngestDocument() { @@ -68,18 +73,9 @@ public class SimulateProcessorResult implements Writeable, ToXContent { - private static final WriteableIngestDocument PROTOTYPE = new WriteableIngestDocument(new IngestDocument(Collections.emptyMap(), Collections.emptyMap())); - private final IngestDocument ingestDocument; WriteableIngestDocument(IngestDocument ingestDocument) { @@ -43,20 +41,21 @@ final class WriteableIngestDocument implements Writeable sourceAndMetadata = in.readMap(); + @SuppressWarnings("unchecked") + Map ingestMetadata = (Map) in.readGenericValue(); + this.ingestDocument = new IngestDocument(sourceAndMetadata, ingestMetadata); + } + IngestDocument getIngestDocument() { return ingestDocument; } - static WriteableIngestDocument readWriteableIngestDocumentFrom(StreamInput in) throws IOException { - return PROTOTYPE.readFrom(in); - } @Override public WriteableIngestDocument readFrom(StreamInput in) throws IOException { - Map sourceAndMetadata = in.readMap(); - @SuppressWarnings("unchecked") - Map ingestMetadata = (Map) in.readGenericValue(); - return new WriteableIngestDocument(new IngestDocument(sourceAndMetadata, ingestMetadata)); + return new WriteableIngestDocument(in); } @Override @@ -67,13 +66,13 @@ final class WriteableIngestDocument implements Writeable metadataMap = ingestDocument.extractMetadata(); for (Map.Entry metadata : metadataMap.entrySet()) { builder.field(metadata.getKey().getFieldName(), metadata.getValue()); } - builder.field(Fields.SOURCE, ingestDocument.getSourceAndMetadata()); - builder.startObject(Fields.INGEST); + builder.field("_source", ingestDocument.getSourceAndMetadata()); + builder.startObject("_ingest"); for (Map.Entry ingestMetadata : ingestDocument.getIngestMetadata().entrySet()) { builder.field(ingestMetadata.getKey(), ingestMetadata.getValue()); } @@ -103,10 +102,4 @@ final class WriteableIngestDocument implements Writeable config, Map processorRegistry) throws Exception { - String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); + String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); // TODO(simonw): can we make these strings constants? List processors = readProcessors("processors", processorRegistry, config); List onFailureProcessors = readProcessors("on_failure", processorRegistry, config); CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors)); diff --git a/core/src/main/java/org/elasticsearch/ingest/core/ValueSource.java b/core/src/main/java/org/elasticsearch/ingest/core/ValueSource.java index 987002f0354..5280b3e6702 100644 --- a/core/src/main/java/org/elasticsearch/ingest/core/ValueSource.java +++ b/core/src/main/java/org/elasticsearch/ingest/core/ValueSource.java @@ -57,7 +57,7 @@ public interface ValueSource { valueSourceList.add(wrap(item, templateService)); } return new ListValue(valueSourceList); - } else if (value == null || value instanceof Integer || + } else if (value == null || value instanceof Integer || // TODO(simonw): maybe we just check for Number? value instanceof Long || value instanceof Float || value instanceof Double || value instanceof Boolean) { return new ObjectValue(value); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index c73605c5547..bf4f75a569f 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -88,7 +88,7 @@ public class ThreadPool extends AbstractComponent { public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; - public static final String INGEST = "ingest"; + public static final String INGEST = "ingest"; //TODO(simonw): wow what is the reason for having yet another threadpool? I really think we should just use index for this. } public enum ThreadPoolType { diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java index 208d2534a4c..999e7ee6650 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java @@ -47,7 +47,7 @@ public class SimulateProcessorResultTests extends ESTestCase { BytesStreamOutput out = new BytesStreamOutput(); simulateProcessorResult.writeTo(out); StreamInput streamInput = StreamInput.wrap(out.bytes()); - SimulateProcessorResult otherSimulateProcessorResult = SimulateProcessorResult.readSimulateProcessorResultFrom(streamInput); + SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput); assertThat(otherSimulateProcessorResult.getProcessorId(), equalTo(simulateProcessorResult.getProcessorId())); assertThat(otherSimulateProcessorResult.getIngestDocument(), equalTo(simulateProcessorResult.getIngestDocument())); if (isFailure) { diff --git a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java index d38ea96131f..2c63646ff81 100644 --- a/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java +++ b/modules/ingest-grok/src/main/java/org/elasticsearch/ingest/grok/GrokProcessor.java @@ -78,6 +78,9 @@ public final class GrokProcessor implements Processor { private final Map builtinPatternBank; public Factory() throws IOException { + // TODO(simonw): we should have a static helper method to load these patterns and make this + // factory only accept a String->String map instead. That way we can load + // the patterns in the IngestGrokPlugin ctor or even in a static context and this ctor doesn't need to throw any exception. Map builtinPatterns = new HashMap<>(); for (String pattern : PATTERN_NAMES) { try(InputStream is = getClass().getResourceAsStream("/patterns/" + pattern)) { diff --git a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 9f65c76c215..ab87d51318b 100644 --- a/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/plugins/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -231,6 +231,8 @@ public final class GeoIpProcessor implements Processor { private final Map databaseReaders; public Factory(Path configDirectory) { + + // TODO(simonw): same as fro grok we should load this outside of the factory in a static method and hass the map to the ctor Path geoIpConfigDirectory = configDirectory.resolve("ingest-geoip"); if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) { throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");