remove grok and join usage from IngestClientIT
This commit is contained in:
parent
e036b5896d
commit
c185c1339a
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.ingest;
|
package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
|
|
||||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
@ -38,20 +37,17 @@ import org.elasticsearch.action.ingest.simulate.SimulatePipelineAction;
|
||||||
import org.elasticsearch.action.ingest.simulate.SimulatePipelineRequestBuilder;
|
import org.elasticsearch.action.ingest.simulate.SimulatePipelineRequestBuilder;
|
||||||
import org.elasticsearch.action.ingest.simulate.SimulatePipelineResponse;
|
import org.elasticsearch.action.ingest.simulate.SimulatePipelineResponse;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
||||||
import org.elasticsearch.ingest.core.ConfigurationUtils;
|
import org.elasticsearch.ingest.core.ConfigurationUtils;
|
||||||
import org.elasticsearch.ingest.core.IngestDocument;
|
import org.elasticsearch.ingest.core.IngestDocument;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
@ -81,6 +77,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
protected Settings externalClusterClientSettings() {
|
protected Settings externalClusterClientSettings() {
|
||||||
return Settings.builder()
|
return Settings.builder()
|
||||||
.put(super.transportClientSettings())
|
.put(super.transportClientSettings())
|
||||||
|
//TODO can we remove this?
|
||||||
.put("node.ingest", true)
|
.put("node.ingest", true)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
@ -115,6 +112,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
.field("_id", "id")
|
.field("_id", "id")
|
||||||
.startObject("_source")
|
.startObject("_source")
|
||||||
.field("foo", "bar")
|
.field("foo", "bar")
|
||||||
|
.field("fail", false)
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject()
|
.endObject()
|
||||||
.endArray()
|
.endArray()
|
||||||
|
@ -128,6 +126,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
|
SimulateDocumentSimpleResult simulateDocumentSimpleResult = (SimulateDocumentSimpleResult) response.getResults().get(0);
|
||||||
Map<String, Object> source = new HashMap<>();
|
Map<String, Object> source = new HashMap<>();
|
||||||
source.put("foo", "bar");
|
source.put("foo", "bar");
|
||||||
|
source.put("fail", false);
|
||||||
source.put("processed", true);
|
source.put("processed", true);
|
||||||
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
|
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source);
|
||||||
assertThat(simulateDocumentSimpleResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
|
assertThat(simulateDocumentSimpleResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
|
||||||
|
@ -143,9 +142,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
.field("description", "my_pipeline")
|
.field("description", "my_pipeline")
|
||||||
.startArray("processors")
|
.startArray("processors")
|
||||||
.startObject()
|
.startObject()
|
||||||
.startObject("join")
|
.startObject("test")
|
||||||
.field("field", "field1")
|
|
||||||
.field("separator", "|")
|
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject()
|
.endObject()
|
||||||
.endArray()
|
.endArray()
|
||||||
|
@ -157,11 +154,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
bulkRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
|
bulkRequest.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id");
|
||||||
for (int i = 0; i < numRequests; i++) {
|
for (int i = 0; i < numRequests; i++) {
|
||||||
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
|
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
|
||||||
if (i % 2 == 0) {
|
indexRequest.source("field", "value", "fail", i % 2 == 0);
|
||||||
indexRequest.source("field1", Arrays.asList("value1", "value2"));
|
|
||||||
} else {
|
|
||||||
indexRequest.source("field2", Arrays.asList("value1", "value2"));
|
|
||||||
}
|
|
||||||
bulkRequest.add(indexRequest);
|
bulkRequest.add(indexRequest);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,12 +163,12 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
for (int i = 0; i < bulkRequest.requests().size(); i++) {
|
for (int i = 0; i < bulkRequest.requests().size(); i++) {
|
||||||
BulkItemResponse itemResponse = response.getItems()[i];
|
BulkItemResponse itemResponse = response.getItems()[i];
|
||||||
if (i % 2 == 0) {
|
if (i % 2 == 0) {
|
||||||
|
BulkItemResponse.Failure failure = itemResponse.getFailure();
|
||||||
|
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: test processor failed"));
|
||||||
|
} else {
|
||||||
IndexResponse indexResponse = itemResponse.getResponse();
|
IndexResponse indexResponse = itemResponse.getResponse();
|
||||||
assertThat(indexResponse.getId(), equalTo(Integer.toString(i)));
|
assertThat(indexResponse.getId(), equalTo(Integer.toString(i)));
|
||||||
assertThat(indexResponse.isCreated(), is(true));
|
assertThat(indexResponse.isCreated(), is(true));
|
||||||
} else {
|
|
||||||
BulkItemResponse.Failure failure = itemResponse.getFailure();
|
|
||||||
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: field [field1] not present as part of path [field1]"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -187,9 +180,7 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
.field("description", "my_pipeline")
|
.field("description", "my_pipeline")
|
||||||
.startArray("processors")
|
.startArray("processors")
|
||||||
.startObject()
|
.startObject()
|
||||||
.startObject("grok")
|
.startObject("test")
|
||||||
.field("field", "field1")
|
|
||||||
.field("pattern", "%{NUMBER:val:float} %{NUMBER:status:int} <%{WORD:msg}>")
|
|
||||||
.endObject()
|
.endObject()
|
||||||
.endObject()
|
.endObject()
|
||||||
.endArray()
|
.endArray()
|
||||||
|
@ -202,32 +193,21 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
assertThat(getResponse.pipelines().size(), equalTo(1));
|
assertThat(getResponse.pipelines().size(), equalTo(1));
|
||||||
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
|
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
|
||||||
|
|
||||||
createIndex("test");
|
client().prepareIndex("test", "type", "1").setSource("field", "value", "fail", false)
|
||||||
XContentBuilder updateMappingBuilder = jsonBuilder().startObject().startObject("properties")
|
|
||||||
.startObject("status").field("type", "integer").endObject()
|
|
||||||
.startObject("val").field("type", "float").endObject()
|
|
||||||
.endObject();
|
|
||||||
PutMappingResponse putMappingResponse = client().admin().indices()
|
|
||||||
.preparePutMapping("test").setType("type").setSource(updateMappingBuilder).get();
|
|
||||||
assertAcked(putMappingResponse);
|
|
||||||
|
|
||||||
client().prepareIndex("test", "type", "1").setSource("field1", "123.42 400 <foo>")
|
|
||||||
.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id")
|
.putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id")
|
||||||
.get();
|
.get();
|
||||||
|
|
||||||
Map<String, Object> doc = client().prepareGet("test", "type", "1")
|
Map<String, Object> doc = client().prepareGet("test", "type", "1")
|
||||||
.get().getSourceAsMap();
|
.get().getSourceAsMap();
|
||||||
assertThat(doc.get("val"), equalTo(123.42));
|
assertThat(doc.get("field"), equalTo("value"));
|
||||||
assertThat(doc.get("status"), equalTo(400));
|
assertThat(doc.get("processed"), equalTo(true));
|
||||||
assertThat(doc.get("msg"), equalTo("foo"));
|
|
||||||
|
|
||||||
client().prepareBulk().add(
|
client().prepareBulk().add(
|
||||||
client().prepareIndex("test", "type", "2").setSource("field1", "123.42 400 <foo>")
|
client().prepareIndex("test", "type", "2").setSource("field", "value2", "fail", false)
|
||||||
).putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id").get();
|
).putHeader(ConfigurationUtils.PIPELINE_ID_PARAM, "_id").get();
|
||||||
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
|
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
|
||||||
assertThat(doc.get("val"), equalTo(123.42));
|
assertThat(doc.get("field"), equalTo("value2"));
|
||||||
assertThat(doc.get("status"), equalTo(400));
|
assertThat(doc.get("processed"), equalTo(true));
|
||||||
assertThat(doc.get("msg"), equalTo("foo"));
|
|
||||||
|
|
||||||
DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
|
DeleteResponse response = new DeletePipelineRequestBuilder(client(), DeletePipelineAction.INSTANCE)
|
||||||
.setId("_id")
|
.setId("_id")
|
||||||
|
@ -261,7 +241,12 @@ public class IngestClientIT extends ESIntegTestCase {
|
||||||
|
|
||||||
public void onModule(IngestModule ingestModule) {
|
public void onModule(IngestModule ingestModule) {
|
||||||
ingestModule.addProcessor("test", (environment, templateService) -> config ->
|
ingestModule.addProcessor("test", (environment, templateService) -> config ->
|
||||||
new TestProcessor("test", ingestDocument -> ingestDocument.setFieldValue("processed", true))
|
new TestProcessor("test", ingestDocument -> {
|
||||||
|
ingestDocument.setFieldValue("processed", true);
|
||||||
|
if (ingestDocument.getFieldValue("fail", Boolean.class)) {
|
||||||
|
throw new IllegalArgumentException("test processor failed");
|
||||||
|
}
|
||||||
|
})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue