NIFI-12057 Refactored Groovy tests to Java in nifi-lookup-services

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2023-09-14 09:06:22 -05:00 committed by Joseph Witt
parent ea4c2055d6
commit 591c323f0a
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
18 changed files with 501 additions and 1059 deletions

View File

@ -127,6 +127,10 @@
<version>2.7</version> <version>2.7</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>
@ -154,34 +158,6 @@
<artifactId>nifi-schema-registry-service-api</artifactId> <artifactId>nifi-schema-registry-service-api</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbytools</artifactId>
<version>${derby.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>${nifi.groovy.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
@ -190,8 +166,6 @@
<artifactId>apache-rat-plugin</artifactId> <artifactId>apache-rat-plugin</artifactId>
<configuration> <configuration>
<excludes combine.children="append"> <excludes combine.children="append">
<exclude>src/test/resources/complex.avsc</exclude>
<exclude>src/test/resources/simple.avsc</exclude>
<exclude>src/test/resources/test.csv</exclude> <exclude>src/test/resources/test.csv</exclude>
<exclude>src/test/resources/test_sep_escape_comment.csv</exclude> <exclude>src/test/resources/test_sep_escape_comment.csv</exclude>
<exclude>src/test/resources/test_Windows-31J.csv</exclude> <exclude>src/test/resources/test_Windows-31J.csv</exclude>

View File

@ -1,160 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup
import okhttp3.MediaType
import okhttp3.Protocol
import okhttp3.Request
import okhttp3.Response
import okhttp3.ResponseBody
import org.apache.nifi.lookup.rest.MockRestLookupService
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import static groovy.json.JsonOutput.toJson
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertTrue
class TestRestLookupService {
TestRunner runner
MockRecordParser recordReader
MockRestLookupService lookupService
static final String JSON_TYPE = "application/json"
@BeforeEach
void setup() {
recordReader = new MockRecordParser()
lookupService = new MockRestLookupService()
runner = TestRunners.newTestRunner(TestRestLookupServiceProcessor.class)
runner.setValidateExpressionUsage(false)
runner.addControllerService("lookupService", lookupService)
runner.addControllerService("recordReader", recordReader)
runner.setProperty(lookupService, RestLookupService.RECORD_READER, "recordReader")
runner.setProperty("Lookup Service", "lookupService")
runner.setProperty(lookupService, RestLookupService.URL, "http://localhost:8080")
// Add a dynamic property using Expression Language (expecting to be provided by FlowFile attribute)
runner.setProperty(lookupService, 'test', '${test.ff.attribute}')
runner.enableControllerService(lookupService)
runner.enableControllerService(recordReader)
runner.assertValid()
}
@Test
void testSimpleLookup() {
recordReader.addSchemaField("name", RecordFieldType.STRING)
recordReader.addSchemaField("age", RecordFieldType.INT)
recordReader.addSchemaField("sport", RecordFieldType.STRING)
recordReader.addRecord("John Doe", 48, "Soccer")
recordReader.addRecord("Jane Doe", 47, "Tennis")
recordReader.addRecord("Sally Doe", 47, "Curling")
lookupService.response = buildResponse(toJson([ simpleTest: true]), JSON_TYPE)
def result = lookupService.lookup(getCoordinates(JSON_TYPE, "get"), ['test.ff.attribute' : 'Hello'])
assertTrue(result.isPresent())
def headers = lookupService.getHeaders()
assertNotNull(headers)
def headerValue = headers.get('test')
assertNotNull(headerValue)
assertEquals(1, headerValue.size())
assertEquals('Hello', headerValue.get(0))
def record = result.get()
assertEquals("John Doe", record.getAsString("name"))
assertEquals(48, record.getAsInt("age"))
assertEquals("Soccer", record.getAsString("sport"))
}
@Test
void testNestedLookup() {
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, RestLookupService.RECORD_PATH, "/person")
runner.enableControllerService(lookupService)
runner.assertValid()
recordReader.addSchemaField("id", RecordFieldType.INT)
final List<RecordField> personFields = new ArrayList<>()
final RecordField nameField = new RecordField("name", RecordFieldType.STRING.getDataType())
final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType())
final RecordField sportField = new RecordField("sport", RecordFieldType.STRING.getDataType())
personFields.add(nameField)
personFields.add(ageField)
personFields.add(sportField)
final RecordSchema personSchema = new SimpleRecordSchema(personFields)
recordReader.addSchemaField("person", RecordFieldType.RECORD)
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe")
put("age", 48)
put("sport", "Soccer")
}}))
lookupService.response = buildResponse(toJson([ simpleTest: true]), JSON_TYPE)
def result = lookupService.lookup(getCoordinates(JSON_TYPE, "get"))
assertTrue(result.isPresent())
def record = result.get()
assertEquals("John Doe", record.getAsString("name"))
assertEquals(48, record.getAsInt("age"))
assertEquals("Soccer", record.getAsString("sport"))
/*
* Test deep lookup
*/
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, RestLookupService.RECORD_PATH, "/person/sport")
runner.enableControllerService(lookupService)
runner.assertValid()
result = lookupService.lookup(getCoordinates(JSON_TYPE, "get"))
assertTrue(result.isPresent())
record = result.get()
assertNotNull(record.getAsString("sport"))
assertEquals("Soccer", record.getAsString("sport"))
}
private static Map<String, Object> getCoordinates(String mimeType, String method) {
def retVal = [:] as Map<String, Object>
retVal[RestLookupService.MIME_TYPE_KEY] = mimeType
retVal[RestLookupService.METHOD_KEY] = method
retVal
}
private static Response buildResponse(String resp, String mimeType) {
return new Response.Builder()
.code(200)
.body(
ResponseBody.create(resp, MediaType.parse(mimeType))
)
.message("Test")
.protocol(Protocol.HTTP_1_1)
.request(new Request.Builder().url("http://localhost:8080").get().build())
.build()
}
}

