Add unit tests

This commit is contained in:
George Wu 2024-11-25 16:15:33 -05:00
parent 64e3851160
commit 1c05c2f5f5
4 changed files with 222 additions and 2 deletions

View File

@ -225,7 +225,7 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.SERVICE_UNAVAILABLE)
.build("Cannot append segments to [%s] right now." +
.build("Cannot append segments to [%s] right now. " +
"There might be another task waiting to publish its segments. Check the overlord logs for details.",
dataSourceToInsert
);

View File

@ -22,25 +22,45 @@ package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.assertj.core.api.Assertions;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
public class SegmentTransactionalInsertActionTest
import javax.annotation.Nullable;
import java.util.Map;
@RunWith(EasyMockRunner.class)
public class SegmentTransactionalInsertActionTest extends EasyMockSupport
{
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
@ -86,6 +106,12 @@ public class SegmentTransactionalInsertActionTest
1024
);
@Mock
private TaskActionToolbox taskActionToolbox;
@Mock
SupervisorManager supervisorManager;
private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs)
throws InterruptedException
{
@ -175,4 +201,87 @@ public class SegmentTransactionalInsertActionTest
);
Assert.assertTrue(exception.getMessage().contains("are not covered by locks"));
}
@Test
public void testStreamingTaskNotPublishable() throws Exception
{
// Mocking the config classes because they have a lot of logic in their constructors that we don't really want here.
SeekableStreamIndexTaskTuningConfig taskTuningConfig = EasyMock.createMock(SeekableStreamIndexTaskTuningConfig.class);
SeekableStreamIndexTaskIOConfig taskIOConfig = EasyMock.createMock(SeekableStreamIndexTaskIOConfig.class);
final SeekableStreamIndexTask streamingTask = new TestSeekableStreamIndexTask(
"id1",
null,
DataSchema.builder().withDataSource(DATA_SOURCE).build(),
taskTuningConfig,
taskIOConfig,
ImmutableMap.of(),
"0"
);
EasyMock.expect(taskActionToolbox.getSupervisorManager()).andReturn(supervisorManager);
EasyMock.expect(taskActionToolbox.getTaskLockbox()).andReturn(actionTestKit.getTaskLockbox());
EasyMock.expect(supervisorManager.canPublishSegments(EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyString()))
.andReturn(false);
actionTestKit.getTaskLockbox().add(streamingTask);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, streamingTask, INTERVAL, 5000);
replayAll();
DruidException druidException = Assert.assertThrows(DruidException.class, () -> SegmentTransactionalInsertAction.appendAction(
ImmutableSet.of(SEGMENT1),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableList.of(1)),
null
).perform(
streamingTask,
taskActionToolbox
));
verifyAll();
Assert.assertEquals(503, druidException.getStatusCode());
}
private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
public TestSeekableStreamIndexTask(
String id,
@Nullable TaskResource taskResource,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig,
SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
@Nullable Map<String, Object> context,
@Nullable String groupId
)
{
super(
id,
taskResource,
dataSchema,
tuningConfig,
ioConfig,
context,
groupId
);
}
@Override
protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
{
return null;
}
@Override
protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
{
return null;
}
@Override
public String getType()
{
return "test";
}
}
}

View File

@ -291,6 +291,52 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
}
@Test
public void testCanPublishSegments_returnsFalse()
{
String taskId = "id1";
String supervisorId = "supervisor-id";
Integer groupId = 1;
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
supervisorId, new TestSupervisorSpec(supervisorId, supervisor1)
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andReturn(false);
replayAll();
manager.start();
Assert.assertFalse(manager.canPublishSegments(supervisorId, groupId, taskId));
verifyAll();
}
@Test
public void testCanPublishSegments_throwsException_returnsTrue()
{
String taskId = "id1";
String supervisorId = "supervisor-id";
Integer groupId = 1;
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
supervisorId, new TestSupervisorSpec(supervisorId, supervisor1)
);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andThrow(new RuntimeException());
replayAll();
manager.start();
Assert.assertTrue(manager.canPublishSegments(supervisorId, groupId, taskId));
verifyAll();
}
@Test
public void testStartAlreadyStarted()
{

View File

@ -100,4 +100,69 @@ public class StreamSupervisorTest
ex.getMessage()
);
}
@Test
public void testDefaultCanPublishSegments()
{
// Create an instance of stream supervisor without overriding handoffTaskGroupsEarly().
final StreamSupervisor streamSupervisor = new StreamSupervisor()
{
@Override
public void start()
{
}
@Override
public void stop(boolean stopGracefully)
{
}
@Override
public SupervisorReport getStatus()
{
return null;
}
@Override
public SupervisorStateManager.State getState()
{
return null;
}
@Override
public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
{
}
@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
}
@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
}
@Override
public LagStats computeLagStats()
{
return null;
}
@Override
public int getActiveTaskGroupsCount()
{
return 0;
}
};
Assert.assertTrue(streamSupervisor.canPublishSegments(1, "taskId"));
}
}