HADOOP-10949. metrics2 sink plugin for Apache Kafka (Babak Behzad via aw)

Allen Wittenauer 2015-10-05 14:02:07 -07:00
parent 2fc2b50534
commit b59e434116
6 changed files with 511 additions and 1 deletion

View File

@@ -56,6 +56,8 @@ Trunk (Unreleased)
    HADOOP-10854. unit tests for the shell scripts (aw)

    HADOOP-10949. metrics2 sink plugin for Apache Kafka (Babak Behzad via aw)

  IMPROVEMENTS

    HADOOP-11203. Allow distcp to accept bandwidth in fractional MegaBytes

View File

@@ -45,8 +45,9 @@
<!-- Version number for xerces used by JDiff -->
<xerces.jdiff.version>2.11.0</xerces.jdiff.version>
<hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
<kafka.version>0.8.2.1</kafka.version>
<commons-daemon.version>1.0.13</commons-daemon.version>
<test.build.dir>${project.build.directory}/test-dir</test.build.dir>

View File

@@ -0,0 +1,129 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-project</artifactId>
    <version>3.0.0-SNAPSHOT</version>
    <relativePath>../../hadoop-project</relativePath>
  </parent>
  <artifactId>hadoop-kafka</artifactId>
  <version>3.0.0-SNAPSHOT</version>
  <name>Apache Hadoop Kafka Library support</name>
  <description>
    This module contains code to support integration with Kafka.
    It also declares the dependencies needed to work with Kafka.
  </description>
  <packaging>jar</packaging>

  <properties>
    <file.encoding>UTF-8</file.encoding>
    <downloadSources>true</downloadSources>
  </properties>
  <profiles>
    <profile>
      <id>tests-off</id>
      <activation>
        <file>
          <missing>src/test/resources/auth-keys.xml</missing>
        </file>
      </activation>
      <properties>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </profile>
    <profile>
      <id>tests-on</id>
      <activation>
        <file>
          <exists>src/test/resources/auth-keys.xml</exists>
        </file>
      </activation>
      <properties>
        <maven.test.skip>false</maven.test.skip>
      </properties>
    </profile>
  </profiles>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>findbugs-maven-plugin</artifactId>
        <configuration>
          <findbugsXmlOutput>true</findbugsXmlOutput>
          <xmlOutput>true</xmlOutput>
          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
          </excludeFilterFile>
          <effort>Max</effort>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-project-info-reports-plugin</artifactId>
        <configuration>
          <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
          <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <!-- see ../../hadoop-project/pom.xml for versions -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>${kafka.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.xerial.snappy</groupId>
          <artifactId>snappy-java</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

View File

@@ -0,0 +1,193 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.metrics2.sink;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import com.google.common.base.Strings;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A metrics sink that writes to a Kafka broker. This requires you to configure
 * a broker_list and a topic in the metrics2 configuration file. The
 * broker_list must contain a comma-separated list of Kafka broker host:port
 * pairs. The topic must name a single Kafka topic to which all records are
 * sent.
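 * <p>
 * A minimal hadoop-metrics2.properties sketch (the sink instance name
 * {@code kafka}, the daemon prefix, and the broker addresses below are
 * illustrative, not fixed):
 * <pre>
 * *.sink.kafka.class=org.apache.hadoop.metrics2.sink.KafkaSink
 * namenode.sink.kafka.broker_list=broker1:9092,broker2:9092
 * namenode.sink.kafka.topic=hadoop-metrics
 * </pre>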
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KafkaSink implements MetricsSink, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
  public static final String BROKER_LIST = "broker_list";
  public static final String TOPIC = "topic";

  private String hostname = null;
  private String brokerList = null;
  private String topic = null;
  private Producer<Integer, byte[]> producer = null;
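  /**
   * Visible for testing: lets a test replace the real producer with a mock.
   */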
  public void setProducer(Producer<Integer, byte[]> p) {
    this.producer = p;
  }
  @Override
  public void init(SubsetConfiguration conf) {
    // Get the Kafka broker configuration.
    Properties props = new Properties();
    brokerList = conf.getString(BROKER_LIST);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka brokers: " + brokerList);
    }
    props.put("bootstrap.servers", brokerList);

    // Get the Kafka topic configuration.
    topic = conf.getString(TOPIC);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka topic: " + topic);
    }
    if (Strings.isNullOrEmpty(topic)) {
      throw new MetricsException("Kafka topic cannot be null");
    }

    // Set the rest of the Kafka configuration. Note that the new producer
    // API uses the "acks" key, not the old producer's "request.required.acks";
    // "0" means fire-and-forget.
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("acks", "0");

    // Set the hostname once and use it in every message.
    hostname = "null";
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      LOG.warn("Error getting hostname, going to continue");
    }

    try {
      // Create the producer object.
      producer = new KafkaProducer<Integer, byte[]>(props);
    } catch (Exception e) {
      throw new MetricsException("Error creating Producer, " + brokerList, e);
    }
  }
  @Override
  public void putMetrics(MetricsRecord record) {
    if (producer == null) {
      throw new MetricsException("Producer in KafkaSink is null!");
    }

    // Create the JSON object. Use HH (24-hour clock) so the time matches
    // the example below.
    StringBuilder jsonLines = new StringBuilder();
    Long timestamp = record.timestamp();
    Date currDate = new Date(timestamp);
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
    String date = dateFormat.format(currDate);
    String time = timeFormat.format(currDate);

    // Collect datapoints and populate the JSON object.
    jsonLines.append("{\"hostname\": \"" + hostname);
    jsonLines.append("\", \"timestamp\": " + timestamp);
    jsonLines.append(", \"date\": \"" + date);
    jsonLines.append("\",\"time\": \"" + time);
    jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
    for (MetricsTag tag : record.tags()) {
      jsonLines.append(
          ", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + tag.value().toString() + "\"");
    }
    for (AbstractMetric metric : record.metrics()) {
      jsonLines.append(", \""
          + metric.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + metric.value().toString() + "\"");
    }
    jsonLines.append("}");
    if (LOG.isDebugEnabled()) {
      LOG.debug("kafka message: " + jsonLines.toString());
    }

    // Create the record to be sent from the JSON.
    ProducerRecord<Integer, byte[]> data = new ProducerRecord<Integer, byte[]>(
        topic, jsonLines.toString().getBytes(Charset.forName("UTF-8")));

    // Send the data to the Kafka broker. Here is an example of this data:
    // {"hostname": "...", "timestamp": 1436913651516,
    // "date": "2015-06-14","time": "22:40:51","context": "yarn","name":
    // "QueueMetrics", "running_0": "1", "running_60": "0", "running_300": "0",
    // "running_1440": "0", "AppsSubmitted": "1", "AppsRunning": "1",
    // "AppsPending": "0", "AppsCompleted": "0", "AppsKilled": "0",
    // "AppsFailed": "0", "AllocatedMB": "134656", "AllocatedVCores": "132",
    // "AllocatedContainers": "132", "AggregateContainersAllocated": "132",
    // "AggregateContainersReleased": "0", "AvailableMB": "0",
    // "AvailableVCores": "0", "PendingMB": "275456", "PendingVCores": "269",
    // "PendingContainers": "269", "ReservedMB": "0", "ReservedVCores": "0",
    // "ReservedContainers": "0", "ActiveUsers": "1", "ActiveApplications": "1"}
    Future<RecordMetadata> future = producer.send(data);
    jsonLines.setLength(0);
    try {
      future.get();
    } catch (InterruptedException e) {
      throw new MetricsException("Error sending data", e);
    } catch (ExecutionException e) {
      throw new MetricsException("Error sending data", e);
    }
  }
  @Override
  public void flush() {
    LOG.debug("Kafka seems not to have any flush() mechanism!");
  }
  @Override
  public void close() throws IOException {
    // Close the producer and set it to null.
    try {
      producer.close();
    } catch (RuntimeException e) {
      throw new MetricsException("Error closing producer", e);
    } finally {
      producer = null;
    }
  }
}

View File

@@ -0,0 +1,184 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.metrics2.impl;

import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Future;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricType;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.MetricsVisitor;
import org.apache.hadoop.metrics2.sink.KafkaSink;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * This tests that the KafkaSink properly formats the Kafka message.
 */
public class TestKafkaMetrics {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestKafkaMetrics.class);
  private KafkaSink kafkaSink;

  enum KafkaMetricsInfo implements MetricsInfo {
    KafkaMetrics("Kafka related metrics etc."),
    KafkaCounter("Kafka counter."),
    KafkaTag("Kafka tag.");

    private final String desc;

    KafkaMetricsInfo(String desc) {
      this.desc = desc;
    }

    @Override
    public String description() {
      return desc;
    }

    @Override
    public String toString() {
      return Objects.toStringHelper(this).add("name", name())
          .add("description", desc).toString();
    }
  }
  @Test
  @SuppressWarnings({ "unchecked", "rawtypes" })
  public void testPutMetrics() throws Exception {
    // Create a record by mocking the MetricsRecord class.
    MetricsRecord record = mock(MetricsRecord.class);
    when(record.tags()).thenReturn(Lists
        .newArrayList(new MetricsTag(KafkaMetricsInfo.KafkaTag, "test_tag")));
    when(record.timestamp()).thenReturn(System.currentTimeMillis());

    // Create a metric using the AbstractMetric class.
    AbstractMetric metric = new AbstractMetric(KafkaMetricsInfo.KafkaCounter) {
      @Override
      public Number value() {
        return Integer.valueOf(123);
      }

      @Override
      public MetricType type() {
        return null;
      }

      @Override
      public void visit(MetricsVisitor visitor) {
      }
    };

    // Create a list of metrics.
    Iterable<AbstractMetric> metrics = Lists.newArrayList(metric);
    when(record.name()).thenReturn("Kafka record name");
    when(record.metrics()).thenReturn(metrics);
    SubsetConfiguration conf = mock(SubsetConfiguration.class);
    when(conf.getString(KafkaSink.BROKER_LIST)).thenReturn("localhost:9092");
    String topic = "myTestKafkaTopic";
    when(conf.getString(KafkaSink.TOPIC)).thenReturn(topic);

    // Create the KafkaSink object and initialize it.
    kafkaSink = new KafkaSink();
    kafkaSink.init(conf);
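    // Note: init() above builds a real KafkaProducer pointed at
    // localhost:9092, but constructing the producer does not contact the
    // broker, and it is replaced with a mock below, so no broker is needed.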
    // Create a mock KafkaProducer as the producer for KafkaSink.
    Producer<Integer, byte[]> mockProducer = mock(KafkaProducer.class);
    kafkaSink.setProducer(mockProducer);

    // Create the JSON object from the record.
    StringBuilder jsonLines = recordToJson(record);
    if (LOG.isDebugEnabled()) {
      LOG.debug("kafka message: " + jsonLines.toString());
    }

    // Send the record and store the result in a mock Future.
    Future<RecordMetadata> f = mock(Future.class);
    when(mockProducer.send((ProducerRecord) anyObject())).thenReturn(f);
    kafkaSink.putMetrics(record);

    // Capture the argument and verify it.
    ArgumentCaptor<ProducerRecord> argument =
        ArgumentCaptor.forClass(ProducerRecord.class);
    verify(mockProducer).send(argument.capture());

    // Compare the received data with the original.
    ProducerRecord<Integer, byte[]> data = (argument.getValue());
    String jsonResult = new String(data.value());
    if (LOG.isDebugEnabled()) {
      LOG.debug("kafka result: " + jsonResult);
    }
    assertEquals(jsonLines.toString(), jsonResult);
  }
  StringBuilder recordToJson(MetricsRecord record) {
    // Create a JSON object from a metrics record, mirroring the format
    // produced by KafkaSink.putMetrics() (including the HH 24-hour clock).
    StringBuilder jsonLines = new StringBuilder();
    Long timestamp = record.timestamp();
    Date currDate = new Date(timestamp);
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    String date = dateFormat.format(currDate);
    SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
    String time = timeFormat.format(currDate);
    String hostname = "null";
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      LOG.warn("Error getting hostname, going to continue");
    }
    jsonLines.append("{\"hostname\": \"" + hostname);
    jsonLines.append("\", \"timestamp\": " + timestamp);
    jsonLines.append(", \"date\": \"" + date);
    jsonLines.append("\",\"time\": \"" + time);
    jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
    for (MetricsTag tag : record.tags()) {
      jsonLines.append(
          ", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + tag.value().toString() + "\"");
    }
    for (AbstractMetric m : record.metrics()) {
      jsonLines.append(
          ", \"" + m.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + m.value().toString() + "\"");
    }
    jsonLines.append("}");
    return jsonLines;
  }
}

View File

@@ -46,6 +46,7 @@
    <module>hadoop-sls</module>
    <module>hadoop-azure</module>
    <module>hadoop-aws</module>
    <module>hadoop-kafka</module>
  </modules>
  <build>