diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index 8fd30dcc9e..06be4d5456 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -104,8 +104,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() .name("fetch-es-index") .displayName("Index") - .description("The name of the index to read from. If the property is set " - + "to _all, the query will match across all indexes.") + .description("The name of the index to read from.") .required(true) .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -310,10 +309,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { throw new MalformedURLException("Base URL cannot be null"); } HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder(); - builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index); - if (!StringUtils.isEmpty(type)) { - builder.addPathSegment(type); - } + builder.addPathSegment(index); + builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type); builder.addPathSegment(docId); if (!StringUtils.isEmpty(fields)) { String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(",")); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index 28bc06091d..346ead499e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -21,6 +21,7 @@ import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Protocol; import okhttp3.Request; +import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; import org.apache.nifi.processor.ProcessContext; @@ -38,8 +39,10 @@ import org.mockito.stubbing.Answer; import java.io.IOException; import java.io.InputStream; +import java.net.URL; import java.util.HashMap; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -114,6 +117,34 @@ public class TestFetchElasticsearchHttp { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testFetchElasticsearchOnTriggerNoType() throws IOException { + final String ES_URL = "http://127.0.0.1:9200"; + final String DOC_ID = "28039652140"; + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(true); + runner = TestRunners.newTestRunner(processor); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", DOC_ID); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", DOC_ID); + assertEquals("URL doesn't match expected value when type is not supplied", + "http://127.0.0.1:9200" + "/doc/_all/" + DOC_ID, + processor.getURL().toString()); + } + @Test public void testFetchElasticsearchOnTriggerWithFields() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found @@ -272,6 +303,8 @@ public class TestFetchElasticsearchHttp { int statusCode = 200; String statusMessage = "OK"; + URL url = null; + FetchElasticsearchHttpTestProcessor(boolean documentExists) { this.documentExists = documentExists; } @@ -315,6 +348,16 @@ public class TestFetchElasticsearchHttp { }); } + @Override + protected Response sendRequestToElasticsearch(OkHttpClient client, URL url, String username, String password, String verb, RequestBody body) throws IOException { + this.url = url; + return super.sendRequestToElasticsearch(client, url, username, password, verb, body); + } + + public URL getURL() { + return url; + } + protected OkHttpClient getClient() { return client; }