Add StreamSupervisor interface (#17151)

Follow up to #17137.

Instead of moving the streaming-only methods to the SeekableStreamSupervisor abstract class, this patch moves them to a separate StreamSupervisor interface. The reason is that the SeekableStreamSupervisor abstract class also has many other abstract methods. The StreamSupervisor interface on the other hand provides a minimal set of functions offering a good middle ground for any custom concrete implementation that doesn't require all the goodies from SeekableStreamSupervisor.
This commit is contained in:
Abhishek Radhakrishnan 2024-09-25 02:22:39 -07:00 committed by GitHub
parent 446ffc325c
commit 9132a65a48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 190 additions and 30 deletions

View File

@ -143,7 +143,7 @@ public class SupervisorManager
if (supervisor == null || supervisor.lhs == null) {
return false;
}
final SeekableStreamSupervisor streamSupervisor = requireSeekableStreamSupervisor(id, "handoff");
final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "handoff");
streamSupervisor.handoffTaskGroupsEarly(taskGroupIds);
return true;
}
@ -279,7 +279,7 @@ public class SupervisorManager
return false;
}
final SeekableStreamSupervisor streamSupervisor = requireSeekableStreamSupervisor(id, "reset");
final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "reset");
if (resetDataSourceMetadata == null) {
streamSupervisor.reset(null);
} else {
@ -306,7 +306,7 @@ public class SupervisorManager
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
final SeekableStreamSupervisor streamSupervisor = requireSeekableStreamSupervisor(supervisorId, "checkPoint");
final StreamSupervisor streamSupervisor = requireStreamSupervisor(supervisorId, "checkPoint");
streamSupervisor.checkpoint(taskGroupId, previousDataSourceMetadata);
return true;
}
@ -450,11 +450,11 @@ public class SupervisorManager
return true;
}
private SeekableStreamSupervisor requireSeekableStreamSupervisor(final String supervisorId, final String operation)
private StreamSupervisor requireStreamSupervisor(final String supervisorId, final String operation)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
if (supervisor.lhs instanceof SeekableStreamSupervisor) {
return (SeekableStreamSupervisor) supervisor.lhs;
if (supervisor.lhs instanceof StreamSupervisor) {
return (StreamSupervisor) supervisor.lhs;
} else {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.UNSUPPORTED)

View File

@ -56,7 +56,7 @@ import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.StreamSupervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
@ -146,7 +146,7 @@ import java.util.stream.Stream;
* @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
*/
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
implements Supervisor
implements StreamSupervisor
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
public static final String AUTOSCALER_SKIP_REASON_DIMENSION = "scalingSkipReason";
@ -975,6 +975,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, maxNumTasks, this.tuningConfig);
}
@Override
public int getActiveTaskGroupsCount()
{
return activelyReadingTaskGroups.values().size();
@ -1106,6 +1107,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* @param resetDataSourceMetadata required datasource metadata with offsets to reset.
* @throws DruidException if any metadata attribute doesn't match the supervisor's.
*/
@Override
public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
{
if (resetDataSourceMetadata == null) {
@ -1973,11 +1975,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
}
/**
* Marks the given task groups as ready for segment hand-off irrespective of the task run times.
* In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
* taskGroupIds that are not valid or not actively reading are simply ignored.
*/
@Override
public void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
addNotice(new HandoffTaskGroupsNotice(taskGroupIds));
@ -4172,15 +4170,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return ioConfig;
}
/**
* The definition of checkpoint is not very strict as currently it does not affect data or control path.
* On this call Supervisor can potentially checkpoint data processed so far to some durable storage
* for example - Kafka/Kinesis Supervisor uses this to merge and handoff segments containing at least the data
* represented by {@param currentCheckpoint} DataSourceMetadata
*
* @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing
* @param checkpointMetadata metadata for the sequence to currently checkpoint
*/
@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
Preconditions.checkNotNull(checkpointMetadata, "checkpointMetadata");
@ -4258,11 +4248,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return activeTaskMap.build();
}
/**
* Computes maxLag, totalLag and avgLag
*/
public abstract LagStats computeLagStats();
/**
* creates a specific task IOConfig instance for Kafka/Kinesis
*

View File

@ -63,10 +63,10 @@ public class SupervisorManagerTest extends EasyMockSupport
private MetadataSupervisorManager metadataSupervisorManager;
@Mock
private SeekableStreamSupervisor supervisor1;
private StreamSupervisor supervisor1;
@Mock
private SeekableStreamSupervisor supervisor2;
private StreamSupervisor supervisor2;
@Mock
private Supervisor supervisor3;

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import java.util.List;
/**
* An interface for managing supervisors that handle stream-based ingestion tasks.
* <p>
* This interface extends {@link Supervisor} and adds additional functionality for managing
* the lifecycle of stream-based ingestion tasks.
* </p>
*/
public interface StreamSupervisor extends Supervisor
{
/**
* Reset offsets with provided dataSource metadata. The resulting stored offsets should be a union of existing checkpointed
* offsets with provided offsets.
* @param resetDataSourceMetadata required datasource metadata with offsets to reset.
* @throws DruidException if any metadata attribute doesn't match the supervisor's state.
*/
void resetOffsets(DataSourceMetadata resetDataSourceMetadata);
/**
* The definition of checkpoint is not very strict as currently it does not affect data or control path.
* On this call Supervisor can potentially checkpoint data processed so far to some durable storage
* for example - Kafka/Kinesis Supervisor uses this to merge and handoff segments containing at least the data
* represented by {@param currentCheckpoint} DataSourceMetadata
*
* @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing
* @param checkpointMetadata metadata for the sequence to currently checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
/**
* Computes maxLag, totalLag and avgLag
*/
LagStats computeLagStats();
int getActiveTaskGroupsCount();
/**
* Marks the given task groups as ready for segment hand-off irrespective of the task run times.
* In the subsequent run, the supervisor initiates segment publish and hand-off for these task groups and rolls over their tasks.
* taskGroupIds that are not valid or not actively reading are simply ignored.
*/
default void handoffTaskGroupsEarly(List<Integer> taskGroupIds)
{
throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented");
}
}

View File

@ -30,7 +30,7 @@ import java.util.Map;
/**
* An interface representing a general supervisor for managing ingestion tasks. For streaming ingestion use cases,
* see SeekableStreamSupervisor.
* see {@link StreamSupervisor}.
*/
public interface Supervisor
{

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
public class StreamSupervisorTest
{
@Test
public void testDefaultHandoffTaskGroupsEarly()
{
// Create an instance of stream supervisor without overriding handoffTaskGroupsEarly().
final StreamSupervisor streamSupervisor = new StreamSupervisor()
{
@Override
public void start()
{
}
@Override
public void stop(boolean stopGracefully)
{
}
@Override
public SupervisorReport getStatus()
{
return null;
}
@Override
public SupervisorStateManager.State getState()
{
return null;
}
@Override
public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
{
}
@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
}
@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
}
@Override
public LagStats computeLagStats()
{
return null;
}
@Override
public int getActiveTaskGroupsCount()
{
return 0;
}
};
final Exception ex = Assert.assertThrows(
UnsupportedOperationException.class,
() -> streamSupervisor.handoffTaskGroupsEarly(ImmutableList.of(1))
);
Assert.assertEquals(
"Supervisor does not have the feature to handoff task groups early implemented",
ex.getMessage()
);
}
}