mirror of https://github.com/apache/nifi.git
NIFI-4756: Updated PublishKafkaRecord processors to include attributes generated from schema write strategy into the message headers when appropriate
This closes #2396. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
28e1bcc9d0
commit
7c1ce17223
|
@ -21,7 +21,9 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
|
@ -29,6 +31,12 @@ import org.apache.nifi.avro.AvroTypeUtil;
|
|||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
|
||||
private final Map<RecordSchema, String> avroSchemaTextCache = new LinkedHashMap<RecordSchema, String>() {
|
||||
@Override
|
||||
protected boolean removeEldestEntry(Map.Entry<RecordSchema, String> eldest) {
|
||||
return size() > 10;
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
|
||||
|
@ -36,8 +44,22 @@ public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
|
|||
|
||||
@Override
|
||||
public Map<String, String> getAttributes(final RecordSchema schema) {
|
||||
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
|
||||
final String schemaText = avroSchema.toString();
|
||||
// First, check if schema has the Avro Text available already.
|
||||
final Optional<String> schemaFormat = schema.getSchemaFormat();
|
||||
if (schemaFormat.isPresent() && AvroTypeUtil.AVRO_SCHEMA_FORMAT.equals(schemaFormat.get())) {
|
||||
final Optional<String> schemaText = schema.getSchemaText();
|
||||
if (schemaText.isPresent()) {
|
||||
return Collections.singletonMap("avro.schema", schemaText.get());
|
||||
}
|
||||
}
|
||||
|
||||
String schemaText = avroSchemaTextCache.get(schema);
|
||||
if (schemaText == null) {
|
||||
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
|
||||
schemaText = avroSchema.toString();
|
||||
avroSchemaTextCache.put(schema, schemaText);
|
||||
}
|
||||
|
||||
return Collections.singletonMap("avro.schema", schemaText);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
@ -164,8 +166,10 @@ public class PublisherLease implements Closeable {
|
|||
recordCount++;
|
||||
baos.reset();
|
||||
|
||||
Map<String, String> additionalAttributes = Collections.emptyMap();
|
||||
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
|
||||
writer.write(record);
|
||||
final WriteResult writeResult = writer.write(record);
|
||||
additionalAttributes = writeResult.getAttributes();
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
|
@ -173,7 +177,7 @@ public class PublisherLease implements Closeable {
|
|||
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
|
||||
final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
publish(flowFile, messageKey, messageContent, topic, tracker);
|
||||
publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
|
||||
|
||||
if (tracker.isFailed(flowFile)) {
|
||||
// If we have a failure, don't try to send anything else.
|
||||
|
@ -195,7 +199,7 @@ public class PublisherLease implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
|
||||
private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
|
||||
if (attributeNameRegex == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -206,11 +210,23 @@ public class PublisherLease implements Closeable {
|
|||
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
|
||||
}
|
||||
}
|
||||
|
||||
for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
|
||||
if (attributeNameRegex.matcher(entry.getKey()).matches()) {
|
||||
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
|
||||
publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
|
||||
}
|
||||
|
||||
protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
|
||||
final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
|
||||
|
||||
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
|
||||
addHeaders(flowFile, record);
|
||||
addHeaders(flowFile, additionalAttributes, record);
|
||||
|
||||
producer.send(record, new Callback() {
|
||||
@Override
|
||||
|
|
|
@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_11 {
|
|||
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_11.REL_SUCCESS, 2);
|
||||
|
||||
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
|
||||
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
|
||||
verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
|
||||
verify(mockLease, times(1)).complete();
|
||||
verify(mockLease, times(0)).poison();
|
||||
verify(mockLease, times(1)).close();
|
||||
|
|
|
@ -38,12 +38,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
|
|||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
|
||||
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
@ -270,13 +270,12 @@ public class TestPublisherLease {
|
|||
final RecordSet recordSet = reader.createRecordSet();
|
||||
final RecordSchema schema = reader.getSchema();
|
||||
|
||||
final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age");
|
||||
|
||||
final String topic = "unit-test";
|
||||
final String keyField = "person_id";
|
||||
|
||||
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
|
||||
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
|
||||
Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
|
||||
|
||||
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
|
||||
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.kafka.pubsub;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
class TestUtils {
|
||||
|
||||
public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
|
||||
field.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
|
||||
|
||||
field.set(instance, newValue);
|
||||
}
|
||||
|
||||
static Unsafe getUnsafe() {
|
||||
try {
|
||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
return (Unsafe) f.get(null);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
|
|||
|
||||
@Override
|
||||
public WriteResult write(Record record) throws IOException {
|
||||
return null;
|
||||
return WriteResult.of(1, Collections.emptyMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,226 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.kafka.test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zookeeper.server.ServerCnxnFactory;
|
||||
import org.apache.zookeeper.server.ServerConfig;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import kafka.server.KafkaConfig;
|
||||
import kafka.server.KafkaServerStartable;
|
||||
|
||||
/**
|
||||
* Embedded Kafka server, primarily to be used for testing.
|
||||
*/
|
||||
public class EmbeddedKafka {
|
||||
|
||||
private final KafkaServerStartable kafkaServer;
|
||||
|
||||
private final Properties zookeeperConfig;
|
||||
|
||||
private final Properties kafkaConfig;
|
||||
|
||||
private final ZooKeeperServer zkServer;
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
|
||||
|
||||
private final int kafkaPort;
|
||||
|
||||
private final int zookeeperPort;
|
||||
|
||||
private boolean started;
|
||||
|
||||
/**
|
||||
* Will create instance of the embedded Kafka server. Kafka and Zookeeper
|
||||
* configuration properties will be loaded from 'server.properties' and
|
||||
* 'zookeeper.properties' located at the root of the classpath.
|
||||
*/
|
||||
public EmbeddedKafka() {
|
||||
this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Will create instance of the embedded Kafka server.
|
||||
*
|
||||
* @param kafkaConfig
|
||||
* Kafka configuration properties
|
||||
* @param zookeeperConfig
|
||||
* Zookeeper configuration properties
|
||||
*/
|
||||
public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
|
||||
this.cleanupKafkaWorkDir();
|
||||
this.zookeeperConfig = zookeeperConfig;
|
||||
this.kafkaConfig = kafkaConfig;
|
||||
this.kafkaPort = this.availablePort();
|
||||
this.zookeeperPort = this.availablePort();
|
||||
|
||||
this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
|
||||
this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
|
||||
this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
|
||||
this.zkServer = new ZooKeeperServer();
|
||||
this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return port for Kafka server
|
||||
*/
|
||||
public int getKafkaPort() {
|
||||
if (!this.started) {
|
||||
throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
|
||||
}
|
||||
return this.kafkaPort;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return port for Zookeeper server
|
||||
*/
|
||||
public int getZookeeperPort() {
|
||||
if (!this.started) {
|
||||
throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
|
||||
}
|
||||
return this.zookeeperPort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Will start embedded Kafka server. Its data directories will be created
|
||||
* at 'kafka-tmp' directory relative to the working directory of the current
|
||||
* runtime. The data directories will be deleted upon JVM exit.
|
||||
*
|
||||
*/
|
||||
public void start() {
|
||||
if (!this.started) {
|
||||
logger.info("Starting Zookeeper server");
|
||||
this.startZookeeper();
|
||||
|
||||
logger.info("Starting Kafka server");
|
||||
this.kafkaServer.startup();
|
||||
|
||||
logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
|
||||
+ ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
|
||||
this.started = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will stop embedded Kafka server, cleaning up all working directories.
|
||||
*/
|
||||
public void stop() {
|
||||
if (this.started) {
|
||||
logger.info("Shutting down Kafka server");
|
||||
this.kafkaServer.shutdown();
|
||||
this.kafkaServer.awaitShutdown();
|
||||
logger.info("Shutting down Zookeeper server");
|
||||
this.shutdownZookeeper();
|
||||
logger.info("Embedded Kafka is shut down.");
|
||||
this.cleanupKafkaWorkDir();
|
||||
this.started = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private void cleanupKafkaWorkDir() {
|
||||
File kafkaTmp = new File("target/kafka-tmp");
|
||||
try {
|
||||
FileUtils.deleteDirectory(kafkaTmp);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will start Zookeeper server via {@link ServerCnxnFactory}
|
||||
*/
|
||||
private void startZookeeper() {
|
||||
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
|
||||
try {
|
||||
quorumConfiguration.parseProperties(this.zookeeperConfig);
|
||||
|
||||
ServerConfig configuration = new ServerConfig();
|
||||
configuration.readFrom(quorumConfiguration);
|
||||
|
||||
FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
|
||||
|
||||
zkServer.setTxnLogFactory(txnLog);
|
||||
zkServer.setTickTime(configuration.getTickTime());
|
||||
zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
|
||||
zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
|
||||
ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
|
||||
zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
|
||||
configuration.getMaxClientCnxns());
|
||||
zookeeperConnectionFactory.startup(zkServer);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to start Zookeeper server", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will shut down Zookeeper server.
|
||||
*/
|
||||
private void shutdownZookeeper() {
|
||||
zkServer.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Will load {@link Properties} from properties file discovered at the
|
||||
* provided path relative to the root of the classpath.
|
||||
*/
|
||||
private static Properties loadPropertiesFromClasspath(String path) {
|
||||
try {
|
||||
Properties kafkaProperties = new Properties();
|
||||
kafkaProperties.load(Class.class.getResourceAsStream(path));
|
||||
return kafkaProperties;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will determine the available port used by Kafka/Zookeeper servers.
|
||||
*/
|
||||
private int availablePort() {
|
||||
ServerSocket s = null;
|
||||
try {
|
||||
s = new ServerSocket(0);
|
||||
s.setReuseAddress(true);
|
||||
return s.getLocalPort();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to discover available port.", e);
|
||||
} finally {
|
||||
try {
|
||||
s.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.kafka.pubsub;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
class TestUtils {
|
||||
|
||||
public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
|
||||
field.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
|
||||
|
||||
field.set(instance, newValue);
|
||||
}
|
||||
|
||||
static Unsafe getUnsafe() {
|
||||
try {
|
||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
return (Unsafe) f.get(null);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
@ -163,8 +165,10 @@ public class PublisherLease implements Closeable {
|
|||
recordCount++;
|
||||
baos.reset();
|
||||
|
||||
Map<String, String> additionalAttributes = Collections.emptyMap();
|
||||
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
|
||||
writer.write(record);
|
||||
final WriteResult writeResult = writer.write(record);
|
||||
additionalAttributes = writeResult.getAttributes();
|
||||
writer.flush();
|
||||
}
|
||||
|
||||
|
@ -172,7 +176,7 @@ public class PublisherLease implements Closeable {
|
|||
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
|
||||
final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
publish(flowFile, messageKey, messageContent, topic, tracker);
|
||||
publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
|
||||
|
||||
if (tracker.isFailed(flowFile)) {
|
||||
// If we have a failure, don't try to send anything else.
|
||||
|
@ -194,7 +198,7 @@ public class PublisherLease implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
|
||||
private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
|
||||
if (attributeNameRegex == null) {
|
||||
return;
|
||||
}
|
||||
|
@ -205,11 +209,23 @@ public class PublisherLease implements Closeable {
|
|||
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
|
||||
}
|
||||
}
|
||||
|
||||
for (final Map.Entry<String, String> entry : additionalAttributes.entrySet()) {
|
||||
if (attributeNameRegex.matcher(entry.getKey()).matches()) {
|
||||
headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
|
||||
publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
|
||||
}
|
||||
|
||||
protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
|
||||
final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
|
||||
|
||||
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
|
||||
addHeaders(flowFile, record);
|
||||
addHeaders(flowFile, additionalAttributes, record);
|
||||
|
||||
producer.send(record, new Callback() {
|
||||
@Override
|
||||
|
|
|
@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_1_0 {
|
|||
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2);
|
||||
|
||||
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
|
||||
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
|
||||
verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
|
||||
verify(mockLease, times(1)).complete();
|
||||
verify(mockLease, times(0)).poison();
|
||||
verify(mockLease, times(1)).close();
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.nifi.serialization.MalformedRecordException;
|
|||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
@ -274,6 +275,7 @@ public class TestPublisherLease {
|
|||
|
||||
final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class);
|
||||
final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class);
|
||||
Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap()));
|
||||
|
||||
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer);
|
||||
|
||||
|
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.kafka.pubsub;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
class TestUtils {
|
||||
|
||||
public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
|
||||
field.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
|
||||
|
||||
field.set(instance, newValue);
|
||||
}
|
||||
|
||||
static Unsafe getUnsafe() {
|
||||
try {
|
||||
Field f = Unsafe.class.getDeclaredField("theUnsafe");
|
||||
f.setAccessible(true);
|
||||
return (Unsafe) f.get(null);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
|
|||
|
||||
@Override
|
||||
public WriteResult write(Record record) throws IOException {
|
||||
return null;
|
||||
return WriteResult.of(1, Collections.emptyMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,226 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.kafka.test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zookeeper.server.ServerCnxnFactory;
|
||||
import org.apache.zookeeper.server.ServerConfig;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
||||
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import kafka.server.KafkaConfig;
|
||||
import kafka.server.KafkaServerStartable;
|
||||
|
||||
/**
|
||||
* Embedded Kafka server, primarily to be used for testing.
|
||||
*/
|
||||
public class EmbeddedKafka {
|
||||
|
||||
private final KafkaServerStartable kafkaServer;
|
||||
|
||||
private final Properties zookeeperConfig;
|
||||
|
||||
private final Properties kafkaConfig;
|
||||
|
||||
private final ZooKeeperServer zkServer;
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
|
||||
|
||||
private final int kafkaPort;
|
||||
|
||||
private final int zookeeperPort;
|
||||
|
||||
private boolean started;
|
||||
|
||||
/**
|
||||
* Will create instance of the embedded Kafka server. Kafka and Zookeeper
|
||||
* configuration properties will be loaded from 'server.properties' and
|
||||
* 'zookeeper.properties' located at the root of the classpath.
|
||||
*/
|
||||
public EmbeddedKafka() {
|
||||
this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Will create instance of the embedded Kafka server.
|
||||
*
|
||||
* @param kafkaConfig
|
||||
* Kafka configuration properties
|
||||
* @param zookeeperConfig
|
||||
* Zookeeper configuration properties
|
||||
*/
|
||||
public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
|
||||
this.cleanupKafkaWorkDir();
|
||||
this.zookeeperConfig = zookeeperConfig;
|
||||
this.kafkaConfig = kafkaConfig;
|
||||
this.kafkaPort = this.availablePort();
|
||||
this.zookeeperPort = this.availablePort();
|
||||
|
||||
this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
|
||||
this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
|
||||
this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
|
||||
this.zkServer = new ZooKeeperServer();
|
||||
this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return port for Kafka server
|
||||
*/
|
||||
public int getKafkaPort() {
|
||||
if (!this.started) {
|
||||
throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
|
||||
}
|
||||
return this.kafkaPort;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return port for Zookeeper server
|
||||
*/
|
||||
public int getZookeeperPort() {
|
||||
if (!this.started) {
|
||||
throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
|
||||
}
|
||||
return this.zookeeperPort;
|
||||
}
|
||||
|
||||
/**
|
||||
* Will start embedded Kafka server. Its data directories will be created
|
||||
* at 'kafka-tmp' directory relative to the working directory of the current
|
||||
* runtime. The data directories will be deleted upon JVM exit.
|
||||
*
|
||||
*/
|
||||
public void start() {
|
||||
if (!this.started) {
|
||||
logger.info("Starting Zookeeper server");
|
||||
this.startZookeeper();
|
||||
|
||||
logger.info("Starting Kafka server");
|
||||
this.kafkaServer.startup();
|
||||
|
||||
logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
|
||||
+ ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
|
||||
this.started = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will stop embedded Kafka server, cleaning up all working directories.
|
||||
*/
|
||||
public void stop() {
|
||||
if (this.started) {
|
||||
logger.info("Shutting down Kafka server");
|
||||
this.kafkaServer.shutdown();
|
||||
this.kafkaServer.awaitShutdown();
|
||||
logger.info("Shutting down Zookeeper server");
|
||||
this.shutdownZookeeper();
|
||||
logger.info("Embedded Kafka is shut down.");
|
||||
this.cleanupKafkaWorkDir();
|
||||
this.started = false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private void cleanupKafkaWorkDir() {
|
||||
File kafkaTmp = new File("target/kafka-tmp");
|
||||
try {
|
||||
FileUtils.deleteDirectory(kafkaTmp);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will start Zookeeper server via {@link ServerCnxnFactory}
|
||||
*/
|
||||
private void startZookeeper() {
|
||||
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
|
||||
try {
|
||||
quorumConfiguration.parseProperties(this.zookeeperConfig);
|
||||
|
||||
ServerConfig configuration = new ServerConfig();
|
||||
configuration.readFrom(quorumConfiguration);
|
||||
|
||||
FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
|
||||
|
||||
zkServer.setTxnLogFactory(txnLog);
|
||||
zkServer.setTickTime(configuration.getTickTime());
|
||||
zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
|
||||
zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
|
||||
ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
|
||||
zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
|
||||
configuration.getMaxClientCnxns());
|
||||
zookeeperConnectionFactory.startup(zkServer);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to start Zookeeper server", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will shut down Zookeeper server.
|
||||
*/
|
||||
private void shutdownZookeeper() {
|
||||
zkServer.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Will load {@link Properties} from properties file discovered at the
|
||||
* provided path relative to the root of the classpath.
|
||||
*/
|
||||
private static Properties loadPropertiesFromClasspath(String path) {
|
||||
try {
|
||||
Properties kafkaProperties = new Properties();
|
||||
kafkaProperties.load(Class.class.getResourceAsStream(path));
|
||||
return kafkaProperties;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Will determine the available port used by Kafka/Zookeeper servers.
|
||||
*/
|
||||
private int availablePort() {
|
||||
ServerSocket s = null;
|
||||
try {
|
||||
s = new ServerSocket(0);
|
||||
s.setReuseAddress(true);
|
||||
return s.getLocalPort();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to discover available port.", e);
|
||||
} finally {
|
||||
try {
|
||||
s.close();
|
||||
} catch (IOException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
|||
import java.io.OutputStream;
|
||||
import java.math.BigInteger;
|
||||
import java.text.DateFormat;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -118,30 +117,26 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
|
|||
public Map<String, String> writeRecord(final Record record) throws IOException {
|
||||
// If we are not writing an active record set, then we need to ensure that we write the
|
||||
// schema information.
|
||||
boolean firstRecord = false;
|
||||
if (!isActiveRecordSet()) {
|
||||
generator.flush();
|
||||
schemaAccess.writeHeader(recordSchema, getOutputStream());
|
||||
firstRecord = true;
|
||||
}
|
||||
|
||||
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true);
|
||||
return firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
|
||||
return schemaAccess.getAttributes(recordSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult writeRawRecord(final Record record) throws IOException {
|
||||
// If we are not writing an active record set, then we need to ensure that we write the
|
||||
// schema information.
|
||||
boolean firstRecord = false;
|
||||
if (!isActiveRecordSet()) {
|
||||
generator.flush();
|
||||
schemaAccess.writeHeader(recordSchema, getOutputStream());
|
||||
firstRecord = true;
|
||||
}
|
||||
|
||||
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false);
|
||||
final Map<String, String> attributes = firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
|
||||
final Map<String, String> attributes = schemaAccess.getAttributes(recordSchema);
|
||||
return WriteResult.of(incrementRecordCount(), attributes);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue