split out SimulateDocumentResult into subclasses, add tests for equalTo and streamable

This commit is contained in:
Tal Levy 2015-11-12 17:31:08 -08:00
parent af1de8e1cc
commit 20384aedf0
19 changed files with 744 additions and 181 deletions

View File

@ -130,16 +130,6 @@ public final class Data {
return modified; return modified;
} }
public Map<String, Object> asMap() {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("_index", index);
dataMap.put("_type", type);
dataMap.put("_id", id);
dataMap.put("_source", document);
return dataMap;
}
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (obj == this) { return true; } if (obj == this) { return true; }

View File

@ -26,6 +26,8 @@ import org.elasticsearch.plugin.ingest.PipelineStore;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
public class ParsedSimulateRequest { public class ParsedSimulateRequest {
private final List<Data> documents; private final List<Data> documents;
private final Pipeline pipeline; private final Pipeline pipeline;
@ -98,14 +100,5 @@ public class ParsedSimulateRequest {
List<Data> dataList = parseDocs(config); List<Data> dataList = parseDocs(config);
return new ParsedSimulateRequest(pipeline, dataList, verbose); return new ParsedSimulateRequest(pipeline, dataList, verbose);
} }
static final class Fields {
static final String PIPELINE = "pipeline";
static final String DOCS = "docs";
static final String SOURCE = "_source";
static final String INDEX = "_index";
static final String TYPE = "_type";
static final String ID = "_id";
}
} }
} }

View File

