diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
new file mode 100644
index 0000000000..1f8c6a2402
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
@@ -0,0 +1,152 @@
+
+
+
+ 4.0.0
+
+ nifi-kudu-bundle
+ org.apache.nifi
+ 1.10.0-SNAPSHOT
+
+
+ nifi-kudu-controller-service
+ jar
+
+
+
+ None
+ 1.10.0
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.2
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+ ${exclude.tests}
+
+
+
+
+
+
+
+ org.apache.nifi
+ nifi-api
+ provided
+
+
+ org.apache.nifi
+ nifi-lookup-service-api
+ provided
+
+
+ org.apache.nifi
+ nifi-utils
+ 1.10.0-SNAPSHOT
+ provided
+
+
+ org.apache.nifi
+ nifi-kerberos-credentials-service-api
+ provided
+
+
+ org.apache.nifi
+ nifi-record
+ provided
+
+
+ org.apache.nifi
+ nifi-security-utils
+ 1.10.0-SNAPSHOT
+ provided
+
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.apache.kudu
+ kudu-client
+ ${kudu.version}
+
+
+ org.apache.kudu
+ kudu-test-utils
+ ${kudu.version}
+ test
+
+
+
+
+ kudu-windows
+
+
+ Windows
+
+
+
+
+ **/*.java
+
+
+
+ kudu-linux
+
+
+ Unix
+
+
+
+
+ org.apache.kudu
+ kudu-binary
+ ${kudu.version}
+ ${os.detected.classifier}
+ test
+
+
+
+
+ kudu-mac
+
+
+ mac
+
+
+
+
+ org.apache.kudu
+ kudu-binary
+ ${kudu.version}
+ ${os.detected.classifier}
+ test
+
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
new file mode 100644
index 0000000000..b044f9a910
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.ReplicaSelection;
+import org.apache.kudu.client.RowResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.security.auth.login.LoginException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@CapabilityDescription("Lookup a record from Kudu Server associated with the specified key. Binary columns are base64 encoded. Only one matched row will be returned")
+@Tags({"lookup", "enrich", "key", "value", "kudu"})
+public class KuduLookupService extends AbstractControllerService implements RecordLookupService {
+
+ public static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+ .name("kudu-lu-masters")
+ .displayName("Kudu Masters")
+ .description("Comma separated addresses of the Kudu masters to connect to.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
+ .name("kudu-lu-kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials to use for authentication")
+ .required(false)
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .build();
+
+ public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder()
+ .name("kudu-lu-operations-timeout-ms")
+ .displayName("Kudu Operation Timeout")
+ .description("Default timeout used for user operations (using sessions and scanners)")
+ .required(false)
+ .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final AllowableValue CLOSEST_REPLICA = new AllowableValue(ReplicaSelection.CLOSEST_REPLICA.toString(), ReplicaSelection.CLOSEST_REPLICA.name(),
+ "Select the closest replica to the client. Replicas are classified from closest to furthest as follows: "+
+ "1) Local replicas 2) Replicas whose tablet server has the same location as the client 3) All other replicas");
+ public static final AllowableValue LEADER_ONLY = new AllowableValue(ReplicaSelection.LEADER_ONLY.toString(), ReplicaSelection.LEADER_ONLY.name(),
+ "Select the LEADER replica");
+ public static final PropertyDescriptor KUDU_REPLICA_SELECTION = new PropertyDescriptor.Builder()
+ .name("kudu-lu-replica-selection")
+ .displayName("Kudu Replica Selection")
+ .description("Policy with which to choose amongst multiple replicas")
+ .required(true)
+ .defaultValue(CLOSEST_REPLICA.getValue())
+ .allowableValues(CLOSEST_REPLICA, LEADER_ONLY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+ .name("kudu-lu-table-name")
+ .displayName("Kudu Table Name")
+ .description("Name of the table to access.")
+ .required(true)
+ .defaultValue("default")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
+ .name("kudu-lu-return-cols")
+ .displayName("Kudu Return Columns")
+ .description("A comma-separated list of columns to return when scanning. To return all columns set to \"*\"")
+ .required(true)
+ .defaultValue("*")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+
+ protected List properties;
+
+ protected KerberosCredentialsService credentialsService;
+ private volatile KerberosUser kerberosUser;
+
+ protected String kuduMasters;
+ protected KuduClient kuduClient;
+ protected ReplicaSelection replicaSelection;
+ protected volatile String tableName;
+ protected volatile KuduTable table;
+ protected volatile List columnNames;
+
+ protected volatile RecordSchema resultSchema;
+ protected volatile Schema tableSchema;
+
+ @Override
+ protected void init(final ControllerServiceInitializationContext context) {
+ final List properties = new ArrayList<>();
+ properties.add(KUDU_MASTERS);
+ properties.add(KERBEROS_CREDENTIALS_SERVICE);
+ properties.add(KUDU_OPERATION_TIMEOUT_MS);
+ properties.add(KUDU_REPLICA_SELECTION);
+ properties.add(TABLE_NAME);
+ properties.add(RETURN_COLUMNS);
+ addProperties(properties);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ protected void addProperties(List properties) {
+ }
+
+ protected void createKuduClient(ConfigurationContext context) throws LoginException {
+ final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+ final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+ if (credentialsService != null) {
+ final String keytab = credentialsService.getKeytab();
+ final String principal = credentialsService.getPrincipal();
+ kerberosUser = loginKerberosUser(principal, keytab);
+
+ final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
+ this.kuduClient = kerberosAction.execute();
+ } else {
+ this.kuduClient = buildClient(kuduMasters, context);
+ }
+ }
+
+ protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+ final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
+ kerberosUser.login();
+ return kerberosUser;
+ }
+
+ protected KuduClient buildClient(final String masters, final ConfigurationContext context) {
+ final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+ return new KuduClient.KuduClientBuilder(masters)
+ .defaultOperationTimeoutMs(operationTimeout)
+ .build();
+ }
+
+ /**
+ * Establish a connection to a Kudu cluster.
+ * @param context the configuration context
+ * @throws InitializationException if unable to connect a Kudu cluster
+ */
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws InitializationException {
+
+ try {
+ kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+ credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+ if (kuduClient == null) {
+ getLogger().debug("Setting up Kudu connection...");
+
+ createKuduClient(context);
+ getLogger().debug("Kudu connection successfully initialized");
+ }
+ } catch(Exception ex){
+ getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
+ throw new InitializationException(ex);
+ }
+
+ replicaSelection = ReplicaSelection.valueOf(context.getProperty(KUDU_REPLICA_SELECTION).getValue());
+ tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+ try {
+ table = kuduClient.openTable(tableName);
+ tableSchema = table.getSchema();
+ columnNames = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
+
+ //Result Schema
+ resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames);
+
+ } catch (KuduException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Set getRequiredKeys() {
+ return new HashSet<>();
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Optional lookup(Map coordinates) {
+
+ //Scanner
+ KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table);
+
+ builder.setProjectedColumnNames(columnNames);
+ builder.replicaSelection(replicaSelection);
+
+ //Only expecting one match
+ builder.limit(1);
+
+ coordinates.forEach((key,value)->
+ builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key), KuduPredicate.ComparisonOp.EQUAL, value))
+ );
+
+ KuduScanner kuduScanner = builder.build();
+
+ //Run lookup
+ for ( RowResult row : kuduScanner){
+ final Map values = new HashMap<>();
+ for(String columnName : columnNames){
+ Object object;
+ if(row.getColumnType(columnName) == Type.BINARY){
+ object = Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
+ } else {
+ object = row.getObject(columnName);
+ }
+ values.put(columnName, object);
+ }
+ return Optional.of(new MapRecord(resultSchema, values));
+ }
+
+ //No match
+ return Optional.empty();
+ }
+
+ private List getColumns(String columns){
+ if(columns.equals("*")){
+ return tableSchema
+ .getColumns()
+ .stream().map(ColumnSchema::getName)
+ .collect(Collectors.toList());
+ } else {
+ return Arrays.asList(columns.split(","));
+ }
+ }
+
+ private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, List columnNames){
+ final List fields = new ArrayList<>();
+ for(String columnName : columnNames) {
+ if(!kuduTableSchema.hasColumn(columnName)){
+ throw new IllegalArgumentException("Column not found in Kudu table schema " + columnName);
+ }
+ ColumnSchema cs = kuduTableSchema.getColumn(columnName);
+ switch (cs.getType()) {
+ case INT8:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.BYTE.getDataType()));
+ break;
+ case INT16:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.SHORT.getDataType()));
+ break;
+ case INT32:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.INT.getDataType()));
+ break;
+ case INT64:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType()));
+ break;
+ case UNIXTIME_MICROS:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType()));
+ break;
+ case BINARY:
+ case STRING:
+ case DECIMAL:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
+ break;
+ case DOUBLE:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.DOUBLE.getDataType()));
+ break;
+ case BOOL:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.BOOLEAN.getDataType()));
+ break;
+ case FLOAT:
+ fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType()));
+ break;
+ }
+ }
+ return new SimpleRecordSchema(fields);
+ }
+
+ /**
+ * Disconnect from the Kudu cluster.
+ */
+ @OnDisabled
+ public void onDisabled() throws Exception {
+ try {
+ if (this.kuduClient != null) {
+ getLogger().debug("Closing KuduClient");
+ this.kuduClient.close();
+ this.kuduClient = null;
+ }
+ } finally {
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ kerberosUser = null;
+ }
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..240825c2ed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.controller.kudu.KuduLookupService
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
new file mode 100644
index 0000000000..0b746fee5b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.kudu.util.DecimalUtil;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestKuduLookupService {
+
+ // The KuduTestHarness automatically starts and stops a real Kudu cluster
+ // when each test is run. Kudu persists its on-disk state in a temporary
+ // directory under a location defined by the environment variable TEST_TMPDIR
+ // if set, or under /tmp otherwise. That cluster data is deleted on
+ // successful exit of the test. The cluster output is logged through slf4j.
+ @Rule
+ public KuduTestHarness harness = new KuduTestHarness(
+ new MiniKuduCluster.MiniKuduClusterBuilder()
+ .addMasterServerFlag("--use_hybrid_clock=false")
+ .addTabletServerFlag("--use_hybrid_clock=false")
+ );
+ private TestRunner testRunner;
+ private long nowMillis = System.currentTimeMillis();
+ private KuduLookupService kuduLookupService;
+
+ public static class SampleProcessor extends AbstractProcessor {
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ }
+ }
+
+ @Before
+ public void init() throws Exception {
+ testRunner = TestRunners.newTestRunner(SampleProcessor.class);
+ testRunner.setValidateExpressionUsage(false);
+ final String tableName = "table1";
+
+ KuduClient client = harness.getClient();
+ List columns = new ArrayList<>();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("bool", Type.BOOL).build());
+ columns.add(new ColumnSchema
+ .ColumnSchemaBuilder("decimal", Type.DECIMAL)
+ .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 1))
+ .build()
+ );
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int8", Type.INT8).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int16", Type.INT16).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build());
+ Schema schema = new Schema(columns);
+
+ CreateTableOptions opts = new CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string"));
+ client.createTable(tableName, schema, opts);
+
+ KuduTable table = client.openTable(tableName);
+ KuduSession session = client.newSession();
+
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addString("string", "string1");
+ row.addBinary("binary", "binary1".getBytes());
+ row.addBoolean("bool",true);
+ row.addDecimal("decimal", BigDecimal.valueOf(0.1));
+ row.addDouble("double",0.2);
+ row.addFloat("float",0.3f);
+ row.addByte("int8", (byte) 1);
+ row.addShort("int16", (short) 2);
+ row.addInt("int32",3);
+ row.addLong("int64",4L);
+ row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
+ session.apply(insert);
+
+ insert = table.newInsert();
+ row = insert.getRow();
+ row.addString("string", "string2");
+ row.addBinary("binary", "binary2".getBytes());
+ row.addBoolean("bool",false);
+ row.addDecimal("decimal", BigDecimal.valueOf(0.1));
+ row.addDouble("double",1.2);
+ row.addFloat("float",1.3f);
+ row.addByte("int8", (byte) 11);
+ row.addShort("int16", (short) 12);
+ row.addInt("int32",13);
+ row.addLong("int64",14L);
+ row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year
+ session.apply(insert);
+
+ session.close();
+
+ kuduLookupService = new KuduLookupService();
+ testRunner.addControllerService("kuduLookupService", kuduLookupService);
+ testRunner.setProperty(kuduLookupService, KuduLookupService.KUDU_MASTERS, "testLocalHost:7051");
+ testRunner.setProperty(kuduLookupService, KuduLookupService.KUDU_REPLICA_SELECTION, KuduLookupService.LEADER_ONLY);
+ testRunner.setProperty(kuduLookupService, KuduLookupService.TABLE_NAME, tableName);
+ kuduLookupService.kuduClient = client;
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void invalid_key() {
+ testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map map = new HashMap<>();
+ map.put("invalid", "invalid key");
+ kuduLookupService.lookup(map);
+ }
+ @Test
+ public void row_not_found() {
+ testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map map = new HashMap<>();
+ map.put("string", "key not found");
+ Optional result = kuduLookupService.lookup(map);
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void single_key() {
+ testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map map = new HashMap<>();
+ map.put("string", "string1");
+ Record result = kuduLookupService.lookup(map).get();
+ validateRow1(result);
+ }
+ @Test
+ public void multi_key() {
+ testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map map = new HashMap<>();
+ map.put("string", "string1");
+ map.put("binary", "binary1".getBytes());
+ map.put("bool",true);
+ map.put("decimal", BigDecimal.valueOf(0.1));
+ map.put("double",0.2);
+ map.put("float",0.3f);
+ map.put("int8", (byte) 1);
+ map.put("int16", (short) 2);
+ map.put("int32",3);
+ map.put("int64",4L);
+ map.put("unixtime_micros", new Timestamp(nowMillis));
+ Record result = kuduLookupService.lookup(map).get();
+ validateRow1(result);
+ }
+ @Test
+ public void specific_return_columns() {
+ testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "binary,bool");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map map = new HashMap<>();
+ map.put("string", "string1");
+ Record result = kuduLookupService.lookup(map).get();
+
+ assertEquals(2,result.getValues().length);
+
+ assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary"));
+ assertEquals(true, result.getAsBoolean("bool"));
+ }
+ private void validateRow1(Record result){
+
+ assertEquals("string1", result.getAsString("string"));
+ assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary"));
+ assertEquals(true, result.getAsBoolean("bool"));
+ assertEquals(BigDecimal.valueOf(0.1), result.getValue("decimal"));
+ assertEquals(0.2, result.getAsDouble("double"),0);
+ assertEquals(0.3f, result.getAsFloat("float"),0);
+ assertEquals((byte)1, result.getValue("int8"));
+ assertEquals((short)2, result.getValue("int16"));
+ assertEquals(3, (int)result.getAsInt("int32"));
+ assertEquals(4L, (long)result.getAsLong("int64"));
+ assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros"));
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
index 16d4e5ec37..4b530a04f2 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
@@ -32,6 +32,11 @@
nifi-kudu-processors
1.10.0-SNAPSHOT
+
+ org.apache.nifi
+ nifi-kudu-controller-service
+ 1.10.0-SNAPSHOT
+
org.apache.nifi
nifi-hadoop-libraries-nar
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
index 8a0759a2b1..bea7232425 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
@@ -30,6 +30,6 @@
nifi-kudu-processors
nifi-kudu-nar
+ nifi-kudu-controller-service
-