diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 72d4a4728da..04725bab46a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -56,6 +56,8 @@ Trunk (Unreleased)
HADOOP-10854. unit tests for the shell scripts (aw)
+ HADOOP-10949. metrics2 sink plugin for Apache Kafka (Babak Behzad via aw)
+
IMPROVEMENTS
HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index fd332461979..a62e45bd9ef 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -45,8 +45,9 @@
 
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
 
-    <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
+    <kafka.version>0.8.2.1</kafka.version>
+    <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
     <commons-daemon.version>1.0.13</commons-daemon.version>
 
     <test.build.dir>${project.build.directory}/test-dir</test.build.dir>
 
diff --git a/hadoop-tools/hadoop-kafka/pom.xml b/hadoop-tools/hadoop-kafka/pom.xml
new file mode 100644
index 00000000000..75405e10d5f
--- /dev/null
+++ b/hadoop-tools/hadoop-kafka/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <artifactId>hadoop-kafka</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <name>Apache Hadoop Kafka Library support</name>
+  <description>
+    This module contains code to support integration with Kafka.
+    It also declares the dependencies needed to work with Kafka.
+  </description>
+  <packaging>jar</packaging>
+
+  <properties>
+    <file.encoding>UTF-8</file.encoding>
+    <downloadSources>true</downloadSources>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>tests-off</id>
+      <activation>
+        <file>
+          <missing>src/test/resources/auth-keys.xml</missing>
+        </file>
+      </activation>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>tests-on</id>
+      <activation>
+        <file>
+          <exists>src/test/resources/auth-keys.xml</exists>
+        </file>
+      </activation>
+      <properties>
+        <maven.test.skip>false</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
+          </excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-project-info-reports-plugin</artifactId>
+        <configuration>
+          <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
+          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.xerial.snappy</groupId>
+          <artifactId>snappy-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
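
As in the other cloud connector modules this pom mirrors (hadoop-aws,
hadoop-azure), the tests-off/tests-on profiles key the test run on the
presence of src/test/resources/auth-keys.xml: when the file is missing,
maven.test.skip is set to true and a plain "mvn test" skips this module's
tests. Creating that file and running "mvn -pl hadoop-tools/hadoop-kafka -am
test" from the source root should exercise them; the exact invocation is
illustrative, not part of this patch.
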
diff --git a/hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java b/hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java
new file mode 100644
index 00000000000..d8e5bd83638
--- /dev/null
+++ b/hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import com.google.common.base.Strings;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * A metrics sink that writes to a Kafka broker. This requires you to configure
+ * a broker_list and a topic in the metrics2 configuration file. The broker_list
+ * must contain a comma-separated list of Kafka broker host:port pairs, and the
+ * topic must name the single Kafka topic to write to.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KafkaSink implements MetricsSink, Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+ public static final String BROKER_LIST = "broker_list";
+ public static final String TOPIC = "topic";
+
+ private String hostname = null;
+ private String brokerList = null;
+ private String topic = null;
+  private Producer<Integer, byte[]> producer = null;
+
+  public void setProducer(Producer<Integer, byte[]> p) {
+    this.producer = p;
+  }
+
+ @Override
+ public void init(SubsetConfiguration conf) {
+ // Get Kafka broker configuration.
+ Properties props = new Properties();
+ brokerList = conf.getString(BROKER_LIST);
+    props.put("bootstrap.servers", brokerList);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Kafka brokers: " + brokerList);
+    }
+
+ // Get Kafka topic configuration.
+ topic = conf.getString(TOPIC);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Kafka topic " + topic);
+ }
+ if (Strings.isNullOrEmpty(topic)) {
+ throw new MetricsException("Kafka topic can not be null");
+ }
+
+ // Set the rest of Kafka configuration.
+ props.put("key.serializer",
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put("value.serializer",
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put("request.required.acks", "0");
+
+ // Set the hostname once and use it in every message.
+ hostname = "null";
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (Exception e) {
+ LOG.warn("Error getting Hostname, going to continue");
+ }
+
+ try {
+ // Create the producer object.
+      producer = new KafkaProducer<Integer, byte[]>(props);
+ } catch (Exception e) {
+ throw new MetricsException("Error creating Producer, " + brokerList, e);
+ }
+ }
+
+ @Override
+ public void putMetrics(MetricsRecord record) {
+
+ if (producer == null) {
+ throw new MetricsException("Producer in KafkaSink is null!");
+ }
+
+ // Create the json object.
+ StringBuilder jsonLines = new StringBuilder();
+
+ Long timestamp = record.timestamp();
+ Date currDate = new Date(timestamp);
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
+ String date = dateFormat.format(currDate);
+ String time = timeFormat.format(currDate);
+
+ // Collect datapoints and populate the json object.
+ jsonLines.append("{\"hostname\": \"" + hostname);
+ jsonLines.append("\", \"timestamp\": " + timestamp);
+ jsonLines.append(", \"date\": \"" + date);
+ jsonLines.append("\",\"time\": \"" + time);
+ jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
+ for (MetricsTag tag : record.tags()) {
+ jsonLines.append(
+ ", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
+ jsonLines.append(" \"" + tag.value().toString() + "\"");
+ }
+ for (AbstractMetric metric : record.metrics()) {
+ jsonLines.append(", \""
+ + metric.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
+ jsonLines.append(" \"" + metric.value().toString() + "\"");
+ }
+ jsonLines.append("}");
+ LOG.debug("kafka message: " + jsonLines.toString());
+
+ // Create the record to be sent from the json.
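+    // No key is set on the record, so partition choice is left to the
+    // producer's default partitioner.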
+    ProducerRecord<Integer, byte[]> data = new ProducerRecord<Integer, byte[]>(
+        topic, jsonLines.toString().getBytes(Charset.forName("UTF-8")));
+
+ // Send the data to the Kafka broker. Here is an example of this data:
+ // {"hostname": "...", "timestamp": 1436913651516,
+ // "date": "2015-6-14","time": "22:40:51","context": "yarn","name":
+ // "QueueMetrics, "running_0": "1", "running_60": "0", "running_300": "0",
+ // "running_1440": "0", "AppsSubmitted": "1", "AppsRunning": "1",
+ // "AppsPending": "0", "AppsCompleted": "0", "AppsKilled": "0",
+ // "AppsFailed": "0", "AllocatedMB": "134656", "AllocatedVCores": "132",
+ // "AllocatedContainers": "132", "AggregateContainersAllocated": "132",
+ // "AggregateContainersReleased": "0", "AvailableMB": "0",
+ // "AvailableVCores": "0", "PendingMB": "275456", "PendingVCores": "269",
+ // "PendingContainers": "269", "ReservedMB": "0", "ReservedVCores": "0",
+ // "ReservedContainers": "0", "ActiveUsers": "1", "ActiveApplications": "1"}
+    Future<RecordMetadata> future = producer.send(data);
+ jsonLines.setLength(0);
+ try {
+ future.get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new MetricsException("Error sending data", e);
+    }
+ }
+
+ @Override
+ public void flush() {
+ LOG.debug("Kafka seems not to have any flush() mechanism!");
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Close the producer and set it to null.
+ try {
+ producer.close();
+ } catch (RuntimeException e) {
+ throw new MetricsException("Error closing producer", e);
+ } finally {
+ producer = null;
+ }
+ }
+}
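
To attach the sink, a daemon's hadoop-metrics2.properties would carry entries
along these lines, following the metrics2 [prefix].sink.[instance].[option]
convention. This is a minimal sketch: the "namenode" prefix, the "kafka"
instance name, and the broker and topic values are illustrative; only the
broker_list and topic options are read by KafkaSink itself.

  namenode.sink.kafka.class=org.apache.hadoop.metrics2.sink.KafkaSink
  namenode.sink.kafka.broker_list=broker-1:9092,broker-2:9092
  namenode.sink.kafka.topic=hadoop-metrics
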
diff --git a/hadoop-tools/hadoop-kafka/src/test/java/org/apache/hadoop/metrics2/impl/TestKafkaMetrics.java b/hadoop-tools/hadoop-kafka/src/test/java/org/apache/hadoop/metrics2/impl/TestKafkaMetrics.java
new file mode 100644
index 00000000000..3a658689066
--- /dev/null
+++ b/hadoop-tools/hadoop-kafka/src/test/java/org/apache/hadoop/metrics2/impl/TestKafkaMetrics.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricType;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+import org.apache.hadoop.metrics2.sink.KafkaSink;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * This tests that the KafkaSink properly formats the Kafka message.
+ */
+public class TestKafkaMetrics {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestKafkaMetrics.class);
+ private KafkaSink kafkaSink;
+
+ enum KafkaMetricsInfo implements MetricsInfo {
+ KafkaMetrics("Kafka related metrics etc."), KafkaCounter(
+ "Kafka counter."), KafkaTag("Kafka tag.");
+ // metrics
+
+ private final String desc;
+
+ KafkaMetricsInfo(String desc) {
+ this.desc = desc;
+ }
+
+ @Override
+ public String description() {
+ return desc;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).add("name", name())
+ .add("description", desc).toString();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void testPutMetrics() throws Exception {
+ // Create a record by mocking MetricsRecord class.
+ MetricsRecord record = mock(MetricsRecord.class);
+ when(record.tags()).thenReturn(Lists
+ .newArrayList(new MetricsTag(KafkaMetricsInfo.KafkaTag, "test_tag")));
+ when(record.timestamp()).thenReturn(System.currentTimeMillis());
+
+ // Create a metric using AbstractMetric class.
+ AbstractMetric metric = new AbstractMetric(KafkaMetricsInfo.KafkaCounter) {
+ @Override
+ public Number value() {
+        return Integer.valueOf(123);
+ }
+
+ @Override
+ public MetricType type() {
+ return null;
+ }
+
+ @Override
+ public void visit(MetricsVisitor visitor) {
+
+ }
+ };
+
+ // Create a list of metrics.
+    Iterable<AbstractMetric> metrics = Lists.newArrayList(metric);
+ when(record.name()).thenReturn("Kafka record name");
+ when(record.metrics()).thenReturn(metrics);
+ SubsetConfiguration conf = mock(SubsetConfiguration.class);
+ when(conf.getString(KafkaSink.BROKER_LIST)).thenReturn("localhost:9092");
+ String topic = "myTestKafkaTopic";
+ when(conf.getString(KafkaSink.TOPIC)).thenReturn(topic);
+
+ // Create the KafkaSink object and initialize it.
+ kafkaSink = new KafkaSink();
+ kafkaSink.init(conf);
+
+ // Create a mock KafkaProducer as a producer for KafkaSink.
+    Producer<Integer, byte[]> mockProducer = mock(KafkaProducer.class);
+ kafkaSink.setProducer(mockProducer);
+
+ // Create the json object from the record.
+ StringBuilder jsonLines = recordToJson(record);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("kafka message: " + jsonLines.toString());
+ }
+
+ // Send the record and store the result in a mock Future.
+    Future<RecordMetadata> f = mock(Future.class);
+ when(mockProducer.send((ProducerRecord) anyObject())).thenReturn(f);
+ kafkaSink.putMetrics(record);
+
+    // Get the argument and verify it.
+    ArgumentCaptor<ProducerRecord> argument =
+        ArgumentCaptor.forClass(ProducerRecord.class);
+ verify(mockProducer).send(argument.capture());
+
+ // Compare the received data with the original one.
+    ProducerRecord<Integer, byte[]> data = argument.getValue();
+ String jsonResult = new String(data.value());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("kafka result: " + jsonResult);
+ }
+ assertEquals(jsonLines.toString(), jsonResult);
+ }
+
+ StringBuilder recordToJson(MetricsRecord record) {
+ // Create a json object from a metrics record.
+ StringBuilder jsonLines = new StringBuilder();
+ Long timestamp = record.timestamp();
+ Date currDate = new Date(timestamp);
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ String date = dateFormat.format(currDate);
+    SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
+ String time = timeFormat.format(currDate);
+    String hostname = "null";
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (Exception e) {
+ LOG.warn("Error getting Hostname, going to continue");
+ }
+ jsonLines.append("{\"hostname\": \"" + hostname);
+ jsonLines.append("\", \"timestamp\": " + timestamp);
+ jsonLines.append(", \"date\": \"" + date);
+ jsonLines.append("\",\"time\": \"" + time);
+ jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
+ for (MetricsTag tag : record.tags()) {
+ jsonLines.append(
+ ", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
+ jsonLines.append(" \"" + tag.value().toString() + "\"");
+ }
+ for (AbstractMetric m : record.metrics()) {
+ jsonLines.append(
+ ", \"" + m.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
+ jsonLines.append(" \"" + m.value().toString() + "\"");
+ }
+ jsonLines.append("}");
+ return jsonLines;
+ }
+}
diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml
index 0061bf079f0..9f7e590454e 100644
--- a/hadoop-tools/pom.xml
+++ b/hadoop-tools/pom.xml
@@ -46,6 +46,7 @@
     <module>hadoop-sls</module>
     <module>hadoop-azure</module>
     <module>hadoop-aws</module>
+    <module>hadoop-kafka</module>