NIFI-7737: add string array option to putcassandrarecord

NIFI-7737: fix checkstyle issues

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5005
This commit is contained in:
Wouter de Vries 2020-03-10 08:51:51 +01:00 committed by Matthew Burgess
parent 1c17da562b
commit 0b29a42991
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
5 changed files with 167 additions and 9 deletions

View File

@@ -44,6 +44,11 @@
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>${cassandra.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>

View File

@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import com.datastax.driver.extras.codecs.arrays.ObjectArrayCodec;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.lang3.StringUtils;
@@ -223,6 +224,9 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
public void onScheduled(ProcessContext context) {
final boolean connectionProviderIsSet = context.getProperty(CONNECTION_PROVIDER_SERVICE).isSet();
// Register codecs
registerAdditionalCodecs();
if (connectionProviderIsSet) {
CassandraSessionProviderService sessionProvider = context.getProperty(CONNECTION_PROVIDER_SERVICE).asControllerService(CassandraSessionProviderService.class);
cluster.set(sessionProvider.getCluster());
@@ -294,6 +298,14 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
}
}
/**
 * Registers extra type codecs with the driver's default {@link CodecRegistry}.
 * Currently adds a codec mapping a Java {@code String[]} onto a CQL
 * {@code list<varchar>}, so array-typed record fields can be bound directly
 * into statements.
 */
protected void registerAdditionalCodecs() {
    // Conversion between a String[] and a list of varchar
    final ObjectArrayCodec<String> stringArrayCodec = new ObjectArrayCodec<>(
            DataType.list(DataType.varchar()),
            String[].class,
            TypeCodec.varchar());
    CodecRegistry.DEFAULT_INSTANCE.register(stringArrayCodec);
}
/**
* Uses a Cluster.Builder to create a Cassandra cluster reference using the given parameters
*

View File

@@ -45,17 +45,19 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -411,7 +413,7 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
return results;
}
private Statement generateInsert(String cassandraTable, RecordSchema schema, Map<String, Object> recordContentMap) {
protected Statement generateInsert(String cassandraTable, RecordSchema schema, Map<String, Object> recordContentMap) {
Insert insertQuery;
if (cassandraTable.contains(".")) {
String[] keyspaceAndTable = cassandraTable.split("\\.");
@@ -423,17 +425,29 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
Object value = recordContentMap.get(fieldName);
if (value != null && value.getClass().isArray()) {
Object[] array = (Object[])value;
Object[] array = (Object[]) value;
if (array.length > 0 && array[0] instanceof Byte) {
Object[] temp = (Object[]) value;
byte[] newArray = new byte[temp.length];
for (int x = 0; x < temp.length; x++) {
newArray[x] = (Byte) temp[x];
if (array.length > 0) {
if (array[0] instanceof Byte) {
Object[] temp = (Object[]) value;
byte[] newArray = new byte[temp.length];
for (int x = 0; x < temp.length; x++) {
newArray[x] = (Byte) temp[x];
}
value = ByteBuffer.wrap(newArray);
}
value = ByteBuffer.wrap(newArray);
}
}
if (schema.getDataType(fieldName).isPresent()) {
DataType fieldDataType = schema.getDataType(fieldName).get();
if (fieldDataType.getFieldType() == RecordFieldType.ARRAY) {
if (((ArrayDataType)fieldDataType).getElementType().getFieldType() == RecordFieldType.STRING) {
value = Arrays.stream((Object[])value).toArray(String[]::new);
}
}
}
insertQuery.value(fieldName, value);
}
return insertQuery;

View File

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.cassandra;
import com.datastax.driver.core.querybuilder.Insert;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@code PutCassandraRecord#generateInsert}, verifying that a record
 * content map is translated into the expected CQL INSERT statement, including the
 * String[] handling added for array-typed fields (NIFI-7737).
 */
public class PutCassandraRecordInsertTest {
    private PutCassandraRecord testSubject;

    @Mock
    private RecordSchema schema;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        testSubject = new PutCassandraRecord();
        // In production the codecs are registered in onScheduled(); register them up
        // front here so the String[] <-> list<varchar> codec is available before any
        // statement is generated or serialized, rather than relying on the driver's
        // lazy query-string rendering to pick it up afterwards.
        testSubject.registerAdditionalCodecs();
    }

    @Test
    public void testGenerateInsert() {
        // Scalar fields only: ints, long and string rendered as CQL literals.
        testGenerateInsert(
                "keyspace.table",
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("integerField", 15),
                        new Tuple<>("longField", 67L),
                        new Tuple<>("stringField", "abcd")
                ),
                Arrays.asList(
                        new Tuple<>("keyField", RecordFieldType.INT.getDataType()),
                        new Tuple<>("integerField", RecordFieldType.INT.getDataType()),
                        new Tuple<>("longField", RecordFieldType.LONG.getDataType()),
                        new Tuple<>("stringField", RecordFieldType.STRING.getDataType())
                ),
                "INSERT INTO keyspace.table (keyField,integerField,longField,stringField) VALUES (1,15,67,'abcd');"
        );
    }

    @Test
    public void testGenerateInsertStringArray() {
        // An ARRAY(STRING) field must be converted to String[] and rendered as a CQL list.
        testGenerateInsert(
                "keyspace.table",
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("integerField", 15),
                        new Tuple<>("arrayField", new Object[]{"test1", "test2"})
                ),
                Arrays.asList(
                        new Tuple<>("keyField", RecordFieldType.INT.getDataType()),
                        new Tuple<>("integerField", RecordFieldType.INT.getDataType()),
                        new Tuple<>("arrayField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))
                ),
                "INSERT INTO keyspace.table (keyField,integerField,arrayField) VALUES (1,15,['test1','test2']);"
        );
    }

    /**
     * Drives generateInsert with a mocked schema and asserts the rendered CQL.
     *
     * @param table        target table, optionally "keyspace.table"
     * @param records      field name/value pairs forming the record content map
     * @param recordSchema field name/data-type pairs backing the mocked RecordSchema
     * @param expected     the exact query string the generated Insert should render to
     */
    private void testGenerateInsert(String table, List<Tuple<String, Object>> records, List<Tuple<String, org.apache.nifi.serialization.record.DataType>> recordSchema, String expected) {
        Map<String, Object> recordContentMap = records.stream()
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
        Map<String, Object> recordSchemaMap = recordSchema.stream()
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));

        List<String> fieldNames = records.stream().map(Tuple::getKey).collect(Collectors.toList());
        when(schema.getFieldNames()).thenReturn(fieldNames);
        when(schema.getDataType(anyString())).thenAnswer(i -> Optional.of(recordSchemaMap.get(i.getArgument(0))));

        Insert actual = (Insert) testSubject.generateInsert(table, schema, recordContentMap);
        // Render placeholders as literal values so the full statement can be compared.
        actual.setForceNoValues(true);

        assertEquals(expected, actual.getQueryString());
    }
}

View File

@@ -27,6 +27,7 @@ import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -122,6 +123,23 @@ public class PutCassandraRecordTest {
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
}
@Test
public void testStringArrayPut() throws InitializationException {
    setUpStandardTestConfig();

    // Schema: a string-array "names" column plus an int "age" column.
    recordReader.addSchemaField(new RecordField("names",
            RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
    recordReader.addSchemaField("age", RecordFieldType.INT);

    // Three records sharing the same name array, differing only in age.
    for (int age = 1; age <= 3; age++) {
        recordReader.addRecord(new Object[]{"John", "Doe"}, age);
    }

    testRunner.enqueue("");
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
}
@Test
public void testSimpleUpdate() throws InitializationException {
setUpStandardTestConfig();