NIFI-4082 - Added EL on GetMongo properties

NIFI-4082 - Added EL on DB, URI and Collection

NIFI-4082 - Added UT for EL evaluation (URI, DB, Collection) and fixed ex. message for document validator.

This closes #1969
This commit is contained in:
Pierre Villard 2017-06-30 20:43:50 +02:00 committed by Matt Burgess
parent 329b1caf82
commit 8b54c2604c
5 changed files with 61 additions and 24 deletions

View File

@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
@ -48,18 +49,21 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
.name("Mongo URI")
.description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder()
.name("Mongo Database Name")
.description("The name of the database to use")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder()
.name("Mongo Collection Name")
.description("The name of the collection to use")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
@ -124,11 +128,10 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
}
try {
final String uri = context.getProperty(URI).getValue();
if(sslContext == null) {
mongoClient = new MongoClient(new MongoClientURI(uri));
mongoClient = new MongoClient(new MongoClientURI(getURI(context)));
} else {
mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext)));
mongoClient = new MongoClient(new MongoClientURI(getURI(context), getClientOptions(sslContext)));
}
} catch (Exception e) {
getLogger().error("Failed to schedule {} due to {}", new Object[] { this.getClass().getName(), e }, e);
@ -153,12 +156,24 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor {
}
protected MongoDatabase getDatabase(final ProcessContext context) {
final String databaseName = context.getProperty(DATABASE_NAME).getValue();
return getDatabase(context, null);
}
protected MongoDatabase getDatabase(final ProcessContext context, final FlowFile flowFile) {
final String databaseName = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(flowFile).getValue();
return mongoClient.getDatabase(databaseName);
}
protected MongoCollection<Document> getCollection(final ProcessContext context) {
final String collectionName = context.getProperty(COLLECTION_NAME).getValue();
return getDatabase(context).getCollection(collectionName);
return getCollection(context, null);
}
protected MongoCollection<Document> getCollection(final ProcessContext context, final FlowFile flowFile) {
final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(flowFile).getValue();
return getDatabase(context, flowFile).getCollection(collectionName);
}
protected String getURI(final ProcessContext context) {
return context.getProperty(URI).evaluateAttributeExpressions().getValue();
}
}

View File

