NIFI-5343: Fix GetMongo to properly reads charset from FlowFile attributes

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2818.
This commit is contained in:
zenfenan 2018-06-27 15:30:11 +05:30 committed by Pierre Villard
parent e741ba1ce4
commit 6508c191bf
2 changed files with 20 additions and 2 deletions

View File

@ -281,7 +281,7 @@ public class GetMongo extends AbstractMongoProcessor {
log.debug("Writing batch..."); log.debug("Writing batch...");
} }
String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint); String payload = buildBatch(batch, jsonTypeSetting, usePrettyPrint);
writeBatch(payload, null, context, session, attributes, REL_SUCCESS); writeBatch(payload, input, context, session, attributes, REL_SUCCESS);
batch = new ArrayList<>(); batch = new ArrayList<>();
} catch (Exception ex) { } catch (Exception ex) {
getLogger().error("Error building batch", ex); getLogger().error("Error building batch", ex);
@ -290,7 +290,7 @@ public class GetMongo extends AbstractMongoProcessor {
} }
if (batch.size() > 0) { if (batch.size() > 0) {
try { try {
writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), null, context, session, attributes, REL_SUCCESS); writeBatch(buildBatch(batch, jsonTypeSetting, usePrettyPrint), input, context, session, attributes, REL_SUCCESS);
} catch (Exception ex) { } catch (Exception ex) {
getLogger().error("Error sending remainder of batch", ex); getLogger().error("Error sending remainder of batch", ex);
} }

View File

@ -427,6 +427,24 @@ public class GetMongoIT {
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
} }
@Test
public void testReadCharsetWithEL() {
String query = "{ \"c\": { \"$gte\": 4 }}";
Map<String, String> attrs = new HashMap<>();
attrs.put("charset", "UTF-8");
runner.setProperty(GetMongo.CHARSET, "${charset}");
runner.setProperty(GetMongo.BATCH_SIZE, "2");
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
runner.setIncomingConnection(true);
runner.enqueue(query, attrs);
runner.run(1, true, true);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
}
/* /*
* End query read behavior tests * End query read behavior tests
*/ */