View File

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.AbstractProcessor
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.exception.ProcessException
class TestRestLookupServiceProcessor extends AbstractProcessor {
static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("Lookup Service")
.description("RestLookupService")
.identifiesControllerService(RestLookupService.class)
.required(true)
.build()
@Override
void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>()
propDescs.add(CLIENT_SERVICE)
return propDescs
}
}

View File

@ -1,229 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.db
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.lookup.LookupFailureException
import org.apache.nifi.lookup.LookupService
import org.apache.nifi.lookup.TestProcessor
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
import java.sql.Statement
import static org.hamcrest.CoreMatchers.instanceOf
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNull
class TestDatabaseRecordLookupService {
private TestRunner runner
private final static Optional<Record> EMPTY_RECORD = Optional.empty()
private final static String DB_LOCATION = "target/db"
@BeforeAll
static void setupClass() {
System.setProperty("derby.stream.error.file", "target/derby.log")
}
@BeforeEach
void setup() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl()
final Map<String, String> dbcpProperties = new HashMap<>()
runner = TestRunners.newTestRunner(TestProcessor.class)
runner.addControllerService("dbcp", dbcp, dbcpProperties)
runner.enableControllerService(dbcp)
}
@Test
void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION)
dbLocation.delete()
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
final Statement stmt = con.createStatement()
try {
stmt.execute("drop table TEST")
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
runner.addControllerService("db-lookup-service", service)
runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
runner.assertNotValid()
runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
runner.enableControllerService(service)
runner.assertValid(service)
def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
assertThat(lookupService, instanceOf(LookupService.class))
final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
assertNull(property1.get().getAsInt("VAL1"), "Should be null but is not")
assertEquals("Hello", property1.get().getAsString("VAL2"))
final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
assertEquals(1, property2.get().getAsInt("VAL1"))
assertEquals("World", property2.get().getAsString("VAL2"))
// Key not found
final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
assertEquals(EMPTY_RECORD, property3)
}
@Test
void testDatabaseLookupServiceSpecifyColumns() throws InitializationException, IOException, LookupFailureException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION)
dbLocation.delete()
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
final Statement stmt = con.createStatement()
try {
stmt.execute("drop table TEST")
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
runner.addControllerService("db-lookup-service", service)
runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
runner.assertNotValid()
runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_VALUE_COLUMNS, "val1")
runner.enableControllerService(service)
runner.assertValid(service)
def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
assertThat(lookupService, instanceOf(LookupService.class))
final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
assertNull(property1.get().getAsInt("VAL1"), "Should be null but is not")
final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
assertEquals(1, property2.get().getAsInt("VAL1"))
// Key not found
final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
assertEquals(EMPTY_RECORD, property3)
}
@Test
void exerciseCacheLogic() {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION)
dbLocation.delete()
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
final Statement stmt = con.createStatement()
try {
stmt.execute("drop table TEST")
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
final DatabaseRecordLookupService service = new DatabaseRecordLookupService()
runner.addControllerService("db-lookup-service", service)
runner.setProperty(service, DatabaseRecordLookupService.DBCP_SERVICE, "dbcp")
runner.assertNotValid()
runner.setProperty(service, DatabaseRecordLookupService.TABLE_NAME, "TEST")
runner.setProperty(service, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, "id")
runner.setProperty(service, DatabaseRecordLookupService.CACHE_SIZE, "10")
runner.enableControllerService(service)
runner.assertValid(service)
def lookupService = (DatabaseRecordLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
assertThat(lookupService, instanceOf(LookupService.class))
final Optional<Record> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
assertEquals(1, property1.get().getAsInt("VAL1"))
assertEquals("World", property1.get().getAsString("VAL2"))
final Optional<Record> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
assertEquals(1, property2.get().getAsInt("VAL1"))
assertEquals("World", property2.get().getAsString("VAL2"))
final Optional<Record> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
assertNull(property3.get().getAsInt("VAL1"))
assertEquals("Hello", property3.get().getAsString("VAL2"))
final Optional<Record> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
assertNull(property4.get().getAsInt("VAL1"))
assertEquals("Hello", property4.get().getAsString("VAL2"))
}
/**
* Simple implementation for component testing.
*
*/
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
@Override
String getIdentifier() {
"dbcp"
}
@Override
Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
} catch (e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -1,186 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.db
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.lookup.LookupFailureException
import org.apache.nifi.lookup.LookupService
import org.apache.nifi.lookup.TestProcessor
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.sql.Connection
import java.sql.DriverManager
import java.sql.SQLException
import java.sql.Statement
import static org.hamcrest.CoreMatchers.instanceOf
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertFalse
class TestSimpleDatabaseLookupService {
private TestRunner runner
private final static Optional<Record> EMPTY_RECORD = Optional.empty()
private final static String DB_LOCATION = "target/db"
@BeforeAll
static void setupClass() {
System.setProperty("derby.stream.error.file", "target/derby.log")
}
@BeforeEach
void setup() throws InitializationException {
final DBCPService dbcp = new DBCPServiceSimpleImpl()
final Map<String, String> dbcpProperties = new HashMap<>()
runner = TestRunners.newTestRunner(TestProcessor.class)
runner.addControllerService("dbcp", dbcp, dbcpProperties)
runner.enableControllerService(dbcp)
}
@Test
void testDatabaseLookupService() throws InitializationException, IOException, LookupFailureException {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION)
dbLocation.delete()
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
final Statement stmt = con.createStatement()
try {
stmt.execute("drop table TEST")
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
runner.addControllerService("db-lookup-service", service)
runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
runner.assertNotValid()
runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
runner.enableControllerService(service)
runner.assertValid(service)
def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
assertThat(lookupService, instanceOf(LookupService.class))
// Lookup VAL1
final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "0"))
assertFalse(property1.isPresent())
// Key not found
final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "2"))
assertEquals(EMPTY_RECORD, property3)
runner.disableControllerService(service)
runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
runner.enableControllerService(service)
final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
assertEquals("World", property2.get())
}
@Test
void exerciseCacheLogic() {
// remove previous test database, if any
final File dbLocation = new File(DB_LOCATION)
dbLocation.delete()
// load test data to database
final Connection con = ((DBCPService) runner.getControllerService("dbcp")).connection
final Statement stmt = con.createStatement()
try {
stmt.execute("drop table TEST")
} catch (final SQLException sqle) {
}
stmt.execute("create table TEST (id integer not null, val1 integer, val2 varchar(10), constraint my_pk primary key (id))")
stmt.execute("insert into TEST (id, val1, val2) VALUES (0, NULL, 'Hello')")
stmt.execute("insert into TEST (id, val1, val2) VALUES (1, 1, 'World')")
final SimpleDatabaseLookupService service = new SimpleDatabaseLookupService()
runner.addControllerService("db-lookup-service", service)
runner.setProperty(service, SimpleDatabaseLookupService.DBCP_SERVICE, "dbcp")
runner.assertNotValid()
runner.setProperty(service, SimpleDatabaseLookupService.TABLE_NAME, "TEST")
runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, "id")
runner.setProperty(service, SimpleDatabaseLookupService.CACHE_SIZE, "10")
runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL1")
runner.enableControllerService(service)
runner.assertValid(service)
def lookupService = (SimpleDatabaseLookupService) runner.processContext.controllerServiceLookup.getControllerService("db-lookup-service")
assertThat(lookupService, instanceOf(LookupService.class))
// Lookup VAL1
final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "1"))
assertEquals("1", property1.get())
final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "0"))
assertFalse(property3.isPresent())
runner.disableControllerService(service)
runner.setProperty(service, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, "VAL2")
runner.enableControllerService(service)
final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "1"))
assertEquals("World", property2.get())
final Optional<String> property4 = lookupService.lookup(Collections.singletonMap("key", "0"))
assertEquals("Hello", property4.get())
}
/**
* Simple implementation for component testing.
*
*/
class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
@Override
String getIdentifier() {
"dbcp"
}
@Override
Connection getConnection() throws ProcessException {
try {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
DriverManager.getConnection("jdbc:derby:${DB_LOCATION};create=true")
} catch (e) {
throw new ProcessException("getConnection failed: " + e);
}
}
}
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest
import okhttp3.Headers
import okhttp3.Request
import okhttp3.Response
import org.apache.nifi.lookup.RestLookupService
class MockRestLookupService extends RestLookupService {
Response response
Headers headers
@Override
protected Response executeRequest(Request request) {
this.headers = request.headers()
return response
}
Map<String, List<String>> getHeaders() {
headers.toMultimap()
}
}

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest
class SchemaUtil {
static final String SIMPLE = SchemaUtil.class.getResourceAsStream("/simple.avsc").text
static final String COMPLEX = SchemaUtil.class.getResourceAsStream("/complex.avsc").text
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest.handlers
import org.eclipse.jetty.server.Request
import org.eclipse.jetty.server.handler.AbstractHandler
import javax.servlet.ServletException
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class BasicAuth extends AbstractHandler {
@Override
void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
baseRequest.handled = true
def authString = request.getHeader("Authorization")
def headers = []
request.headerNames.each { headers << it }
if (!authString || authString != "Basic am9obi5zbWl0aDp0ZXN0aW5nMTIzNA==") {
response.status = 401
response.setHeader("WWW-Authenticate", "Basic realm=\"Jetty\"")
response.setHeader("response.phrase", "Unauthorized")
response.contentType = "text/plain"
response.writer.println("Get off my lawn!")
return
}
response.writer.println(prettyPrint(
toJson([
username: "john.smith",
password: "testing1234"
])
))
}
}

