option to reset offest automatically in case of OffsetOutOfRangeException (#3678)

* option to reset offset automatically in case of OffsetOutOfRangeException
if the next offset is less than the earliest available offset for that partition

* review comments

* refactoring

* refactor

* review comments
This commit is contained in:
Parag Jain 2016-11-21 16:29:46 -06:00 committed by Himanshu
parent 7c63bee7f5
commit 7ee6bb7410
10 changed files with 217 additions and 27 deletions

View File

@ -123,6 +123,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)| |`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)|
|`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))| |`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))|
|`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))| |`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))|
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)| |`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)|

View File

@ -32,6 +32,7 @@ public class KafkaIOConfig implements IOConfig
{ {
private static final boolean DEFAULT_USE_TRANSACTION = true; private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_PAUSE_AFTER_READ = false; private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
private static final boolean DEFAULT_USE_EARLIEST_OFFSET = false;
private final String baseSequenceName; private final String baseSequenceName;
private final KafkaPartitions startPartitions; private final KafkaPartitions startPartitions;
@ -40,6 +41,7 @@ public class KafkaIOConfig implements IOConfig
private final boolean useTransaction; private final boolean useTransaction;
private final boolean pauseAfterRead; private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime; private final Optional<DateTime> minimumMessageTime;
private final boolean useEarliestOffset;
@JsonCreator @JsonCreator
public KafkaIOConfig( public KafkaIOConfig(
@ -49,7 +51,8 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("consumerProperties") Map<String, String> consumerProperties, @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead, @JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset
) )
{ {
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
@ -59,6 +62,7 @@ public class KafkaIOConfig implements IOConfig
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : DEFAULT_USE_EARLIEST_OFFSET;
Preconditions.checkArgument( Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()), startPartitions.getTopic().equals(endPartitions.getTopic()),
@ -122,6 +126,12 @@ public class KafkaIOConfig implements IOConfig
return minimumMessageTime; return minimumMessageTime;
} }
@JsonProperty
public boolean isUseEarliestOffset()
{
return useEarliestOffset;
}
@Override @Override
public String toString() public String toString()
{ {
@ -133,6 +143,7 @@ public class KafkaIOConfig implements IOConfig
", useTransaction=" + useTransaction + ", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead + ", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime + ", minimumMessageTime=" + minimumMessageTime +
", useEarliestOffest=" + useEarliestOffset +
'}'; '}';
} }
} }

View File

