NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731)

This commit is contained in:
greyp9 2022-11-30 13:23:23 -05:00 committed by GitHub
parent c79eca94a1
commit 282c56b5ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 558 additions and 61 deletions

View File

@ -431,7 +431,8 @@ public class PublisherLease implements Closeable {
} }
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) { protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) {
publish(flowFile, Collections.emptyList(), messageKey, messageContent, topic, tracker, partition); final List<Header> headers = toHeaders(flowFile, Collections.emptyMap());
publish(flowFile, headers, messageKey, messageContent, topic, tracker, partition);
} }
protected void publish(final FlowFile flowFile, final List<Header> headers, final byte[] messageKey, final byte[] messageContent, protected void publish(final FlowFile flowFile, final List<Header> headers, final byte[] messageKey, final byte[] messageContent,

View File

@ -375,7 +375,7 @@
<table border="thin"> <table border="thin">
<tr> <tr>
<th>Record Key</th> <th>Record Key</th>
<td><code>Acme Accounts</code></td> <td><code>Acme Holdings</code></td>
</tr> </tr>
<tr> <tr>
<th>Record Value</th> <th>Record Value</th>

View File

@ -31,17 +31,12 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -57,12 +52,15 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
@ -79,30 +77,29 @@ public class TestPublishKafkaMockParameterized {
public static Stream<Arguments> testCaseParametersProvider() { public static Stream<Arguments> testCaseParametersProvider() {
return Stream.of( return Stream.of(
arguments("PublishRecord/parameterized/flowfileInput1.json", arguments("Publish/parameterized/flowfileInput1.json",
"account", ".*A.", getAttributes(), PublishStrategy.USE_VALUE, "key1A", ".*A.", getAttributes(),
"PublishRecord/parameterized/kafkaOutput1V.json"), "Publish/parameterized/kafkaOutput1A.json"),
arguments("PublishRecord/parameterized/flowfileInput1.json", arguments("Publish/parameterized/flowfileInput1.json",
"account", ".*B.", getAttributes(), PublishStrategy.USE_WRAPPER, "key1B", ".*B.", getAttributes(),
"PublishRecord/parameterized/kafkaOutput1W.json"), "Publish/parameterized/kafkaOutput1B.json"),
arguments("PublishRecord/parameterized/flowfileInputA.json", arguments("Publish/parameterized/flowfileInputA.json",
"key", ".*1", getAttributes(), PublishStrategy.USE_VALUE, "keyA1", ".*1", getAttributes(),
"PublishRecord/parameterized/kafkaOutputAV.json"), "Publish/parameterized/kafkaOutputA1.json"),
arguments("PublishRecord/parameterized/flowfileInputA.json", arguments("Publish/parameterized/flowfileInputA.json",
"key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER, "keyA2", ".*2", getAttributes(),
"PublishRecord/parameterized/kafkaOutputAW.json") "Publish/parameterized/kafkaOutputA2.json")
); );
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("testCaseParametersProvider") @MethodSource("testCaseParametersProvider")
public void testPublishKafkaRecord(final String flowfileInputResource, public void testPublishKafka(final String flowfileInputResource,
final String messageKeyField, final String messageKey,
final String attributeNameRegex, final String attributeNameRegex,
final Map<String, String> attributes, final Map<String, String> attributes,
final PublishStrategy publishStrategy, final String kafkaRecordExpectedOutputResource)
final String kafkaRecordExpectedOutputResource) throws IOException {
throws IOException, InitializationException {
final byte[] flowfileData = IOUtils.toByteArray(Objects.requireNonNull( final byte[] flowfileData = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(flowfileInputResource))); getClass().getClassLoader().getResource(flowfileInputResource)));
logger.trace(new String(flowfileData, UTF_8)); logger.trace(new String(flowfileData, UTF_8));
@ -114,12 +111,11 @@ public class TestPublishKafkaMockParameterized {
final TestRunner runner = getTestRunner(producedRecords); final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", "test-topic"); runner.setProperty("topic", "test-topic");
runner.setProperty("attribute-name-regex", attributeNameRegex); runner.setProperty("attribute-name-regex", attributeNameRegex);
runner.setProperty("message-key-field", messageKeyField); runner.setProperty("kafka-key", messageKey);
runner.setProperty("publish-strategy", publishStrategy.name());
runner.enqueue(flowFile); runner.enqueue(flowFile);
runner.run(1); runner.run(1);
// verify results // verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1); runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size()); assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> kafkaRecord = producedRecords.iterator().next(); final ProducerRecord<byte[], byte[]> kafkaRecord = producedRecords.iterator().next();
final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter() final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter()
@ -151,7 +147,7 @@ public class TestPublishKafkaMockParameterized {
public void serialize(final RecordHeader recordHeader, final JsonGenerator jsonGenerator, public void serialize(final RecordHeader recordHeader, final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider) throws IOException { final SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject(); jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField("RecordHeader-key", jsonGenerator.writeStringField("RecordHeader-key",
(recordHeader.key() == null) ? null : recordHeader.key()); (recordHeader.key() == null) ? null : recordHeader.key());
jsonGenerator.writeObjectField("RecordHeader-value", jsonGenerator.writeObjectField("RecordHeader-value",
(recordHeader.value() == null) ? null : new String(recordHeader.value(), StandardCharsets.UTF_8)); (recordHeader.value() == null) ? null : new String(recordHeader.value(), StandardCharsets.UTF_8));
@ -174,11 +170,15 @@ public class TestPublishKafkaMockParameterized {
public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator, public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException { SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject(); jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField("ProducerRecord-key", jsonGenerator.writeStringField("ProducerRecord-key",
(producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key())); (producerRecord.key() == null) ? null : new String(producerRecord.key(), StandardCharsets.UTF_8));
jsonGenerator.writeObjectField("ProducerRecord-value", jsonGenerator.writeObjectField("ProducerRecord-value",
(producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value())); (producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value()));
jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers()); final List<Header> headers = new ArrayList<>();
producerRecord.headers().forEach(headers::add);
final List<Header> headersSorted = headers.stream()
.sorted(Comparator.comparing(Header::key)).collect(Collectors.toList());
jsonGenerator.writeObjectField("ProducerRecord-headers", headersSorted);
jsonGenerator.writeEndObject(); jsonGenerator.writeEndObject();
} }
} }
@ -192,15 +192,8 @@ public class TestPublishKafkaMockParameterized {
return attributes; return attributes;
} }
private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) {
throws InitializationException { final PublishKafka_2_6 processor = new PublishKafka_2_6() {
final String readerId = "record-reader";
final RecordReaderFactory readerService = new JsonTreeReader();
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
final String keyWriterId = "record-key-writer";
final RecordSetWriterFactory keyWriterService = new JsonRecordSetWriter();
final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
@Override @Override
protected PublisherPool createPublisherPool(final ProcessContext context) { protected PublisherPool createPublisherPool(final ProcessContext context) {
return getPublisherPool(producedRecords, context); return getPublisherPool(producedRecords, context);
@ -208,15 +201,6 @@ public class TestPublishKafkaMockParameterized {
}; };
final TestRunner runner = TestRunners.newTestRunner(processor); final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setValidateExpressionUsage(false); runner.setValidateExpressionUsage(false);
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
runner.setProperty(readerId, readerId);
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(writerId, writerId);
runner.addControllerService(keyWriterId, keyWriterService);
runner.enableControllerService(keyWriterService);
runner.setProperty(keyWriterId, keyWriterId);
return runner; return runner;
} }
@ -229,10 +213,8 @@ public class TestPublishKafkaMockParameterized {
final boolean useTransactions = context.getProperty("use-transactions").asBoolean(); final boolean useTransactions = context.getProperty("use-transactions").asBoolean();
final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue(); final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue();
Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix); Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue(); final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName); final Charset charset = Charset.forName(charsetName);
final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty("record-key-writer").asControllerService(RecordSetWriterFactory.class);
return new PublisherPool( return new PublisherPool(
Collections.emptyMap(), Collections.emptyMap(),
@ -243,8 +225,8 @@ public class TestPublishKafkaMockParameterized {
transactionalIdSupplier, transactionalIdSupplier,
attributeNamePattern, attributeNamePattern,
charset, charset,
publishStrategy, null,
recordKeyWriterFactory) { null) {
@Override @Override
public PublisherLease obtainPublisher() { public PublisherLease obtainPublisher() {
return getPublisherLease(producedRecords, context); return getPublisherLease(producedRecords, context);
@ -259,9 +241,6 @@ public class TestPublishKafkaMockParameterized {
final ProcessContext context) { final ProcessContext context) {
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue(); final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex); final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex);
final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer")
.asControllerService(RecordSetWriterFactory.class);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final Producer<byte[], byte[]> producer = mock(ProducerBB.class); final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
when(producer.send(any(), any())).then(invocation -> { when(producer.send(any(), any())).then(invocation -> {
@ -280,8 +259,8 @@ public class TestPublishKafkaMockParameterized {
true, true,
patternAttributeName, patternAttributeName,
UTF_8, UTF_8,
publishStrategy, null,
keyWriterFactory) { null) {
@Override @Override
protected long getTimestamp() { protected long getTimestamp() {
return 1000000000000L; return 1000000000000L;

View File

@ -0,0 +1,324 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPublishKafkaRecordMockParameterized {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ObjectMapper mapper = getObjectMapper();
public static Stream<Arguments> testCaseParametersProvider() {
return Stream.of(
arguments("PublishRecord/parameterized/flowfileInput1.json",
"account", ".*A.", getAttributes(), PublishStrategy.USE_VALUE,
"PublishRecord/parameterized/kafkaOutput1V.json"),
arguments("PublishRecord/parameterized/flowfileInput1.json",
"account", ".*B.", getAttributes(), PublishStrategy.USE_WRAPPER,
"PublishRecord/parameterized/kafkaOutput1W.json"),
arguments("PublishRecord/parameterized/flowfileInputA.json",
"key", ".*1", getAttributes(), PublishStrategy.USE_VALUE,
"PublishRecord/parameterized/kafkaOutputAV.json"),
arguments("PublishRecord/parameterized/flowfileInputA.json",
"key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER,
"PublishRecord/parameterized/kafkaOutputAW.json"),
arguments("PublishRecord/parameterized/flowfileInputDoc1V.json",
"account", "attribute.*", getAttributesDoc1(), PublishStrategy.USE_VALUE,
"PublishRecord/parameterized/kafkaOutputDoc1V.json"),
arguments("PublishRecord/parameterized/flowfileInputDoc1W.json",
null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER,
"PublishRecord/parameterized/kafkaOutputDoc1W.json"),
arguments("PublishRecord/parameterized/flowfileInputDoc2W.json",
null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER,
"PublishRecord/parameterized/kafkaOutputDoc2W.json")
);
}
@ParameterizedTest
@MethodSource("testCaseParametersProvider")
public void testPublishKafkaRecord(final String flowfileInputResource,
final String messageKeyField,
final String attributeNameRegex,
final Map<String, String> attributes,
final PublishStrategy publishStrategy,
final String kafkaRecordExpectedOutputResource)
throws IOException, InitializationException {
final byte[] flowfileData = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(flowfileInputResource)));
logger.trace(new String(flowfileData, UTF_8));
final MockFlowFile flowFile = new MockFlowFile(1L);
flowFile.putAttributes(attributes);
flowFile.setData(flowfileData);
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", "test-topic");
if (attributeNameRegex != null) {
runner.setProperty("attribute-name-regex", attributeNameRegex);
}
if (messageKeyField != null) {
runner.setProperty("message-key-field", messageKeyField);
}
runner.setProperty("publish-strategy", publishStrategy.name());
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> kafkaRecord = producedRecords.iterator().next();
final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter()
.withObjectIndenter(new DefaultIndenter().withLinefeed("\n"));
final String json = mapper.writer(prettyPrinter).writeValueAsString(kafkaRecord);
logger.trace(json);
final String kafkaRecordExpected = IOUtils.toString(Objects.requireNonNull(
getClass().getClassLoader().getResource(kafkaRecordExpectedOutputResource)), UTF_8);
assertEquals(kafkaRecordExpected, json);
}
private static ObjectMapper getObjectMapper() {
final ObjectMapper objectMapper = new ObjectMapper()
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(RecordHeader.class, new HeaderSerializer());
simpleModule.addSerializer(new ProducerRecordBBSerializer(objectMapper));
objectMapper.registerModule(simpleModule);
return objectMapper;
}
/**
* Custom {@link com.fasterxml.jackson} serialization for {@link RecordHeader}.
*/
private static class HeaderSerializer extends JsonSerializer<RecordHeader> {
@Override
public void serialize(final RecordHeader recordHeader, final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField("RecordHeader-key",
(recordHeader.key() == null) ? null : recordHeader.key());
jsonGenerator.writeObjectField("RecordHeader-value",
(recordHeader.value() == null) ? null : new String(recordHeader.value(), StandardCharsets.UTF_8));
jsonGenerator.writeEndObject();
}
}
/**
* Custom {@link com.fasterxml.jackson} serialization for {@link ProducerRecord}.
*/
private static class ProducerRecordBBSerializer extends StdSerializer<ProducerRecord<byte[], byte[]>> {
private final ObjectMapper objectMapper;
protected ProducerRecordBBSerializer(ObjectMapper objectMapper) {
super(ProducerRecord.class, false);
this.objectMapper = objectMapper;
}
@Override
public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
serializeField(jsonGenerator, "ProducerRecord-key", producerRecord.key());
serializeField(jsonGenerator, "ProducerRecord-value", producerRecord.value());
jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers());
jsonGenerator.writeEndObject();
}
private void serializeField(final JsonGenerator jsonGenerator, final String key, final byte[] value) throws IOException {
if (value == null) {
jsonGenerator.writeObjectField(key, null);
} else {
try {
jsonGenerator.writeObjectField(key, objectMapper.readTree(value));
} catch (final JsonParseException e) {
jsonGenerator.writeStringField(key, new String(value, UTF_8));
}
}
}
}
private static Map<String, String> getAttributes() {
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA1", "attrValueA1");
attributes.put("attrKeyA2", "attrValueA2");
attributes.put("attrKeyB1", "attrValueB1");
attributes.put("attrKeyB2", "attrValueB2");
return attributes;
}
private static Map<String, String> getAttributesDoc1() {
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attributeA", "valueA");
attributes.put("attributeB", "valueB");
attributes.put("otherAttribute", "otherValue");
return attributes;
}
private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
throws InitializationException {
final String readerId = "record-reader";
final RecordReaderFactory readerService = new JsonTreeReader();
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
final String keyWriterId = "record-key-writer";
final RecordSetWriterFactory keyWriterService = new JsonRecordSetWriter();
final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return getPublisherPool(producedRecords, context);
}
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setValidateExpressionUsage(false);
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
runner.setProperty(readerId, readerId);
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(writerId, writerId);
runner.addControllerService(keyWriterId, keyWriterService);
runner.enableControllerService(keyWriterService);
runner.setProperty(keyWriterId, keyWriterId);
return runner;
}
private PublisherPool getPublisherPool(final Collection<ProducerRecord<byte[], byte[]>> producedRecords,
final ProcessContext context) {
final int maxMessageSize = context.getProperty("max.request.size").asDataSize(DataUnit.B).intValue();
final long maxAckWaitMillis = context.getProperty("ack.wait.time").asTimePeriod(TimeUnit.MILLISECONDS);
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
final boolean useTransactions = context.getProperty("use-transactions").asBoolean();
final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue();
Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty("record-key-writer").asControllerService(RecordSetWriterFactory.class);
return new PublisherPool(
Collections.emptyMap(),
mock(ComponentLog.class),
maxMessageSize,
maxAckWaitMillis,
useTransactions,
transactionalIdSupplier,
attributeNamePattern,
charset,
publishStrategy,
recordKeyWriterFactory) {
@Override
public PublisherLease obtainPublisher() {
return getPublisherLease(producedRecords, context);
}
};
}
public interface ProducerBB extends Producer<byte[], byte[]> {
}
private PublisherLease getPublisherLease(final Collection<ProducerRecord<byte[], byte[]>> producedRecords,
final ProcessContext context) {
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex);
final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer")
.asControllerService(RecordSetWriterFactory.class);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
when(producer.send(any(), any())).then(invocation -> {
final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
producedRecords.add(record);
final Callback callback = invocation.getArgument(1);
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0), null);
return null;
});
return new PublisherLease(
producer,
1024,
1000L,
mock(ComponentLog.class),
true,
patternAttributeName,
UTF_8,
publishStrategy,
keyWriterFactory) {
@Override
protected long getTimestamp() {
return 1000000000000L;
}
};
}
}

View File

@ -0,0 +1,8 @@
{
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number": "AC1234"
}
}

View File

@ -0,0 +1,12 @@
{
"key": {
"type": "person"
},
"value": {
"name": "Mark",
"number": 49
},
"headers": {
"headerA": "headerAValue"
}
}

View File

@ -0,0 +1,18 @@
{
"ProducerRecord-key" : "key1A",
"ProducerRecord-value" : {
"address" : "1234 First Street",
"zip" : "12345",
"account" : {
"name" : "Acme",
"number" : "AC1234"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "attrKeyA1",
"RecordHeader-value" : "attrValueA1"
}, {
"RecordHeader-key" : "attrKeyA2",
"RecordHeader-value" : "attrValueA2"
} ]
}

View File

@ -0,0 +1,18 @@
{
"ProducerRecord-key" : "key1B",
"ProducerRecord-value" : {
"address" : "1234 First Street",
"zip" : "12345",
"account" : {
"name" : "Acme",
"number" : "AC1234"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "attrKeyB1",
"RecordHeader-value" : "attrValueB1"
}, {
"RecordHeader-key" : "attrKeyB2",
"RecordHeader-value" : "attrValueB2"
} ]
}

View File

@ -0,0 +1,22 @@
{
"ProducerRecord-key" : "keyA1",
"ProducerRecord-value" : {
"key" : {
"type" : "person"
},
"value" : {
"name" : "Mark",
"number" : 49
},
"headers" : {
"headerA" : "headerAValue"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "attrKeyA1",
"RecordHeader-value" : "attrValueA1"
}, {
"RecordHeader-key" : "attrKeyB1",
"RecordHeader-value" : "attrValueB1"
} ]
}

View File

@ -0,0 +1,22 @@
{
"ProducerRecord-key" : "keyA2",
"ProducerRecord-value" : {
"key" : {
"type" : "person"
},
"value" : {
"name" : "Mark",
"number" : 49
},
"headers" : {
"headerA" : "headerAValue"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "attrKeyA2",
"RecordHeader-value" : "attrValueA2"
}, {
"RecordHeader-key" : "attrKeyB2",
"RecordHeader-value" : "attrValueB2"
} ]
}

View File

@ -0,0 +1,8 @@
{
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number": "AC1234"
}
}

View File

@ -0,0 +1,15 @@
{
"key": "Acme Holdings",
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"accountType": "enterprise",
"test": "true"
}
}

View File

@ -0,0 +1,15 @@
{
"key": {
"accountName": "Acme Holdings",
"accountHolder": "John Doe",
"accountId": "280182830-A009"
},
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
}
}

View File

@ -0,0 +1,21 @@
{
"ProducerRecord-key" : {
"name" : "Acme",
"number" : "AC1234"
},
"ProducerRecord-value" : {
"address" : "1234 First Street",
"zip" : "12345",
"account" : {
"name" : "Acme",
"number" : "AC1234"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "attributeA",
"RecordHeader-value" : "valueA"
}, {
"RecordHeader-key" : "attributeB",
"RecordHeader-value" : "valueB"
} ]
}

View File

@ -0,0 +1,18 @@
{
"ProducerRecord-key" : "Acme Holdings",
"ProducerRecord-value" : {
"address" : "1234 First Street",
"zip" : "12345",
"account" : {
"name" : "Acme",
"number" : "AC1234"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "accountType",
"RecordHeader-value" : "enterprise"
}, {
"RecordHeader-key" : "test",
"RecordHeader-value" : "true"
} ]
}

View File

@ -0,0 +1,16 @@
{
"ProducerRecord-key" : {
"accountName" : "Acme Holdings",
"accountHolder" : "John Doe",
"accountId" : "280182830-A009"
},
"ProducerRecord-value" : {
"address" : "1234 First Street",
"zip" : "12345",
"account" : {
"name" : "Acme",
"number" : "AC1234"
}
},
"ProducerRecord-headers" : [ ]
}