Merge pull request #15922 from s1monw/feature/ingest

Review feedback and several cleanups
This commit is contained in:
Simon Willnauer 2016-01-13 12:04:04 +01:00
commit ce42ae4cf3
13 changed files with 56 additions and 62 deletions

View File

@ -51,7 +51,7 @@ public class SimulateDocumentVerboseResult implements SimulateDocumentResult<Sim
int size = in.readVInt();
List<SimulateProcessorResult> processorResults = new ArrayList<>();
for (int i = 0; i < size; i++) {
processorResults.add(SimulateProcessorResult.readSimulateProcessorResultFrom(in));
processorResults.add(new SimulateProcessorResult(in));
}
return new SimulateDocumentVerboseResult(processorResults);
}
@ -67,7 +67,7 @@ public class SimulateDocumentVerboseResult implements SimulateDocumentResult<Sim
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(Fields.PROCESSOR_RESULTS);
builder.startArray("processor_results");
for (SimulateProcessorResult processorResult : processorResults) {
processorResult.toXContent(builder, params);
}
@ -75,8 +75,4 @@ public class SimulateDocumentVerboseResult implements SimulateDocumentResult<Sim
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results");
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
@ -65,12 +66,15 @@ class SimulateExecutionService {
}
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
List<SimulateDocumentResult> responses = new ArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) {
responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()));
threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable<SimulatePipelineResponse>(listener) {
@Override
protected void doRun() throws Exception {
List<SimulateDocumentResult> responses = new ArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) {
responses.add(executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()));
}
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
}
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
});
}
}

View File

@ -47,9 +47,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
@Override
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false).v2();
final Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false).v2();
SimulatePipelineRequest.Parsed simulateRequest;
final SimulatePipelineRequest.Parsed simulateRequest;
try {
if (request.getId() != null) {
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore);

View File

@ -31,26 +31,31 @@ import java.io.IOException;
import java.util.Collections;
public class SimulateProcessorResult implements Writeable<SimulateProcessorResult>, ToXContent {
private final String processorId;
private final WriteableIngestDocument ingestDocument;
private final Exception failure;
private static final SimulateProcessorResult PROTOTYPE = new SimulateProcessorResult("_na", new WriteableIngestDocument(new IngestDocument(Collections.emptyMap(), Collections.emptyMap())));
private String processorId;
private WriteableIngestDocument ingestDocument;
private Exception failure;
public SimulateProcessorResult(StreamInput in) throws IOException {
this.processorId = in.readString();
if (in.readBoolean()) {
this.failure = in.readThrowable();
this.ingestDocument = null;
} else {
this.ingestDocument = new WriteableIngestDocument(in);
this.failure = null;
}
}
public SimulateProcessorResult(String processorId, IngestDocument ingestDocument) {
this.processorId = processorId;
this.ingestDocument = new WriteableIngestDocument(ingestDocument);
}
private SimulateProcessorResult(String processorId, WriteableIngestDocument ingestDocument) {
this.processorId = processorId;
this.ingestDocument = ingestDocument;
this.failure = null;
}
public SimulateProcessorResult(String processorId, Exception failure) {
this.processorId = processorId;
this.failure = failure;
this.ingestDocument = null;
}
public IngestDocument getIngestDocument() {
@ -68,18 +73,9 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
return failure;
}
public static SimulateProcessorResult readSimulateProcessorResultFrom(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
}
@Override
public SimulateProcessorResult readFrom(StreamInput in) throws IOException {
String processorId = in.readString();
if (in.readBoolean()) {
Exception exception = in.readThrowable();
return new SimulateProcessorResult(processorId, exception);
}
return new SimulateProcessorResult(processorId, WriteableIngestDocument.readWriteableIngestDocumentFrom(in));
return new SimulateProcessorResult(in);
}
@Override
@ -97,7 +93,7 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields.PROCESSOR_ID, processorId);
builder.field("processor_id", processorId);
if (failure == null) {
ingestDocument.toXContent(builder, params);
} else {
@ -106,8 +102,4 @@ public class SimulateProcessorResult implements Writeable<SimulateProcessorResul
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString PROCESSOR_ID = new XContentBuilderString("processor_id");
}
}

View File

@ -34,8 +34,6 @@ import java.util.Objects;
final class WriteableIngestDocument implements Writeable<WriteableIngestDocument>, ToXContent {
private static final WriteableIngestDocument PROTOTYPE = new WriteableIngestDocument(new IngestDocument(Collections.emptyMap(), Collections.emptyMap()));
private final IngestDocument ingestDocument;
WriteableIngestDocument(IngestDocument ingestDocument) {
@ -43,20 +41,21 @@ final class WriteableIngestDocument implements Writeable<WriteableIngestDocument
this.ingestDocument = ingestDocument;
}
WriteableIngestDocument(StreamInput in) throws IOException {
Map<String, Object> sourceAndMetadata = in.readMap();
@SuppressWarnings("unchecked")
Map<String, String> ingestMetadata = (Map<String, String>) in.readGenericValue();
this.ingestDocument = new IngestDocument(sourceAndMetadata, ingestMetadata);
}
IngestDocument getIngestDocument() {
return ingestDocument;
}
static WriteableIngestDocument readWriteableIngestDocumentFrom(StreamInput in) throws IOException {
return PROTOTYPE.readFrom(in);
}
@Override
public WriteableIngestDocument readFrom(StreamInput in) throws IOException {
Map<String, Object> sourceAndMetadata = in.readMap();
@SuppressWarnings("unchecked")
Map<String, String> ingestMetadata = (Map<String, String>) in.readGenericValue();
return new WriteableIngestDocument(new IngestDocument(sourceAndMetadata, ingestMetadata));
return new WriteableIngestDocument(in);
}
@Override
@ -67,13 +66,13 @@ final class WriteableIngestDocument implements Writeable<WriteableIngestDocument
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DOCUMENT);
builder.startObject("doc");
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {
builder.field(metadata.getKey().getFieldName(), metadata.getValue());
}
builder.field(Fields.SOURCE, ingestDocument.getSourceAndMetadata());
builder.startObject(Fields.INGEST);
builder.field("_source", ingestDocument.getSourceAndMetadata());
builder.startObject("_ingest");
for (Map.Entry<String, String> ingestMetadata : ingestDocument.getIngestMetadata().entrySet()) {
builder.field(ingestMetadata.getKey(), ingestMetadata.getValue());
}
@ -103,10 +102,4 @@ final class WriteableIngestDocument implements Writeable<WriteableIngestDocument
public String toString() {
return ingestDocument.toString();
}
static final class Fields {
static final XContentBuilderString DOCUMENT = new XContentBuilderString("doc");
static final XContentBuilderString SOURCE = new XContentBuilderString("_source");
static final XContentBuilderString INGEST = new XContentBuilderString("_ingest");
}
}