@ -38,7 +38,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer; import io.druid.data.input.Committer;
import io.druid.data.input.InputRow; import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.InputRowParser;
@ -52,7 +52,6 @@ import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.common.task.TaskResource;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.DruidMetrics; import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner; import io.druid.query.NoopQueryRunner;
@ -116,7 +115,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
PUBLISHING PUBLISHING
} }
private static final Logger log = new Logger(KafkaIndexTask.class); private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
private static final String TYPE = "index_kafka"; private static final String TYPE = "index_kafka";
private static final Random RANDOM = new Random(); private static final Random RANDOM = new Random();
private static final long POLL_TIMEOUT = 100; private static final long POLL_TIMEOUT = 100;
@ -386,17 +385,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
records = consumer.poll(POLL_TIMEOUT); records = consumer.poll(POLL_TIMEOUT);
} }
catch (OffsetOutOfRangeException e) { catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s], retrying in %dms", e.getMessage(), POLL_RETRY_MS); log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
pollRetryLock.lockInterruptibly(); possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, assignment);
try { stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
long nanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS);
while (nanos > 0L && !pauseRequested && !stopRequested) {
nanos = isAwaitingRetry.awaitNanos(nanos);
}
}
finally {
pollRetryLock.unlock();
}
} }
for (ConsumerRecord<byte[], byte[]> record : records) { for (ConsumerRecord<byte[], byte[]> record : records) {
@ -1000,4 +991,78 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
return false; return false;
} }
private void possiblyResetOffsetsOrWait(
Map<TopicPartition, Long> outOfRangePartitions,
KafkaConsumer<byte[], byte[]> consumer,
Set<Integer> assignment
) throws InterruptedException
{
boolean shouldRetry = false;
if(tuningConfig.isResetOffsetAutomatically()) {
for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
final TopicPartition topicPartition = outOfRangePartition.getKey();
final long nextOffset = outOfRangePartition.getValue();
// seek to the beginning to get the least available offset
consumer.seekToBeginning(topicPartition);
final long leastAvailableOffset = consumer.position(topicPartition);
// reset the seek
consumer.seek(topicPartition, nextOffset);
// Reset consumer offset if resetOffsetAutomatically is set to true
// and the current message offset in the kafka partition is more than the
// next message offset that we are trying to fetch
if (leastAvailableOffset > nextOffset) {
resetOffset(consumer, assignment, topicPartition);
} else {
shouldRetry = true;
}
}
} else {
shouldRetry = true;
}
if (shouldRetry) {
log.warn("Retrying in %dms", POLL_RETRY_MS);
pollRetryLock.lockInterruptibly();
try {
long nanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS);
while (nanos > 0L && !pauseRequested && !stopRequested) {
nanos = isAwaitingRetry.awaitNanos(nanos);
}
}
finally {
pollRetryLock.unlock();
}
}
}
private void resetOffset(
KafkaConsumer<byte[], byte[]> consumer,
Set<Integer> assignment,
TopicPartition topicPartition
)
{
log.warn(
"Resetting consumer offset to [%s] for partition [%d]",
ioConfig.isUseEarliestOffset() ? "earliest" : "latest",
topicPartition.partition()
);
if (ioConfig.isUseEarliestOffset()) {
consumer.seekToBeginning(topicPartition);
} else {
consumer.seekToEnd(topicPartition);
}
nextOffsets.put(topicPartition.partition(), consumer.position(topicPartition));
log.warn("Consumer is now at offset [%d]", nextOffsets.get(topicPartition.partition()));
// check if we seeked passed the endOffset for this partition
if (nextOffsets.get(topicPartition.partition()) >= endOffsets.get(topicPartition.partition())
&& assignment.remove(topicPartition.partition())) {
log.info(
"Finished reading topic[%s], partition[%,d].",
topicPartition.topic(),
topicPartition.partition()
);
}
// update assignments if something changed
assignPartitions(consumer, topicPartition.topic(), assignment);
}
} }

View File

