NIFI-6662: Adding Kudu Lookup Service

NIFI-6662: Cleaning up Kudu logic
NIFI-6662: Minor enhancements and build fixes
NIFI-6662: This closes #3732.

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
samhjelmfelt 2019-09-12 18:55:37 -05:00 committed by Joe Witt
parent 2493665c27
commit 2ab99970b7
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
6 changed files with 762 additions and 1 deletions

View File

@ -0,0 +1,152 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>nifi-kudu-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.10.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-kudu-controller-service</artifactId>
<packaging>jar</packaging>
<properties>
<exclude.tests>None</exclude.tests>
<kudu.version>1.10.0</kudu.version>
</properties>
<build>
<extensions>
<!-- Used to find the right kudu-binary artifact with the Maven
property ${os.detected.classifier} -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>${exclude.tests}</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.10.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-test-utils</artifactId>
<version>${kudu.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>kudu-windows</id>
<activation>
<os>
<family>Windows</family>
</os>
</activation>
<properties>
<!-- Kudu tests do not support Windows. -->
<exclude.tests>**/*.java</exclude.tests>
</properties>
</profile>
<profile>
<id>kudu-linux</id>
<activation>
<os>
<family>Unix</family>
</os>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-binary</artifactId>
<version>${kudu.version}</version>
<classifier>${os.detected.classifier}</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>kudu-mac</id>
<activation>
<os>
<family>mac</family>
</os>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-binary</artifactId>
<version>${kudu.version}</version>
<classifier>${os.detected.classifier}</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,354 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.ReplicaSelection;
import org.apache.kudu.client.RowResult;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import javax.security.auth.login.LoginException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@CapabilityDescription("Lookup a record from Kudu Server associated with the specified key. Binary columns are base64 encoded. Only one matched row will be returned")
@Tags({"lookup", "enrich", "key", "value", "kudu"})
public class KuduLookupService extends AbstractControllerService implements RecordLookupService {
public static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
.name("kudu-lu-masters")
.displayName("Kudu Masters")
.description("Comma separated addresses of the Kudu masters to connect to.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("kudu-lu-kerberos-credentials-service")
.displayName("Kerberos Credentials Service")
.description("Specifies the Kerberos Credentials to use for authentication")
.required(false)
.identifiesControllerService(KerberosCredentialsService.class)
.build();
public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder()
.name("kudu-lu-operations-timeout-ms")
.displayName("Kudu Operation Timeout")
.description("Default timeout used for user operations (using sessions and scanners)")
.required(false)
.defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final AllowableValue CLOSEST_REPLICA = new AllowableValue(ReplicaSelection.CLOSEST_REPLICA.toString(), ReplicaSelection.CLOSEST_REPLICA.name(),
"Select the closest replica to the client. Replicas are classified from closest to furthest as follows: "+
"1) Local replicas 2) Replicas whose tablet server has the same location as the client 3) All other replicas");
public static final AllowableValue LEADER_ONLY = new AllowableValue(ReplicaSelection.LEADER_ONLY.toString(), ReplicaSelection.LEADER_ONLY.name(),
"Select the LEADER replica");
public static final PropertyDescriptor KUDU_REPLICA_SELECTION = new PropertyDescriptor.Builder()
.name("kudu-lu-replica-selection")
.displayName("Kudu Replica Selection")
.description("Policy with which to choose amongst multiple replicas")
.required(true)
.defaultValue(CLOSEST_REPLICA.getValue())
.allowableValues(CLOSEST_REPLICA, LEADER_ONLY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("kudu-lu-table-name")
.displayName("Kudu Table Name")
.description("Name of the table to access.")
.required(true)
.defaultValue("default")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
.name("kudu-lu-return-cols")
.displayName("Kudu Return Columns")
.description("A comma-separated list of columns to return when scanning. To return all columns set to \"*\"")
.required(true)
.defaultValue("*")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
protected List<PropertyDescriptor> properties;
protected KerberosCredentialsService credentialsService;
private volatile KerberosUser kerberosUser;
protected String kuduMasters;
protected KuduClient kuduClient;
protected ReplicaSelection replicaSelection;
protected volatile String tableName;
protected volatile KuduTable table;
protected volatile List<String> columnNames;
protected volatile RecordSchema resultSchema;
protected volatile Schema tableSchema;
@Override
protected void init(final ControllerServiceInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KUDU_MASTERS);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(KUDU_OPERATION_TIMEOUT_MS);
properties.add(KUDU_REPLICA_SELECTION);
properties.add(TABLE_NAME);
properties.add(RETURN_COLUMNS);
addProperties(properties);
this.properties = Collections.unmodifiableList(properties);
}
protected void addProperties(List<PropertyDescriptor> properties) {
}
protected void createKuduClient(ConfigurationContext context) throws LoginException {
final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (credentialsService != null) {
final String keytab = credentialsService.getKeytab();
final String principal = credentialsService.getPrincipal();
kerberosUser = loginKerberosUser(principal, keytab);
final KerberosAction<KuduClient> kerberosAction = new KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), getLogger());
this.kuduClient = kerberosAction.execute();
} else {
this.kuduClient = buildClient(kuduMasters, context);
}
}
protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
final KerberosUser kerberosUser = new KerberosKeytabUser(principal, keytab);
kerberosUser.login();
return kerberosUser;
}
protected KuduClient buildClient(final String masters, final ConfigurationContext context) {
final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
return new KuduClient.KuduClientBuilder(masters)
.defaultOperationTimeoutMs(operationTimeout)
.build();
}
/**
* Establish a connection to a Kudu cluster.
* @param context the configuration context
* @throws InitializationException if unable to connect a Kudu cluster
*/
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (kuduClient == null) {
getLogger().debug("Setting up Kudu connection...");
createKuduClient(context);
getLogger().debug("Kudu connection successfully initialized");
}
} catch(Exception ex){
getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
throw new InitializationException(ex);
}
replicaSelection = ReplicaSelection.valueOf(context.getProperty(KUDU_REPLICA_SELECTION).getValue());
tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
try {
table = kuduClient.openTable(tableName);
tableSchema = table.getSchema();
columnNames = getColumns(context.getProperty(RETURN_COLUMNS).getValue());
//Result Schema
resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames);
} catch (KuduException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public Set<String> getRequiredKeys() {
return new HashSet<>();
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) {
//Scanner
KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table);
builder.setProjectedColumnNames(columnNames);
builder.replicaSelection(replicaSelection);
//Only expecting one match
builder.limit(1);
coordinates.forEach((key,value)->
builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key), KuduPredicate.ComparisonOp.EQUAL, value))
);
KuduScanner kuduScanner = builder.build();
//Run lookup
for ( RowResult row : kuduScanner){
final Map<String, Object> values = new HashMap<>();
for(String columnName : columnNames){
Object object;
if(row.getColumnType(columnName) == Type.BINARY){
object = Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
} else {
object = row.getObject(columnName);
}
values.put(columnName, object);
}
return Optional.of(new MapRecord(resultSchema, values));
}
//No match
return Optional.empty();
}
private List<String> getColumns(String columns){
if(columns.equals("*")){
return tableSchema
.getColumns()
.stream().map(ColumnSchema::getName)
.collect(Collectors.toList());
} else {
return Arrays.asList(columns.split(","));
}
}
private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, List<String> columnNames){
final List<RecordField> fields = new ArrayList<>();
for(String columnName : columnNames) {
if(!kuduTableSchema.hasColumn(columnName)){
throw new IllegalArgumentException("Column not found in Kudu table schema " + columnName);
}
ColumnSchema cs = kuduTableSchema.getColumn(columnName);
switch (cs.getType()) {
case INT8:
fields.add(new RecordField(cs.getName(), RecordFieldType.BYTE.getDataType()));
break;
case INT16:
fields.add(new RecordField(cs.getName(), RecordFieldType.SHORT.getDataType()));
break;
case INT32:
fields.add(new RecordField(cs.getName(), RecordFieldType.INT.getDataType()));
break;
case INT64:
fields.add(new RecordField(cs.getName(), RecordFieldType.LONG.getDataType()));
break;
case UNIXTIME_MICROS:
fields.add(new RecordField(cs.getName(), RecordFieldType.TIMESTAMP.getDataType()));
break;
case BINARY:
case STRING:
case DECIMAL:
fields.add(new RecordField(cs.getName(), RecordFieldType.STRING.getDataType()));
break;
case DOUBLE:
fields.add(new RecordField(cs.getName(), RecordFieldType.DOUBLE.getDataType()));
break;
case BOOL:
fields.add(new RecordField(cs.getName(), RecordFieldType.BOOLEAN.getDataType()));
break;
case FLOAT:
fields.add(new RecordField(cs.getName(), RecordFieldType.FLOAT.getDataType()));
break;
}
}
return new SimpleRecordSchema(fields);
}
/**
* Disconnect from the Kudu cluster.
*/
@OnDisabled
public void onDisabled() throws Exception {
try {
if (this.kuduClient != null) {
getLogger().debug("Closing KuduClient");
this.kuduClient.close();
this.kuduClient = null;
}
} finally {
if (kerberosUser != null) {
kerberosUser.logout();
kerberosUser = null;
}
}
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.controller.kudu.KuduLookupService

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.cluster.MiniKuduCluster;
import org.apache.kudu.util.DecimalUtil;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class TestKuduLookupService {
// The KuduTestHarness automatically starts and stops a real Kudu cluster
// when each test is run. Kudu persists its on-disk state in a temporary
// directory under a location defined by the environment variable TEST_TMPDIR
// if set, or under /tmp otherwise. That cluster data is deleted on
// successful exit of the test. The cluster output is logged through slf4j.
@Rule
public KuduTestHarness harness = new KuduTestHarness(
new MiniKuduCluster.MiniKuduClusterBuilder()
.addMasterServerFlag("--use_hybrid_clock=false")
.addTabletServerFlag("--use_hybrid_clock=false")
);
private TestRunner testRunner;
private long nowMillis = System.currentTimeMillis();
private KuduLookupService kuduLookupService;
public static class SampleProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
@Before
public void init() throws Exception {
testRunner = TestRunners.newTestRunner(SampleProcessor.class);
testRunner.setValidateExpressionUsage(false);
final String tableName = "table1";
KuduClient client = harness.getClient();
List<ColumnSchema> columns = new ArrayList<>();
columns.add(new ColumnSchema.ColumnSchemaBuilder("string", Type.STRING).key(true).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("binary", Type.BINARY).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("bool", Type.BOOL).build());
columns.add(new ColumnSchema
.ColumnSchemaBuilder("decimal", Type.DECIMAL)
.typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 1))
.build()
);
columns.add(new ColumnSchema.ColumnSchemaBuilder("double", Type.DOUBLE).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("float", Type.FLOAT).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("int8", Type.INT8).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("int16", Type.INT16).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", Type.INT32).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", Type.INT64).build());
columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", Type.UNIXTIME_MICROS).build());
Schema schema = new Schema(columns);
CreateTableOptions opts = new CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string"));
client.createTable(tableName, schema, opts);
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addString("string", "string1");
row.addBinary("binary", "binary1".getBytes());
row.addBoolean("bool",true);
row.addDecimal("decimal", BigDecimal.valueOf(0.1));
row.addDouble("double",0.2);
row.addFloat("float",0.3f);
row.addByte("int8", (byte) 1);
row.addShort("int16", (short) 2);
row.addInt("int32",3);
row.addLong("int64",4L);
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
session.apply(insert);
insert = table.newInsert();
row = insert.getRow();
row.addString("string", "string2");
row.addBinary("binary", "binary2".getBytes());
row.addBoolean("bool",false);
row.addDecimal("decimal", BigDecimal.valueOf(0.1));
row.addDouble("double",1.2);
row.addFloat("float",1.3f);
row.addByte("int8", (byte) 11);
row.addShort("int16", (short) 12);
row.addInt("int32",13);
row.addLong("int64",14L);
row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 60 * 60 * 24 * 365))); //+ 1 year
session.apply(insert);
session.close();
kuduLookupService = new KuduLookupService();
testRunner.addControllerService("kuduLookupService", kuduLookupService);
testRunner.setProperty(kuduLookupService, KuduLookupService.KUDU_MASTERS, "testLocalHost:7051");
testRunner.setProperty(kuduLookupService, KuduLookupService.KUDU_REPLICA_SELECTION, KuduLookupService.LEADER_ONLY);
testRunner.setProperty(kuduLookupService, KuduLookupService.TABLE_NAME, tableName);
kuduLookupService.kuduClient = client;
}
@Test(expected = IllegalArgumentException.class)
public void invalid_key() {
testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
testRunner.enableControllerService(kuduLookupService);
Map<String,Object> map = new HashMap<>();
map.put("invalid", "invalid key");
kuduLookupService.lookup(map);
}
@Test
public void row_not_found() {
testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
testRunner.enableControllerService(kuduLookupService);
Map<String,Object> map = new HashMap<>();
map.put("string", "key not found");
Optional<Record> result = kuduLookupService.lookup(map);
assertFalse(result.isPresent());
}
@Test
public void single_key() {
testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
testRunner.enableControllerService(kuduLookupService);
Map<String,Object> map = new HashMap<>();
map.put("string", "string1");
Record result = kuduLookupService.lookup(map).get();
validateRow1(result);
}
@Test
public void multi_key() {
testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
testRunner.enableControllerService(kuduLookupService);
Map<String,Object> map = new HashMap<>();
map.put("string", "string1");
map.put("binary", "binary1".getBytes());
map.put("bool",true);
map.put("decimal", BigDecimal.valueOf(0.1));
map.put("double",0.2);
map.put("float",0.3f);
map.put("int8", (byte) 1);
map.put("int16", (short) 2);
map.put("int32",3);
map.put("int64",4L);
map.put("unixtime_micros", new Timestamp(nowMillis));
Record result = kuduLookupService.lookup(map).get();
validateRow1(result);
}
@Test
public void specific_return_columns() {
testRunner.setProperty(kuduLookupService, KuduLookupService.RETURN_COLUMNS, "binary,bool");
testRunner.enableControllerService(kuduLookupService);
Map<String,Object> map = new HashMap<>();
map.put("string", "string1");
Record result = kuduLookupService.lookup(map).get();
assertEquals(2,result.getValues().length);
assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary"));
assertEquals(true, result.getAsBoolean("bool"));
}
private void validateRow1(Record result){
assertEquals("string1", result.getAsString("string"));
assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), result.getValue("binary"));
assertEquals(true, result.getAsBoolean("bool"));
assertEquals(BigDecimal.valueOf(0.1), result.getValue("decimal"));
assertEquals(0.2, result.getAsDouble("double"),0);
assertEquals(0.3f, result.getAsFloat("float"),0);
assertEquals((byte)1, result.getValue("int8"));
assertEquals((short)2, result.getValue("int16"));
assertEquals(3, (int)result.getAsInt("int32"));
assertEquals(4L, (long)result.getAsLong("int64"));
assertEquals(new Timestamp(nowMillis), result.getValue("unixtime_micros"));
}
}

View File

@ -32,6 +32,11 @@
<artifactId>nifi-kudu-processors</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kudu-controller-service</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-libraries-nar</artifactId>

View File

@ -30,6 +30,6 @@
<modules>
<module>nifi-kudu-processors</module>
<module>nifi-kudu-nar</module>
<module>nifi-kudu-controller-service</module>
</modules>
</project>