mirror of
https://github.com/apache/nifi.git
synced 2025-02-13 13:35:20 +00:00
NIFI-5440 Added db and collection attributes to GetMongo output.
This closes #2906 Signed-off-by: zenfenan <zenfenan@apache.org>
This commit is contained in:
parent
610cbb66b2
commit
b1f78d58a6
@ -25,6 +25,8 @@ import com.mongodb.client.MongoCollection;
|
|||||||
import com.mongodb.client.MongoCursor;
|
import com.mongodb.client.MongoCursor;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
@ -56,8 +58,14 @@ import java.util.Set;
|
|||||||
|
|
||||||
@Tags({ "mongodb", "read", "get" })
|
@Tags({ "mongodb", "read", "get" })
|
||||||
@InputRequirement(Requirement.INPUT_ALLOWED)
|
@InputRequirement(Requirement.INPUT_ALLOWED)
|
||||||
@CapabilityDescription("Creates FlowFiles from documents in MongoDB")
|
@CapabilityDescription("Creates FlowFiles from documents in MongoDB loaded by a user-specified query.")
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = GetMongo.DB_NAME, description = "The database where the results came from."),
|
||||||
|
@WritesAttribute(attribute = GetMongo.COL_NAME, description = "The collection where the results came from.")
|
||||||
|
})
|
||||||
public class GetMongo extends AbstractMongoProcessor {
|
public class GetMongo extends AbstractMongoProcessor {
|
||||||
|
static final String DB_NAME = "mongo.database.name";
|
||||||
|
static final String COL_NAME = "mongo.collection.name";
|
||||||
|
|
||||||
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
|
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();
|
||||||
static final Relationship REL_FAILURE = new Relationship.Builder()
|
static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||||
@ -253,6 +261,10 @@ public class GetMongo extends AbstractMongoProcessor {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
final MongoCollection<Document> collection = getCollection(context, input);
|
final MongoCollection<Document> collection = getCollection(context, input);
|
||||||
|
|
||||||
|
attributes.put(DB_NAME, collection.getNamespace().getDatabaseName());
|
||||||
|
attributes.put(COL_NAME, collection.getNamespace().getCollectionName());
|
||||||
|
|
||||||
final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
|
final FindIterable<Document> it = query != null ? collection.find(query) : collection.find();
|
||||||
if (projection != null) {
|
if (projection != null) {
|
||||||
it.projection(projection);
|
it.projection(projection);
|
||||||
|
@ -529,4 +529,20 @@ public class GetMongoIT {
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDBAttributes() {
|
||||||
|
runner.enqueue("{}");
|
||||||
|
runner.run();
|
||||||
|
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
|
||||||
|
List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
|
||||||
|
for (MockFlowFile ff : ffs) {
|
||||||
|
String db = ff.getAttribute(GetMongo.DB_NAME);
|
||||||
|
String col = ff.getAttribute(GetMongo.COL_NAME);
|
||||||
|
Assert.assertNotNull(db);
|
||||||
|
Assert.assertNotNull(col);
|
||||||
|
Assert.assertEquals(DB_NAME, db);
|
||||||
|
Assert.assertEquals(COLLECTION_NAME, col);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user