Reset offsets supervisor API (#14772)

* Add supervisor /resetOffsets API.

- Add a new endpoint /druid/indexer/v1/supervisor/<supervisorId>/resetOffsets
which accepts DataSourceMetadata as a body parameter.
- Update logs, unit tests and docs.

* Add a new interface method for backwards compatibility.

* Rename

* Adjust tests and javadocs.

* Use CoreInjectorBuilder instead of deprecated makeInjectorWithModules

* UT fix

* Doc updates.

* remove extraneous debugging logs.

* Remove the boolean setting; only ResetHandle() and resetInternal()

* Relax constraints and add a new ResetOffsetsNotice; cleanup old logic.

* A separate ResetOffsetsNotice and some cleanup.

* Minor cleanup

* Add a check & test to verify that sequence numbers are only of type SeekableStreamEndSequenceNumbers

* Add unit tests for the no op implementations for test coverage

* CodeQL fix

* checkstyle from merge conflict

* Doc changes

* DOCUSAURUS code tabs fix. Thanks, Brian!
Abhishek Radhakrishnan 2023-08-17 17:13:10 -04:00 committed by GitHub
parent 2cc3bd6383
commit 37db5d9b81
19 changed files with 1208 additions and 12 deletions

@ -3065,7 +3065,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
### Reset a supervisor
Resets the specified supervisor. This endpoint clears stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor will start from the earliest or latest available position, depending on the platform (offsets in Kafka or sequence numbers in Kinesis). It kills and recreates active tasks to read from valid positions.
Resets the specified supervisor. This endpoint clears _all_ stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor will start from the earliest or latest available position, depending on the platform (offsets in Kafka or sequence numbers in Kinesis). It kills and recreates active tasks to read from valid positions.
Use this endpoint to recover from a stopped state due to missing offsets in Kafka or sequence numbers in Kinesis. Use this endpoint with caution as it may result in skipped messages and lead to data loss or duplicate data.
@ -3130,6 +3130,104 @@ Host: http://ROUTER_IP:ROUTER_PORT
```
</details>
### Reset Offsets for a supervisor
Resets the specified offsets for a supervisor. This endpoint clears _only_ the specified offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading.
If there are no stored offsets, the specified offsets are written to the metadata store. The supervisor resumes from the reset offsets for the specified partitions and from the stored offsets for all other partitions.
It kills and recreates active tasks pertaining to the partitions specified to read from valid offsets.
Use this endpoint to selectively reset offsets for partitions without resetting the entire set.
#### URL
<code class="postAPI">POST</code> <code>/druid/indexer/v1/supervisor/:supervisorId/resetOffsets</code>
#### Responses
<!--DOCUSAURUS_CODE_TABS-->
<!--200 SUCCESS-->
*Successfully reset offsets*
<!--404 NOT FOUND-->
*Invalid supervisor ID*
<!--END_DOCUSAURUS_CODE_TABS-->
---
#### Reset Offsets Metadata
This section presents the structure and details of the reset offsets metadata payload.
| Field | Type | Description | Required |
|---------|---------|---------|---------|
| `type` | String | The type of reset offsets metadata payload. It must match the supervisor's `type`. Possible values: `kafka` or `kinesis`. | Yes |
| `partitions` | Object | An object representing the reset metadata. See below for details. | Yes |
#### Partitions
The following table defines the fields within the `partitions` object in the reset offsets metadata payload.
| Field | Type | Description | Required |
|---------|---------|---------|---------|
| `type` | String | Must be set as `end`. Indicates the end sequence numbers for the reset offsets. | Yes |
| `stream` | String | The stream to be reset. It must be a valid stream consumed by the supervisor. | Yes |
| `partitionOffsetMap` | Object | A map of partitions to corresponding offsets for the stream to be reset.| Yes |
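The constraints in the two tables above can be checked on the client before sending a request. The following is a minimal sketch, not Druid's implementation: the class `ResetOffsetsPayloadCheck`, the `validate` method, and the `supervisorType` and `supervisorStream` parameters are hypothetical names used only for illustration. On the server side, the corresponding checks run in `SeekableStreamSupervisor.resetOffsets` against the deserialized `DataSourceMetadata`.

```java
import java.util.Map;

// Hypothetical client-side helper; not part of Druid.
public class ResetOffsetsPayloadCheck
{
  /**
   * Returns null when the payload satisfies the documented constraints,
   * otherwise a short description of the first violation found.
   */
  public static String validate(Map<String, Object> payload, String supervisorType, String supervisorStream)
  {
    // `type` must match the supervisor's type (`kafka` or `kinesis`).
    if (!supervisorType.equals(payload.get("type"))) {
      return "type must match the supervisor type";
    }
    Object partitions = payload.get("partitions");
    if (!(partitions instanceof Map)) {
      return "partitions must be an object";
    }
    Map<?, ?> p = (Map<?, ?>) partitions;
    // Only end sequence numbers are accepted.
    if (!"end".equals(p.get("type"))) {
      return "partitions.type must be end";
    }
    // The stream must be the one consumed by the supervisor.
    if (!supervisorStream.equals(p.get("stream"))) {
      return "partitions.stream must be the stream consumed by the supervisor";
    }
    if (!(p.get("partitionOffsetMap") instanceof Map)) {
      return "partitions.partitionOffsetMap must be an object";
    }
    return null;
  }

  public static void main(String[] args)
  {
    Map<String, Object> payload = Map.of(
        "type", "kafka",
        "partitions", Map.of(
            "type", "end",
            "stream", "ads_media_stream",
            "partitionOffsetMap", Map.of("0", 100, "2", 650)
        )
    );
    System.out.println(validate(payload, "kafka", "ads_media_stream"));
  }
}
```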
#### Sample request
The following example shows how to reset offsets for a Kafka supervisor named `social_media`. Suppose the supervisor is reading
from a Kafka topic `ads_media_stream` and has the stored offsets: `{"0": 0, "1": 10, "2": 20, "3": 40}`.
<!--DOCUSAURUS_CODE_TABS-->
<!--cURL-->
```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
--header 'Content-Type: application/json' \
--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_stream","partitionOffsetMap":{"0":100, "2": 650}}}'
```
<!--HTTP-->
```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
Content-Type: application/json
{
"type": "kafka",
"partitions": {
"type": "end",
"stream": "ads_media_stream",
"partitionOffsetMap": {
"0": 100,
"2": 650
}
}
}
```
The above operation will reset offsets only for partitions 0 and 2 to 100 and 650 respectively. After a successful reset,
when the supervisor's tasks restart, they will resume reading from `{"0": 100, "1": 10, "2": 650, "3": 40}`.
<!--END_DOCUSAURUS_CODE_TABS-->
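The outcome described in the example above is a per-partition overwrite: offsets supplied in the request replace the stored ones, and all other stored offsets are kept. The following sketch reproduces that arithmetic with plain maps. It only approximates the effect; the class and method names are hypothetical, and in Druid the merge is performed on the metadata objects themselves (`currentMetadata.plus(resetMetadata)` in `resetOffsetsInternal`).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the merge semantics; not Druid code.
public class ResetOffsetsMergeSketch
{
  /** Returns the stored offsets with every partition present in {@code reset} overwritten. */
  public static Map<String, Long> merge(Map<String, Long> stored, Map<String, Long> reset)
  {
    // TreeMap keeps partitions sorted so the printed result is stable.
    Map<String, Long> merged = new TreeMap<>(stored);
    merged.putAll(reset);
    return merged;
  }

  public static void main(String[] args)
  {
    Map<String, Long> stored = Map.of("0", 0L, "1", 10L, "2", 20L, "3", 40L);
    Map<String, Long> reset = Map.of("0", 100L, "2", 650L);
    System.out.println(merge(stored, reset)); // {0=100, 1=10, 2=650, 3=40}
  }
}
```

When nothing is stored yet, `stored` is empty and the result is exactly the reset map, which matches the documented behavior of inserting the specified offsets into the metadata store.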
#### Sample response
<details>
<summary>Click to show sample response</summary>
```json
{
"id": "social_media"
}
```
</details>
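For callers that prefer Java to cURL, the same request can be built with the JDK's `java.net.http` client (Java 11 or later). This is a sketch under stated assumptions: the class `ResetOffsetsRequestSketch` and its `buildRequest` method are hypothetical, and `http://localhost:8888` is a placeholder for `http://ROUTER_IP:ROUTER_PORT`. The request is sent only when a Router URL is passed as the first program argument.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client sketch; not part of Druid.
public class ResetOffsetsRequestSketch
{
  /** Builds a POST to /druid/indexer/v1/supervisor/{supervisorId}/resetOffsets with a JSON body. */
  public static HttpRequest buildRequest(String routerBaseUrl, String supervisorId, String jsonBody)
  {
    return HttpRequest.newBuilder()
        .uri(URI.create(routerBaseUrl + "/druid/indexer/v1/supervisor/" + supervisorId + "/resetOffsets"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }

  public static void main(String[] args) throws Exception
  {
    String body = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"ads_media_stream\","
                  + "\"partitionOffsetMap\":{\"0\":100,\"2\":650}}}";
    // Assumption: placeholder Router address, used unless one is given as the first argument.
    String routerBaseUrl = args.length > 0 ? args[0] : "http://localhost:8888";
    HttpRequest request = buildRequest(routerBaseUrl, "social_media", body);
    System.out.println(request.method() + " " + request.uri());

    // Send only when a Router URL was supplied explicitly.
    if (args.length > 0) {
      HttpResponse<String> response =
          HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
      System.out.println(response.statusCode() + " " + response.body());
    }
  }
}
```

A `200` status with a body such as `{"id": "social_media"}` indicates that the reset was accepted; a `404` indicates an invalid supervisor ID.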
### Terminate a supervisor
Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When terminated, a tombstone marker is placed in the database to prevent reloading on restart.

@ -131,6 +131,70 @@ to start and in flight tasks will fail. This operation enables you to recover fr
Note that the supervisor must be running for this endpoint to be available.
## Resetting Offsets for a Supervisor
The supervisor must be running for this endpoint to be available.
The `POST /druid/indexer/v1/supervisor/<supervisorId>/resetOffsets` operation clears stored
offsets, causing the supervisor to start reading from the specified offsets. After resetting stored
offsets, the supervisor kills and recreates any active tasks pertaining to the specified partitions,
so that tasks begin reading from specified offsets. For partitions that are not specified in this operation, the supervisor
will resume from the last stored offset.
Use care when using this operation! Resetting offsets for a supervisor may cause Kafka messages to be skipped or read
twice, resulting in missing or duplicate data.
#### Sample request
The following example shows how to reset offsets for a Kafka supervisor named `social_media`. Suppose the supervisor is reading
from two Kafka topics `ads_media_foo` and `ads_media_bar` and has the stored offsets: `{"ads_media_foo:0": 0, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 40}`.
<!--DOCUSAURUS_CODE_TABS-->
<!--cURL-->
```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
--header 'Content-Type: application/json' \
--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_foo|ads_media_bar","partitionOffsetMap":{"ads_media_foo:0": 3, "ads_media_bar:1": 12}}}'
```
<!--HTTP-->
```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
Content-Type: application/json
{
"type": "kafka",
"partitions": {
"type": "end",
"stream": "ads_media_foo|ads_media_bar",
"partitionOffsetMap": {
"ads_media_foo:0": 3,
"ads_media_bar:1": 12
}
}
}
```
The above operation will reset offsets for `ads_media_foo` partition 0 and `ads_media_bar` partition 1 to offsets 3 and 12 respectively. After a successful reset,
when the supervisor's tasks restart, they will resume reading from `{"ads_media_foo:0": 3, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 12}`.
<!--END_DOCUSAURUS_CODE_TABS-->
#### Sample response
<details>
<summary>Click to show sample response</summary>
```json
{
"id": "social_media"
}
```
</details>
## Terminating Supervisors
The `POST /druid/indexer/v1/supervisor/<supervisorId>/terminate` operation terminates a supervisor and causes all

@ -45,7 +45,7 @@ See [Supervisor API](../../api-reference/supervisor-api.md) for more information
|--------|----|-----------|--------|
|`type`|String|The supervisor type; this should always be `kinesis`.|Yes|
|`spec`|Object|The container object for the supervisor configuration.|Yes|
|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration) object for configuring Kafka connection and I/O-related settings for the supervisor and indexing task.|Yes|
|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration) object for configuring Kinesis connection and I/O-related settings for the supervisor and indexing task.|Yes|
|`dataSchema`|Object|The schema used by the Kinesis indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema) for more information.|Yes|
|`tuningConfig`|Object|The [tuning configuration](#supervisor-tuning-configuration) object for configuring performance-related settings for the supervisor and indexing tasks.|No|
@ -593,6 +593,16 @@ for the generated segments to be accepted. If the messages at the expected start
no longer available in Kinesis (typically because the message retention period has elapsed or the topic was
removed and re-created) the supervisor will refuse to start and in-flight tasks will fail. This endpoint enables you to recover from this condition.
### Resetting Offsets for a supervisor
To reset partition offsets for a supervisor, send a `POST` request to the `/druid/indexer/v1/supervisor/:supervisorId/resetOffsets` endpoint. This endpoint clears stored
sequence numbers, prompting the supervisor to start reading from the specified offsets.
After resetting stored offsets, the supervisor kills and recreates any active tasks pertaining to the specified partitions,
so that tasks begin reading from the specified offsets. For partitions that are not specified in this operation, the supervisor will resume from the last
stored offset.
Use this endpoint with caution as it may result in skipped messages, leading to data loss or duplicate data.
### Terminate a supervisor
To terminate a supervisor and its associated indexing tasks, send a `POST` request to the `/druid/indexer/v1/supervisor/:supervisorId/terminate` endpoint.

@ -277,6 +277,12 @@ public class MaterializedViewSupervisor implements Supervisor
}
}
@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
throw new UnsupportedOperationException("Reset offsets not supported in MaterializedViewSupervisor");
}
@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{

@ -387,4 +387,37 @@ public class MaterializedViewSupervisorTest
EasyMock.replay(mock);
supervisor.run();
}
@Test
public void testResetOffsetsNotSupported()
{
MaterializedViewSupervisorSpec suspended = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim"))),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
true,
objectMapper,
taskMaster,
taskStorage,
metadataSupervisorManager,
sqlSegmentsMetadataManager,
indexerMetadataStorageCoordinator,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
EasyMock.createMock(ChatHandlerProvider.class),
new SupervisorStateManagerConfig()
);
MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor();
Assert.assertThrows(
"Reset offsets not supported in MaterializedViewSupervisor",
UnsupportedOperationException.class,
() -> supervisor.resetOffsets(null)
);
}
}

@ -19,11 +19,19 @@
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.name.Names;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
@ -163,6 +171,37 @@ public class KafkaDataSourceMetadataTest
);
}
@Test
public void testKafkaDataSourceMetadataSerdeRoundTrip() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();
KafkaDataSourceMetadata kdm1 = endMetadata(ImmutableMap.of());
String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(kdm1, dsMeta1);
KafkaDataSourceMetadata kdm2 = endMetadata(ImmutableMap.of(1, 3L));
String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(kdm2, dsMeta2);
}
@Test
public void testKafkaDataSourceMetadataSerde() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();
KafkaDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of(1, 3L));
String kdmStr1 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":3},\"partitionOffsetMap\":{\"1\":3},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(dsMeta1, expectedKdm1);
KafkaDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of(1, 3L, 2, 1900L));
String kdmStr2 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":3, \"2\":1900},\"partitionOffsetMap\":{\"1\":3, \"2\":1900},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(dsMeta2, expectedKdm2);
}
private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> offsets)
{
Map<KafkaTopicPartition, Long> newOffsets = CollectionUtils.mapKeys(
@ -188,4 +227,21 @@ public class KafkaDataSourceMetadataTest
);
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", newOffsets));
}
private static ObjectMapper createObjectMapper()
{
DruidModule module = new KafkaIndexTaskModule();
final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
.addModule(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
}
).build();
ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
module.getJacksonModules().forEach(objectMapper::registerModule);
return objectMapper;
}
}

@ -20,10 +20,18 @@
package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.name.Names;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.junit.Assert;
import org.junit.Test;
@ -217,6 +225,37 @@ public class KinesisDataSourceMetadataTest
);
}
@Test
public void testKinesisDataSourceMetadataSerdeRoundTrip() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();
KinesisDataSourceMetadata kdm1 = startMetadata(ImmutableMap.of(), ImmutableSet.of());
String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(kdm1, dsMeta1);
KinesisDataSourceMetadata kdm2 = startMetadata(ImmutableMap.of("1", "3"), ImmutableSet.of());
String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(kdm2, dsMeta2);
}
@Test
public void testKinesisDataSourceMetadataSerde() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();
KinesisDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of("1", "5"));
String kdmStr1 = "{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":5},\"partitionOffsetMap\":{\"1\":5},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(dsMeta1, expectedKdm1);
KinesisDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of("1", "10", "2", "19"));
String kdmStr2 = "{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":10, \"2\":19},\"partitionOffsetMap\":{\"1\":10, \"2\":19},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(dsMeta2, expectedKdm2);
}
private static KinesisDataSourceMetadata simpleStartMetadata(Map<String, String> sequences)
{
return startMetadata(sequences, sequences.keySet());
@ -233,4 +272,20 @@ public class KinesisDataSourceMetadataTest
{
return new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", sequences));
}
private static ObjectMapper createObjectMapper()
{
DruidModule module = new KinesisIndexingServiceModule();
final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
.addModule(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
}
).build();
ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
module.getJacksonModules().forEach(objectMapper::registerModule);
return objectMapper;
}
}

@ -2636,7 +2636,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.resetInternal(null);
verifyAll();
}
@Test

@ -201,7 +201,7 @@ public class SupervisorManager
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.isHealthy());
}
public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata)
public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetDataSourceMetadata)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");
@ -212,7 +212,11 @@ public class SupervisorManager
return false;
}
supervisor.lhs.reset(dataSourceMetadata);
if (resetDataSourceMetadata == null) {
supervisor.lhs.reset(null);
} else {
supervisor.lhs.resetOffsets(resetDataSourceMetadata);
}
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.reset();

@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter;
import org.apache.druid.java.util.common.StringUtils;
@ -45,6 +46,7 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
@ -493,10 +495,31 @@ public class SupervisorResource
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response reset(@PathParam("id") final String id)
{
return handleResetRequest(id, null);
}
@POST
@Path("/{id}/resetOffsets")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response resetOffsets(
@PathParam("id") final String id,
final DataSourceMetadata resetDataSourceMetadata
)
{
return handleResetRequest(id, resetDataSourceMetadata);
}
private Response handleResetRequest(
final String id,
@Nullable final DataSourceMetadata resetDataSourceMetadata
)
{
return asLeaderWithSupervisorManager(
manager -> {
if (manager.resetSupervisor(id, null)) {
if (manager.resetSupervisor(id, resetDataSourceMetadata)) {
return Response.ok(ImmutableMap.of("id", id)).build();
} else {
return Response.status(Response.Status.NOT_FOUND)

@ -42,6 +42,8 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@ -61,6 +63,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl;
@ -95,6 +98,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
@ -608,6 +612,31 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
private class ResetOffsetsNotice implements Notice
{
final DataSourceMetadata dataSourceMetadata;
private static final String TYPE = "reset_offsets_notice";
ResetOffsetsNotice(
final DataSourceMetadata dataSourceMetadata
)
{
this.dataSourceMetadata = dataSourceMetadata;
}
@Override
public void handle()
{
resetOffsetsInternal(dataSourceMetadata);
}
@Override
public String getType()
{
return TYPE;
}
}
protected class CheckpointNotice implements Notice
{
private final int taskGroupId;
@ -998,12 +1027,59 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@Override
public void reset(DataSourceMetadata dataSourceMetadata)
public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
{
log.info("Posting ResetNotice");
log.info("Posting ResetNotice with datasource metadata [%s]", dataSourceMetadata);
addNotice(new ResetNotice(dataSourceMetadata));
}
/**
* Reset offsets with provided dataSource metadata. Validates {@code resetDataSourceMetadata},
* creates a {@code ResetOffsetsNotice} with the metadata and adds it to the notice queue. The resulting stored offsets
* are a union of existing checkpointed offsets and provided offsets.
* @param resetDataSourceMetadata required datasource metadata with offsets to reset.
* @throws DruidException if any metadata attribute doesn't match the supervisor's.
*/
@Override
public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
{
if (resetDataSourceMetadata == null) {
throw InvalidInput.exception("Reset dataSourceMetadata is required for resetOffsets.");
}
if (!checkSourceMetadataMatch(resetDataSourceMetadata)) {
throw InvalidInput.exception(
"Datasource metadata instance does not match required, found instance of [%s].",
resetDataSourceMetadata.getClass()
);
}
@SuppressWarnings("unchecked")
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) resetDataSourceMetadata;
final SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> streamSequenceNumbers = resetMetadata.getSeekableStreamSequenceNumbers();
if (!(streamSequenceNumbers instanceof SeekableStreamEndSequenceNumbers)) {
throw InvalidInput.exception(
"Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[%s], but found[%s].",
resetMetadata,
SeekableStreamEndSequenceNumbers.class.getSimpleName(),
streamSequenceNumbers.getClass().getSimpleName()
);
}
final String resetStream = streamSequenceNumbers.getStream();
if (!ioConfig.getStream().equals(resetStream)) {
throw InvalidInput.exception(
"Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is consuming stream[%s].",
resetStream,
supervisorId,
ioConfig.getStream()
);
}
log.info("Posting ResetOffsetsNotice with reset dataSource metadata[%s]", resetDataSourceMetadata);
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
public ReentrantLock getRecordSupplierLock()
{
return recordSupplierLock;
@ -1693,6 +1769,70 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
/**
* Reset offsets with the data source metadata. If checkpoints exist, the resulting stored offsets will be a union of
* existing checkpointed offsets and provided offsets; any checkpointed offsets not specified in the metadata will be
* preserved as-is. If checkpoints don't exist, the provided reset datasource metadata will be inserted into
* the metadata storage. Once the offsets are reset, any active tasks serving the partition offsets will be restarted.
* @param dataSourceMetadata Required reset data source metadata. The metadata is assumed to be already validated.
*/
public void resetOffsetsInternal(@Nonnull final DataSourceMetadata dataSourceMetadata)
{
log.info("Reset offsets for dataSource[%s] with metadata[%s]", dataSource, dataSourceMetadata);
@SuppressWarnings("unchecked")
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) dataSourceMetadata;
final boolean metadataUpdateSuccess;
final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
if (metadata == null) {
log.info("Checkpointed metadata is null for dataSource[%s] - inserting metadata[%s]", dataSource, resetMetadata);
metadataUpdateSuccess = indexerMetadataStorageCoordinator.insertDataSourceMetadata(dataSource, resetMetadata);
} else {
if (!checkSourceMetadataMatch(metadata)) {
throw InvalidInput.exception(
"Datasource metadata instance does not match required, found instance of [%s]",
metadata.getClass()
);
}
@SuppressWarnings("unchecked")
final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) metadata;
final DataSourceMetadata newMetadata = currentMetadata.plus(resetMetadata);
log.info("Current checkpointed metadata[%s], new metadata[%s] for dataSource[%s]", currentMetadata, newMetadata, dataSource);
try {
metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata);
}
catch (IOException e) {
log.error("Reset offsets for dataSource[%s] with metadata[%s] failed [%s]", dataSource, newMetadata, e.getMessage());
throw new RuntimeException(e);
}
}
if (!metadataUpdateSuccess) {
throw new ISE("Unable to reset metadata[%s] for datasource[%s]", dataSourceMetadata, dataSource);
}
resetMetadata.getSeekableStreamSequenceNumbers()
.getPartitionSequenceNumberMap()
.keySet()
.forEach(partition -> {
final int groupId = getTaskGroupIdForPartition(partition);
killTaskGroupForPartitions(
ImmutableSet.of(partition),
"DataSourceMetadata is updated while reset offsets is called"
);
activelyReadingTaskGroups.remove(groupId);
// killTaskGroupForPartitions() cleans up partitionGroups.
// Add the removed groups back.
partitionGroups.computeIfAbsent(groupId, k -> new HashSet<>());
partitionOffsets.put(partition, getNotSetMarker());
});
}
private void killTask(final String id, String reasonFormat, Object... args)
{
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();

@ -22,7 +22,10 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.easymock.Capture;
@ -327,6 +330,33 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
}
@Test
public void testResetSupervisorWithSpecificOffsets()
{
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
"id1", new TestSupervisorSpec("id1", supervisor1)
);
DataSourceMetadata datasourceMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of("0", "10", "1", "20", "2", "30"),
ImmutableSet.of()
)
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.resetOffsets(datasourceMetadata);
replayAll();
manager.start();
Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1", datasourceMetadata));
Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home", datasourceMetadata));
verifyAll();
}
@Test
public void testCreateSuspendResumeAndStopSupervisor()
{

@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
@ -1198,6 +1200,53 @@ public class SupervisorResourceTest extends EasyMockSupport
verifyAll();
}
@Test
public void testResetOffsets()
{
Capture<String> id1 = Capture.newInstance();
Capture<String> id2 = Capture.newInstance();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.resetSupervisor(
EasyMock.capture(id1),
EasyMock.anyObject(DataSourceMetadata.class)
)).andReturn(true);
EasyMock.expect(supervisorManager.resetSupervisor(
EasyMock.capture(id2),
EasyMock.anyObject(DataSourceMetadata.class)
)).andReturn(false);
replayAll();
DataSourceMetadata datasourceMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(
"topic",
ImmutableMap.of("0", "10", "1", "20", "2", "30"),
ImmutableSet.of()
)
);
Response response = supervisorResource.resetOffsets("my-id", datasourceMetadata);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
response = supervisorResource.resetOffsets("my-id-2", datasourceMetadata);
Assert.assertEquals(404, response.getStatus());
Assert.assertEquals("my-id", id1.getValue());
Assert.assertEquals("my-id-2", id2.getValue());
verifyAll();
resetAll();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();
response = supervisorResource.resetOffsets("my-id", datasourceMetadata);
Assert.assertEquals(503, response.getStatus());
verifyAll();
}
@Test
public void testNoopSupervisorSpecSerde() throws Exception
{

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
public class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
{
@JsonCreator
public TestSeekableStreamDataSourceMetadata(
@JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers)
{
super(seekableStreamSequenceNumbers);
}
@Override
protected SeekableStreamDataSourceMetadata<String, String> createConcreteDataSourceMetaData(
SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers
)
{
return new TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
}
@Override
public DataSourceMetadata asStartMetadata()
{
return null;
}
}

View File

@ -32,6 +32,8 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
@ -56,6 +58,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
@ -81,6 +84,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -90,6 +94,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
@ -171,8 +176,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
EasyMock.expectLastCall().times(0, 1);
EasyMock
.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("10").anyTimes();
}
@ -831,7 +835,6 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null
);
supervisor.start();
Assert.assertTrue(supervisor.stateManager.isHealthy());
@ -1039,6 +1042,523 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
);
}
@Test
public void testSupervisorResetAllWithCheckpoints() throws InterruptedException
{
EasyMock.expect(spec.isSuspended()).andReturn(false);
EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(
true
);
taskQueue.shutdown("task1", "DataSourceMetadata is not found while reset");
EasyMock.expectLastCall();
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
Assert.assertEquals(0, supervisor.getNoticesQueueSize());
Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
supervisor.reset(null);
validateSupervisorStateAfterResetOffsets(supervisor, ImmutableMap.of(), 0);
}
@Test
public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws InterruptedException, IOException
{
final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "0", "1", "10", "2", "20", "3", "30");
final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "1000", "2", "2500");
final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "1000", "1", "10", "2", "2500", "3", "30");
EasyMock.expect(spec.isSuspended()).andReturn(false);
EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
checkpointOffsets
)
)
);
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
expectedOffsets
))
)).andReturn(
true
);
taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
EasyMock.expectLastCall();
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
checkpointOffsets,
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
resetOffsets
)
);
Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
Assert.assertEquals(0, supervisor.getNoticesQueueSize());
Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
supervisor.resetOffsets(resetMetadata);
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
}
@Test
public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException
{
final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "100");
final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "10", "1", "8");
final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "10", "1", "8", "2", "100");
EasyMock.expect(spec.isSuspended()).andReturn(false);
EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
checkpointOffsets
)
)
);
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
"stream",
expectedOffsets
)
))).andReturn(true);
taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
EasyMock.expectLastCall();
taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called");
EasyMock.expectLastCall();
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
// Spin off three active tasks with each task serving one partition.
supervisor.getIoConfig().setTaskCount(3);
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task3"),
ImmutableSet.of()
);
final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
resetOffsets
)
);
Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
Assert.assertEquals(0, supervisor.getNoticesQueueSize());
Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
supervisor.resetOffsets(resetMetadata);
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
}
@Test
public void testSupervisorResetOffsetsWithNoCheckpoints() throws InterruptedException
{
final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "10", "1", "8");
final ImmutableMap<String, String> expectedOffsets = ImmutableMap.copyOf(resetOffsets);
EasyMock.expect(spec.isSuspended()).andReturn(false);
EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null);
EasyMock.expect(indexerMetadataStorageCoordinator.insertDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
"stream",
expectedOffsets
)
))).andReturn(true);
taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
EasyMock.expectLastCall();
taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called");
EasyMock.expectLastCall();
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
// Spin off three active tasks with each task serving one partition.
supervisor.getIoConfig().setTaskCount(3);
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task3"),
ImmutableSet.of()
);
final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
resetOffsets
)
);
Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
Assert.assertEquals(0, supervisor.getNoticesQueueSize());
Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
supervisor.resetOffsets(resetMetadata);
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
}
@Test
public void testSupervisorResetWithNoPartitions() throws IOException, InterruptedException
{
final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6");
final ImmutableMap<String, String> resetOffsets = ImmutableMap.of();
final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "5", "1", "6");
EasyMock.expect(spec.isSuspended()).andReturn(false);
EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
checkpointOffsets
)
)
);
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
"stream",
expectedOffsets
)
))).andReturn(true);
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
// Spin off two active tasks with each task serving one partition.
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
resetOffsets
)
);
Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
Assert.assertEquals(0, supervisor.getNoticesQueueSize());
Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
supervisor.resetOffsets(resetMetadata);
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 2);
}
@Test
public void testSupervisorResetWithNewPartition() throws IOException, InterruptedException
{
final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6");
final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("2", "20");
final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "20");
EasyMock.expect(spec.isSuspended()).andReturn(false);
EasyMock.reset(indexerMetadataStorageCoordinator);
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
checkpointOffsets
)
)
);
EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
"stream",
expectedOffsets
)
))).andReturn(true);
taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
EasyMock.expectLastCall();
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
// Spin off two active tasks with each task serving one partition.
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
STREAM,
resetOffsets
)
);
Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
Assert.assertEquals(0, supervisor.getNoticesQueueSize());
Assert.assertEquals(0, supervisor.getPartitionOffsets().size());
supervisor.resetOffsets(resetMetadata);
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
}
@Test
public void testSupervisorNoResetDataSourceMetadata()
{
EasyMock.expect(spec.isSuspended()).andReturn(false);
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "0"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
verifyAll();
MatcherAssert.assertThat(
Assert.assertThrows(DruidException.class, () ->
supervisor.resetOffsets(null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Reset dataSourceMetadata is required for resetOffsets."
)
);
}
@Test
public void testSupervisorResetWithInvalidStartSequenceMetadata()
{
EasyMock.expect(spec.isSuspended()).andReturn(false);
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "0"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
verifyAll();
final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(
"i-am-not-real",
ImmutableMap.of("0", "10", "1", "20", "2", "30"),
ImmutableSet.of()
)
);
MatcherAssert.assertThat(
Assert.assertThrows(DruidException.class, () ->
supervisor.resetOffsets(dataSourceMetadata)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
StringUtils.format(
"Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[SeekableStreamEndSequenceNumbers], but found[SeekableStreamStartSequenceNumbers].",
dataSourceMetadata
)
)
);
}
@Test
public void testSupervisorResetInvalidStream()
{
EasyMock.expect(spec.isSuspended()).andReturn(false);
replayAll();
final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);
supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "0"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);
verifyAll();
final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(
"i-am-not-real",
ImmutableMap.of("0", "10", "1", "20", "2", "30")
)
);
MatcherAssert.assertThat(
Assert.assertThrows(DruidException.class, () ->
supervisor.resetOffsets(dataSourceMetadata)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Stream[i-am-not-real] doesn't exist in the supervisor[testSupervisorId]. Supervisor is consuming stream[stream]."
)
);
}
@Test
public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
{
@ -1063,6 +1583,25 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
Assert.assertEquals(0, emitter.getEvents().size());
}
private void validateSupervisorStateAfterResetOffsets(
final TestSeekableStreamSupervisor supervisor,
final ImmutableMap<String, String> expectedResetOffsets,
final int expectedActiveTaskCount
) throws InterruptedException
{
// Wait for the notice queue to be drained asynchronously before we validate the supervisor's final state.
while (supervisor.getNoticesQueueSize() > 0) {
Thread.sleep(100);
}
Thread.sleep(1000);
Assert.assertEquals(expectedActiveTaskCount, supervisor.getActiveTaskGroupsCount());
Assert.assertEquals(expectedResetOffsets.size(), supervisor.getPartitionOffsets().size());
for (Map.Entry<String, String> entry : expectedResetOffsets.entrySet()) {
Assert.assertEquals(supervisor.getNotSetMarker(), supervisor.getPartitionOffsets().get(entry.getKey()));
}
verifyAll();
}
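The helper above polls the notices queue with no upper bound, so a notice that never drains would hang the test. A minimal sketch of a bounded variant follows, assuming a hypothetical `waitUntilDrained` helper and an `IntSupplier` standing in for `supervisor.getNoticesQueueSize()`; neither is part of this commit.

```java
import java.util.function.IntSupplier;

public class DrainWaitSketch
{
  // Hypothetical helper, not part of Druid: polls queueSize until it reports 0
  // or the timeout elapses. Returns true only if the queue was observed empty.
  static boolean waitUntilDrained(IntSupplier queueSize, long timeoutMillis, long pollMillis)
  {
    final long start = System.nanoTime();
    final long timeoutNanos = timeoutMillis * 1_000_000L;
    while (queueSize.getAsInt() > 0) {
      if (System.nanoTime() - start >= timeoutNanos) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      }
      catch (InterruptedException e) {
        // Preserve the interrupt for the caller and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    // Simulated queue that reports 3, 2, 1 and then 0 on successive polls.
    final int[] remaining = {3};
    boolean drained = waitUntilDrained(() -> remaining[0] > 0 ? remaining[0]-- : 0, 1000, 1);
    System.out.println("drained = " + drained);
  }
}
```

In a test, the boolean result could be asserted so that a stuck queue fails fast instead of blocking the build.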
@Test
public void testScheduleReporting()
{
@ -1419,7 +1958,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
@Override
protected int getTaskGroupIdForPartition(String partition)
{
return 0;
try {
return Integer.parseInt(partition) % spec.getIoConfig().getTaskCount();
}
catch (NumberFormatException e) {
return 0;
}
}
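The override above spreads partitions across task groups by taking the numeric partition id modulo the configured task count, which is why the reset tests call `setTaskCount(3)` to place partitions "0", "1" and "2" in separate groups. A standalone sketch of that mapping, with the task count passed in as a plain parameter instead of being read from the spec (the class and method names here are illustrative only):

```java
public class TaskGroupMappingSketch
{
  // Mirrors the test override: numeric partition ids are mapped by modulo,
  // non-numeric ids fall back to group 0. Assumes taskCount is positive.
  static int taskGroupIdForPartition(String partition, int taskCount)
  {
    try {
      return Integer.parseInt(partition) % taskCount;
    }
    catch (NumberFormatException e) {
      return 0;
    }
  }

  public static void main(String[] args)
  {
    for (String partition : new String[]{"0", "1", "2", "3", "shard-a"}) {
      System.out.println(partition + " -> group " + taskGroupIdForPartition(partition, 3));
    }
  }
}
```

With a task count of 1, every partition maps to group 0, which matches the behavior of the previous `return 0;` implementation.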
@Override

View File

@ -163,6 +163,11 @@ public class NoopSupervisorSpec implements SupervisorSpec
{
}
@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
}
@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@ -61,8 +62,20 @@ public interface Supervisor
return null; // default implementation for interface compatibility; returning null since true or false is misleading
}
/**
* Resets all offsets for a dataSource.
* @param dataSourceMetadata optional dataSource metadata.
*/
void reset(DataSourceMetadata dataSourceMetadata);
/**
* Resets offsets using the provided dataSource metadata. The resulting stored offsets should be the union of the
* existing checkpointed offsets and the provided offsets; where a partition appears in both, the provided offset wins.
* @param resetDataSourceMetadata required dataSource metadata with the offsets to reset.
* @throws DruidException if any metadata attribute doesn't match the supervisor's state.
*/
void resetOffsets(DataSourceMetadata resetDataSourceMetadata);
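The union semantics described in the javadoc above can be illustrated with plain maps. This is only a sketch of the expected result, using the offsets from `testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints`; `mergeOffsets` is a hypothetical helper and not the supervisor's actual implementation.

```java
import java.util.Map;
import java.util.TreeMap;

public class ResetOffsetsMergeSketch
{
  // Hypothetical helper, not part of Druid: starts from the checkpointed offsets
  // (if any) and lets every provided reset offset override its partition.
  static Map<String, String> mergeOffsets(Map<String, String> checkpointed, Map<String, String> reset)
  {
    final Map<String, String> merged = new TreeMap<>();
    if (checkpointed != null) {
      merged.putAll(checkpointed);
    }
    merged.putAll(reset);
    return merged;
  }

  public static void main(String[] args)
  {
    final Map<String, String> checkpointed = Map.of("0", "0", "1", "10", "2", "20", "3", "30");
    final Map<String, String> reset = Map.of("0", "1000", "2", "2500");
    // prints {0=1000, 1=10, 2=2500, 3=30}
    System.out.println(mergeOffsets(checkpointed, reset));
  }
}
```

Partitions absent from the reset request keep their checkpointed offsets, and when nothing has been checkpointed yet the stored offsets are simply the provided ones, as in `testSupervisorResetOffsetsWithNoCheckpoints`.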
/**
* The definition of checkpoint is not very strict as currently it does not affect data or control path.
* On this call Supervisor can potentially checkpoint data processed so far to some durable storage

View File

@ -19,8 +19,10 @@
package org.apache.druid.indexing;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.junit.Assert;
@ -71,4 +73,20 @@ public class NoopSupervisorSpecTest
NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
Assert.assertTrue(noopSupervisorSpec.getInputSourceResources().isEmpty());
}
@Test
public void testNoopSupervisorResetOffsetsDoNothing()
{
NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null);
Supervisor noOpSupervisor = expectedSpec.createSupervisor();
Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
noOpSupervisor.resetOffsets(null);
Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, noOpSupervisor.getState());
Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
noOpSupervisor.resetOffsets(new ObjectMetadata("someObject"));
Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount());
Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, noOpSupervisor.getState());
}
}

View File

@ -1098,6 +1098,7 @@ numProcessors
q.size
repartitionTransitionDuration
replicastaskCounttaskCount
resetOffsets
resetuseEarliestSequenceNumberPOST
resumePOST
statusrecentErrorsdruid.supervisor.maxStoredExceptionEventsstatedetailedStatestatedetailedStatestatestatePENDINGRUNNINGSUSPENDEDSTOPPINGUNHEALTHY_SUPERVISORUNHEALTHY_TASKSdetailedStatestatedruid.supervisor.unhealthinessThresholddruid.supervisor.taskUnhealthinessThresholdtaskDurationtaskCountreplicasdetailedStatedetailedStateRUNNINGPOST