Fix the broken Appenderator contract in KafkaIndexTask (#5905)

* Fix broken Appenderator contract in KafkaIndexTask

* fix build

* add publishFuture

* reuse sequenceToUse if possible
Jihoon Son 2018-07-03 13:31:29 -07:00 committed by Gian Merlino
parent 867f6a9e2b
commit 1ccabab98e
16 changed files with 342 additions and 585 deletions
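The core of this change is the new publishAndRegisterHandoff() in IncrementalPublishingKafkaIndexTaskRunner: every publish gets its own publishFuture, and a SettableFuture is registered up front as the matching handoffFuture so that a failed publish also fails the handoff. The following is a minimal standalone sketch of that chaining pattern only, using Guava futures and a placeholder result type rather than Druid's real SegmentsAndMetadata and driver, and assuming a Guava version that provides MoreExecutors.directExecutor().

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class PublishHandoffSketch
{
  // Stand-in for SegmentsAndMetadata; illustration only.
  static class PublishResult
  {
  }

  private final List<ListenableFuture<PublishResult>> publishWaitList = new CopyOnWriteArrayList<>();
  private final List<ListenableFuture<PublishResult>> handOffWaitList = new CopyOnWriteArrayList<>();

  void publishAndRegisterHandoff(ListenableFuture<PublishResult> publishFuture)
  {
    publishWaitList.add(publishFuture);

    // Create a handoffFuture for every publishFuture; it must fail if the publish fails.
    final SettableFuture<PublishResult> handoffFuture = SettableFuture.create();
    handOffWaitList.add(handoffFuture);

    Futures.addCallback(
        publishFuture,
        new FutureCallback<PublishResult>()
        {
          @Override
          public void onSuccess(PublishResult published)
          {
            // The real runner calls driver.registerHandoff(published) here and forwards
            // its result (or null on a recoverable handoff failure) into handoffFuture.
            handoffFuture.set(published);
          }

          @Override
          public void onFailure(Throwable t)
          {
            handoffFuture.setException(t);
          }
        },
        MoreExecutors.directExecutor()
    );
  }

  public static void main(String[] args)
  {
    final PublishHandoffSketch sketch = new PublishHandoffSketch();
    sketch.publishAndRegisterHandoff(Futures.immediateFuture(new PublishResult()));
    sketch.publishAndRegisterHandoff(Futures.immediateFailedFuture(new RuntimeException("publish failed")));
    System.out.println("registered " + sketch.handOffWaitList.size() + " handoff futures");
  }
}

With this shape, the waiting code only needs to block on the two future lists; a publish failure surfaces through both lists instead of being lost in a background publish thread.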

View File

@ -22,22 +22,20 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
@ -63,7 +61,6 @@ import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.collect.Utils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.segment.indexing.RealtimeIOConfig;
@ -106,21 +103,20 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -135,7 +131,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> endOffsets;
private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<>();
@ -185,10 +181,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
private final RowIngestionMeters rowIngestionMeters;
private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
private final BlockingQueue<SequenceMetadata> publishQueue = new LinkedBlockingQueue<>();
private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new CopyOnWriteArrayList<>(); // to prevent concurrency visibility issue
private final CountDownLatch waitForPublishes = new CountDownLatch(1);
private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new LinkedList<>();
private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new LinkedList<>();
private volatile DateTime startTime;
private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
@ -200,12 +194,10 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
private volatile IngestionState ingestionState;
private volatile boolean pauseRequested = false;
private volatile long pauseMillis = 0;
private volatile long nextCheckpointTime;
private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
private volatile File sequencesPersistFile;
private ListeningExecutorService publishExecService;
private volatile Throwable backgroundThreadException;
public IncrementalPublishingKafkaIndexTaskRunner(
KafkaIndexTask task,
@ -226,7 +218,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
this.topic = ioConfig.getStartPartitions().getTopic();
this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionOffsetMap());
this.sequences = new CopyOnWriteArrayList<>();
this.ingestionState = IngestionState.NOT_STARTED;
@ -258,49 +250,39 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
status = Status.STARTING;
this.toolbox = toolbox;
final Map<String, Object> context = task.getContext();
if (context != null && context.get("checkpoints") != null) {
final String checkpointsString = (String) context.get("checkpoints");
log.info("Got checkpoints [%s]", checkpointsString);
final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue(
checkpointsString,
new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
}
);
Iterator<Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
while (sequenceOffsets.hasNext()) {
Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
if (!restoreSequences()) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = getCheckPointsFromContext(toolbox, task);
if (checkpoints != null) {
Iterator<Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
while (sequenceOffsets.hasNext()) {
Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
sequences.add(new SequenceMetadata(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
current.getValue(),
true
));
previous = current;
}
sequences.add(new SequenceMetadata(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
endOffsets,
false
));
} else {
sequences.add(new SequenceMetadata(
0,
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
ioConfig.getStartPartitions().getPartitionOffsetMap(),
endOffsets,
false
));
}
}
sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json");
restoreSequences();
log.info("Starting with sequences: %s", sequences);
if (chatHandlerProvider.isPresent()) {
@ -335,15 +317,12 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
)
);
try (
final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()
) {
try (final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()) {
toolbox.getDataSegmentServerAnnouncer().announce();
toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
appenderator = task.newAppenderator(fireDepartmentMetrics, toolbox);
driver = task.newDriver(appenderator, toolbox, fireDepartmentMetrics);
createAndStartPublishExecutor();
final String topic = ioConfig.getStartPartitions().getTopic();
@ -435,7 +414,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
status = Status.READING;
try {
while (stillReading) {
if (possiblyPause(assignment)) {
if (possiblyPause()) {
// The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
// partitions upon resuming. This is safe even if the end offsets have not been modified.
assignment = assignPartitionsAndSeekToNext(consumer, topic);
@ -448,8 +427,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
}
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed()
&& !ioConfig.isPauseAfterRead())) {
if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
status = Status.PUBLISHING;
}
@ -457,12 +435,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
break;
}
checkAndMaybeThrowException();
if (!ioConfig.isPauseAfterRead()) {
maybePersistAndPublishSequences(committerSupplier);
if (backgroundThreadException != null) {
throw new RuntimeException(backgroundThreadException);
}
checkPublishAndHandoffFailure();
maybePersistAndPublishSequences(committerSupplier);
// The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
// offset is not present in the topic-partition. This can happen if we're asking a task to read from data
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
@ -473,19 +453,17 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
stillReading = !assignment.isEmpty();
}
SequenceMetadata sequenceToCheckpoint = null;
for (ConsumerRecord<byte[], byte[]> record : records) {
if (log.isTraceEnabled()) {
log.trace(
"Got topic[%s] partition[%d] offset[%,d].",
record.topic(),
record.partition(),
record.offset()
);
}
log.trace(
"Got topic[%s] partition[%d] offset[%,d].",
record.topic(),
record.partition(),
record.offset()
);
if (record.offset() < endOffsets.get(record.partition())) {
if (record.offset() != nextOffsets.get(record.partition())) {
@ -513,24 +491,23 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
: parser.parseBatch(ByteBuffer.wrap(valueBytes));
boolean isPersistRequired = false;
final SequenceMetadata sequenceToUse = sequences
.stream()
.filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
.findFirst()
.orElse(null);
if (sequenceToUse == null) {
throw new ISE(
"WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
record.partition(),
record.offset(),
sequences
);
}
for (InputRow row : rows) {
if (row != null && task.withinMinMaxRecordTime(row)) {
SequenceMetadata sequenceToUse = null;
for (SequenceMetadata sequence : sequences) {
if (sequence.canHandle(record)) {
sequenceToUse = sequence;
}
}
if (sequenceToUse == null) {
throw new ISE(
"WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
record.partition(),
record.offset(),
sequences
);
}
final AppenderatorDriverAddResult addResult = driver.add(
row,
sequenceToUse.getSequenceName(),
@ -588,7 +565,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
public void onFailure(Throwable t)
{
log.error("Persist failed, dying");
throwableAtomicReference.set(t);
backgroundThreadException = t;
}
}
);
@ -605,7 +582,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
&& assignment.remove(record.partition())) {
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
KafkaIndexTask.assignPartitions(consumer, topic, assignment);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
stillReading = !assignment.isEmpty();
}
}
@ -622,7 +599,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
sequenceToCheckpoint,
sequences
);
requestPause(KafkaIndexTask.PAUSE_FOREVER);
requestPause();
if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
task.getDataSource(),
ioConfig.getBaseSequenceName(),
@ -636,6 +613,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
ingestionState = IngestionState.COMPLETED;
}
catch (Exception e) {
// (1) catch all exceptions while reading from kafka
log.error(e, "Encountered exception in run() before persisting.");
throw e;
}
@ -659,16 +637,23 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
sequenceMetadata.updateAssignments(nextOffsets);
publishingSequences.add(sequenceMetadata.getSequenceName());
// persist already done in finally, so directly add to publishQueue
publishQueue.add(sequenceMetadata);
publishAndRegisterHandoff(sequenceMetadata);
}
}
// add Sentinel SequenceMetadata to indicate end of all sequences
publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata());
waitForPublishes.await();
checkAndMaybeThrowException();
if (backgroundThreadException != null) {
throw new RuntimeException(backgroundThreadException);
}
List<SegmentsAndMetadata> handedOffList = Lists.newArrayList();
// Wait for publish futures to complete.
Futures.allAsList(publishWaitList).get();
// Wait for handoff futures to complete.
// Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
// handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
// failed to persist sequences. It might also return null if handoff failed, but was recoverable.
// See publishAndRegisterHandoff() for details.
List<SegmentsAndMetadata> handedOffList = Collections.emptyList();
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOffList = Futures.allAsList(handOffWaitList).get();
} else {
@ -677,6 +662,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
// A handoff timeout is not an indexing failure but a coordination failure, so we simply ignore the
// timeout exception here.
log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
.addData("TaskId", task.getId())
.emit();
@ -692,8 +679,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
);
}
appenderator.close();
}
catch (InterruptedException | RejectedExecutionException e) {
// (2) catch InterruptedException and RejectedExecutionException thrown during any of the ingestion steps,
// including the final publishing.
Futures.allAsList(publishWaitList).cancel(true);
Futures.allAsList(handOffWaitList).cancel(true);
appenderator.closeNow();
// handle the InterruptedException that gets wrapped in a RejectedExecutionException
if (e instanceof RejectedExecutionException
@ -709,14 +702,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
log.info("The task was asked to stop before completing");
}
catch (Exception e) {
// (3) catch all other exceptions thrown during any of the ingestion steps, including the final publishing.
Futures.allAsList(publishWaitList).cancel(true);
Futures.allAsList(handOffWaitList).cancel(true);
appenderator.closeNow();
throw e;
}
finally {
if (appenderator != null) {
if (throwableAtomicReference.get() != null) {
appenderator.closeNow();
} else {
appenderator.close();
}
}
if (driver != null) {
driver.close();
}
@ -724,10 +717,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
chatHandlerProvider.get().unregister(task.getId());
}
if (publishExecService != null) {
publishExecService.shutdownNow();
}
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
toolbox.getDataSegmentServerAnnouncer().unannounce();
}
@ -736,83 +725,143 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
return TaskStatus.success(task.getId());
}
private void createAndStartPublishExecutor()
private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException
{
publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver"));
publishExecService.submit(
(Runnable) () -> {
while (true) {
// Check if any publishFuture failed.
final List<ListenableFuture<SegmentsAndMetadata>> publishFinished = publishWaitList
.stream()
.filter(Future::isDone)
.collect(Collectors.toList());
for (ListenableFuture<SegmentsAndMetadata> publishFuture : publishFinished) {
// If publishFuture failed, the line below will throw an exception that is caught by (1), and then by (2) or (3).
publishFuture.get();
}
publishWaitList.removeAll(publishFinished);
// Check if any handoffFuture failed.
final List<ListenableFuture<SegmentsAndMetadata>> handoffFinished = handOffWaitList
.stream()
.filter(Future::isDone)
.collect(Collectors.toList());
for (ListenableFuture<SegmentsAndMetadata> handoffFuture : handoffFinished) {
// If handoffFuture failed, the line below will throw an exception that is caught by (1), and then by (2) or (3).
handoffFuture.get();
}
handOffWaitList.removeAll(handoffFinished);
}
private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
{
log.info("Publishing segments for sequence [%s]", sequenceMetadata);
final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
driver.publish(
sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
Collections.singletonList(sequenceMetadata.getSequenceName())
),
(Function<SegmentsAndMetadata, SegmentsAndMetadata>) publishedSegmentsAndMetadata -> {
if (publishedSegmentsAndMetadata == null) {
throw new ISE(
"Transaction failure publishing segments for sequence [%s]",
sequenceMetadata
);
} else {
return publishedSegmentsAndMetadata;
}
}
);
publishWaitList.add(publishFuture);
// Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails.
final SettableFuture<SegmentsAndMetadata> handoffFuture = SettableFuture.create();
handOffWaitList.add(handoffFuture);
Futures.addCallback(
publishFuture,
new FutureCallback<SegmentsAndMetadata>()
{
@Override
public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata)
{
log.info(
"Published segments[%s] with metadata[%s].",
publishedSegmentsAndMetadata.getSegments()
.stream()
.map(DataSegment::getIdentifier)
.collect(Collectors.toList()),
Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata")
);
sequences.remove(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());
try {
final SequenceMetadata sequenceMetadata = publishQueue.take();
Preconditions.checkNotNull(driver);
if (sequenceMetadata.isSentinel()) {
waitForPublishes.countDown();
break;
}
log.info("Publishing segments for sequence [%s]", sequenceMetadata);
final SegmentsAndMetadata result = driver.publish(
sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()),
sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
ImmutableList.of(sequenceMetadata.getSequenceName())
).get();
if (result == null) {
throw new ISE(
"Transaction failure publishing segments for sequence [%s]",
sequenceMetadata
);
} else {
log.info(
"Published segments[%s] with metadata[%s].",
Joiner.on(", ").join(
result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())
),
Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata")
);
}
sequences.remove(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());
try {
persistSequences();
}
catch (IOException e) {
log.error(e, "Unable to persist state, dying");
Throwables.propagate(e);
}
final ListenableFuture<SegmentsAndMetadata> handOffFuture = driver.registerHandoff(result);
handOffWaitList.add(handOffFuture);
persistSequences();
}
catch (Throwable t) {
if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException
&& t.getCause() instanceof InterruptedException))) {
log.warn("Stopping publish thread as we are interrupted, probably we are shutting down");
} else {
log.makeAlert(t, "Error in publish thread, dying").emit();
throwableAtomicReference.set(t);
}
Futures.allAsList(handOffWaitList).cancel(true);
waitForPublishes.countDown();
break;
catch (IOException e) {
log.error(e, "Unable to persist state, dying");
handoffFuture.setException(e);
throw new RuntimeException(e);
}
Futures.transform(
driver.registerHandoff(publishedSegmentsAndMetadata),
new Function<SegmentsAndMetadata, Void>()
{
@Nullable
@Override
public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata)
{
if (handoffSegmentsAndMetadata == null) {
log.warn(
"Failed to handoff segments[%s]",
publishedSegmentsAndMetadata.getSegments()
.stream()
.map(DataSegment::getIdentifier)
.collect(Collectors.toList())
);
}
handoffFuture.set(handoffSegmentsAndMetadata);
return null;
}
}
);
}
@Override
public void onFailure(Throwable t)
{
log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata);
handoffFuture.setException(t);
}
}
);
}
private void restoreSequences() throws IOException
private static File getSequencesPersistFile(TaskToolbox toolbox)
{
Preconditions.checkNotNull(sequencesPersistFile);
return new File(toolbox.getPersistDir(), "sequences.json");
}
private boolean restoreSequences() throws IOException
{
final File sequencesPersistFile = getSequencesPersistFile(toolbox);
if (sequencesPersistFile.exists()) {
sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
sequencesPersistFile, new TypeReference<List<SequenceMetadata>>()
{
}));
sequences = new CopyOnWriteArrayList<>(
toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
sequencesPersistFile,
new TypeReference<List<SequenceMetadata>>()
{
}
)
);
return true;
} else {
return false;
}
}
@ -823,7 +872,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
new TypeReference<List<SequenceMetadata>>()
{
}
).writeValue(sequencesPersistFile, sequences);
).writeValue(getSequencesPersistFile(toolbox), sequences);
}
private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorMsg)
@ -877,7 +926,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
result,
sequenceMetadata
);
publishQueue.add(sequenceMetadata);
publishAndRegisterHandoff(sequenceMetadata);
}
catch (InterruptedException e) {
log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata);
@ -918,58 +967,25 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
return assignment;
}
private void checkAndMaybeThrowException()
{
if (throwableAtomicReference.get() != null) {
Throwables.propagate(throwableAtomicReference.get());
}
}
/**
* Checks if the pauseRequested flag was set and if so blocks:
* a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared
* b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared
* <p/>
* If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the
* pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume
* and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal
* shouldResume after adjusting pauseMillis for the new value to take effect.
* Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared.
* <p/>
* Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
* <p/>
* Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set.
*
* @return true if a pause request was handled, false otherwise
*/
private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException
private boolean possiblyPause() throws InterruptedException
{
pauseLock.lockInterruptibly();
try {
if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) {
pauseMillis = KafkaIndexTask.PAUSE_FOREVER;
pauseRequested = true;
}
if (pauseRequested) {
status = Status.PAUSED;
long nanos = 0;
hasPaused.signalAll();
while (pauseRequested) {
if (pauseMillis == KafkaIndexTask.PAUSE_FOREVER) {
log.info("Pausing ingestion until resumed");
shouldResume.await();
} else {
if (pauseMillis > 0) {
log.info("Pausing ingestion for [%,d] ms", pauseMillis);
nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
pauseMillis = 0;
}
if (nanos <= 0L) {
pauseRequested = false; // timeout elapsed
}
nanos = shouldResume.awaitNanos(nanos);
}
log.info("Pausing ingestion until resumed");
shouldResume.await();
}
status = Status.READING;
@ -1062,9 +1078,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
return status == Status.PAUSED;
}
private void requestPause(long pauseMillis)
private void requestPause()
{
this.pauseMillis = pauseMillis;
pauseRequested = true;
}
@ -1090,7 +1105,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
.addData("partitions", partitionOffsetMap.keySet())
.emit();
// wait to be killed by the supervisor
requestPause(KafkaIndexTask.PAUSE_FOREVER);
requestPause();
} else {
log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
}
@ -1225,14 +1240,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
@Produces(MediaType.APPLICATION_JSON)
public Response setEndOffsetsHTTP(
Map<Integer, Long> offsets,
@QueryParam("resume") @DefaultValue("false") final boolean resume,
@QueryParam("finish") @DefaultValue("true") final boolean finish,
// this field is only for internal purposes and shouldn't usually be set by users
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
return setEndOffsets(offsets, resume, finish);
return setEndOffsets(offsets, finish);
}
@GET
@ -1276,7 +1290,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
@Override
public Response setEndOffsets(
Map<Integer, Long> offsets,
final boolean resume,
final boolean finish // this field is only for internal purposes and shouldn't usually be set by users
) throws InterruptedException
{
@ -1305,7 +1318,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
(latestSequence.getEndOffsets().equals(offsets) && finish)) {
log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences);
return Response.ok(offsets).build();
} else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) {
} else if (latestSequence.isCheckpointed()) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(StringUtils.format(
"WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]",
@ -1339,7 +1352,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets);
endOffsets.putAll(offsets);
} else {
Preconditions.checkState(!ioConfig.isPauseAfterRead());
// create new sequence
final SequenceMetadata newSequence = new SequenceMetadata(
latestSequence.getSequenceId() + 1,
@ -1355,17 +1367,19 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
}
catch (Exception e) {
log.error(e, "Unable to set end offsets, dying");
throwableAtomicReference.set(e);
Throwables.propagate(e);
backgroundThreadException = e;
// resume the task so that it can immediately finish as failed
resume();
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(Throwables.getStackTraceAsString(e))
.build();
}
finally {
pauseLock.unlock();
}
}
if (resume) {
resume();
}
resume();
return Response.ok(offsets).build();
}
@ -1396,8 +1410,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
/**
* Signals the ingestion loop to pause.
*
* @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely
*
* @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
* method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
* in the response body if the task successfully paused
@ -1406,16 +1418,15 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
@Path("/pause")
@Produces(MediaType.APPLICATION_JSON)
public Response pauseHTTP(
@QueryParam("timeout") @DefaultValue("0") final long timeout,
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
return pause(timeout);
return pause();
}
@Override
public Response pause(final long timeout) throws InterruptedException
public Response pause() throws InterruptedException
{
if (!(status == Status.PAUSED || status == Status.READING)) {
return Response.status(Response.Status.BAD_REQUEST)
@ -1425,7 +1436,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
pauseLock.lockInterruptibly();
try {
pauseMillis = timeout <= 0 ? KafkaIndexTask.PAUSE_FOREVER : timeout;
pauseRequested = true;
pollRetryLock.lockInterruptibly();
@ -1634,22 +1644,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
}
}
private SequenceMetadata()
{
this.sequenceId = -1;
this.sequenceName = null;
this.startOffsets = null;
this.endOffsets = null;
this.assignments = null;
this.checkpointed = true;
this.sentinel = true;
}
static SequenceMetadata getSentinelSequenceMetadata()
{
return new SequenceMetadata();
}
@Override
public String toString()
{
@ -1723,7 +1717,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
};
}
TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction)
TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction)
{
return (segments, commitMetadata) -> {
final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
@ -1758,4 +1752,24 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
};
}
}
@Nullable
private static TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
TaskToolbox toolbox,
KafkaIndexTask task
) throws IOException
{
final String checkpointsString = task.getContextValue("checkpoints");
if (checkpointsString != null) {
log.info("Checkpoints [%s]", checkpointsString);
return toolbox.getObjectMapper().readValue(
checkpointsString,
new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
}
);
} else {
return null;
}
}
}
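run() above first blocks on all publish futures and then on all handoff futures, optionally bounded by handoffConditionTimeout, and treats a handoff timeout as non-fatal. Below is a rough sketch of that wait-and-tolerate-timeout shape only, with placeholder futures and an illustrative timeout value standing in for the real driver results and tuning config.

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WaitForPublishAndHandoffSketch
{
  public static void main(String[] args) throws InterruptedException, ExecutionException
  {
    final long handoffConditionTimeout = 1000; // millis; illustrative value

    final List<ListenableFuture<String>> publishWaitList = new ArrayList<>();
    final List<ListenableFuture<String>> handOffWaitList = new ArrayList<>();
    publishWaitList.add(Futures.immediateFuture("published-segment-1"));
    handOffWaitList.add(Futures.immediateFuture("handed-off-segment-1"));

    // A publish failure is fatal: get() rethrows it and the task fails.
    Futures.allAsList(publishWaitList).get();

    // A handoff timeout is only a coordination problem, so it is logged and ignored.
    List<String> handedOffList = Collections.emptyList();
    if (handoffConditionTimeout == 0) {
      handedOffList = Futures.allAsList(handOffWaitList).get();
    } else {
      try {
        handedOffList = Futures.allAsList(handOffWaitList).get(handoffConditionTimeout, TimeUnit.MILLISECONDS);
      }
      catch (TimeoutException e) {
        System.err.println("Timed out after " + handoffConditionTimeout + " millis waiting for handoffs");
      }
    }
    System.out.println("handed off: " + handedOffList);
  }
}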

View File

@ -31,7 +31,6 @@ import java.util.Map;
public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
private final String baseSequenceName;
@ -39,7 +38,6 @@ public class KafkaIOConfig implements IOConfig
private final KafkaPartitions endPartitions;
private final Map<String, String> consumerProperties;
private final boolean useTransaction;
private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;
private final Optional<DateTime> maximumMessageTime;
private final boolean skipOffsetGaps;
@ -51,7 +49,6 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
@JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
@ -62,7 +59,6 @@ public class KafkaIOConfig implements IOConfig
this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;
@ -117,12 +113,6 @@ public class KafkaIOConfig implements IOConfig
return useTransaction;
}
@JsonProperty
public boolean isPauseAfterRead()
{
return pauseAfterRead;
}
@JsonProperty
public Optional<DateTime> getMaximumMessageTime()
{
@ -150,7 +140,6 @@ public class KafkaIOConfig implements IOConfig
", endPartitions=" + endPartitions +
", consumerProperties=" + consumerProperties +
", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
", maximumMessageTime=" + maximumMessageTime +
", skipOffsetGaps=" + skipOffsetGaps +

View File

@ -68,8 +68,6 @@ import java.util.stream.Collectors;
public class KafkaIndexTask extends AbstractTask implements ChatHandler
{
public static final long PAUSE_FOREVER = -1L;
public enum Status
{
NOT_STARTED,
@ -86,14 +84,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private static final Random RANDOM = new Random();
static final long POLL_TIMEOUT = 100;
static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
private final DataSchema dataSchema;
private final InputRowParser<ByteBuffer> parser;
private final KafkaTuningConfig tuningConfig;
private final KafkaIOConfig ioConfig;
private final AuthorizerMapper authorizerMapper;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
private final KafkaIndexTaskRunner runner;
@ -126,7 +121,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
this.authorizerMapper = authorizerMapper;
final CircularBuffer<Throwable> savedParseExceptions;
if (tuningConfig.getMaxSavedParseExceptions() > 0) {
savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions());

View File

@ -28,11 +28,11 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.indexer.TaskLocation;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexer.TaskStatus;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
@ -167,19 +167,14 @@ public class KafkaIndexTaskClient
public Map<Integer, Long> pause(final String id)
{
return pause(id, 0);
}
public Map<Integer, Long> pause(final String id, final long timeout)
{
log.debug("Pause task[%s] timeout[%d]", id, timeout);
log.debug("Pause task[%s]", id);
try {
final FullResponseHolder response = submitRequest(
id,
HttpMethod.POST,
"pause",
timeout > 0 ? StringUtils.format("timeout=%d", timeout) : null,
null,
true
);
@ -361,18 +356,17 @@ public class KafkaIndexTaskClient
public boolean setEndOffsets(
final String id,
final Map<Integer, Long> endOffsets,
final boolean resume,
final boolean finalize
)
{
log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize);
log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize);
try {
final FullResponseHolder response = submitRequest(
id,
HttpMethod.POST,
"offsets/end",
StringUtils.format("resume=%s&finish=%s", resume, finalize),
StringUtils.format("finish=%s", finalize),
jsonMapper.writeValueAsBytes(endOffsets),
true
);
@ -415,11 +409,6 @@ public class KafkaIndexTaskClient
}
public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id)
{
return pauseAsync(id, 0);
}
public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id, final long timeout)
{
return executorService.submit(
new Callable<Map<Integer, Long>>()
@ -427,7 +416,7 @@ public class KafkaIndexTaskClient
@Override
public Map<Integer, Long> call()
{
return pause(id, timeout);
return pause(id);
}
}
);
@ -490,7 +479,7 @@ public class KafkaIndexTaskClient
}
public ListenableFuture<Boolean> setEndOffsetsAsync(
final String id, final Map<Integer, Long> endOffsets, final boolean resume, final boolean finalize
final String id, final Map<Integer, Long> endOffsets, final boolean finalize
)
{
return executorService.submit(
@ -499,7 +488,7 @@ public class KafkaIndexTaskClient
@Override
public Boolean call()
{
return setEndOffsets(id, endOffsets, resume, finalize);
return setEndOffsets(id, endOffsets, finalize);
}
}
);
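After this change the client-side chat endpoints lose their resume and timeout query parameters: pause is just a POST to /pause, and end offsets are posted to /offsets/end with only a finish flag. A small sketch of the URLs the client now builds follows; the host, port, and task id are made-up values, and the real KafkaIndexTaskClient goes through submitRequest() rather than building strings like this.

public class KafkaTaskChatUrlSketch
{
  public static void main(String[] args)
  {
    final String host = "test-host"; // illustrative values
    final int port = 1234;
    final String taskId = "test-id";
    final boolean finalize = true;

    // POST, no "timeout" query parameter anymore.
    final String pauseUrl = String.format(
        "http://%s:%d/druid/worker/v1/chat/%s/pause", host, port, taskId);

    // POST with the end-offset map as the JSON body; only "finish" remains in the query string.
    final String setEndOffsetsUrl = String.format(
        "http://%s:%d/druid/worker/v1/chat/%s/offsets/end?finish=%s", host, port, taskId, finalize);

    System.out.println(pauseUrl);
    System.out.println(setEndOffsetsUrl);
  }
}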

View File

@ -60,12 +60,11 @@ public interface KafkaIndexTaskRunner extends ChatHandler
@VisibleForTesting
Response setEndOffsets(
Map<Integer, Long> offsets,
boolean resume,
boolean finish // this field is only for internal purposes and shouldn't usually be set by users
) throws InterruptedException;
@VisibleForTesting
Response pause(long timeout) throws InterruptedException;
Response pause() throws InterruptedException;
@VisibleForTesting
void resume() throws InterruptedException;
}

View File

@ -20,14 +20,11 @@ package io.druid.indexing.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.data.input.Committer;
@ -81,12 +78,10 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -102,6 +97,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@ -174,7 +170,6 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
private volatile FireDepartmentMetrics fireDepartmentMetrics;
private volatile IngestionState ingestionState;
private volatile long pauseMillis = 0;
private volatile boolean pauseRequested;
LegacyKafkaIndexTaskRunner(
@ -355,7 +350,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
status = Status.READING;
try {
while (stillReading) {
if (possiblyPause(assignment)) {
if (possiblyPause()) {
// The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
// partitions upon resuming. This is safe even if the end offsets have not been modified.
assignment = assignPartitionsAndSeekToNext(consumer, topic);
@ -381,7 +376,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
stillReading = !assignment.isEmpty();
}
for (ConsumerRecord<byte[], byte[]> record : records) {
@ -476,7 +471,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
&& assignment.remove(record.partition())) {
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
KafkaIndexTask.assignPartitions(consumer, topic, assignment);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
stillReading = !assignment.isEmpty();
}
}
}
@ -534,32 +529,38 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
sequenceNames.values()
).get();
final List<String> publishedSegments = published.getSegments()
.stream()
.map(DataSegment::getIdentifier)
.collect(Collectors.toList());
log.info(
"Published segments[%s] with metadata[%s].",
publishedSegments,
Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata")
);
final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
final SegmentsAndMetadata handedOff;
SegmentsAndMetadata handedOff = null;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOff = handoffFuture.get();
} else {
handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
try {
handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
.addData("TaskId", task.getId())
.emit();
}
}
if (handedOff == null) {
throw new ISE("Transaction failure publishing segments, aborting");
log.warn("Failed to handoff segments[%s]", publishedSegments);
} else {
log.info(
"Published segments[%s] with metadata[%s].",
Joiner.on(", ").join(
Iterables.transform(
handedOff.getSegments(),
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment input)
{
return input.getIdentifier();
}
}
)
),
Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
);
log.info(
"Handoff completed for segments[%s] with metadata[%s]",
handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()),
Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
);
}
@ -627,50 +628,24 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
}
/**
* Checks if the pauseRequested flag was set and if so blocks:
* a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared
* b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared
* <p/>
* If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the
* pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume
* and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal
* shouldResume after adjusting pauseMillis for the new value to take effect.
* Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared.
* <p/>
* Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
* <p/>
* Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set.
*
* @return true if a pause request was handled, false otherwise
*/
private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException
private boolean possiblyPause() throws InterruptedException
{
pauseLock.lockInterruptibly();
try {
if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) {
pauseMillis = KafkaIndexTask.PAUSE_FOREVER;
pauseRequested = true;
}
if (pauseRequested) {
status = Status.PAUSED;
long nanos = 0;
hasPaused.signalAll();
while (pauseRequested) {
if (pauseMillis == KafkaIndexTask.PAUSE_FOREVER) {
log.info("Pausing ingestion until resumed");
shouldResume.await();
} else {
if (pauseMillis > 0) {
log.info("Pausing ingestion for [%,d] ms", pauseMillis);
nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
pauseMillis = 0;
}
if (nanos <= 0L) {
pauseRequested = false; // timeout elapsed
}
nanos = shouldResume.awaitNanos(nanos);
}
log.info("Pausing ingestion until resumed");
shouldResume.await();
}
status = Status.READING;
@ -752,15 +727,14 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
.addData("partitions", partitionOffsetMap.keySet())
.emit();
// wait to be killed by the supervisor
requestPause(KafkaIndexTask.PAUSE_FOREVER);
requestPause();
} else {
log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
}
}
private void requestPause(long pauseMillis)
private void requestPause()
{
this.pauseMillis = pauseMillis;
pauseRequested = true;
}
@ -941,10 +915,10 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
}
@Override
public Response setEndOffsets(Map<Integer, Long> offsets, boolean resume, boolean finish) throws InterruptedException
public Response setEndOffsets(Map<Integer, Long> offsets, boolean finish) throws InterruptedException
{
// finish is not used in this mode
return setEndOffsets(offsets, resume);
return setEndOffsets(offsets);
}
@POST
@ -953,12 +927,11 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
@Produces(MediaType.APPLICATION_JSON)
public Response setEndOffsetsHTTP(
Map<Integer, Long> offsets,
@QueryParam("resume") @DefaultValue("false") final boolean resume,
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
return setEndOffsets(offsets, resume);
return setEndOffsets(offsets);
}
@GET
@ -1000,8 +973,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
}
public Response setEndOffsets(
Map<Integer, Long> offsets,
final boolean resume
Map<Integer, Long> offsets
) throws InterruptedException
{
if (offsets == null) {
@ -1048,9 +1020,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
pauseLock.unlock();
}
if (resume) {
resume();
}
resume();
return Response.ok(endOffsets).build();
}
@ -1063,8 +1033,6 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
/**
* Signals the ingestion loop to pause.
*
* @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely
*
* @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
* method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
* in the response body if the task successfully paused
@ -1073,16 +1041,15 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
@Path("/pause")
@Produces(MediaType.APPLICATION_JSON)
public Response pauseHTTP(
@QueryParam("timeout") @DefaultValue("0") final long timeout,
@Context final HttpServletRequest req
) throws InterruptedException
{
authorizationCheck(req, Action.WRITE);
return pause(timeout);
return pause();
}
@Override
public Response pause(final long timeout) throws InterruptedException
public Response pause() throws InterruptedException
{
if (!(status == Status.PAUSED || status == Status.READING)) {
return Response.status(Response.Status.BAD_REQUEST)
@ -1092,7 +1059,6 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
pauseLock.lockInterruptibly();
try {
pauseMillis = timeout <= 0 ? KafkaIndexTask.PAUSE_FOREVER : timeout;
pauseRequested = true;
pollRetryLock.lockInterruptibly();
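Both runners now share the same, much simpler pause protocol: possiblyPause() signals hasPaused and then waits on shouldResume until pauseRequested is cleared, with no timed-pause bookkeeping. The following compact sketch shows only that lock/condition handshake; the field names mirror the runner, but this is a standalone illustration rather than the real class, which also tracks the READING/PAUSED status and other state.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PausableLoopSketch
{
  private final Lock pauseLock = new ReentrantLock();
  private final Condition hasPaused = pauseLock.newCondition();
  private final Condition shouldResume = pauseLock.newCondition();
  private volatile boolean pauseRequested = false;

  // Blocks while pauseRequested is set; returns true if a pause was actually handled.
  boolean possiblyPause() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      if (pauseRequested) {
        hasPaused.signalAll();
        while (pauseRequested) {
          shouldResume.await();
        }
        return true;
      }
      return false;
    }
    finally {
      pauseLock.unlock();
    }
  }

  void requestPause()
  {
    pauseRequested = true;
  }

  void resume() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      pauseRequested = false;
      shouldResume.signalAll();
    }
    finally {
      pauseLock.unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final PausableLoopSketch sketch = new PausableLoopSketch();
    sketch.requestPause();

    final Thread reader = new Thread(() -> {
      try {
        sketch.possiblyPause(); // blocks until resume() is called
        System.out.println("resumed");
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    reader.start();

    TimeUnit.MILLISECONDS.sleep(100);
    sketch.resume();
    reader.join();
  }
}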

View File

@ -1533,7 +1533,7 @@ public class KafkaSupervisor implements Supervisor
log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize));
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}
List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures)
@ -1780,7 +1780,6 @@ public class KafkaSupervisor implements Supervisor
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
consumerProperties,
true,
false,
minimumMessageTime,
maximumMessageTime,
ioConfig.isSkipOffsetGaps()
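On the supervisor side, setEndOffsetsAsync() now takes only the finalize flag; the supervisor still fans the call out to every task in the group and collects the results with Futures.successfulAsList(). Here is a small sketch of that fan-out-and-collect pattern with a fake async client; the setEndOffsetsAsync stub, task ids, and offsets are illustrative stand-ins for the real KafkaIndexTaskClient call.

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class SetEndOffsetsFanOutSketch
{
  // Mimics KafkaIndexTaskClient.setEndOffsetsAsync(taskId, endOffsets, finalize); always succeeds here.
  static ListenableFuture<Boolean> setEndOffsetsAsync(String taskId, Map<Integer, Long> endOffsets, boolean finalize)
  {
    return Futures.immediateFuture(true);
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException
  {
    final Map<Integer, Long> endOffsets = ImmutableMap.of(0, 15L, 1, 200L); // illustrative offsets
    final List<String> setEndOffsetTaskIds = Arrays.asList("task-a", "task-b");

    final List<ListenableFuture<Boolean>> setEndOffsetFutures = new ArrayList<>();
    for (String taskId : setEndOffsetTaskIds) {
      setEndOffsetFutures.add(setEndOffsetsAsync(taskId, endOffsets, true));
    }

    // successfulAsList() yields null entries for failed calls instead of failing the whole batch.
    final List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures).get();
    System.out.println("setEndOffsets results: " + results);
  }
}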

View File

@ -71,8 +71,7 @@ public class KafkaIOConfigTest
Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(true, config.isUseTransaction());
Assert.assertEquals(false, config.isPauseAfterRead());
Assert.assertTrue(config.isUseTransaction());
Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
@ -88,7 +87,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
+ " \"skipOffsetGaps\": true\n"
@ -109,8 +107,7 @@ public class KafkaIOConfigTest
Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
Assert.assertEquals(false, config.isUseTransaction());
Assert.assertEquals(true, config.isPauseAfterRead());
Assert.assertFalse(config.isUseTransaction());
Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
@ -125,7 +122,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@ -145,7 +141,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@ -165,7 +160,6 @@ public class KafkaIOConfigTest
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@ -185,7 +179,6 @@ public class KafkaIOConfigTest
+ " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@ -206,7 +199,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@ -227,7 +219,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";
@ -248,7 +239,6 @@ public class KafkaIOConfigTest
+ " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
+ " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
+ " \"useTransaction\": false,\n"
+ " \"pauseAfterRead\": true,\n"
+ " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
+ "}";

View File

@ -28,8 +28,8 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexer.TaskLocation;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
@ -57,6 +57,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -143,13 +144,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Assert.assertEquals(false, client.stop(TEST_ID, true));
Assert.assertEquals(false, client.resume(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10));
Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID));
Assert.assertEquals(null, client.getStartTime(TEST_ID));
Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true));
Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), false, true));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), true, true));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
verifyAll();
}
@ -447,33 +448,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Assert.assertEquals(10, (long) results.get(1));
}
@Test
public void testPauseWithTimeout() throws Exception
{
Capture<Request> captured = Capture.newInstance();
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
);
replayAll();
Map<Integer, Long> results = client.pause(TEST_ID, 101);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.POST, request.getMethod());
Assert.assertEquals(
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/pause?timeout=101"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
Assert.assertEquals(2, results.size());
Assert.assertEquals(1, (long) results.get(0));
Assert.assertEquals(10, (long) results.get(1));
}
@Test
public void testPauseWithSubsequentGetOffsets() throws Exception
{
@ -560,13 +534,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
);
replayAll();
client.setEndOffsets(TEST_ID, endOffsets, false, true);
client.setEndOffsets(TEST_ID, endOffsets, true);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.POST, request.getMethod());
Assert.assertEquals(
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"),
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@ -585,13 +559,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
);
replayAll();
client.setEndOffsets(TEST_ID, endOffsets, true, true);
client.setEndOffsets(TEST_ID, endOffsets, true);
verifyAll();
Request request = captured.getValue();
Assert.assertEquals(HttpMethod.POST, request.getMethod());
Assert.assertEquals(
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"),
new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
request.getUrl()
);
Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@ -739,39 +713,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
}
}
@Test
public void testPauseAsyncWithTimeout() throws Exception
{
final int numRequests = TEST_IDS.size();
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
Futures.immediateFuture(responseHolder)
).times(numRequests);
replayAll();
List<URL> expectedUrls = Lists.newArrayList();
List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
for (String testId : TEST_IDS) {
expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "pause?timeout=9")));
futures.add(client.pauseAsync(testId, 9));
}
List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
verifyAll();
List<Request> requests = captured.getValues();
Assert.assertEquals(numRequests, requests.size());
Assert.assertEquals(numRequests, responses.size());
for (int i = 0; i < numRequests; i++) {
Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
}
}
@Test
public void testGetStatusAsync() throws Exception
{
@ -925,9 +866,9 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
TEST_HOST,
TEST_PORT,
testId,
StringUtils.format("offsets/end?resume=%s&finish=%s", false, true)
StringUtils.format("offsets/end?finish=%s", true)
)));
futures.add(client.setEndOffsetsAsync(testId, endOffsets, false, true));
futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
}
List<Boolean> responses = Futures.allAsList(futures).get();
@ -966,11 +907,11 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
TEST_HOST,
TEST_PORT,
testId,
"offsets/end?resume=true&finish=true"
"offsets/end?finish=true"
)
)
);
futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true));
futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
}
List<Boolean> responses = Futures.allAsList(futures).get();

View File

@ -382,7 +382,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
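This is the first of many call sites in this test that drop the positional pauseAfterRead argument from the KafkaIOConfig constructor. Hedging on the parameter order visible across the surrounding hunks, the new call shape is roughly:

    new KafkaIOConfig(
        "sequence0",                                        // baseSequenceName
        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), // startPartitions (offsets illustrative)
        new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), // endPartitions
        kafkaServer.consumerProperties(),
        true,                                               // useTransaction
        null,                                               // minimumMessageTime
        null,                                               // maximumMessageTime
        false                                               // skipOffsetGaps
    );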
@ -424,7 +423,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -500,7 +498,6 @@ public class KafkaIndexTaskTest
endPartitions,
consumerProps,
true,
false,
null,
null,
false
@ -513,7 +510,7 @@ public class KafkaIndexTaskTest
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap()
.equals(currentOffsets));
task.getRunner().setEndOffsets(currentOffsets, true, false);
task.getRunner().setEndOffsets(currentOffsets, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
@ -589,7 +586,6 @@ public class KafkaIndexTaskTest
endPartitions,
consumerProps,
true,
false,
null,
null,
false
@ -603,7 +599,7 @@ public class KafkaIndexTaskTest
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint.getPartitionOffsetMap().equals(currentOffsets));
task.getRunner().setEndOffsets(currentOffsets, true, false);
task.getRunner().setEndOffsets(currentOffsets, false);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
Assert.assertEquals(1, checkpointRequestsHash.size());
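Both incremental-handoff tests now drive the checkpoint through the runner's two-argument setEndOffsets, whose second argument is the finish flag (the old resume flag is gone). A sketch of that step, reusing the task fixture from the tests above:

    // Promote the offsets the runner is currently at to the end of the current
    // sequence without finishing the task (finish = false), so ingestion continues
    // into the next sequence once the checkpoint is taken.
    Map<Integer, Long> checkpointOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
    task.getRunner().setEndOffsets(checkpointOffsets, false);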
@ -646,7 +642,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
DateTimes.of("2010"),
null,
false
@ -700,7 +695,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
DateTimes.of("2010"),
false
@ -764,7 +758,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -824,7 +817,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -865,7 +857,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -917,7 +908,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -927,10 +917,7 @@ public class KafkaIndexTaskTest
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
Assert.assertEquals(
isIncrementalHandoffSupported ? TaskState.SUCCESS : TaskState.FAILED,
future.get().getStatusCode()
);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
@ -975,7 +962,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1019,7 +1005,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1101,7 +1086,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1161,7 +1145,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1175,7 +1158,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1229,7 +1211,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1243,7 +1224,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1298,7 +1278,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
false,
false,
null,
null,
false
@ -1312,7 +1291,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
kafkaServer.consumerProperties(),
false,
false,
null,
null,
false
@ -1372,7 +1350,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1437,7 +1414,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1451,7 +1427,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1507,7 +1482,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1544,7 +1518,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1596,7 +1569,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1621,7 +1593,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(KafkaIndexTask.Status.READING, task.getRunner().getStatus());
Map<Integer, Long> currentOffsets = objectMapper.readValue(
task.getRunner().pause(0).getEntity().toString(),
task.getRunner().pause().getEntity().toString(),
new TypeReference<Map<Integer, Long>>()
{
}
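pause() on the runner no longer takes a timeout; as the hunk above shows, its Response entity still carries the current offsets as JSON. A sketch of reading them back, assuming the test's objectMapper and task fixtures (checked exceptions elided):

    // Pause the runner and decode the offsets it reports at the pause point.
    Map<Integer, Long> pausedOffsets = objectMapper.readValue(
        task.getRunner().pause().getEntity().toString(),
        new TypeReference<Map<Integer, Long>>() {}
    );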
@ -1669,93 +1641,6 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
{
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
"sequence0",
new KafkaPartitions(topic, ImmutableMap.of(0, 1L)),
new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
kafkaServer.consumerProperties(),
true,
true,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : records) {
kafkaProducer.send(record).get();
}
}
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(25);
}
// reached the end of the assigned offsets and paused instead of publishing
Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
Assert.assertEquals(ImmutableMap.of(0, 3L), task.getRunner().getEndOffsets());
Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
task.getRunner().setEndOffsets(newEndOffsets, false, true);
Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
task.getRunner().resume();
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(25);
}
// reached the end of the updated offsets and paused
Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
// try again but with resume flag == true
newEndOffsets = ImmutableMap.of(0, 7L);
task.getRunner().setEndOffsets(newEndOffsets, true, true);
Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets());
Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
Thread.sleep(25);
}
Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets());
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
task.getRunner().resume();
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// Check metrics
Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
SegmentDescriptor desc2 = SD(task, "2010/P1D", 0);
SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1));
Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2));
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3));
}
@Test(timeout = 30_000L)
public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
{
@ -1767,7 +1652,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1780,7 +1664,7 @@ public class KafkaIndexTaskTest
Thread.sleep(2000);
}
task.getRunner().pause(0);
task.getRunner().pause();
while (!task.getRunner().getStatus().equals(KafkaIndexTask.Status.PAUSED)) {
Thread.sleep(25);
@ -1806,7 +1690,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false
@ -1860,7 +1743,6 @@ public class KafkaIndexTaskTest
new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null,
false

View File

@ -263,7 +263,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
@ -1055,7 +1054,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
EasyMock.eq(true),
EasyMock.eq(true)
)
).andReturn(Futures.immediateFuture(true)).times(2);
@ -1083,7 +1081,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig();
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
@ -1171,7 +1168,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
// check that the new task was created with starting offsets matching where the publishing task finished
Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@ -1260,7 +1256,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
// check that the new task was created with starting offsets matching where the publishing task finished
Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@ -1573,7 +1568,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskClient.setEndOffsetsAsync(
EasyMock.contains("sequenceName-0"),
EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
EasyMock.eq(true),
EasyMock.eq(true)
)
).andReturn(Futures.<Boolean>immediateFailedFuture(new RuntimeException())).times(2);
@ -1698,7 +1692,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
expect(taskClient.pauseAsync("id2"))
.andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true))
expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
.andReturn(Futures.immediateFuture(true));
taskQueue.shutdown("id3");
expectLastCall().times(2);
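The handoff mocked above keeps its two-step shape on the supervisor side, just without the resume flag: pause the task, then post the offsets it reports as its final end offsets. A hedged sketch of that coordination with taskClient and taskId as stand-ins (blocking get() used for brevity, error handling elided):

    // 1) Ask the task to pause and report where it stopped.
    Map<Integer, Long> offsets = taskClient.pauseAsync(taskId).get();
    // 2) Promote those offsets to end offsets and let the task finish publishing.
    boolean accepted = taskClient.setEndOffsetsAsync(taskId, offsets, true).get();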
@ -2130,7 +2124,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
endPartitions,
ImmutableMap.<String, String>of(),
true,
false,
minimumMessageTime,
maximumMessageTime,
false

View File

@ -24,8 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import io.druid.indexing.common.TaskLock;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.java.util.common.DateTimes;

View File

@ -180,6 +180,7 @@ public interface Task
*/
TaskStatus run(TaskToolbox toolbox) throws Exception;
@Nullable
Map<String, Object> getContext();
@Nullable
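With getContext() now annotated @Nullable, callers of the Task interface should not assume the map is present. A defensive caller-side sketch (the context key is hypothetical):

    // Treat an absent context as an empty one instead of dereferencing null.
    Map<String, Object> context = task.getContext();
    Map<String, Object> safeContext = context == null ? Collections.emptyMap() : context;
    Object flag = safeContext.get("someContextKey"); // hypothetical key, for illustration only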

View File

@ -37,12 +37,12 @@ import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.TaskState;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskReport;
import io.druid.indexing.common.TaskReportFileWriter;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LockAcquireAction;

View File

@ -23,12 +23,13 @@ import com.google.common.collect.ImmutableList;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
public class SegmentsAndMetadata
{
private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(ImmutableList.<DataSegment>of(), null);
private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(Collections.emptyList(), null);
private final Object commitMetadata;
private final ImmutableList<DataSegment> segments;
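The NIL constant swaps Guava's ImmutableList.of() for the JDK's Collections.emptyList(). Assuming the constructor copies its argument into the ImmutableList field (which the field type suggests), the resulting NIL is the same either way; the change just avoids a Guava call at this site:

    List<DataSegment> viaGuava = ImmutableList.of();    // Guava's empty immutable list
    List<DataSegment> viaJdk = Collections.emptyList(); // JDK equivalent
    // Both are empty and both reject mutation with UnsupportedOperationException.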

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -230,7 +229,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
throw e;
}
catch (Exception e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
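This last hunk replaces Throwables.propagate(e), which Guava has deprecated, with an explicit wrap. A generic sketch of the equivalent hand-written pattern (doSomethingThatMayThrow is a placeholder):

    try {
      doSomethingThatMayThrow();
    }
    catch (RuntimeException | Error e) {
      throw e;                          // already unchecked: rethrow untouched
    }
    catch (Exception e) {
      throw new RuntimeException(e);    // wrap checked exceptions, as Throwables.propagate(e) used to
    }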