View File

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest.handlers
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class ComplexJson extends HttpServlet {
@Override
void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.contentType = "application/json"
response.outputStream.write(prettyPrint(
toJson([
top: [
middle: [
inner: [
"username": "jane.doe",
"password": "testing7890",
"email": "jane.doe@test-example.com"
]
]
]
])
).bytes)
}
}

View File

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest.handlers
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class NoRecord extends HttpServlet {
Logger logger = LoggerFactory.getLogger(NoRecord.class)
@Override
void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.contentType = "application/json"
response.outputStream.write(prettyPrint(
toJson([:])
).bytes)
}
}

View File

@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest.handlers
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class SimpleJson extends HttpServlet {
Logger logger = LoggerFactory.getLogger(SimpleJson.class)
@Override
void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
String u = request.getHeader("X-USER")
String p = request.getHeader("X-PASS")
response.contentType = "application/json"
response.outputStream.write(prettyPrint(
toJson([
username: u ?: "john.smith",
password: p ?: "testing1234"
])
).bytes)
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest.handlers
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
class SimpleJsonArray extends HttpServlet {
@Override
void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.contentType = "application/json"
response.outputStream.write(prettyPrint(
toJson([[
username: "john.smith",
password: "testing1234"
],
[
username: "jane.doe",
password: "testing7890"
]])
).bytes)
}
}