@ -18,133 +18,23 @@
*/ */
package org.elasticsearch.plugin.ingest.transport.simulate; package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.plugin.ingest.transport.TransportData;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class SimulateDocumentResult implements Streamable, ToXContent { public abstract class SimulateDocumentResult implements Streamable, ToXContent {
private TransportData data;
private List<SimulateProcessorResult> processorResultList;
private Throwable failure;
public SimulateDocumentResult() {
public int getStreamId() {
return -1;
} }
public SimulateDocumentResult(Data data) { public abstract void readFrom(StreamInput in) throws IOException;
this.data = new TransportData(data);
}
public SimulateDocumentResult(List<SimulateProcessorResult> processorResultList) { public abstract void writeTo(StreamOutput out) throws IOException;
this.processorResultList = processorResultList;
}
public SimulateDocumentResult(Throwable failure) { public abstract XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
this.failure = failure;
}
public boolean isFailed() {
if (failure != null) {
return true;
}
return false;
}
public boolean isVerbose() {
return this.processorResultList != null;
}
public Data getData() {
return data.get();
}
public List<SimulateProcessorResult> getProcessorResultList() {
return processorResultList;
}
@Override
public void readFrom(StreamInput in) throws IOException {
boolean isFailed = in.readBoolean();
boolean isVerbose = in.readBoolean();
if (isFailed) {
this.failure = in.readThrowable();
} else if (isVerbose) {
int size = in.readVInt();
processorResultList = new ArrayList<>();
for (int i = 0; i < size; i++) {
SimulateProcessorResult processorResult = new SimulateProcessorResult();
processorResult.readFrom(in);
processorResultList.add(processorResult);
}
} else {
this.data = new TransportData();
this.data.readFrom(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isFailed());
out.writeBoolean(isVerbose());
if (failure != null) {
out.writeThrowable(failure);
} else if (isVerbose()) {
out.writeVInt(processorResultList.size());
for (SimulateProcessorResult p : processorResultList) {
p.writeTo(out);
}
} else {
data.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (isFailed()) {
ElasticsearchException.renderThrowable(builder, params, failure);
} else if (isVerbose()) {
builder.startArray(Fields.PROCESSOR_RESULTS);
for (SimulateProcessorResult processorResult : processorResultList) {
processorResult.toXContent(builder, params);
}
builder.endArray();
} else {
data.toXContent(builder, params);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object obj) {
if (obj == this) { return true; }
if (obj == null || getClass() != obj.getClass()) {
return false;
}
SimulateDocumentResult other = (SimulateDocumentResult) obj;
return Objects.equals(data, other.data) && Objects.equals(processorResultList, other.processorResultList) && Objects.equals(failure, other.failure);
}
@Override
public int hashCode() {
return Objects.hash(data, processorResultList, failure);
}
static final class Fields {
static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results");
}
} }

View File

@ -44,9 +44,9 @@ public class SimulateExecutionService {
SimulateDocumentResult executeItem(Pipeline pipeline, Data data) { SimulateDocumentResult executeItem(Pipeline pipeline, Data data) {
try { try {
pipeline.execute(data); pipeline.execute(data);
return new SimulateDocumentResult(data); return new SimulateSimpleDocumentResult(data);
} catch (Exception e) { } catch (Exception e) {
return new SimulateDocumentResult(e); return new SimulateFailedDocumentResult(e);
} }
} }
@ -67,7 +67,7 @@ public class SimulateExecutionService {
currentData = new Data(currentData); currentData = new Data(currentData);
} }
return new SimulateDocumentResult(processorResultList); return new SimulateVerboseDocumentResult(processorResultList);
} }
public void execute(ParsedSimulateRequest request, ActionListener<SimulatePipelineResponse> listener) { public void execute(ParsedSimulateRequest request, ActionListener<SimulatePipelineResponse> listener) {

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class SimulateFailedDocumentResult extends SimulateDocumentResult {
public static final int STREAM_ID = 2;
private Throwable failure;
public SimulateFailedDocumentResult() {
}
public SimulateFailedDocumentResult(Throwable failure) {
this.failure = failure;
}
@Override
public int getStreamId() {
return STREAM_ID;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int streamId = in.readVInt();
if (streamId != STREAM_ID) {
throw new IOException("stream_id [" + streamId + "] does not match " + getClass().getName() + " [stream_id=" + STREAM_ID + "]");
}
this.failure = in.readThrowable();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(STREAM_ID);
out.writeThrowable(failure);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
ElasticsearchException.renderThrowable(builder, params, failure);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimulateFailedDocumentResult that = (SimulateFailedDocumentResult) o;
return Objects.equals((failure == null) ? null : failure.getClass(),
(that.failure == null) ? null : that.failure.getClass());
}
@Override
public int hashCode() {
return Objects.hash(failure);
}
}

View File

@ -83,4 +83,13 @@ public class SimulatePipelineRequest extends ActionRequest {
out.writeBoolean(verbose); out.writeBoolean(verbose);
out.writeBytesReference(source); out.writeBytesReference(source);
} }
public static final class Fields {
static final String PIPELINE = "pipeline";
static final String DOCS = "docs";
static final String SOURCE = "_source";
static final String INDEX = "_index";
static final String TYPE = "_type";
static final String ID = "_id";
}
} }

View File