View File

@ -58,6 +58,10 @@ public class IngestBootstrapper extends AbstractLifecycleComponent implements Cl
private final PipelineExecutionService pipelineExecutionService;
private final ProcessorsRegistry processorsRegistry;
// TODO(simonw): I would like to stress this abstraction a little more and move its construction into
// NodeService and instead of making it AbstractLifecycleComponent just impl Closeable.
// that way we can start the effort of making NodeModule the central point of required services and also move the registration of the
// pipelines into NodeModule? I'd really like to prevent adding yet another module.
@Inject
public IngestBootstrapper(Settings settings, ThreadPool threadPool, Environment environment,
ClusterService clusterService, TransportService transportService,

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
//TODO(simonw): can all these classes go into org.elasticsearch.ingest?
package org.elasticsearch.ingest.core;
@ -30,7 +30,7 @@ import java.util.stream.Collectors;
* A Processor that executes a list of other "processors". It executes a separate list of
* "onFailureProcessors" when any of the processors throw an {@link Exception}.
*/
public class CompoundProcessor implements Processor {
public final class CompoundProcessor implements Processor {
static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
static final String ON_FAILURE_PROCESSOR_FIELD = "on_failure_processor";

View File

@ -109,7 +109,7 @@ public final class Pipeline {
}
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorRegistry) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(config, "description");
String description = ConfigurationUtils.readOptionalStringProperty(config, "description"); // TODO(simonw): can we make these strings constants?
List<Processor> processors = readProcessors("processors", processorRegistry, config);
List<Processor> onFailureProcessors = readProcessors("on_failure", processorRegistry, config);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.unmodifiableList(processors), Collections.unmodifiableList(onFailureProcessors));

View File

@ -57,7 +57,7 @@ public interface ValueSource {
valueSourceList.add(wrap(item, templateService));
}
return new ListValue(valueSourceList);
} else if (value == null || value instanceof Integer ||
} else if (value == null || value instanceof Integer || // TODO(simonw): maybe we just check for Number?
value instanceof Long || value instanceof Float ||
value instanceof Double || value instanceof Boolean) {
return new ObjectValue(value);

View File

@ -88,7 +88,7 @@ public class ThreadPool extends AbstractComponent {
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
public static final String INGEST = "ingest";
public static final String INGEST = "ingest"; //TODO(simonw): wow what is the reason for having yet another threadpool? I really think we should just use index for this.
}
public enum ThreadPoolType {

View File

@ -47,7 +47,7 @@ public class SimulateProcessorResultTests extends ESTestCase {
BytesStreamOutput out = new BytesStreamOutput();
simulateProcessorResult.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateProcessorResult otherSimulateProcessorResult = SimulateProcessorResult.readSimulateProcessorResultFrom(streamInput);
SimulateProcessorResult otherSimulateProcessorResult = new SimulateProcessorResult(streamInput);
assertThat(otherSimulateProcessorResult.getProcessorId(), equalTo(simulateProcessorResult.getProcessorId()));
assertThat(otherSimulateProcessorResult.getIngestDocument(), equalTo(simulateProcessorResult.getIngestDocument()));
if (isFailure) {

View File

@ -78,6 +78,9 @@ public final class GrokProcessor implements Processor {
private final Map<String, String> builtinPatternBank;
public Factory() throws IOException {
// TODO(simonw): we should have a static helper method to load these patterns and make this
// factory only accept a String->String map instead. That way we can load
// the patterns in the IngestGrokPlugin ctor or even in a static context and this ctor doesn't need to throw any exception.
Map<String, String> builtinPatterns = new HashMap<>();
for (String pattern : PATTERN_NAMES) {
try(InputStream is = getClass().getResourceAsStream("/patterns/" + pattern)) {

View File

@ -231,6 +231,8 @@ public final class GeoIpProcessor implements Processor {
private final Map<String, DatabaseReader> databaseReaders;
public Factory(Path configDirectory) {
// TODO(simonw): same as for grok, we should load this outside of the factory in a static method and pass the map to the ctor
Path geoIpConfigDirectory = configDirectory.resolve("ingest-geoip");
if (Files.exists(geoIpConfigDirectory) == false && Files.isDirectory(geoIpConfigDirectory)) {
throw new IllegalStateException("the geoip directory [" + geoIpConfigDirectory + "] containing databases doesn't exist");