View File

@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.rest.handlers
import org.apache.nifi.util.StringUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import javax.servlet.http.HttpServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
import static org.junit.jupiter.api.Assertions.assertFalse
import static org.junit.jupiter.api.Assertions.assertNotNull
class VerbTest extends HttpServlet {
Logger logger = LoggerFactory.getLogger(VerbTest.class)
void doDelete(HttpServletRequest request, HttpServletResponse response) {
validateBody(request)
sendResponse(response)
}
void doPost(HttpServletRequest request, HttpServletResponse response) {
validateBody(request)
sendResponse(response)
}
void doPut(HttpServletRequest request, HttpServletResponse response) {
validateBody(request)
sendResponse(response)
}
void sendResponse(HttpServletResponse response) {
response.contentType = "application/json"
response.outputStream.write(prettyPrint(
toJson([
username: "john.smith",
password: "testing1234"
])
).bytes)
}
void validateBody(HttpServletRequest request) {
String needsBody = request.getHeader("needs-body")
boolean bodyRequired = !StringUtils.isBlank(needsBody)
String body = request.inputStream.text
if (bodyRequired) {
assertNotNull(body)
assertFalse(StringUtils.isBlank(body))
}
}
}

View File

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
@Timeout(10)
@ExtendWith(MockitoExtension.class)
class TestRestLookupService {
private static final String SERVICE_ID = RestLookupService.class.getSimpleName();
private static final String READER_ID = RecordReaderFactory.class.getSimpleName();
private static final String ROOT_PATH = "/";
private static final String GET_METHOD = "GET";
private static final String POST_METHOD = "POST";
private static final String POST_BODY = "{}";
private static final String APPLICATION_JSON = "application/json; charset=utf-8";
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String TIMEOUT = "5 s";
private static final String SHORT_TIMEOUT = "100 ms";
private MockWebServer mockWebServer;
private TestRunner runner;
@Mock
private RecordReaderFactory recordReaderFactory;
@Mock
private RecordReader recordReader;
@Mock
private Record record;
private RestLookupService restLookupService;
@BeforeEach
void setRunner() throws InitializationException {
mockWebServer = new MockWebServer();
runner = TestRunners.newTestRunner(NoOpProcessor.class);
restLookupService = new RestLookupService();
when(recordReaderFactory.getIdentifier()).thenReturn(READER_ID);
runner.addControllerService(READER_ID, recordReaderFactory);
runner.addControllerService(SERVICE_ID, restLookupService);
final String url = mockWebServer.url(ROOT_PATH).toString();
runner.setProperty(restLookupService, RestLookupService.URL, url);
runner.setProperty(restLookupService, RestLookupService.RECORD_READER, READER_ID);
runner.setProperty(restLookupService, RestLookupService.PROP_CONNECT_TIMEOUT, TIMEOUT);
runner.setProperty(restLookupService, RestLookupService.PROP_READ_TIMEOUT, TIMEOUT);
}
@AfterEach
void shutdownServer() throws IOException {
mockWebServer.shutdown();
}
@Test
void testLookupSocketTimeout() {
runner.setProperty(restLookupService, RestLookupService.PROP_READ_TIMEOUT, SHORT_TIMEOUT);
runner.enableControllerService(restLookupService);
final LookupFailureException exception = assertThrows(LookupFailureException.class, () -> restLookupService.lookup(Collections.emptyMap()));
assertInstanceOf(SocketTimeoutException.class, exception.getCause());
}
@Test
void testLookupRecordNotPresent() throws Exception {
runner.enableControllerService(restLookupService);
when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenReturn(recordReader);
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
final Optional<Record> recordFound = restLookupService.lookup(Collections.emptyMap());
assertFalse(recordFound.isPresent());
assertRecordedRequestFound();
}
@Test
void testLookupRecordFound() throws Exception {
runner.enableControllerService(restLookupService);
when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenReturn(recordReader);
when(recordReader.nextRecord()).thenReturn(record);
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
final Optional<Record> recordFound = restLookupService.lookup(Collections.emptyMap());
assertTrue(recordFound.isPresent());
assertRecordedRequestFound();
}
@Test
void testLookupRecordFoundPostMethod() throws Exception {
runner.enableControllerService(restLookupService);
when(recordReaderFactory.createRecordReader(any(), any(), anyLong(), any())).thenReturn(recordReader);
when(recordReader.nextRecord()).thenReturn(record);
mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_OK));
final Map<String, Object> coordinates = new LinkedHashMap<>();
coordinates.put(RestLookupService.METHOD_KEY, POST_METHOD);
coordinates.put(RestLookupService.BODY_KEY, POST_BODY);
coordinates.put(RestLookupService.MIME_TYPE_KEY, APPLICATION_JSON);
final Optional<Record> recordFound = restLookupService.lookup(coordinates);
assertTrue(recordFound.isPresent());
assertPostRecordedRequestFound();
}
private void assertRecordedRequestFound() throws InterruptedException {
final RecordedRequest request = mockWebServer.takeRequest();
assertEquals(GET_METHOD, request.getMethod());
assertEquals(ROOT_PATH, request.getPath());
}
private void assertPostRecordedRequestFound() throws InterruptedException {
final RecordedRequest request = mockWebServer.takeRequest();
assertEquals(POST_METHOD, request.getMethod());
assertEquals(ROOT_PATH, request.getPath());
assertEquals(APPLICATION_JSON, request.getHeader(CONTENT_TYPE_HEADER));
final String body = request.getBody().readString(StandardCharsets.UTF_8);
assertEquals(POST_BODY, body);
}
}

