diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 3caeedf905d..dc0c4e3e082 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -22,22 +22,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; @@ -63,7 +61,6 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.collect.Utils; -import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.emitter.EmittingLogger; import io.druid.segment.indexing.RealtimeIOConfig; @@ -106,21 +103,20 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -135,7 +131,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; - private final Map endOffsets = new ConcurrentHashMap<>(); + private final Map endOffsets; private final Map nextOffsets = new 
ConcurrentHashMap<>(); private final Map lastPersistedOffsets = new ConcurrentHashMap<>(); @@ -185,10 +181,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask private final RowIngestionMeters rowIngestionMeters; private final Set publishingSequences = Sets.newConcurrentHashSet(); - private final BlockingQueue publishQueue = new LinkedBlockingQueue<>(); - private final List> handOffWaitList = new CopyOnWriteArrayList<>(); // to prevent concurrency visibility issue - private final CountDownLatch waitForPublishes = new CountDownLatch(1); - private final AtomicReference throwableAtomicReference = new AtomicReference<>(); + private final List> publishWaitList = new LinkedList<>(); + private final List> handOffWaitList = new LinkedList<>(); private volatile DateTime startTime; private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) @@ -200,12 +194,10 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask private volatile IngestionState ingestionState; private volatile boolean pauseRequested = false; - private volatile long pauseMillis = 0; private volatile long nextCheckpointTime; private volatile CopyOnWriteArrayList sequences; - private volatile File sequencesPersistFile; - private ListeningExecutorService publishExecService; + private volatile Throwable backgroundThreadException; public IncrementalPublishingKafkaIndexTaskRunner( KafkaIndexTask task, @@ -226,7 +218,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask this.topic = ioConfig.getStartPartitions().getTopic(); this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters(); - this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); + this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionOffsetMap()); this.sequences = new CopyOnWriteArrayList<>(); this.ingestionState = IngestionState.NOT_STARTED; @@ -258,49 +250,39 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask status = Status.STARTING; this.toolbox = toolbox; - final Map context = task.getContext(); - if (context != null && context.get("checkpoints") != null) { - final String checkpointsString = (String) context.get("checkpoints"); - log.info("Got checkpoints [%s]", checkpointsString); - final TreeMap> checkpoints = toolbox.getObjectMapper().readValue( - checkpointsString, - new TypeReference>>() - { - } - ); - - Iterator>> sequenceOffsets = checkpoints.entrySet().iterator(); - Map.Entry> previous = sequenceOffsets.next(); - while (sequenceOffsets.hasNext()) { - Map.Entry> current = sequenceOffsets.next(); + if (!restoreSequences()) { + final TreeMap> checkpoints = getCheckPointsFromContext(toolbox, task); + if (checkpoints != null) { + Iterator>> sequenceOffsets = checkpoints.entrySet().iterator(); + Map.Entry> previous = sequenceOffsets.next(); + while (sequenceOffsets.hasNext()) { + Map.Entry> current = sequenceOffsets.next(); + sequences.add(new SequenceMetadata( + previous.getKey(), + StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), + previous.getValue(), + current.getValue(), + true + )); + previous = current; + } sequences.add(new SequenceMetadata( previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), - current.getValue(), - true + endOffsets, + false + )); + } else { + sequences.add(new SequenceMetadata( + 0, + 
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), + ioConfig.getStartPartitions().getPartitionOffsetMap(), + endOffsets, + false )); - previous = current; } - sequences.add(new SequenceMetadata( - previous.getKey(), - StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), - previous.getValue(), - endOffsets, - false - )); - } else { - sequences.add(new SequenceMetadata( - 0, - StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), - ioConfig.getStartPartitions().getPartitionOffsetMap(), - endOffsets, - false - )); } - - sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json"); - restoreSequences(); log.info("Starting with sequences: %s", sequences); if (chatHandlerProvider.isPresent()) { @@ -335,15 +317,12 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask ) ); - try ( - final KafkaConsumer consumer = task.newConsumer() - ) { + try (final KafkaConsumer consumer = task.newConsumer()) { toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); appenderator = task.newAppenderator(fireDepartmentMetrics, toolbox); driver = task.newDriver(appenderator, toolbox, fireDepartmentMetrics); - createAndStartPublishExecutor(); final String topic = ioConfig.getStartPartitions().getTopic(); @@ -435,7 +414,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask status = Status.READING; try { while (stillReading) { - if (possiblyPause(assignment)) { + if (possiblyPause()) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign // partitions upon resuming. This is safe even if the end offsets have not been modified. assignment = assignPartitionsAndSeekToNext(consumer, topic); @@ -448,8 +427,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask } // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true - if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed() - && !ioConfig.isPauseAfterRead())) { + if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) { status = Status.PUBLISHING; } @@ -457,12 +435,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask break; } - checkAndMaybeThrowException(); - - if (!ioConfig.isPauseAfterRead()) { - maybePersistAndPublishSequences(committerSupplier); + if (backgroundThreadException != null) { + throw new RuntimeException(backgroundThreadException); } + checkPublishAndHandoffFailure(); + + maybePersistAndPublishSequences(committerSupplier); + // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to // offset is not present in the topic-partition. This can happen if we're asking a task to read from data // that has not been written yet (which is totally legitimate). So let's wait for it to show up. 
@@ -473,19 +453,17 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask catch (OffsetOutOfRangeException e) { log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } SequenceMetadata sequenceToCheckpoint = null; for (ConsumerRecord record : records) { - if (log.isTraceEnabled()) { - log.trace( - "Got topic[%s] partition[%d] offset[%,d].", - record.topic(), - record.partition(), - record.offset() - ); - } + log.trace( + "Got topic[%s] partition[%d] offset[%,d].", + record.topic(), + record.partition(), + record.offset() + ); if (record.offset() < endOffsets.get(record.partition())) { if (record.offset() != nextOffsets.get(record.partition())) { @@ -513,24 +491,23 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask : parser.parseBatch(ByteBuffer.wrap(valueBytes)); boolean isPersistRequired = false; + final SequenceMetadata sequenceToUse = sequences + .stream() + .filter(sequenceMetadata -> sequenceMetadata.canHandle(record)) + .findFirst() + .orElse(null); + + if (sequenceToUse == null) { + throw new ISE( + "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s", + record.partition(), + record.offset(), + sequences + ); + } + for (InputRow row : rows) { if (row != null && task.withinMinMaxRecordTime(row)) { - SequenceMetadata sequenceToUse = null; - for (SequenceMetadata sequence : sequences) { - if (sequence.canHandle(record)) { - sequenceToUse = sequence; - } - } - - if (sequenceToUse == null) { - throw new ISE( - "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. 
Current sequences: %s", - record.partition(), - record.offset(), - sequences - ); - } - final AppenderatorDriverAddResult addResult = driver.add( row, sequenceToUse.getSequenceName(), @@ -588,7 +565,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask public void onFailure(Throwable t) { log.error("Persist failed, dying"); - throwableAtomicReference.set(t); + backgroundThreadException = t; } } ); @@ -605,7 +582,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); KafkaIndexTask.assignPartitions(consumer, topic, assignment); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } } @@ -622,7 +599,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask sequenceToCheckpoint, sequences ); - requestPause(KafkaIndexTask.PAUSE_FOREVER); + requestPause(); if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction( task.getDataSource(), ioConfig.getBaseSequenceName(), @@ -636,6 +613,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask ingestionState = IngestionState.COMPLETED; } catch (Exception e) { + // (1) catch all exceptions while reading from kafka log.error(e, "Encountered exception in run() before persisting."); throw e; } @@ -659,16 +637,23 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask sequenceMetadata.updateAssignments(nextOffsets); publishingSequences.add(sequenceMetadata.getSequenceName()); // persist already done in finally, so directly add to publishQueue - publishQueue.add(sequenceMetadata); + publishAndRegisterHandoff(sequenceMetadata); } } - // add Sentinel SequenceMetadata to indicate end of all sequences - publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata()); - waitForPublishes.await(); - checkAndMaybeThrowException(); + if (backgroundThreadException != null) { + throw new RuntimeException(backgroundThreadException); + } - List handedOffList = Lists.newArrayList(); + // Wait for publish futures to complete. + Futures.allAsList(publishWaitList).get(); + + // Wait for handoff futures to complete. + // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding + // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it + // failed to persist sequences. It might also return null if handoff failed, but was recoverable. + // See publishAndRegisterHandoff() for details. + List handedOffList = Collections.emptyList(); if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOffList = Futures.allAsList(handOffWaitList).get(); } else { @@ -677,6 +662,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { + // Handoff timeout is not an indexing failure, but coordination failure. We simply ignore timeout exception + // here. 
log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout()) .addData("TaskId", task.getId()) .emit(); @@ -692,8 +679,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") ); } + + appenderator.close(); } catch (InterruptedException | RejectedExecutionException e) { + // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including + // the final publishing. + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); appenderator.closeNow(); // handle the InterruptedException that gets wrapped in a RejectedExecutionException if (e instanceof RejectedExecutionException @@ -709,14 +702,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask log.info("The task was asked to stop before completing"); } + catch (Exception e) { + // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing. + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); + appenderator.closeNow(); + throw e; + } finally { - if (appenderator != null) { - if (throwableAtomicReference.get() != null) { - appenderator.closeNow(); - } else { - appenderator.close(); - } - } if (driver != null) { driver.close(); } @@ -724,10 +717,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask chatHandlerProvider.get().unregister(task.getId()); } - if (publishExecService != null) { - publishExecService.shutdownNow(); - } - toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); toolbox.getDataSegmentServerAnnouncer().unannounce(); } @@ -736,83 +725,143 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask return TaskStatus.success(task.getId()); } - private void createAndStartPublishExecutor() + private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException { - publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver")); - publishExecService.submit( - (Runnable) () -> { - while (true) { + // Check if any publishFuture failed. + final List> publishFinished = publishWaitList + .stream() + .filter(Future::isDone) + .collect(Collectors.toList()); + + for (ListenableFuture publishFuture : publishFinished) { + // If publishFuture failed, the below line will throw an exception and catched by (1), and then (2) or (3). + publishFuture.get(); + } + + publishWaitList.removeAll(publishFinished); + + // Check if any handoffFuture failed. + final List> handoffFinished = handOffWaitList + .stream() + .filter(Future::isDone) + .collect(Collectors.toList()); + + for (ListenableFuture handoffFuture : handoffFinished) { + // If handoffFuture failed, the below line will throw an exception and catched by (1), and then (2) or (3). 
+ handoffFuture.get(); + } + + handOffWaitList.removeAll(handoffFinished); + } + + private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) + { + log.info("Publishing segments for sequence [%s]", sequenceMetadata); + + final ListenableFuture publishFuture = Futures.transform( + driver.publish( + sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()), + sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), + Collections.singletonList(sequenceMetadata.getSequenceName()) + ), + (Function) publishedSegmentsAndMetadata -> { + if (publishedSegmentsAndMetadata == null) { + throw new ISE( + "Transaction failure publishing segments for sequence [%s]", + sequenceMetadata + ); + } else { + return publishedSegmentsAndMetadata; + } + } + ); + publishWaitList.add(publishFuture); + + // Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails. + final SettableFuture handoffFuture = SettableFuture.create(); + handOffWaitList.add(handoffFuture); + + Futures.addCallback( + publishFuture, + new FutureCallback() + { + @Override + public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata) + { + log.info( + "Published segments[%s] with metadata[%s].", + publishedSegmentsAndMetadata.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()), + Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata") + ); + + sequences.remove(sequenceMetadata); + publishingSequences.remove(sequenceMetadata.getSequenceName()); try { - final SequenceMetadata sequenceMetadata = publishQueue.take(); - - Preconditions.checkNotNull(driver); - - if (sequenceMetadata.isSentinel()) { - waitForPublishes.countDown(); - break; - } - - log.info("Publishing segments for sequence [%s]", sequenceMetadata); - - final SegmentsAndMetadata result = driver.publish( - sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()), - sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), - ImmutableList.of(sequenceMetadata.getSequenceName()) - ).get(); - - if (result == null) { - throw new ISE( - "Transaction failure publishing segments for sequence [%s]", - sequenceMetadata - ); - } else { - log.info( - "Published segments[%s] with metadata[%s].", - Joiner.on(", ").join( - result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()) - ), - Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata") - ); - } - - sequences.remove(sequenceMetadata); - publishingSequences.remove(sequenceMetadata.getSequenceName()); - try { - persistSequences(); - } - catch (IOException e) { - log.error(e, "Unable to persist state, dying"); - Throwables.propagate(e); - } - - final ListenableFuture handOffFuture = driver.registerHandoff(result); - handOffWaitList.add(handOffFuture); + persistSequences(); } - catch (Throwable t) { - if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException - && t.getCause() instanceof InterruptedException))) { - log.warn("Stopping publish thread as we are interrupted, probably we are shutting down"); - } else { - log.makeAlert(t, "Error in publish thread, dying").emit(); - throwableAtomicReference.set(t); - } - Futures.allAsList(handOffWaitList).cancel(true); - waitForPublishes.countDown(); - break; + catch (IOException e) { + log.error(e, "Unable to persist state, dying"); + handoffFuture.setException(e); + throw new RuntimeException(e); } + + Futures.transform( + 
driver.registerHandoff(publishedSegmentsAndMetadata), + new Function() + { + @Nullable + @Override + public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) + { + if (handoffSegmentsAndMetadata == null) { + log.warn( + "Failed to handoff segments[%s]", + publishedSegmentsAndMetadata.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()) + ); + } + handoffFuture.set(handoffSegmentsAndMetadata); + return null; + } + } + ); + } + + @Override + public void onFailure(Throwable t) + { + log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata); + handoffFuture.setException(t); } } ); } - private void restoreSequences() throws IOException + private static File getSequencesPersistFile(TaskToolbox toolbox) { - Preconditions.checkNotNull(sequencesPersistFile); + return new File(toolbox.getPersistDir(), "sequences.json"); + } + + private boolean restoreSequences() throws IOException + { + final File sequencesPersistFile = getSequencesPersistFile(toolbox); if (sequencesPersistFile.exists()) { - sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().>readValue( - sequencesPersistFile, new TypeReference>() - { - })); + sequences = new CopyOnWriteArrayList<>( + toolbox.getObjectMapper().>readValue( + sequencesPersistFile, + new TypeReference>() + { + } + ) + ); + return true; + } else { + return false; } } @@ -823,7 +872,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask new TypeReference>() { } - ).writeValue(sequencesPersistFile, sequences); + ).writeValue(getSequencesPersistFile(toolbox), sequences); } private Map getTaskCompletionReports(@Nullable String errorMsg) @@ -877,7 +926,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask result, sequenceMetadata ); - publishQueue.add(sequenceMetadata); + publishAndRegisterHandoff(sequenceMetadata); } catch (InterruptedException e) { log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata); @@ -918,58 +967,25 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask return assignment; } - private void checkAndMaybeThrowException() - { - if (throwableAtomicReference.get() != null) { - Throwables.propagate(throwableAtomicReference.get()); - } - } - /** - * Checks if the pauseRequested flag was set and if so blocks: - * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared - * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared - *
<p/>
- * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the - * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume - * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal - * shouldResume after adjusting pauseMillis for the new value to take effect. + * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared. *
<p/>
* Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. *
<p/>
- * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set. * * @return true if a pause request was handled, false otherwise */ - private boolean possiblyPause(Set assignment) throws InterruptedException + private boolean possiblyPause() throws InterruptedException { pauseLock.lockInterruptibly(); try { - if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) { - pauseMillis = KafkaIndexTask.PAUSE_FOREVER; - pauseRequested = true; - } - if (pauseRequested) { status = Status.PAUSED; - long nanos = 0; hasPaused.signalAll(); while (pauseRequested) { - if (pauseMillis == KafkaIndexTask.PAUSE_FOREVER) { - log.info("Pausing ingestion until resumed"); - shouldResume.await(); - } else { - if (pauseMillis > 0) { - log.info("Pausing ingestion for [%,d] ms", pauseMillis); - nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis); - pauseMillis = 0; - } - if (nanos <= 0L) { - pauseRequested = false; // timeout elapsed - } - nanos = shouldResume.awaitNanos(nanos); - } + log.info("Pausing ingestion until resumed"); + shouldResume.await(); } status = Status.READING; @@ -1062,9 +1078,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask return status == Status.PAUSED; } - private void requestPause(long pauseMillis) + private void requestPause() { - this.pauseMillis = pauseMillis; pauseRequested = true; } @@ -1090,7 +1105,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask .addData("partitions", partitionOffsetMap.keySet()) .emit(); // wait for being killed by supervisor - requestPause(KafkaIndexTask.PAUSE_FOREVER); + requestPause(); } else { log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); } @@ -1225,14 +1240,13 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask @Produces(MediaType.APPLICATION_JSON) public Response setEndOffsetsHTTP( Map offsets, - @QueryParam("resume") @DefaultValue("false") final boolean resume, @QueryParam("finish") @DefaultValue("true") final boolean finish, // this field is only for internal purposes, shouldn't be usually set by users @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return setEndOffsets(offsets, resume, finish); + return setEndOffsets(offsets, finish); } @GET @@ -1276,7 +1290,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask @Override public Response setEndOffsets( Map offsets, - final boolean resume, final boolean finish // this field is only for internal purposes, shouldn't be usually set by users ) throws InterruptedException { @@ -1305,7 +1318,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask (latestSequence.getEndOffsets().equals(offsets) && finish)) { log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences); return Response.ok(offsets).build(); - } else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) { + } else if (latestSequence.isCheckpointed()) { return Response.status(Response.Status.BAD_REQUEST) .entity(StringUtils.format( "WTH?! 
Sequence [%s] has already endOffsets set, cannot set to [%s]", @@ -1339,7 +1352,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets); endOffsets.putAll(offsets); } else { - Preconditions.checkState(!ioConfig.isPauseAfterRead()); // create new sequence final SequenceMetadata newSequence = new SequenceMetadata( latestSequence.getSequenceId() + 1, @@ -1355,17 +1367,19 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask } catch (Exception e) { log.error(e, "Unable to set end offsets, dying"); - throwableAtomicReference.set(e); - Throwables.propagate(e); + backgroundThreadException = e; + // should resume to immediately finish kafka index task as failed + resume(); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(Throwables.getStackTraceAsString(e)) + .build(); } finally { pauseLock.unlock(); } } - if (resume) { - resume(); - } + resume(); return Response.ok(offsets).build(); } @@ -1396,8 +1410,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask /** * Signals the ingestion loop to pause. * - * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely - * * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets * in the response body if the task successfully paused @@ -1406,16 +1418,15 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask @Path("/pause") @Produces(MediaType.APPLICATION_JSON) public Response pauseHTTP( - @QueryParam("timeout") @DefaultValue("0") final long timeout, @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return pause(timeout); + return pause(); } @Override - public Response pause(final long timeout) throws InterruptedException + public Response pause() throws InterruptedException { if (!(status == Status.PAUSED || status == Status.READING)) { return Response.status(Response.Status.BAD_REQUEST) @@ -1425,7 +1436,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask pauseLock.lockInterruptibly(); try { - pauseMillis = timeout <= 0 ? 
KafkaIndexTask.PAUSE_FOREVER : timeout; pauseRequested = true; pollRetryLock.lockInterruptibly(); @@ -1634,22 +1644,6 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask } } - private SequenceMetadata() - { - this.sequenceId = -1; - this.sequenceName = null; - this.startOffsets = null; - this.endOffsets = null; - this.assignments = null; - this.checkpointed = true; - this.sentinel = true; - } - - static SequenceMetadata getSentinelSequenceMetadata() - { - return new SequenceMetadata(); - } - @Override public String toString() { @@ -1723,7 +1717,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask }; } - TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction) + TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction) { return (segments, commitMetadata) -> { final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( @@ -1758,4 +1752,24 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask }; } } + + @Nullable + private static TreeMap> getCheckPointsFromContext( + TaskToolbox toolbox, + KafkaIndexTask task + ) throws IOException + { + final String checkpointsString = task.getContextValue("checkpoints"); + if (checkpointsString != null) { + log.info("Checkpoints [%s]", checkpointsString); + return toolbox.getObjectMapper().readValue( + checkpointsString, + new TypeReference>>() + { + } + ); + } else { + return null; + } + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index d9a7fb20ee8..5d48fe19405 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -31,7 +31,6 @@ import java.util.Map; public class KafkaIOConfig implements IOConfig { private static final boolean DEFAULT_USE_TRANSACTION = true; - private static final boolean DEFAULT_PAUSE_AFTER_READ = false; private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false; private final String baseSequenceName; @@ -39,7 +38,6 @@ public class KafkaIOConfig implements IOConfig private final KafkaPartitions endPartitions; private final Map consumerProperties; private final boolean useTransaction; - private final boolean pauseAfterRead; private final Optional minimumMessageTime; private final Optional maximumMessageTime; private final boolean skipOffsetGaps; @@ -51,7 +49,6 @@ public class KafkaIOConfig implements IOConfig @JsonProperty("endPartitions") KafkaPartitions endPartitions, @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("useTransaction") Boolean useTransaction, - @JsonProperty("pauseAfterRead") Boolean pauseAfterRead, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps @@ -62,7 +59,6 @@ public class KafkaIOConfig implements IOConfig this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; - this.pauseAfterRead = pauseAfterRead != null ? 
pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS; @@ -117,12 +113,6 @@ public class KafkaIOConfig implements IOConfig return useTransaction; } - @JsonProperty - public boolean isPauseAfterRead() - { - return pauseAfterRead; - } - @JsonProperty public Optional getMaximumMessageTime() { @@ -150,7 +140,6 @@ public class KafkaIOConfig implements IOConfig ", endPartitions=" + endPartitions + ", consumerProperties=" + consumerProperties + ", useTransaction=" + useTransaction + - ", pauseAfterRead=" + pauseAfterRead + ", minimumMessageTime=" + minimumMessageTime + ", maximumMessageTime=" + maximumMessageTime + ", skipOffsetGaps=" + skipOffsetGaps + diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index da92f4ffc76..ee31b6aa4b8 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -68,8 +68,6 @@ import java.util.stream.Collectors; public class KafkaIndexTask extends AbstractTask implements ChatHandler { - public static final long PAUSE_FOREVER = -1L; - public enum Status { NOT_STARTED, @@ -86,14 +84,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler private static final Random RANDOM = new Random(); static final long POLL_TIMEOUT = 100; static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; - private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; - private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; private final DataSchema dataSchema; private final InputRowParser parser; private final KafkaTuningConfig tuningConfig; private final KafkaIOConfig ioConfig; - private final AuthorizerMapper authorizerMapper; private final Optional chatHandlerProvider; private final KafkaIndexTaskRunner runner; @@ -126,7 +121,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); - this.authorizerMapper = authorizerMapper; final CircularBuffer savedParseExceptions; if (tuningConfig.getMaxSavedParseExceptions() > 0) { savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java index 3d0886a566c..26643f8be12 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java @@ -28,11 +28,11 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; import 
io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskInfoProvider; -import io.druid.indexer.TaskStatus; import io.druid.java.util.common.IAE; import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; @@ -167,19 +167,14 @@ public class KafkaIndexTaskClient public Map pause(final String id) { - return pause(id, 0); - } - - public Map pause(final String id, final long timeout) - { - log.debug("Pause task[%s] timeout[%d]", id, timeout); + log.debug("Pause task[%s]", id); try { final FullResponseHolder response = submitRequest( id, HttpMethod.POST, "pause", - timeout > 0 ? StringUtils.format("timeout=%d", timeout) : null, + null, true ); @@ -361,18 +356,17 @@ public class KafkaIndexTaskClient public boolean setEndOffsets( final String id, final Map endOffsets, - final boolean resume, final boolean finalize ) { - log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize); + log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { final FullResponseHolder response = submitRequest( id, HttpMethod.POST, "offsets/end", - StringUtils.format("resume=%s&finish=%s", resume, finalize), + StringUtils.format("finish=%s", finalize), jsonMapper.writeValueAsBytes(endOffsets), true ); @@ -415,11 +409,6 @@ public class KafkaIndexTaskClient } public ListenableFuture> pauseAsync(final String id) - { - return pauseAsync(id, 0); - } - - public ListenableFuture> pauseAsync(final String id, final long timeout) { return executorService.submit( new Callable>() @@ -427,7 +416,7 @@ public class KafkaIndexTaskClient @Override public Map call() { - return pause(id, timeout); + return pause(id); } } ); @@ -490,7 +479,7 @@ public class KafkaIndexTaskClient } public ListenableFuture setEndOffsetsAsync( - final String id, final Map endOffsets, final boolean resume, final boolean finalize + final String id, final Map endOffsets, final boolean finalize ) { return executorService.submit( @@ -499,7 +488,7 @@ public class KafkaIndexTaskClient @Override public Boolean call() { - return setEndOffsets(id, endOffsets, resume, finalize); + return setEndOffsets(id, endOffsets, finalize); } } ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java index 4f84510008e..e0535373dd3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java @@ -60,12 +60,11 @@ public interface KafkaIndexTaskRunner extends ChatHandler @VisibleForTesting Response setEndOffsets( Map offsets, - boolean resume, boolean finish // this field is only for internal purposes, shouldn't be usually set by users ) throws InterruptedException; @VisibleForTesting - Response pause(long timeout) throws InterruptedException; + Response pause() throws InterruptedException; @VisibleForTesting void resume() throws InterruptedException; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java index a32c1ec30da..f057b058517 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java @@ -20,14 +20,11 @@ package io.druid.indexing.kafka; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.druid.data.input.Committer; @@ -81,12 +78,10 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -102,6 +97,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -174,7 +170,6 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner private volatile FireDepartmentMetrics fireDepartmentMetrics; private volatile IngestionState ingestionState; - private volatile long pauseMillis = 0; private volatile boolean pauseRequested; LegacyKafkaIndexTaskRunner( @@ -355,7 +350,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner status = Status.READING; try { while (stillReading) { - if (possiblyPause(assignment)) { + if (possiblyPause()) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign // partitions upon resuming. This is safe even if the end offsets have not been modified. 
assignment = assignPartitionsAndSeekToNext(consumer, topic); @@ -381,7 +376,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner catch (OffsetOutOfRangeException e) { log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } for (ConsumerRecord record : records) { @@ -476,7 +471,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); KafkaIndexTask.assignPartitions(consumer, topic, assignment); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } } } @@ -534,32 +529,38 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner sequenceNames.values() ).get(); + final List publishedSegments = published.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()); + + log.info( + "Published segments[%s] with metadata[%s].", + publishedSegments, + Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata") + ); + final Future handoffFuture = driver.registerHandoff(published); - final SegmentsAndMetadata handedOff; + SegmentsAndMetadata handedOff = null; if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOff = handoffFuture.get(); } else { - handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + try { + handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout()) + .addData("TaskId", task.getId()) + .emit(); + } } if (handedOff == null) { - throw new ISE("Transaction failure publishing segments, aborting"); + log.warn("Failed to handoff segments[%s]", publishedSegments); } else { log.info( - "Published segments[%s] with metadata[%s].", - Joiner.on(", ").join( - Iterables.transform( - handedOff.getSegments(), - new Function() - { - @Override - public String apply(DataSegment input) - { - return input.getIdentifier(); - } - } - ) - ), + "Handoff completed for segments[%s] with metadata[%s]", + handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()), Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") ); } @@ -627,50 +628,24 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner } /** - * Checks if the pauseRequested flag was set and if so blocks: - * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared - * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared - *
<p/>
- * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the - * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume - * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal - * shouldResume after adjusting pauseMillis for the new value to take effect. + * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared. *
<p/>
* Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. *
<p/>
- * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set. * * @return true if a pause request was handled, false otherwise */ - private boolean possiblyPause(Set assignment) throws InterruptedException + private boolean possiblyPause() throws InterruptedException { pauseLock.lockInterruptibly(); try { - if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) { - pauseMillis = KafkaIndexTask.PAUSE_FOREVER; - pauseRequested = true; - } - if (pauseRequested) { status = Status.PAUSED; - long nanos = 0; hasPaused.signalAll(); while (pauseRequested) { - if (pauseMillis == KafkaIndexTask.PAUSE_FOREVER) { - log.info("Pausing ingestion until resumed"); - shouldResume.await(); - } else { - if (pauseMillis > 0) { - log.info("Pausing ingestion for [%,d] ms", pauseMillis); - nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis); - pauseMillis = 0; - } - if (nanos <= 0L) { - pauseRequested = false; // timeout elapsed - } - nanos = shouldResume.awaitNanos(nanos); - } + log.info("Pausing ingestion until resumed"); + shouldResume.await(); } status = Status.READING; @@ -752,15 +727,14 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner .addData("partitions", partitionOffsetMap.keySet()) .emit(); // wait for being killed by supervisor - requestPause(KafkaIndexTask.PAUSE_FOREVER); + requestPause(); } else { log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); } } - private void requestPause(long pauseMillis) + private void requestPause() { - this.pauseMillis = pauseMillis; pauseRequested = true; } @@ -941,10 +915,10 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner } @Override - public Response setEndOffsets(Map offsets, boolean resume, boolean finish) throws InterruptedException + public Response setEndOffsets(Map offsets, boolean finish) throws InterruptedException { // finish is not used in this mode - return setEndOffsets(offsets, resume); + return setEndOffsets(offsets); } @POST @@ -953,12 +927,11 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner @Produces(MediaType.APPLICATION_JSON) public Response setEndOffsetsHTTP( Map offsets, - @QueryParam("resume") @DefaultValue("false") final boolean resume, @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return setEndOffsets(offsets, resume); + return setEndOffsets(offsets); } @GET @@ -1000,8 +973,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner } public Response setEndOffsets( - Map offsets, - final boolean resume + Map offsets ) throws InterruptedException { if (offsets == null) { @@ -1048,9 +1020,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner pauseLock.unlock(); } - if (resume) { - resume(); - } + resume(); return Response.ok(endOffsets).build(); } @@ -1063,8 +1033,6 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner /** * Signals the ingestion loop to pause. 
* - * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely - * * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets * in the response body if the task successfully paused @@ -1073,16 +1041,15 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner @Path("/pause") @Produces(MediaType.APPLICATION_JSON) public Response pauseHTTP( - @QueryParam("timeout") @DefaultValue("0") final long timeout, @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return pause(timeout); + return pause(); } @Override - public Response pause(final long timeout) throws InterruptedException + public Response pause() throws InterruptedException { if (!(status == Status.PAUSED || status == Status.READING)) { return Response.status(Response.Status.BAD_REQUEST) @@ -1092,7 +1059,6 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner pauseLock.lockInterruptibly(); try { - pauseMillis = timeout <= 0 ? KafkaIndexTask.PAUSE_FOREVER : timeout; pauseRequested = true; pollRetryLock.lockInterruptibly(); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 029a0332c6f..d8492830a9b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1533,7 +1533,7 @@ public class KafkaSupervisor implements Supervisor log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); for (final String taskId : setEndOffsetTaskIds) { - setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize)); + setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize)); } List results = Futures.successfulAsList(setEndOffsetFutures) @@ -1780,7 +1780,6 @@ public class KafkaSupervisor implements Supervisor new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, true, - false, minimumMessageTime, maximumMessageTime, ioConfig.isSkipOffsetGaps() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java index 49a9b90033d..22a792f7e6f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -71,8 +71,7 @@ public class KafkaIOConfigTest Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); - Assert.assertEquals(true, config.isUseTransaction()); - Assert.assertEquals(false, config.isPauseAfterRead()); + Assert.assertTrue(config.isUseTransaction()); Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent()); 
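The serde assertions here rely on the usual Jackson pattern for optional config fields: boxed Boolean constructor parameters that fall back to a constant when the property is absent from the JSON, as KafkaIOConfig does for useTransaction and skipOffsetGaps. A tiny illustrative model of that pattern, not the real KafkaIOConfig:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class IllustrativeIOConfig
    {
      private static final boolean DEFAULT_USE_TRANSACTION = true;
      private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;

      private final String baseSequenceName;
      private final boolean useTransaction;
      private final boolean skipOffsetGaps;

      @JsonCreator
      public IllustrativeIOConfig(
          @JsonProperty("baseSequenceName") String baseSequenceName,
          // Boxed Booleans so "absent in JSON" can be told apart from "explicitly false".
          @JsonProperty("useTransaction") Boolean useTransaction,
          @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
      )
      {
        this.baseSequenceName = baseSequenceName;
        this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
        this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;
      }

      @JsonProperty
      public String getBaseSequenceName()
      {
        return baseSequenceName;
      }

      @JsonProperty
      public boolean isUseTransaction()
      {
        return useTransaction;
      }

      @JsonProperty
      public boolean isSkipOffsetGaps()
      {
        return skipOffsetGaps;
      }

      public static void main(String[] args) throws Exception
      {
        // Both boolean properties are omitted, so the defaults apply. A plain ObjectMapper would
        // reject a leftover "pauseAfterRead" property unless unknown fields are ignored.
        final String json = "{\"baseSequenceName\": \"sequence_0\"}";
        final IllustrativeIOConfig config = new ObjectMapper().readValue(json, IllustrativeIOConfig.class);
        System.out.println(config.isUseTransaction()); // true
        System.out.println(config.isSkipOffsetGaps()); // false
      }
    }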
Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent()); Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps()); @@ -88,7 +87,6 @@ public class KafkaIOConfigTest + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n" + " \"skipOffsetGaps\": true\n" @@ -109,8 +107,7 @@ public class KafkaIOConfigTest Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); - Assert.assertEquals(false, config.isUseTransaction()); - Assert.assertEquals(true, config.isPauseAfterRead()); + Assert.assertFalse(config.isUseTransaction()); Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get()); Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get()); Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps()); @@ -125,7 +122,6 @@ public class KafkaIOConfigTest + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -145,7 +141,6 @@ public class KafkaIOConfigTest + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -165,7 +160,6 @@ public class KafkaIOConfigTest + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -185,7 +179,6 @@ public class KafkaIOConfigTest + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -206,7 +199,6 @@ public class KafkaIOConfigTest + " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -227,7 +219,6 @@ public class KafkaIOConfigTest + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n" + " 
\"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -248,7 +239,6 @@ public class KafkaIOConfigTest + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 75c43a09971..c8d18db4a1c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -28,8 +28,8 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.druid.indexer.TaskLocation; -import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskInfoProvider; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.IAE; @@ -57,6 +57,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -143,13 +144,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport Assert.assertEquals(false, client.stop(TEST_ID, true)); Assert.assertEquals(false, client.resume(TEST_ID)); Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID)); - Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10)); + Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID)); Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID)); Assert.assertEquals(null, client.getStartTime(TEST_ID)); Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true)); Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID)); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), false, true)); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), true, true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true)); verifyAll(); } @@ -447,33 +448,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport Assert.assertEquals(10, (long) results.get(1)); } - @Test - public void testPauseWithTimeout() throws Exception - { - Capture captured = Capture.newInstance(); - expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2); - expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes(); - expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn( - Futures.immediateFuture(responseHolder) - ); - replayAll(); - - Map results = client.pause(TEST_ID, 101); - verifyAll(); - - Request request = 
captured.getValue(); - Assert.assertEquals(HttpMethod.POST, request.getMethod()); - Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/pause?timeout=101"), - request.getUrl() - ); - Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); - - Assert.assertEquals(2, results.size()); - Assert.assertEquals(1, (long) results.get(0)); - Assert.assertEquals(10, (long) results.get(1)); - } - @Test public void testPauseWithSubsequentGetOffsets() throws Exception { @@ -560,13 +534,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport ); replayAll(); - client.setEndOffsets(TEST_ID, endOffsets, false, true); + client.setEndOffsets(TEST_ID, endOffsets, true); verifyAll(); Request request = captured.getValue(); Assert.assertEquals(HttpMethod.POST, request.getMethod()); Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"), + new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"), request.getUrl() ); Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); @@ -585,13 +559,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport ); replayAll(); - client.setEndOffsets(TEST_ID, endOffsets, true, true); + client.setEndOffsets(TEST_ID, endOffsets, true); verifyAll(); Request request = captured.getValue(); Assert.assertEquals(HttpMethod.POST, request.getMethod()); Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"), + new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"), request.getUrl() ); Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); @@ -739,39 +713,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport } } - @Test - public void testPauseAsyncWithTimeout() throws Exception - { - final int numRequests = TEST_IDS.size(); - Capture captured = Capture.newInstance(CaptureType.ALL); - expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); - expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); - expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn( - Futures.immediateFuture(responseHolder) - ).times(numRequests); - replayAll(); - - List expectedUrls = Lists.newArrayList(); - List>> futures = Lists.newArrayList(); - for (String testId : TEST_IDS) { - expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "pause?timeout=9"))); - futures.add(client.pauseAsync(testId, 9)); - } - - List> responses = Futures.allAsList(futures).get(); - - verifyAll(); - List requests = captured.getValues(); - - Assert.assertEquals(numRequests, requests.size()); - Assert.assertEquals(numRequests, responses.size()); - for (int i = 0; i < numRequests; i++) { - Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod()); - Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl())); - Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i)); - } - } - @Test public void testGetStatusAsync() throws Exception { @@ -925,9 +866,9 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport TEST_HOST, TEST_PORT, testId, - StringUtils.format("offsets/end?resume=%s&finish=%s", false, true) + StringUtils.format("offsets/end?finish=%s", true) ))); - futures.add(client.setEndOffsetsAsync(testId, 
endOffsets, false, true)); + futures.add(client.setEndOffsetsAsync(testId, endOffsets, true)); } List responses = Futures.allAsList(futures).get(); @@ -966,11 +907,11 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport TEST_HOST, TEST_PORT, testId, - "offsets/end?resume=true&finish=true" + "offsets/end?finish=true" ) ) ); - futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true)); + futures.add(client.setEndOffsetsAsync(testId, endOffsets, true)); } List responses = Futures.allAsList(futures).get(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 17c2a145f43..f76788d4984 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -382,7 +382,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -424,7 +423,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -500,7 +498,6 @@ public class KafkaIndexTaskTest endPartitions, consumerProps, true, - false, null, null, false @@ -513,7 +510,7 @@ public class KafkaIndexTaskTest final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap() .equals(currentOffsets)); - task.getRunner().setEndOffsets(currentOffsets, true, false); + task.getRunner().setEndOffsets(currentOffsets, false); Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); @@ -589,7 +586,6 @@ public class KafkaIndexTaskTest endPartitions, consumerProps, true, - false, null, null, false @@ -603,7 +599,7 @@ public class KafkaIndexTaskTest } final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); Assert.assertTrue(checkpoint.getPartitionOffsetMap().equals(currentOffsets)); - task.getRunner().setEndOffsets(currentOffsets, true, false); + task.getRunner().setEndOffsets(currentOffsets, false); Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); @@ -646,7 +642,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, DateTimes.of("2010"), null, false @@ -700,7 +695,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, DateTimes.of("2010"), false @@ -764,7 +758,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -824,7 +817,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -865,7 +857,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -917,7 +908,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), 
kafkaServer.consumerProperties(), true, - false, null, null, false @@ -927,10 +917,7 @@ public class KafkaIndexTaskTest final ListenableFuture future = runTask(task); // Wait for task to exit - Assert.assertEquals( - isIncrementalHandoffSupported ? TaskState.SUCCESS : TaskState.FAILED, - future.get().getStatusCode() - ); + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed()); @@ -975,7 +962,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1019,7 +1005,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 13L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1101,7 +1086,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1161,7 +1145,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1175,7 +1158,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1229,7 +1211,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1243,7 +1224,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1298,7 +1278,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), false, - false, null, null, false @@ -1312,7 +1291,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), kafkaServer.consumerProperties(), false, - false, null, null, false @@ -1372,7 +1350,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1437,7 +1414,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1451,7 +1427,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(1, 1L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1507,7 +1482,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1544,7 +1518,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1596,7 +1569,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1621,7 +1593,7 @@ public class KafkaIndexTaskTest Assert.assertEquals(KafkaIndexTask.Status.READING, task.getRunner().getStatus()); Map currentOffsets = objectMapper.readValue( - task.getRunner().pause(0).getEntity().toString(), + task.getRunner().pause().getEntity().toString(), new TypeReference>() { } @@ -1669,93 +1641,6 @@ public class KafkaIndexTaskTest Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } - 
@Test(timeout = 60_000L) - public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception - { - final KafkaIndexTask task = createTask( - null, - new KafkaIOConfig( - "sequence0", - new KafkaPartitions(topic, ImmutableMap.of(0, 1L)), - new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), - kafkaServer.consumerProperties(), - true, - true, - null, - null, - false - ) - ); - - final ListenableFuture future = runTask(task); - - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } - - while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(25); - } - - // reached the end of the assigned offsets and paused instead of publishing - Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus()); - - Assert.assertEquals(ImmutableMap.of(0, 3L), task.getRunner().getEndOffsets()); - Map newEndOffsets = ImmutableMap.of(0, 4L); - task.getRunner().setEndOffsets(newEndOffsets, false, true); - Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus()); - task.getRunner().resume(); - - while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(25); - } - - // reached the end of the updated offsets and paused - Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus()); - - // try again but with resume flag == true - newEndOffsets = ImmutableMap.of(0, 7L); - task.getRunner().setEndOffsets(newEndOffsets, true, true); - Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets()); - Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus()); - - while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(25); - } - - Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus()); - - task.getRunner().resume(); - - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - - // Check metrics - Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getProcessed()); - Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getUnparseable()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); - - // Check published metadata - SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); - SegmentDescriptor desc2 = SD(task, "2010/P1D", 0); - SegmentDescriptor desc3 = SD(task, "2011/P1D", 0); - Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors()); - Assert.assertEquals( - new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))), - metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) - ); - - // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1)); - Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3)); - } - @Test(timeout = 30_000L) public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception { @@ -1767,7 +1652,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 
5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1780,7 +1664,7 @@ public class KafkaIndexTaskTest Thread.sleep(2000); } - task.getRunner().pause(0); + task.getRunner().pause(); while (!task.getRunner().getStatus().equals(KafkaIndexTask.Status.PAUSED)) { Thread.sleep(25); @@ -1806,7 +1690,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 500L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1860,7 +1743,6 @@ public class KafkaIndexTaskTest new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index d823b60b132..05b7d72b6a3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -263,7 +263,6 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead()); Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); @@ -1055,7 +1054,6 @@ public class KafkaSupervisorTest extends EasyMockSupport taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), - EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFuture(true)).times(2); @@ -1083,7 +1081,6 @@ public class KafkaSupervisorTest extends EasyMockSupport KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig(); Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead()); Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic()); Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0)); @@ -1171,7 +1168,6 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead()); // check that the new task was created with starting offsets matching where the publishing task finished Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic()); @@ -1260,7 +1256,6 @@ public class KafkaSupervisorTest extends EasyMockSupport Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); 
- Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead()); // check that the new task was created with starting offsets matching where the publishing task finished Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic()); @@ -1573,7 +1568,6 @@ public class KafkaSupervisorTest extends EasyMockSupport taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), - EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); @@ -1698,7 +1692,7 @@ public class KafkaSupervisorTest extends EasyMockSupport expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); expect(taskClient.pauseAsync("id2")) .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); - expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true)) + expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3"); expectLastCall().times(2); @@ -2130,7 +2124,6 @@ public class KafkaSupervisorTest extends EasyMockSupport endPartitions, ImmutableMap.of(), true, - false, minimumMessageTime, maximumMessageTime, false diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index 992b130cf88..22898a5f5cd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -24,8 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; -import io.druid.indexing.common.TaskLock; import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.DateTimes; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 9e1763c8a50..416ab859397 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -180,6 +180,7 @@ public interface Task */ TaskStatus run(TaskToolbox toolbox) throws Exception; + @Nullable Map getContext(); @Nullable diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index b833aaaf095..3b6311afced 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -37,12 +37,12 @@ import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatus; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskReport; import io.druid.indexing.common.TaskReportFileWriter; 
-import io.druid.indexer.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.TestUtils;
 import io.druid.indexing.common.actions.LockAcquireAction;
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
index 72a7f8b325c..74a1011abc5 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
@@ -23,12 +23,13 @@ import com.google.common.collect.ImmutableList;
 import io.druid.timeline.DataSegment;

 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;

 public class SegmentsAndMetadata
 {
-  private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(ImmutableList.of(), null);
+  private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(Collections.emptyList(), null);

   private final Object commitMetadata;
   private final ImmutableList segments;
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 185419cd3b4..3f0bbdfa70d 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -230,7 +229,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
         throw e;
       }
       catch (Exception e) {
-        throw Throwables.propagate(e);
+        throw new RuntimeException(e);
      }
    }
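For reference, a minimal usage sketch (not part of the patch) of the simplified task-client surface that the hunks above converge on: pause() no longer takes a timeout, and setEndOffsets() no longer takes a resume flag because, per the KafkaSupervisor log message above ("Setting endOffsets ... and resuming"), setting the end offsets also resumes the task. Class and method names follow KafkaIndexTaskClientTest and KafkaSupervisor; the wrapper class, method, and task id below are hypothetical.

    import io.druid.indexing.kafka.KafkaIndexTaskClient;
    import java.util.Map;

    class TaskClientUsageSketch
    {
      // Pause the task, pin its end offsets to wherever it has read to, and let it publish.
      static void checkpointAndFinish(KafkaIndexTaskClient client, String taskId)
      {
        // POST /druid/worker/v1/chat/{taskId}/pause -- returns the task's current partition offsets
        Map<Integer, Long> currentOffsets = client.pause(taskId);

        // POST /druid/worker/v1/chat/{taskId}/offsets/end?finish=true -- also resumes the task
        client.setEndOffsets(taskId, currentOffsets, true);
      }
    }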