NIFI-4358 This closes #3363. cassandra connection enable compression at resquest and response

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Sandish Kumar 2019-03-11 00:10:09 -05:00 committed by Joe Witt
parent 2846d3c3c6
commit 0e10e417df
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
7 changed files with 584 additions and 547 deletions

View File

@ -22,6 +22,7 @@ import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType; import com.datastax.driver.core.DataType;
import com.datastax.driver.core.JdkSSLOptions; import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.Metadata; import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.Row; import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session; import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec; import com.datastax.driver.core.TypeCodec;
@ -137,6 +138,15 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
.defaultValue("ONE") .defaultValue("ONE")
.build(); .build();
static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("Compression Type")
.description("Enable compression at transport-level requests and responses")
.required(false)
.allowableValues(ProtocolOptions.Compression.values())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("NONE")
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set") .name("Character Set")
.description("Specifies the character set of the record data.") .description("Specifies the character set of the record data.")
@ -172,6 +182,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
descriptors.add(USERNAME); descriptors.add(USERNAME);
descriptors.add(PASSWORD); descriptors.add(PASSWORD);
descriptors.add(CONSISTENCY_LEVEL); descriptors.add(CONSISTENCY_LEVEL);
descriptors.add(COMPRESSION_TYPE);
descriptors.add(CHARSET); descriptors.add(CHARSET);
} }
@ -238,6 +249,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
ComponentLog log = getLogger(); ComponentLog log = getLogger();
final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue(); final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue(); final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
final String compressionType = context.getProperty(COMPRESSION_TYPE).getValue();
List<InetSocketAddress> contactPoints = getContactPoints(contactPointList); List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);
// Set up the client for secure (SSL/TLS communications) if configured to do so // Set up the client for secure (SSL/TLS communications) if configured to do so
@ -277,7 +289,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
} }
// Create the cluster and connect to it // Create the cluster and connect to it
Cluster newCluster = createCluster(contactPoints, sslContext, username, password); Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions(); PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
final Session newSession; final Session newSession;
@ -304,16 +316,22 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
* @param sslContext The SSL context (used for secure connections) * @param sslContext The SSL context (used for secure connections)
* @param username The username for connection authentication * @param username The username for connection authentication
* @param password The password for connection authentication * @param password The password for connection authentication
* @param compressionType Enable compression at transport-level requests and responses.
* @return A reference to the Cluster object associated with the given Cassandra configuration * @return A reference to the Cluster object associated with the given Cassandra configuration
*/ */
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password) { String username, String password, String compressionType) {
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints); Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
if (sslContext != null) { if (sslContext != null) {
JdkSSLOptions sslOptions = JdkSSLOptions.builder() JdkSSLOptions sslOptions = JdkSSLOptions.builder()
.withSSLContext(sslContext) .withSSLContext(sslContext)
.build(); .build();
builder = builder.withSSL(sslOptions); builder = builder.withSSL(sslOptions);
if(ProtocolOptions.Compression.SNAPPY.equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.SNAPPY);
} else if(ProtocolOptions.Compression.LZ4.equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
}
} }
if (username != null && password != null) { if (username != null && password != null) {
builder = builder.withCredentials(username, password); builder = builder.withCredentials(username, password);

View File

@ -303,7 +303,7 @@ public class AbstractCassandraProcessorTest {
@Override @Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password) { String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class); Cluster mockCluster = mock(Cluster.class);
Metadata mockMetadata = mock(Metadata.class); Metadata mockMetadata = mock(Metadata.class);
when(mockMetadata.getClusterName()).thenReturn("cluster1"); when(mockMetadata.getClusterName()).thenReturn("cluster1");

View File

@ -314,7 +314,7 @@ public class PutCassandraQLTest {
@Override @Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password) { String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class); Cluster mockCluster = mock(Cluster.class);
try { try {
Metadata mockMetadata = mock(Metadata.class); Metadata mockMetadata = mock(Metadata.class);

View File

@ -1,198 +1,198 @@
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.cassandra; package org.apache.nifi.processors.cassandra;
import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Configuration; import com.datastax.driver.core.Configuration;
import com.datastax.driver.core.Metadata; import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session; import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement; import com.datastax.driver.core.Statement;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class PutCassandraRecordTest { public class PutCassandraRecordTest {
private TestRunner testRunner; private TestRunner testRunner;
private MockRecordParser recordReader; private MockRecordParser recordReader;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
MockPutCassandraRecord processor = new MockPutCassandraRecord(); MockPutCassandraRecord processor = new MockPutCassandraRecord();
recordReader = new MockRecordParser(); recordReader = new MockRecordParser();
testRunner = TestRunners.newTestRunner(processor); testRunner = TestRunners.newTestRunner(processor);
testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader"); testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader");
} }
@Test @Test
public void testProcessorConfigValidity() throws InitializationException { public void testProcessorConfigValidity() throws InitializationException {
testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "localhost:9042"); testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "localhost:9042");
testRunner.assertNotValid(); testRunner.assertNotValid();
testRunner.setProperty(PutCassandraRecord.PASSWORD, "password"); testRunner.setProperty(PutCassandraRecord.PASSWORD, "password");
testRunner.assertNotValid(); testRunner.assertNotValid();
testRunner.setProperty(PutCassandraRecord.USERNAME, "username"); testRunner.setProperty(PutCassandraRecord.USERNAME, "username");
testRunner.assertNotValid(); testRunner.assertNotValid();
testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
testRunner.assertNotValid(); testRunner.assertNotValid();
testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED"); testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED");
testRunner.assertNotValid(); testRunner.assertNotValid();
testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks"); testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks");
testRunner.assertNotValid(); testRunner.assertNotValid();
testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl"); testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl");
testRunner.assertNotValid(); testRunner.assertNotValid();
testRunner.addControllerService("reader", recordReader); testRunner.addControllerService("reader", recordReader);
testRunner.enableControllerService(recordReader); testRunner.enableControllerService(recordReader);
testRunner.assertValid(); testRunner.assertValid();
} }
private void setUpStandardTestConfig() throws InitializationException { private void setUpStandardTestConfig() throws InitializationException {
testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042"); testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042");
testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password"); testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password");
testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username"); testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username");
testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED"); testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED");
testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl"); testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
testRunner.addControllerService("reader", recordReader); testRunner.addControllerService("reader", recordReader);
testRunner.enableControllerService(recordReader); testRunner.enableControllerService(recordReader);
} }
@Test @Test
public void testSimplePut() throws InitializationException { public void testSimplePut() throws InitializationException {
setUpStandardTestConfig(); setUpStandardTestConfig();
recordReader.addSchemaField("name", RecordFieldType.STRING); recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT); recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.STRING); recordReader.addSchemaField("sport", RecordFieldType.STRING);
recordReader.addRecord("John Doe", 48, "Soccer"); recordReader.addRecord("John Doe", 48, "Soccer");
recordReader.addRecord("Jane Doe", 47, "Tennis"); recordReader.addRecord("Jane Doe", 47, "Tennis");
recordReader.addRecord("Sally Doe", 47, "Curling"); recordReader.addRecord("Sally Doe", 47, "Curling");
recordReader.addRecord("Jimmy Doe", 14, null); recordReader.addRecord("Jimmy Doe", 14, null);
recordReader.addRecord("Pizza Doe", 14, null); recordReader.addRecord("Pizza Doe", 14, null);
testRunner.enqueue(""); testRunner.enqueue("");
testRunner.run(); testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
} }
@Test @Test
public void testEL() throws InitializationException { public void testEL() throws InitializationException {
testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "${contact.points}"); testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "${contact.points}");
testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}"); testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}");
testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}"); testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}");
testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED"); testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED");
testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl"); testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
testRunner.addControllerService("reader", recordReader); testRunner.addControllerService("reader", recordReader);
testRunner.enableControllerService(recordReader); testRunner.enableControllerService(recordReader);
testRunner.assertValid(); testRunner.assertValid();
testRunner.setVariable("contact.points", "localhost:9042"); testRunner.setVariable("contact.points", "localhost:9042");
testRunner.setVariable("user", "username"); testRunner.setVariable("user", "username");
testRunner.setVariable("pass", "password"); testRunner.setVariable("pass", "password");
recordReader.addSchemaField("name", RecordFieldType.STRING); recordReader.addSchemaField("name", RecordFieldType.STRING);
recordReader.addSchemaField("age", RecordFieldType.INT); recordReader.addSchemaField("age", RecordFieldType.INT);
recordReader.addSchemaField("sport", RecordFieldType.STRING); recordReader.addSchemaField("sport", RecordFieldType.STRING);
recordReader.addRecord("John Doe", 48, "Soccer"); recordReader.addRecord("John Doe", 48, "Soccer");
recordReader.addRecord("Jane Doe", 47, "Tennis"); recordReader.addRecord("Jane Doe", 47, "Tennis");
recordReader.addRecord("Sally Doe", 47, "Curling"); recordReader.addRecord("Sally Doe", 47, "Curling");
recordReader.addRecord("Jimmy Doe", 14, null); recordReader.addRecord("Jimmy Doe", 14, null);
recordReader.addRecord("Pizza Doe", 14, null); recordReader.addRecord("Pizza Doe", 14, null);
testRunner.enqueue(""); testRunner.enqueue("");
testRunner.run(1, true, true); testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1); testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
} }
private static class MockPutCassandraRecord extends PutCassandraRecord { private static class MockPutCassandraRecord extends PutCassandraRecord {
private Exception exceptionToThrow = null; private Exception exceptionToThrow = null;
private Session mockSession = mock(Session.class); private Session mockSession = mock(Session.class);
@Override @Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password) { String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class); Cluster mockCluster = mock(Cluster.class);
try { try {
Metadata mockMetadata = mock(Metadata.class); Metadata mockMetadata = mock(Metadata.class);
when(mockMetadata.getClusterName()).thenReturn("cluster1"); when(mockMetadata.getClusterName()).thenReturn("cluster1");
when(mockCluster.getMetadata()).thenReturn(mockMetadata); when(mockCluster.getMetadata()).thenReturn(mockMetadata);
when(mockCluster.connect()).thenReturn(mockSession); when(mockCluster.connect()).thenReturn(mockSession);
when(mockCluster.connect(anyString())).thenReturn(mockSession); when(mockCluster.connect(anyString())).thenReturn(mockSession);
Configuration config = Configuration.builder().build(); Configuration config = Configuration.builder().build();
when(mockCluster.getConfiguration()).thenReturn(config); when(mockCluster.getConfiguration()).thenReturn(config);
ResultSetFuture future = mock(ResultSetFuture.class); ResultSetFuture future = mock(ResultSetFuture.class);
ResultSet rs = CassandraQueryTestUtil.createMockResultSet(); ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
PreparedStatement ps = mock(PreparedStatement.class); PreparedStatement ps = mock(PreparedStatement.class);
when(mockSession.prepare(anyString())).thenReturn(ps); when(mockSession.prepare(anyString())).thenReturn(ps);
BoundStatement bs = mock(BoundStatement.class); BoundStatement bs = mock(BoundStatement.class);
when(ps.bind()).thenReturn(bs); when(ps.bind()).thenReturn(bs);
when(future.getUninterruptibly()).thenReturn(rs); when(future.getUninterruptibly()).thenReturn(rs);
try { try {
doReturn(rs).when(future).getUninterruptibly(anyLong(), any(TimeUnit.class)); doReturn(rs).when(future).getUninterruptibly(anyLong(), any(TimeUnit.class));
} catch (TimeoutException te) { } catch (TimeoutException te) {
throw new IllegalArgumentException("Mocked cluster doesn't time out"); throw new IllegalArgumentException("Mocked cluster doesn't time out");
} }
if (exceptionToThrow != null) { if (exceptionToThrow != null) {
doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString()); doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class)); doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));
} else { } else {
when(mockSession.executeAsync(anyString())).thenReturn(future); when(mockSession.executeAsync(anyString())).thenReturn(future);
when(mockSession.executeAsync(any(Statement.class))).thenReturn(future); when(mockSession.executeAsync(any(Statement.class))).thenReturn(future);
} }
when(mockSession.getCluster()).thenReturn(mockCluster); when(mockSession.getCluster()).thenReturn(mockCluster);
} catch (Exception e) { } catch (Exception e) {
fail(e.getMessage()); fail(e.getMessage());
} }
return mockCluster; return mockCluster;
} }
} }
} }

