Simulate api improvements

Move ParsedSimulateRequest to SimulatePipelineRequest and remove Parser class in favor of static parse methods.
Simplified execute methods in SimulateExecutionService.
This commit is contained in:
javanna 2015-11-25 17:49:02 +01:00 committed by Luca Cavanna
parent a84d35ab3f
commit 1a7391070f
6 changed files with 110 additions and 144 deletions

View File

@ -1,89 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineStore;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
public class ParsedSimulateRequest {
private final List<IngestDocument> documents;
private final Pipeline pipeline;
private final boolean verbose;
ParsedSimulateRequest(Pipeline pipeline, List<IngestDocument> documents, boolean verbose) {
this.pipeline = pipeline;
this.documents = Collections.unmodifiableList(documents);
this.verbose = verbose;
}
public Pipeline getPipeline() {
return pipeline;
}
public List<IngestDocument> getDocuments() {
return documents;
}
public boolean isVerbose() {
return verbose;
}
public static class Parser {
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
public static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
private List<IngestDocument> parseDocs(Map<String, Object> config) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, Fields.DOCS);
List<IngestDocument> ingestDocumentList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) {
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE);
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX),
ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE),
ConfigurationUtils.readStringProperty(dataMap, Fields.ID),
document);
ingestDocumentList.add(ingestDocument);
}
return ingestDocumentList;
}
public ParsedSimulateRequest parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
if (pipelineId == null) {
throw new IllegalArgumentException("param [pipeline] is null");
}
Pipeline pipeline = pipelineStore.get(pipelineId);
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new ParsedSimulateRequest(pipeline, ingestDocumentList, verbose);
}
public ParsedSimulateRequest parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new ParsedSimulateRequest(pipeline, ingestDocumentList, verbose);
}
}
}

View File

@ -40,48 +40,39 @@ public class SimulateExecutionService {
this.threadPool = threadPool;
}
SimulateDocumentResult executeItem(Pipeline pipeline, IngestDocument ingestDocument) {
try {
pipeline.execute(ingestDocument);
return new SimulateDocumentSimpleResult(ingestDocument);
} catch (Exception e) {
return new SimulateDocumentSimpleResult(e);
}
}
SimulateDocumentVerboseResult executeVerboseItem(Pipeline pipeline, IngestDocument ingestDocument) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
IngestDocument currentIngestDocument = new IngestDocument(ingestDocument);
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
Processor processor = pipeline.getProcessors().get(i);
String processorId = "processor[" + processor.getType() + "]-" + i;
try {
processor.execute(currentIngestDocument);
processorResultList.add(new SimulateProcessorResult(processorId, currentIngestDocument));
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(processorId, e));
}
currentIngestDocument = new IngestDocument(currentIngestDocument);
}
return new SimulateDocumentVerboseResult(processorResultList);
}
public void execute(ParsedSimulateRequest request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
@Override
public void run() {
List<SimulateDocumentResult> responses = new ArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) {
if (request.isVerbose()) {
responses.add(executeVerboseItem(request.getPipeline(), ingestDocument));
} else {
responses.add(executeItem(request.getPipeline(), ingestDocument));
}
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
IngestDocument currentIngestDocument = new IngestDocument(ingestDocument);
for (int i = 0; i < pipeline.getProcessors().size(); i++) {
Processor processor = pipeline.getProcessors().get(i);
String processorId = "processor[" + processor.getType() + "]-" + i;
try {
processor.execute(currentIngestDocument);
processorResultList.add(new SimulateProcessorResult(processorId, currentIngestDocument));
} catch (Exception e) {
processorResultList.add(new SimulateProcessorResult(processorId, e));
}
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
currentIngestDocument = new IngestDocument(currentIngestDocument);
}
return new SimulateDocumentVerboseResult(processorResultList);
} else {
try {
pipeline.execute(ingestDocument);
return new SimulateDocumentSimpleResult(ingestDocument);
} catch (Exception e) {
return new SimulateDocumentSimpleResult(e);
}
}
}
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
List<SimulateDocumentResult> responses = new ArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) {
responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()));
}
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
});
}
}

View File

