added embedded Kafka server and tests

This commit is contained in:
Oleg Zhurakousky 2015-11-30 12:34:24 -05:00 committed by Mark Payne
parent 8e031c987b
commit b043d04ecf
6 changed files with 655 additions and 0 deletions

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class GetKafkaIntegrationTests {
private static EmbeddedKafka kafkaLocal;
private static EmbeddedKafkaProducerHelper producerHelper;
@BeforeClass
public static void bforeClass(){
kafkaLocal = new EmbeddedKafka();
kafkaLocal.start();
producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
}
@AfterClass
public static void afterClass() throws Exception {
producerHelper.close();
kafkaLocal.stop();
}
/**
* Will set auto-offset to 'smallest' to ensure that all events (the once
* that were sent before and after consumer startup) are received.
*/
@Test
public void testGetAllMessages() throws Exception {
String topicName = "testGetAllMessages";
GetKafka getKafka = new GetKafka();
final TestRunner runner = TestRunners.newTestRunner(getKafka);
runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:" + kafkaLocal.getZookeeperPort());
runner.setProperty(GetKafka.TOPIC, topicName);
runner.setProperty(GetKafka.BATCH_SIZE, "5");
runner.setProperty(GetKafka.AUTO_OFFSET_RESET, GetKafka.SMALLEST);
runner.setProperty("consumer.timeout.ms", "300");
producerHelper.sendEvent(topicName, "Hello-1");
producerHelper.sendEvent(topicName, "Hello-2");
producerHelper.sendEvent(topicName, "Hello-3");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
try {
runner.run(20, false);
} finally {
latch.countDown();
}
}
}).start();
// Thread.sleep(1000);
producerHelper.sendEvent(topicName, "Hello-4");
producerHelper.sendEvent(topicName, "Hello-5");
producerHelper.sendEvent(topicName, "Hello-6");
latch.await();
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
// must be two since we sent 6 messages with batch of 5
assertEquals(2, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
String[] events = new String(flowFile.toByteArray()).split("\\s+");
assertEquals(5, events.length);
// spot check
assertEquals("Hello-1", events[0]);
assertEquals("Hello-4", events[3]);
flowFile = flowFiles.get(1);
events = new String(flowFile.toByteArray()).split("\\s+");
assertEquals(1, events.length);
getKafka.shutdownConsumer();
}
/**
* Based on auto-offset set to 'largest' events sent before consumer start
* should not be consumed.
*
*/
@Test
public void testGetOnlyMessagesAfterConsumerStartup() throws Exception {
String topicName = "testGetOnlyMessagesAfterConsumerStartup";
GetKafka getKafka = new GetKafka();
final TestRunner runner = TestRunners.newTestRunner(getKafka);
runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:" + kafkaLocal.getZookeeperPort());
runner.setProperty(GetKafka.TOPIC, topicName);
runner.setProperty(GetKafka.BATCH_SIZE, "5");
runner.setProperty("consumer.timeout.ms", "300");
producerHelper.sendEvent(topicName, "Hello-1");
producerHelper.sendEvent(topicName, "Hello-2");
producerHelper.sendEvent(topicName, "Hello-3");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
try {
runner.run(20, false);
} finally {
latch.countDown();
}
}
}).start();
latch.await();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
assertEquals(0, flowFiles.size());
producerHelper.sendEvent(topicName, "Hello-4");
producerHelper.sendEvent(topicName, "Hello-5");
producerHelper.sendEvent(topicName, "Hello-6");
latch.await();
runner.run(5, false);
flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
// must be single since we should only be receiving 4,5 and 6 in batch
// of 5
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
String[] events = new String(flowFile.toByteArray()).split("\\s+");
assertEquals(3, events.length);
assertEquals("Hello-4", events[0]);
assertEquals("Hello-5", events[1]);
assertEquals("Hello-6", events[2]);
getKafka.shutdownConsumer();
}
}

View File

