NIFI-8328: Allow RestLookupService to use FlowFile attributes in header properties

This closes #4908

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matthew Burgess 2021-03-17 11:38:07 -04:00 committed by exceptionfactory
parent 61c4261bb7
commit 446401b6f1
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 79 additions and 50 deletions

View File

@ -17,27 +17,10 @@
package org.apache.nifi.lookup;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
@ -77,11 +60,29 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringUtils;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Proxy;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.trimToEmpty;
@Tags({ "rest", "lookup", "json", "xml", "http" })
@CapabilityDescription("Use a REST service to look up values.")
@DynamicProperties({
@DynamicProperty(name = "*", value = "*", description = "All dynamic properties are added as HTTP headers with the name " +
"as the header name and the value as the header value.")
"as the header name and the value as the header value.", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
})
public class RestLookupService extends AbstractControllerService implements RecordLookupService {
static final PropertyDescriptor URL = new PropertyDescriptor.Builder()
@ -180,7 +181,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<String> KEYS;
static final List VALID_VERBS = Arrays.asList("delete", "get", "post", "put");
static final List<String> VALID_VERBS = Arrays.asList("delete", "get", "post", "put");
static {
DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
@ -206,7 +207,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
private volatile RecordReaderFactory readerFactory;
private volatile RecordPath recordPath;
private volatile OkHttpClient client;
private volatile Map<String, String> headers;
private volatile Map<String, PropertyValue> headers;
private volatile PropertyValue urlTemplate;
private volatile String basicUser;
private volatile String basicPass;
@ -261,7 +262,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
if (descriptor.isDynamic()) {
headers.put(
descriptor.getDisplayName(),
context.getProperty(descriptor).evaluateAttributeExpressions().getValue()
context.getProperty(descriptor)
);
}
}
@ -273,7 +274,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
final Proxy proxy = config.createProxy();
builder.proxy(proxy);
if (config.hasCredential()){
if (config.hasCredential()) {
builder.proxyAuthenticator((route, response) -> {
final String credential= Credentials.basic(config.getProxyUserName(), config.getProxyUserPassword());
return response.request().newBuilder()
@ -292,10 +293,10 @@ public class RestLookupService extends AbstractControllerService implements Reco
@Override
public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
final String endpoint = determineEndpoint(coordinates);
final String mimeType = (String)coordinates.get(MIME_TYPE_KEY);
final String method = ((String)coordinates.getOrDefault(METHOD_KEY, "get")).trim().toLowerCase();
final String body = (String)coordinates.get(BODY_KEY);
final String endpoint = determineEndpoint(coordinates, context);
final String mimeType = (String) coordinates.get(MIME_TYPE_KEY);
final String method = ((String) coordinates.getOrDefault(METHOD_KEY, "get")).trim().toLowerCase();
final String body = (String) coordinates.get(BODY_KEY);
validateVerb(method);
@ -313,7 +314,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
}
}
Request request = buildRequest(mimeType, method, body, endpoint);
Request request = buildRequest(mimeType, method, body, endpoint, context);
try {
Response response = executeRequest(request);
@ -346,13 +347,21 @@ public class RestLookupService extends AbstractControllerService implements Reco
}
}
protected String determineEndpoint(Map<String, Object> coordinates) {
protected String determineEndpoint(Map<String, Object> coordinates, Map<String, String> context) {
Map<String, String> converted = coordinates.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().toString()
));
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().toString()
));
Map<String, String> contextConverted = (context == null) ? Collections.emptyMap()
: context.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
converted.putAll(contextConverted);
return urlTemplate.evaluateAttributeExpressions(converted).getValue();
}
@ -362,7 +371,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
.displayName(propertyDescriptorName)
.addValidator(Validator.VALID)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
@ -407,15 +416,15 @@ public class RestLookupService extends AbstractControllerService implements Reco
}
}
private Request buildRequest(final String mimeType, final String method, final String body, final String endpoint) {
private Request buildRequest(final String mimeType, final String method, final String body, final String endpoint, final Map<String,String> context) {
RequestBody requestBody = null;
if (body != null) {
final MediaType mt = MediaType.parse(mimeType);
requestBody = RequestBody.create(mt, body);
requestBody = RequestBody.create(body, mt);
}
Request.Builder request = new Request.Builder()
.url(endpoint);
switch(method) {
switch (method) {
case "delete":
request = body != null ? request.delete(requestBody) : request.delete();
break;
@ -431,8 +440,8 @@ public class RestLookupService extends AbstractControllerService implements Reco
}
if (headers != null) {
for (Map.Entry<String, String> header : headers.entrySet()) {
request = request.addHeader(header.getKey(), header.getValue());
for (Map.Entry<String, PropertyValue> header : headers.entrySet()) {
request = request.addHeader(header.getKey(), header.getValue().evaluateAttributeExpressions(context).getValue());
}
}

View File

@ -87,7 +87,7 @@ class RestLookupServiceIT {
try {
server.startServer()
setEndpoint(server.port, "/simple")
setEndpoint(server.port, '/${schema.name}')
def coordinates = [
"mime.type": "application/json",
@ -207,8 +207,8 @@ class RestLookupServiceIT {
@Test
void testHeaders() {
runner.setProperty(lookupService, "X-USER", "jane.doe")
runner.setProperty(lookupService, "X-PASS", "testing7890")
runner.setProperty(lookupService, "X-USER", '${x.user}')
runner.setProperty(lookupService, "X-PASS", 'testing7890')
TestServer server = new TestServer()
ServletHandler handler = new ServletHandler()
@ -224,7 +224,7 @@ class RestLookupServiceIT {
"request.method": "get"
]
def context = [ "schema.name": "simple" ]
def context = [ 'schema.name': 'simple' , 'x.user': 'jane.doe']
Optional<Record> response = lookupService.lookup(coordinates, context)
Assert.assertTrue(response.isPresent())

View File

@ -17,7 +17,11 @@
package org.apache.nifi.lookup
import okhttp3.*
import okhttp3.MediaType
import okhttp3.Protocol
import okhttp3.Request
import okhttp3.Response
import okhttp3.ResponseBody
import org.apache.nifi.lookup.rest.MockRestLookupService
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
@ -32,6 +36,7 @@ import org.junit.Before
import org.junit.Test
import static groovy.json.JsonOutput.toJson
import static org.junit.Assert.assertNotNull
class TestRestLookupService {
TestRunner runner
@ -50,6 +55,8 @@ class TestRestLookupService {
runner.setProperty(lookupService, RestLookupService.RECORD_READER, "recordReader")
runner.setProperty("Lookup Service", "lookupService")
runner.setProperty(lookupService, RestLookupService.URL, "http://localhost:8080")
// Add a dynamic property using Expression Language (expecting to be provided by FlowFile attribute)
runner.setProperty(lookupService, 'test', '${test.ff.attribute}')
runner.enableControllerService(lookupService)
runner.enableControllerService(recordReader)
runner.assertValid()
@ -66,8 +73,14 @@ class TestRestLookupService {
recordReader.addRecord("Sally Doe", 47, "Curling")
lookupService.response = buildResponse(toJson([ simpleTest: true]), JSON_TYPE)
def result = lookupService.lookup(getCoordinates(JSON_TYPE, "get"))
def result = lookupService.lookup(getCoordinates(JSON_TYPE, "get"), ['test.ff.attribute' : 'Hello'])
Assert.assertTrue(result.isPresent())
def headers = lookupService.getHeaders()
assertNotNull(headers)
def headerValue = headers.get('test')
assertNotNull(headerValue)
Assert.assertEquals(1, headerValue.size())
Assert.assertEquals('Hello', headerValue.get(0))
def record = result.get()
Assert.assertEquals("John Doe", record.getAsString("name"))
Assert.assertEquals(48, record.getAsInt("age"))
@ -118,23 +131,23 @@ class TestRestLookupService {
result = lookupService.lookup(getCoordinates(JSON_TYPE, "get"))
Assert.assertTrue(result.isPresent())
record = result.get()
Assert.assertNotNull(record.getAsString("sport"))
assertNotNull(record.getAsString("sport"))
Assert.assertEquals("Soccer", record.getAsString("sport"))
}
private Map<String, Object> getCoordinates(String mimeType, String method) {
def retVal = [:]
private static Map<String, Object> getCoordinates(String mimeType, String method) {
def retVal = [:] as Map<String, Object>
retVal[RestLookupService.MIME_TYPE_KEY] = mimeType
retVal[RestLookupService.METHOD_KEY] = method
retVal
}
private Response buildResponse(String resp, String mimeType) {
private static Response buildResponse(String resp, String mimeType) {
return new Response.Builder()
.code(200)
.body(
ResponseBody.create(MediaType.parse(mimeType), resp)
ResponseBody.create(resp, MediaType.parse(mimeType))
)
.message("Test")
.protocol(Protocol.HTTP_1_1)

View File

@ -17,15 +17,22 @@
package org.apache.nifi.lookup.rest
import okhttp3.Headers
import okhttp3.Request
import okhttp3.Response
import org.apache.nifi.lookup.RestLookupService
class MockRestLookupService extends RestLookupService {
Response response
Headers headers
@Override
protected Response executeRequest(Request request) {
this.headers = request.headers()
return response
}
Map<String, List<String>> getHeaders() {
headers.toMultimap()
}
}