mirror of https://github.com/apache/nifi.git
NIFI-2417: Addressing code review comments
Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
parent
00412f6e97
commit
937dc71aec
|
@ -104,7 +104,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||||
.name("fetch-es-index")
|
.name("fetch-es-index")
|
||||||
.displayName("Index")
|
.displayName("Index")
|
||||||
.description("The name of the index to read from")
|
.description("The name of the index to read from. If the property is set "
|
||||||
|
+ "to _all, the query will match across all indexes.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.expressionLanguageSupported(true)
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
@ -113,8 +114,8 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
||||||
.name("fetch-es-type")
|
.name("fetch-es-type")
|
||||||
.displayName("Type")
|
.displayName("Type")
|
||||||
.description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
|
.description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty, "
|
||||||
+ "to _all, the first document matching the identifier across all types will be retrieved.")
|
+ "the first document matching the identifier across all types will be retrieved.")
|
||||||
.required(false)
|
.required(false)
|
||||||
.expressionLanguageSupported(true)
|
.expressionLanguageSupported(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
@ -297,8 +298,10 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
throw new MalformedURLException("Base URL cannot be null");
|
throw new MalformedURLException("Base URL cannot be null");
|
||||||
}
|
}
|
||||||
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
|
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
|
||||||
builder.addPathSegment(index);
|
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
|
||||||
builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
|
if (!StringUtils.isEmpty(type)) {
|
||||||
|
builder.addPathSegment(type);
|
||||||
|
}
|
||||||
builder.addPathSegment(docId);
|
builder.addPathSegment(docId);
|
||||||
if (!StringUtils.isEmpty(fields)) {
|
if (!StringUtils.isEmpty(fields)) {
|
||||||
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
|
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
|
||||||
|
|
|
@ -68,6 +68,7 @@ import okhttp3.ResponseBody;
|
||||||
+ "To retrieve more records, use the ScrollElasticsearchHttp processor.")
|
+ "To retrieve more records, use the ScrollElasticsearchHttp processor.")
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
|
@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
|
||||||
|
@WritesAttribute(attribute = "es.id", description = "The Elasticsearch document identifier"),
|
||||||
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
|
||||||
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
|
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"),
|
||||||
@WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
|
@WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of "
|
||||||
|
@ -106,13 +107,14 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
|
|
||||||
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
|
||||||
.name("query-es-query").displayName("Query")
|
.name("query-es-query").displayName("Query")
|
||||||
.description("The Lucene-style query to run against ElasticSearch").required(true)
|
.description("The Lucene-style query to run against ElasticSearch (e.g., genre:blues AND -artist:muddy)").required(true)
|
||||||
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||||
.name("query-es-index").displayName("Index")
|
.name("query-es-index").displayName("Index")
|
||||||
.description("The name of the index to read from").required(true)
|
.description("The name of the index to read from. If the property is set "
|
||||||
|
+ "to _all, the query will match across all indexes.").required(true)
|
||||||
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -120,10 +122,12 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
.name("query-es-type")
|
.name("query-es-type")
|
||||||
.displayName("Type")
|
.displayName("Type")
|
||||||
.description(
|
.description(
|
||||||
"The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
|
"The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
|
||||||
+ "to _all, the first document matching the identifier across all types will be retrieved.")
|
+ "the the query will match across all types.")
|
||||||
.required(false).expressionLanguageSupported(true)
|
.required(false)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
|
||||||
.name("query-es-fields")
|
.name("query-es-fields")
|
||||||
|
@ -131,8 +135,10 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
.description(
|
.description(
|
||||||
"A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
|
"A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
|
||||||
+ "then the entire document's source will be retrieved.")
|
+ "then the entire document's source will be retrieved.")
|
||||||
.required(false).expressionLanguageSupported(true)
|
.required(false)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
|
||||||
.name("query-es-sort")
|
.name("query-es-sort")
|
||||||
|
@ -331,11 +337,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
JsonNode source = hit.get("_source");
|
JsonNode source = hit.get("_source");
|
||||||
|
documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId);
|
||||||
documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
|
documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
|
||||||
documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
|
documentFlowFile = session.putAttribute(documentFlowFile, "es.type", retrievedType);
|
||||||
|
|
||||||
if (targetIsContent) {
|
if (targetIsContent) {
|
||||||
documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
|
documentFlowFile = session.putAttribute(documentFlowFile, "filename", retrievedId);
|
||||||
|
documentFlowFile = session.putAttribute(documentFlowFile, "mime.type", "application/json");
|
||||||
documentFlowFile = session.write(documentFlowFile, out -> {
|
documentFlowFile = session.write(documentFlowFile, out -> {
|
||||||
out.write(source.toString().getBytes());
|
out.write(source.toString().getBytes());
|
||||||
});
|
});
|
||||||
|
@ -390,8 +398,10 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
|
||||||
throw new MalformedURLException("Base URL cannot be null");
|
throw new MalformedURLException("Base URL cannot be null");
|
||||||
}
|
}
|
||||||
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
|
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
|
||||||
builder.addPathSegment(index);
|
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
|
||||||
builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
|
if (!StringUtils.isEmpty(type)) {
|
||||||
|
builder.addPathSegment(type);
|
||||||
|
}
|
||||||
builder.addPathSegment("_search");
|
builder.addPathSegment("_search");
|
||||||
builder.addQueryParameter(QUERY_QUERY_PARAM, query);
|
builder.addQueryParameter(QUERY_QUERY_PARAM, query);
|
||||||
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
|
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
|
||||||
|
|
|
@ -100,7 +100,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
|
|
||||||
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
|
||||||
.name("scroll-es-query").displayName("Query")
|
.name("scroll-es-query").displayName("Query")
|
||||||
.description("The Lucene-style query to run against ElasticSearch").required(true)
|
.description("The Lucene-style query to run against ElasticSearch (e.g., genre:blues AND -artist:muddy)").required(true)
|
||||||
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -116,19 +116,24 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||||
.name("scroll-es-index").displayName("Index")
|
.name("scroll-es-index")
|
||||||
.description("The name of the index to read from").required(true)
|
.displayName("Index")
|
||||||
.expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.description("The name of the index to read from. If the property is set "
|
||||||
|
+ "to _all, the query will match across all indexes.")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
||||||
.name("scroll-es-type")
|
.name("scroll-es-type")
|
||||||
.displayName("Type")
|
.displayName("Type")
|
||||||
.description(
|
.description(
|
||||||
"The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
|
"The (optional) type of this query, used by Elasticsearch for indexing and searching. If the property is empty, "
|
||||||
+ "to _all, the first document matching the identifier across all types will be retrieved.")
|
+ "the the query will match across all types.")
|
||||||
.required(false).expressionLanguageSupported(true)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
|
.required(false)
|
||||||
|
.expressionLanguageSupported(true).build();
|
||||||
|
|
||||||
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
|
||||||
.name("scroll-es-fields")
|
.name("scroll-es-fields")
|
||||||
|
@ -303,6 +308,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
JsonNode source = hit.get("_source");
|
JsonNode source = hit.get("_source");
|
||||||
flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
|
flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
|
||||||
flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
|
flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
|
||||||
|
flowFile = session.putAttribute(flowFile, "mime.type", "application/json");
|
||||||
|
|
||||||
builder.append(source.toString());
|
builder.append(source.toString());
|
||||||
if (i < hits.size() - 1) {
|
if (i < hits.size() - 1) {
|
||||||
|
@ -394,8 +400,10 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
|
||||||
builder.addPathSegment("scroll");
|
builder.addPathSegment("scroll");
|
||||||
builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId);
|
builder.addQueryParameter(SCROLL_ID_QUERY_PARAM, scrollId);
|
||||||
} else {
|
} else {
|
||||||
builder.addPathSegment(index);
|
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
|
||||||
builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
|
if (!StringUtils.isEmpty(type)) {
|
||||||
|
builder.addPathSegment(type);
|
||||||
|
}
|
||||||
builder.addPathSegment("_search");
|
builder.addPathSegment("_search");
|
||||||
builder.addQueryParameter(QUERY_QUERY_PARAM, query);
|
builder.addQueryParameter(QUERY_QUERY_PARAM, query);
|
||||||
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
|
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
|
||||||
|
|
Loading…
Reference in New Issue