@ -24,8 +24,16 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.processor.ConfigurationUtils;
import org.elasticsearch.plugin.ingest.PipelineStore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -92,4 +100,61 @@ public class SimulatePipelineRequest extends ActionRequest {
static final String TYPE = "_type";
static final String ID = "_id";
}
static class Parsed {
private final List<IngestDocument> documents;
private final Pipeline pipeline;
private final boolean verbose;
Parsed(Pipeline pipeline, List<IngestDocument> documents, boolean verbose) {
this.pipeline = pipeline;
this.documents = Collections.unmodifiableList(documents);
this.verbose = verbose;
}
public Pipeline getPipeline() {
return pipeline;
}
public List<IngestDocument> getDocuments() {
return documents;
}
public boolean isVerbose() {
return verbose;
}
}
private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory();
static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) {
if (pipelineId == null) {
throw new IllegalArgumentException("param [pipeline] is null");
}
Pipeline pipeline = pipelineStore.get(pipelineId);
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}
static Parsed parse(Map<String, Object> config, boolean verbose, PipelineStore pipelineStore) throws IOException {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(config, Fields.PIPELINE);
Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactoryRegistry());
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}
private static List<IngestDocument> parseDocs(Map<String, Object> config) {
List<Map<String, Object>> docs = ConfigurationUtils.readList(config, Fields.DOCS);
List<IngestDocument> ingestDocumentList = new ArrayList<>();
for (Map<String, Object> dataMap : docs) {
Map<String, Object> document = ConfigurationUtils.readMap(dataMap, Fields.SOURCE);
IngestDocument ingestDocument = new IngestDocument(ConfigurationUtils.readStringProperty(dataMap, Fields.INDEX),
ConfigurationUtils.readStringProperty(dataMap, Fields.TYPE),
ConfigurationUtils.readStringProperty(dataMap, Fields.ID),
document);
ingestDocumentList.add(ingestDocument);
}
return ingestDocumentList;
}
}

View File

@ -48,13 +48,12 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false).v2();
ParsedSimulateRequest simulateRequest;
ParsedSimulateRequest.Parser parser = new ParsedSimulateRequest.Parser();
SimulatePipelineRequest.Parsed simulateRequest;
try {
if (request.getId() != null) {
simulateRequest = parser.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);
} else {
simulateRequest = parser.parse(source, request.isVerbose(), pipelineStore);
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore);
}
} catch (IOException e) {
listener.onFailure(e);

View File

@ -62,7 +62,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteVerboseItem() throws Exception {
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, ingestDocument);
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
verify(processor, times(2)).execute(ingestDocument);
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -78,7 +78,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
}
public void testExecuteItem() throws Exception {
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, ingestDocument);
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
verify(processor, times(2)).execute(ingestDocument);
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;
@ -89,7 +89,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).doNothing().when(processor).execute(ingestDocument);
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, ingestDocument);
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
verify(processor, times(2)).execute(ingestDocument);
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
@ -110,7 +110,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed");
doThrow(e).when(processor).execute(ingestDocument);
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, ingestDocument);
SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
verify(processor, times(1)).execute(ingestDocument);
assertThat(actualItemResponse, instanceOf(SimulateDocumentSimpleResult.class));
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) actualItemResponse;

View File

@ -38,17 +38,17 @@ import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ParsedSimulateRequestParserTests extends ESTestCase {
public class SimulatePipelineRequestParsingTests extends ESTestCase {
private PipelineStore store;
@Before
public void init() throws IOException {
Pipeline pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.singletonList(mock(Processor.class)));
Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, Collections.singletonList(mock(Processor.class)));
Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mock_processor", mock(Processor.Factory.class));
store = mock(PipelineStore.class);
when(store.get(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.get(SimulatePipelineRequest.SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.getProcessorFactoryRegistry()).thenReturn(processorRegistry);
}
@ -79,7 +79,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
expectedDocs.add(expectedDoc);
}
ParsedSimulateRequest actualRequest = new ParsedSimulateRequest.Parser().parseWithPipelineId(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, requestContent, false, store);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parseWithPipelineId(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, requestContent, false, store);
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
@ -91,7 +91,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(Fields.ID)));
}
assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getId(), equalTo(SimulatePipelineRequest.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(1));
}
@ -132,7 +132,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
pipelineConfig.put("processors", processors);
requestContent.put(Fields.PIPELINE, pipelineConfig);
ParsedSimulateRequest actualRequest = new ParsedSimulateRequest.Parser().parse(requestContent, false, store);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, store);
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
@ -144,7 +144,7 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
assertThat(ingestDocument.getMetadata(ID), equalTo(expectedDocument.get(Fields.ID)));
}
assertThat(actualRequest.getPipeline().getId(), equalTo(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getId(), equalTo(SimulatePipelineRequest.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(numProcessors));
}