mirror of https://github.com/apache/nifi.git
parent
75bb4bfaa2
commit
6cbc585438
|
@ -41,7 +41,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.datastax.cassandra</groupId>
|
<groupId>com.datastax.cassandra</groupId>
|
||||||
<artifactId>cassandra-driver-core</artifactId>
|
<artifactId>cassandra-driver-core</artifactId>
|
||||||
<version>2.1.9</version>
|
<version>3.0.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
|
|
@ -17,12 +17,14 @@
|
||||||
package org.apache.nifi.processors.cassandra;
|
package org.apache.nifi.processors.cassandra;
|
||||||
|
|
||||||
import com.datastax.driver.core.Cluster;
|
import com.datastax.driver.core.Cluster;
|
||||||
|
import com.datastax.driver.core.CodecRegistry;
|
||||||
import com.datastax.driver.core.ConsistencyLevel;
|
import com.datastax.driver.core.ConsistencyLevel;
|
||||||
import com.datastax.driver.core.DataType;
|
import com.datastax.driver.core.DataType;
|
||||||
|
import com.datastax.driver.core.JdkSSLOptions;
|
||||||
import com.datastax.driver.core.Metadata;
|
import com.datastax.driver.core.Metadata;
|
||||||
import com.datastax.driver.core.Row;
|
import com.datastax.driver.core.Row;
|
||||||
import com.datastax.driver.core.SSLOptions;
|
|
||||||
import com.datastax.driver.core.Session;
|
import com.datastax.driver.core.Session;
|
||||||
|
import com.datastax.driver.core.TypeCodec;
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.SchemaBuilder;
|
import org.apache.avro.SchemaBuilder;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -167,6 +169,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
|
||||||
protected final AtomicReference<Cluster> cluster = new AtomicReference<>(null);
|
protected final AtomicReference<Cluster> cluster = new AtomicReference<>(null);
|
||||||
protected final AtomicReference<Session> cassandraSession = new AtomicReference<>(null);
|
protected final AtomicReference<Session> cassandraSession = new AtomicReference<>(null);
|
||||||
|
|
||||||
|
protected static final CodecRegistry codecRegistry = new CodecRegistry();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||||
Set<ValidationResult> results = new HashSet<>();
|
Set<ValidationResult> results = new HashSet<>();
|
||||||
|
@ -253,7 +257,10 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
|
||||||
String username, String password) {
|
String username, String password) {
|
||||||
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
|
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
|
||||||
if (sslContext != null) {
|
if (sslContext != null) {
|
||||||
builder = builder.withSSL(new SSLOptions(sslContext, SSLOptions.DEFAULT_SSL_CIPHER_SUITES));
|
JdkSSLOptions sslOptions = JdkSSLOptions.builder()
|
||||||
|
.withSSLContext(sslContext)
|
||||||
|
.build();
|
||||||
|
builder = builder.withSSL(sslOptions);
|
||||||
}
|
}
|
||||||
if (username != null && password != null) {
|
if (username != null && password != null) {
|
||||||
builder = builder.withCredentials(username, password);
|
builder = builder.withCredentials(username, password);
|
||||||
|
@ -315,15 +322,17 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
// Get the first type argument, to be used for lists and sets (and the first in a map)
|
// Get the first type argument, to be used for lists and sets (and the first in a map)
|
||||||
DataType firstArg = typeArguments.get(0);
|
DataType firstArg = typeArguments.get(0);
|
||||||
|
TypeCodec firstCodec = codecRegistry.codecFor(firstArg);
|
||||||
if (dataType.equals(DataType.set(firstArg))) {
|
if (dataType.equals(DataType.set(firstArg))) {
|
||||||
return row.getSet(i, firstArg.asJavaClass());
|
return row.getSet(i, firstCodec.getJavaType());
|
||||||
} else if (dataType.equals(DataType.list(firstArg))) {
|
} else if (dataType.equals(DataType.list(firstArg))) {
|
||||||
return row.getList(i, firstArg.asJavaClass());
|
return row.getList(i, firstCodec.getJavaType());
|
||||||
} else {
|
} else {
|
||||||
// Must be an n-arg collection like map
|
// Must be an n-arg collection like map
|
||||||
DataType secondArg = typeArguments.get(1);
|
DataType secondArg = typeArguments.get(1);
|
||||||
|
TypeCodec secondCodec = codecRegistry.codecFor(secondArg);
|
||||||
if (dataType.equals(DataType.map(firstArg, secondArg))) {
|
if (dataType.equals(DataType.map(firstArg, secondArg))) {
|
||||||
return row.getMap(i, firstArg.asJavaClass(), secondArg.asJavaClass());
|
return row.getMap(i, firstCodec.getJavaType(), secondCodec.getJavaType());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -427,7 +436,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
|
||||||
return primitiveType;
|
return primitiveType;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new IllegalArgumentException("Not a primitive Cassandra type: " + dataTypeName);
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,6 +21,7 @@ import com.datastax.driver.core.DataType;
|
||||||
import com.datastax.driver.core.PreparedStatement;
|
import com.datastax.driver.core.PreparedStatement;
|
||||||
import com.datastax.driver.core.ResultSetFuture;
|
import com.datastax.driver.core.ResultSetFuture;
|
||||||
import com.datastax.driver.core.Session;
|
import com.datastax.driver.core.Session;
|
||||||
|
import com.datastax.driver.core.TypeCodec;
|
||||||
import com.datastax.driver.core.exceptions.AuthenticationException;
|
import com.datastax.driver.core.exceptions.AuthenticationException;
|
||||||
import com.datastax.driver.core.exceptions.InvalidTypeException;
|
import com.datastax.driver.core.exceptions.InvalidTypeException;
|
||||||
import com.datastax.driver.core.exceptions.NoHostAvailableException;
|
import com.datastax.driver.core.exceptions.NoHostAvailableException;
|
||||||
|
@ -119,7 +120,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
|
||||||
// Matches on top-level type (primitive types like text,int) and also for collections (like list<boolean> and map<float,double>)
|
// Matches on top-level type (primitive types like text,int) and also for collections (like list<boolean> and map<float,double>)
|
||||||
private static final Pattern CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");
|
private static final Pattern CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Will ensure that the list of property descriptors is build only once.
|
* Will ensure that the list of property descriptors is build only once.
|
||||||
* Will also create a Set of relationships
|
* Will also create a Set of relationships
|
||||||
|
@ -310,9 +310,9 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
|
||||||
// If the matcher doesn't match, this should fall through to the exception at the bottom
|
// If the matcher doesn't match, this should fall through to the exception at the bottom
|
||||||
if (matcher.find() && matcher.groupCount() > 1) {
|
if (matcher.find() && matcher.groupCount() > 1) {
|
||||||
String mainTypeString = matcher.group(1).toLowerCase();
|
String mainTypeString = matcher.group(1).toLowerCase();
|
||||||
DataType.Name mainTypeName = DataType.Name.valueOf(mainTypeString.toUpperCase());
|
|
||||||
if (!mainTypeName.isCollection()) {
|
|
||||||
DataType mainType = getPrimitiveDataTypeFromString(mainTypeString);
|
DataType mainType = getPrimitiveDataTypeFromString(mainTypeString);
|
||||||
|
if (mainType != null) {
|
||||||
|
TypeCodec typeCodec = codecRegistry.codecFor(mainType);
|
||||||
|
|
||||||
// Need the right statement.setXYZ() method
|
// Need the right statement.setXYZ() method
|
||||||
if (mainType.equals(DataType.ascii())
|
if (mainType.equals(DataType.ascii())
|
||||||
|
@ -327,23 +327,23 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
|
||||||
statement.setString(paramIndex, paramValue);
|
statement.setString(paramIndex, paramValue);
|
||||||
|
|
||||||
} else if (mainType.equals(DataType.cboolean())) {
|
} else if (mainType.equals(DataType.cboolean())) {
|
||||||
statement.setBool(paramIndex, (boolean) mainType.parse(paramValue));
|
statement.setBool(paramIndex, (boolean) typeCodec.parse(paramValue));
|
||||||
|
|
||||||
} else if (mainType.equals(DataType.cint())) {
|
} else if (mainType.equals(DataType.cint())) {
|
||||||
statement.setInt(paramIndex, (int) mainType.parse(paramValue));
|
statement.setInt(paramIndex, (int) typeCodec.parse(paramValue));
|
||||||
|
|
||||||
} else if (mainType.equals(DataType.bigint())
|
} else if (mainType.equals(DataType.bigint())
|
||||||
|| mainType.equals(DataType.counter())) {
|
|| mainType.equals(DataType.counter())) {
|
||||||
statement.setLong(paramIndex, (long) mainType.parse(paramValue));
|
statement.setLong(paramIndex, (long) typeCodec.parse(paramValue));
|
||||||
|
|
||||||
} else if (mainType.equals(DataType.cfloat())) {
|
} else if (mainType.equals(DataType.cfloat())) {
|
||||||
statement.setFloat(paramIndex, (float) mainType.parse(paramValue));
|
statement.setFloat(paramIndex, (float) typeCodec.parse(paramValue));
|
||||||
|
|
||||||
} else if (mainType.equals(DataType.cdouble())) {
|
} else if (mainType.equals(DataType.cdouble())) {
|
||||||
statement.setDouble(paramIndex, (double) mainType.parse(paramValue));
|
statement.setDouble(paramIndex, (double) typeCodec.parse(paramValue));
|
||||||
|
|
||||||
} else if (mainType.equals(DataType.blob())) {
|
} else if (mainType.equals(DataType.blob())) {
|
||||||
statement.setBytes(paramIndex, (ByteBuffer) mainType.parse(paramValue));
|
statement.setBytes(paramIndex, (ByteBuffer) typeCodec.parse(paramValue));
|
||||||
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
@ -352,22 +352,28 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
|
||||||
if (matcher.groupCount() > 2) {
|
if (matcher.groupCount() > 2) {
|
||||||
String firstParamTypeName = matcher.group(3);
|
String firstParamTypeName = matcher.group(3);
|
||||||
DataType firstParamType = getPrimitiveDataTypeFromString(firstParamTypeName);
|
DataType firstParamType = getPrimitiveDataTypeFromString(firstParamTypeName);
|
||||||
|
if (firstParamType == null) {
|
||||||
|
throw new IllegalArgumentException("Nested collections are not supported");
|
||||||
|
}
|
||||||
|
|
||||||
// Check for map type
|
// Check for map type
|
||||||
if (DataType.Name.MAP.toString().equalsIgnoreCase(mainTypeString)) {
|
if (DataType.Name.MAP.toString().equalsIgnoreCase(mainTypeString)) {
|
||||||
if (matcher.groupCount() > 4) {
|
if (matcher.groupCount() > 4) {
|
||||||
String secondParamTypeName = matcher.group(5);
|
String secondParamTypeName = matcher.group(5);
|
||||||
DataType secondParamType = getPrimitiveDataTypeFromString(secondParamTypeName);
|
DataType secondParamType = getPrimitiveDataTypeFromString(secondParamTypeName);
|
||||||
statement.setMap(paramIndex, (Map) DataType.map(firstParamType, secondParamType).parse(paramValue));
|
DataType mapType = DataType.map(firstParamType, secondParamType);
|
||||||
|
statement.setMap(paramIndex, (Map) codecRegistry.codecFor(mapType).parse(paramValue));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Must be set or list
|
// Must be set or list
|
||||||
if (DataType.Name.SET.toString().equalsIgnoreCase(mainTypeString)) {
|
if (DataType.Name.SET.toString().equalsIgnoreCase(mainTypeString)) {
|
||||||
statement.setSet(paramIndex, (Set) DataType.set(firstParamType).parse(paramValue));
|
DataType setType = DataType.set(firstParamType);
|
||||||
|
statement.setSet(paramIndex, (Set) codecRegistry.codecFor(setType).parse(paramValue));
|
||||||
return;
|
return;
|
||||||
} else if (DataType.Name.LIST.toString().equalsIgnoreCase(mainTypeString)) {
|
} else if (DataType.Name.LIST.toString().equalsIgnoreCase(mainTypeString)) {
|
||||||
statement.setList(paramIndex, (List) DataType.list(firstParamType).parse(paramValue));
|
DataType listType = DataType.list(firstParamType);
|
||||||
|
statement.setList(paramIndex, (List) codecRegistry.codecFor(listType).parse(paramValue));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,7 +270,7 @@ public class AbstractCassandraProcessorTest {
|
||||||
Metadata mockMetadata = mock(Metadata.class);
|
Metadata mockMetadata = mock(Metadata.class);
|
||||||
when(mockMetadata.getClusterName()).thenReturn("cluster1");
|
when(mockMetadata.getClusterName()).thenReturn("cluster1");
|
||||||
when(mockCluster.getMetadata()).thenReturn(mockMetadata);
|
when(mockCluster.getMetadata()).thenReturn(mockMetadata);
|
||||||
Configuration config = new Configuration();
|
Configuration config = Configuration.builder().build();
|
||||||
when(mockCluster.getConfiguration()).thenReturn(config);
|
when(mockCluster.getConfiguration()).thenReturn(config);
|
||||||
return mockCluster;
|
return mockCluster;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import com.datastax.driver.core.DataType;
|
||||||
import com.datastax.driver.core.ResultSet;
|
import com.datastax.driver.core.ResultSet;
|
||||||
import com.datastax.driver.core.Row;
|
import com.datastax.driver.core.Row;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
import com.google.common.reflect.TypeToken;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -34,7 +35,9 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyInt;
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -107,9 +110,9 @@ public class CassandraQueryTestUtil {
|
||||||
when(row.getString(0)).thenReturn(user_id);
|
when(row.getString(0)).thenReturn(user_id);
|
||||||
when(row.getString(1)).thenReturn(first_name);
|
when(row.getString(1)).thenReturn(first_name);
|
||||||
when(row.getString(2)).thenReturn(last_name);
|
when(row.getString(2)).thenReturn(last_name);
|
||||||
when(row.getSet(3, String.class)).thenReturn(emails);
|
when(row.getSet(eq(3), any(TypeToken.class))).thenReturn(emails);
|
||||||
when(row.getList(4, String.class)).thenReturn(top_places);
|
when(row.getList(eq(4), any(TypeToken.class))).thenReturn(top_places);
|
||||||
when(row.getMap(5, Date.class, String.class)).thenReturn(todo);
|
when(row.getMap(eq(5), any(TypeToken.class), any(TypeToken.class))).thenReturn(todo);
|
||||||
when(row.getBool(6)).thenReturn(registered);
|
when(row.getBool(6)).thenReturn(registered);
|
||||||
when(row.getFloat(7)).thenReturn(scale);
|
when(row.getFloat(7)).thenReturn(scale);
|
||||||
when(row.getDouble(8)).thenReturn(metric);
|
when(row.getDouble(8)).thenReturn(metric);
|
||||||
|
|
|
@ -180,7 +180,7 @@ public class PutCassandraQLTest {
|
||||||
when(mockCluster.getMetadata()).thenReturn(mockMetadata);
|
when(mockCluster.getMetadata()).thenReturn(mockMetadata);
|
||||||
when(mockCluster.connect()).thenReturn(mockSession);
|
when(mockCluster.connect()).thenReturn(mockSession);
|
||||||
when(mockCluster.connect(anyString())).thenReturn(mockSession);
|
when(mockCluster.connect(anyString())).thenReturn(mockSession);
|
||||||
Configuration config = new Configuration();
|
Configuration config = Configuration.builder().build();
|
||||||
when(mockCluster.getConfiguration()).thenReturn(config);
|
when(mockCluster.getConfiguration()).thenReturn(config);
|
||||||
ResultSetFuture future = mock(ResultSetFuture.class);
|
ResultSetFuture future = mock(ResultSetFuture.class);
|
||||||
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
|
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
|
||||||
|
|
|
@ -340,7 +340,7 @@ public class QueryCassandraTest {
|
||||||
Session mockSession = mock(Session.class);
|
Session mockSession = mock(Session.class);
|
||||||
when(mockCluster.connect()).thenReturn(mockSession);
|
when(mockCluster.connect()).thenReturn(mockSession);
|
||||||
when(mockCluster.connect(anyString())).thenReturn(mockSession);
|
when(mockCluster.connect(anyString())).thenReturn(mockSession);
|
||||||
Configuration config = new Configuration();
|
Configuration config = Configuration.builder().build();
|
||||||
when(mockCluster.getConfiguration()).thenReturn(config);
|
when(mockCluster.getConfiguration()).thenReturn(config);
|
||||||
ResultSetFuture future = mock(ResultSetFuture.class);
|
ResultSetFuture future = mock(ResultSetFuture.class);
|
||||||
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
|
ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
|
||||||
|
|
Loading…
Reference in New Issue