Handle and map errors in delete pending segments API (#15673)

Changes:
- Handle exception in deletePendingSegments API and map to correct HTTP status code
- Clean up exception message using `DruidException`
- Add unit tests
This commit is contained in:
Abhishek Radhakrishnan 2024-01-14 20:39:01 -08:00 committed by GitHub
parent e49a7bb3cd
commit 08c01f1dae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 238 additions and 35 deletions

View File

@ -19,11 +19,12 @@
package org.apache.druid.indexing.overlord;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Comparator;
@ -46,23 +47,31 @@ public class IndexerMetadataStorageAdapter
public int deletePendingSegments(String dataSource, Interval deleteInterval)
{
// Check the given interval overlaps the interval(minCreatedDateOfActiveTasks, MAX)
final Optional<DateTime> minCreatedDateOfActiveTasks = taskStorageQueryAdapter
// Find the earliest active task created for the specified datasource; if one exists,
// check if its interval overlaps with the delete interval.
final Optional<TaskInfo<Task, TaskStatus>> earliestActiveTaskOptional = taskStorageQueryAdapter
.getActiveTaskInfo(dataSource)
.stream()
.map(TaskInfo::getCreatedTime)
.min(Comparator.naturalOrder());
.min(Comparator.comparing(TaskInfo::getCreatedTime));
if (earliestActiveTaskOptional.isPresent()) {
final TaskInfo<Task, TaskStatus> earliestActiveTask = earliestActiveTaskOptional.get();
final Interval activeTaskInterval = new Interval(
minCreatedDateOfActiveTasks.orElse(DateTimes.MAX),
earliestActiveTask.getCreatedTime(),
DateTimes.MAX
);
Preconditions.checkArgument(
!deleteInterval.overlaps(activeTaskInterval),
"Cannot delete pendingSegments because there is at least one active task created at %s",
activeTaskInterval.getStart()
if (deleteInterval.overlaps(activeTaskInterval)) {
throw InvalidInput.exception(
"Cannot delete pendingSegments for datasource[%s] as there is at least one active task[%s] created at[%s] "
+ "that overlaps with the delete interval[%s]. Please retry when there are no active tasks.",
dataSource,
earliestActiveTask.getId(),
activeTaskInterval.getStart(),
deleteInterval
);
}
}
return indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(dataSource, deleteInterval);
}

View File

@ -238,7 +238,7 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
}
/**
* Returns true if it's the leader and its all services have been properly initialized.
* Returns true if it's the leader and all its services have been initialized.
*/
public boolean isLeader()
{

View File

@ -939,10 +939,25 @@ public class OverlordResource
}
if (taskMaster.isLeader()) {
try {
final int numDeleted = indexerMetadataStorageAdapter.deletePendingSegments(dataSource, deleteInterval);
return Response.ok().entity(ImmutableMap.of("numDeleted", numDeleted)).build();
}
catch (DruidException e) {
return Response.status(e.getStatusCode())
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
catch (Exception e) {
log.warn(e, "Failed to delete pending segments for datasource[%s] and interval[%s].", dataSource, deleteInterval);
return Response.status(Status.INTERNAL_SERVER_ERROR)
.entity(ImmutableMap.<String, Object>of("error", e.getMessage()))
.build();
}
} else {
return Response.status(Status.SERVICE_UNAVAILABLE).build();
return Response.status(Status.SERVICE_UNAVAILABLE)
.entity(ImmutableMap.of("error", "overlord is not the leader or not initialized yet"))
.build();
}
}

View File

@ -20,6 +20,8 @@
package org.apache.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.NoopTask;
@ -27,21 +29,16 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
public class IndexerMetadataStorageAdapterTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private TaskStorageQueryAdapter taskStorageQueryAdapter;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@ -69,7 +66,7 @@ public class IndexerMetadataStorageAdapterTest
NoopTask.create()
),
new TaskInfo<>(
"id1",
"id2",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
@ -93,7 +90,7 @@ public class IndexerMetadataStorageAdapterTest
}
@Test
public void testDeletePendingSegmentsOfRunningTasks()
public void testDeletePendingSegmentsOfOneOverlappingRunningTask()
{
final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
new TaskInfo<>(
@ -104,7 +101,7 @@ public class IndexerMetadataStorageAdapterTest
NoopTask.create()
),
new TaskInfo<>(
"id1",
"id2",
DateTimes.of("2017-12-02"),
TaskStatus.running("id2"),
"dataSource",
@ -125,8 +122,62 @@ public class IndexerMetadataStorageAdapterTest
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
expectedException.expect(CoreMatchers.instanceOf(IllegalArgumentException.class));
expectedException.expectMessage("Cannot delete pendingSegments because there is at least one active task created");
indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Cannot delete pendingSegments for datasource[dataSource] as there is at least one active task[id1]"
+ " created at[2017-11-01T00:00:00.000Z] that overlaps with the delete "
+ "interval[2017-01-01T00:00:00.000Z/2017-12-01T00:00:00.000Z]. Please retry when there are no active tasks."
)
);
}
@Test
public void testDeletePendingSegmentsOfMultipleOverlappingRunningTasks()
{
final ImmutableList<TaskInfo<Task, TaskStatus>> taskInfos = ImmutableList.of(
new TaskInfo<>(
"id1",
DateTimes.of("2017-12-01"),
TaskStatus.running("id1"),
"dataSource",
NoopTask.create()
),
new TaskInfo<>(
"id2",
DateTimes.of("2017-11-01"),
TaskStatus.running("id2"),
"dataSource",
NoopTask.create()
)
);
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("dataSource")).andReturn(taskInfos);
final Interval deleteInterval = Intervals.of("2017-01-01/2018-12-01");
EasyMock
.expect(
indexerMetadataStorageCoordinator.deletePendingSegmentsCreatedInInterval(
EasyMock.anyString(),
EasyMock.eq(deleteInterval)
)
)
.andReturn(10);
EasyMock.replay(taskStorageQueryAdapter, indexerMetadataStorageCoordinator);
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> indexerMetadataStorageAdapter.deletePendingSegments("dataSource", deleteInterval)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Cannot delete pendingSegments for datasource[dataSource] as there is at least one active task[id2]"
+ " created at[2017-11-01T00:00:00.000Z] that overlaps with the delete"
+ " interval[2017-01-01T00:00:00.000Z/2018-12-01T00:00:00.000Z]. Please retry when there are no active tasks."
)
);
}
}