@ -59,14 +59,21 @@ public class GetMongo extends AbstractMongoProcessor {
public static final Validator DOCUMENT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
final ValidationResult.Builder builder = new ValidationResult.Builder();
builder.subject(subject).input(value);
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return builder.valid(true).explanation("Contains Expression Language").build();
}
String reason = null;
try {
Document.parse(value);
} catch (final RuntimeException e) {
reason = e.getClass().getName();
reason = e.getLocalizedMessage();
}
return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
return builder.explanation(reason).valid(reason == null).build();
}
};
@ -76,18 +83,21 @@ public class GetMongo extends AbstractMongoProcessor {
.name("Query")
.description("The selection criteria; must be a valid MongoDB Extended JSON format; if omitted the entire collection will be queried")
.required(false)
.expressionLanguageSupported(true)
.addValidator(DOCUMENT_VALIDATOR)
.build();
static final PropertyDescriptor PROJECTION = new PropertyDescriptor.Builder()
.name("Projection")
.description("The fields to be returned from the documents in the result set; must be a valid BSON document")
.required(false)
.expressionLanguageSupported(true)
.addValidator(DOCUMENT_VALIDATOR)
.build();
static final PropertyDescriptor SORT = new PropertyDescriptor.Builder()
.name("Sort")
.description("The fields by which to sort; must be a valid BSON document")
.required(false)
.expressionLanguageSupported(true)
.addValidator(DOCUMENT_VALIDATOR)
.build();
static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder()
@ -163,7 +173,7 @@ public class GetMongo extends AbstractMongoProcessor {
}
});
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
session.getProvenanceReporter().receive(flowFile, getURI(context));
session.transfer(flowFile, REL_SUCCESS);
}
@ -171,9 +181,12 @@ public class GetMongo extends AbstractMongoProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
final Document query = context.getProperty(QUERY).isSet() ? Document.parse(context.getProperty(QUERY).getValue()) : null;
final Document projection = context.getProperty(PROJECTION).isSet() ? Document.parse(context.getProperty(PROJECTION).getValue()) : null;
final Document sort = context.getProperty(SORT).isSet() ? Document.parse(context.getProperty(SORT).getValue()) : null;
final Document query = context.getProperty(QUERY).isSet()
? Document.parse(context.getProperty(QUERY).evaluateAttributeExpressions().getValue()) : null;
final Document projection = context.getProperty(PROJECTION).isSet()
? Document.parse(context.getProperty(PROJECTION).evaluateAttributeExpressions().getValue()) : null;
final Document sort = context.getProperty(SORT).isSet()
? Document.parse(context.getProperty(SORT).evaluateAttributeExpressions().getValue()) : null;
final MongoCollection<Document> collection = getCollection(context);
@ -233,7 +246,7 @@ public class GetMongo extends AbstractMongoProcessor {
});
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
session.getProvenanceReporter().receive(flowFile, context.getProperty(URI).getValue());
session.getProvenanceReporter().receive(flowFile, getURI(context));
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@ -148,7 +148,7 @@ public class PutMongo extends AbstractMongoProcessor {
final String mode = context.getProperty(MODE).getValue();
final WriteConcern writeConcern = getWriteConcern(context);
final MongoCollection<Document> collection = getCollection(context).withWriteConcern(writeConcern);
final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
try {
// Read the contents of the FlowFile into a byte array
@ -176,7 +176,7 @@ public class PutMongo extends AbstractMongoProcessor {
logger.info("updated {} into MongoDB", new Object[] { flowFile });
}
session.getProvenanceReporter().send(flowFile, context.getProperty(URI).getValue());
session.getProvenanceReporter().send(flowFile, getURI(context));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
logger.error("Failed to insert {} into MongoDB due to {}", new Object[] {flowFile, e}, e);

View File

@ -60,9 +60,12 @@ public class GetMongoTest {
@Before
public void setup() {
runner = TestRunners.newTestRunner(GetMongo.class);
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
runner.setVariable("uri", MONGO_URI);
runner.setVariable("db", DB_NAME);
runner.setVariable("collection", COLLECTION_NAME);
runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
@ -120,8 +123,9 @@ public class GetMongoTest {
Assert.assertTrue(results.iterator().next().toString().matches("'Query' .* is invalid because org.bson.json.JsonParseException"));
// invalid projection
runner.setVariable("projection", "{a: x,y,z}");
runner.setProperty(GetMongo.QUERY, "{a: 1}");
runner.setProperty(GetMongo.PROJECTION, "{a: x,y,z}");
runner.setProperty(GetMongo.PROJECTION, "${projection}");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
@ -146,7 +150,8 @@ public class GetMongoTest {
@Test
public void testReadOneDocument() throws Exception {
runner.setProperty(GetMongo.QUERY, "{a: 1, b: 3}");
runner.setVariable("query", "{a: 1, b: 3}");
runner.setProperty(GetMongo.QUERY, "${query}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
@ -180,8 +185,9 @@ public class GetMongoTest {
@Test
public void testSort() throws Exception {
runner.setVariable("sort", "{a: -1, b: -1, c: 1}");
runner.setProperty(GetMongo.QUERY, "{a: {$exists: true}}");
runner.setProperty(GetMongo.SORT, "{a: -1, b: -1, c: 1}");
runner.setProperty(GetMongo.SORT, "${sort}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);

View File

@ -61,9 +61,12 @@ public class PutMongoTest {
@Before
public void setup() {
runner = TestRunners.newTestRunner(PutMongo.class);
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
runner.setVariable("uri", MONGO_URI);
runner.setVariable("db", DATABASE_NAME);
runner.setVariable("collection", COLLECTION_NAME);
runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));