mirror of https://github.com/apache/nifi.git
NIFI-4644 Fixed LookupService API to allow for more than String/String lookup pairs. This closes #2304.
This commit is contained in:
parent
bfe92b9000
commit
d7347a2dc3
|
@ -66,7 +66,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp
|
||||||
private volatile File kerberosServiceKeytab = null;
|
private volatile File kerberosServiceKeytab = null;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
|
public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
|
||||||
// Delegate the lookup() call to the scripted LookupService
|
// Delegate the lookup() call to the scripted LookupService
|
||||||
return lookupService.get().lookup(coordinates);
|
return lookupService.get().lookup(coordinates);
|
||||||
}
|
}
|
||||||
|
|
|
@ -259,7 +259,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
|
||||||
final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
|
final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
|
||||||
|
|
||||||
final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
|
final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
|
||||||
final Map<String, String> lookupCoordinates = new HashMap<>(recordPaths.size());
|
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
|
||||||
|
|
||||||
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
|
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
|
||||||
final String coordinateKey = entry.getKey();
|
final String coordinateKey = entry.getKey();
|
||||||
|
@ -284,7 +284,8 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
|
||||||
}
|
}
|
||||||
|
|
||||||
final FieldValue fieldValue = lookupFieldValues.get(0);
|
final FieldValue fieldValue = lookupFieldValues.get(0);
|
||||||
final String coordinateValue = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
|
final Object coordinateValue = (fieldValue.getValue() instanceof Number || fieldValue.getValue() instanceof Boolean)
|
||||||
|
? fieldValue.getValue() : DataTypeUtils.toString(fieldValue.getValue(), (String) null);
|
||||||
lookupCoordinates.put(coordinateKey, coordinateValue);
|
lookupCoordinates.put(coordinateKey, coordinateValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -142,7 +142,7 @@ public class TestLookupAttribute {
|
||||||
|
|
||||||
private static class InvalidLookupService extends AbstractControllerService implements StringLookupService {
|
private static class InvalidLookupService extends AbstractControllerService implements StringLookupService {
|
||||||
@Override
|
@Override
|
||||||
public Optional<String> lookup(Map<String, String> coordinates) {
|
public Optional<String> lookup(Map<String, Object> coordinates) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -383,12 +383,12 @@ public class TestLookupRecord {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<String> lookup(final Map<String, String> coordinates) {
|
public Optional<String> lookup(final Map<String, Object> coordinates) {
|
||||||
if (coordinates == null) {
|
if (coordinates == null || coordinates.get("lookup") == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
final String key = coordinates.get("lookup");
|
final String key = (String)coordinates.get("lookup");
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
@ -415,12 +415,12 @@ public class TestLookupRecord {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Record> lookup(final Map<String, String> coordinates) {
|
public Optional<Record> lookup(final Map<String, Object> coordinates) {
|
||||||
if (coordinates == null) {
|
if (coordinates == null || coordinates.get("lookup") == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
final String key = coordinates.get("lookup");
|
final String key = (String)coordinates.get("lookup");
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
|
@ -114,8 +114,12 @@ public class HBase_1_1_2_RecordLookupService extends AbstractControllerService i
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException {
|
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
|
||||||
final String rowKey = coordinates.get(ROW_KEY_KEY);
|
if (coordinates.get(ROW_KEY_KEY) == null) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String rowKey = coordinates.get(ROW_KEY_KEY).toString();
|
||||||
if (StringUtils.isBlank(rowKey)) {
|
if (StringUtils.isBlank(rowKey)) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class TestRecordLookupProcessor extends AbstractProcessor {
|
||||||
|
|
||||||
final String rowKey = context.getProperty(HBASE_ROW).getValue();
|
final String rowKey = context.getProperty(HBASE_ROW).getValue();
|
||||||
|
|
||||||
final Map<String,String> coordinates = new HashMap<>();
|
final Map<String,Object> coordinates = new HashMap<>();
|
||||||
coordinates.put(HBase_1_1_2_RecordLookupService.ROW_KEY_KEY, rowKey);
|
coordinates.put(HBase_1_1_2_RecordLookupService.ROW_KEY_KEY, rowKey);
|
||||||
|
|
||||||
final LookupService<Record> lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class);
|
final LookupService<Record> lookupService = context.getProperty(HBASE_LOOKUP_SERVICE).asControllerService(LookupService.class);
|
||||||
|
|
|
@ -33,7 +33,7 @@ public interface LookupService<T> extends ControllerService {
|
||||||
*
|
*
|
||||||
* @throws LookupFailureException if unable to lookup a value for the given coordinates
|
* @throws LookupFailureException if unable to lookup a value for the given coordinates
|
||||||
*/
|
*/
|
||||||
Optional<T> lookup(Map<String, String> coordinates) throws LookupFailureException;
|
Optional<T> lookup(Map<String, Object> coordinates) throws LookupFailureException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the Class that represents the type of value that will be returned by {@link #lookup(Map)}
|
* @return the Class that represents the type of value that will be returned by {@link #lookup(Map)}
|
||||||
|
|
|
@ -17,24 +17,10 @@
|
||||||
|
|
||||||
package org.apache.nifi.lookup;
|
package org.apache.nifi.lookup;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.nifi.serialization.record.Record;
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
|
||||||
public interface RecordLookupService extends LookupService<Record> {
|
public interface RecordLookupService extends LookupService<Record> {
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an Optional Record that corresponds to the given coordinates
|
|
||||||
*
|
|
||||||
* @param coordinates the coordinates to lookup
|
|
||||||
* @return an Optional Record that corresponds to the given coordinates
|
|
||||||
*
|
|
||||||
* @throws LookupFailureException if unable to lookup a value for the given coordinates
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
default Class<?> getValueType() {
|
default Class<?> getValueType() {
|
||||||
return Record.class;
|
return Record.class;
|
||||||
|
|
|
@ -17,22 +17,8 @@
|
||||||
|
|
||||||
package org.apache.nifi.lookup;
|
package org.apache.nifi.lookup;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
public interface StringLookupService extends LookupService<String> {
|
public interface StringLookupService extends LookupService<String> {
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an Optional value that corresponds to the given coordinates
|
|
||||||
*
|
|
||||||
* @param coordinates the coordinates to lookup
|
|
||||||
* @return an Optional String that represents the value for the given coordinates
|
|
||||||
*
|
|
||||||
* @throws LookupFailureException if unable to lookup a value for the given key
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
Optional<String> lookup(Map<String, String> coordinates) throws LookupFailureException;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
default Class<?> getValueType() {
|
default Class<?> getValueType() {
|
||||||
return String.class;
|
return String.class;
|
||||||
|
|
|
@ -206,12 +206,12 @@ public class CSVRecordLookupService extends AbstractControllerService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Record> lookup(final Map<String, String> coordinates) throws LookupFailureException {
|
public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
|
||||||
if (coordinates == null) {
|
if (coordinates == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
final String key = coordinates.get(KEY);
|
final String key = (String)coordinates.get(KEY);
|
||||||
if (StringUtils.isBlank(key)) {
|
if (StringUtils.isBlank(key)) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,12 +194,12 @@ public class SimpleCsvFileLookupService extends AbstractControllerService implem
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<String> lookup(final Map<String, String> coordinates) throws LookupFailureException {
|
public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
|
||||||
if (coordinates == null) {
|
if (coordinates == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
final String key = coordinates.get(KEY);
|
final String key = coordinates.get(KEY).toString();
|
||||||
if (StringUtils.isBlank(key)) {
|
if (StringUtils.isBlank(key)) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,12 +58,12 @@ public class SimpleKeyValueLookupService extends AbstractControllerService imple
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<String> lookup(final Map<String, String> coordinates) {
|
public Optional<String> lookup(final Map<String, Object> coordinates) {
|
||||||
if (coordinates == null) {
|
if (coordinates == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
final String key = coordinates.get(KEY);
|
final String key = coordinates.get(KEY).toString();
|
||||||
if (key == null) {
|
if (key == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,12 +122,12 @@ public abstract class CommonsConfigurationLookupService<T extends FileBasedConfi
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<String> lookup(final Map<String, String> coordinates) throws LookupFailureException {
|
public Optional<String> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
|
||||||
if (coordinates == null) {
|
if (coordinates == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
final String key = coordinates.get(KEY);
|
final String key = coordinates.get(KEY).toString();
|
||||||
if (StringUtils.isBlank(key)) {
|
if (StringUtils.isBlank(key)) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
|
@ -189,7 +189,7 @@ public class IPLookupService extends AbstractControllerService implements Record
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Record> lookup(final Map<String, String> coordinates) throws LookupFailureException {
|
public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
|
||||||
if (coordinates == null) {
|
if (coordinates == null) {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
@ -237,12 +237,13 @@ public class IPLookupService extends AbstractControllerService implements Record
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<Record> doLookup(final DatabaseReader databaseReader, final Map<String, String> coordinates) throws LookupFailureException, InvalidDatabaseException {
|
private Optional<Record> doLookup(final DatabaseReader databaseReader, final Map<String, Object> coordinates) throws LookupFailureException, InvalidDatabaseException {
|
||||||
final String ipAddress = coordinates.get(IP_KEY);
|
if (coordinates.get(IP_KEY) == null) {
|
||||||
if (ipAddress == null) {
|
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String ipAddress = coordinates.get(IP_KEY).toString();
|
||||||
|
|
||||||
final InetAddress inetAddress;
|
final InetAddress inetAddress;
|
||||||
try {
|
try {
|
||||||
inetAddress = InetAddress.getByName(ipAddress);
|
inetAddress = InetAddress.getByName(ipAddress);
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class MongoDBLookupService extends MongoDBControllerService implements Lo
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
|
public Optional<Object> lookup(Map<String, Object> coordinates) throws LookupFailureException {
|
||||||
Map<String, Object> clean = new HashMap<>();
|
Map<String, Object> clean = new HashMap<>();
|
||||||
clean.putAll(coordinates);
|
clean.putAll(coordinates);
|
||||||
Document query = new Document(clean);
|
Document query = new Document(clean);
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class TestMongoDBLookupService {
|
||||||
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
||||||
service.insert(document);
|
service.insert(document);
|
||||||
|
|
||||||
Map<String, String> criteria = new HashMap<>();
|
Map<String, Object> criteria = new HashMap<>();
|
||||||
criteria.put("uuid", "x-y-z");
|
criteria.put("uuid", "x-y-z");
|
||||||
Optional result = service.lookup(criteria);
|
Optional result = service.lookup(criteria);
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ public class TestMongoDBLookupService {
|
||||||
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
||||||
service.insert(document);
|
service.insert(document);
|
||||||
|
|
||||||
Map<String, String> criteria = new HashMap<>();
|
Map<String, Object> criteria = new HashMap<>();
|
||||||
criteria.put("uuid", "x-y-z");
|
criteria.put("uuid", "x-y-z");
|
||||||
Optional result = service.lookup(criteria);
|
Optional result = service.lookup(criteria);
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ public class TestMongoDBLookupService {
|
||||||
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
Document document = service.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
|
||||||
service.insert(document);
|
service.insert(document);
|
||||||
|
|
||||||
Map<String, String> criteria = new HashMap<>();
|
Map<String, Object> criteria = new HashMap<>();
|
||||||
criteria.put("uuid", "x-y-z");
|
criteria.put("uuid", "x-y-z");
|
||||||
|
|
||||||
boolean error = false;
|
boolean error = false;
|
||||||
|
|
Loading…
Reference in New Issue