NIFI-8295: This closes #4946. upgrade cassandra driver to 3.11.0

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Wouter de Vries 2021-03-29 14:24:30 +02:00 committed by Joe Witt
parent ef794c56b6
commit 6309ab96d1
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 13 additions and 10 deletions

View File

@ -25,6 +25,7 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SniEndPoint;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
@ -219,7 +220,7 @@ public class PutCassandraQLTest {
public void testProcessorBadTimestamp() {
setUpStandardTestConfig();
processor.setExceptionToThrow(
new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid timestamp"));
new InvalidQueryException(new SniEndPoint(new InetSocketAddress("localhost", 9042), ""), "invalid timestamp"));
testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
new HashMap<String, String>() {
{
@ -330,7 +331,7 @@ public class PutCassandraQLTest {
// Test exceptions
processor.setExceptionToThrow(
new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid query"));
new InvalidQueryException(new SniEndPoint(new InetSocketAddress("localhost", 9042), ""), "invalid query"));
testRunner.enqueue("UPDATE users SET cities = [ 'New York', 'Los Angeles' ] WHERE user_id = 'coast2coast';");
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1);
@ -342,7 +343,7 @@ public class PutCassandraQLTest {
setUpStandardTestConfig();
processor.setExceptionToThrow(
new UnavailableException(new InetSocketAddress("localhost", 9042), ConsistencyLevel.ALL, 5, 2));
new UnavailableException(new SniEndPoint(new InetSocketAddress("localhost", 9042), ""), ConsistencyLevel.ALL, 5, 2));
testRunner.enqueue("UPDATE users SET cities = [ 'New York', 'Los Angeles' ] WHERE user_id = 'coast2coast';");
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_RETRY, 1);

View File

@ -29,10 +29,12 @@ import static org.mockito.Mockito.when;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.EndPoint;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SniEndPoint;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
@ -110,19 +112,19 @@ public class QueryCassandraTest {
testRunner.clearTransferState();
// Test exceptions
processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<InetSocketAddress, Throwable>()));
processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<EndPoint, Throwable>()));
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_RETRY, 1);
testRunner.clearTransferState();
processor.setExceptionToThrow(
new ReadTimeoutException(new InetSocketAddress("localhost", 9042), ConsistencyLevel.ANY, 0, 1, false));
new ReadTimeoutException(new SniEndPoint(new InetSocketAddress("localhost", 9042), ""), ConsistencyLevel.ANY, 0, 1, false));
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_RETRY, 1);
testRunner.clearTransferState();
processor.setExceptionToThrow(
new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid query"));
new InvalidQueryException(new SniEndPoint(new InetSocketAddress("localhost", 9042), ""), "invalid query"));
testRunner.run(1, true, true);
// No files transferred to failure if there was no incoming connection
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_FAILURE, 0);
@ -225,21 +227,21 @@ public class QueryCassandraTest {
testRunner.clearTransferState();
// Test exceptions
processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<InetSocketAddress, Throwable>()));
processor.setExceptionToThrow(new NoHostAvailableException(new HashMap<EndPoint, Throwable>()));
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_RETRY, 1);
testRunner.clearTransferState();
processor.setExceptionToThrow(
new ReadTimeoutException(new InetSocketAddress("localhost", 9042), ConsistencyLevel.ANY, 0, 1, false));
new ReadTimeoutException(new SniEndPoint(new InetSocketAddress("localhost", 9042), ""), ConsistencyLevel.ANY, 0, 1, false));
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_RETRY, 1);
testRunner.clearTransferState();
processor.setExceptionToThrow(
new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid query"));
new InvalidQueryException(new SniEndPoint(new InetSocketAddress("localhost", 9042), ""), "invalid query"));
testRunner.enqueue("".getBytes());
testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(QueryCassandra.REL_FAILURE, 1);

View File

@ -23,7 +23,7 @@
</parent>
<properties>
<cassandra.sdk.version>3.3.0</cassandra.sdk.version>
<cassandra.sdk.version>3.11.0</cassandra.sdk.version>
</properties>
<artifactId>nifi-cassandra-bundle</artifactId>