NIFI-2614 This closes #944. Added support for max.request.size.

Authored by Oleg Zhurakousky, 2016-08-25 12:04:51 -04:00; committed by joewitt
parent a6133d4ce3
commit 04db806ace
6 changed files with 40 additions and 74 deletions
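In short, the commit replaces PublishingContext's mutable, set-once maxRequestSize with a constructor-supplied final field, and exposes it to users as a new "Max Request Size" processor property whose resolved byte value is also forwarded to the Kafka producer. Below is a minimal sketch of how the new property might be exercised from a flow test; it assumes the standard nifi-mock TestRunner and the processor's usual package, and is not part of this commit.

    import org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10; // package assumed
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class MaxRequestSizeSketch { // hypothetical test scaffold
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(PublishKafka_0_10.class);
            // "max.request.size" is the property name declared by MAX_REQUEST_SIZE below;
            // the DATA_SIZE_VALIDATOR accepts any NiFi data-size literal, e.g. "5 MB".
            runner.setProperty("max.request.size", "5 MB");
        }
    }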

File: PublishKafka_0_10.java

@@ -47,6 +47,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -133,6 +134,15 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
             .defaultValue("30 sec")
             .build();

+    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+            .name("max.request.size")
+            .displayName("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
             .name("kafka-key")
             .displayName("Kafka Key")
@@ -207,6 +217,7 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
         _descriptors.add(MESSAGE_DEMARCATOR);
+        _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
         _descriptors.add(PARTITION_CLASS);
         _descriptors.add(COMPRESSION_CODEC);
@@ -377,6 +388,7 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
         this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final Properties props = new Properties();
         props.putAll(kafkaProps);
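The "1 MB" default resolves to 1048576 bytes, matching the Kafka client's own max.request.size default, so existing flows see no behavior change. A sketch of the conversion the producer configuration above relies on; whether asDataSize(DataUnit.B) delegates to DataUnit.parseDataSize internally is an assumption, so treat this as an equivalent computation rather than the actual call path:

    import org.apache.nifi.processor.DataUnit;

    // Equivalent computation, for illustration only:
    double bytes = DataUnit.parseDataSize("1 MB", DataUnit.B); // 1048576.0
    int maxRequestSize = (int) bytes;                          // value forwarded to the producer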
@@ -461,7 +473,8 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
                 .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex,
+                context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue());
         publishingContext.setKeyBytes(keyBytes);
         publishingContext.setDelimiterBytes(delimiterBytes);
         return publishingContext;

File: PublishingContext.java (alongside PublishKafka_0_10)

@@ -31,14 +31,7 @@ class PublishingContext {
     private final int lastAckedMessageIndex;

-    /*
-     * We're using the default value from Kafka. We use it to control the
-     * message size before it goes to Kafka, thus limiting the possibility of
-     * late failures in the Kafka client.
-     */
-    private int maxRequestSize = 1048576; // kafka default
-
-    private boolean maxRequestSizeSet;
+    private final int maxRequestSize;

     private byte[] keyBytes;
@@ -49,10 +42,15 @@ class PublishingContext {
     }

     PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) {
+        this(contentStream, topic, lastAckedMessageIndex, 1048576);
+    }
+
+    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex, int maxRequestSize) {
         this.validateInput(contentStream, topic, lastAckedMessageIndex);
         this.contentStream = contentStream;
         this.topic = topic;
         this.lastAckedMessageIndex = lastAckedMessageIndex;
+        this.maxRequestSize = maxRequestSize;
     }

     @Override
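With maxRequestSize now final, the original three-arg constructor simply delegates with Kafka's 1048576-byte default, so existing callers compile unchanged. A usage sketch; the values are illustrative, and a lastAckedMessageIndex of -1 is assumed to mean no messages have been acknowledged yet:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    // Inside a caller or test method:
    InputStream in = new ByteArrayInputStream("one|two|three".getBytes(StandardCharsets.UTF_8));
    PublishingContext defaults = new PublishingContext(in, "my-topic", -1);                  // 1 MB cap
    PublishingContext bounded  = new PublishingContext(in, "my-topic", -1, 5 * 1024 * 1024); // 5 MB cap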
@@ -106,19 +104,6 @@ class PublishingContext {
         }
     }

-    void setMaxRequestSize(int maxRequestSize) {
-        if (!this.maxRequestSizeSet) {
-            if (maxRequestSize > 0) {
-                this.maxRequestSize = maxRequestSize;
-                this.maxRequestSizeSet = true;
-            } else {
-                throw new IllegalArgumentException("'maxRequestSize' must be > 0");
-            }
-        } else {
-            throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance");
-        }
-    }
-
     private void assertBytesValid(byte[] bytes) {
         if (bytes != null) {
             if (bytes.length == 0) {

File: PublishingContextTest.java (alongside PublishKafka_0_10)

@@ -87,20 +87,5 @@ public class PublishingContextTest {
         } catch (IllegalArgumentException e) {
             // success
         }
-
-        publishingContext.setMaxRequestSize(1024);
-        try {
-            publishingContext.setMaxRequestSize(1024);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            publishingContext.setMaxRequestSize(-10);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
     }
 }
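The deleted assertions covered the set-once and positivity rules of the removed setter, which no longer exist. If equivalent coverage is wanted, a constructor-based check could look like the sketch below; it assumes PublishingContext exposes a getMaxRequestSize() accessor, which this diff does not show, and JUnit 4 imports matching the existing test class:

    @Test
    public void validateMaxRequestSizeFromConstructor() {
        InputStream in = new ByteArrayInputStream("data".getBytes(StandardCharsets.UTF_8));
        PublishingContext ctx = new PublishingContext(in, "topic", -1, 1024);
        assertEquals(1024, ctx.getMaxRequestSize()); // accessor assumed, not shown in this diff
    }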

File: PublishKafka.java (same change as PublishKafka_0_10 above; the sketches shown there apply equally)

@@ -47,6 +47,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
@@ -133,6 +134,15 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
             .defaultValue("30 sec")
             .build();

+    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+            .name("max.request.size")
+            .displayName("Max Request Size")
+            .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .build();
+
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
             .name("kafka-key")
             .displayName("Kafka Key")
@@ -207,6 +217,7 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
         _descriptors.add(MESSAGE_DEMARCATOR);
+        _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
         _descriptors.add(PARTITION_CLASS);
         _descriptors.add(COMPRESSION_CODEC);
@@ -377,6 +388,7 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
         this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final Properties props = new Properties();
         props.putAll(kafkaProps);
@@ -461,7 +473,8 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
                 .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex,
+                context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue());
         publishingContext.setKeyBytes(keyBytes);
         publishingContext.setDelimiterBytes(delimiterBytes);
         return publishingContext;

File: PublishingContext.java (alongside PublishKafka; identical to the change above)

@@ -31,14 +31,7 @@ class PublishingContext {
     private final int lastAckedMessageIndex;

-    /*
-     * We're using the default value from Kafka. We use it to control the
-     * message size before it goes to Kafka, thus limiting the possibility of
-     * late failures in the Kafka client.
-     */
-    private int maxRequestSize = 1048576; // kafka default
-
-    private boolean maxRequestSizeSet;
+    private final int maxRequestSize;

     private byte[] keyBytes;
@@ -49,10 +42,15 @@ class PublishingContext {
     }

     PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) {
+        this(contentStream, topic, lastAckedMessageIndex, 1048576);
+    }
+
+    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex, int maxRequestSize) {
         this.validateInput(contentStream, topic, lastAckedMessageIndex);
         this.contentStream = contentStream;
         this.topic = topic;
         this.lastAckedMessageIndex = lastAckedMessageIndex;
+        this.maxRequestSize = maxRequestSize;
     }

     @Override
@@ -106,19 +104,6 @@ class PublishingContext {
         }
     }

-    void setMaxRequestSize(int maxRequestSize) {
-        if (!this.maxRequestSizeSet) {
-            if (maxRequestSize > 0) {
-                this.maxRequestSize = maxRequestSize;
-                this.maxRequestSizeSet = true;
-            } else {
-                throw new IllegalArgumentException("'maxRequestSize' must be > 0");
-            }
-        } else {
-            throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance");
-        }
-    }
-
     private void assertBytesValid(byte[] bytes) {
         if (bytes != null) {
             if (bytes.length == 0) {

File: PublishingContextTest.java (alongside PublishKafka; identical to the change above)

@@ -87,20 +87,5 @@ public class PublishingContextTest {
         } catch (IllegalArgumentException e) {
             // success
         }
-
-        publishingContext.setMaxRequestSize(1024);
-        try {
-            publishingContext.setMaxRequestSize(1024);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            publishingContext.setMaxRequestSize(-10);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
     }
 }