HADOOP-13235. Use Date and Time API in KafkaSink.

This commit is contained in:
Akira Ajisaka 2016-06-29 07:40:12 -07:00
parent 111739df8f
commit 8d202f1258
1 changed files with 15 additions and 8 deletions

View File

@ -38,8 +38,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -62,6 +64,12 @@ public class KafkaSink implements MetricsSink, Closeable {
private String topic = null;
private Producer<Integer, byte[]> producer = null;
private final DateTimeFormatter dateFormat =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
private final DateTimeFormatter timeFormat =
DateTimeFormatter.ofPattern("HH:mm:ss");
private final ZoneId zoneId = ZoneId.systemDefault();
public void setProducer(Producer<Integer, byte[]> p) {
this.producer = p;
}
@ -121,12 +129,11 @@ public class KafkaSink implements MetricsSink, Closeable {
// Create the json object.
StringBuilder jsonLines = new StringBuilder();
Long timestamp = record.timestamp();
Date currDate = new Date(timestamp);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat timeFormat = new SimpleDateFormat("hh:mm:ss");
String date = dateFormat.format(currDate);
String time = timeFormat.format(currDate);
long timestamp = record.timestamp();
Instant instant = Instant.ofEpochMilli(timestamp);
LocalDateTime ldt = LocalDateTime.ofInstant(instant, zoneId);
String date = ldt.format(dateFormat);
String time = ldt.format(timeFormat);
// Collect datapoints and populate the json object.
jsonLines.append("{\"hostname\": \"" + hostname);