From 04db806aceca433e05b68d2b317e07780f150e43 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 25 Aug 2016 12:04:51 -0400 Subject: [PATCH] NIFI-2614 This closes #944. added support for max.request.size --- .../kafka/pubsub/PublishKafka_0_10.java | 15 ++++++++++- .../kafka/pubsub/PublishingContext.java | 27 +++++-------------- .../kafka/pubsub/PublishingContextTest.java | 15 ----------- .../processors/kafka/pubsub/PublishKafka.java | 15 ++++++++++- .../kafka/pubsub/PublishingContext.java | 27 +++++-------------- .../kafka/pubsub/PublishingContextTest.java | 15 ----------- 6 files changed, 40 insertions(+), 74 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java index e29f2af850..3ad2fc647b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java @@ -47,6 +47,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; @@ -133,6 +134,15 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { .defaultValue("30 sec") .build(); + static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("max.request.size") + .displayName("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() .name("kafka-key") .displayName("Kafka Key") @@ -207,6 +217,7 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { _descriptors.add(DELIVERY_GUARANTEE); _descriptors.add(KEY); _descriptors.add(MESSAGE_DEMARCATOR); + _descriptors.add(MAX_REQUEST_SIZE); _descriptors.add(META_WAIT_TIME); _descriptors.add(PARTITION_CLASS); _descriptors.add(COMPRESSION_CODEC); @@ -377,6 +388,7 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue())); this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); final Properties props = new Properties(); props.putAll(kafkaProps); @@ -461,7 +473,8 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor { .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; } - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex, + context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()); publishingContext.setKeyBytes(keyBytes); publishingContext.setDelimiterBytes(delimiterBytes); return publishingContext; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java index bda29e6410..1513481df1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java @@ -31,14 +31,7 @@ class PublishingContext { private final int lastAckedMessageIndex; - /* - * We're using the default value from Kafka. We are using it to control the - * message size before it goes to to Kafka thus limiting possibility of a - * late failures in Kafka client. - */ - private int maxRequestSize = 1048576; // kafka default - - private boolean maxRequestSizeSet; + private final int maxRequestSize; private byte[] keyBytes; @@ -49,10 +42,15 @@ class PublishingContext { } PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { + this(contentStream, topic, lastAckedMessageIndex, 1048576); + } + + PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex, int maxRequestSize) { this.validateInput(contentStream, topic, lastAckedMessageIndex); this.contentStream = contentStream; this.topic = topic; this.lastAckedMessageIndex = lastAckedMessageIndex; + this.maxRequestSize = maxRequestSize; } @Override @@ -106,19 +104,6 @@ class PublishingContext { } } - void setMaxRequestSize(int maxRequestSize) { - if (!this.maxRequestSizeSet) { - if (maxRequestSize > 0) { - this.maxRequestSize = maxRequestSize; - this.maxRequestSizeSet = true; - } else { - throw new IllegalArgumentException("'maxRequestSize' must be > 0"); - } - } else { - throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); - } - } - private void assertBytesValid(byte[] bytes) { if (bytes != null) { if (bytes.length == 0) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java index 4a9a1c07ba..76c29cdd97 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java @@ -87,20 +87,5 @@ public class PublishingContextTest { } catch (IllegalArgumentException e) { // success } - - publishingContext.setMaxRequestSize(1024); - try { - publishingContext.setMaxRequestSize(1024); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - try { - publishingContext.setMaxRequestSize(-10); - fail(); - } catch (IllegalArgumentException e) { - // success - } } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 4745984563..65f386e8a8 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -47,6 +47,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; @@ -133,6 +134,15 @@ public class PublishKafka extends AbstractSessionFactoryProcessor { .defaultValue("30 sec") .build(); + static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("max.request.size") + .displayName("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() .name("kafka-key") .displayName("Kafka Key") @@ -207,6 +217,7 @@ public class PublishKafka extends AbstractSessionFactoryProcessor { _descriptors.add(DELIVERY_GUARANTEE); _descriptors.add(KEY); _descriptors.add(MESSAGE_DEMARCATOR); + _descriptors.add(MAX_REQUEST_SIZE); _descriptors.add(META_WAIT_TIME); _descriptors.add(PARTITION_CLASS); _descriptors.add(COMPRESSION_CODEC); @@ -377,6 +388,7 @@ public class PublishKafka extends AbstractSessionFactoryProcessor { KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue())); this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); final Properties props = new Properties(); props.putAll(kafkaProps); @@ -461,7 +473,8 @@ public class PublishKafka extends AbstractSessionFactoryProcessor { .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; } - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex, + context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()); publishingContext.setKeyBytes(keyBytes); publishingContext.setDelimiterBytes(delimiterBytes); return publishingContext; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java index bda29e6410..1513481df1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java @@ -31,14 +31,7 @@ class PublishingContext { private final int lastAckedMessageIndex; - /* - * We're using the default value from Kafka. We are using it to control the - * message size before it goes to to Kafka thus limiting possibility of a - * late failures in Kafka client. - */ - private int maxRequestSize = 1048576; // kafka default - - private boolean maxRequestSizeSet; + private final int maxRequestSize; private byte[] keyBytes; @@ -49,10 +42,15 @@ class PublishingContext { } PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { + this(contentStream, topic, lastAckedMessageIndex, 1048576); + } + + PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex, int maxRequestSize) { this.validateInput(contentStream, topic, lastAckedMessageIndex); this.contentStream = contentStream; this.topic = topic; this.lastAckedMessageIndex = lastAckedMessageIndex; + this.maxRequestSize = maxRequestSize; } @Override @@ -106,19 +104,6 @@ class PublishingContext { } } - void setMaxRequestSize(int maxRequestSize) { - if (!this.maxRequestSizeSet) { - if (maxRequestSize > 0) { - this.maxRequestSize = maxRequestSize; - this.maxRequestSizeSet = true; - } else { - throw new IllegalArgumentException("'maxRequestSize' must be > 0"); - } - } else { - throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); - } - } - private void assertBytesValid(byte[] bytes) { if (bytes != null) { if (bytes.length == 0) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java index 4a9a1c07ba..76c29cdd97 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java @@ -87,20 +87,5 @@ public class PublishingContextTest { } catch (IllegalArgumentException e) { // success } - - publishingContext.setMaxRequestSize(1024); - try { - publishingContext.setMaxRequestSize(1024); - fail(); - } catch (IllegalArgumentException e) { - // success - } - - try { - publishingContext.setMaxRequestSize(-10); - fail(); - } catch (IllegalArgumentException e) { - // success - } } }