diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java index d9be6732a0..5f94679dbc 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Collections; import java.util.EnumSet; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.avro.Schema; @@ -29,6 +31,12 @@ import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.serialization.record.RecordSchema; public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter { + private final Map avroSchemaTextCache = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > 10; + } + }; @Override public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { @@ -36,8 +44,22 @@ public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter { @Override public Map getAttributes(final RecordSchema schema) { - final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); - final String schemaText = avroSchema.toString(); + // First, check if schema has the Avro Text available already. + final Optional schemaFormat = schema.getSchemaFormat(); + if (schemaFormat.isPresent() && AvroTypeUtil.AVRO_SCHEMA_FORMAT.equals(schemaFormat.get())) { + final Optional schemaText = schema.getSchemaText(); + if (schemaText.isPresent()) { + return Collections.singletonMap("avro.schema", schemaText.get()); + } + } + + String schemaText = avroSchemaTextCache.get(schema); + if (schemaText == null) { + final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); + schemaText = avroSchema.toString(); + avroSchemaTextCache.put(schema, schemaText); + } + return Collections.singletonMap("avro.schema", schemaText); } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 72c90d2646..2e25129727 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; @@ -164,8 +166,10 @@ public class PublisherLease implements Closeable { recordCount++; baos.reset(); + Map additionalAttributes = Collections.emptyMap(); try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { - writer.write(record); + final WriteResult writeResult = writer.write(record); + additionalAttributes = writeResult.getAttributes(); writer.flush(); } @@ -173,7 +177,7 @@ public class PublisherLease implements Closeable { final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8); - publish(flowFile, messageKey, messageContent, topic, tracker); + publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker); if (tracker.isFailed(flowFile)) { // If we have a failure, don't try to send anything else. @@ -195,7 +199,7 @@ public class PublisherLease implements Closeable { } } - private void addHeaders(final FlowFile flowFile, final ProducerRecord record) { + private void addHeaders(final FlowFile flowFile, final Map additionalAttributes, final ProducerRecord record) { if (attributeNameRegex == null) { return; } @@ -206,11 +210,23 @@ public class PublisherLease implements Closeable { headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet)); } } + + for (final Map.Entry entry : additionalAttributes.entrySet()) { + if (attributeNameRegex.matcher(entry.getKey()).matches()) { + headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet)); + } + } } protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) { + publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker); + } + + protected void publish(final FlowFile flowFile, final Map additionalAttributes, + final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) { + final ProducerRecord record = new ProducerRecord<>(topic, null, messageKey, messageContent); - addHeaders(flowFile, record); + addHeaders(flowFile, additionalAttributes, record); producer.send(record, new Callback() { @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java index b7d4abdaa6..9a209d5d9b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_11.java @@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_11 { runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_11.REL_SUCCESS, 2); verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); - verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class)); + verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index d2b52dd228..3ab7abb099 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -38,12 +38,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; -import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; @@ -270,13 +270,12 @@ public class TestPublisherLease { final RecordSet recordSet = reader.createRecordSet(); final RecordSchema schema = reader.getSchema(); - final RecordSetWriterFactory writerService = new MockRecordWriter("person_id, name, age"); - final String topic = "unit-test"; final String keyField = "person_id"; final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class); final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class); + Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap())); Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java deleted file mode 100644 index 819e3b73ac..0000000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; - -import sun.misc.Unsafe; - -class TestUtils { - - public static void setFinalField(Field field, Object instance, Object newValue) throws Exception { - field.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - - field.set(instance, newValue); - } - - static Unsafe getUnsafe() { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - -} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index 90a909d9b6..0eb860688b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor @Override public WriteResult write(Record record) throws IOException { - return null; + return WriteResult.of(1, Collections.emptyMap()); } @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java deleted file mode 100644 index a720b118ae..0000000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.test; - -import java.io.File; -import java.io.IOException; -import java.net.ServerSocket; -import java.util.Properties; - -import org.apache.commons.io.FileUtils; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; - -/** - * Embedded Kafka server, primarily to be used for testing. - */ -public class EmbeddedKafka { - - private final KafkaServerStartable kafkaServer; - - private final Properties zookeeperConfig; - - private final Properties kafkaConfig; - - private final ZooKeeperServer zkServer; - - private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class); - - private final int kafkaPort; - - private final int zookeeperPort; - - private boolean started; - - /** - * Will create instance of the embedded Kafka server. Kafka and Zookeeper - * configuration properties will be loaded from 'server.properties' and - * 'zookeeper.properties' located at the root of the classpath. - */ - public EmbeddedKafka() { - this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties")); - } - - /** - * Will create instance of the embedded Kafka server. - * - * @param kafkaConfig - * Kafka configuration properties - * @param zookeeperConfig - * Zookeeper configuration properties - */ - public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) { - this.cleanupKafkaWorkDir(); - this.zookeeperConfig = zookeeperConfig; - this.kafkaConfig = kafkaConfig; - this.kafkaPort = this.availablePort(); - this.zookeeperPort = this.availablePort(); - - this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort)); - this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort); - this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort)); - this.zkServer = new ZooKeeperServer(); - this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig)); - } - - /** - * - * @return port for Kafka server - */ - public int getKafkaPort() { - if (!this.started) { - throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined."); - } - return this.kafkaPort; - } - - /** - * - * @return port for Zookeeper server - */ - public int getZookeeperPort() { - if (!this.started) { - throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined."); - } - return this.zookeeperPort; - } - - /** - * Will start embedded Kafka server. Its data directories will be created - * at 'kafka-tmp' directory relative to the working directory of the current - * runtime. The data directories will be deleted upon JVM exit. - * - */ - public void start() { - if (!this.started) { - logger.info("Starting Zookeeper server"); - this.startZookeeper(); - - logger.info("Starting Kafka server"); - this.kafkaServer.startup(); - - logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port() - + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect")); - this.started = true; - } - } - - /** - * Will stop embedded Kafka server, cleaning up all working directories. - */ - public void stop() { - if (this.started) { - logger.info("Shutting down Kafka server"); - this.kafkaServer.shutdown(); - this.kafkaServer.awaitShutdown(); - logger.info("Shutting down Zookeeper server"); - this.shutdownZookeeper(); - logger.info("Embedded Kafka is shut down."); - this.cleanupKafkaWorkDir(); - this.started = false; - } - } - - /** - * - */ - private void cleanupKafkaWorkDir() { - File kafkaTmp = new File("target/kafka-tmp"); - try { - FileUtils.deleteDirectory(kafkaTmp); - } catch (Exception e) { - logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath()); - } - } - - /** - * Will start Zookeeper server via {@link ServerCnxnFactory} - */ - private void startZookeeper() { - QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); - try { - quorumConfiguration.parseProperties(this.zookeeperConfig); - - ServerConfig configuration = new ServerConfig(); - configuration.readFrom(quorumConfiguration); - - FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir())); - - zkServer.setTxnLogFactory(txnLog); - zkServer.setTickTime(configuration.getTickTime()); - zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout()); - zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout()); - ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory(); - zookeeperConnectionFactory.configure(configuration.getClientPortAddress(), - configuration.getMaxClientCnxns()); - zookeeperConnectionFactory.startup(zkServer); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new IllegalStateException("Failed to start Zookeeper server", e); - } - } - - /** - * Will shut down Zookeeper server. - */ - private void shutdownZookeeper() { - zkServer.shutdown(); - } - - /** - * Will load {@link Properties} from properties file discovered at the - * provided path relative to the root of the classpath. - */ - private static Properties loadPropertiesFromClasspath(String path) { - try { - Properties kafkaProperties = new Properties(); - kafkaProperties.load(Class.class.getResourceAsStream(path)); - return kafkaProperties; - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - /** - * Will determine the available port used by Kafka/Zookeeper servers. - */ - private int availablePort() { - ServerSocket s = null; - try { - s = new ServerSocket(0); - s.setReuseAddress(true); - return s.getLocalPort(); - } catch (Exception e) { - throw new IllegalStateException("Failed to discover available port.", e); - } finally { - try { - s.close(); - } catch (IOException e) { - // ignore - } - } - } -} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java deleted file mode 100644 index 819e3b73ac..0000000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; - -import sun.misc.Unsafe; - -class TestUtils { - - public static void setFinalField(Field field, Object instance, Object newValue) throws Exception { - field.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - - field.set(instance, newValue); - } - - static Unsafe getUnsafe() { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - -} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 2b1cfe2610..1c241a41c4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -39,6 +40,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; @@ -163,8 +165,10 @@ public class PublisherLease implements Closeable { recordCount++; baos.reset(); + Map additionalAttributes = Collections.emptyMap(); try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { - writer.write(record); + final WriteResult writeResult = writer.write(record); + additionalAttributes = writeResult.getAttributes(); writer.flush(); } @@ -172,7 +176,7 @@ public class PublisherLease implements Closeable { final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8); - publish(flowFile, messageKey, messageContent, topic, tracker); + publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker); if (tracker.isFailed(flowFile)) { // If we have a failure, don't try to send anything else. @@ -194,7 +198,7 @@ public class PublisherLease implements Closeable { } } - private void addHeaders(final FlowFile flowFile, final ProducerRecord record) { + private void addHeaders(final FlowFile flowFile, final Map additionalAttributes, final ProducerRecord record) { if (attributeNameRegex == null) { return; } @@ -205,11 +209,23 @@ public class PublisherLease implements Closeable { headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet)); } } + + for (final Map.Entry entry : additionalAttributes.entrySet()) { + if (attributeNameRegex.matcher(entry.getKey()).matches()) { + headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet)); + } + } } protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) { + publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker); + } + + protected void publish(final FlowFile flowFile, final Map additionalAttributes, + final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) { + final ProducerRecord record = new ProducerRecord<>(topic, null, messageKey, messageContent); - addHeaders(flowFile, record); + addHeaders(flowFile, additionalAttributes, record); producer.send(record, new Callback() { @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java index 45439cc126..abadc8920c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java @@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_1_0 { runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2); verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME)); - verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class)); + verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class)); verify(mockLease, times(1)).complete(); verify(mockLease, times(0)).poison(); verify(mockLease, times(1)).close(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index b2e1b0ef1f..2fbf539377 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -43,6 +43,7 @@ import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; @@ -274,6 +275,7 @@ public class TestPublisherLease { final RecordSetWriterFactory writerFactory = Mockito.mock(RecordSetWriterFactory.class); final RecordSetWriter writer = Mockito.mock(RecordSetWriter.class); + Mockito.when(writer.write(Mockito.any(Record.class))).thenReturn(WriteResult.of(1, Collections.emptyMap())); Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any())).thenReturn(writer); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java deleted file mode 100644 index 819e3b73ac..0000000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; - -import sun.misc.Unsafe; - -class TestUtils { - - public static void setFinalField(Field field, Object instance, Object newValue) throws Exception { - field.setAccessible(true); - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); - - field.set(instance, newValue); - } - - static Unsafe getUnsafe() { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - -} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index 90a909d9b6..0eb860688b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -108,7 +108,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor @Override public WriteResult write(Record record) throws IOException { - return null; + return WriteResult.of(1, Collections.emptyMap()); } @Override diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java deleted file mode 100644 index a720b118ae..0000000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.test; - -import java.io.File; -import java.io.IOException; -import java.net.ServerSocket; -import java.util.Properties; - -import org.apache.commons.io.FileUtils; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; - -/** - * Embedded Kafka server, primarily to be used for testing. - */ -public class EmbeddedKafka { - - private final KafkaServerStartable kafkaServer; - - private final Properties zookeeperConfig; - - private final Properties kafkaConfig; - - private final ZooKeeperServer zkServer; - - private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class); - - private final int kafkaPort; - - private final int zookeeperPort; - - private boolean started; - - /** - * Will create instance of the embedded Kafka server. Kafka and Zookeeper - * configuration properties will be loaded from 'server.properties' and - * 'zookeeper.properties' located at the root of the classpath. - */ - public EmbeddedKafka() { - this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties")); - } - - /** - * Will create instance of the embedded Kafka server. - * - * @param kafkaConfig - * Kafka configuration properties - * @param zookeeperConfig - * Zookeeper configuration properties - */ - public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) { - this.cleanupKafkaWorkDir(); - this.zookeeperConfig = zookeeperConfig; - this.kafkaConfig = kafkaConfig; - this.kafkaPort = this.availablePort(); - this.zookeeperPort = this.availablePort(); - - this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort)); - this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort); - this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort)); - this.zkServer = new ZooKeeperServer(); - this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig)); - } - - /** - * - * @return port for Kafka server - */ - public int getKafkaPort() { - if (!this.started) { - throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined."); - } - return this.kafkaPort; - } - - /** - * - * @return port for Zookeeper server - */ - public int getZookeeperPort() { - if (!this.started) { - throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined."); - } - return this.zookeeperPort; - } - - /** - * Will start embedded Kafka server. Its data directories will be created - * at 'kafka-tmp' directory relative to the working directory of the current - * runtime. The data directories will be deleted upon JVM exit. - * - */ - public void start() { - if (!this.started) { - logger.info("Starting Zookeeper server"); - this.startZookeeper(); - - logger.info("Starting Kafka server"); - this.kafkaServer.startup(); - - logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port() - + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect")); - this.started = true; - } - } - - /** - * Will stop embedded Kafka server, cleaning up all working directories. - */ - public void stop() { - if (this.started) { - logger.info("Shutting down Kafka server"); - this.kafkaServer.shutdown(); - this.kafkaServer.awaitShutdown(); - logger.info("Shutting down Zookeeper server"); - this.shutdownZookeeper(); - logger.info("Embedded Kafka is shut down."); - this.cleanupKafkaWorkDir(); - this.started = false; - } - } - - /** - * - */ - private void cleanupKafkaWorkDir() { - File kafkaTmp = new File("target/kafka-tmp"); - try { - FileUtils.deleteDirectory(kafkaTmp); - } catch (Exception e) { - logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath()); - } - } - - /** - * Will start Zookeeper server via {@link ServerCnxnFactory} - */ - private void startZookeeper() { - QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); - try { - quorumConfiguration.parseProperties(this.zookeeperConfig); - - ServerConfig configuration = new ServerConfig(); - configuration.readFrom(quorumConfiguration); - - FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()), new File(configuration.getDataDir())); - - zkServer.setTxnLogFactory(txnLog); - zkServer.setTickTime(configuration.getTickTime()); - zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout()); - zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout()); - ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory(); - zookeeperConnectionFactory.configure(configuration.getClientPortAddress(), - configuration.getMaxClientCnxns()); - zookeeperConnectionFactory.startup(zkServer); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new IllegalStateException("Failed to start Zookeeper server", e); - } - } - - /** - * Will shut down Zookeeper server. - */ - private void shutdownZookeeper() { - zkServer.shutdown(); - } - - /** - * Will load {@link Properties} from properties file discovered at the - * provided path relative to the root of the classpath. - */ - private static Properties loadPropertiesFromClasspath(String path) { - try { - Properties kafkaProperties = new Properties(); - kafkaProperties.load(Class.class.getResourceAsStream(path)); - return kafkaProperties; - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - /** - * Will determine the available port used by Kafka/Zookeeper servers. - */ - private int availablePort() { - ServerSocket s = null; - try { - s = new ServerSocket(0); - s.setReuseAddress(true); - return s.getLocalPort(); - } catch (Exception e) { - throw new IllegalStateException("Failed to discover available port.", e); - } finally { - try { - s.close(); - } catch (IOException e) { - // ignore - } - } - } -} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java index fc8418149f..41a72c7552 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.OutputStream; import java.math.BigInteger; import java.text.DateFormat; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -118,30 +117,26 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe public Map writeRecord(final Record record) throws IOException { // If we are not writing an active record set, then we need to ensure that we write the // schema information. - boolean firstRecord = false; if (!isActiveRecordSet()) { generator.flush(); schemaAccess.writeHeader(recordSchema, getOutputStream()); - firstRecord = true; } writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true); - return firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap(); + return schemaAccess.getAttributes(recordSchema); } @Override public WriteResult writeRawRecord(final Record record) throws IOException { // If we are not writing an active record set, then we need to ensure that we write the // schema information. - boolean firstRecord = false; if (!isActiveRecordSet()) { generator.flush(); schemaAccess.writeHeader(recordSchema, getOutputStream()); - firstRecord = true; } writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false); - final Map attributes = firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap(); + final Map attributes = schemaAccess.getAttributes(recordSchema); return WriteResult.of(incrementRecordCount(), attributes); }