@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.test;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
/**
* Embedded Kafka server, primarily to be used for testing.
*/
public class EmbeddedKafka {
private final KafkaServerStartable kafkaServer;
private final Properties zookeeperConfig;
private final Properties kafkaConfig;
private final ZooKeeperServer zkServer;
private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
private final int kafkaPort;
private final int zookeeperPort;
private boolean started;
/**
* Will create instance of the embedded Kafka server. Kafka and Zookeeper
* configuration properties will be loaded from 'server.properties' and
* 'zookeeper.properties' located at the root of the classpath.
*/
public EmbeddedKafka() {
this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
}
/**
* Will create instance of the embedded Kafka server.
*
* @param kafkaConfig
* Kafka configuration properties
* @param zookeeperConfig
* Zookeeper configuration properties
*/
public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
this.cleanupKafkaWorkDir();
this.zookeeperConfig = zookeeperConfig;
this.kafkaConfig = kafkaConfig;
this.kafkaPort = this.availablePort();
this.zookeeperPort = this.availablePort();
this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
this.zkServer = new ZooKeeperServer();
this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
}
/**
*
* @return port for Kafka server
*/
public int getKafkaPort() {
if (!this.started) {
throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
}
return this.kafkaPort;
}
/**
*
* @return port for Zookeeper server
*/
public int getZookeeperPort() {
if (!this.started) {
throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
}
return this.zookeeperPort;
}
/**
* Will start embedded Kafka server. It's data directories will be created
* at 'kafka-tmp' directory relative to the working directory of the current
* runtime. The data directories will be deleted upon JVM exit.
*
*/
public void start() {
if (!this.started) {
logger.info("Starting Zookeeper server");
this.startZookeeper();
logger.info("Starting Kafka server");
this.kafkaServer.startup();
logger.info("Embeded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
+ ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
this.started = true;
}
}
/**
* Will stop embedded Kafka server, cleaning up all working directories.
*/
public void stop() {
if (this.started) {
logger.info("Shutting down Kafka server");
this.kafkaServer.shutdown();
this.kafkaServer.awaitShutdown();
logger.info("Shutting down Zookeeper server");
this.shutdownZookeeper();
logger.info("Embeded Kafka is shut down.");
this.cleanupKafkaWorkDir();
this.started = false;
}
}
/**
*
*/
private void cleanupKafkaWorkDir() {
File kafkaTmp = new File("kafka-tmp");
try {
FileUtils.deleteDirectory(kafkaTmp);
} catch (Exception e) {
logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
}
}
/**
* Will start Zookeeper server via {@link ServerCnxnFactory}
*/
private void startZookeeper() {
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
quorumConfiguration.parseProperties(this.zookeeperConfig);
ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);
FileTxnSnapLog txnLog = new FileTxnSnapLog(new File(configuration.getDataLogDir()),
new File(configuration.getDataDir()));
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(configuration.getTickTime());
zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory();
zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
configuration.getMaxClientCnxns());
zookeeperConnectionFactory.startup(zkServer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
throw new IllegalStateException("Failed to start Zookeeper server", e);
}
}
/**
* Will shut down Zookeeper server.
*/
private void shutdownZookeeper() {
zkServer.shutdown();
}
/**
* Will load {@link Properties} from properties file discovered at the
* provided path relative to the root of the classpath.
*/
private static Properties loadPropertiesFromClasspath(String path) {
try {
Properties kafkaProperties = new Properties();
kafkaProperties.load(Class.class.getResourceAsStream(path));
return kafkaProperties;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
/**
* Will determine the available port used by Kafka/Zookeeper servers.
*/
private int availablePort() {
ServerSocket s = null;
try {
s = new ServerSocket(0);
s.setReuseAddress(true);
return s.getLocalPort();
} catch (Exception e) {
throw new IllegalStateException("Failed to discover available port.", e);
} finally {
try {
s.close();
} catch (IOException e) {
// ignore
}
}
}
}

View File

@ -0,0 +1,94 @@
package org.apache.nifi.processors.kafka.test;
import java.io.Closeable;
import java.io.IOException;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.OldProducer;
/**
* Helper class which helps to produce events targeting {@link EmbeddedKafka}
* server.
*/
public class EmbeddedKafkaProducerHelper implements Closeable {
private final EmbeddedKafka kafkaServer;
private final OldProducer producer;
/**
* Will create an instance of EmbeddedKafkaProducerHelper based on default
* configurations.<br>
* Default configuration includes:<br>
* <i>
* metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
* serializer.class=kafka.serializer.DefaultEncoder<br>
* key.serializer.class=kafka.serializer.DefaultEncoder<br>
* auto.create.topics.enable=true
* </i><br>
* <br>
* If you wish to supply additional configuration properties or override
* existing use
* {@link EmbeddedKafkaProducerHelper#EmbeddedKafkaProducerHelper(EmbeddedKafka, Properties)}
* constructor.
*
* @param kafkaServer
* instance of {@link EmbeddedKafka}
*/
public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer) {
this(kafkaServer, null);
}
/**
* Will create an instance of EmbeddedKafkaProducerHelper based on default
* configurations and additional configuration properties.<br>
* Default configuration includes:<br>
* metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
* serializer.class=kafka.serializer.DefaultEncoder<br>
* key.serializer.class=kafka.serializer.DefaultEncoder<br>
* auto.create.topics.enable=true<br>
* <br>
*
* @param kafkaServer
* instance of {@link EmbeddedKafka}
* @param additionalProperties
* instance of {@link Properties} specifying additional producer
* configuration properties.
*/
public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer, Properties additionalProperties) {
this.kafkaServer = kafkaServer;
Properties producerProperties = new Properties();
producerProperties.put("metadata.broker.list", "localhost:" + this.kafkaServer.getKafkaPort());
producerProperties.put("serializer.class", "kafka.serializer.DefaultEncoder");
producerProperties.put("key.serializer.class", "kafka.serializer.DefaultEncoder");
producerProperties.put("auto.create.topics.enable", "true");
if (additionalProperties != null) {
producerProperties.putAll(additionalProperties);
}
this.producer = new OldProducer(producerProperties);
}
/**
* Will send an event to a Kafka topic. If topic doesn't exist it will be
* auto-created.
*
* @param topicName
* Kafka topic name.
* @param event
* string representing an event(message) to be sent to Kafka.
*/
public void sendEvent(String topicName, String event) {
KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>(topicName, event.getBytes());
this.producer.send(data.topic(), data.key(), data.message());
}
/**
* Will close the underlying Kafka producer.
*/
@Override
public void close() throws IOException {
this.producer.close();
}
}

View File

@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootCategory=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
log4j.category.org.apache.nifi.processors.kafka=INFO
log4j.category.kafka=ERROR
#log4j.category.org.apache.nifi.startup=INFO

View File

@ -0,0 +1,121 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The port the socket server listens on
#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=kafka-tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=kafka-tmp/zookeeper
# the port at which the clients will connect
#clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0