@ -32,6 +32,7 @@ import java.io.File;
public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
{ {
private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
private final int maxRowsInMemory; private final int maxRowsInMemory;
private final int maxRowsPerSegment; private final int maxRowsPerSegment;
@ -42,6 +43,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final boolean buildV9Directly; private final boolean buildV9Directly;
private final boolean reportParseExceptions; private final boolean reportParseExceptions;
private final long handoffConditionTimeout; private final long handoffConditionTimeout;
private final boolean resetOffsetAutomatically;
@JsonCreator @JsonCreator
public KafkaTuningConfig( public KafkaTuningConfig(
@ -53,7 +55,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically
) )
{ {
// Cannot be a static because default basePersistDirectory is unique per-instance // Cannot be a static because default basePersistDirectory is unique per-instance
@ -74,6 +77,9 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
this.handoffConditionTimeout = handoffConditionTimeout == null this.handoffConditionTimeout = handoffConditionTimeout == null
? defaults.getHandoffConditionTimeout() ? defaults.getHandoffConditionTimeout()
: handoffConditionTimeout; : handoffConditionTimeout;
this.resetOffsetAutomatically = resetOffsetAutomatically == null
? DEFAULT_RESET_OFFSET_AUTOMATICALLY
: resetOffsetAutomatically;
} }
public static KafkaTuningConfig copyOf(KafkaTuningConfig config) public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
@ -87,7 +93,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
config.indexSpec, config.indexSpec,
config.buildV9Directly, config.buildV9Directly,
config.reportParseExceptions, config.reportParseExceptions,
config.handoffConditionTimeout config.handoffConditionTimeout,
config.resetOffsetAutomatically
); );
} }
@ -145,6 +152,12 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return handoffConditionTimeout; return handoffConditionTimeout;
} }
@JsonProperty
public boolean isResetOffsetAutomatically()
{
return resetOffsetAutomatically;
}
public KafkaTuningConfig withBasePersistDirectory(File dir) public KafkaTuningConfig withBasePersistDirectory(File dir)
{ {
return new KafkaTuningConfig( return new KafkaTuningConfig(
@ -156,7 +169,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
indexSpec, indexSpec,
buildV9Directly, buildV9Directly,
reportParseExceptions, reportParseExceptions,
handoffConditionTimeout handoffConditionTimeout,
resetOffsetAutomatically
); );
} }
@ -171,7 +185,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
indexSpec, indexSpec,
buildV9Directly, buildV9Directly,
reportParseExceptions, reportParseExceptions,
handoffConditionTimeout handoffConditionTimeout,
resetOffsetAutomatically
); );
} }
@ -205,6 +220,9 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
if (handoffConditionTimeout != that.handoffConditionTimeout) { if (handoffConditionTimeout != that.handoffConditionTimeout) {
return false; return false;
} }
if (resetOffsetAutomatically != that.resetOffsetAutomatically) {
return false;
}
if (intermediatePersistPeriod != null if (intermediatePersistPeriod != null
? !intermediatePersistPeriod.equals(that.intermediatePersistPeriod) ? !intermediatePersistPeriod.equals(that.intermediatePersistPeriod)
: that.intermediatePersistPeriod != null) { : that.intermediatePersistPeriod != null) {
@ -215,7 +233,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
: that.basePersistDirectory != null) { : that.basePersistDirectory != null) {
return false; return false;
} }
return !(indexSpec != null ? !indexSpec.equals(that.indexSpec) : that.indexSpec != null); return indexSpec != null ? indexSpec.equals(that.indexSpec) : that.indexSpec == null;
} }
@Override @Override
@ -230,6 +249,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
result = 31 * result + (buildV9Directly ? 1 : 0); result = 31 * result + (buildV9Directly ? 1 : 0);
result = 31 * result + (reportParseExceptions ? 1 : 0); result = 31 * result + (reportParseExceptions ? 1 : 0);
result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32)); result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32));
result = 31 * result + (resetOffsetAutomatically ? 1 : 0);
return result; return result;
} }
@ -246,6 +266,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
", buildV9Directly=" + buildV9Directly + ", buildV9Directly=" + buildV9Directly +
", reportParseExceptions=" + reportParseExceptions + ", reportParseExceptions=" + reportParseExceptions +
", handoffConditionTimeout=" + handoffConditionTimeout + ", handoffConditionTimeout=" + handoffConditionTimeout +
", resetOffsetAutomatically=" + resetOffsetAutomatically +
'}'; '}';
} }
} }

View File

@ -1261,7 +1261,8 @@ public class KafkaSupervisor implements Supervisor
consumerProperties, consumerProperties,
true, true,
false, false,
minimumMessageTime minimumMessageTime,
ioConfig.isUseEarliestOffset()
); );
for (int i = 0; i < replicas; i++) { for (int i = 0; i < replicas; i++) {

View File

@ -78,6 +78,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
null, null,
null, null,
null, null,
null,
null null
); );
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");

View File

@ -45,6 +45,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
@JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("workerThreads") Integer workerThreads,
@JsonProperty("chatThreads") Integer chatThreads, @JsonProperty("chatThreads") Integer chatThreads,
@JsonProperty("chatRetries") Long chatRetries, @JsonProperty("chatRetries") Long chatRetries,
@ -61,7 +62,8 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
indexSpec, indexSpec,
buildV9Directly, buildV9Directly,
reportParseExceptions, reportParseExceptions,
handoffConditionTimeout handoffConditionTimeout,
resetOffsetAutomatically
); );
this.workerThreads = workerThreads; this.workerThreads = workerThreads;
@ -114,6 +116,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
", buildV9Directly=" + getBuildV9Directly() + ", buildV9Directly=" + getBuildV9Directly() +
", reportParseExceptions=" + isReportParseExceptions() + ", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() + ", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", workerThreads=" + workerThreads + ", workerThreads=" + workerThreads +
", chatThreads=" + chatThreads + ", chatThreads=" + chatThreads +
", chatRetries=" + chatRetries + ", chatRetries=" + chatRetries +

View File

