NIFI-5427: Updating ScrollElasticsearchHttp to use POST, supporting ES6

This closes #2890

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Joe Gresock 2018-07-13 16:29:45 +00:00 committed by Mike Thomsen
parent 4e09a03f86
commit 66783c18b2
2 changed files with 11 additions and 4 deletions

View File

@ -250,6 +250,8 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
requestBuilder = requestBuilder.get();
} else if ("put".equalsIgnoreCase(verb)) {
requestBuilder = requestBuilder.put(body);
} else if ("post".equalsIgnoreCase(verb)) {
requestBuilder = requestBuilder.post(body);
} else {
throw new IllegalArgumentException("Elasticsearch REST API verb not supported by this processor: " + verb);
}

View File

@ -18,7 +18,9 @@ package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.JsonNode;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.lang3.StringUtils;
@ -86,7 +88,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
private static final String FINISHED_QUERY_STATE = "finishedQuery";
private static final String SCROLL_ID_STATE = "scrollId";
private static final String SCROLL_QUERY_PARAM = "scroll";
private static final String SCROLL_ID_QUERY_PARAM = "scroll_id";
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -260,8 +261,13 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
scrollId, pageSize, scroll, context);
final long startNanos = System.nanoTime();
final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
scrollId);
final RequestBody body = RequestBody.create(MediaType.parse("application/json"), scrollBody);
final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
username, password, "GET", null);
username, password, "POST", body);
this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
getResponse.close();
} else {
@ -415,7 +421,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
if (!StringUtils.isEmpty(scrollId)) {
builder.addPathSegment("_search");
builder.addPathSegment("scroll");
builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId);
} else {
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
if (!StringUtils.isEmpty(type)) {
@ -432,8 +437,8 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
builder.addQueryParameter(SORT_QUERY_PARAM, trimmedFields);
}
}
builder.addQueryParameter(SCROLL_QUERY_PARAM, scroll);
}
// Find the user-added properties and set them as query parameters on the URL
for (Map.Entry<PropertyDescriptor, String> property : context.getProperties().entrySet()) {