Add ingest/input/bytes metric and Kafka consumer metrics. (#14582)

* Add ingest/input/bytes metric and Kafka consumer metrics.

New metrics:

1) ingest/input/bytes: Equivalent to processedBytes in the task reports.

2) kafka/consumer/bytesConsumed: Equivalent to the Kafka consumer
   metric "bytes-consumed-total". Only emitted for Kafka tasks.

3) kafka/consumer/recordsConsumed: Equivalent to the Kafka consumer
   metric "records-consumed-total". Only emitted for Kafka tasks.

* Fix anchor.

* Fix KafkaConsumerMonitor.

* Interface updates.

* Doc changes.

* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java

Co-authored-by: Benedict Jin <asdf2014@apache.org>

---------

Co-authored-by: Benedict Jin <asdf2014@apache.org>
Gian Merlino 2023-07-19 19:56:22 -07:00 committed by GitHub
parent f7348d7389
commit bac5ef347c
14 changed files with 270 additions and 102 deletions

View File

@ -210,10 +210,12 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/processed`|Number of events processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/events/processedWithError`|Number of events processed with some partial errors per emission period. Events processed with partial errors are counted towards both this metric and `ingest/events/processed`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/thrownAway`|Number of events rejected because they are null, or filtered by `transformSpec`, or outside one of `lateMessageRejectionPeriod`, `earlyMessageRejectionPeriod`, or `windowPeriod`.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that is not ultimately ingested, such as records rejected as unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
|`ingest/rows/output`|Number of Druid rows persisted.|`dataSource`, `taskId`, `taskType`, `groupId`|Your number of events with rollup.|
|`ingest/persists/count`|Number of times persist occurred.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration.|
|`ingest/persists/time`|Milliseconds spent doing intermediate persist.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on configuration. Generally a few minutes at most.|
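
To make the new `ingest/input/bytes` row concrete, the sketch below shows roughly how such a metric event is built and emitted. The dimension values are hypothetical, and `StubServiceEmitter` is a test-only emitter from the Druid codebase (it also appears in the test changes further down); the production path goes through `TaskRealtimeMetricsMonitor`, as shown later in this diff.

import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;

public class IngestInputBytesEventSketch
{
  public static void main(String[] args)
  {
    // Test-only emitter; the "service"/"host" values are arbitrary.
    final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");

    // Hypothetical dimension values; the dimension names match the table above.
    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder()
        .setDimension("dataSource", "wikipedia")
        .setDimension("taskId", "index_kafka_wikipedia_abc123_0")
        .setDimension("taskType", "index_kafka")
        .setDimension("groupId", "index_kafka_wikipedia");

    // 1,048,576 bytes read from the input source during this emission period.
    emitter.emit(builder.build("ingest/input/bytes", 1_048_576L));
    emitter.verifyEmitted("ingest/input/bytes", 1);
  }
}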

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.kafka;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
public class KafkaConsumerMonitor extends AbstractMonitor
{
private volatile boolean stopAfterNext = false;
// Kafka metric name -> Druid metric name
private static final Map<String, String> METRICS =
ImmutableMap.<String, String>builder()
.put("bytes-consumed-total", "kafka/consumer/bytesConsumed")
.put("records-consumed-total", "kafka/consumer/recordsConsumed")
.build();
private static final String TOPIC_TAG = "topic";
private static final Set<String> TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG);
private final KafkaConsumer<?, ?> consumer;
private final Map<String, AtomicLong> counters = new HashMap<>();
public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
{
this.consumer = consumer;
}
@Override
public boolean doMonitor(final ServiceEmitter emitter)
{
for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
final MetricName metricName = entry.getKey();
if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
final String topic = metricName.tags().get(TOPIC_TAG);
final long newValue = ((Number) entry.getValue().metricValue()).longValue();
final long priorValue =
counters.computeIfAbsent(metricName.name(), ignored -> new AtomicLong())
.getAndSet(newValue);
if (newValue != priorValue) {
final ServiceMetricEvent.Builder builder =
new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic);
emitter.emit(builder.build(METRICS.get(metricName.name()), newValue - priorValue));
}
}
}
return !stopAfterNext;
}
public void stopAfterNextEmit()
{
stopAfterNext = true;
}
private static boolean isTopicMetric(final MetricName metricName)
{
// Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
// only look at the per-topic metrics. See https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
return TOPIC_METRIC_TAGS.equals(metricName.tags().keySet())
&& !Strings.isNullOrEmpty(metricName.tags().get(TOPIC_TAG));
}
}
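
For orientation, here is a minimal sketch of how the monitor above is driven. The broker address is a placeholder, and `StubServiceEmitter` is the test emitter used in the tests below; in production the monitor is instead registered with the task's `MonitorScheduler`, as the `KafkaIndexTask` change further down shows.

import java.util.Properties;
import org.apache.druid.indexing.kafka.KafkaConsumerMonitor;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaConsumerMonitorSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      final KafkaConsumerMonitor monitor = new KafkaConsumerMonitor(consumer);
      final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");

      // ... subscribe and poll the consumer here ...

      // Each doMonitor() call emits the per-topic deltas of bytes-consumed-total
      // and records-consumed-total since the previous call (when they changed)
      // and returns true until stopAfterNextEmit() has been called.
      monitor.doMonitor(emitter);

      // Before closing the consumer, request one last emission; after it,
      // doMonitor() returns false so a scheduler knows to drop the monitor.
      monitor.stopAfterNextEmit();
      monitor.doMonitor(emitter);
    }
  }
}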

View File

@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@ -97,7 +98,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
}
@Override
protected KafkaRecordSupplier newTaskRecordSupplier()
protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@ -107,7 +108,14 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, Kafka
props.put("auto.offset.reset", "none");
return new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
final KafkaRecordSupplier recordSupplier =
new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides());
if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor());
}
return recordSupplier;
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);

View File

@ -33,6 +33,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -60,6 +61,7 @@ import java.util.stream.Collectors;
public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
{
private final KafkaConsumer<byte[], byte[]> consumer;
private final KafkaConsumerMonitor monitor;
private boolean closed;
public KafkaRecordSupplier(
@ -77,6 +79,7 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
)
{
this.consumer = consumer;
this.monitor = new KafkaConsumerMonitor(consumer);
}
@Override
@ -190,6 +193,14 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
});
}
/**
* Returns a Monitor that emits Kafka consumer metrics.
*/
public Monitor monitor()
{
return monitor;
}
@Override
public void close()
{
@ -197,6 +208,8 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaR
return;
}
closed = true;
monitor.stopAfterNextEmit();
consumer.close();
}

View File

@ -76,7 +76,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -152,7 +151,6 @@ import java.util.stream.Stream;
@RunWith(Parameterized.class)
public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
private static final long POLL_RETRY_MS = 100;
private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header()
{

View File

@ -30,6 +30,8 @@ import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.apache.druid.segment.TestHelper;
@ -59,15 +61,15 @@ import java.util.stream.Collectors;
public class KafkaRecordSupplierTest
{
private static String topic = "topic";
private static String additonal_parameter = "additional.parameter";
private static long poll_timeout_millis = 1000;
private static int pollRetry = 5;
private static int topicPosFix = 0;
private static final String ADDITIONAL_PARAMETER = "additional.parameter";
private static final long POLL_TIMEOUT_MILLIS = 1000;
private static final int POLL_RETRY = 5;
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
private static String TOPIC = "topic";
private static int TOPIC_POS_FIX = 0;
private static TestingCluster ZK_SERVER;
private static TestBroker KAFKA_SERVER;
private List<ProducerRecord<byte[], byte[]>> records;
@ -112,9 +114,9 @@ public class KafkaRecordSupplierTest
}
}
private static String getTopicName()
private static String nextTopicName()
{
return "topic-" + topicPosFix++;
return "topic-" + TOPIC_POS_FIX++;
}
private List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> createOrderedPartitionableRecords()
@ -129,7 +131,7 @@ public class KafkaRecordSupplierTest
partitionToOffset.put(r.partition(), 1L);
}
return new OrderedPartitionableRecord<>(
topic,
TOPIC,
r.partition(),
offset,
r.value() == null ? null : Collections.singletonList(new KafkaRecordEntity(
@ -187,34 +189,34 @@ public class KafkaRecordSupplierTest
@BeforeClass
public static void setupClass() throws Exception
{
zkServer = new TestingCluster(1);
zkServer.start();
ZK_SERVER = new TestingCluster(1);
ZK_SERVER.start();
kafkaServer = new TestBroker(
zkServer.getConnectString(),
KAFKA_SERVER = new TestBroker(
ZK_SERVER.getConnectString(),
null,
1,
ImmutableMap.of("num.partitions", "2")
);
kafkaServer.start();
KAFKA_SERVER.start();
}
@Before
public void setupTest()
{
topic = getTopicName();
records = generateRecords(topic);
TOPIC = nextTopicName();
records = generateRecords(TOPIC);
}
@AfterClass
public static void tearDownClass() throws Exception
{
kafkaServer.close();
kafkaServer = null;
KAFKA_SERVER.close();
KAFKA_SERVER = null;
zkServer.stop();
zkServer = null;
ZK_SERVER.stop();
ZK_SERVER = null;
}
@Test
@ -225,19 +227,19 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
Assert.assertTrue(recordSupplier.getAssignment().isEmpty());
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(TOPIC));
recordSupplier.close();
}
@ -250,11 +252,11 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
Map<String, Object> properties = kafkaServer.consumerProperties();
Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
@ -269,7 +271,7 @@ public class KafkaRecordSupplierTest
recordSupplier.assign(partitions);
Assert.assertEquals(partitions, recordSupplier.getAssignment());
Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(topic));
Assert.assertEquals(ImmutableSet.of(0, 1), recordSupplier.getPartitionIds(TOPIC));
recordSupplier.close();
}
@ -279,10 +281,10 @@ public class KafkaRecordSupplierTest
public void testSupplierSetupCustomDeserializerRequiresParameter()
{
Map<String, Object> properties = kafkaServer.consumerProperties();
Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put(additonal_parameter, "stringValue");
properties.put(ADDITIONAL_PARAMETER, "stringValue");
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
properties,
@ -298,7 +300,7 @@ public class KafkaRecordSupplierTest
public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt()
{
Map<String, Object> properties = kafkaServer.consumerProperties();
Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializerRequiresParameter.class.getName());
@ -320,11 +322,11 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
Map<String, Object> properties = kafkaServer.consumerProperties();
Map<String, Object> properties = KAFKA_SERVER.consumerProperties();
properties.put("key.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
properties.put("value.deserializer", KafkaRecordSupplierTest.TestKafkaDeserializer.class.getName());
@ -339,9 +341,10 @@ public class KafkaRecordSupplierTest
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
POLL_TIMEOUT_MILLIS);
for (int i = 0; polledRecords.size() != initialRecords.size() && i < POLL_RETRY; i++) {
polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@ -360,24 +363,27 @@ public class KafkaRecordSupplierTest
insertData();
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(),
KAFKA_SERVER.consumerProperties(),
OBJECT_MAPPER,
null
);
final Monitor monitor = recordSupplier.monitor();
monitor.start();
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords =
recordSupplier.poll(POLL_TIMEOUT_MILLIS);
for (int i = 0; polledRecords.size() != initialRecords.size() && i < POLL_RETRY; i++) {
polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@ -385,7 +391,14 @@ public class KafkaRecordSupplierTest
Assert.assertEquals(initialRecords.size(), polledRecords.size());
Assert.assertTrue(initialRecords.containsAll(polledRecords));
// Verify metrics
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
Assert.assertTrue(monitor.monitor(emitter));
emitter.verifyEmitted("kafka/consumer/bytesConsumed", 1);
emitter.verifyEmitted("kafka/consumer/recordsConsumed", 1);
recordSupplier.close();
Assert.assertFalse(monitor.monitor(emitter));
}
@ -393,7 +406,7 @@ public class KafkaRecordSupplierTest
public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionException
{
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : records.subList(0, 13)) {
@ -403,25 +416,26 @@ public class KafkaRecordSupplierTest
}
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != 13 && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
POLL_TIMEOUT_MILLIS);
for (int i = 0; polledRecords.size() != 13 && i < POLL_RETRY; i++) {
polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : records.subList(13, 15)) {
@ -431,8 +445,8 @@ public class KafkaRecordSupplierTest
}
for (int i = 0; polledRecords.size() != records.size() && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
for (int i = 0; polledRecords.size() != records.size() && i < POLL_RETRY; i++) {
polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@ -471,16 +485,16 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@ -493,9 +507,10 @@ public class KafkaRecordSupplierTest
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = createOrderedPartitionableRecords();
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != 11 && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
POLL_TIMEOUT_MILLIS);
for (int i = 0; polledRecords.size() != 11 && i < POLL_RETRY; i++) {
polledRecords.addAll(recordSupplier.poll(POLL_TIMEOUT_MILLIS));
Thread.sleep(200);
}
@ -514,16 +529,16 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@ -532,7 +547,8 @@ public class KafkaRecordSupplierTest
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
recordSupplier.seekToLatest(partitions);
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(
POLL_TIMEOUT_MILLIS);
Assert.assertEquals(Collections.emptyList(), polledRecords);
recordSupplier.close();
@ -542,21 +558,21 @@ public class KafkaRecordSupplierTest
public void testSeekUnassigned() throws InterruptedException, ExecutionException
{
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
kafkaProducer.send(record).get();
}
}
StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0)
StreamPartition.of(TOPIC, 0)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
@ -573,16 +589,16 @@ public class KafkaRecordSupplierTest
// Insert data
insertData();
StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
StreamPartition<Integer> partition0 = StreamPartition.of(TOPIC, 0);
StreamPartition<Integer> partition1 = StreamPartition.of(TOPIC, 1);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
StreamPartition.of(topic, 0),
StreamPartition.of(topic, 1)
StreamPartition.of(TOPIC, 0),
StreamPartition.of(TOPIC, 1)
);
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@ -617,8 +633,8 @@ public class KafkaRecordSupplierTest
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@ -629,8 +645,8 @@ public class KafkaRecordSupplierTest
public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
@ -641,8 +657,8 @@ public class KafkaRecordSupplierTest
public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);
@ -653,8 +669,8 @@ public class KafkaRecordSupplierTest
public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull()
{
KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(
kafkaServer.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(topic, 0);
KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null);
StreamPartition<Integer> streamPartition = StreamPartition.of(TOPIC, 0);
Set<StreamPartition<Integer>> partitions = ImmutableSet.of(streamPartition);
recordSupplier.assign(partitions);
recordSupplier.seekToLatest(partitions);
@ -693,7 +709,7 @@ public class KafkaRecordSupplierTest
{
KafkaConsumer<byte[], byte[]> kafkaConsumer = KafkaRecordSupplier.getKafkaConsumer(
OBJECT_MAPPER,
kafkaServer.consumerProperties(),
KAFKA_SERVER.consumerProperties(),
originalConsumerProperties -> {
final Map<String, Object> newMap = new HashMap<>(originalConsumerProperties);
newMap.put("client.id", "overrideConfigTest");
@ -711,7 +727,7 @@ public class KafkaRecordSupplierTest
private void insertData() throws ExecutionException, InterruptedException
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
try (final KafkaProducer<byte[], byte[]> kafkaProducer = KAFKA_SERVER.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : records) {

View File

@ -100,7 +100,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
}
@Override
protected KinesisRecordSupplier newTaskRecordSupplier()
protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
throws RuntimeException
{
KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig);

View File

@ -45,6 +45,7 @@ import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.ParseExceptionReport;
@ -2485,7 +2486,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
}
@Override
protected KinesisRecordSupplier newTaskRecordSupplier()
protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
{
return localSupplier == null ? recordSupplier : localSupplier;
}

View File

@ -108,6 +108,12 @@ public class TaskRealtimeMetricsMonitor extends AbstractMonitor
log.warn("[%,d] duplicate events!", dedup);
}
emitter.emit(builder.build("ingest/events/duplicate", dedup));
emitter.emit(
builder.build(
"ingest/input/bytes",
rowIngestionMetersTotals.getProcessedBytes() - previousRowIngestionMetersTotals.getProcessedBytes()
)
);
emitter.emit(builder.build("ingest/rows/output", metrics.rowOutput() - previousFireDepartmentMetrics.rowOutput()));
emitter.emit(builder.build("ingest/persists/count", metrics.numPersists() - previousFireDepartmentMetrics.numPersists()));

View File

@ -268,7 +268,27 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType> createTaskRunner();
protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier();
/**
* Deprecated method for providing the {@link RecordSupplier} that connects with the stream. New extensions should
* override {@link #newTaskRecordSupplier(TaskToolbox)} instead.
*/
@Deprecated
protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier()
{
throw new UnsupportedOperationException();
}
/**
* Subclasses must override this method to provide the {@link RecordSupplier} that connects with the stream.
*
* The default implementation delegates to {@link #newTaskRecordSupplier()}, which is deprecated, in order to support
* existing extensions that have implemented that older method instead of this newer one. New extensions should
* override this method, not {@link #newTaskRecordSupplier()}.
*/
protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier(final TaskToolbox toolbox)
{
return newTaskRecordSupplier();
}
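For an extension that still overrides the deprecated no-argument method, migrating means overriding the toolbox-accepting variant instead. A minimal sketch of such an override follows; the supplier type `MyRecordSupplier` and its `monitor()` method are hypothetical names, and the monitor wiring mirrors what `KafkaIndexTask` does above.
// Illustrative only; this method would live in a hypothetical subclass of
// SeekableStreamIndexTask, not in this file.
@Override
protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier(final TaskToolbox toolbox)
{
  // MyRecordSupplier is a made-up supplier for the extension's stream.
  final MyRecordSupplier supplier = new MyRecordSupplier(/* stream connection config */);
  if (toolbox.getMonitorScheduler() != null) {
    // Optionally register any per-connection monitors with the task's scheduler,
    // as KafkaIndexTask does with KafkaConsumerMonitor.
    toolbox.getMonitorScheduler().addMonitor(supplier.monitor());
  }
  return supplier;
}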
@VisibleForTesting
public Appenderator getAppenderator()

View File

@ -416,8 +416,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
//milliseconds waited for created segments to be handed off
long handoffWaitMs = 0L;
try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier = task.newTaskRecordSupplier()) {
try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier =
task.newTaskRecordSupplier(toolbox)) {
if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);

View File

@ -367,7 +367,7 @@ public class SeekableStreamIndexTaskRunnerAuthTest
}
@Override
protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier()
protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
{
return null;
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
@ -1297,7 +1298,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Override
protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier()
protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
{
return recordSupplier;
}

View File

@ -37,11 +37,20 @@ public interface RowIngestionMeters extends InputStats
String DETERMINE_PARTITIONS = "determinePartitions";
String PROCESSED = "processed";
String PROCESSED_BYTES = "processedBytes";
String PROCESSED_WITH_ERROR = "processedWithError";
String UNPARSEABLE = "unparseable";
String THROWN_AWAY = "thrownAway";
/**
* Number of bytes read by an ingestion task.
*
* Note: processedBytes is a misleading name; this generally measures size when data is initially read or fetched,
* not when it is processed by the ingest task. It's measuring a stage somewhat earlier in the pipeline. In other
* words, "processed" and "processedBytes" do not use the same definition of "process". A better name might be
* "bytesRead" or "inputBytes", although if we change it, we must consider compatibility with existing readers.
*/
String PROCESSED_BYTES = "processedBytes";
long getProcessed();
void incrementProcessed();