NIFI-3140: Restored optional type handling in FetchElasticsearchHttp

This closes #1288

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Matt Burgess 2016-12-01 19:32:49 -05:00 committed by jpercivall
parent 7c5bd876bd
commit 316cae16d3
2 changed files with 46 additions and 6 deletions

View File

@ -104,8 +104,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("fetch-es-index") .name("fetch-es-index")
.displayName("Index") .displayName("Index")
.description("The name of the index to read from. If the property is set " .description("The name of the index to read from.")
+ "to _all, the query will match across all indexes.")
.required(true) .required(true)
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -310,10 +309,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
throw new MalformedURLException("Base URL cannot be null"); throw new MalformedURLException("Base URL cannot be null");
} }
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder(); HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index); builder.addPathSegment(index);
if (!StringUtils.isEmpty(type)) { builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
builder.addPathSegment(type);
}
builder.addPathSegment(docId); builder.addPathSegment(docId);
if (!StringUtils.isEmpty(fields)) { if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(",")); String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));

View File

@ -21,6 +21,7 @@ import okhttp3.MediaType;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Protocol; import okhttp3.Protocol;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response; import okhttp3.Response;
import okhttp3.ResponseBody; import okhttp3.ResponseBody;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -38,8 +39,10 @@ import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URL;
import java.util.HashMap; import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -114,6 +117,34 @@ public class TestFetchElasticsearchHttp {
out.assertAttributeEquals("doc_id", "28039652140"); out.assertAttributeEquals("doc_id", "28039652140");
} }
@Test
public void testFetchElasticsearchOnTriggerNoType() throws IOException {
final String ES_URL = "http://127.0.0.1:9200";
final String DOC_ID = "28039652140";
FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(true);
runner = TestRunners.newTestRunner(processor); // all docs are found
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", DOC_ID);
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", DOC_ID);
assertEquals("URL doesn't match expected value when type is not supplied",
"http://127.0.0.1:9200" + "/doc/_all/" + DOC_ID,
processor.getURL().toString());
}
@Test @Test
public void testFetchElasticsearchOnTriggerWithFields() throws IOException { public void testFetchElasticsearchOnTriggerWithFields() throws IOException {
runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found
@ -272,6 +303,8 @@ public class TestFetchElasticsearchHttp {
int statusCode = 200; int statusCode = 200;
String statusMessage = "OK"; String statusMessage = "OK";
URL url = null;
FetchElasticsearchHttpTestProcessor(boolean documentExists) { FetchElasticsearchHttpTestProcessor(boolean documentExists) {
this.documentExists = documentExists; this.documentExists = documentExists;
} }
@ -315,6 +348,16 @@ public class TestFetchElasticsearchHttp {
}); });
} }
@Override
protected Response sendRequestToElasticsearch(OkHttpClient client, URL url, String username, String password, String verb, RequestBody body) throws IOException {
this.url = url;
return super.sendRequestToElasticsearch(client, url, username, password, verb, body);
}
public URL getURL() {
return url;
}
protected OkHttpClient getClient() { protected OkHttpClient getClient() {
return client; return client;
} }