View File

@ -385,7 +385,7 @@ public class QueryCassandraTest {
@Override @Override
protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password) { String username, String password, String compressionType) {
Cluster mockCluster = mock(Cluster.class); Cluster mockCluster = mock(Cluster.class);
try { try {
Metadata mockMetadata = mock(Metadata.class); Metadata mockMetadata = mock(Metadata.class);

View File

@ -1,285 +1,304 @@
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.service; package org.apache.nifi.service;
import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.JdkSSLOptions; import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.Metadata; import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session; import com.datastax.driver.core.ProtocolOptions;
import org.apache.commons.lang3.StringUtils; import com.datastax.driver.core.Session;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.cassandra.CassandraSessionProviderService; import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.cassandra.CassandraSessionProviderService;
import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress; import javax.net.ssl.SSLContext;
import java.util.ArrayList; import java.net.InetSocketAddress;
import java.util.Arrays; import java.util.ArrayList;
import java.util.List; import java.util.Arrays;
import java.util.List;
@Tags({"cassandra", "dbcp", "database", "connection", "pooling"})
@CapabilityDescription("Provides connection session for Cassandra processors to work with Apache Cassandra.") @Tags({"cassandra", "dbcp", "database", "connection", "pooling"})
public class CassandraSessionProvider extends AbstractControllerService implements CassandraSessionProviderService { @CapabilityDescription("Provides connection session for Cassandra processors to work with Apache Cassandra.")
public class CassandraSessionProvider extends AbstractControllerService implements CassandraSessionProviderService {
public static final int DEFAULT_CASSANDRA_PORT = 9042;
public static final int DEFAULT_CASSANDRA_PORT = 9042;
// Common descriptors
public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder() // Common descriptors
.name("Cassandra Contact Points") public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
.description("Contact points are addresses of Cassandra nodes. The list of contact points should be " .name("Cassandra Contact Points")
+ "comma-separated and in hostname:port format. Example node1:port,node2:port,...." .description("Contact points are addresses of Cassandra nodes. The list of contact points should be "
+ " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.") + "comma-separated and in hostname:port format. Example node1:port,node2:port,...."
.required(true) + " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();
public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()
.name("Keyspace") public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()
.description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to " + .name("Keyspace")
"include the keyspace name before any table reference, in case of 'query' native processors or " + .description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to " +
"if the processor supports the 'Table' property, the keyspace name has to be provided with the " + "include the keyspace name before any table reference, in case of 'query' native processors or " +
"table name in the form of <KEYSPACE>.<TABLE>") "if the processor supports the 'Table' property, the keyspace name has to be provided with the " +
.required(false) "table name in the form of <KEYSPACE>.<TABLE>")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service") public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.description("The SSL Context Service used to provide client certificate information for TLS/SSL " .name("SSL Context Service")
+ "connections.") .description("The SSL Context Service used to provide client certificate information for TLS/SSL "
.required(false) + "connections.")
.identifiesControllerService(SSLContextService.class) .required(false)
.build(); .identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth") public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " .name("Client Auth")
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. "
+ "has been defined and enabled.") + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
.required(false) + "has been defined and enabled.")
.allowableValues(SSLContextService.ClientAuth.values()) .required(false)
.defaultValue("REQUIRED") .allowableValues(SSLContextService.ClientAuth.values())
.build(); .defaultValue("REQUIRED")
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("Username") public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.description("Username to access the Cassandra cluster") .name("Username")
.required(false) .description("Username to access the Cassandra cluster")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password") public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.description("Password to access the Cassandra cluster") .name("Password")
.required(false) .description("Password to access the Cassandra cluster")
.sensitive(true) .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
.name("Consistency Level") public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
.description("The strategy for how many replicas must respond before results are returned.") .name("Consistency Level")
.required(true) .description("The strategy for how many replicas must respond before results are returned.")
.allowableValues(ConsistencyLevel.values()) .required(true)
.defaultValue("ONE") .allowableValues(ConsistencyLevel.values())
.build(); .defaultValue("ONE")
.build();
private List<PropertyDescriptor> properties;
private Cluster cluster; static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
private Session cassandraSession; .name("Compression Type")
.description("Enable compression at transport-level requests and responses")
@Override .required(false)
public void init(final ControllerServiceInitializationContext context) { .allowableValues(ProtocolOptions.Compression.values())
List<PropertyDescriptor> props = new ArrayList<>(); .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("NONE")
props.add(CONTACT_POINTS); .build();
props.add(CLIENT_AUTH);
props.add(CONSISTENCY_LEVEL); private List<PropertyDescriptor> properties;
props.add(KEYSPACE); private Cluster cluster;
props.add(USERNAME); private Session cassandraSession;
props.add(PASSWORD);
props.add(PROP_SSL_CONTEXT_SERVICE); @Override
public void init(final ControllerServiceInitializationContext context) {
properties = props; List<PropertyDescriptor> props = new ArrayList<>();
}
props.add(CONTACT_POINTS);
@Override props.add(CLIENT_AUTH);
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { props.add(CONSISTENCY_LEVEL);
return properties; props.add(COMPRESSION_TYPE);
} props.add(KEYSPACE);
props.add(USERNAME);
@OnEnabled props.add(PASSWORD);
public void onEnabled(final ConfigurationContext context) { props.add(PROP_SSL_CONTEXT_SERVICE);
connectToCassandra(context);
} properties = props;
}
@OnDisabled
public void onDisabled(){ @Override
if (cassandraSession != null) { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
cassandraSession.close(); return properties;
} }
if (cluster != null) {
cluster.close(); @OnEnabled
} public void onEnabled(final ConfigurationContext context) {
} connectToCassandra(context);
}
@OnStopped
public void onStopped() { @OnDisabled
if (cassandraSession != null) { public void onDisabled(){
cassandraSession.close(); if (cassandraSession != null) {
} cassandraSession.close();
if (cluster != null) { }
cluster.close(); if (cluster != null) {
} cluster.close();
} }
}
@Override
public Cluster getCluster() { @OnStopped
if (cluster != null) { public void onStopped() {
return cluster; if (cassandraSession != null) {
} else { cassandraSession.close();
throw new ProcessException("Unable to get the Cassandra cluster detail."); }
} if (cluster != null) {
} cluster.close();
}
@Override }
public Session getCassandraSession() {
if (cassandraSession != null) { @Override
return cassandraSession; public Cluster getCluster() {
} else { if (cluster != null) {
throw new ProcessException("Unable to get the Cassandra session."); return cluster;
} } else {
} throw new ProcessException("Unable to get the Cassandra cluster detail.");
}
private void connectToCassandra(ConfigurationContext context) { }
if (cluster == null) {
ComponentLog log = getLogger(); @Override
final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue(); public Session getCassandraSession() {
final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue(); if (cassandraSession != null) {
List<InetSocketAddress> contactPoints = getContactPoints(contactPointList); return cassandraSession;
} else {
// Set up the client for secure (SSL/TLS communications) if configured to do so throw new ProcessException("Unable to get the Cassandra session.");
final SSLContextService sslService = }
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); }
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
final SSLContext sslContext; private void connectToCassandra(ConfigurationContext context) {
if (cluster == null) {
if (sslService != null) { ComponentLog log = getLogger();
final SSLContextService.ClientAuth clientAuth; final String contactPointList = context.getProperty(CONTACT_POINTS).evaluateAttributeExpressions().getValue();
if (StringUtils.isBlank(rawClientAuth)) { final String consistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
clientAuth = SSLContextService.ClientAuth.REQUIRED; final String compressionType = context.getProperty(COMPRESSION_TYPE).getValue();
} else {
try { List<InetSocketAddress> contactPoints = getContactPoints(contactPointList);
clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
} catch (final IllegalArgumentException iae) { // Set up the client for secure (SSL/TLS communications) if configured to do so
throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", final SSLContextService sslService =
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", "))); context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
} final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
} final SSLContext sslContext;
sslContext = sslService.createSSLContext(clientAuth);
} else { if (sslService != null) {
sslContext = null; final SSLContextService.ClientAuth clientAuth;
} if (StringUtils.isBlank(rawClientAuth)) {
clientAuth = SSLContextService.ClientAuth.REQUIRED;
final String username, password; } else {
PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions(); try {
PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions(); clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
} catch (final IllegalArgumentException iae) {
if (usernameProperty != null && passwordProperty != null) { throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
username = usernameProperty.getValue(); rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
password = passwordProperty.getValue(); }
} else { }
username = null; sslContext = sslService.createSSLContext(clientAuth);
password = null; } else {
} sslContext = null;
}
// Create the cluster and connect to it
Cluster newCluster = createCluster(contactPoints, sslContext, username, password); final String username, password;
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions(); PropertyValue usernameProperty = context.getProperty(USERNAME).evaluateAttributeExpressions();
final Session newSession; PropertyValue passwordProperty = context.getProperty(PASSWORD).evaluateAttributeExpressions();
if (keyspaceProperty != null) {
newSession = newCluster.connect(keyspaceProperty.getValue()); if (usernameProperty != null && passwordProperty != null) {
} else { username = usernameProperty.getValue();
newSession = newCluster.connect(); password = passwordProperty.getValue();
} } else {
newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel)); username = null;
Metadata metadata = newCluster.getMetadata(); password = null;
log.info("Connected to Cassandra cluster: {}", new Object[]{metadata.getClusterName()}); }
cluster = newCluster; // Create the cluster and connect to it
cassandraSession = newSession; Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType);
} PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
} final Session newSession;
if (keyspaceProperty != null) {
private List<InetSocketAddress> getContactPoints(String contactPointList) { newSession = newCluster.connect(keyspaceProperty.getValue());
} else {
if (contactPointList == null) { newSession = newCluster.connect();
return null; }
} newCluster.getConfiguration().getQueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
Metadata metadata = newCluster.getMetadata();
final List<String> contactPointStringList = Arrays.asList(contactPointList.split(",")); log.info("Connected to Cassandra cluster: {}", new Object[]{metadata.getClusterName()});
List<InetSocketAddress> contactPoints = new ArrayList<>();
cluster = newCluster;
for (String contactPointEntry : contactPointStringList) { cassandraSession = newSession;
String[] addresses = contactPointEntry.split(":"); }
final String hostName = addresses[0].trim(); }
final int port = (addresses.length > 1) ? Integer.parseInt(addresses[1].trim()) : DEFAULT_CASSANDRA_PORT;
private List<InetSocketAddress> getContactPoints(String contactPointList) {
contactPoints.add(new InetSocketAddress(hostName, port));
} if (contactPointList == null) {
return null;
return contactPoints; }
}
final List<String> contactPointStringList = Arrays.asList(contactPointList.split(","));
private Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, List<InetSocketAddress> contactPoints = new ArrayList<>();
String username, String password) {
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints); for (String contactPointEntry : contactPointStringList) {
String[] addresses = contactPointEntry.split(":");
if (sslContext != null) { final String hostName = addresses[0].trim();
JdkSSLOptions sslOptions = JdkSSLOptions.builder() final int port = (addresses.length > 1) ? Integer.parseInt(addresses[1].trim()) : DEFAULT_CASSANDRA_PORT;
.withSSLContext(sslContext)
.build(); contactPoints.add(new InetSocketAddress(hostName, port));
builder = builder.withSSL(sslOptions); }
}
return contactPoints;
if (username != null && password != null) { }
builder = builder.withCredentials(username, password);
} private Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) {
return builder.build(); Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
}
} if (sslContext != null) {
JdkSSLOptions sslOptions = JdkSSLOptions.builder()
.withSSLContext(sslContext)
.build();
builder = builder.withSSL(sslOptions);
}
if (username != null && password != null) {
builder = builder.withCredentials(username, password);
}
if(ProtocolOptions.Compression.SNAPPY.equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.SNAPPY);
} else if(ProtocolOptions.Compression.LZ4.equals(compressionType)) {
builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
}
return builder.build();
}
}