View File

@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.db;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class TestDatabaseRecordLookupService {
private static final String SERVICE_ID = DatabaseRecordLookupService.class.getSimpleName();
private static final String DBCP_SERVICE_ID = DBCPService.class.getSimpleName();
private static final String TABLE_NAME = "Person";
private static final String LOOKUP_KEY_COLUMN = "Name";
private static final String LOOKUP_VALUE_COLUMN = "ID";
private static final String LOOKUP_VALUE = "12345";
private static final String LOOKUP_KEY_PROPERTY = "key";
private static final String LOOKUP_KEY = "First";
private static final String EXPECTED_STATEMENT = String.format("SELECT %s FROM %s WHERE %s = ?", LOOKUP_VALUE_COLUMN, TABLE_NAME, LOOKUP_KEY_COLUMN);
private TestRunner runner;
@Mock
private DBCPService dbcpService;
@Mock
private Connection connection;
@Mock
private PreparedStatement preparedStatement;
@Mock
private ResultSet resultSet;
@Mock
private ResultSetMetaData resultSetMetaData;
@Captor
private ArgumentCaptor<String> statementCaptor;
private DatabaseRecordLookupService lookupService;
@BeforeEach
void setRunner() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
when(dbcpService.getIdentifier()).thenReturn(DBCP_SERVICE_ID);
runner.addControllerService(DBCP_SERVICE_ID, dbcpService);
runner.enableControllerService(dbcpService);
lookupService = new DatabaseRecordLookupService();
runner.addControllerService(SERVICE_ID, lookupService);
runner.setProperty(lookupService, DatabaseRecordLookupService.DBCP_SERVICE, DBCP_SERVICE_ID);
runner.setProperty(lookupService, DatabaseRecordLookupService.TABLE_NAME, TABLE_NAME);
runner.setProperty(lookupService, DatabaseRecordLookupService.LOOKUP_KEY_COLUMN, LOOKUP_KEY_COLUMN);
runner.setProperty(lookupService, DatabaseRecordLookupService.LOOKUP_VALUE_COLUMNS, LOOKUP_VALUE_COLUMN);
}
@Test
void testLookupEmpty() throws LookupFailureException, SQLException {
runner.enableControllerService(lookupService);
setConnection();
final Map<String, Object> coordinates = Collections.singletonMap(LOOKUP_KEY_PROPERTY, LOOKUP_KEY);
final Optional<Record> lookupFound = lookupService.lookup(coordinates);
assertFalse(lookupFound.isPresent());
assertPreparedStatementExpected();
}
@Test
void testLookupFound() throws LookupFailureException, SQLException {
runner.enableControllerService(lookupService);
setConnection();
setResultSetMetaData();
when(resultSet.next()).thenReturn(true);
when(resultSet.getObject(eq(LOOKUP_VALUE_COLUMN))).thenReturn(LOOKUP_VALUE);
final Map<String, Object> coordinates = Collections.singletonMap(LOOKUP_KEY_PROPERTY, LOOKUP_KEY);
final Optional<Record> lookupFound = lookupService.lookup(coordinates);
assertTrue(lookupFound.isPresent());
}
private void setConnection() throws SQLException {
when(dbcpService.getConnection(any())).thenReturn(connection);
when(connection.prepareStatement(anyString())).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
when(resultSet.getMetaData()).thenReturn(resultSetMetaData);
}
private void setResultSetMetaData() throws SQLException {
final int columnIndex = 1;
when(resultSetMetaData.getColumnCount()).thenReturn(1);
when(resultSetMetaData.getColumnType(eq(columnIndex))).thenReturn(JDBCType.VARCHAR.getVendorTypeNumber());
when(resultSetMetaData.getColumnLabel(eq(columnIndex))).thenReturn(LOOKUP_VALUE_COLUMN);
when(resultSetMetaData.isNullable(eq(columnIndex))).thenReturn(ResultSetMetaData.columnNoNulls);
}
private void assertPreparedStatementExpected() throws SQLException {
verify(connection).prepareStatement(statementCaptor.capture());
final String statement = statementCaptor.getValue();
assertEquals(EXPECTED_STATEMENT, statement);
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.lookup.db;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class TestSimpleDatabaseLookupService {
private static final String SERVICE_ID = SimpleDatabaseLookupService.class.getSimpleName();
private static final String DBCP_SERVICE_ID = DBCPService.class.getSimpleName();
private static final String TABLE_NAME = "Person";
private static final String LOOKUP_KEY_COLUMN = "Name";
private static final String LOOKUP_VALUE_COLUMN = "ID";
private static final String LOOKUP_VALUE = "12345";
private static final String LOOKUP_KEY_PROPERTY = "key";
private static final String LOOKUP_KEY = "First";
private static final String EXPECTED_STATEMENT = String.format("SELECT %s FROM %s WHERE %s = ?", LOOKUP_VALUE_COLUMN, TABLE_NAME, LOOKUP_KEY_COLUMN);
private TestRunner runner;
@Mock
private DBCPService dbcpService;
@Mock
private Connection connection;
@Mock
private PreparedStatement preparedStatement;
@Mock
private ResultSet resultSet;
@Captor
private ArgumentCaptor<String> statementCaptor;
private SimpleDatabaseLookupService lookupService;
@BeforeEach
void setRunner() throws InitializationException {
runner = TestRunners.newTestRunner(NoOpProcessor.class);
when(dbcpService.getIdentifier()).thenReturn(DBCP_SERVICE_ID);
runner.addControllerService(DBCP_SERVICE_ID, dbcpService);
runner.enableControllerService(dbcpService);
lookupService = new SimpleDatabaseLookupService();
runner.addControllerService(SERVICE_ID, lookupService);
runner.setProperty(lookupService, SimpleDatabaseLookupService.DBCP_SERVICE, DBCP_SERVICE_ID);
runner.setProperty(lookupService, SimpleDatabaseLookupService.TABLE_NAME, TABLE_NAME);
runner.setProperty(lookupService, SimpleDatabaseLookupService.LOOKUP_KEY_COLUMN, LOOKUP_KEY_COLUMN);
runner.setProperty(lookupService, SimpleDatabaseLookupService.LOOKUP_VALUE_COLUMN, LOOKUP_VALUE_COLUMN);
}
@Test
void testLookupEmpty() throws LookupFailureException, SQLException {
runner.enableControllerService(lookupService);
setConnection();
final Map<String, Object> coordinates = Collections.singletonMap(LOOKUP_KEY_PROPERTY, LOOKUP_KEY);
final Optional<String> lookupFound = lookupService.lookup(coordinates);
assertFalse(lookupFound.isPresent());
assertPreparedStatementExpected();
}
@Test
void testLookupFound() throws LookupFailureException, SQLException {
runner.enableControllerService(lookupService);
setConnection();
when(resultSet.next()).thenReturn(true);
when(resultSet.getObject(eq(LOOKUP_VALUE_COLUMN))).thenReturn(LOOKUP_VALUE);
final Map<String, Object> coordinates = Collections.singletonMap(LOOKUP_KEY_PROPERTY, LOOKUP_KEY);
final Optional<String> lookupFound = lookupService.lookup(coordinates);
assertTrue(lookupFound.isPresent());
assertEquals(LOOKUP_VALUE, lookupFound.get());
assertPreparedStatementExpected();
}
private void setConnection() throws SQLException {
when(dbcpService.getConnection(any())).thenReturn(connection);
when(connection.prepareStatement(anyString())).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
}
private void assertPreparedStatementExpected() throws SQLException {
verify(connection).prepareStatement(statementCaptor.capture());
final String statement = statementCaptor.getValue();
assertEquals(EXPECTED_STATEMENT, statement);
}
}

View File

@ -1,36 +0,0 @@
{
"type": "record",
"name": "ComplexRecord",
"fields": [
{
"name": "top",
"type": {
"type": "record",
"name": "TopRecord",
"fields": [
{
"name": "middle",
"type": {
"name": "MiddleRecord",
"type": "record",
"fields": [
{
"name": "inner",
"type": {
"type": "record",
"name": "InnerRecord",
"fields": [
{ "name": "username", "type": "string" },
{ "name": "password", "type": "string" },
{ "name": "email", "type": "string" }
]
}
}
]
}
}
]
}
}
]
}

View File

@ -1,14 +0,0 @@
{
"type": "record",
"name": "SimpleRecord",
"fields": [
{
"name": "username",
"type": "string"
},
{
"name": "password",
"type": "string"
}
]
}