View File

@ -28,6 +28,8 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
@ -983,10 +985,136 @@ public class OverlordResourceTest
authConfig
);
final Map<String, Integer> response = (Map<String, Integer>) overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req)
.getEntity();
Assert.assertEquals(2, response.get("numDeleted").intValue());
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numDeleted", 2), response.getEntity());
}
@Test
public void testKillPendingSegmentsThrowsInvalidInputDruidException()
{
expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(true);
final String exceptionMsg = "Some exception msg";
EasyMock
.expect(
indexerMetadataStorageAdapter.deletePendingSegments(
EasyMock.eq("allow"),
EasyMock.anyObject(Interval.class)
)
)
.andThrow(InvalidInput.exception(exceptionMsg))
.once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
Assert.assertEquals(400, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity());
}
@Test
public void testKillPendingSegmentsThrowsDefensiveDruidException()
{
expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(true);
final String exceptionMsg = "An internal defensive exception";
EasyMock
.expect(
indexerMetadataStorageAdapter.deletePendingSegments(
EasyMock.eq("allow"),
EasyMock.anyObject(Interval.class)
)
)
.andThrow(DruidException.defensive(exceptionMsg))
.once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity());
}
@Test
public void testKillPendingSegmentsThrowsArbitraryException()
{
expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(true);
final String exceptionMsg = "An unexpected illegal state exception";
EasyMock
.expect(
indexerMetadataStorageAdapter.deletePendingSegments(
EasyMock.eq("allow"),
EasyMock.anyObject(Interval.class)
)
)
.andThrow(new IllegalStateException(exceptionMsg))
.once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
Assert.assertEquals(500, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", exceptionMsg), response.getEntity());
}
@Test
public void testKillPendingSegmentsToNonLeader()
{
expectAuthorizationTokenCheck();
EasyMock.expect(taskMaster.isLeader()).andReturn(false);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
Assert.assertEquals(503, response.getStatus());
Assert.assertEquals(ImmutableMap.of("error", "overlord is not the leader or not initialized yet"), response.getEntity());
}
@Test