View File

@ -1,59 +1,59 @@
/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.service; package org.apache.nifi.service;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class TestCassandraSessionProvider { public class TestCassandraSessionProvider {
private static TestRunner runner; private static TestRunner runner;
private static CassandraSessionProvider sessionProvider; private static CassandraSessionProvider sessionProvider;
@BeforeClass @BeforeClass
public static void setup() throws InitializationException { public static void setup() throws InitializationException {
MockCassandraProcessor mockCassandraProcessor = new MockCassandraProcessor(); MockCassandraProcessor mockCassandraProcessor = new MockCassandraProcessor();
sessionProvider = new CassandraSessionProvider(); sessionProvider = new CassandraSessionProvider();
runner = TestRunners.newTestRunner(mockCassandraProcessor); runner = TestRunners.newTestRunner(mockCassandraProcessor);
runner.addControllerService("cassandra-session-provider", sessionProvider); runner.addControllerService("cassandra-session-provider", sessionProvider);
} }
@Test @Test
public void testGetPropertyDescriptors() { public void testGetPropertyDescriptors() {
List<PropertyDescriptor> properties = sessionProvider.getPropertyDescriptors(); List<PropertyDescriptor> properties = sessionProvider.getPropertyDescriptors();
assertEquals(7, properties.size()); assertEquals(8, properties.size());
assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH)); assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL)); assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS)); assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));
assertTrue(properties.contains(CassandraSessionProvider.KEYSPACE)); assertTrue(properties.contains(CassandraSessionProvider.KEYSPACE));
assertTrue(properties.contains(CassandraSessionProvider.PASSWORD)); assertTrue(properties.contains(CassandraSessionProvider.PASSWORD));
assertTrue(properties.contains(CassandraSessionProvider.PROP_SSL_CONTEXT_SERVICE)); assertTrue(properties.contains(CassandraSessionProvider.PROP_SSL_CONTEXT_SERVICE));
assertTrue(properties.contains(CassandraSessionProvider.USERNAME)); assertTrue(properties.contains(CassandraSessionProvider.USERNAME));
} }
} }