NIFI-5621: Added Cassandra connection provider service

NIFI-5621: Improvements on LICENSE & NOTICE files

This closes #3105

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
zenfenan 2018-10-21 23:11:07 +05:30 committed by Mike Thomsen
parent 85cc5aa9eb
commit 9a1ab4c504
23 changed files with 2008 additions and 108 deletions

View File

@ -535,6 +535,18 @@ language governing permissions and limitations under the License. -->
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra-services-api-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra-services-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spring-nar</artifactId>

View File

@ -32,7 +32,7 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<artifactId>nifi-cassandra-services-api-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>

View File

@ -42,11 +42,12 @@
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.0</version>
<version>${cassandra.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
@ -55,13 +56,13 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
<artifactId>nifi-cassandra-services-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<artifactId>nifi-cassandra-services</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
@ -78,6 +79,7 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -25,10 +25,14 @@ import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.cassandra.CassandraSessionProviderService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@ -38,6 +42,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
@ -61,26 +66,36 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
public static final int DEFAULT_CASSANDRA_PORT = 9042;
// Common descriptors
public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
static final PropertyDescriptor CONNECTION_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
.name("cassandra-connection-provider")
.displayName("Cassandra Connection Provider")
.description("Specifies the Cassandra connection providing controller service to be used to connect to Cassandra cluster.")
.required(false)
.identifiesControllerService(CassandraSessionProviderService.class)
.build();
static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
.name("Cassandra Contact Points")
.description("Contact points are addresses of Cassandra nodes. The list of contact points should be "
+ "comma-separated and in hostname:port format. Example node1:port,node2:port,...."
+ " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.")
.required(true)
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();
public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()
static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()
.name("Keyspace")
.description("The Cassandra Keyspace to connect to. If not set, the keyspace name has to be provided with the " +
.description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to " +
"include the keyspace name before any table reference, in case of 'query' native processors or " +
"if the processor exposes the 'Table' property, the keyspace name has to be provided with the " +
"table name in the form of <KEYSPACE>.<TABLE>")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ "connections.")
@ -88,7 +103,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
@ -98,7 +113,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.defaultValue("REQUIRED")
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("Username to access the Cassandra cluster")
.required(false)
@ -106,7 +121,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password to access the Cassandra cluster")
.required(false)
@ -115,15 +130,15 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
.name("Consistency Level")
.description("The strategy for how many replicas must respond before results are returned.")
.required(true)
.required(false)
.allowableValues(ConsistencyLevel.values())
.defaultValue("ONE")
.build();
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the record data.")
.required(true)
@ -132,22 +147,25 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is transferred to this relationship if the operation completed successfully.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is transferred to this relationship if the operation failed.")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting "
+ "it again may succeed.")
.build();
static List<PropertyDescriptor> descriptors = new ArrayList<>();
protected static List<PropertyDescriptor> descriptors = new ArrayList<>();
static {
descriptors.add(CONNECTION_PROVIDER_SERVICE);
descriptors.add(CONTACT_POINTS);
descriptors.add(KEYSPACE);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@ -170,15 +188,53 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
// Ensure that if username or password is set, then the other is too
String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) {
results.add(new ValidationResult.Builder().valid(false).explanation(
results.add(new ValidationResult.Builder().subject("Username / Password configuration").valid(false).explanation(
"If username or password is specified, then the other must be specified as well").build());
}
// Ensure that both Connection provider service and the processor specific configurations are not provided
boolean connectionProviderIsSet = validationContext.getProperty(CONNECTION_PROVIDER_SERVICE).isSet();
boolean contactPointsIsSet = validationContext.getProperty(CONTACT_POINTS).isSet();
if (connectionProviderIsSet && contactPointsIsSet) {
results.add(new ValidationResult.Builder().subject("Cassandra configuration").valid(false).explanation("both " + CONNECTION_PROVIDER_SERVICE.getDisplayName() +
" and processor level Cassandra configuration cannot be provided at the same time.").build());
}
if (!connectionProviderIsSet && !contactPointsIsSet) {
results.add(new ValidationResult.Builder().subject("Cassandra configuration").valid(false).explanation("either " + CONNECTION_PROVIDER_SERVICE.getDisplayName() +
" or processor level Cassandra configuration has to be provided.").build());
}
return results;
}
protected void connectToCassandra(ProcessContext context) {
@OnScheduled
public void onScheduled(ProcessContext context) {
final boolean connectionProviderIsSet = context.getProperty(CONNECTION_PROVIDER_SERVICE).isSet();
if (connectionProviderIsSet) {
CassandraSessionProviderService sessionProvider = context.getProperty(CONNECTION_PROVIDER_SERVICE).asControllerService(CassandraSessionProviderService.class);
cluster.set(sessionProvider.getCluster());
cassandraSession.set(sessionProvider.getCassandraSession());
return;
}
try {
connectToCassandra(context);
} catch (NoHostAvailableException nhae) {
getLogger().error("No host in the Cassandra cluster can be contacted successfully to execute this statement", nhae);
getLogger().error(nhae.getCustomMessage(10, true, false));
throw new ProcessException(nhae);
} catch (AuthenticationException ae) {
getLogger().error("Invalid username/password combination", ae);
throw new ProcessException(ae);
}
}
void connectToCassandra(ProcessContext context) {
if (cluster.get() == null) {
ComponentLog log = getLogger();
final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
@ -186,13 +242,13 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);
// Set up the client for secure (SSL/TLS communications) if configured to do so
final SSLContextService sslService =
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
final SSLContext sslContext;
if (sslService != null) {
final SSLContextService.ClientAuth clientAuth;
if (StringUtils.isBlank(rawClientAuth)) {
clientAuth = SSLContextService.ClientAuth.REQUIRED;
} else {
@ -203,6 +259,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
}
}
sslContext = sslService.createSSLContext(clientAuth);
} else {
sslContext = null;
@ -223,15 +280,19 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
// Create the cluster and connect to it
Cluster newCluster = createCluster(contactPoints, sslContext, username, password);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
final Session newSession;
if (keyspaceProperty != null) {
newSession = newCluster.connect(keyspaceProperty.getValue());
} else {
newSession = newCluster.connect();
}
newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
Metadata metadata = newCluster.getMetadata();
log.info("Connected to Cassandra cluster: {}", new Object[]{metadata.getClusterName()});
cluster.set(newCluster);
cassandraSession.set(newSession);
}
@ -261,14 +322,20 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
return builder.build();
}
public void stop() {
if (cassandraSession.get() != null) {
cassandraSession.get().close();
cassandraSession.set(null);
}
if (cluster.get() != null) {
cluster.get().close();
cluster.set(null);
public void stop(ProcessContext context) {
// We don't want to close the connection when using 'Cassandra Connection Provider'
// because each time @OnUnscheduled/@OnShutdown annotated method is triggered on a
// processor, the connection would be closed which is not ideal for a centralized
// connection provider controller service
if (!context.getProperty(CONNECTION_PROVIDER_SERVICE).isSet()) {
if (cassandraSession.get() != null) {
cassandraSession.get().close();
cassandraSession.set(null);
}
if (cluster.get() != null) {
cluster.get().close();
cluster.set(null);
}
}
}
@ -350,7 +417,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
*
* @param dataType The data type of the field
*/
public static Schema getUnionFieldType(String dataType) {
protected static Schema getUnionFieldType(String dataType) {
return SchemaBuilder.builder().unionOf().nullBuilder().endNull().and().type(getSchemaForType(dataType)).endUnion();
}
@ -359,7 +426,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
*
* @param dataType The data type of the field
*/
public static Schema getSchemaForType(String dataType) {
protected static Schema getSchemaForType(String dataType) {
SchemaBuilder.TypeBuilder<Schema> typeBuilder = SchemaBuilder.builder();
Schema returnSchema;
switch (dataType) {
@ -390,7 +457,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
return returnSchema;
}
public static String getPrimitiveAvroTypeFromCassandraType(DataType dataType) {
protected static String getPrimitiveAvroTypeFromCassandraType(DataType dataType) {
// Map types from Cassandra to Avro where possible
if (dataType.equals(DataType.ascii())
|| dataType.equals(DataType.text())
@ -428,7 +495,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
}
}
public static DataType getPrimitiveDataTypeFromString(String dataTypeName) {
protected static DataType getPrimitiveDataTypeFromString(String dataTypeName) {
Set<DataType> primitiveTypes = DataType.allPrimitiveTypes();
for (DataType primitiveType : primitiveTypes) {
if (primitiveType.toString().equals(dataTypeName)) {
@ -444,7 +511,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
* @param contactPointList A comma-separated list of Cassandra contact points (host:port,host2:port2, etc.)
* @return List of InetSocketAddresses for the Cassandra contact points
*/
public List<InetSocketAddress> getContactPoints(String contactPointList) {
protected List<InetSocketAddress> getContactPoints(String contactPointList) {
if (contactPointList == null) {
return null;

View File

@ -22,7 +22,6 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
@ -163,7 +162,7 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
ComponentLog log = getLogger();
super.onScheduled(context);
// Initialize the prepared statement cache
int statementCacheSize = context.getProperty(STATEMENT_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
@ -171,22 +170,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
.maximumSize(statementCacheSize)
.<String, PreparedStatement>build()
.asMap();
try {
connectToCassandra(context);
} catch (final NoHostAvailableException nhae) {
log.error("No host in the Cassandra cluster can be contacted successfully to execute this statement", nhae);
// Log up to 10 error messages. Otherwise if a 1000-node cluster was specified but there was no connectivity,
// a thousand error messages would be logged. However we would like information from Cassandra itself, so
// cap the error limit at 10, format the messages, and don't include the stack trace (it is displayed by the
// logger message above).
log.error(nhae.getCustomMessage(10, true, false));
throw new ProcessException(nhae);
} catch (final AuthenticationException ae) {
log.error("Invalid username/password combination", ae);
throw new ProcessException(ae);
}
}
@Override
@ -417,8 +400,8 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
}
@OnStopped
public void stop() {
super.stop();
public void stop(ProcessContext context) {
super.stop(context);
statementCache.clear();
}
}

View File

@ -19,14 +19,11 @@ package org.apache.nifi.processors.cassandra;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
@ -105,8 +102,8 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
.build();
private final static List<PropertyDescriptor> propertyDescriptors = Collections.unmodifiableList(Arrays.asList(
CONTACT_POINTS, KEYSPACE, TABLE, CLIENT_AUTH, USERNAME, PASSWORD, RECORD_READER_FACTORY,
BATCH_SIZE, CONSISTENCY_LEVEL, BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE));
CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, CLIENT_AUTH, USERNAME, PASSWORD,
RECORD_READER_FACTORY, BATCH_SIZE, CONSISTENCY_LEVEL, BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE));
private final static Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
@ -121,20 +118,6 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
return relationships;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
try {
connectToCassandra(context);
} catch (NoHostAvailableException nhae) {
getLogger().error("No host in the Cassandra cluster can be contacted successfully to execute this statement", nhae);
getLogger().error(nhae.getCustomMessage(10, true, false));
throw new ProcessException(nhae);
} catch (AuthenticationException ae) {
getLogger().error("Invalid username/password combination", ae);
throw new ProcessException(ae);
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile inputFlowFile = session.get();
@ -210,13 +193,13 @@ public class PutCassandraRecord extends AbstractCassandraProcessor {
}
@OnUnscheduled
public void stop() {
super.stop();
public void stop(ProcessContext context) {
super.stop(context);
}
@OnShutdown
public void shutdown() {
super.stop();
public void shutdown(ProcessContext context) {
super.stop(context);
}
}

View File

@ -22,7 +22,6 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.AuthenticationException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
@ -166,27 +165,15 @@ public class QueryCassandra extends AbstractCassandraProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
ComponentLog log = getLogger();
try {
connectToCassandra(context);
final int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
if (fetchSize > 0) {
synchronized (cluster.get()) {
cluster.get().getConfiguration().getQueryOptions().setFetchSize(fetchSize);
}
super.onScheduled(context);
final int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
if (fetchSize > 0) {
synchronized (cluster.get()) {
cluster.get().getConfiguration().getQueryOptions().setFetchSize(fetchSize);
}
} catch (final NoHostAvailableException nhae) {
log.error("No host in the Cassandra cluster can be contacted successfully to execute this query", nhae);
// Log up to 10 error messages. Otherwise if a 1000-node cluster was specified but there was no connectivity,
// a thousand error messages would be logged. However we would like information from Cassandra itself, so
// cap the error limit at 10, format the messages, and don't include the stack trace (it is displayed by the
// logger message above).
log.error(nhae.getCustomMessage(10, true, false));
throw new ProcessException(nhae);
} catch (final AuthenticationException ae) {
log.error("Invalid username/password combination", ae);
throw new ProcessException(ae);
}
}
@Override
@ -308,13 +295,13 @@ public class QueryCassandra extends AbstractCassandraProcessor {
@OnUnscheduled
public void stop() {
super.stop();
public void stop(ProcessContext context) {
super.stop(context);
}
@OnShutdown
public void shutdown() {
super.stop();
public void shutdown(ProcessContext context) {
super.stop(context);
}
/**

View File

@ -22,11 +22,15 @@ import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Row;
import com.google.common.collect.Sets;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.service.CassandraSessionProvider;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -50,7 +54,6 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit tests for the AbstractCassandraProcessor class
*/
@ -204,7 +207,7 @@ public class AbstractCassandraProcessorTest {
processor.setCluster(cluster);
testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
processor.connectToCassandra(testRunner.getProcessContext());
processor.stop();
processor.stop(testRunner.getProcessContext());
assertNull(processor.getCluster());
// Now do a connect where a cluster is "built"
@ -257,6 +260,33 @@ public class AbstractCassandraProcessorTest {
assertNotNull(processor.getCluster());
}
@Test
public void testCustomValidateCassandraConnectionConfiguration() throws InitializationException {
MockCassandraSessionProvider sessionProviderService = new MockCassandraSessionProvider();
testRunner.addControllerService("cassandra-connection-provider", sessionProviderService);
testRunner.setProperty(sessionProviderService, CassandraSessionProvider.CONTACT_POINTS, "localhost:9042");
testRunner.setProperty(sessionProviderService, CassandraSessionProvider.KEYSPACE, "somekyespace");
testRunner.setProperty(AbstractCassandraProcessor.CONNECTION_PROVIDER_SERVICE, "cassandra-connection-provider");
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042");
testRunner.setProperty(AbstractCassandraProcessor.KEYSPACE, "some-keyspace");
testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "user");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password");
testRunner.enableControllerService(sessionProviderService);
testRunner.assertNotValid();
testRunner.removeProperty(AbstractCassandraProcessor.CONTACT_POINTS);
testRunner.removeProperty(AbstractCassandraProcessor.KEYSPACE);
testRunner.removeProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL);
testRunner.removeProperty(AbstractCassandraProcessor.USERNAME);
testRunner.removeProperty(AbstractCassandraProcessor.PASSWORD);
testRunner.assertValid();
}
/**
* Provides a stubbed processor instance for testing
*/
@ -264,7 +294,7 @@ public class AbstractCassandraProcessorTest {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Arrays.asList(CONTACT_POINTS, KEYSPACE, USERNAME, PASSWORD, CONSISTENCY_LEVEL, CHARSET);
return Arrays.asList(CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, USERNAME, PASSWORD, CONSISTENCY_LEVEL, CHARSET);
}
@Override
@ -292,4 +322,16 @@ public class AbstractCassandraProcessorTest {
this.cluster.set(newCluster);
}
}
/**
* Mock CassandraSessionProvider implementation for testing purpose
*/
private class MockCassandraSessionProvider extends CassandraSessionProvider {
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
}
}
}

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-cassandra-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-cassandra-services-api-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra-services-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,266 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
This product bundles 'asm' which is available under a 3-Clause BSD style license.
For details see http://asm.ow2.org/asmdex-license.html
Copyright (c) 2012 France Télécom
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holders nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'JNR x86asm' under an MIT
style license.
Copyright (C) 2010 Wayne Meissner
Copyright (c) 2008-2009, Petr Kobalicek <kobalicek.petr@gmail.com>
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,226 @@
nifi-cassandra-services-api-nar
Copyright 2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
(ASLv2) The Netty Project
The following NOTICE information applies:
Copyright 2014 The Netty Project
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* http://www.slf4j.org/
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* http://archive.apache.org/dist/harmony/
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1)
* HOMEPAGE:
* http://www.jboss.org/jbossmarshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* http://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* http://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains a forked and modified version of Tomcat Native
* LICENSE:
* ASL2
* HOMEPAGE:
* http://tomcat.apache.org/native-doc/
* https://svn.apache.org/repos/asf/tomcat/native/
(ASLv2) Guava
The following NOTICE information applies:
Guava
Copyright 2015 The Guava Authors
(ASLv2) Dropwizard Metrics
The following NOTICE information applies:
Copyright (c) 2010-2013 Coda Hale, Yammer.com
************************
Eclipse Public License 1.0
************************
The following binary components are provided under the Eclipse Public License 1.0. See project link for details.
(EPL 2.0)(GPL 2)(LGPL 2.1) JNR Posix ( jnr.posix ) https://github.com/jnr/jnr-posix/blob/master/LICENSE.txt

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-cassandra-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-cassandra-services-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.sdk.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cassandra;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.nifi.controller.ControllerService;
public interface CassandraSessionProviderService extends ControllerService {
/**
* Obtains a Cassandra session instance
* @return {@link Session}
*/
Session getCassandraSession();
/**
* Obtains a Cassandra cluster instance
* @return {@link Cluster}
*/
Cluster getCluster();
}

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-cassandra-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-cassandra-services-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra-services-api-nar</artifactId>
<version>1.9.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra-services</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,369 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.
This product bundles 'libffi' which is available under an MIT style license.
libffi - Copyright (c) 1996-2014 Anthony Green, Red Hat, Inc and others.
see https://github.com/java-native-access/jna/blob/master/native/libffi/LICENSE
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
``Software''), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED ``AS IS'', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
This product bundles 'asm' which is available under a 3-Clause BSD style license.
For details see http://asm.ow2.org/asmdex-license.html
Copyright (c) 2012 France Télécom
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holders nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'Bouncy Castle JDK 1.5'
under an MIT style license.
Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
The binary distribution of this product bundles 'JNR x86asm' under an MIT
style license.
Copyright (C) 2010 Wayne Meissner
Copyright (c) 2008-2009, Petr Kobalicek <kobalicek.petr@gmail.com>
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
This product bundles 'jBCrypt' which is available under an MIT license.
For details see https://github.com/svenkubiak/jBCrypt/blob/0.4.1/LICENSE
Copyright (c) 2006 Damien Miller <djm@mindrot.org>
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
This product bundles 'logback' which is dual-licensed under the EPL v1.0
and the LGPL 2.1.
Logback: the reliable, generic, fast and flexible logging framework.
Copyright (C) 1999-2017, QOS.ch. All rights reserved.
This program and the accompanying materials are dual-licensed under
either the terms of the Eclipse Public License v1.0 as published by
the Eclipse Foundation or (per the licensee's choosing) under the
terms of the GNU Lesser General Public License version 2.1 as
published by the Free Software Foundation.
The binary distribution of this product bundles 'ANTLR 3' which is available
under a "3-clause BSD" license. For details see http://www.antlr.org/license.html
Copyright (c) 2012 Terence Parr and Sam Harwell
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted
provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.
Neither the name of the author nor the names of its contributors may be used to endorse
or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,291 @@
nifi-cassandra-services-nar
Copyright 2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) DataStax Java Driver for Apache Cassandra - Core
The following NOTICE information applies:
DataStax Java Driver for Apache Cassandra - Core
Copyright (C) 2012-2017 DataStax Inc.
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2017 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) Guava
The following NOTICE information applies:
Guava
Copyright 2015 The Guava Authors
(ASLv2) JSON-SMART
The following NOTICE information applies:
Copyright 2011 JSON-SMART authors
(ASLv2) Dropwizard Metrics
The following NOTICE information applies:
Copyright (c) 2010-2013 Coda Hale, Yammer.com
(ASLv2) The Netty Project
The following NOTICE information applies:
Copyright 2014 The Netty Project
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* http://www.slf4j.org/
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* http://archive.apache.org/dist/harmony/
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1)
* HOMEPAGE:
* http://www.jboss.org/jbossmarshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* http://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* http://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains a forked and modified version of Tomcat Native
* LICENSE:
* ASL2
* HOMEPAGE:
* http://tomcat.apache.org/native-doc/
* https://svn.apache.org/repos/asf/tomcat/native/
(ASLv2) Objenesis
The following NOTICE information applies:
Objenesis
Copyright 2006-2013 Joe Walnes, Henri Tremblay, Leonardo Mesquita
************************
Eclipse Public License 1.0
************************
The following binary components are provided under the Eclipse Public License 1.0. See project link for details.
(EPL 2.0)(GPL 2)(LGPL 2.1) JNR Posix ( jnr.posix ) https://github.com/jnr/jnr-posix/blob/master/LICENSE.txt
(EPL 1.0)(LGPL 2.1) Logback Classic (ch.qos.logback:logback-classic:jar:1.2.3 - http://logback.qos.ch/)
(EPL 1.0)(LGPL 2.1) Logback Core (ch.qos.logback:logback-core:jar:1.2.3 - http://logback.qos.ch/)

View File

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>nifi-cassandra-bundle</artifactId>
<groupId>org.apache.nifi</groupId>
<version>1.9.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-cassandra-services</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra-services-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>${cassandra.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,285 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.service;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.cassandra.CassandraSessionProviderService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@Tags({"cassandra", "dbcp", "database", "connection", "pooling"})
@CapabilityDescription("Provides connection session for Cassandra processors to work with Apache Cassandra.")
public class CassandraSessionProvider extends AbstractControllerService implements CassandraSessionProviderService {
public static final int DEFAULT_CASSANDRA_PORT = 9042;
// Common descriptors
public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
.name("Cassandra Contact Points")
.description("Contact points are addresses of Cassandra nodes. The list of contact points should be "
+ "comma-separated and in hostname:port format. Example node1:port,node2:port,...."
+ " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();
public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()
.name("Keyspace")
.description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to " +
"include the keyspace name before any table reference, in case of 'query' native processors or " +
"if the processor supports the 'Table' property, the keyspace name has to be provided with the " +
"table name in the form of <KEYSPACE>.<TABLE>")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
+ "connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ "has been defined and enabled.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.values())
.defaultValue("REQUIRED")
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("Username to access the Cassandra cluster")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password to access the Cassandra cluster")
.required(false)
.sensitive(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
.name("Consistency Level")
.description("The strategy for how many replicas must respond before results are returned.")
.required(true)
.allowableValues(ConsistencyLevel.values())
.defaultValue("ONE")
.build();
private List<PropertyDescriptor> properties;
private Cluster cluster;
private Session cassandraSession;
@Override
public void init(final ControllerServiceInitializationContext context) {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(CONTACT_POINTS);
props.add(CLIENT_AUTH);
props.add(CONSISTENCY_LEVEL);
props.add(KEYSPACE);
props.add(USERNAME);
props.add(PASSWORD);
props.add(PROP_SSL_CONTEXT_SERVICE);
properties = props;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
connectToCassandra(context);
}
@OnDisabled
public void onDisabled(){
if (cassandraSession != null) {
cassandraSession.close();
}
if (cluster != null) {
cluster.close();
}
}
@OnStopped
public void onStopped() {
if (cassandraSession != null) {
cassandraSession.close();
}
if (cluster != null) {
cluster.close();
}
}
@Override
public Cluster getCluster() {
if (cluster != null) {
return cluster;
} else {
throw new ProcessException("Unable to get the Cassandra cluster detail.");
}
}
@Override
public Session getCassandraSession() {
if (cassandraSession != null) {
return cassandraSession;
} else {
throw new ProcessException("Unable to get the Cassandra session.");
}
}
private void connectToCassandra(ConfigurationContext context) {
if (cluster == null) {
ComponentLog log = getLogger();
final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);
// Set up the client for secure (SSL/TLS communications) if configured to do so
final SSLContextService sslService =
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
final SSLContext sslContext;
if (sslService != null) {
final SSLContextService.ClientAuth clientAuth;
if (StringUtils.isBlank(rawClientAuth)) {
clientAuth = SSLContextService.ClientAuth.REQUIRED;
} else {
try {
clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
} catch (final IllegalArgumentException iae) {
throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
}
}
sslContext = sslService.createSSLContext(clientAuth);
} else {
sslContext = null;
}
final String username, password;
PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions();
PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions();
if (usernameProperty != null && passwordProperty != null) {
username = usernameProperty.getValue();
password = passwordProperty.getValue();
} else {
username = null;
password = null;
}
// Create the cluster and connect to it
Cluster newCluster = createCluster(contactPoints, sslContext, username, password);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
final Session newSession;
if (keyspaceProperty != null) {
newSession = newCluster.connect(keyspaceProperty.getValue());
} else {
newSession = newCluster.connect();
}
newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
Metadata metadata = newCluster.getMetadata();
log.info("Connected to Cassandra cluster: {}", new Object[]{metadata.getClusterName()});
cluster = newCluster;
cassandraSession = newSession;
}
}
private List<InetSocketAddress> getContactPoints(String contactPointList) {
if (contactPointList == null) {
return null;
}
final List<String> contactPointStringList = Arrays.asList(contactPointList.split(","));
List<InetSocketAddress> contactPoints = new ArrayList<>();
for (String contactPointEntry : contactPointStringList) {
String[] addresses = contactPointEntry.split(":");
final String hostName = addresses[0].trim();
final int port = (addresses.length > 1) ? Integer.parseInt(addresses[1].trim()) : DEFAULT_CASSANDRA_PORT;
contactPoints.add(new InetSocketAddress(hostName, port));
}
return contactPoints;
}
private Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password) {
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
if (sslContext != null) {
JdkSSLOptions sslOptions = JdkSSLOptions.builder()
.withSSLContext(sslContext)
.build();
builder = builder.withSSL(sslOptions);
}
if (username != null && password != null) {
builder = builder.withCredentials(username, password);
}
return builder.build();
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.service.CassandraSessionProvider

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.service;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Collections;
import java.util.List;
/**
* Mock Cassandra processor for testing CassandraSessionProvider
*/
public class MockCassandraProcessor extends AbstractProcessor{
private static PropertyDescriptor CASSANDRA_SESSION_PROVIDER = new PropertyDescriptor.Builder()
.name("cassandra-session-provider")
.displayName("Cassandra Session Provider")
.required(true)
.description("Controller Service to obtain a Cassandra connection session")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.identifiesControllerService(CassandraSessionProvider.class)
.build();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(CASSANDRA_SESSION_PROVIDER);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.service;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestCassandraSessionProvider {
private static TestRunner runner;
private static CassandraSessionProvider sessionProvider;
@BeforeClass
public static void setup() throws InitializationException {
MockCassandraProcessor mockCassandraProcessor = new MockCassandraProcessor();
sessionProvider = new CassandraSessionProvider();
runner = TestRunners.newTestRunner(mockCassandraProcessor);
runner.addControllerService("cassandra-session-provider", sessionProvider);
}
@Test
public void testGetPropertyDescriptors() {
List<PropertyDescriptor> properties = sessionProvider.getPropertyDescriptors();
assertEquals(7, properties.size());
assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));
assertTrue(properties.contains(CassandraSessionProvider.KEYSPACE));
assertTrue(properties.contains(CassandraSessionProvider.PASSWORD));
assertTrue(properties.contains(CassandraSessionProvider.PROP_SSL_CONTEXT_SERVICE));
assertTrue(properties.contains(CassandraSessionProvider.USERNAME));
}
}

View File

@ -22,12 +22,20 @@
<version>1.9.0-SNAPSHOT</version>
</parent>
<properties>
<cassandra.sdk.version>3.3.0</cassandra.sdk.version>
</properties>
<artifactId>nifi-cassandra-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-cassandra-processors</module>
<module>nifi-cassandra-nar</module>
<module>nifi-cassandra-services-api</module>
<module>nifi-cassandra-services-api-nar</module>
<module>nifi-cassandra-services</module>
<module>nifi-cassandra-services-nar</module>
</modules>
<dependencyManagement>

View File

@ -246,6 +246,11 @@
<artifactId>nifi-proxy-configuration-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cassandra-services-api</artifactId>
<version>1.9.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-volatile-provenance-repository</artifactId>