From 9848eb4409fe672e7d321ea3ac3351e8a5972e14 Mon Sep 17 00:00:00 2001 From: Wouter de Vries Date: Fri, 10 Jan 2020 09:09:38 +0100 Subject: [PATCH] NIFI-7007: Add update functionality to the PutCassandraRecord processor. NIFI-7007: Add additional unit tests that hit non-happy path NIFI-7007: Use AllowableValue instead of string NIFI-7007: Add the use of attributes for the update method, statement type and batch statement type NIFI-7007: Add additional tests, mainly for the use of attributes NIFI-7007: add some ReadsAttribute properties to the PutCassandraRecord processor NIFI-7007: additional update keys validation logic NIFI-7007: fix imports NIFI-7007: Convert fieldValue to long in separate method NIFI-7007: Add new style of tests checking actual CQL output NIFI-7007: add license to new test file NIFI-7007: add customValidate to check for certain incompatible property combinations NIFI-7007: remove check on updateMethod and replace Set.of with java 8 compatible replacmenet NIFI-7007: Add test for failure with empty update method via attributes NIFI-7007: remove unused variable NIFI-7007: Fix customValidate that incorrectly invalidated a valid config Fix Checkstyle Signed-off-by: Matthew Burgess This closes #3977 --- .../cassandra/PutCassandraRecord.java | 259 ++++++++++++- .../cassandra/PutCassandraRecordTest.java | 362 ++++++++++++++++++ .../PutCassandraRecordUpdateTest.java | 293 ++++++++++++++ 3 files changed, 900 insertions(+), 14 deletions(-) create mode 100644 nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java index 8acc36aecd..37fefa851c 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java @@ -19,14 +19,23 @@ package org.apache.nifi.processors.cassandra; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Assignment; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Update; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; @@ -34,8 +43,8 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; @@ -45,18 +54,63 @@ import org.apache.nifi.util.StopWatch; import java.io.InputStream; import java.util.Arrays; import java.util.Collections; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static java.lang.String.format; @Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " + "configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "cql.statement.type", description = "If 'Use cql.statement.type Attribute' is selected for the Statement " + + "Type property, the value of the cql.statement.type Attribute will be used to determine which type of statement (UPDATE, INSERT) " + + "will be generated and executed"), + @ReadsAttribute(attribute = "cql.update.method", description = "If 'Use cql.update.method Attribute' is selected for the Update " + + "Method property, the value of the cql.update.method Attribute will be used to determine which operation (Set, Increment, Decrement) " + + "will be used to generate and execute the Update statement. Ignored if the Statement Type property is not set to UPDATE"), + @ReadsAttribute(attribute = "cql.batch.statement.type", description = "If 'Use cql.batch.statement.type Attribute' is selected for the Batch " + + "Statement Type property, the value of the cql.batch.statement.type Attribute will be used to determine which type of batch statement " + + "(LOGGED, UNLOGGED, COUNTER) will be generated and executed") +}) public class PutCassandraRecord extends AbstractCassandraProcessor { + static final AllowableValue UPDATE_TYPE = new AllowableValue("UPDATE", "UPDATE", + "Use an UPDATE statement."); + static final AllowableValue INSERT_TYPE = new AllowableValue("INSERT", "INSERT", + "Use an INSERT statement."); + static final AllowableValue STATEMENT_TYPE_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.statement.type Attribute", + "The value of the cql.statement.type Attribute will be used to determine which type of statement (UPDATE, INSERT) " + + "will be generated and executed"); + static final String STATEMENT_TYPE_ATTRIBUTE = "cql.statement.type"; + + static final AllowableValue INCR_TYPE = new AllowableValue("INCREMENT", "Increment", + "Use an increment operation (+=) for the Update statement."); + static final AllowableValue SET_TYPE = new AllowableValue("SET", "Set", + "Use a set operation (=) for the Update statement."); + static final AllowableValue DECR_TYPE = new AllowableValue("DECREMENT", "Decrement", + "Use a decrement operation (-=) for the Update statement."); + static final AllowableValue UPDATE_METHOD_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.update.method Attribute", + "The value of the cql.update.method Attribute will be used to determine which operation (Set, Increment, Decrement) " + + "will be used to generate and execute the Update statement."); + static final String UPDATE_METHOD_ATTRIBUTE = "cql.update.method"; + + static final AllowableValue LOGGED_TYPE = new AllowableValue("LOGGED", "LOGGED", + "Use a LOGGED batch statement"); + static final AllowableValue UNLOGGED_TYPE = new AllowableValue("UNLOGGED", "UNLOGGED", + "Use an UNLOGGED batch statement"); + static final AllowableValue COUNTER_TYPE = new AllowableValue("COUNTER", "COUNTER", + "Use a COUNTER batch statement"); + static final AllowableValue BATCH_STATEMENT_TYPE_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.batch.statement.type Attribute", + "The value of the cql.batch.statement.type Attribute will be used to determine which type of batch statement (LOGGED, UNLOGGED or COUNTER) " + + "will be used to generate and execute the Update statement."); + static final String BATCH_STATEMENT_TYPE_ATTRIBUTE = "cql.batch.statement.type"; static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() .name("put-cassandra-record-reader") @@ -67,6 +121,36 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { .required(true) .build(); + static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder() + .name("put-cassandra-record-statement-type") + .displayName("Statement Type") + .description("Specifies the type of CQL Statement to generate.") + .required(true) + .defaultValue(INSERT_TYPE.getValue()) + .allowableValues(UPDATE_TYPE, INSERT_TYPE, STATEMENT_TYPE_USE_ATTR_TYPE) + .build(); + + static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder() + .name("put-cassandra-record-update-method") + .displayName("Update Method") + .description("Specifies the method to use to SET the values. This property is used if the Statement Type is " + + "UPDATE and ignored otherwise.") + .required(false) + .defaultValue(SET_TYPE.getValue()) + .allowableValues(INCR_TYPE, DECR_TYPE, SET_TYPE, UPDATE_METHOD_USE_ATTR_TYPE) + .build(); + + static final PropertyDescriptor UPDATE_KEYS = new PropertyDescriptor.Builder() + .name("put-cassandra-record-update-keys") + .displayName("Update Keys") + .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. " + + "If the Statement Type is UPDATE and this property is not set, the conversion to CQL will fail. " + + "This property is ignored if the Statement Type is not UPDATE.") + .addValidator(StandardValidators.createListValidator(true, false, StandardValidators.NON_EMPTY_VALIDATOR)) + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() .name("put-cassandra-record-table") .displayName("Table name") @@ -90,8 +174,8 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { .name("put-cassandra-record-batch-statement-type") .displayName("Batch Statement Type") .description("Specifies the type of 'Batch Statement' to be used.") - .allowableValues(BatchStatement.Type.values()) - .defaultValue(BatchStatement.Type.LOGGED.toString()) + .allowableValues(LOGGED_TYPE, UNLOGGED_TYPE, COUNTER_TYPE, BATCH_STATEMENT_TYPE_USE_ATTR_TYPE) + .defaultValue(LOGGED_TYPE.getValue()) .required(false) .build(); @@ -102,7 +186,7 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { .build(); private final static List propertyDescriptors = Collections.unmodifiableList(Arrays.asList( - CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, CLIENT_AUTH, USERNAME, PASSWORD, + CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, STATEMENT_TYPE, UPDATE_KEYS, UPDATE_METHOD, CLIENT_AUTH, USERNAME, PASSWORD, RECORD_READER_FACTORY, BATCH_SIZE, CONSISTENCY_LEVEL, BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE)); private final static Set relationships = Collections.unmodifiableSet( @@ -129,8 +213,33 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue(); final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); - final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue(); final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue(); + final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue(); + + // Get the statement type from the attribute if necessary + final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue(); + String statementType = statementTypeProperty; + if (STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(statementTypeProperty)) { + statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE); + } + + // Get the update method from the attribute if necessary + final String updateMethodProperty = context.getProperty(UPDATE_METHOD).getValue(); + String updateMethod = updateMethodProperty; + if (UPDATE_METHOD_USE_ATTR_TYPE.getValue().equals(updateMethodProperty)) { + updateMethod = inputFlowFile.getAttribute(UPDATE_METHOD_ATTRIBUTE); + } + + + // Get the batch statement type from the attribute if necessary + final String batchStatementTypeProperty = context.getProperty(BATCH_STATEMENT_TYPE).getValue(); + String batchStatementType = batchStatementTypeProperty; + if (BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(batchStatementTypeProperty)) { + batchStatementType = inputFlowFile.getAttribute(BATCH_STATEMENT_TYPE_ATTRIBUTE).toUpperCase(); + } + if (StringUtils.isEmpty(batchStatementType)) { + throw new IllegalArgumentException(format("Batch Statement Type is not specified, FlowFile %s", inputFlowFile)); + } final BatchStatement batchStatement; final Session connectionSession = cassandraSession.get(); @@ -142,6 +251,24 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { try (final InputStream inputStream = session.read(inputFlowFile); final RecordReader reader = recordParserFactory.createRecordReader(inputFlowFile, inputStream, getLogger())){ + // throw an exception if statement type is not set + if (StringUtils.isEmpty(statementType)) { + throw new IllegalArgumentException(format("Statement Type is not specified, FlowFile %s", inputFlowFile)); + } + + // throw an exception if the statement type is set to update and updateKeys is empty + if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType) && StringUtils.isEmpty(updateKeys)) { + throw new IllegalArgumentException(format("Update Keys are not specified, FlowFile %s", inputFlowFile)); + } + + // throw an exception if the Update Method is Increment or Decrement and the batch statement type is not UNLOGGED or COUNTER + if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod) || DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) { + if (!(UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType) || COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType))) { + throw new IllegalArgumentException(format("Increment/Decrement Update Method can only be used with COUNTER " + + "or UNLOGGED Batch Statement Type, FlowFile %s", inputFlowFile)); + } + } + final RecordSchema schema = reader.getSchema(); Record record; @@ -151,18 +278,16 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { while((record = reader.nextRecord()) != null) { Map recordContentMap = (Map) DataTypeUtils .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); - Insert insertQuery; - if (cassandraTable.contains(".")) { - String keyspaceAndTable[] = cassandraTable.split("\\."); - insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], keyspaceAndTable[1]); + Statement query; + if (INSERT_TYPE.getValue().equalsIgnoreCase(statementType)) { + query = generateInsert(cassandraTable, schema, recordContentMap); + } else if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) { + query = generateUpdate(cassandraTable, schema, updateKeys, updateMethod, recordContentMap); } else { - insertQuery = QueryBuilder.insertInto(cassandraTable); + throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, inputFlowFile)); } - for (String fieldName : schema.getFieldNames()) { - insertQuery.value(fieldName, recordContentMap.get(fieldName)); - } - batchStatement.add(insertQuery); + batchStatement.add(query); if (recordsAdded.incrementAndGet() == batchSize) { connectionSession.execute(batchStatement); @@ -193,6 +318,112 @@ public class PutCassandraRecord extends AbstractCassandraProcessor { } + protected Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map recordContentMap) { + Update updateQuery; + + // Split up the update key names separated by a comma, should not be empty + final Set updateKeyNames; + updateKeyNames = Arrays.stream(updateKeys.split(",")) + .map(String::trim) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.toSet()); + if (updateKeyNames.isEmpty()) { + throw new IllegalArgumentException("No Update Keys were specified"); + } + + // Verify if all update keys are present in the record + for (String updateKey : updateKeyNames) { + if (!schema.getFieldNames().contains(updateKey)) { + throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema"); + } + } + + // Prepare keyspace/table names + if (cassandraTable.contains(".")) { + String[] keyspaceAndTable = cassandraTable.split("\\."); + updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]); + } else { + updateQuery = QueryBuilder.update(cassandraTable); + } + + // Loop through the field names, setting those that are not in the update key set, and using those + // in the update key set as conditions. + for (String fieldName : schema.getFieldNames()) { + Object fieldValue = recordContentMap.get(fieldName); + + if (updateKeyNames.contains(fieldName)) { + updateQuery.where(QueryBuilder.eq(fieldName, fieldValue)); + } else { + Assignment assignment; + if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) { + assignment = QueryBuilder.set(fieldName, fieldValue); + } else if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) { + assignment = QueryBuilder.incr(fieldName, convertFieldObjectToLong(fieldName, fieldValue)); + } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) { + assignment = QueryBuilder.decr(fieldName, convertFieldObjectToLong(fieldName, fieldValue)); + } else { + throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid."); + } + updateQuery.with(assignment); + } + } + return updateQuery; + } + + private Long convertFieldObjectToLong(String name, Object value) { + if (!(value instanceof Number)) { + throw new IllegalArgumentException("Field '" + name + "' is not of type Number"); + } + return ((Number) value).longValue(); + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + Set results = (Set) super.customValidate(validationContext); + + String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue(); + + if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) { + // Check that update keys are set + String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue(); + if (StringUtils.isEmpty(updateKeys)) { + results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation( + "if the Statement Type is set to Update, then the Update Keys must be specified as well").build()); + } + + // Check that if the update method is set to increment or decrement that the batch statement type is set to + // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at this point). + String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue(); + String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue(); + if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod) + || DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) { + if (!(COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType) + || UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType) + || BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equalsIgnoreCase(batchStatementType))) { + results.add(new ValidationResult.Builder().subject("Update method configuration").valid(false).explanation( + "if the Update Method is set to Increment or Decrement, then the Batch Statement Type must be set " + + "to either COUNTER or UNLOGGED").build()); + } + } + } + + return results; + } + + private Statement generateInsert(String cassandraTable, RecordSchema schema, Map recordContentMap) { + Insert insertQuery; + if (cassandraTable.contains(".")) { + String[] keyspaceAndTable = cassandraTable.split("\\."); + insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], keyspaceAndTable[1]); + } else { + insertQuery = QueryBuilder.insertInto(cassandraTable); + } + for (String fieldName : schema.getFieldNames()) { + insertQuery.value(fieldName, recordContentMap.get(fieldName)); + } + return insertQuery; + } + @OnUnscheduled public void stop(ProcessContext context) { super.stop(context); diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java index 34a6973959..f83dc202a8 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java @@ -35,7 +35,9 @@ import org.junit.Test; import javax.net.ssl.SSLContext; import java.net.InetSocketAddress; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -120,6 +122,366 @@ public class PutCassandraRecordTest { testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1); } + @Test + public void testSimpleUpdate() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.SET_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.INT); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 2L); + recordReader.addRecord("Sally Doe", 47, 0); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1); + } + + @Test + public void testUpdateInvalidFieldType() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48,"1"); + recordReader.addRecord("Jane Doe", 47, "1"); + recordReader.addRecord("Sally Doe", 47, "1"); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testUpdateEmptyUpdateKeys() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, ""); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + testRunner.assertNotValid(); + } + + @Test + public void testUpdateNullUpdateKeys() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.SET_TYPE); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + testRunner.assertNotValid(); + } + + @Test + public void testUpdateZeroLengthUpdateKeys() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, ","); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, "1"); + recordReader.addRecord("Jane Doe", 47, "1"); + recordReader.addRecord("Sally Doe", 47, "1"); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testUpdateSetLoggedBatch() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.SET_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.LOGGED_TYPE); + + testRunner.assertValid(); + } + + @Test + public void testUpdateCounterWrongBatchStatementType() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.LOGGED_TYPE); + + testRunner.assertNotValid(); + } + + @Test + public void testUpdateWithUpdateMethodAndKeyAttributes() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "${cql.update.keys}"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.LONG); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + Map attributes = new HashMap<>(); + attributes.put("cql.update.method", "Increment"); + attributes.put("cql.update.keys", "name,age"); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testInsertWithStatementAttribute() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.STATEMENT_TYPE_USE_ATTR_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.LONG); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + Map attributes = new HashMap<>(); + attributes.put("cql.statement.type", "Insert"); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testInsertWithStatementAttributeInvalid() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.STATEMENT_TYPE_USE_ATTR_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.LONG); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + Map attributes = new HashMap<>(); + attributes.put("cql.statement.type", "invalid-type"); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testInsertWithBatchStatementAttribute() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.INSERT_TYPE); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.LONG); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + Map attributes = new HashMap<>(); + attributes.put("cql.batch.statement.type", "counter"); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testInsertWithBatchStatementAttributeInvalid() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.INSERT_TYPE); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.LONG); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + Map attributes = new HashMap<>(); + attributes.put("cql.batch.statement.type", "invalid-type"); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testUpdateWithAttributesInvalidUpdateMethod() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "${cql.update.keys}"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.INT); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + Map attributes = new HashMap<>(); + attributes.put("cql.update.method", "invalid-method"); + attributes.put("cql.update.keys", "name,age"); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testUpdateWithAttributesIncompatibleBatchStatementType() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.INCR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.INT); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + Map attributes = new HashMap<>(); + attributes.put("cql.batch.statement.type", "LOGGED"); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testUpdateWithAttributesEmptyUpdateKeysAttribute() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "${cql.update.keys}"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.LONG); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + HashMap attributes = new HashMap<>(); + attributes.put("cql.update.method", "Increment"); + attributes.put("cql.update.keys", ""); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + + @Test + public void testUpdateWithAttributesEmptyUpdateMethodAttribute() throws InitializationException { + setUpStandardTestConfig(); + testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, PutCassandraRecord.UPDATE_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE); + testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, PutCassandraRecord.COUNTER_TYPE); + + testRunner.assertValid(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("goals", RecordFieldType.LONG); + + recordReader.addRecord("John Doe", 48, 1L); + recordReader.addRecord("Jane Doe", 47, 1L); + recordReader.addRecord("Sally Doe", 47, 1L); + + HashMap attributes = new HashMap<>(); + attributes.put("cql.update.method", ""); + testRunner.enqueue("", attributes); + testRunner.run(); + + testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1); + testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0); + testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0); + } + @Test public void testEL() throws InitializationException { testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "${contact.points}"); diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java new file mode 100644 index 0000000000..78bebf0c47 --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Statement; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.Tuple; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; + +public class PutCassandraRecordUpdateTest { + private PutCassandraRecord testSubject; + + @Mock + private RecordSchema schema; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + testSubject = new PutCassandraRecord(); + } + + @Test + public void testGenerateUpdateWithEmptyKeyList() { + Stream.of("", ",", ",,,").forEach(updateKeys -> testGenerateUpdate( + "keyspace.table", + updateKeys, + PutCassandraRecord.SET_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("stringField", "newStringValue") + ), + new IllegalArgumentException("No Update Keys were specified") + )); + } + + @Test + public void testGenerateUpdateWithMissingKey() { + testGenerateUpdate( + "keyspace.table", + "keyField,missingKeyField", + PutCassandraRecord.SET_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("stringField", "newStringValue") + ), + new IllegalArgumentException("Update key 'missingKeyField' is not present in the record schema") + ); + } + + @Test + public void testGenerateUpdateWithInvalidUpdateMethod() { + testGenerateUpdate( + "keyspace.table", + "keyField", + "invalidUpdateMethod", + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("longField", 15L) + ), + new IllegalArgumentException("Update Method 'invalidUpdateMethod' is not valid.") + ); + } + + @Test + public void testGenerateUpdateIncrementString() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.INCR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("stringField", "15") + ), + new IllegalArgumentException("Field 'stringField' is not of type Number") + ); + } + + @Test + public void testGenerateUpdateSimpleTableName() { + testGenerateUpdate( + "table", + "keyField1", + PutCassandraRecord.SET_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField1", 1), + new Tuple<>("stringField", "newStringValue") + ), + "UPDATE table SET stringField='newStringValue' WHERE keyField1=1;" + ); + } + + @Test + public void testGenerateUpdateKeyspacedTableName() { + testGenerateUpdate( + "keyspace.table", + "keyField1", + PutCassandraRecord.SET_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField1", 1), + new Tuple<>("stringField", "newStringValue") + ), + "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1;" + ); + } + + @Test + public void testGenerateUpdateMultipleKeys() { + testGenerateUpdate( + "keyspace.table", + "keyField1,keyField2,keyField3", + PutCassandraRecord.SET_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField1", 1), + new Tuple<>("keyField2", "key2"), + new Tuple<>("keyField3", 123L), + new Tuple<>("stringField", "newStringValue") + ), + "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1 AND keyField2='key2' AND keyField3=123;" + ); + } + + @Test + public void testGenerateUpdateIncrementLong() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.INCR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("longField", 15L) + ), + "UPDATE keyspace.table SET longField=longField+15 WHERE keyField=1;" + ); + } + + @Test + public void testGenerateUpdateDecrementLong() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.DECR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("longField", 15L) + ), + "UPDATE keyspace.table SET longField=longField-15 WHERE keyField=1;" + ); + } + + @Test + public void testGenerateUpdateIncrementInteger() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.INCR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("integerField", 15) + ), + "UPDATE keyspace.table SET integerField=integerField+15 WHERE keyField=1;" + ); + } + + @Test + public void testGenerateUpdateIncrementFloat() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.INCR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("floatField", 15.05F) + ), + "UPDATE keyspace.table SET floatField=floatField+15 WHERE keyField=1;" + ); + } + + @Test + public void testGenerateUpdateIncrementDouble() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.INCR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("doubleField", 15.05D) + ), + "UPDATE keyspace.table SET doubleField=doubleField+15 WHERE keyField=1;" + ); + } + + @Test + public void testGenerateUpdateSetMultipleValues() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.SET_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("stringField", "newStringValue"), + new Tuple<>("integerField", 15), + new Tuple<>("longField", 67L) + ), + "UPDATE keyspace.table SET stringField='newStringValue',integerField=15,longField=67 WHERE keyField=1;" + ); + } + + @Test + public void testGenerateUpdateIncrementMultipleValues() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.INCR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("integerField", 15), + new Tuple<>("longField", 67L) + ), + "UPDATE keyspace.table SET integerField=integerField+15,longField=longField+67 WHERE keyField=1;" + ); + } + + @Test + public void testGenerateUpdateDecrementMultipleValues() { + testGenerateUpdate( + "keyspace.table", + "keyField", + PutCassandraRecord.DECR_TYPE.getValue(), + Arrays.asList( + new Tuple<>("keyField", 1), + new Tuple<>("integerField", 15), + new Tuple<>("longField", 67L) + ), + "UPDATE keyspace.table SET integerField=integerField-15,longField=longField-67 WHERE keyField=1;" + ); + } + + private void testGenerateUpdate(String table, String updateKeys, String updateMethod, List> records, String expected) { + Map recordContentMap = records.stream() + .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue)); + + List fieldNames = records.stream().map(Tuple::getKey).collect(Collectors.toList()); + + when(schema.getFieldNames()).thenReturn(fieldNames); + Statement actual = testSubject.generateUpdate(table, schema, updateKeys, updateMethod, recordContentMap); + + assertEquals(expected, actual.toString()); + } + + private void testGenerateUpdate(String table, String updateKeys, String updateMethod, List> records, E expected) { + Map recordContentMap = records.stream() + .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue)); + + List fieldNames = records.stream().map(Tuple::getKey).collect(Collectors.toList()); + + when(schema.getFieldNames()).thenReturn(fieldNames); + try { + testSubject.generateUpdate("keyspace.table", schema, updateKeys, updateMethod, recordContentMap); + fail(); + } catch (Exception e) { + assertEquals(expected.getClass(), e.getClass()); + assertEquals(expected.getMessage(), e.getMessage()); + } + } +}