From 2ab99970b778b09e3544867310458acd0913fbbc Mon Sep 17 00:00:00 2001 From: samhjelmfelt Date: Thu, 12 Sep 2019 18:55:37 -0500 Subject: [PATCH] NIFI-6662: Adding Kudu Lookup Service NIFI-6662: Cleaning up Kudu logic NIFI-6662: Minor enhancements and build fixes NIFI-6662: This closes #3732. Signed-off-by: Joe Witt --- .../nifi-kudu-controller-service/pom.xml | 152 ++++++++ .../controller/kudu/KuduLookupService.java | 354 ++++++++++++++++++ ...g.apache.nifi.controller.ControllerService | 16 + .../kudu/TestKuduLookupService.java | 234 ++++++++++++ .../nifi-kudu-bundle/nifi-kudu-nar/pom.xml | 5 + nifi-nar-bundles/nifi-kudu-bundle/pom.xml | 2 +- 6 files changed, 762 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml new file mode 100644 index 0000000000..1f8c6a2402 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml @@ -0,0 +1,152 @@ + + + + 4.0.0 + + nifi-kudu-bundle + org.apache.nifi + 1.10.0-SNAPSHOT + + + nifi-kudu-controller-service + jar + + + + None + 1.10.0 + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${exclude.tests} + + + + + + + + org.apache.nifi + nifi-api + provided + + + org.apache.nifi + nifi-lookup-service-api + provided + + + org.apache.nifi + nifi-utils + 1.10.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-kerberos-credentials-service-api + provided + + + org.apache.nifi + nifi-record + provided + + + org.apache.nifi + nifi-security-utils + 1.10.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-mock + test + + + org.apache.kudu + kudu-client + ${kudu.version} + + + org.apache.kudu + kudu-test-utils + ${kudu.version} + test + + + + + kudu-windows + + + Windows + + + + + **/*.java + + + + kudu-linux + + + Unix + + + + + org.apache.kudu + kudu-binary + ${kudu.version} + ${os.detected.classifier} + test + + + + + kudu-mac + + + mac + + + + + org.apache.kudu + kudu-binary + ${kudu.version} + ${os.detected.classifier} + test + + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java new file mode 100644 index 0000000000..b044f9a910 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.kudu; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.ReplicaSelection; +import org.apache.kudu.client.RowResult; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kerberos.KerberosCredentialsService; +import org.apache.nifi.lookup.RecordLookupService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.security.krb.KerberosAction; +import org.apache.nifi.security.krb.KerberosKeytabUser; +import org.apache.nifi.security.krb.KerberosUser; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import javax.security.auth.login.LoginException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + + +@CapabilityDescription("Lookup a record from Kudu Server associated with the specified key. Binary columns are base64 encoded. Only one matched row will be returned") +@Tags({"lookup", "enrich", "key", "value", "kudu"}) +public class KuduLookupService extends AbstractControllerService implements RecordLookupService { + + public static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder() + .name("kudu-lu-masters") + .displayName("Kudu Masters") + .description("Comma separated addresses of the Kudu masters to connect to.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("kudu-lu-kerberos-credentials-service") + .displayName("Kerberos Credentials Service") + .description("Specifies the Kerberos Credentials to use for authentication") + .required(false) + .identifiesControllerService(KerberosCredentialsService.class) + .build(); + + public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder() + .name("kudu-lu-operations-timeout-ms") + .displayName("Kudu Operation Timeout") + .description("Default timeout used for user operations (using sessions and scanners)") + .required(false) + .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final AllowableValue CLOSEST_REPLICA = new AllowableValue(ReplicaSelection.CLOSEST_REPLICA.toString(), ReplicaSelection.CLOSEST_REPLICA.name(), + "Select the closest replica to the client. Replicas are classified from closest to furthest as follows: "+ + "1) Local replicas 2) Replicas whose tablet server has the same location as the client 3) All other replicas"); + public static final AllowableValue LEADER_ONLY = new AllowableValue(ReplicaSelection.LEADER_ONLY.toString(), ReplicaSelection.LEADER_ONLY.name(), + "Select the LEADER replica"); + public static final PropertyDescriptor KUDU_REPLICA_SELECTION = new PropertyDescriptor.Builder() + .name("kudu-lu-replica-selection") + .displayName("Kudu Replica Selection") + .description("Policy with which to choose amongst multiple replicas") + .required(true) + .defaultValue(CLOSEST_REPLICA.getValue()) + .allowableValues(CLOSEST_REPLICA, LEADER_ONLY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("kudu-lu-table-name") + .displayName("Kudu Table Name") + .description("Name of the table to access.") + .required(true) + .defaultValue("default") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder() + .name("kudu-lu-return-cols") + .displayName("Kudu Return Columns") + .description("A comma-separated list of columns to return when scanning. To return all columns set to \"*\"") + .required(true) + .defaultValue("*") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + + protected List properties; + + protected KerberosCredentialsService credentialsService; + private volatile KerberosUser kerberosUser; + + protected String kuduMasters; + protected KuduClient kuduClient; + protected ReplicaSelection replicaSelection; + protected volatile String tableName; + protected volatile KuduTable table; + protected volatile List columnNames; + + protected volatile RecordSchema resultSchema; + protected volatile Schema tableSchema; + + @Override + protected void init(final ControllerServiceInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(KUDU_MASTERS); + properties.add(KERBEROS_CREDENTIALS_SERVICE); + properties.add(KUDU_OPERATION_TIMEOUT_MS); + properties.add(KUDU_REPLICA_SELECTION); + properties.add(TABLE_NAME); + properties.add(RETURN_COLUMNS); + addProperties(properties); + this.properties = Collections.unmodifiableList(properties); + } + + protected void addProperties(List properties) { + } + + protected void createKuduClient(ConfigurationContext context) throws LoginException { + final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); + final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + if (credentialsService != null) { + final String keytab = credentialsService.getKeytab(); + final String principal = credentialsService.getPrincipal(); + kerberosUser = loginKerberosUser(principal, keytab); + + final KerberosAction kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger()); + this.kuduClient = kerberosAction.execute(); + } else { + this.kuduClient = buildClient(kuduMasters, context); + } + } + + protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException { + final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab); + kerberosUser.login(); + return kerberosUser; + } + + protected KuduClient buildClient(final String masters, final ConfigurationContext context) { + final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + + return new KuduClient.KuduClientBuilder(masters) + .defaultOperationTimeoutMs(operationTimeout) + .build(); + } + + /** + * Establish a connection to a Kudu cluster. + * @param context the configuration context + * @throws InitializationException if unable to connect a Kudu cluster + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) throws InitializationException { + + try { + kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); + credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class); + + if (kuduClient == null) { + getLogger().debug("Setting up Kudu connection..."); + + createKuduClient(context); + getLogger().debug("Kudu connection successfully initialized"); + } + } catch(Exception ex){ + getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); + throw new InitializationException(ex); + } + + replicaSelection = ReplicaSelection.valueOf(context.getProperty(KUDU_REPLICA_SELECTION).getValue()); + tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + try { + table = kuduClient.openTable(tableName); + tableSchema = table.getSchema(); + columnNames = getColumns(context.getProperty(RETURN_COLUMNS).getValue()); + + //Result Schema + resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames); + + } catch (KuduException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Set getRequiredKeys() { + return new HashSet<>(); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Optional lookup(Map coordinates) { + + //Scanner + KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table); + + builder.setProjectedColumnNames(columnNames); + builder.replicaSelection(replicaSelection); + + //Only expecting one match + builder.limit(1); + + coordinates.forEach((key,value)-> + builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key), KuduPredicate.ComparisonOp.EQUAL, value)) + ); + + KuduScanner kuduScanner = builder.build(); + + //Run lookup + for ( RowResult row : kuduScanner){ + final Map values = new HashMap<>(); + for(String columnName : columnNames){ + Object object; + if(row.getColumnType(columnName) == Type.BINARY){ + object = Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName)); + } else { + object = row.getObject(columnName); + } + values.put(columnName, object); + } + return Optional.of(new MapRecord(resultSchema, values)); + } + + //No match + return Optional.empty(); + } + + private List getColumns(String columns){ + if(columns.equals("*")){ + return tableSchema + .getColumns() + .stream().map(ColumnSchema::getName) + .collect(Collectors.toList()); + } else { + return Arrays.asList(columns.split(",")); + } + } + + private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, List columnNames){ + final List fields = new ArrayList<>(); + for(String columnName : columnNames) { + if(!kuduTableSchema.hasColumn(columnName)){ + throw new IllegalArgumentException("Column not found in Kudu table schema " + columnName); + } + ColumnSchema cs = kuduTableSchema.getColumn(columnName); + switch (cs.getType()) { + case INT8: + fields.add(new RecordField(cs.getName(), RecordFieldType.BYTE.getDataType())); + break; + case INT16: + fields.add(new RecordField(cs.getName(), RecordFieldType.SHORT.getDataType())); + break; + case INT32: + fields.add(new RecordField(cs.getName(), RecordFieldType.INT.getDataType())); + break; + case INT64: + fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType())); + break; + case UNIXTIME_MICROS: + fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType())); + break; + case BINARY: + case STRING: + case DECIMAL: + fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType())); + break; + case DOUBLE: + fields.add(new RecordField(cs.getName(), RecordFieldType.DOUBLE.getDataType())); + break; + case BOOL: + fields.add(new RecordField(cs.getName(), RecordFieldType.BOOLEAN.getDataType())); + break; + case FLOAT: + fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType())); + break; + } + } + return new SimpleRecordSchema(fields); + } + + /** + * Disconnect from the Kudu cluster. + */ + @OnDisabled + public void onDisabled() throws Exception { + try { + if (this.kuduClient != null) { + getLogger().debug("Closing KuduClient"); + this.kuduClient.close(); + this.kuduClient = null; + } + } finally { + if (kerberosUser != null) { + kerberosUser.logout(); + kerberosUser = null; + } + } + } +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..240825c2ed --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.controller.kudu.KuduLookupService \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java new file mode 100644 index 0000000000..0b746fee5b --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.kudu; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.test.KuduTestHarness; +import org.apache.kudu.test.cluster.MiniKuduCluster; +import org.apache.kudu.util.DecimalUtil; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TestKuduLookupService { + + // The KuduTestHarness automatically starts and stops a real Kudu cluster + // when each test is run. Kudu persists its on-disk state in a temporary + // directory under a location defined by the environment variable TEST_TMPDIR + // if set, or under /tmp otherwise. That cluster data is deleted on + // successful exit of the test. The cluster output is logged through slf4j. + @Rule + public KuduTestHarness harness = new KuduTestHarness( + new MiniKuduCluster.MiniKuduClusterBuilder() + .addMasterServerFlag("--use_hybrid_clock=false") + .addTabletServerFlag("--use_hybrid_clock=false") + ); + private TestRunner testRunner; + private long nowMillis = System.currentTimeMillis(); + private KuduLookupService kuduLookupService; + + public static class SampleProcessor extends AbstractProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } + } + + @Before + public void init() throws Exception { + testRunner = TestRunners.newTestRunner(SampleProcessor.class); + testRunner.setValidateExpressionUsage(false); + final String tableName = "table1"; + + KuduClient client = harness.getClient(); + List columns = new ArrayList<>(); + columns.add(new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).key(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("bool", Type.BOOL).build()); + columns.add(new ColumnSchema + .ColumnSchemaBuilder("decimal", Type.DECIMAL) + .typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 1)) + .build() + ); + columns.add(new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("int8", Type.INT8).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("int16", Type.INT16).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build()); + Schema schema = new Schema(columns); + + CreateTableOptions opts = new CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string")); + client.createTable(tableName, schema, opts); + + KuduTable table = client.openTable(tableName); + KuduSession session = client.newSession(); + + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addString("string", "string1"); + row.addBinary("binary", "binary1".getBytes()); + row.addBoolean("bool",true); + row.addDecimal("decimal", BigDecimal.valueOf(0.1)); + row.addDouble("double",0.2); + row.addFloat("float",0.3f); + row.addByte("int8", (byte) 1); + row.addShort("int16", (short) 2); + row.addInt("int32",3); + row.addLong("int64",4L); + row.addTimestamp("unixtime_micros", new Timestamp(nowMillis)); + session.apply(insert); + + insert = table.newInsert(); + row = insert.getRow(); + row.addString("string", "string2"); + row.addBinary("binary", "binary2".getBytes()); + row.addBoolean("bool",false); + row.addDecimal("decimal", BigDecimal.valueOf(0.1)); + row.addDouble("double",1.2); + row.addFloat("float",1.3f); + row.addByte("int8", (byte) 11); + row.addShort("int16", (short) 12); + row.addInt("int32",13); + row.addLong("int64",14L); + row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year + session.apply(insert); + + session.close(); + + kuduLookupService = new KuduLookupService(); + testRunner.addControllerService("kuduLookupService", kuduLookupService); + testRunner.setProperty(kuduLookupService, KuduLookupService.KUDU_MASTERS, "testLocalHost:7051"); + testRunner.setProperty(kuduLookupService, KuduLookupService.KUDU_REPLICA_SELECTION, KuduLookupService.LEADER_ONLY); + testRunner.setProperty(kuduLookupService, KuduLookupService.TABLE_NAME, tableName); + kuduLookupService.kuduClient = client; + } + + @Test(expected = IllegalArgumentException.class) + public void invalid_key() { + testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*"); + + testRunner.enableControllerService(kuduLookupService); + + Map map = new HashMap<>(); + map.put("invalid", "invalid key"); + kuduLookupService.lookup(map); + } + @Test + public void row_not_found() { + testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*"); + + testRunner.enableControllerService(kuduLookupService); + + Map map = new HashMap<>(); + map.put("string", "key not found"); + Optional result = kuduLookupService.lookup(map); + assertFalse(result.isPresent()); + } + + @Test + public void single_key() { + testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*"); + + testRunner.enableControllerService(kuduLookupService); + + Map map = new HashMap<>(); + map.put("string", "string1"); + Record result = kuduLookupService.lookup(map).get(); + validateRow1(result); + } + @Test + public void multi_key() { + testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*"); + + testRunner.enableControllerService(kuduLookupService); + + Map map = new HashMap<>(); + map.put("string", "string1"); + map.put("binary", "binary1".getBytes()); + map.put("bool",true); + map.put("decimal", BigDecimal.valueOf(0.1)); + map.put("double",0.2); + map.put("float",0.3f); + map.put("int8", (byte) 1); + map.put("int16", (short) 2); + map.put("int32",3); + map.put("int64",4L); + map.put("unixtime_micros", new Timestamp(nowMillis)); + Record result = kuduLookupService.lookup(map).get(); + validateRow1(result); + } + @Test + public void specific_return_columns() { + testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "binary,bool"); + + testRunner.enableControllerService(kuduLookupService); + + Map map = new HashMap<>(); + map.put("string", "string1"); + Record result = kuduLookupService.lookup(map).get(); + + assertEquals(2,result.getValues().length); + + assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary")); + assertEquals(true, result.getAsBoolean("bool")); + } + private void validateRow1(Record result){ + + assertEquals("string1", result.getAsString("string")); + assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary")); + assertEquals(true, result.getAsBoolean("bool")); + assertEquals(BigDecimal.valueOf(0.1), result.getValue("decimal")); + assertEquals(0.2, result.getAsDouble("double"),0); + assertEquals(0.3f, result.getAsFloat("float"),0); + assertEquals((byte)1, result.getValue("int8")); + assertEquals((short)2, result.getValue("int16")); + assertEquals(3, (int)result.getAsInt("int32")); + assertEquals(4L, (long)result.getAsLong("int64")); + assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros")); + } + +} diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml index 16d4e5ec37..4b530a04f2 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml @@ -32,6 +32,11 @@ nifi-kudu-processors 1.10.0-SNAPSHOT + + org.apache.nifi + nifi-kudu-controller-service + 1.10.0-SNAPSHOT + org.apache.nifi nifi-hadoop-libraries-nar diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml index 8a0759a2b1..bea7232425 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml @@ -30,6 +30,6 @@ nifi-kudu-processors nifi-kudu-nar + nifi-kudu-controller-service -