auto reset option for Kafka Indexing service (#3842)

* auto reset option for Kafka Indexing Service, for the case where the message at the offset being fetched is no longer present on the Kafka brokers

* review comments

* review comments

* reverted last change

* review comments

* review comments

* fix typo
Parag Jain, 2017-02-02 14:57:45 -06:00, committed by Himanshu
parent a457cded28
commit 1aabb45a09
26 changed files with 485 additions and 109 deletions


@ -95,6 +95,36 @@ public class KafkaDataSourceMetadata implements DataSourceMetadata
}
}
@Override
public DataSourceMetadata minus(DataSourceMetadata other)
{
if (!(other instanceof KafkaDataSourceMetadata)) {
throw new IAE(
"Expected instance of %s, got %s",
KafkaDataSourceMetadata.class.getCanonicalName(),
other.getClass().getCanonicalName()
);
}
final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other;
if (that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) {
// Same topic, remove partitions present in "that" from "this"
final Map<Integer, Long> newMap = Maps.newHashMap();
for (Map.Entry<Integer, Long> entry : kafkaPartitions.getPartitionOffsetMap().entrySet()) {
if (!that.getKafkaPartitions().getPartitionOffsetMap().containsKey(entry.getKey())) {
newMap.put(entry.getKey(), entry.getValue());
}
}
return new KafkaDataSourceMetadata(new KafkaPartitions(kafkaPartitions.getTopic(), newMap));
} else {
// Different topic, prefer "this".
return this;
}
}
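Note: minus() subtracts the partitions present in the other instance. A minimal illustration, using the same KafkaPartitions/KafkaDataSourceMetadata constructors that appear elsewhere in this commit (topic name and offsets are made up):
// Illustration only: minus() keeps the partitions of "this" that are absent from "other".
KafkaDataSourceMetadata current = new KafkaDataSourceMetadata(
    new KafkaPartitions("mytopic", ImmutableMap.of(0, 100L, 1, 200L))
);
KafkaDataSourceMetadata reset = new KafkaDataSourceMetadata(
    new KafkaPartitions("mytopic", ImmutableMap.of(1, 200L))
);
// "remaining" holds only partition 0 at offset 100; a different topic would return "current" unchanged.
DataSourceMetadata remaining = current.minus(reset);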
@Override
public boolean equals(Object o)
{


@ -32,7 +32,6 @@ public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
private static final boolean DEFAULT_USE_EARLIEST_OFFSET = false;
private final String baseSequenceName;
private final KafkaPartitions startPartitions;
@ -41,7 +40,6 @@ public class KafkaIOConfig implements IOConfig
private final boolean useTransaction;
private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;
private final boolean useEarliestOffset;
@JsonCreator
public KafkaIOConfig(
@ -51,8 +49,7 @@ public class KafkaIOConfig implements IOConfig
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime
)
{
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
@ -62,7 +59,6 @@ public class KafkaIOConfig implements IOConfig
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : DEFAULT_USE_EARLIEST_OFFSET;
Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()),
@ -126,12 +122,6 @@ public class KafkaIOConfig implements IOConfig
return minimumMessageTime;
}
@JsonProperty
public boolean isUseEarliestOffset()
{
return useEarliestOffset;
}
@Override
public String toString()
{
@ -143,7 +133,6 @@ public class KafkaIOConfig implements IOConfig
", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
", useEarliestOffest=" + useEarliestOffset +
'}';
}
}


@ -46,6 +46,7 @@ import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
@ -386,7 +387,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, assignment);
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
}
@ -995,11 +996,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
private void possiblyResetOffsetsOrWait(
Map<TopicPartition, Long> outOfRangePartitions,
KafkaConsumer<byte[], byte[]> consumer,
Set<Integer> assignment
) throws InterruptedException
TaskToolbox taskToolbox
) throws InterruptedException, IOException
{
boolean shouldRetry = false;
if(tuningConfig.isResetOffsetAutomatically()) {
final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
boolean doReset = false;
if (tuningConfig.isResetOffsetAutomatically()) {
for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
final TopicPartition topicPartition = outOfRangePartition.getKey();
final long nextOffset = outOfRangePartition.getValue();
@ -1012,15 +1014,15 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
// and the current message offset in the kafka partition is more than the
// next message offset that we are trying to fetch
if (leastAvailableOffset > nextOffset) {
resetOffset(consumer, assignment, topicPartition);
} else {
shouldRetry = true;
doReset = true;
resetPartitions.put(topicPartition, nextOffset);
}
}
} else {
shouldRetry = true;
}
if (shouldRetry) {
if (doReset) {
sendResetRequestAndWait(resetPartitions, taskToolbox);
} else {
log.warn("Retrying in %dms", POLL_RETRY_MS);
pollRetryLock.lockInterruptibly();
try {
@ -1035,34 +1037,33 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
}
private void resetOffset(
KafkaConsumer<byte[], byte[]> consumer,
Set<Integer> assignment,
TopicPartition topicPartition
)
private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox) throws IOException
{
log.warn(
"Resetting consumer offset to [%s] for partition [%d]",
ioConfig.isUseEarliestOffset() ? "earliest" : "latest",
topicPartition.partition()
);
if (ioConfig.isUseEarliestOffset()) {
consumer.seekToBeginning(topicPartition);
Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
for (Map.Entry<TopicPartition, Long> outOfRangePartition: outOfRangePartitions.entrySet()) {
partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
}
boolean result = taskToolbox.getTaskActionClient()
.submit(new ResetDataSourceMetadataAction(
getDataSource(),
new KafkaDataSourceMetadata(new KafkaPartitions(
ioConfig.getStartPartitions()
.getTopic(),
partitionOffsetMap
))
));
if (result) {
log.warn("Successfully sent the reset request for partitions [%s], waiting to be killed", partitionOffsetMap.keySet());
// wait for being killed by supervisor
try {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException e) {
throw new RuntimeException("Got interrupted while waiting to be killed");
}
} else {
consumer.seekToEnd(topicPartition);
log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
}
nextOffsets.put(topicPartition.partition(), consumer.position(topicPartition));
log.warn("Consumer is now at offset [%d]", nextOffsets.get(topicPartition.partition()));
// check if we seeked passed the endOffset for this partition
if (nextOffsets.get(topicPartition.partition()) >= endOffsets.get(topicPartition.partition())
&& assignment.remove(topicPartition.partition())) {
log.info(
"Finished reading topic[%s], partition[%,d].",
topicPartition.topic(),
topicPartition.partition()
);
}
// update assignments if something changed
assignPartitions(consumer, topicPartition.topic(), assignment);
}
}
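Note: since the hunk above interleaves removed and added lines, the new behaviour is easier to see as a condensed sketch. Per out-of-range partition (names taken from the code above; leastAvailableOffset is computed in the elided lines):
// Condensed restatement of possiblyResetOffsetsOrWait(), for readability only.
if (tuningConfig.isResetOffsetAutomatically() && leastAvailableOffset > nextOffset) {
    // Kafka retention already removed the offset we need: mark the partition for reset.
    doReset = true;
    resetPartitions.put(topicPartition, nextOffset);
}
// After the loop: if doReset, sendResetRequestAndWait(resetPartitions, taskToolbox) asks the
// overlord to drop these partitions from the stored metadata and the task sleeps until the
// supervisor kills it; otherwise the task simply retries after POLL_RETRY_MS as before.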


@ -63,6 +63,7 @@ import io.druid.indexing.overlord.TaskRunnerWorkItem;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.Supervisor;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.collect.JavaCompatUtils;
import io.druid.metadata.EntryExistsException;
@ -74,6 +75,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -417,10 +419,10 @@ public class KafkaSupervisor implements Supervisor
}
@Override
public void reset()
public void reset(DataSourceMetadata dataSourceMetadata)
{
log.info("Posting ResetNotice");
notices.add(new ResetNotice());
notices.add(new ResetNotice(dataSourceMetadata));
}
public void possiblyRegisterListener()
@ -506,29 +508,113 @@ public class KafkaSupervisor implements Supervisor
private class ResetNotice implements Notice
{
final DataSourceMetadata dataSourceMetadata;
ResetNotice(DataSourceMetadata dataSourceMetadata)
{
this.dataSourceMetadata = dataSourceMetadata;
}
@Override
public void handle()
{
resetInternal();
log.makeAlert("Resetting dataSource [%s]", dataSource).emit();
resetInternal(dataSourceMetadata);
}
}
@VisibleForTesting
void resetInternal()
void resetInternal(DataSourceMetadata dataSourceMetadata)
{
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
if (dataSourceMetadata == null) {
// Reset everything
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
killTaskGroupForPartitions(JavaCompatUtils.keySet(taskGroups));
} else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
} else {
// Reset only the partitions in dataSourceMetadata if it has not been reset yet
final KafkaDataSourceMetadata resetKafkaMetadata = (KafkaDataSourceMetadata) dataSourceMetadata;
for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
killTask(taskId);
if (resetKafkaMetadata.getKafkaPartitions().getTopic().equals(ioConfig.getTopic())) {
// metadata can be null
final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
if (metadata != null && !(metadata instanceof KafkaDataSourceMetadata)) {
throw new IAE(
"Expected KafkaDataSourceMetadata from metadata store but found instance of [%s]",
metadata.getClass()
);
}
final KafkaDataSourceMetadata currentMetadata = (KafkaDataSourceMetadata) metadata;
// defend against consecutive reset requests from replicas
// as well as the case where the metadata store does not have an entry for the reset partitions
boolean doReset = false;
for (Map.Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.entrySet()) {
final Long partitionOffsetInMetadataStore = currentMetadata == null
? null
: currentMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.get(resetPartitionOffset.getKey());
final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
if (partitionOffsetInMetadataStore != null ||
(partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
.equals(resetPartitionOffset.getValue()))) {
doReset = true;
break;
}
}
if (!doReset) {
return;
}
boolean metadataUpdateSuccess = false;
if (currentMetadata == null) {
metadataUpdateSuccess = true;
} else {
final DataSourceMetadata newMetadata = currentMetadata.minus(resetKafkaMetadata);
try {
metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata);
}
catch (IOException e) {
log.error("Resetting DataSourceMetadata failed [%s]", e.getMessage());
Throwables.propagate(e);
}
}
if (metadataUpdateSuccess) {
killTaskGroupForPartitions(JavaCompatUtils.keySet(resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()));
} else {
throw new ISE("Unable to reset metadata");
}
} else {
log.warn(
"Reset metadata topic [%s] and supervisor's topic [%s] do not match",
resetKafkaMetadata.getKafkaPartitions().getTopic(),
ioConfig.getTopic()
);
}
}
}
partitionGroups.clear();
taskGroups.clear();
private void killTaskGroupForPartitions(Set<Integer> partitions)
{
for (Integer partition : partitions) {
TaskGroup taskGroup = taskGroups.get(getTaskGroupIdForPartition(partition));
if (taskGroup != null) {
// kill all tasks in this task group
for (String taskId : JavaCompatUtils.keySet(taskGroup.tasks)) {
log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
killTask(taskId);
}
}
partitionGroups.remove(getTaskGroupIdForPartition(partition));
taskGroups.remove(getTaskGroupIdForPartition(partition));
}
}
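Note: the guard in resetInternal() above is easier to read as a single per-partition predicate. The names below (partition, requestedOffset, offsetInMetadataStore, taskGroupForPartition) are shortened stand-ins for the loop variables, not identifiers from the commit:
// Paraphrase of the "defend against consecutive reset requests" check: at least one
// requested partition must still be "live" for the reset to proceed, meaning either the
// metadata store still holds an offset for it, or a running task group is currently at
// exactly the offset the task reported as out of range.
boolean live = offsetInMetadataStore != null
    || (taskGroupForPartition != null
        && requestedOffset.equals(taskGroupForPartition.partitionOffsets.get(partition)));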
@VisibleForTesting
@ -1287,8 +1373,7 @@ public class KafkaSupervisor implements Supervisor
consumerProperties,
true,
false,
minimumMessageTime,
ioConfig.isUseEarliestOffset()
minimumMessageTime
);
for (int i = 0; i < replicas; i++) {


@ -94,6 +94,35 @@ public class KafkaDataSourceMetadataTest
);
}
@Test
public void testMinus()
{
Assert.assertEquals(
KM("foo", ImmutableMap.of(1, 3L)),
KM1.minus(KM3)
);
Assert.assertEquals(
KM("foo", ImmutableMap.<Integer, Long>of()),
KM0.minus(KM2)
);
Assert.assertEquals(
KM("foo", ImmutableMap.<Integer, Long>of()),
KM1.minus(KM2)
);
Assert.assertEquals(
KM("foo", ImmutableMap.of(2, 5L)),
KM2.minus(KM1)
);
Assert.assertEquals(
KM("foo", ImmutableMap.<Integer, Long>of()),
KM2.minus(KM2)
);
}
private static KafkaDataSourceMetadata KM(String topic, Map<Integer, Long> offsets)
{
return new KafkaDataSourceMetadata(new KafkaPartitions(topic, offsets));


@ -69,6 +69,7 @@ import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.indexing.test.TestDataSegmentAnnouncer;
import io.druid.indexing.test.TestDataSegmentKiller;
import io.druid.jackson.DefaultObjectMapper;
@ -304,7 +305,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -347,7 +347,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -402,8 +401,7 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
new DateTime("2010"),
null
new DateTime("2010")
),
null,
null
@ -464,7 +462,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -506,7 +503,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -559,7 +555,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -611,7 +606,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -645,7 +639,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -660,7 +653,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -715,7 +707,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -730,7 +721,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -786,7 +776,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
false,
false,
null,
null
),
null,
@ -801,7 +790,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
false,
false,
null,
null
),
null,
@ -862,7 +850,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -920,7 +907,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -935,7 +921,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -992,7 +977,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -1028,7 +1012,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -1081,7 +1064,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -1165,7 +1147,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
true,
null,
null
),
null,
@ -1253,7 +1234,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -1290,7 +1270,6 @@ public class KafkaIndexTaskTest
kafkaServer.consumerProperties(),
true,
false,
null,
null
),
null,
@ -1468,7 +1447,8 @@ public class KafkaIndexTaskTest
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,
emitter
emitter,
new SupervisorManager(null)
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,


@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@ -49,6 +48,7 @@ import io.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import io.druid.indexing.kafka.KafkaPartitions;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.indexing.kafka.test.TestBroker;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
@ -1485,11 +1485,88 @@ public class KafkaSupervisorTest extends EasyMockSupport
expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true);
replay(indexerMetadataStorageCoordinator);
supervisor.resetInternal();
supervisor.resetInternal(null);
verifyAll();
}
@Test
public void testResetDataSourceMetadata() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
Capture<String> captureDataSource = EasyMock.newCapture();
Capture<DataSourceMetadata> captureDataSourceMetadata = EasyMock.newCapture();
KafkaDataSourceMetadata kafkaDataSourceMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
KAFKA_TOPIC,
ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L)
));
KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
KAFKA_TOPIC,
ImmutableMap.of(1, 1000L, 2, 1000L)
));
KafkaDataSourceMetadata expectedMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
KAFKA_TOPIC,
ImmutableMap.of(0, 1000L)
));
reset(indexerMetadataStorageCoordinator);
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(kafkaDataSourceMetadata);
expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(
EasyMock.capture(captureDataSource),
EasyMock.capture(captureDataSourceMetadata)
)).andReturn(true);
replay(indexerMetadataStorageCoordinator);
supervisor.resetInternal(resetMetadata);
verifyAll();
Assert.assertEquals(captureDataSource.getValue(), DATASOURCE);
Assert.assertEquals(captureDataSourceMetadata.getValue(), expectedMetadata);
}
@Test
public void testResetNoDataSourceMetadata() throws Exception
{
supervisor = getSupervisor(1, 1, true, "PT1H", null);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.<Task>of()).anyTimes();
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
supervisor.start();
supervisor.runInternal();
verifyAll();
KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(new KafkaPartitions(
KAFKA_TOPIC,
ImmutableMap.of(1, 1000L, 2, 1000L)
));
reset(indexerMetadataStorageCoordinator);
// no DataSourceMetadata in metadata store
expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(null);
replay(indexerMetadataStorageCoordinator);
supervisor.resetInternal(resetMetadata);
verifyAll();
}
@Test
public void testResetRunningTasks() throws Exception
{
@ -1566,7 +1643,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskQueue.shutdown("id3");
replay(taskQueue, indexerMetadataStorageCoordinator);
supervisor.resetInternal();
supervisor.resetInternal(null);
verifyAll();
}
@ -1702,8 +1779,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
ImmutableMap.<String, String>of(),
true,
false,
minimumMessageTime,
null
minimumMessageTime
),
ImmutableMap.<String, Object>of(),
null


@ -0,0 +1,85 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.DataSourceMetadata;
import java.io.IOException;
public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
{
private final String dataSource;
private final DataSourceMetadata resetMetadata;
public ResetDataSourceMetadataAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("resetMetadata") DataSourceMetadata resetMetadata
)
{
this.dataSource = dataSource;
this.resetMetadata = resetMetadata;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public DataSourceMetadata getResetMetadata()
{
return resetMetadata;
}
@Override
public TypeReference<Boolean> getReturnTypeReference()
{
return new TypeReference<Boolean>()
{
};
}
@Override
public Boolean perform(
Task task, TaskActionToolbox toolbox
) throws IOException
{
return toolbox.getSupervisorManager().resetSupervisor(dataSource, resetMetadata);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{
return "ResetDataSourceMetadataAction{" +
"dataSource='" + dataSource + '\'' +
", resetMetadata=" + resetMetadata +
'}';
}
}
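Note: a task invokes this action through its TaskActionClient; the sketch below mirrors KafkaIndexTask.sendResetRequestAndWait() shown earlier in this commit and is for illustration only (toolbox is the task's TaskToolbox).
// Usage sketch, mirroring the call made from KafkaIndexTask above.
Boolean accepted = toolbox.getTaskActionClient().submit(
    new ResetDataSourceMetadataAction(
        getDataSource(),
        new KafkaDataSourceMetadata(new KafkaPartitions(topic, partitionOffsetMap))
    )
);
// On the overlord side, perform() simply delegates to
// SupervisorManager.resetSupervisor(dataSource, resetMetadata).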


@ -38,7 +38,8 @@ import java.io.IOException;
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class)
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class),
@JsonSubTypes.Type(name = "resetDataSourceMetadata", value = ResetDataSourceMetadataAction.class)
})
public interface TaskAction<RetType>
{


@ -27,6 +27,7 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.ISE;
import io.druid.timeline.DataSegment;
@ -38,17 +39,20 @@ public class TaskActionToolbox
private final TaskLockbox taskLockbox;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
private final SupervisorManager supervisorManager;
@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter
ServiceEmitter emitter,
SupervisorManager supervisorManager
)
{
this.taskLockbox = taskLockbox;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter;
this.supervisorManager = supervisorManager;
}
public TaskLockbox getTaskLockbox()
@ -66,6 +70,11 @@ public class TaskActionToolbox
return emitter;
}
public SupervisorManager getSupervisorManager()
{
return supervisorManager;
}
public void verifyTaskLocks(
final Task task,
final Set<DataSegment> segments


@ -25,12 +25,14 @@ import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.collect.JavaCompatUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.metadata.MetadataSupervisorManager;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -141,7 +143,7 @@ public class SupervisorManager
return supervisor == null ? Optional.<SupervisorReport>absent() : Optional.fromNullable(supervisor.lhs.getStatus());
}
public boolean resetSupervisor(String id)
public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");
@ -152,7 +154,7 @@ public class SupervisorManager
return false;
}
supervisor.lhs.reset();
supervisor.lhs.reset(dataSourceMetadata);
return true;
}


@ -217,7 +217,7 @@ public class SupervisorResource
@Override
public Response apply(SupervisorManager manager)
{
if (manager.resetSupervisor(id)) {
if (manager.resetSupervisor(id, null)) {
return Response.ok(ImmutableMap.of("id", id)).build();
} else {
return Response.status(Response.Status.NOT_FOUND)


@ -26,11 +26,13 @@ import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.TestDerbyConnector;
import io.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.rules.ExternalResource;
@ -91,7 +93,8 @@ public class TaskActionTestKit extends ExternalResource
taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,
new NoopServiceEmitter()
new NoopServiceEmitter(),
EasyMock.createMock(SupervisorManager.class)
);
testDerbyConnector.createDataSourceTable();
testDerbyConnector.createPendingSegmentsTable();


@ -60,6 +60,7 @@ import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.indexing.test.TestDataSegmentAnnouncer;
import io.druid.indexing.test.TestDataSegmentKiller;
import io.druid.indexing.test.TestDataSegmentPusher;
@ -960,7 +961,8 @@ public class RealtimeIndexTaskTest
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
mdc,
emitter
emitter,
EasyMock.createMock(SupervisorManager.class)
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,


@ -54,6 +54,7 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.query.aggregation.AggregatorFactory;
@ -201,7 +202,7 @@ public class IngestSegmentFirehoseFactoryTest
};
final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory(
ts,
new TaskActionToolbox(tl, mdc, newMockEmitter())
new TaskActionToolbox(tl, mdc, newMockEmitter(), EasyMock.createMock(SupervisorManager.class))
);
SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
EasyMock.replay(notifierFactory);


@ -66,6 +66,7 @@ import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Granularity;
@ -507,7 +508,8 @@ public class TaskLifecycleTest
Preconditions.checkNotNull(emitter);
taskLockbox = new TaskLockbox(taskStorage);
tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter));
tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock(
SupervisorManager.class)));
File tmpDir = temporaryFolder.newFolder();
taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);


@ -21,6 +21,7 @@ package io.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.metadata.MetadataSupervisorManager;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@ -258,12 +259,12 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.reset();
supervisor1.reset(EasyMock.anyObject(DataSourceMetadata.class));
replayAll();
manager.start();
Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1"));
Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home"));
Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1", null));
Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home", null));
verifyAll();
}


@ -24,7 +24,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.druid.indexing.overlord.DataSourceMetadata;
import io.druid.indexing.overlord.TaskMaster;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
@ -270,9 +272,11 @@ public class SupervisorResourceTest extends EasyMockSupport
@Test
public void testReset() throws Exception
{
Capture<String> id1 = Capture.newInstance();
Capture<String> id2 = Capture.newInstance();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.resetSupervisor("my-id")).andReturn(true);
EasyMock.expect(supervisorManager.resetSupervisor("my-id-2")).andReturn(false);
EasyMock.expect(supervisorManager.resetSupervisor(EasyMock.capture(id1), EasyMock.anyObject(DataSourceMetadata.class))).andReturn(true);
EasyMock.expect(supervisorManager.resetSupervisor(EasyMock.capture(id2), EasyMock.anyObject(DataSourceMetadata.class))).andReturn(false);
replayAll();
Response response = supervisorResource.reset("my-id");
@ -283,6 +287,8 @@ public class SupervisorResourceTest extends EasyMockSupport
response = supervisorResource.reset("my-id-2");
Assert.assertEquals(404, response.getStatus());
Assert.assertEquals("my-id", id1.getValue());
Assert.assertEquals("my-id-2", id2.getValue());
verifyAll();
resetAll();


@ -57,6 +57,14 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
throw new UnsupportedOperationException();
}
@Override
public boolean resetDataSourceMetadata(
String dataSource, DataSourceMetadata dataSourceMetadata
) throws IOException
{
return false;
}
@Override
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
{


@ -71,4 +71,14 @@ public interface DataSourceMetadata
* @return merged copy
*/
DataSourceMetadata plus(DataSourceMetadata other);
/**
* Returns a copy of this instance with "other" subtracted.
*
* Behavior is undefined if you pass in an instance of a different class from this one.
*
* @param other another instance
* @return subtracted copy
*/
DataSourceMetadata minus(DataSourceMetadata other);
}


@ -128,6 +128,15 @@ public interface IndexerMetadataStorageCoordinator
*/
boolean deleteDataSourceMetadata(String dataSource);
/**
* Resets dataSourceMetadata entry for 'dataSource' to the one supplied.
*
* @param dataSource identifier
* @param dataSourceMetadata value to set
* @return true if the entry was reset, false otherwise
*/
boolean resetDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException;
void updateSegmentMetadata(Set<DataSegment> segments) throws IOException;
void deleteSegments(Set<DataSegment> segments) throws IOException;


@ -60,6 +60,12 @@ public final class ObjectMetadata implements DataSourceMetadata
return other;
}
@Override
public DataSourceMetadata minus(DataSourceMetadata other)
{
return this;
}
@Override
public boolean equals(Object o)
{


@ -19,6 +19,8 @@
package io.druid.indexing.overlord.supervisor;
import io.druid.indexing.overlord.DataSourceMetadata;
/**
* Used as a tombstone marker in the supervisors metadata table to indicate that the supervisor has been removed.
*/
@ -48,7 +50,7 @@ public class NoopSupervisorSpec implements SupervisorSpec
}
@Override
public void reset() {}
public void reset(DataSourceMetadata dataSourceMetadata) {}
};
}
}


@ -19,6 +19,8 @@
package io.druid.indexing.overlord.supervisor;
import io.druid.indexing.overlord.DataSourceMetadata;
public interface Supervisor
{
void start();
@ -33,5 +35,5 @@ public interface Supervisor
SupervisorReport getStatus();
void reset();
void reset(DataSourceMetadata dataSourceMetadata);
}
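Note on the null/non-null contract of the new parameter, as implemented by SupervisorManager and KafkaSupervisor above (supervisor and partialKafkaMetadata below are placeholder names):
// null means "reset everything" (this is what the HTTP reset endpoint passes);
// a non-null KafkaDataSourceMetadata resets only the partitions it names
// (this is what ResetDataSourceMetadataAction passes from a task).
supervisor.reset(null);
supervisor.reset(partialKafkaMetadata);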


@ -818,6 +818,41 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
@Override
public boolean resetDataSourceMetadata(
final String dataSource, final DataSourceMetadata dataSourceMetadata
) throws IOException
{
final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(dataSourceMetadata);
final String newCommitMetadataSha1 = BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
);
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
final int numRows = handle.createStatement(
String.format(
"UPDATE %s SET "
+ "commit_metadata_payload = :new_commit_metadata_payload, "
+ "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+ "WHERE dataSource = :dataSource",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("new_commit_metadata_payload", newCommitMetadataBytes)
.bind("new_commit_metadata_sha1", newCommitMetadataSha1)
.execute();
return numRows == 1;
}
}
);
}
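Note: tying this back to the supervisor changes above; currentMetadata and resetKafkaMetadata stand for the values computed in KafkaSupervisor.resetInternal():
// Usage sketch: overwrite the stored metadata with the requested partitions subtracted.
final DataSourceMetadata newMetadata = currentMetadata.minus(resetKafkaMetadata);
if (!indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata)) {
    // the UPDATE matched a row count other than 1, e.g. there was no existing entry
    throw new ISE("Unable to reset metadata");
}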
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{
connector.getDBI().inTransaction(


@ -73,6 +73,7 @@ import io.druid.indexing.overlord.helpers.TaskLogAutoCleanerConfig;
import io.druid.indexing.overlord.http.OverlordRedirectInfo;
import io.druid.indexing.overlord.http.OverlordResource;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.overlord.supervisor.SupervisorManager;
import io.druid.indexing.overlord.supervisor.SupervisorResource;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.java.util.common.logger.Logger;
@ -146,6 +147,7 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(SupervisorManager.class).in(LazySingleton.class);
binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));