diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index 561f36bb0b..5bc0e0e5b5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -35,8 +35,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.stream.io.util.StreamDemarcator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import kafka.producer.Partitioner; @@ -46,20 +44,18 @@ import kafka.producer.Partitioner; */ class KafkaPublisher implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); - private final Producer kafkaProducer; private long ackWaitTime = 30000; - private ComponentLog processLog; + private final ComponentLog componentLog; private final Partitioner partitioner; private final int ackCheckSize; - KafkaPublisher(Properties kafkaProperties) { - this(kafkaProperties, 100); + KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); } /** @@ -71,7 +67,7 @@ class KafkaPublisher implements Closeable { * instance of {@link Properties} used to bootstrap * {@link KafkaProducer} */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize) { + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.kafkaProducer = new KafkaProducer<>(kafkaProperties); @@ -85,6 +81,7 @@ class KafkaPublisher implements Closeable { } catch (Exception e) { throw new IllegalStateException("Failed to create partitioner", e); } + this.componentLog = componentLog; } /** @@ -220,28 +217,14 @@ class KafkaPublisher implements Closeable { this.kafkaProducer.close(); } - /** - * Will set {@link ComponentLog} as an additional logger to forward log - * messages to NiFi bulletin - */ - void setProcessLog(ComponentLog processLog) { - this.processLog = processLog; - } - /** * */ private void warnOrError(String message, Exception e) { if (e == null) { - logger.warn(message); - if (this.processLog != null) { - this.processLog.warn(message); - } + this.componentLog.warn(message); } else { - logger.error(message, e); - if (this.processLog != null) { - this.processLog.error(message, e); - } + this.componentLog.error(message); } } @@ -262,7 +245,7 @@ class KafkaPublisher implements Closeable { } public boolean isAllAcked() { - return this.messagesSent - 1 == this.lastMessageAcked; + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; } @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index dc0889a319..4dc8d189a5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -388,8 +388,7 @@ public class PutKafka extends AbstractKafkaProcessor { @Override protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { - KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context)); - kafkaPublisher.setProcessLog(this.getLogger()); + KafkaPublisher kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context), this.getLogger()); return kafkaPublisher; } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java index c4fc9a8108..5bb7c3cd58 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.kafka; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -29,6 +30,7 @@ import java.util.Map; import java.util.Properties; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; import org.apache.nifi.processors.kafka.test.EmbeddedKafka; import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; @@ -70,7 +72,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); KafkaPublisherResult result = publisher.publish(publishingContext); @@ -96,7 +98,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); @@ -132,7 +134,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 1; @@ -179,7 +181,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 3; @@ -221,7 +223,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publisher.publish(publishingContext); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java index 366efefba6..f684bfa3cf 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -33,27 +33,22 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.stream.io.util.StreamDemarcator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor * with sending contents of the {@link FlowFile}s to Kafka. */ class KafkaPublisher implements Closeable { - - private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); - private final Producer kafkaProducer; private volatile long ackWaitTime = 30000; - private volatile ComponentLog processLog; + private final ComponentLog componentLog; private final int ackCheckSize; - KafkaPublisher(Properties kafkaProperties) { - this(kafkaProperties, 100); + KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { + this(kafkaProperties, 100, componentLog); } /** @@ -65,9 +60,10 @@ class KafkaPublisher implements Closeable { * instance of {@link Properties} used to bootstrap * {@link KafkaProducer} */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize) { + KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { this.kafkaProducer = new KafkaProducer<>(kafkaProperties); this.ackCheckSize = ackCheckSize; + this.componentLog = componentLog; } /** @@ -199,28 +195,14 @@ class KafkaPublisher implements Closeable { this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS); } - /** - * Will set {@link ComponentLog} as an additional logger to forward log - * messages to NiFi bulletin - */ - void setProcessLog(ComponentLog processLog) { - this.processLog = processLog; - } - /** * */ private void warnOrError(String message, Exception e) { if (e == null) { - logger.warn(message); - if (this.processLog != null) { - this.processLog.warn(message); - } + this.componentLog.warn(message); } else { - logger.error(message, e); - if (this.processLog != null) { - this.processLog.error(message, e); - } + this.componentLog.error(message, e); } } @@ -244,7 +226,7 @@ class KafkaPublisher implements Closeable { } public boolean isAllAcked() { - return this.messagesSent - 1 == this.lastMessageAcked; + return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; } @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 6235f0b170..6703c04a5b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -228,7 +228,8 @@ public class PublishKafka extends AbstractKafkaProcessor { kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - return new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger()); + return publisher; } /** diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java index 2c45d373d2..6b8b042d20 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -32,6 +33,7 @@ import java.util.Properties; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult; import org.apache.nifi.processors.kafka.test.EmbeddedKafka; import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; @@ -71,7 +73,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsWhole"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); KafkaPublisherResult result = publisher.publish(publishingContext); @@ -97,7 +99,7 @@ public class KafkaPublisherTest { String topicName = "validateSuccessfulSendAsDelimited"; Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8)); @@ -133,7 +135,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 1; @@ -180,7 +182,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); // simulates the first re-try int lastAckedMessageIndex = 3; @@ -221,7 +223,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publisher.publish(publishingContext); @@ -240,7 +242,7 @@ public class KafkaPublisherTest { Properties kafkaProperties = this.buildProducerProperties(); kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName()); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties); + KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class)); PublishingContext publishingContext = new PublishingContext(contentStream, topicName); publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8)); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java index af550b4a17..be9757820c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java @@ -153,7 +153,7 @@ public class PublishKafkaTest { runner.setProperty(PublishKafka.KEY, "key1"); runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); - runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); + runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis"); final String text = "Hello World\nGoodbye\nfail\n2"; runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); @@ -164,6 +164,7 @@ public class PublishKafkaTest { Producer producer = putKafka.getProducer(); verify(producer, times(4)).send(Mockito.any(ProducerRecord.class)); runner.shutdown(); + putKafka.destroy(); } @SuppressWarnings("unchecked") @@ -191,6 +192,35 @@ public class PublishKafkaTest { runner.shutdown(); } + @SuppressWarnings("unchecked") + @Test + public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception { + String topicName = "validateSendFailureAndThenResendSuccess"; + StubPublishKafka putKafka = new StubPublishKafka(100); + + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PublishKafka.TOPIC, topicName); + runner.setProperty(PublishKafka.CLIENT_ID, "foo"); + runner.setProperty(PublishKafka.KEY, "key1"); + runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234"); + runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n"); + runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis"); + + final String text = "futurefail\nHello World\nGoodbye\n2"; + runner.enqueue(text.getBytes(StandardCharsets.UTF_8)); + runner.run(1, false); + MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0); + assertNotNull(ff); + runner.enqueue(ff); + + runner.run(1, false); + assertEquals(0, runner.getQueueSize().getObjectCount()); + Producer producer = putKafka.getProducer(); + // 6 sends due to duplication + verify(producer, times(5)).send(Mockito.any(ProducerRecord.class)); + runner.shutdown(); + } + @SuppressWarnings("unchecked") @Test public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java index 2236f30f20..3189356c01 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java @@ -22,14 +22,19 @@ import static org.mockito.Mockito.when; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.Properties; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; @@ -45,6 +50,8 @@ public class StubPublishKafka extends PublishKafka { private final int ackCheckSize; + private final ExecutorService executor = Executors.newCachedThreadPool(); + StubPublishKafka(int ackCheckSize) { this.ackCheckSize = ackCheckSize; } @@ -53,6 +60,10 @@ public class StubPublishKafka extends PublishKafka { return producer; } + public void destroy() { + this.executor.shutdownNow(); + } + @SuppressWarnings("unchecked") @Override protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) @@ -66,12 +77,17 @@ public class StubPublishKafka extends PublishKafka { f.setAccessible(true); f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue()); publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class); + publisher.setAckWaitTime(15000); producer = mock(Producer.class); this.instrumentProducer(producer, false); Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer"); kf.setAccessible(true); kf.set(publisher, producer); + Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog"); + componentLogF.setAccessible(true); + componentLogF.set(publisher, mock(ComponentLog.class)); + Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize"); ackCheckSizeField.setAccessible(true); ackCheckSizeField.set(publisher, this.ackCheckSize); @@ -84,8 +100,8 @@ public class StubPublishKafka extends PublishKafka { @SuppressWarnings("unchecked") private void instrumentProducer(Producer producer, boolean failRandomly) { + when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer>() { - @SuppressWarnings("rawtypes") @Override public Future answer(InvocationOnMock invocation) throws Throwable { ProducerRecord record = (ProducerRecord) invocation.getArguments()[0]; @@ -94,11 +110,19 @@ public class StubPublishKafka extends PublishKafka { StubPublishKafka.this.failed = true; throw new RuntimeException("intentional"); } - Future future = mock(Future.class); - if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { - StubPublishKafka.this.failed = true; - when(future.get(Mockito.anyLong(), Mockito.any())).thenThrow(ExecutionException.class); - } + Future future = executor.submit(new Callable() { + @Override + public RecordMetadata call() throws Exception { + if ("futurefail".equals(value) && !StubPublishKafka.this.failed) { + StubPublishKafka.this.failed = true; + throw new TopicAuthorizationException("Unauthorized"); + } else { + TopicPartition partition = new TopicPartition("foo", 0); + RecordMetadata meta = new RecordMetadata(partition, 0, 0); + return meta; + } + } + }); return future; } });