mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 10:38:33 +00:00
NIFI-5947 Elasticsearch lookup service compatible with LookupAttribute
NIFI-5947 Made the lookup key for ElasticSearchStringLookupService more unique to the service (was previously 'id'); change by Mike Thomsen This closes #3268. Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
parent
033b2a1940
commit
45b32e3bc1
@ -19,6 +19,8 @@ package org.apache.nifi.elasticsearch;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.jayway.jsonpath.JsonPath;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
@ -46,6 +48,9 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@CapabilityDescription("Lookup a record from Elasticsearch Server associated with the specified document ID. " +
|
||||
"The coordinates that are passed to the lookup must contain the key 'id'.")
|
||||
@Tags({"lookup", "enrich", "record", "elasticsearch"})
|
||||
public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryService implements LookupService<Record> {
|
||||
public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("el-rest-client-service")
|
||||
|
@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License") you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.elasticsearch;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.lookup.LookupFailureException;
|
||||
import org.apache.nifi.lookup.StringLookupService;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
@CapabilityDescription("Lookup a string value from Elasticsearch Server associated with the specified document ID. " +
|
||||
"The coordinates that are passed to the lookup must contain the key 'id'.")
|
||||
@Tags({"lookup", "enrich", "value", "key", "elasticsearch"})
|
||||
public class ElasticSearchStringLookupService extends AbstractControllerService implements StringLookupService {
|
||||
public static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("el-rest-client-service")
|
||||
.displayName("Client Service")
|
||||
.description("An ElasticSearch client service to use for running queries.")
|
||||
.identifiesControllerService(ElasticSearchClientService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
|
||||
.name("el-lookup-index")
|
||||
.displayName("Index")
|
||||
.description("The name of the index to read from")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
|
||||
.name("el-lookup-type")
|
||||
.displayName("Type")
|
||||
.description("The type of this document (used by Elasticsearch for indexing and searching)")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
private static final List<PropertyDescriptor> DESCRIPTORS = Arrays.asList(CLIENT_SERVICE, INDEX, TYPE);
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
public static final String ID = "es_document_id";
|
||||
private ElasticSearchClientService esClient;
|
||||
private String index;
|
||||
private String type;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return DESCRIPTORS;
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
esClient = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
|
||||
index = context.getProperty(INDEX).evaluateAttributeExpressions().getValue();
|
||||
type = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
|
||||
try {
|
||||
final String id = (String) coordinates.get(ID);
|
||||
final Map<String, Object> enums = esClient.get(index, type, id);
|
||||
return Optional.of(mapper.writeValueAsString(enums));
|
||||
} catch (IOException e) {
|
||||
throw new LookupFailureException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRequiredKeys() {
|
||||
return Collections.singleton(ID);
|
||||
}
|
||||
}
|
@ -13,4 +13,5 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.elasticsearch.ElasticSearchLookupService
|
||||
org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
|
||||
org.apache.nifi.elasticsearch.ElasticSearchStringLookupService
|
||||
org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
|
||||
|
@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License") you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.elasticsearch.integration
|
||||
|
||||
import org.apache.nifi.elasticsearch.ElasticSearchClientService
|
||||
import org.apache.nifi.elasticsearch.ElasticSearchStringLookupService
|
||||
import org.apache.nifi.util.TestRunner
|
||||
import org.apache.nifi.util.TestRunners
|
||||
import org.junit.Assert
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
|
||||
class ElasticSearchStringLookupServiceTest {
|
||||
ElasticSearchClientService mockClientService
|
||||
ElasticSearchStringLookupService lookupService
|
||||
TestRunner runner
|
||||
|
||||
@Before
|
||||
void setup() throws Exception {
|
||||
mockClientService = new TestElasticSearchClientService()
|
||||
lookupService = new ElasticSearchStringLookupService()
|
||||
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
|
||||
runner.addControllerService("clientService", mockClientService)
|
||||
runner.addControllerService("lookupService", lookupService)
|
||||
runner.enableControllerService(mockClientService)
|
||||
runner.setProperty(lookupService, ElasticSearchStringLookupService.CLIENT_SERVICE, "clientService")
|
||||
runner.setProperty(lookupService, ElasticSearchStringLookupService.INDEX, "users")
|
||||
runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "clientService")
|
||||
runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "lookupService")
|
||||
runner.enableControllerService(lookupService)
|
||||
}
|
||||
|
||||
@Test
|
||||
void simpleLookupTest() throws Exception {
|
||||
def coordinates = [ (ElasticSearchStringLookupService.ID): "12345" ]
|
||||
|
||||
Optional<String> result = lookupService.lookup(coordinates)
|
||||
|
||||
Assert.assertNotNull(result)
|
||||
Assert.assertTrue(result.isPresent())
|
||||
String json = result.get()
|
||||
Assert.assertEquals('{"username":"john.smith","password":"testing1234","email":"john.smith@test.com","position":"Software Engineer"}', json)
|
||||
}
|
||||
}
|
@ -25,6 +25,13 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse
|
||||
import org.apache.nifi.elasticsearch.SearchResponse
|
||||
|
||||
class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
|
||||
Map data = [
|
||||
"username": "john.smith",
|
||||
"password": "testing1234",
|
||||
"email": "john.smith@test.com",
|
||||
"position": "Software Engineer"
|
||||
]
|
||||
|
||||
@Override
|
||||
IndexOperationResponse add(IndexOperationRequest operation) throws IOException {
|
||||
return null
|
||||
@ -52,18 +59,13 @@ class TestElasticSearchClientService extends AbstractControllerService implement
|
||||
|
||||
@Override
|
||||
Map<String, Object> get(String index, String type, String id) throws IOException {
|
||||
return null
|
||||
return data
|
||||
}
|
||||
|
||||
@Override
|
||||
SearchResponse search(String query, String index, String type) throws IOException {
|
||||
List hits = [[
|
||||
"_source": [
|
||||
"username": "john.smith",
|
||||
"password": "testing1234",
|
||||
"email": "john.smith@test.com",
|
||||
"position": "Software Engineer"
|
||||
]
|
||||
"_source": data
|
||||
]]
|
||||
return new SearchResponse(hits, null, 1, 100, false)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user