NIFI-5544: GetMongo refactored

NIFI-5544: PR Review changes

This closes #2958

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
Authored by zenfenan on 2018-08-20 16:36:25 +05:30; committed by Mike Thomsen
parent 97e0f6a6a7
commit 05e32cff28
2 changed files with 129 additions and 120 deletions

GetMongo.java

@@ -67,22 +67,26 @@ public class GetMongo extends AbstractMongoProcessor {
     static final String DB_NAME = "mongo.database.name";
     static final String COL_NAME = "mongo.collection.name";
-    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that have the results of a successful query execution go here.")
+        .build();
+
     static final Relationship REL_FAILURE = new Relationship.Builder()
         .name("failure")
-        .description("All input flowfiles that are part of a failed query execution go here.")
+        .description("All input FlowFiles that are part of a failed query execution go here.")
         .build();
     static final Relationship REL_ORIGINAL = new Relationship.Builder()
         .name("original")
-        .description("All input flowfiles that are part of a successful query execution go here.")
+        .description("All input FlowFiles that are part of a successful query execution go here.")
         .build();
     static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
         .name("Query")
         .description("The selection criteria to do the lookup. If the field is left blank, it will look for input from" +
             " an incoming connection from another processor to provide the query as a valid JSON document inside of " +
-            "the flowfile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
+            "the FlowFile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " +
             "that will result in a full collection fetch using a \"{}\" query.")
         .required(false)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
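
Reviewer note, not part of this commit: a minimal sketch of how the reworded relationships are exercised from NiFi's mock framework (org.apache.nifi.util.TestRunner/TestRunners, the same harness GetMongoIT uses below). The Mongo URI is an assumed local test instance; the database, collection, query string, and the count of three matching documents mirror the GetMongoIT fixtures further down.

    TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
    runner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); // assumed local test instance
    runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "getmongotest");
    runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "test");
    runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
    runner.setIncomingConnection(true);
    runner.enqueue("{}"); // any input FlowFile; the query itself comes from the Query property

    runner.run();

    // Query results land on "success"; the triggering FlowFile goes to "original".
    runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); // assumes three matching documents, as in the IT fixtures
    runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
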
@@ -96,6 +100,7 @@ public class GetMongo extends AbstractMongoProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(JsonValidator.INSTANCE)
         .build();
+
     static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
         .name("Sort")
         .description("The fields by which to sort; must be a valid BSON document")
@@ -103,6 +108,7 @@ public class GetMongo extends AbstractMongoProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(JsonValidator.INSTANCE)
         .build();
+
     static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
         .name("Limit")
         .description("The maximum number of elements to return")
@@ -113,7 +119,7 @@ public class GetMongo extends AbstractMongoProcessor {
     static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
         .name("Batch Size")
-        .description("The number of elements returned from the server in one batch")
+        .description("The number of elements to be returned from the server in one batch")
         .required(false)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@@ -121,7 +127,7 @@ public class GetMongo extends AbstractMongoProcessor {
     static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
         .name("results-per-flowfile")
         .displayName("Results Per FlowFile")
-        .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.")
+        .description("How many results to put into a FlowFile at once. The whole body will be treated as a JSON array of results.")
         .required(false)
         .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
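
Reviewer note, not part of this commit: Batch Size and Results Per FlowFile act at different layers. Batch Size feeds the Mongo cursor's per-round-trip fetch size (FindIterable.batchSize()), while Results Per FlowFile controls how many documents are packed into each output FlowFile as a single JSON array. A hedged configuration sketch; the values are illustrative:

    TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
    runner.setProperty(GetMongo.LIMIT, "1000");              // cap on total documents returned by the query
    runner.setProperty(GetMongo.BATCH_SIZE, "100");          // wire-level cursor batching between server and client
    runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "25"); // documents per output FlowFile, serialized as a JSON array
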
@@ -129,11 +135,12 @@ public class GetMongo extends AbstractMongoProcessor {
     static final AllowableValue YES_PP = new AllowableValue("true", "True");
     static final AllowableValue NO_PP = new AllowableValue("false", "False");
+
     static final PropertyDescriptor USE_PRETTY_PRINTING = new PropertyDescriptor.Builder()
         .name("use-pretty-printing")
         .displayName("Pretty Print Results JSON")
         .description("Choose whether or not to pretty print the JSON from the results of the query. " +
-            "Choosing yes can greatly increase the space requirements on disk depending on the complexity of the JSON document")
+            "Choosing 'True' can greatly increase the space requirements on disk depending on the complexity of the JSON document")
         .required(true)
         .defaultValue(YES_PP.getValue())
         .allowableValues(YES_PP, NO_PP)
@@ -142,6 +149,7 @@ public class GetMongo extends AbstractMongoProcessor {
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
+    private ComponentLog logger;

     static {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
@@ -204,144 +212,134 @@ public class GetMongo extends AbstractMongoProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile input = null;
+        logger = getLogger();

         if (context.hasIncomingConnection()) {
             input = session.get();

             if (input == null && context.hasNonLoopConnection()) {
                 return;
             }
         }

-        final ComponentLog logger = getLogger();
-
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
-
-        final Document query;
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
-        String queryStr;
-        if (context.getProperty(QUERY).isSet()) {
-            queryStr = context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue();
-            query = Document.parse(queryStr);
-        } else if (!context.getProperty(QUERY).isSet() && input == null) {
-            queryStr = "{}";
-            query = Document.parse("{}");
-        } else {
-            try {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-                session.exportTo(input, out);
-                out.close();
-                queryStr = new String(out.toByteArray());
-                query = Document.parse(queryStr);
-            } catch (Exception ex) {
-                getLogger().error("Error reading flowfile", ex);
-                if (input != null) { //Likely culprit is a bad query
-                    session.transfer(input, REL_FAILURE);
-                    return;
-                } else {
-                    throw new ProcessException(ex);
-                }
-            }
+        final Document query = getQuery(context, session, input);
+        if (query == null) {
+            return;
         }

+        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
+        final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(input).getValue());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+
         if (context.getProperty(QUERY_ATTRIBUTE).isSet()) {
             final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue();
-            attributes.put(queryAttr, queryStr);
+            attributes.put(queryAttr, query.toJson());
         }

         final Document projection = context.getProperty(PROJECTION).isSet()
                 ? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue()) : null;
         final Document sort = context.getProperty(SORT).isSet()
                 ? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions(input).getValue()) : null;
-        final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue();
-        final String usePrettyPrint = context.getProperty(USE_PRETTY_PRINTING).getValue();
-        configureMapper(jsonTypeSetting);

-        try {
-            final MongoCollection<Document> collection = getCollection(context, input);
-            attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
-            attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
-
-            final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
-            if (projection != null) {
-                it.projection(projection);
-            }
-            if (sort != null) {
-                it.sort(sort);
-            }
-            if (context.getProperty(LIMIT).isSet()) {
-                it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
-            }
-            if (context.getProperty(BATCH_SIZE).isSet()) {
-                it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
-            }
+        final MongoCollection<Document> collection = getCollection(context, input);
+        final FindIterable<Document> it = collection.find(query);
+        attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
+        attributes.put(COL_NAME, collection.getNamespace().getCollectionName());

-            final MongoCursor<Document> cursor = it.iterator();
-            ComponentLog log = getLogger();
-            try {
-                FlowFile outgoingFlowFile;
-                if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
-                    int ceiling = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
-                    List<Document> batch = new ArrayList<>();
-                    while (cursor.hasNext()) {
-                        batch.add(cursor.next());
-                        if (batch.size() == ceiling) {
-                            try {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Writing batch...");
-                                }
-                                String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint);
-                                writeBatch(payload, input, context, session, attributes, REL_SUCCESS);
-                                batch = new ArrayList<>();
-                            } catch (Exception ex) {
-                                getLogger().error("Error building batch", ex);
-                            }
-                        }
-                    }
-                    if (batch.size() > 0) {
-                        try {
-                            writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
-                        } catch (Exception ex) {
-                            getLogger().error("Error sending remainder of batch", ex);
-                        }
-                    }
-                } else {
-                    while (cursor.hasNext()) {
-                        outgoingFlowFile = (input == null) ? session.create() : session.create(input);
-                        outgoingFlowFile = session.write(outgoingFlowFile, out -> {
-                            String json;
-                            if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
-                                json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next());
-                            } else {
-                                json = cursor.next().toJson();
-                            }
-                            out.write(json.getBytes(charset));
-                        });
-                        outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
-                        session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
-                        session.transfer(outgoingFlowFile, REL_SUCCESS);
-                    }
-                }
+        if (projection != null) {
+            it.projection(projection);
+        }
+        if (sort != null) {
+            it.sort(sort);
+        }
+        if (context.getProperty(LIMIT).isSet()) {
+            it.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger());
+        }
+        if (context.getProperty(BATCH_SIZE).isSet()) {
+            it.batchSize(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger());
+        }

-                if (input != null) {
-                    session.transfer(input, REL_ORIGINAL);
-                }
-            } finally {
-                cursor.close();
-            }
-        } catch (final RuntimeException e) {
+        try (MongoCursor<Document> cursor = it.iterator()) {
+            configureMapper(jsonTypeSetting);
+
+            if (context.getProperty(RESULTS_PER_FLOWFILE).isSet()) {
+                int sizePerBatch = context.getProperty(RESULTS_PER_FLOWFILE).evaluateAttributeExpressions(input).asInteger();
+                List<Document> batch = new ArrayList<>();
+
+                while (cursor.hasNext()) {
+                    batch.add(cursor.next());
+
+                    if (batch.size() == sizePerBatch) {
+                        try {
+                            writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
+                            batch = new ArrayList<>();
+                        } catch (Exception e) {
+                            logger.error("Error building batch due to {}", new Object[] {e});
+                        }
+                    }
+                }
+
+                if (batch.size() > 0) {
+                    try {
+                        writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
+                    } catch (Exception e) {
+                        logger.error("Error building batch due to {}", new Object[] {e});
+                    }
+                }
+            } else {
+                FlowFile outgoingFlowFile;
+
+                while (cursor.hasNext()) {
+                    outgoingFlowFile = (input == null) ? session.create() : session.create(input);
+                    outgoingFlowFile = session.write(outgoingFlowFile, out -> {
+                        if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
+                            out.write(getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()).getBytes(charset));
+                        } else {
+                            out.write(cursor.next().toJson().getBytes(charset));
+                        }
+                    });
+
+                    outgoingFlowFile = session.putAllAttributes(outgoingFlowFile, attributes);
+                    session.getProvenanceReporter().receive(outgoingFlowFile, getURI(context));
+                    session.transfer(outgoingFlowFile, REL_SUCCESS);
+                }
+            }
+
             if (input != null) {
-                session.transfer(input, REL_FAILURE);
+                session.transfer(input, REL_ORIGINAL);
             }
-            context.yield();
-            logger.error("Failed to execute query {} due to {}", new Object[] { query, e }, e);
         }
     }
+
+    private Document getQuery(ProcessContext context, ProcessSession session, FlowFile input) {
+        Document query = null;
+        if (context.getProperty(QUERY).isSet()) {
+            query = Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions(input).getValue());
+        } else if (!context.getProperty(QUERY).isSet() && input == null) {
+            query = Document.parse("{}");
+        } else {
+            try {
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                session.exportTo(input, out);
+                out.close();
+                query = Document.parse(new String(out.toByteArray()));
+            } catch (Exception ex) {
+                logger.error("Error reading FlowFile : ", ex);
+                if (input != null) { //Likely culprit is a bad query
+                    session.transfer(input, REL_FAILURE);
+                    session.commit();
+                } else {
+                    throw new ProcessException(ex);
+                }
+            }
+        }
+        return query;
+    }
 }
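
Reviewer note, a sketch against the extracted getQuery() above rather than an excerpt from the commit: with the Query property unset, the query document is read from the incoming FlowFile's body, and an unparseable body now routes the input to failure and short-circuits onTrigger() through the null return. Assumes a reachable local Mongo instance and the GetMongoIT-style harness.

    TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
    runner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); // assumed local test instance
    runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "getmongotest");
    runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "test");
    runner.setIncomingConnection(true);

    runner.enqueue("{\"a\": 1}"); // a valid JSON body becomes the query document
    runner.run();
    runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); // input routed to original on success

    runner.clearTransferState();
    runner.enqueue("not-a-json-document"); // Document.parse() fails inside getQuery()
    runner.run();
    runner.assertTransferCount(GetMongo.REL_FAILURE, 1); // bad input routed to failure; no query is executed
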

GetMongoIT.java

@@ -194,8 +194,8 @@ public class GetMongoIT {
     public void testReadMultipleDocuments() throws Exception {
         runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
         runner.run();
         runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);

         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
         for (int i=0; i < flowFiles.size(); i++) {
             flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
@@ -313,7 +313,7 @@ public class GetMongoIT {
         runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr);
         runner.run();
         runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
-        testQueryAttribute(attr, "{}");
+        testQueryAttribute(attr, "{ }");

         runner.clearTransferState();
@@ -323,7 +323,7 @@ public class GetMongoIT {
         runner.removeProperty(GetMongo.QUERY);
         runner.setIncomingConnection(false);
         runner.run();
-        testQueryAttribute(attr, "{}");
+        testQueryAttribute(attr, "{ }");

         runner.clearTransferState();
@@ -334,7 +334,7 @@ public class GetMongoIT {
         runner.setIncomingConnection(true);
         runner.enqueue("{}");
         runner.run();
-        testQueryAttribute(attr, "{}");
+        testQueryAttribute(attr, "{ }");

         /*
          * Input flowfile with invalid query
@@ -478,6 +478,7 @@ public class GetMongoIT {
      */
     @Test
     public void testDatabaseEL() {
+        runner.clearTransferState();
         runner.removeVariable("collection");
         runner.removeVariable("db");
         runner.setIncomingConnection(true);
@@ -506,26 +507,36 @@ public class GetMongoIT {
         }

         Map<String, Map<String, String>> vals = new HashMap<String, Map<String, String>>(){{
-            put("Database", new HashMap<String, String>(){{
-                put("db", "");
-                put("collection", "test");
-            }});
             put("Collection", new HashMap<String, String>(){{
                 put("db", "getmongotest");
                 put("collection", "");
             }});
+            put("Database", new HashMap<String, String>(){{
+                put("db", "");
+                put("collection", "test");
+            }});
         }};

+        TestRunner tmpRunner;
+
         for (Map.Entry<String, Map<String, String>> entry : vals.entrySet()) {
-            runner.enqueue("{}", entry.getValue());
+            // Create a new runner for each attribute map, since each subsequent run would pick up the top-most FlowFile enqueued by the previous iteration
+            tmpRunner = TestRunners.newTestRunner(GetMongo.class);
+            tmpRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
+            tmpRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
+            tmpRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
+            tmpRunner.setIncomingConnection(true);
+            tmpRunner.enqueue("{ }", entry.getValue());
+
             try {
-                runner.run();
+                tmpRunner.run();
             } catch (Throwable ex) {
                 Throwable cause = ex.getCause();
                 Assert.assertTrue(cause instanceof ProcessException);
-                Assert.assertTrue(entry.getKey(), cause.getMessage().contains(entry.getKey()));
+                Assert.assertTrue(entry.getKey(), ex.getMessage().contains(entry.getKey()));
             }
-            runner.clearTransferState();
+            tmpRunner.clearTransferState();
         }
     }
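
Reviewer note: the fresh-runner-per-entry pattern above is the safer shape for parameterized negative tests in general. With a single shared runner, each run() would pick up the FlowFile left enqueued by the previous iteration, so a failed assertion could point at the wrong attribute map; a new TestRunner per entry keeps every negative case isolated.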