mirror of https://github.com/apache/nifi.git
NIFI-7007: Add update functionality to the PutCassandraRecord processor.
NIFI-7007: Add additional unit tests that hit the non-happy path
NIFI-7007: Use AllowableValue instead of string
NIFI-7007: Add the use of attributes for the update method, statement type and batch statement type
NIFI-7007: Add additional tests, mainly for the use of attributes
NIFI-7007: Add some ReadsAttribute properties to the PutCassandraRecord processor
NIFI-7007: Additional update keys validation logic
NIFI-7007: Fix imports
NIFI-7007: Convert fieldValue to long in a separate method
NIFI-7007: Add new style of tests checking actual CQL output
NIFI-7007: Add license to new test file
NIFI-7007: Add customValidate to check for certain incompatible property combinations
NIFI-7007: Remove check on updateMethod and replace Set.of with a Java 8 compatible replacement
NIFI-7007: Add test for failure with empty update method via attributes
NIFI-7007: Remove unused variable
NIFI-7007: Fix customValidate that incorrectly invalidated a valid config
Fix Checkstyle

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3977
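For orientation before the diff: the commit's new UPDATE path is built with the DataStax 3.x QueryBuilder the processor already uses. The sketch below shows roughly the kind of statement the new UPDATE/Increment path assembles for a single record; the keyspace, table, column names and values (sample_ks, counters, goals, id, 5, 1) are made-up examples, not part of the commit.

import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Update;

public class UpdateSketch {
    public static void main(String[] args) {
        // Hypothetical table "sample_ks.counters", update key "id", counter column "goals".
        Update update = QueryBuilder.update("sample_ks", "counters");
        update.with(QueryBuilder.incr("goals", 5L));    // Increment method: goals = goals + 5
        update.where(QueryBuilder.eq("id", 1));         // update-key columns become the WHERE clause
        Statement statement = update;
        // Prints roughly: UPDATE sample_ks.counters SET goals=goals+5 WHERE id=1;
        System.out.println(statement);
    }
}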
This commit is contained in:
parent 4f315edc6e
commit 9848eb4409
PutCassandraRecord.java

@@ -19,14 +19,23 @@ package org.apache.nifi.processors.cassandra;
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Assignment;
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Update;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -34,8 +43,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -45,18 +54,63 @@ import org.apache.nifi.util.StopWatch;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
 @Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " +
         "configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "cql.statement.type", description = "If 'Use cql.statement.type Attribute' is selected for the Statement " +
+                "Type property, the value of the cql.statement.type Attribute will be used to determine which type of statement (UPDATE, INSERT) " +
+                "will be generated and executed"),
+        @ReadsAttribute(attribute = "cql.update.method", description = "If 'Use cql.update.method Attribute' is selected for the Update " +
+                "Method property, the value of the cql.update.method Attribute will be used to determine which operation (Set, Increment, Decrement) " +
+                "will be used to generate and execute the Update statement. Ignored if the Statement Type property is not set to UPDATE"),
+        @ReadsAttribute(attribute = "cql.batch.statement.type", description = "If 'Use cql.batch.statement.type Attribute' is selected for the Batch " +
+                "Statement Type property, the value of the cql.batch.statement.type Attribute will be used to determine which type of batch statement " +
+                "(LOGGED, UNLOGGED, COUNTER) will be generated and executed")
+})
 public class PutCassandraRecord extends AbstractCassandraProcessor {
+    static final AllowableValue UPDATE_TYPE = new AllowableValue("UPDATE", "UPDATE",
+            "Use an UPDATE statement.");
+    static final AllowableValue INSERT_TYPE = new AllowableValue("INSERT", "INSERT",
+            "Use an INSERT statement.");
+    static final AllowableValue STATEMENT_TYPE_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.statement.type Attribute",
+            "The value of the cql.statement.type Attribute will be used to determine which type of statement (UPDATE, INSERT) " +
+                    "will be generated and executed");
+    static final String STATEMENT_TYPE_ATTRIBUTE = "cql.statement.type";
+
+    static final AllowableValue INCR_TYPE = new AllowableValue("INCREMENT", "Increment",
+            "Use an increment operation (+=) for the Update statement.");
+    static final AllowableValue SET_TYPE = new AllowableValue("SET", "Set",
+            "Use a set operation (=) for the Update statement.");
+    static final AllowableValue DECR_TYPE = new AllowableValue("DECREMENT", "Decrement",
+            "Use a decrement operation (-=) for the Update statement.");
+    static final AllowableValue UPDATE_METHOD_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.update.method Attribute",
+            "The value of the cql.update.method Attribute will be used to determine which operation (Set, Increment, Decrement) " +
+                    "will be used to generate and execute the Update statement.");
+    static final String UPDATE_METHOD_ATTRIBUTE = "cql.update.method";
+
+    static final AllowableValue LOGGED_TYPE = new AllowableValue("LOGGED", "LOGGED",
+            "Use a LOGGED batch statement");
+    static final AllowableValue UNLOGGED_TYPE = new AllowableValue("UNLOGGED", "UNLOGGED",
+            "Use an UNLOGGED batch statement");
+    static final AllowableValue COUNTER_TYPE = new AllowableValue("COUNTER", "COUNTER",
+            "Use a COUNTER batch statement");
+    static final AllowableValue BATCH_STATEMENT_TYPE_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.batch.statement.type Attribute",
+            "The value of the cql.batch.statement.type Attribute will be used to determine which type of batch statement (LOGGED, UNLOGGED or COUNTER) " +
+                    "will be used to generate and execute the Update statement.");
+    static final String BATCH_STATEMENT_TYPE_ATTRIBUTE = "cql.batch.statement.type";
+
     static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
             .name("put-cassandra-record-reader")
@@ -67,6 +121,36 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
             .required(true)
             .build();
+
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE.getValue())
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE, STATEMENT_TYPE_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values. This property is used if the Statement Type is " +
+                    "UPDATE and ignored otherwise.")
+            .required(false)
+            .defaultValue(SET_TYPE.getValue())
+            .allowableValues(INCR_TYPE, DECR_TYPE, SET_TYPE, UPDATE_METHOD_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEYS = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-keys")
+            .displayName("Update Keys")
+            .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+                    + "If the Statement Type is UPDATE and this property is not set, the conversion to CQL will fail. "
+                    + "This property is ignored if the Statement Type is not UPDATE.")
+            .addValidator(StandardValidators.createListValidator(true, false, StandardValidators.NON_EMPTY_VALIDATOR))
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
             .name("put-cassandra-record-table")
             .displayName("Table name")
@@ -90,8 +174,8 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
             .name("put-cassandra-record-batch-statement-type")
             .displayName("Batch Statement Type")
             .description("Specifies the type of 'Batch Statement' to be used.")
-            .allowableValues(BatchStatement.Type.values())
-            .defaultValue(BatchStatement.Type.LOGGED.toString())
+            .allowableValues(LOGGED_TYPE, UNLOGGED_TYPE, COUNTER_TYPE, BATCH_STATEMENT_TYPE_USE_ATTR_TYPE)
+            .defaultValue(LOGGED_TYPE.getValue())
             .required(false)
             .build();

@@ -102,7 +186,7 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
             .build();

     private final static List<PropertyDescriptor> propertyDescriptors = Collections.unmodifiableList(Arrays.asList(
-            CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, CLIENT_AUTH, USERNAME, PASSWORD,
+            CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, STATEMENT_TYPE, UPDATE_KEYS, UPDATE_METHOD, CLIENT_AUTH, USERNAME, PASSWORD,
             RECORD_READER_FACTORY, BATCH_SIZE, CONSISTENCY_LEVEL, BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE));

     private final static Set<Relationship> relationships = Collections.unmodifiableSet(
@@ -129,8 +213,33 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
         final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
         final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
         final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
         final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
+
+        // Get the statement type from the attribute if necessary
+        final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
+        String statementType = statementTypeProperty;
+        if (STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(statementTypeProperty)) {
+            statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
+        }
+
+        // Get the update method from the attribute if necessary
+        final String updateMethodProperty = context.getProperty(UPDATE_METHOD).getValue();
+        String updateMethod = updateMethodProperty;
+        if (UPDATE_METHOD_USE_ATTR_TYPE.getValue().equals(updateMethodProperty)) {
+            updateMethod = inputFlowFile.getAttribute(UPDATE_METHOD_ATTRIBUTE);
+        }
+
+        // Get the batch statement type from the attribute if necessary
+        final String batchStatementTypeProperty = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
+        String batchStatementType = batchStatementTypeProperty;
+        if (BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(batchStatementTypeProperty)) {
+            batchStatementType = inputFlowFile.getAttribute(BATCH_STATEMENT_TYPE_ATTRIBUTE).toUpperCase();
+        }
+        if (StringUtils.isEmpty(batchStatementType)) {
+            throw new IllegalArgumentException(format("Batch Statement Type is not specified, FlowFile %s", inputFlowFile));
+        }
+
         final BatchStatement batchStatement;
         final Session connectionSession = cassandraSession.get();
@@ -142,6 +251,24 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
         try (final InputStream inputStream = session.read(inputFlowFile);
              final RecordReader reader = recordParserFactory.createRecordReader(inputFlowFile, inputStream, getLogger())){
+
+            // throw an exception if statement type is not set
+            if (StringUtils.isEmpty(statementType)) {
+                throw new IllegalArgumentException(format("Statement Type is not specified, FlowFile %s", inputFlowFile));
+            }
+
+            // throw an exception if the statement type is set to update and updateKeys is empty
+            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType) && StringUtils.isEmpty(updateKeys)) {
+                throw new IllegalArgumentException(format("Update Keys are not specified, FlowFile %s", inputFlowFile));
+            }
+
+            // throw an exception if the Update Method is Increment or Decrement and the batch statement type is not UNLOGGED or COUNTER
+            if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod) || DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                if (!(UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType) || COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType))) {
+                    throw new IllegalArgumentException(format("Increment/Decrement Update Method can only be used with COUNTER " +
+                            "or UNLOGGED Batch Statement Type, FlowFile %s", inputFlowFile));
+                }
+            }
+
             final RecordSchema schema = reader.getSchema();
             Record record;
@@ -151,18 +278,16 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
             while((record = reader.nextRecord()) != null) {
                 Map<String, Object> recordContentMap = (Map<String, Object>) DataTypeUtils
                         .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
-                Insert insertQuery;
-
-                if (cassandraTable.contains(".")) {
-                    String keyspaceAndTable[] = cassandraTable.split("\\.");
-                    insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], keyspaceAndTable[1]);
-                } else {
-                    insertQuery = QueryBuilder.insertInto(cassandraTable);
-                }
-                for (String fieldName : schema.getFieldNames()) {
-                    insertQuery.value(fieldName, recordContentMap.get(fieldName));
-                }
-                batchStatement.add(insertQuery);
+
+                Statement query;
+                if (INSERT_TYPE.getValue().equalsIgnoreCase(statementType)) {
+                    query = generateInsert(cassandraTable, schema, recordContentMap);
+                } else if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+                    query = generateUpdate(cassandraTable, schema, updateKeys, updateMethod, recordContentMap);
+                } else {
+                    throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, inputFlowFile));
+                }
+                batchStatement.add(query);

                 if (recordsAdded.incrementAndGet() == batchSize) {
                     connectionSession.execute(batchStatement);
@@ -193,6 +318,112 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
     }
+
+    protected Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.incr(fieldName, convertFieldObjectToLong(fieldName, fieldValue));
+                } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.decr(fieldName, convertFieldObjectToLong(fieldName, fieldValue));
+                } else {
+                    throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
+                }
+                updateQuery.with(assignment);
+            }
+        }
+        return updateQuery;
+    }
+
+    private Long convertFieldObjectToLong(String name, Object value) {
+        if (!(value instanceof Number)) {
+            throw new IllegalArgumentException("Field '" + name + "' is not of type Number");
+        }
+        return ((Number) value).longValue();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) super.customValidate(validationContext);
+
+        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys are set
+            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
+            }
+
+            // Check that if the update method is set to increment or decrement that the batch statement type is set to
+            // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at this point).
+            String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue();
+            String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
+            if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)
+                    || DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                if (!(COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType)
+                        || UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType)
+                        || BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equalsIgnoreCase(batchStatementType))) {
+                    results.add(new ValidationResult.Builder().subject("Update method configuration").valid(false).explanation(
+                            "if the Update Method is set to Increment or Decrement, then the Batch Statement Type must be set " +
+                                    "to either COUNTER or UNLOGGED").build());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    private Statement generateInsert(String cassandraTable, RecordSchema schema, Map<String, Object> recordContentMap) {
+        Insert insertQuery;
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            insertQuery = QueryBuilder.insertInto(cassandraTable);
+        }
+        for (String fieldName : schema.getFieldNames()) {
+            insertQuery.value(fieldName, recordContentMap.get(fieldName));
+        }
+        return insertQuery;
+    }
+
     @OnUnscheduled
     public void stop(ProcessContext context) {
         super.stop(context);
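The three cql.* attributes described by the @ReadsAttributes annotation above only take effect when the matching property is set to its "Use ... Attribute" value. As a minimal sketch, an upstream processor (for example UpdateAttribute) might place attributes like the following on each FlowFile; the attribute values shown are hypothetical examples, and the unit tests in the next file exercise these paths end to end.

import java.util.HashMap;
import java.util.Map;

public class AttributeDrivenConfigSketch {
    public static void main(String[] args) {
        // Attribute names come from the processor; the values here are example choices.
        Map<String, String> attributes = new HashMap<>();
        attributes.put("cql.statement.type", "UPDATE");        // UPDATE or INSERT
        attributes.put("cql.update.method", "Increment");      // Set, Increment or Decrement
        attributes.put("cql.batch.statement.type", "COUNTER"); // LOGGED, UNLOGGED or COUNTER
        attributes.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}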
PutCassandraRecordTest.java

@@ -35,7 +35,9 @@ import org.junit.Test;

 import javax.net.ssl.SSLContext;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -120,6 +122,366 @@ public class PutCassandraRecordTest {
         testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
     }
+
+    @Test
+    public void testSimpleUpdate() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.SET_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 2L);
+        recordReader.addRecord("Sally Doe", 47, 0);
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testUpdateInvalidFieldType() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "1");
+        recordReader.addRecord("Jane Doe", 47, "1");
+        recordReader.addRecord("Sally Doe", 47, "1");
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateEmptyUpdateKeys() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testUpdateNullUpdateKeys() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.SET_TYPE);
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testUpdateZeroLengthUpdateKeys() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, ",");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "1");
+        recordReader.addRecord("Jane Doe", 47, "1");
+        recordReader.addRecord("Sally Doe", 47, "1");
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateSetLoggedBatch() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.SET_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.LOGGED_TYPE);
+
+        testRunner.assertValid();
+    }
+
+    @Test
+    public void testUpdateCounterWrongBatchStatementType() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.LOGGED_TYPE);
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testUpdateWithUpdateMethodAndKeyAttributes() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "${cql.update.keys}");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "Increment");
+        attributes.put("cql.update.keys", "name,age");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithStatementAttribute() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.statement.type", "Insert");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithStatementAttributeInvalid() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.statement.type", "invalid-type");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithBatchStatementAttribute() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.INSERT_TYPE);
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.batch.statement.type", "counter");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithBatchStatementAttributeInvalid() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.INSERT_TYPE);
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.batch.statement.type", "invalid-type");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesInvalidUpdateMethod() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "${cql.update.keys}");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "invalid-method");
+        attributes.put("cql.update.keys", "name,age");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesIncompatibleBatchStatementType() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.batch.statement.type", "LOGGED");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesEmptyUpdateKeysAttribute() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "${cql.update.keys}");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "Increment");
+        attributes.put("cql.update.keys", "");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesEmptyUpdateMethodAttribute() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
     @Test
     public void testEL() throws InitializationException {
         testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "${contact.points}");
PutCassandraRecordUpdateTest.java (new file)

@@ -0,0 +1,293 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.cassandra;

import com.datastax.driver.core.Statement;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

public class PutCassandraRecordUpdateTest {
    private PutCassandraRecord testSubject;

    @Mock
    private RecordSchema schema;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);

        testSubject = new PutCassandraRecord();
    }

    @Test
    public void testGenerateUpdateWithEmptyKeyList() {
        Stream.of("", ",", ",,,").forEach(updateKeys -> testGenerateUpdate(
                "keyspace.table",
                updateKeys,
                PutCassandraRecord.SET_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("stringField", "newStringValue")
                ),
                new IllegalArgumentException("No Update Keys were specified")
        ));
    }

    @Test
    public void testGenerateUpdateWithMissingKey() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField,missingKeyField",
                PutCassandraRecord.SET_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("stringField", "newStringValue")
                ),
                new IllegalArgumentException("Update key 'missingKeyField' is not present in the record schema")
        );
    }

    @Test
    public void testGenerateUpdateWithInvalidUpdateMethod() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                "invalidUpdateMethod",
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("longField", 15L)
                ),
                new IllegalArgumentException("Update Method 'invalidUpdateMethod' is not valid.")
        );
    }

    @Test
    public void testGenerateUpdateIncrementString() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.INCR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("stringField", "15")
                ),
                new IllegalArgumentException("Field 'stringField' is not of type Number")
        );
    }

    @Test
    public void testGenerateUpdateSimpleTableName() {
        testGenerateUpdate(
                "table",
                "keyField1",
                PutCassandraRecord.SET_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField1", 1),
                        new Tuple<>("stringField", "newStringValue")
                ),
                "UPDATE table SET stringField='newStringValue' WHERE keyField1=1;"
        );
    }

    @Test
    public void testGenerateUpdateKeyspacedTableName() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField1",
                PutCassandraRecord.SET_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField1", 1),
                        new Tuple<>("stringField", "newStringValue")
                ),
                "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1;"
        );
    }

    @Test
    public void testGenerateUpdateMultipleKeys() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField1,keyField2,keyField3",
                PutCassandraRecord.SET_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField1", 1),
                        new Tuple<>("keyField2", "key2"),
                        new Tuple<>("keyField3", 123L),
                        new Tuple<>("stringField", "newStringValue")
                ),
                "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1 AND keyField2='key2' AND keyField3=123;"
        );
    }

    @Test
    public void testGenerateUpdateIncrementLong() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.INCR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("longField", 15L)
                ),
                "UPDATE keyspace.table SET longField=longField+15 WHERE keyField=1;"
        );
    }

    @Test
    public void testGenerateUpdateDecrementLong() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.DECR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("longField", 15L)
                ),
                "UPDATE keyspace.table SET longField=longField-15 WHERE keyField=1;"
        );
    }

    @Test
    public void testGenerateUpdateIncrementInteger() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.INCR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("integerField", 15)
                ),
                "UPDATE keyspace.table SET integerField=integerField+15 WHERE keyField=1;"
        );
    }

    @Test
    public void testGenerateUpdateIncrementFloat() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.INCR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("floatField", 15.05F)
                ),
                "UPDATE keyspace.table SET floatField=floatField+15 WHERE keyField=1;"
        );
    }

    @Test
    public void testGenerateUpdateIncrementDouble() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.INCR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("doubleField", 15.05D)
                ),
                "UPDATE keyspace.table SET doubleField=doubleField+15 WHERE keyField=1;"
        );
    }

    @Test
    public void testGenerateUpdateSetMultipleValues() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.SET_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("stringField", "newStringValue"),
                        new Tuple<>("integerField", 15),
                        new Tuple<>("longField", 67L)
                ),
                "UPDATE keyspace.table SET stringField='newStringValue',integerField=15,longField=67 WHERE keyField=1;"
        );
    }

    @Test
    public void testGenerateUpdateIncrementMultipleValues() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.INCR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("integerField", 15),
                        new Tuple<>("longField", 67L)
                ),
                "UPDATE keyspace.table SET integerField=integerField+15,longField=longField+67 WHERE keyField=1;"
        );
    }

    @Test
    public void testGenerateUpdateDecrementMultipleValues() {
        testGenerateUpdate(
                "keyspace.table",
                "keyField",
                PutCassandraRecord.DECR_TYPE.getValue(),
                Arrays.asList(
                        new Tuple<>("keyField", 1),
                        new Tuple<>("integerField", 15),
                        new Tuple<>("longField", 67L)
                ),
                "UPDATE keyspace.table SET integerField=integerField-15,longField=longField-67 WHERE keyField=1;"
        );
    }

    private void testGenerateUpdate(String table, String updateKeys, String updateMethod, List<Tuple<String, Object>> records, String expected) {
        Map<String, Object> recordContentMap = records.stream()
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));

        List<String> fieldNames = records.stream().map(Tuple::getKey).collect(Collectors.toList());

        when(schema.getFieldNames()).thenReturn(fieldNames);
        Statement actual = testSubject.generateUpdate(table, schema, updateKeys, updateMethod, recordContentMap);

        assertEquals(expected, actual.toString());
    }

    private <E extends Exception> void testGenerateUpdate(String table, String updateKeys, String updateMethod, List<Tuple<String, Object>> records, E expected) {
        Map<String, Object> recordContentMap = records.stream()
                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));

        List<String> fieldNames = records.stream().map(Tuple::getKey).collect(Collectors.toList());

        when(schema.getFieldNames()).thenReturn(fieldNames);
        try {
            testSubject.generateUpdate("keyspace.table", schema, updateKeys, updateMethod, recordContentMap);
            fail();
        } catch (Exception e) {
            assertEquals(expected.getClass(), e.getClass());
            assertEquals(expected.getMessage(), e.getMessage());
        }
    }
}