@ -33,9 +33,8 @@ import java.util.List;
import java.util.Objects; import java.util.Objects;
public class SimulatePipelineResponse extends ActionResponse implements ToXContent { public class SimulatePipelineResponse extends ActionResponse implements ToXContent {
private String pipelineId; private String pipelineId;
private List<SimulateDocumentResult> responses; private List<SimulateDocumentResult> results;
public SimulatePipelineResponse() { public SimulatePipelineResponse() {
@ -43,7 +42,7 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
public SimulatePipelineResponse(String pipelineId, List<SimulateDocumentResult> responses) { public SimulatePipelineResponse(String pipelineId, List<SimulateDocumentResult> responses) {
this.pipelineId = pipelineId; this.pipelineId = pipelineId;
this.responses = Collections.unmodifiableList(responses); this.results = Collections.unmodifiableList(responses);
} }
public String getPipelineId() { public String getPipelineId() {
@ -54,20 +53,21 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
this.pipelineId = pipelineId; this.pipelineId = pipelineId;
} }
public List<SimulateDocumentResult> getResponses() { public List<SimulateDocumentResult> getResults() {
return responses; return results;
} }
public void setResponses(List<SimulateDocumentResult> responses) { public void setResults(List<SimulateDocumentResult> results) {
this.responses = responses; this.results = results;
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeString(pipelineId); out.writeString(pipelineId);
out.writeVInt(responses.size()); out.writeVInt(results.size());
for (SimulateDocumentResult response : responses) { for (SimulateDocumentResult response : results) {
out.writeVInt(response.getStreamId());
response.writeTo(out); response.writeTo(out);
} }
} }
@ -77,11 +77,24 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
super.readFrom(in); super.readFrom(in);
this.pipelineId = in.readString(); this.pipelineId = in.readString();
int responsesLength = in.readVInt(); int responsesLength = in.readVInt();
responses = new ArrayList<>(); results = new ArrayList<>();
for (int i = 0; i < responsesLength; i++) { for (int i = 0; i < responsesLength; i++) {
SimulateDocumentResult response = new SimulateDocumentResult(); SimulateDocumentResult result;
response.readFrom(in); switch (in.readVInt()) {
responses.add(response); case SimulateSimpleDocumentResult.STREAM_ID:
result = new SimulateSimpleDocumentResult();
break;
case SimulateVerboseDocumentResult.STREAM_ID:
result = new SimulateVerboseDocumentResult();
break;
case SimulateFailedDocumentResult.STREAM_ID:
result = new SimulateFailedDocumentResult();
break;
default:
throw new IOException("Cannot read result from stream");
}
result.readFrom(in);
results.add(result);
} }
} }
@ -89,7 +102,7 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(Fields.DOCUMENTS); builder.startArray(Fields.DOCUMENTS);
for (SimulateDocumentResult response : responses) { for (SimulateDocumentResult response : results) {
response.toXContent(builder, params); response.toXContent(builder, params);
} }
builder.endArray(); builder.endArray();
@ -103,12 +116,12 @@ public class SimulatePipelineResponse extends ActionResponse implements ToXConte
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
SimulatePipelineResponse that = (SimulatePipelineResponse) o; SimulatePipelineResponse that = (SimulatePipelineResponse) o;
return Objects.equals(pipelineId, that.pipelineId) && return Objects.equals(pipelineId, that.pipelineId) &&
Objects.equals(responses, that.responses); Objects.equals(results, that.results);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(pipelineId, responses); return Objects.hash(pipelineId, results);
} }
static final class Fields { static final class Fields {

View File

@ -51,7 +51,7 @@ public class SimulateProcessorResult implements Streamable, ToXContent {
this.failure = failure; this.failure = failure;
} }
public boolean isFailed() { private boolean isFailed() {
return this.failure != null; return this.failure != null;
} }
@ -66,6 +66,7 @@ public class SimulateProcessorResult implements Streamable, ToXContent {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
boolean isFailure = in.readBoolean(); boolean isFailure = in.readBoolean();
this.processorId = in.readString();
if (isFailure) { if (isFailure) {
this.failure = in.readThrowable(); this.failure = in.readThrowable();
} else { } else {
@ -77,10 +78,10 @@ public class SimulateProcessorResult implements Streamable, ToXContent {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isFailed()); out.writeBoolean(isFailed());
out.writeString(processorId);
if (isFailed()) { if (isFailed()) {
out.writeThrowable(failure); out.writeThrowable(failure);
} else { } else {
out.writeString(processorId);
data.writeTo(out); data.writeTo(out);
} }
} }
@ -107,7 +108,9 @@ public class SimulateProcessorResult implements Streamable, ToXContent {
return false; return false;
} }
SimulateProcessorResult other = (SimulateProcessorResult) obj; SimulateProcessorResult other = (SimulateProcessorResult) obj;
return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) && Objects.equals(failure, other.failure);
return Objects.equals(processorId, other.processorId) && Objects.equals(data, other.data) &&
Objects.equals((failure == null) ? null : failure.getClass(), (other.failure == null) ? null : other.failure.getClass());
} }
@Override @Override

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.plugin.ingest.transport.TransportData;
import java.io.IOException;
import java.util.Objects;
public class SimulateSimpleDocumentResult extends SimulateDocumentResult {
public static final int STREAM_ID = 0;
private TransportData transportData;
public SimulateSimpleDocumentResult() {
}
public SimulateSimpleDocumentResult(Data data) {
this.transportData = new TransportData(data);
}
@Override
public int getStreamId() {
return STREAM_ID;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int streamId = in.readVInt();
if (streamId != STREAM_ID) {
throw new IOException("stream_id [" + streamId + "] does not match " + getClass().getName() + " [stream_id=" + STREAM_ID + "]");
}
this.transportData = new TransportData();
this.transportData.readFrom(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(STREAM_ID);
transportData.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
transportData.toXContent(builder, params);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimulateSimpleDocumentResult that = (SimulateSimpleDocumentResult) o;
return Objects.equals(transportData, that.transportData);
}
@Override
public int hashCode() {
return Objects.hash(transportData);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class SimulateVerboseDocumentResult extends SimulateDocumentResult {
public static final int STREAM_ID = 1;
private List<SimulateProcessorResult> processorResults;
public SimulateVerboseDocumentResult() {
}
public SimulateVerboseDocumentResult(List<SimulateProcessorResult> processorResults) {
this.processorResults = processorResults;
}
@Override
public int getStreamId() {
return STREAM_ID;
}
@Override
public void readFrom(StreamInput in) throws IOException {
int streamId = in.readVInt();
if (streamId != STREAM_ID) {
throw new IOException("stream_id [" + streamId + "] does not match " + getClass().getName() + " [stream_id=" + STREAM_ID + "]");
}
int size = in.readVInt();
processorResults = new ArrayList<>();
for (int i = 0; i < size; i++) {
SimulateProcessorResult processorResult = new SimulateProcessorResult();
processorResult.readFrom(in);
processorResults.add(processorResult);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(STREAM_ID);
out.writeVInt(processorResults.size());
for (SimulateProcessorResult result : processorResults) {
result.writeTo(out);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(Fields.PROCESSOR_RESULTS);
for (SimulateProcessorResult processorResult : processorResults) {
processorResult.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimulateVerboseDocumentResult that = (SimulateVerboseDocumentResult) o;
return Objects.equals(processorResults, that.processorResults);
}
@Override
public int hashCode() {
return Objects.hash(processorResults);
}
static final class Fields {
static final XContentBuilderString PROCESSOR_RESULTS = new XContentBuilderString("processor_results");
}
}

View File

@ -30,10 +30,7 @@ import org.elasticsearch.plugin.ingest.transport.get.GetPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse; import org.elasticsearch.plugin.ingest.transport.get.GetPipelineResponse;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder; import org.elasticsearch.plugin.ingest.transport.put.PutPipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineAction; import org.elasticsearch.plugin.ingest.transport.simulate.*;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequestBuilder;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineResponse;
import org.elasticsearch.plugin.ingest.transport.simulate.SimulateDocumentResult;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
@ -104,7 +101,7 @@ public class IngestClientIT extends ESIntegTestCase {
Map<String, Object> expectedDoc = new HashMap<>(); Map<String, Object> expectedDoc = new HashMap<>();
expectedDoc.put("foo", "bar"); expectedDoc.put("foo", "bar");
Data expectedData = new Data("index", "type", "id", expectedDoc); Data expectedData = new Data("index", "type", "id", expectedDoc);
SimulateDocumentResult expectedResponse = new SimulateDocumentResult(expectedData); SimulateDocumentResult expectedResponse = new SimulateSimpleDocumentResult(expectedData);
List<SimulateDocumentResult> expectedResponses = Arrays.asList(expectedResponse); List<SimulateDocumentResult> expectedResponses = Arrays.asList(expectedResponse);
SimulatePipelineResponse expected = new SimulatePipelineResponse("_id", expectedResponses); SimulatePipelineResponse expected = new SimulatePipelineResponse("_id", expectedResponses);

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest;
import org.elasticsearch.ingest.processor.Processor;
import org.elasticsearch.ingest.processor.mutate.MutateProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
public class PipelineTests extends ESTestCase {
private Processor updateProcessor;
private Processor lowercaseProcessor;
private Pipeline pipeline;
@Before
public void setup() {
Map<String, Object> update = Collections.singletonMap("foo", 123);
List<String> lowercase = Collections.singletonList("foo");
updateProcessor = new MutateProcessor(update, null, null, null, null, null, null, null, null, null);
lowercaseProcessor = new MutateProcessor(null, null, null, null, null, null, null, null, null, lowercase);
pipeline = new Pipeline("id", "description", Arrays.asList(updateProcessor, lowercaseProcessor));
}
public void testEquals() throws Exception {
Pipeline other = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getProcessors());
assertThat(pipeline, equalTo(other));
}
public void testNotEqualsDiffId() throws Exception {
Pipeline other = new Pipeline(pipeline.getId() + "foo", pipeline.getDescription(), pipeline.getProcessors());
assertThat(pipeline, not(equalTo(other)));
}
public void testNotEqualsDiffDescription() throws Exception {
Pipeline other = new Pipeline(pipeline.getId(), pipeline.getDescription() + "foo", pipeline.getProcessors());
assertThat(pipeline, not(equalTo(other)));
}
public void testNotEqualsDiffProcessors() throws Exception {
Pipeline other = new Pipeline(pipeline.getId(), pipeline.getDescription() + "foo", Collections.singletonList(updateProcessor));
assertThat(pipeline, not(equalTo(other)));
}
}

View File

@ -34,6 +34,8 @@ import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRequest.Fields;
public class ParsedSimulateRequestParserTests extends ESTestCase { public class ParsedSimulateRequestParserTests extends ESTestCase {
private PipelineStore store; private PipelineStore store;
private ParsedSimulateRequest.Parser parser; private ParsedSimulateRequest.Parser parser;
@ -43,9 +45,9 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
@Before @Before
public void init() throws IOException { public void init() throws IOException {
parser = new ParsedSimulateRequest.Parser(); parser = new ParsedSimulateRequest.Parser();
List<String> uppercase = Collections.unmodifiableList(Collections.singletonList("foo")); List<String> uppercase = Collections.singletonList("foo");
Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null); Processor processor = new MutateProcessor(null, null, null, null, null, null, null, null, uppercase, null);
pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Collections.unmodifiableList(Arrays.asList(processor))); pipeline = new Pipeline(ParsedSimulateRequest.Parser.SIMULATED_PIPELINE_ID, null, Arrays.asList(processor));
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar")); data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
Map<String, Processor.Factory> processorRegistry = new HashMap<>(); Map<String, Processor.Factory> processorRegistry = new HashMap<>();
processorRegistry.put("mutate", new MutateProcessor.Factory()); processorRegistry.put("mutate", new MutateProcessor.Factory());
@ -60,12 +62,12 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
Map<String, Object> raw = new HashMap<>(); Map<String, Object> raw = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>(); List<Map<String, Object>> docs = new ArrayList<>();
Map<String, Object> doc = new HashMap<>(); Map<String, Object> doc = new HashMap<>();
doc.put("_index", "_index"); doc.put(Fields.INDEX, "_index");
doc.put("_type", "_type"); doc.put(Fields.TYPE, "_type");
doc.put("_id", "_id"); doc.put(Fields.ID, "_id");
doc.put("_source", data.getDocument()); doc.put(Fields.SOURCE, data.getDocument());
docs.add(doc); docs.add(doc);
raw.put("docs", docs); raw.put(Fields.DOCS, docs);
ParsedSimulateRequest actualRequest = parser.parseWithPipelineId("_id", raw, false, store); ParsedSimulateRequest actualRequest = parser.parseWithPipelineId("_id", raw, false, store);
assertThat(actualRequest, equalTo(expectedRequest)); assertThat(actualRequest, equalTo(expectedRequest));
@ -77,10 +79,10 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
Map<String, Object> raw = new HashMap<>(); Map<String, Object> raw = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>(); List<Map<String, Object>> docs = new ArrayList<>();
Map<String, Object> doc = new HashMap<>(); Map<String, Object> doc = new HashMap<>();
doc.put("_index", "_index"); doc.put(Fields.INDEX, "_index");
doc.put("_type", "_type"); doc.put(Fields.TYPE, "_type");
doc.put("_id", "_id"); doc.put(Fields.ID, "_id");
doc.put("_source", data.getDocument()); doc.put(Fields.SOURCE, data.getDocument());
docs.add(doc); docs.add(doc);
Map<String, Object> processorConfig = new HashMap<>(); Map<String, Object> processorConfig = new HashMap<>();
@ -88,8 +90,8 @@ public class ParsedSimulateRequestParserTests extends ESTestCase {
Map<String, Object> pipelineConfig = new HashMap<>(); Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig))); pipelineConfig.put("processors", Collections.singletonList(Collections.singletonMap("mutate", processorConfig)));
raw.put("docs", docs); raw.put(Fields.DOCS, docs);
raw.put("pipeline", pipelineConfig); raw.put(Fields.PIPELINE, pipelineConfig);
ParsedSimulateRequest actualRequest = parser.parse(raw, false, store); ParsedSimulateRequest actualRequest = parser.parse(raw, false, store);
assertThat(actualRequest, equalTo(expectedRequest)); assertThat(actualRequest, equalTo(expectedRequest));

View File

@ -68,7 +68,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
} }
public void testExecuteVerboseItem() throws Exception { public void testExecuteVerboseItem() throws Exception {
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult( SimulateDocumentResult expectedItemResponse = new SimulateVerboseDocumentResult(
Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data))); Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data)));
SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data); SimulateDocumentResult actualItemResponse = executionService.executeVerboseItem(pipeline, data);
verify(processor, times(2)).execute(data); verify(processor, times(2)).execute(data);
@ -76,7 +76,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
} }
public void testExecuteItem() throws Exception { public void testExecuteItem() throws Exception {
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(data); SimulateDocumentResult expectedItemResponse = new SimulateSimpleDocumentResult(data);
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data); SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
verify(processor, times(2)).execute(data); verify(processor, times(2)).execute(data);
assertThat(actualItemResponse, equalTo(expectedItemResponse)); assertThat(actualItemResponse, equalTo(expectedItemResponse));
@ -84,7 +84,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteVerboseItemWithFailure() throws Exception { public void testExecuteVerboseItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed"); Exception e = new RuntimeException("processor failed");
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult( SimulateDocumentResult expectedItemResponse = new SimulateVerboseDocumentResult(
Arrays.asList(new SimulateProcessorResult("processor[mock]-0", e), new SimulateProcessorResult("processor[mock]-1", data)) Arrays.asList(new SimulateProcessorResult("processor[mock]-0", e), new SimulateProcessorResult("processor[mock]-1", data))
); );
doThrow(e).doNothing().when(processor).execute(data); doThrow(e).doNothing().when(processor).execute(data);
@ -95,7 +95,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteItemWithFailure() throws Exception { public void testExecuteItemWithFailure() throws Exception {
Exception e = new RuntimeException("processor failed"); Exception e = new RuntimeException("processor failed");
SimulateDocumentResult expectedItemResponse = new SimulateDocumentResult(e); SimulateDocumentResult expectedItemResponse = new SimulateFailedDocumentResult(e);
doThrow(e).when(processor).execute(data); doThrow(e).when(processor).execute(data);
SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data); SimulateDocumentResult actualItemResponse = executionService.executeItem(pipeline, data);
verify(processor, times(1)).execute(data); verify(processor, times(1)).execute(data);
@ -103,7 +103,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
} }
public void testExecute() throws Exception { public void testExecute() throws Exception {
SimulateDocumentResult itemResponse = new SimulateDocumentResult(data); SimulateDocumentResult itemResponse = new SimulateSimpleDocumentResult(data);
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false); ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), false);
executionService.execute(request, listener); executionService.execute(request, listener);
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse)); SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));
@ -118,7 +118,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
public void testExecuteWithVerbose() throws Exception { public void testExecuteWithVerbose() throws Exception {
ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true); ParsedSimulateRequest request = new ParsedSimulateRequest(pipeline, Collections.singletonList(data), true);
SimulateDocumentResult itemResponse = new SimulateDocumentResult( SimulateDocumentResult itemResponse = new SimulateVerboseDocumentResult(
Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data))); Arrays.asList(new SimulateProcessorResult("processor[mock]-0", data), new SimulateProcessorResult("processor[mock]-1", data)));
executionService.execute(request, listener); executionService.execute(request, listener);
SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse)); SimulatePipelineResponse response = new SimulatePipelineResponse("_id", Collections.singletonList(itemResponse));

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class SimulateFailedDocumentResultTests extends ESTestCase {
@Before
public void setup() {
}
public void testEqualsExact() {
Throwable throwable = new Exception("foo");
SimulateDocumentResult result = new SimulateFailedDocumentResult(throwable);
SimulateDocumentResult otherResult = new SimulateFailedDocumentResult(throwable);
assertThat(result, equalTo(otherResult));
}
public void testEqualsSameExceptionClass() {
SimulateDocumentResult result = new SimulateFailedDocumentResult(new IllegalArgumentException("foo"));
SimulateDocumentResult otherResult = new SimulateFailedDocumentResult(new IllegalArgumentException("bar"));
assertThat(result, equalTo(otherResult));
}
public void testNotEqualsDiffExceptionClass() {
SimulateDocumentResult result = new SimulateFailedDocumentResult(new IllegalArgumentException("foo"));
SimulateDocumentResult otherResult = new SimulateFailedDocumentResult(new NullPointerException("foo"));
assertThat(result, not(equalTo(otherResult)));
}
public void testStreamable() throws IOException {
SimulateDocumentResult result = new SimulateFailedDocumentResult(new IllegalArgumentException("foo"));
BytesStreamOutput out = new BytesStreamOutput();
result.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateDocumentResult otherResult = new SimulateFailedDocumentResult();
otherResult.readFrom(streamInput);
assertThat(result, equalTo(otherResult));
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class SimulatePipelineResponseTests extends ESTestCase {
private Data data;
private SimulateDocumentResult documentResult;
private SimulatePipelineResponse response;
@Before
public void setup() {
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
documentResult = new SimulateSimpleDocumentResult(data);
response = new SimulatePipelineResponse("_id", Collections.singletonList(documentResult));
}
public void testEquals() {
SimulatePipelineResponse otherResponse = new SimulatePipelineResponse("_id", Collections.singletonList(documentResult));
assertThat(response, equalTo(otherResponse));
}
public void testNotEqualsId() {
SimulatePipelineResponse otherResponse = new SimulatePipelineResponse(response.getPipelineId() + "foo", response.getResults());
assertThat(response, not(equalTo(otherResponse)));
}
public void testNotEqualsResults() {
SimulatePipelineResponse otherResponse = new SimulatePipelineResponse(response.getPipelineId(), Arrays.asList(documentResult, documentResult));
assertThat(response, not(equalTo(otherResponse)));
}
public void testStreamable() throws IOException {
List<SimulateDocumentResult> results = Arrays.asList(
new SimulateSimpleDocumentResult(data),
new SimulateFailedDocumentResult(new IllegalArgumentException("foo")),
new SimulateVerboseDocumentResult(Collections.singletonList(new SimulateProcessorResult("pid", data)))
);
response = new SimulatePipelineResponse("_id", results);
BytesStreamOutput out = new BytesStreamOutput();
response.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulatePipelineResponse otherResponse = new SimulatePipelineResponse();
otherResponse.readFrom(streamInput);
assertThat(response, equalTo(otherResponse));
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class SimulateProcessorResultTests extends ESTestCase {
private Data data;
private SimulateProcessorResult result;
private SimulateProcessorResult failedResult;
private String processorId;
private Throwable throwable;
@Before
public void setup() {
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
processorId = "id";
throwable = new IllegalArgumentException("foo");
result = new SimulateProcessorResult(processorId, data);
failedResult = new SimulateProcessorResult(processorId, throwable);
}
public void testEqualsData() {
SimulateProcessorResult otherResult = new SimulateProcessorResult(new String(processorId), new Data(data));
assertThat(result, equalTo(otherResult));
}
public void testEqualsSameClassThrowable() {
SimulateProcessorResult otherFailedResult = new SimulateProcessorResult(new String(processorId), new IllegalArgumentException("foo"));
assertThat(failedResult, equalTo(otherFailedResult));
}
public void testNotEqualsThrowable() {
SimulateProcessorResult otherFailedResult = new SimulateProcessorResult(new String(processorId), new NullPointerException("foo"));
assertThat(failedResult, not(equalTo(otherFailedResult)));
}
public void testStreamableWithThrowable() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
failedResult.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateProcessorResult otherFailedResult = new SimulateProcessorResult();
otherFailedResult.readFrom(streamInput);
assertThat(failedResult, equalTo(otherFailedResult));
}
public void testStreamableWithData() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
result.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateProcessorResult otherResult = new SimulateProcessorResult();
otherResult.readFrom(streamInput);
assertThat(result, equalTo(otherResult));
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.ingest.transport.simulate;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.ingest.Data;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class SimulateSimpleDocumentResultTests extends ESTestCase {
private Data data;
@Before
public void setup() {
data = new Data("_index", "_type", "_id", Collections.singletonMap("foo", "bar"));
}
public void testEquals() {
SimulateDocumentResult result = new SimulateSimpleDocumentResult(data);
SimulateDocumentResult otherResult = new SimulateSimpleDocumentResult(data);
assertThat(result, equalTo(otherResult));
}
public void testNotEqualsDiffData() {
Data otherData = new Data(data.getIndex() + "foo", data.getType(), data.getId(), data.getDocument());
SimulateDocumentResult result = new SimulateSimpleDocumentResult(data);
SimulateDocumentResult otherResult = new SimulateSimpleDocumentResult(otherData);
assertThat(result, not(equalTo(otherResult)));
}
public void testStreamable() throws IOException {
SimulateDocumentResult result = new SimulateSimpleDocumentResult(data);
BytesStreamOutput out = new BytesStreamOutput();
result.writeTo(out);
StreamInput streamInput = StreamInput.wrap(out.bytes());
SimulateDocumentResult otherResult = new SimulateSimpleDocumentResult();
otherResult.readFrom(streamInput);
assertThat(result, equalTo(otherResult));
}
}

View File

@ -87,6 +87,33 @@
} }
- length: { docs: 1 } - length: { docs: 1 }
---
"Test simulate with no provided pipeline or pipeline_id":
- do:
cluster.health:
wait_for_status: green
- do:
catch: request
ingest.simulate:
body: >
{
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"foo": "bar"
}
}
]
}
- length: { error: 3 }
- match: { status: 400 }
- match: { error.type: "illegal_argument_exception" }
- match: { error.reason: "required property [pipeline] is missing" }
--- ---
"Test simulate with verbose flag": "Test simulate with verbose flag":
- do: - do: