NIFI-7586 In CassandraSesionProvider added properties to set socket-level read timeout and connect timeout.

In QueryCassandra when writing flowfile to the sesion it's done on the raw OutputStream.
Wrapped it in a BufferedOutputStream for better performance.

This closes #4368.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Tamas Palfy 2020-06-29 16:39:58 +02:00 committed by Peter Turcsanyi
parent 87ec8558a4
commit c2f46c44ca
3 changed files with 45 additions and 5 deletions

View File

@ -56,6 +56,7 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -210,8 +211,8 @@ public class QueryCassandra extends AbstractCassandraProcessor {
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
@Override @Override
public void process(final OutputStream out) throws IOException { public void process(final OutputStream rawOut) throws IOException {
try { try (final OutputStream out = new BufferedOutputStream(rawOut)) {
logger.debug("Executing CQL query {}", new Object[]{selectQuery}); logger.debug("Executing CQL query {}", new Object[]{selectQuery});
final ResultSet resultSet; final ResultSet resultSet;
if (queryTimeout > 0) { if (queryTimeout > 0) {

View File

@ -25,7 +25,9 @@ import com.datastax.driver.core.Session;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import com.datastax.driver.core.SocketOptions;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -126,6 +128,24 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
.defaultValue("NONE") .defaultValue("NONE")
.build(); .build();
static final PropertyDescriptor READ_TIMEOUT_MS = new PropertyDescriptor.Builder()
.name("read-timeout-ms")
.displayName("Read Timout (ms)")
.description("Read timeout (in milliseconds). 0 means no timeout. If no value is set, the underlying default will be used.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor CONNECT_TIMEOUT_MS = new PropertyDescriptor.Builder()
.name("connect-timeout-ms")
.displayName("Connect Timout (ms)")
.description("Connection timeout (in milliseconds). 0 means no timeout. If no value is set, the underlying default will be used.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
private List<PropertyDescriptor> properties; private List<PropertyDescriptor> properties;
private Cluster cluster; private Cluster cluster;
private Session cassandraSession; private Session cassandraSession;
@ -142,6 +162,8 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
props.add(USERNAME); props.add(USERNAME);
props.add(PASSWORD); props.add(PASSWORD);
props.add(PROP_SSL_CONTEXT_SERVICE); props.add(PROP_SSL_CONTEXT_SERVICE);
props.add(READ_TIMEOUT_MS);
props.add(CONNECT_TIMEOUT_MS);
properties = props; properties = props;
} }
@ -238,8 +260,18 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
password = null; password = null;
} }
PropertyValue readTimeoutMillisProperty = context.getProperty(READ_TIMEOUT_MS).evaluateAttributeExpressions();
Optional<Integer> readTimeoutMillisOptional = Optional.ofNullable(readTimeoutMillisProperty)
.filter(PropertyValue::isSet)
.map(PropertyValue::asInteger);
PropertyValue connectTimeoutMillisProperty = context.getProperty(CONNECT_TIMEOUT_MS).evaluateAttributeExpressions();
Optional<Integer> connectTimeoutMillisOptional = Optional.ofNullable(connectTimeoutMillisProperty)
.filter(PropertyValue::isSet)
.map(PropertyValue::asInteger);
// Create the cluster and connect to it // Create the cluster and connect to it
Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType); Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType, readTimeoutMillisOptional, connectTimeoutMillisOptional);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions(); PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
final Session newSession; final Session newSession;
if (keyspaceProperty != null) { if (keyspaceProperty != null) {
@ -277,7 +309,8 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
} }
private Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, private Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
String username, String password, String compressionType) { String username, String password, String compressionType,
Optional<Integer> readTimeoutMillisOptional, Optional<Integer> connectTimeoutMillisOptional) {
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints); Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
if (sslContext != null) { if (sslContext != null) {
@ -297,6 +330,12 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
builder = builder.withCompression(ProtocolOptions.Compression.LZ4); builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
} }
SocketOptions socketOptions = new SocketOptions();
readTimeoutMillisOptional.ifPresent(socketOptions::setReadTimeoutMillis);
connectTimeoutMillisOptional.ifPresent(socketOptions::setConnectTimeoutMillis);
builder.withSocketOptions(socketOptions);
return builder.build(); return builder.build();
} }
} }

View File

@ -46,7 +46,7 @@ public class TestCassandraSessionProvider {
public void testGetPropertyDescriptors() { public void testGetPropertyDescriptors() {
List<PropertyDescriptor> properties = sessionProvider.getPropertyDescriptors(); List<PropertyDescriptor> properties = sessionProvider.getPropertyDescriptors();
assertEquals(8, properties.size()); assertEquals(10, properties.size());
assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH)); assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL)); assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS)); assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));