add control and status endpoints to KafkaIndexTask (#2730)

Author: David Lim, 2016-04-21 16:34:59 -06:00 (committed by Gian Merlino)
Parent: 984a518c9f
Commit: 7641f2628f
4 changed files with 685 additions and 158 deletions
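
The commit registers KafkaIndexTask with the ChatHandlerProvider, so a running task exposes HTTP control and status endpoints: GET /status, GET /offsets/current, GET /offsets/end, POST /offsets/end?resume=..., POST /pause?timeout=..., POST /resume, GET /time/start, and POST /stop. As a rough sketch of how a client might call one of them (the base URL below is an assumption, since the prefix under which the chat handler is mounted is not part of this diff):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class KafkaIndexTaskPauseSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Hypothetical base URL; the real path depends on where the middle manager mounts chat handlers.
        String base = "http://middlemanager.example.com:8100/druid/worker/v1/chat/index_kafka_mytask";

        // Ask the task to pause and wait up to 5 seconds for it to acknowledge.
        HttpURLConnection conn = (HttpURLConnection) new URL(base + "/pause?timeout=5000").openConnection();
        conn.setRequestMethod("POST");

        int code = conn.getResponseCode(); // 200 = paused, 202 = accepted but not yet paused, 400 = not pausable
        if (code == 200) {
          try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            // 200 body is a JSON map of {partition: currentOffset} at which ingestion stopped
            System.out.println(reader.readLine());
          }
        } else {
          System.out.println("pause not completed, response code: " + code);
        }
      }
    }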

KafkaIOConfig.java

@@ -29,12 +29,14 @@ import java.util.Map;
 public class KafkaIOConfig implements IOConfig
 {
   private static final boolean DEFAULT_USE_TRANSACTION = true;
+  private static final boolean DEFAULT_PAUSE_AFTER_READ = false;

   private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
   private final Map<String, String> consumerProperties;
   private final boolean useTransaction;
+  private final boolean pauseAfterRead;

   @JsonCreator
   public KafkaIOConfig(
@@ -42,7 +44,8 @@ public class KafkaIOConfig implements IOConfig
       @JsonProperty("startPartitions") KafkaPartitions startPartitions,
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
       @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
-      @JsonProperty("useTransaction") Boolean useTransaction
+      @JsonProperty("useTransaction") Boolean useTransaction,
+      @JsonProperty("pauseAfterRead") Boolean pauseAfterRead
   )
   {
     this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
@@ -50,6 +53,7 @@ public class KafkaIOConfig implements IOConfig
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
     this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
+    this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;

     Preconditions.checkArgument(
         startPartitions.getTopic().equals(endPartitions.getTopic()),
@@ -101,6 +105,12 @@ public class KafkaIOConfig implements IOConfig
     return useTransaction;
   }

+  @JsonProperty
+  public boolean isPauseAfterRead()
+  {
+    return pauseAfterRead;
+  }
+
   @Override
   public String toString()
   {
@@ -110,6 +120,7 @@ public class KafkaIOConfig implements IOConfig
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
            ", useTransaction=" + useTransaction +
+           ", pauseAfterRead=" + pauseAfterRead +
            '}';
   }
 }
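
For reference, a minimal sketch of constructing the updated KafkaIOConfig with the new pauseAfterRead flag, mirroring how the tests below build it. The constructor signature comes from this diff; the class wrapper, topic name, offsets, and consumer properties are illustrative only:

    import com.google.common.collect.ImmutableMap;
    import io.druid.indexing.kafka.KafkaIOConfig;
    import io.druid.indexing.kafka.KafkaPartitions;

    public class IOConfigSketch
    {
      public static KafkaIOConfig pauseAfterReadConfig()
      {
        return new KafkaIOConfig(
            "sequence0",                                             // baseSequenceName
            new KafkaPartitions("topic0", ImmutableMap.of(0, 0L)),   // startPartitions
            new KafkaPartitions("topic0", ImmutableMap.of(0, 100L)), // endPartitions
            ImmutableMap.of("bootstrap.servers", "localhost:9092"),  // consumerProperties (illustrative)
            true,                                                    // useTransaction (null would default to true)
            true                                                     // pauseAfterRead (new flag; null would default to false)
        );
      }
    }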

KafkaIndexTask.java

@@ -19,15 +19,19 @@
 package io.druid.indexing.kafka;

+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -66,6 +70,8 @@ import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.segment.realtime.firehose.ChatHandler;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
 import io.druid.timeline.DataSegment;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -73,7 +79,17 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.joda.time.DateTime;

+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -82,9 +98,25 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;

-public class KafkaIndexTask extends AbstractTask
+public class KafkaIndexTask extends AbstractTask implements ChatHandler
 {
+  public static final long PAUSE_FOREVER = -1L;
+
+  public enum Status
+  {
+    NOT_STARTED,
+    STARTING,
+    READING,
+    PAUSED,
+    PUBLISHING
+  }
+
   private static final Logger log = new Logger(KafkaIndexTask.class);
   private static final String TYPE = "index_kafka";
   private static final Random RANDOM = new Random();
@@ -95,13 +127,42 @@ public class KafkaIndexTask extends AbstractTask
   private final InputRowParser<ByteBuffer> parser;
   private final KafkaTuningConfig tuningConfig;
   private final KafkaIOConfig ioConfig;
+  private final Optional<ChatHandlerProvider> chatHandlerProvider;
+
+  private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
+  private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
+
+  private ObjectMapper mapper;

   private volatile Appenderator appenderator = null;
   private volatile FireDepartmentMetrics fireDepartmentMetrics = null;
-  private volatile boolean startedReading = false;
-  private volatile boolean stopping = false;
-  private volatile boolean publishing = false;
+  private volatile DateTime startTime;
+  private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
   private volatile Thread runThread = null;
+  private volatile boolean stopRequested = false;
+  private volatile boolean publishOnStop = false;
+
+  // The pause lock and associated conditions are to support coordination between the Jetty threads and the main
+  // ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully
+  // the ingestion loop has been stopped at the returned offsets and will not ingest any more data until resumed. The
+  // fields are used as follows (every step requires acquiring [pauseLock]):
+  //   Pausing:
+  //   - In pause(), [pauseRequested] is set to true and then execution waits for [status] to change to PAUSED, with
+  //     the condition checked when [hasPaused] is signalled.
+  //   - In possiblyPause() called from the main loop, if [pauseRequested] is true, [status] is set to PAUSED,
+  //     [hasPaused] is signalled, and execution pauses until [pauseRequested] becomes false, either by being set or by
+  //     the [pauseMillis] timeout elapsing. [pauseRequested] is checked when [shouldResume] is signalled.
+  //   Resuming:
+  //   - In resume(), [pauseRequested] is set to false, [shouldResume] is signalled, and execution waits for [status]
+  //     to change to something other than PAUSED, with the condition checked when [shouldResume] is signalled.
+  //   - In possiblyPause(), when [shouldResume] is signalled, if [pauseRequested] has become false the pause loop
+  //     ends, [status] is changed to STARTING and [shouldResume] is signalled.
+  private final Lock pauseLock = new ReentrantLock();
+  private final Condition hasPaused = pauseLock.newCondition();
+  private final Condition shouldResume = pauseLock.newCondition();
+  private volatile boolean pauseRequested = false;
+  private volatile long pauseMillis = 0;

   @JsonCreator
   public KafkaIndexTask(
@@ -110,7 +171,8 @@ public class KafkaIndexTask extends AbstractTask
       @JsonProperty("dataSchema") DataSchema dataSchema,
       @JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig,
       @JsonProperty("ioConfig") KafkaIOConfig ioConfig,
-      @JsonProperty("context") Map<String, Object> context
+      @JsonProperty("context") Map<String, Object> context,
+      @JacksonInject ChatHandlerProvider chatHandlerProvider
   )
   {
     super(
@@ -125,6 +187,9 @@ public class KafkaIndexTask extends AbstractTask
     this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser");
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
+    this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
+
+    this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
   }

   private static String makeTaskId(String dataSource, int randomBits)
@@ -166,19 +231,20 @@ public class KafkaIndexTask extends AbstractTask
     return ioConfig;
   }

-  /**
-   * Public for tests.
-   */
-  @JsonIgnore
-  public boolean hasStartedReading()
-  {
-    return startedReading;
-  }
-
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
     log.info("Starting up!");
+
+    startTime = DateTime.now();
+    mapper = toolbox.getObjectMapper();
+    status = Status.STARTING;
+
+    if (chatHandlerProvider.isPresent()) {
+      log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
+      chatHandlerProvider.get().register(getId(), this);
+    } else {
+      log.warn("No chat handler detected");
+    }

     runThread = Thread.currentThread();
@@ -207,7 +273,6 @@ public class KafkaIndexTask extends AbstractTask
     // Start up, set up initial offsets.
     final Object restoredMetadata = driver.startJob();
-    final Map<Integer, Long> nextOffsets = Maps.newHashMap();
     if (restoredMetadata == null) {
       nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap());
     } else {
@@ -272,39 +337,26 @@ public class KafkaIndexTask extends AbstractTask
       }
     };

-    // Initialize consumer assignment.
-    final Set<Integer> assignment = Sets.newHashSet();
-    for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
-      final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(entry.getKey());
-      if (entry.getValue() < endOffset) {
-        assignment.add(entry.getKey());
-      } else if (entry.getValue() == endOffset) {
-        log.info("Finished reading partition[%d].", entry.getKey());
-      } else {
-        throw new ISE(
-            "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
-            entry.getValue(),
-            endOffset
-        );
-      }
-    }
-
-    assignPartitions(consumer, topic, assignment);
-
-    // Seek to starting offsets.
-    for (final int partition : assignment) {
-      final long offset = nextOffsets.get(partition);
-      log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
-      consumer.seek(new TopicPartition(topic, partition), offset);
-    }
+    Set<Integer> assignment = assignPartitionsAndSeekToNext(consumer, topic);

     // Main loop.
-    // Could eventually support early termination (triggered by a supervisor)
     // Could eventually support leader/follower mode (for keeping replicas more in sync)
     boolean stillReading = !assignment.isEmpty();

+    try {
       while (stillReading) {
-        if (stopping) {
-          log.info("Stopping early.");
+        if (possiblyPause(assignment)) {
+          // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
+          // partitions upon resuming. This is safe even if the end offsets have not been modified.
+          assignment = assignPartitionsAndSeekToNext(consumer, topic);
+
+          if (assignment.isEmpty()) {
+            log.info("All partitions have been fully read");
+            publishOnStop = true;
+            stopRequested = true;
+          }
+        }
+
+        if (stopRequested) {
           break;
         }
@@ -321,7 +373,7 @@ public class KafkaIndexTask extends AbstractTask
                 return consumer.poll(POLL_TIMEOUT);
               }
               finally {
-                startedReading = true;
+                status = Status.READING;
               }
             }
           },
@@ -346,7 +398,7 @@ public class KafkaIndexTask extends AbstractTask
              );
            }

-          if (record.offset() < ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition())) {
+          if (record.offset() < endOffsets.get(record.partition())) {
             if (record.offset() != nextOffsets.get(record.partition())) {
               throw new ISE(
                   "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
@@ -388,37 +440,39 @@ public class KafkaIndexTask extends AbstractTask
              }
            }

-            final long nextOffset = record.offset() + 1;
-            final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition());
-
-            nextOffsets.put(record.partition(), nextOffset);
-
-            if (nextOffset == endOffset && assignment.remove(record.partition())) {
+            nextOffsets.put(record.partition(), record.offset() + 1);
+          }
+
+          if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition()))
+              && assignment.remove(record.partition())) {
             log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
             assignPartitions(consumer, topic, assignment);
-            stillReading = !assignment.isEmpty();
+            stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
           }
         }
       }
     }
+    finally {
+      driver.persist(committerSupplier.get()); // persist pending data
+    }

-    // Persist pending data.
-    final Committer finalCommitter = committerSupplier.get();
-    driver.persist(finalCommitter);
-
-    publishing = true;
-
-    if (stopping) {
-      // Stopped gracefully. Exit code shouldn't matter, so fail to be on the safe side.
-      return TaskStatus.failure(getId());
+    if (stopRequested && !publishOnStop) {
+      throw new InterruptedException("Stopping without publishing");
     }

+    status = Status.PUBLISHING;
+
     final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
     {
       @Override
       public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
       {
+        final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
+            ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS),
+            KafkaPartitions.class
+        );
+
         // Sanity check, we should only be publishing things that match our desired end state.
-        if (!ioConfig.getEndPartitions().equals(((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS))) {
+        if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) {
           throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
         }

@@ -428,7 +482,7 @@ public class KafkaIndexTask extends AbstractTask
           action = new SegmentInsertAction(
               segments,
               new KafkaDataSourceMetadata(ioConfig.getStartPartitions()),
-              new KafkaDataSourceMetadata(ioConfig.getEndPartitions())
+              new KafkaDataSourceMetadata(finalPartitions)
           );
         } else {
           action = new SegmentInsertAction(segments, null, null);
@@ -463,6 +517,15 @@ public class KafkaIndexTask extends AbstractTask
        );
      }
    }
+    catch (InterruptedException e) {
+      // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow
+      if (!stopRequested) {
+        Thread.currentThread().interrupt();
+        throw e;
+      }
+
+      log.info("The task was asked to stop before completing");
+    }

     return success();
   }
@@ -473,14 +536,15 @@ public class KafkaIndexTask extends AbstractTask
     return true;
   }

+  @POST
+  @Path("/stop")
   @Override
   public void stopGracefully()
   {
     log.info("Stopping gracefully.");
+    stopRequested = true;

-    stopping = true;
-    if (publishing && runThread.isAlive()) {
-      log.info("stopGracefully: Run thread started publishing, interrupting it.");
+    if (runThread.isAlive()) {
+      log.info("Interrupting run thread (status: [%s])", status);
       runThread.interrupt();
     }
   }
@@ -503,12 +567,183 @@ public class KafkaIndexTask extends AbstractTask
     };
   }
+  @GET
+  @Path("/status")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Status getStatus()
+  {
+    return status;
+  }
+
+  @GET
+  @Path("/offsets/current")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Map<Integer, Long> getCurrentOffsets()
+  {
+    return nextOffsets;
+  }
+
+  @GET
+  @Path("/offsets/end")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Map<Integer, Long> getEndOffsets()
+  {
+    return endOffsets;
+  }
+
+  @POST
+  @Path("/offsets/end")
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response setEndOffsets(
+      Map<Integer, Long> offsets,
+      @QueryParam("resume") @DefaultValue("false") final boolean resume
+  ) throws InterruptedException
+  {
+    if (offsets == null) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity("Request body must contain a map of { partition:endOffset }")
+                     .build();
+    } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity(
+                         String.format(
+                             "Request contains partitions not being handled by this task, my partitions: %s",
+                             endOffsets.keySet()
+                         )
+                     )
+                     .build();
+    }
+
+    pauseLock.lockInterruptibly();
+    try {
+      if (!isPaused()) {
+        return Response.status(Response.Status.BAD_REQUEST)
+                       .entity("Task must be paused before changing the end offsets")
+                       .build();
+      }
+
+      for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+        if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
+          return Response.status(Response.Status.BAD_REQUEST)
+                         .entity(
+                             String.format(
+                                 "End offset must be >= current offset for partition [%s] (current: %s)",
+                                 entry.getKey(),
+                                 nextOffsets.get(entry.getKey())
+                             )
+                         )
+                         .build();
+        }
+      }
+
+      endOffsets.putAll(offsets);
+      log.info("endOffsets changed to %s", endOffsets);
+    }
+    finally {
+      pauseLock.unlock();
+    }
+
+    if (resume) {
+      resume();
+    }
+
+    return Response.ok(endOffsets).build();
+  }
+
+  /**
+   * Signals the ingestion loop to pause.
+   *
+   * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely
+   *
+   * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
+   * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
+   * in the response body if the task successfully paused
+   */
+  @POST
+  @Path("/pause")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response pause(@QueryParam("timeout") @DefaultValue("0") final long timeout)
+      throws InterruptedException
+  {
+    if (!(status == Status.PAUSED || status == Status.READING)) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity(String.format("Can't pause, task is not in a pausable state (state: [%s])", status))
+                     .build();
+    }
+
+    pauseLock.lockInterruptibly();
+    try {
+      pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout;
+      pauseRequested = true;
+
+      if (isPaused()) {
+        shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
+      }
+
+      long nanos = TimeUnit.SECONDS.toNanos(2);
+      while (!isPaused()) {
+        if (nanos <= 0L) {
+          return Response.status(Response.Status.ACCEPTED)
+                         .entity("Request accepted but task has not yet paused")
+                         .build();
+        }
+        nanos = hasPaused.awaitNanos(nanos);
+      }
+    }
+    finally {
+      pauseLock.unlock();
+    }
+
+    try {
+      return Response.ok().entity(mapper.writeValueAsString(getCurrentOffsets())).build();
+    }
+    catch (JsonProcessingException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @POST
+  @Path("/resume")
+  public void resume() throws InterruptedException
+  {
+    pauseLock.lockInterruptibly();
+    try {
+      pauseRequested = false;
+      shouldResume.signalAll();
+
+      long nanos = TimeUnit.SECONDS.toNanos(5);
+      while (isPaused()) {
+        if (nanos <= 0L) {
+          throw new RuntimeException("Resume command was not accepted within 5 seconds");
+        }
+        nanos = shouldResume.awaitNanos(nanos);
+      }
+    }
+    finally {
+      pauseLock.unlock();
+    }
+  }
+
+  @GET
+  @Path("/time/start")
+  @Produces(MediaType.APPLICATION_JSON)
+  public DateTime getStartTime()
+  {
+    return startTime;
+  }
+
   @VisibleForTesting
-  public FireDepartmentMetrics getFireDepartmentMetrics()
+  FireDepartmentMetrics getFireDepartmentMetrics()
   {
     return fireDepartmentMetrics;
   }

+  private boolean isPaused()
+  {
+    return status == Status.PAUSED;
+  }
+
   private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
   {
     return Appenderators.createRealtime(
@@ -590,4 +825,95 @@
         )
     );
   }
+
+  private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
+  {
+    // Initialize consumer assignment.
+    final Set<Integer> assignment = Sets.newHashSet();
+    for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
+      final long endOffset = endOffsets.get(entry.getKey());
+      if (entry.getValue() < endOffset) {
+        assignment.add(entry.getKey());
+      } else if (entry.getValue() == endOffset) {
+        log.info("Finished reading partition[%d].", entry.getKey());
+      } else {
+        throw new ISE(
+            "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
+            entry.getValue(),
+            endOffset
+        );
+      }
+    }
+
+    assignPartitions(consumer, topic, assignment);
+
+    // Seek to starting offsets.
+    for (final int partition : assignment) {
+      final long offset = nextOffsets.get(partition);
+      log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
+      consumer.seek(new TopicPartition(topic, partition), offset);
+    }
+
+    return assignment;
+  }
+
+  /**
+   * Checks if the pauseRequested flag was set and if so blocks:
+   * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared
+   * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared
+   * <p>
+   * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the
+   * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume
+   * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal
+   * shouldResume after adjusting pauseMillis for the new value to take effect.
+   * <p>
+   * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
+   * <p>
+   * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set.
+   *
+   * @return true if a pause request was handled, false otherwise
+   */
+  private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException
+  {
+    pauseLock.lockInterruptibly();
+    try {
+      if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) {
+        pauseMillis = PAUSE_FOREVER;
+        pauseRequested = true;
+      }
+
+      if (pauseRequested) {
+        status = Status.PAUSED;
+        long nanos = 0;
+        hasPaused.signalAll();
+
+        while (pauseRequested) {
+          if (pauseMillis == PAUSE_FOREVER) {
+            log.info("Pausing ingestion until resumed");
+            shouldResume.await();
+          } else {
+            if (pauseMillis > 0) {
+              log.info("Pausing ingestion for [%,d] ms", pauseMillis);
+              nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
+              pauseMillis = 0;
+            }
+            if (nanos <= 0L) {
+              pauseRequested = false; // timeout elapsed
+            }
+            nanos = shouldResume.awaitNanos(nanos);
+          }
+        }
+
+        status = Status.READING;
+        shouldResume.signalAll();
+        log.info("Ingestion loop resumed");
+        return true;
+      }
+    }
+    finally {
+      pauseLock.unlock();
+    }
+
+    return false;
+  }
 }
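
The guarantee that pause() only returns once ingestion has actually stopped comes from the pauseLock / hasPaused / shouldResume coordination described in the comment block near the top of the class. Below is a stripped-down sketch of that handshake in isolation; it collapses the task's Status enum into a boolean and drops the timeout and pauseAfterRead handling, so it illustrates the pattern rather than reproducing the task code:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    // Simplified illustration of the pauseLock / hasPaused / shouldResume handshake.
    public class PauseHandshakeSketch
    {
      private final Lock pauseLock = new ReentrantLock();
      private final Condition hasPaused = pauseLock.newCondition();
      private final Condition shouldResume = pauseLock.newCondition();
      private volatile boolean pauseRequested = false;
      private volatile boolean paused = false;

      // Called by the ingestion loop between batches (analogous to possiblyPause()).
      public void possiblyPause() throws InterruptedException
      {
        pauseLock.lockInterruptibly();
        try {
          if (pauseRequested) {
            paused = true;
            hasPaused.signalAll();    // lets pause() return: no more records will be read
            while (pauseRequested) {
              shouldResume.await();   // parked until resume() clears the flag
            }
            paused = false;
            shouldResume.signalAll(); // lets resume() observe that the loop is running again
          }
        }
        finally {
          pauseLock.unlock();
        }
      }

      // Called from a Jetty thread; returns only once the loop is parked (analogous to pause()).
      public void pause() throws InterruptedException
      {
        pauseLock.lockInterruptibly();
        try {
          pauseRequested = true;
          while (!paused) {
            hasPaused.await();
          }
        }
        finally {
          pauseLock.unlock();
        }
      }

      // Called from a Jetty thread; returns once the loop has left the paused state (analogous to resume()).
      public void resume() throws InterruptedException
      {
        pauseLock.lockInterruptibly();
        try {
          pauseRequested = false;
          shouldResume.signalAll();
          while (paused) {
            shouldResume.await();
          }
        }
        finally {
          pauseLock.unlock();
        }
      }
    }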

KafkaIndexTaskTest.java

@@ -19,6 +19,8 @@
 package io.druid.indexing.kafka;

+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Charsets;
@@ -137,6 +139,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 @RunWith(Parameterized.class)
 public class KafkaIndexTaskTest
@@ -159,6 +162,7 @@ public class KafkaIndexTaskTest
   private final List<Task> runningTasks = Lists.newArrayList();

   private static final Logger log = new Logger(KafkaIndexTaskTest.class);
+  private static final ObjectMapper objectMapper = new DefaultObjectMapper();
   private static final DataSchema DATA_SCHEMA;
@@ -175,7 +179,6 @@ public class KafkaIndexTaskTest
   );

   static {
-    ObjectMapper objectMapper = new DefaultObjectMapper();
     DATA_SCHEMA = new DataSchema(
         "test_ds",
         objectMapper.convertValue(
@@ -298,7 +301,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -337,7 +341,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -345,7 +350,7 @@
     final ListenableFuture<TaskStatus> future = runTask(task);

     // Wait for the task to start reading
-    while (!task.hasStartedReading()) {
+    while (task.getStatus() != KafkaIndexTask.Status.READING) {
       Thread.sleep(10);
     }
@@ -395,7 +400,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -433,7 +439,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -482,7 +489,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -530,7 +538,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -560,7 +569,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -571,7 +581,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -622,7 +633,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -633,7 +645,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -685,6 +698,7 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
+            false,
             false
         ),
         null
@@ -696,6 +710,7 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
+            false,
             false
         ),
         null
@@ -753,7 +768,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L, 1, 0L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -807,7 +823,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -818,7 +835,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(1, 0L)),
             new KafkaPartitions("topic0", ImmutableMap.of(1, 1L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -871,7 +889,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -891,9 +910,9 @@
     Assert.assertEquals(2, countEvents(task1));

-    // Stop gracefully
+    // Stop without publishing segment
     task1.stopGracefully();
-    Assert.assertEquals(TaskStatus.Status.FAILED, future1.get().getStatusCode());
+    Assert.assertEquals(TaskStatus.Status.SUCCESS, future1.get().getStatusCode());

     // Start a new task
     final KafkaIndexTask task2 = createTask(
@@ -903,7 +922,8 @@
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
             new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
-            true
+            true,
+            false
         ),
         null
     );
@@ -942,6 +962,172 @@
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
   }
+  @Test(timeout = 60_000L)
+  public void testRunWithPauseAndResume() throws Exception
+  {
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            "sequence0",
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)),
+            kafkaServer.consumerProperties(),
+            true,
+            false
+        ),
+        null
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // Insert some data, but not enough for the task to finish
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(RECORDS, 4)) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    while (countEvents(task) != 2) {
+      Thread.sleep(25);
+    }
+
+    Assert.assertEquals(2, countEvents(task));
+    Assert.assertEquals(KafkaIndexTask.Status.READING, task.getStatus());
+
+    Map<Integer, Long> currentOffsets = objectMapper.readValue(
+        task.pause(0).getEntity().toString(),
+        new TypeReference<Map<Integer, Long>>()
+        {
+        }
+    );
+    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
+
+    // Insert remaining data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(RECORDS, 4)) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    try {
+      future.get(10, TimeUnit.SECONDS);
+      Assert.fail("Task completed when it should have been paused");
+    }
+    catch (TimeoutException e) {
+      // carry on..
+    }
+
+    Assert.assertEquals(currentOffsets, task.getCurrentOffsets());
+
+    task.resume();
+
+    Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
+    Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
+
+    // Check metrics
+    Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
+    Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
+    Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
+
+    // Check published metadata
+    SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
+    SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+
+    // Check segments in deep storage
+    Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
+    Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
+  }
+
+  @Test(timeout = 60_000L)
+  public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
+  {
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            "sequence0",
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 1L)),
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)),
+            kafkaServer.consumerProperties(),
+            true,
+            true
+        ),
+        null
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
+      Thread.sleep(25);
+    }
+
+    // reached the end of the assigned offsets and paused instead of publishing
+    Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
+    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
+
+    Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets());
+    Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
+    task.setEndOffsets(newEndOffsets, false);
+    Assert.assertEquals(newEndOffsets, task.getEndOffsets());
+    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
+    task.resume();
+
+    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
+      Thread.sleep(25);
+    }
+
+    // reached the end of the updated offsets and paused
+    Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
+    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
+
+    // try again but with resume flag == true
+    newEndOffsets = ImmutableMap.of(0, 6L);
+    task.setEndOffsets(newEndOffsets, true);
+    Assert.assertEquals(newEndOffsets, task.getEndOffsets());
+    Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
+
+    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
+      Thread.sleep(25);
+    }
+
+    Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
+    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
+    task.resume();
+
+    Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
+
+    // Check metrics
+    Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
+    Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
+    Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
+
+    // Check published metadata
+    SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
+    SegmentDescriptor desc2 = SD(task, "2010/P1D", 0);
+    SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 6L))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+
+    // Check segments in deep storage
+    Assert.assertEquals(ImmutableList.of("b"), readSegmentDim1(desc1));
+    Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc2));
+    Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3));
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
@@ -1015,6 +1201,7 @@
         DATA_SCHEMA,
         tuningConfig,
         ioConfig,
+        null,
         null
     );
   }

TestUtils.java

@@ -30,6 +30,8 @@ import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.column.ColumnConfig;
+import io.druid.segment.realtime.firehose.ChatHandlerProvider;
+import io.druid.segment.realtime.firehose.NoopChatHandlerProvider;

 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -70,6 +72,7 @@ public class TestUtils
             .addValue(IndexIO.class, indexIO)
             .addValue(IndexMerger.class, indexMerger)
             .addValue(ObjectMapper.class, jsonMapper)
+            .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider())
     );
   }