@ -304,8 +304,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -345,8 +347,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -398,8 +402,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
new DateTime("2010") new DateTime("2010"),
null
), ),
null,
null null
); );
@ -458,8 +464,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -498,8 +506,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -549,8 +559,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -599,8 +611,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -631,8 +645,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -644,8 +660,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -697,8 +715,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -710,8 +730,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -764,8 +786,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
false, false,
false, false,
null,
null null
), ),
null,
null null
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -777,8 +801,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
false, false,
false, false,
null,
null null
), ),
null,
null null
); );
@ -836,8 +862,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -892,8 +920,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
final KafkaIndexTask task2 = createTask( final KafkaIndexTask task2 = createTask(
@ -905,8 +935,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -960,8 +992,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -994,8 +1028,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -1045,8 +1081,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -1127,8 +1165,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
true, true,
null,
null null
), ),
null,
null null
); );
@ -1213,8 +1253,10 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(), kafkaServer.consumerProperties(),
true, true,
false, false,
null,
null null
), ),
null,
null null
); );
@ -1229,6 +1271,47 @@ public class KafkaIndexTaskTest
Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
} }
@Test(timeout = 30_000L)
public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAvailable() throws Exception
{
// Insert data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
for (ProducerRecord<byte[], byte[]> record : RECORDS) {
kafkaProducer.send(record).get();
}
}
final KafkaIndexTask task = createTask(
null,
new KafkaIOConfig(
"sequence0",
new KafkaPartitions("topic0", ImmutableMap.of(0, 200L)),
new KafkaPartitions("topic0", ImmutableMap.of(0, 500L)),
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
true
);
runTask(task);
while (!task.getStatus().equals(KafkaIndexTask.Status.READING)) {
Thread.sleep(2000);
}
int i = 0;
while(i++ < 5) {
Assert.assertEquals(task.getStatus(), KafkaIndexTask.Status.READING);
// Offset should not be reset
Assert.assertTrue(task.getCurrentOffsets().get(0) == 200L);
Thread.sleep(2000);
}
}
private ListenableFuture<TaskStatus> runTask(final Task task) private ListenableFuture<TaskStatus> runTask(final Task task)
{ {
try { try {
@ -1282,7 +1365,8 @@ public class KafkaIndexTaskTest
private KafkaIndexTask createTask( private KafkaIndexTask createTask(
final String taskId, final String taskId,
final KafkaIOConfig ioConfig, final KafkaIOConfig ioConfig,
final Integer maxRowsPerSegment final Integer maxRowsPerSegment,
final Boolean resetOffsetAutomatically
) )
{ {
final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
@ -1294,7 +1378,8 @@ public class KafkaIndexTaskTest
null, null,
buildV9Directly, buildV9Directly,
reportParseExceptions, reportParseExceptions,
handoffConditionTimeout handoffConditionTimeout,
resetOffsetAutomatically
); );
return new KafkaIndexTask( return new KafkaIndexTask(
taskId, taskId,

View File

@ -104,7 +104,7 @@ public class KafkaTuningConfigTest
@Test @Test
public void testCopyOf() throws Exception public void testCopyOf() throws Exception
{ {
KafkaTuningConfig original = new KafkaTuningConfig(1, 2, new Period("PT3S"), new File("/tmp/xxx"), 4, new IndexSpec(), true, true, 5L); KafkaTuningConfig original = new KafkaTuningConfig(1, 2, new Period("PT3S"), new File("/tmp/xxx"), 4, new IndexSpec(), true, true, 5L, null);
KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original); KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original);
Assert.assertEquals(1, copy.getMaxRowsInMemory()); Assert.assertEquals(1, copy.getMaxRowsInMemory());

View File

@ -176,6 +176,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
true, true,
false, false,
null, null,
null,
numThreads, numThreads,
TEST_CHAT_THREADS, TEST_CHAT_THREADS,
TEST_CHAT_RETRIES, TEST_CHAT_RETRIES,
@ -1698,7 +1699,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.<String, String>of(), ImmutableMap.<String, String>of(),
true, true,
false, false,
minimumMessageTime minimumMessageTime,
null
), ),
ImmutableMap.<String, Object>of(), ImmutableMap.<String, Object>of(),
null null