Add API to fetch conflicting task locks (#16799)

* Add API to fetch conflicting active locks
This commit is contained in:
AmatyaAvadhanula 2024-07-30 11:40:48 +05:30 committed by GitHub
parent e9ea243d97
commit 92a40d8169
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 403 additions and 241 deletions

View File

@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
@ -49,6 +50,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -992,50 +994,76 @@ public class TaskLockbox
}
/**
* Gets a List of Intervals locked by higher priority tasks for each datasource.
* Here, Segment Locks are being treated the same as Time Chunk Locks i.e.
* a Task with a Segment Lock is assumed to lock a whole Interval and not just
* the corresponding Segment.
*
* @param minTaskPriority Minimum task priority for each datasource. Only the
* Intervals that are locked by Tasks with equal or
* higher priority than this are returned. Locked intervals
* for datasources that are not present in this Map are
* not returned.
* @return Map from Datasource to List of Intervals locked by Tasks that have
* priority greater than or equal to the {@code minTaskPriority} for that datasource.
* @param lockFilterPolicies Lock filters for the given datasources
* @return Map from datasource to list of non-revoked locks with at least as much priority and an overlapping interval
*/
public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
public Map<String, List<TaskLock>> getActiveLocks(List<LockFilterPolicy> lockFilterPolicies)
{
final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
final Map<String, List<TaskLock>> datasourceToLocks = new HashMap<>();
// Take a lock and populate the maps
giant.lock();
try {
running.forEach(
(datasource, datasourceLocks) -> {
// If this datasource is not requested, do not proceed
if (!minTaskPriority.containsKey(datasource)) {
lockFilterPolicies.forEach(
lockFilter -> {
final String datasource = lockFilter.getDatasource();
if (!running.containsKey(datasource)) {
return;
}
datasourceLocks.forEach(
final int priority = lockFilter.getPriority();
final List<Interval> intervals;
if (lockFilter.getIntervals() != null) {
intervals = lockFilter.getIntervals();
} else {
intervals = Collections.singletonList(Intervals.ETERNITY);
}
final Map<String, Object> context = lockFilter.getContext();
final boolean ignoreAppendLocks;
final Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
Tasks.USE_CONCURRENT_LOCKS,
context.get(Tasks.USE_CONCURRENT_LOCKS)
);
if (useConcurrentLocks == null) {
TaskLockType taskLockType = QueryContexts.getAsEnum(
Tasks.TASK_LOCK_TYPE,
context.get(Tasks.TASK_LOCK_TYPE),
TaskLockType.class
);
if (taskLockType == null) {
ignoreAppendLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
} else {
ignoreAppendLocks = taskLockType == TaskLockType.APPEND;
}
} else {
ignoreAppendLocks = useConcurrentLocks;
}
running.get(datasource).forEach(
(startTime, startTimeLocks) -> startTimeLocks.forEach(
(interval, taskLockPosses) -> taskLockPosses.forEach(
taskLockPosse -> {
if (taskLockPosse.getTaskLock().isRevoked()) {
// Do not proceed if the lock is revoked
return;
// do nothing
} else if (taskLockPosse.getTaskLock().getPriority() == null
|| taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) {
// Do not proceed if the lock has a priority strictly less than the minimum
return;
|| taskLockPosse.getTaskLock().getPriority() < priority) {
// do nothing
} else if (ignoreAppendLocks
&& taskLockPosse.getTaskLock().getType() == TaskLockType.APPEND) {
// do nothing
} else {
for (Interval filterInterval : intervals) {
if (interval.overlaps(filterInterval)) {
datasourceToLocks.computeIfAbsent(datasource, ds -> new ArrayList<>())
.add(taskLockPosse.getTaskLock());
break;
}
}
}
datasourceToIntervals
.computeIfAbsent(datasource, k -> new HashSet<>())
.add(interval);
})
}
)
)
);
}
@ -1045,11 +1073,7 @@ public class TaskLockbox
giant.unlock();
}
return datasourceToIntervals.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new ArrayList<>(entry.getValue())
));
return datasourceToLocks;
}
public void unlock(final Task task, final Interval interval)

View File

@ -27,6 +27,7 @@ import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
@ -94,19 +95,12 @@ public class TaskQueryTool
}
/**
* Gets a List of Intervals locked by higher priority tasks for each datasource.
*
* @param minTaskPriority Minimum task priority for each datasource. Only the
* Intervals that are locked by Tasks with equal or
* higher priority than this are returned. Locked intervals
* for datasources that are not present in this Map are
* not returned.
* @return Map from Datasource to List of Intervals locked by Tasks that have
* priority greater than or equal to the {@code minTaskPriority} for that datasource.
* @param lockFilterPolicies Requests for active locks for various datasources
* @return Map from datasource to conflicting lock infos
*/
public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
public Map<String, List<TaskLock>> getActiveLocks(List<LockFilterPolicy> lockFilterPolicies)
{
return taskLockbox.getLockedIntervals(minTaskPriority);
return taskLockbox.getActiveLocks(lockFilterPolicies);
}
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)

View File

@ -241,26 +241,11 @@ public class OverlordResource
}
}
@Deprecated
@POST
@Path("/lockedIntervals")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriority)
{
if (minTaskPriority == null || minTaskPriority.isEmpty()) {
return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build();
}
// Build the response
return Response.ok(taskQueryTool.getLockedIntervals(minTaskPriority)).build();
}
@POST
@Path("/lockedIntervals/v2")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getDatasourceLockedIntervalsV2(List<LockFilterPolicy> lockFilterPolicies)
public Response getDatasourceLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
{
if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
return Response.status(Status.BAD_REQUEST).entity("No filter provided").build();
@ -270,6 +255,20 @@ public class OverlordResource
return Response.ok(taskQueryTool.getLockedIntervals(lockFilterPolicies)).build();
}
@POST
@Path("/activeLocks")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(StateResourceFilter.class)
public Response getActiveLocks(List<LockFilterPolicy> lockFilterPolicies)
{
if (lockFilterPolicies == null || lockFilterPolicies.isEmpty()) {
return Response.status(Status.BAD_REQUEST).entity("No filter provided").build();
}
// Build the response
return Response.ok(new TaskLockResponse(taskQueryTool.getActiveLocks(lockFilterPolicies))).build();
}
@GET
@Path("/task/{taskid}")
@Produces(MediaType.APPLICATION_JSON)

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.common.TaskLock;
import java.util.List;
import java.util.Map;
public class TaskLockResponse
{
private final Map<String, List<TaskLock>> datasourceToLocks;
@JsonCreator
public TaskLockResponse(
@JsonProperty("datasourceToLocks") final Map<String, List<TaskLock>> datasourceToLocks
)
{
this.datasourceToLocks = datasourceToLocks;
}
@JsonProperty
public Map<String, List<TaskLock>> getDatasourceToLocks()
{
return datasourceToLocks;
}
@Override
public String toString()
{
return "TaskLockResponse{" +
"datasourceToLocks='" + datasourceToLocks +
'}';
}
}

View File

@ -75,7 +75,6 @@ import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -1174,99 +1173,6 @@ public class TaskLockboxTest
);
}
@Test
public void testGetLockedIntervals()
{
// Acquire locks for task1
final Task task1 = NoopTask.forDatasource("ds1");
lockbox.add(task1);
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
task1,
Intervals.of("2017-01-01/2017-02-01")
);
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
task1,
Intervals.of("2017-04-01/2017-05-01")
);
// Acquire locks for task2
final Task task2 = NoopTask.forDatasource("ds2");
lockbox.add(task2);
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
task2,
Intervals.of("2017-03-01/2017-04-01")
);
// Verify the locked intervals
final Map<String, Integer> minTaskPriority = new HashMap<>();
minTaskPriority.put(task1.getDataSource(), 10);
minTaskPriority.put(task2.getDataSource(), 10);
final Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
Assert.assertEquals(2, lockedIntervals.size());
Assert.assertEquals(
Arrays.asList(
Intervals.of("2017-01-01/2017-02-01"),
Intervals.of("2017-04-01/2017-05-01")
),
lockedIntervals.get(task1.getDataSource())
);
Assert.assertEquals(
Collections.singletonList(
Intervals.of("2017-03-01/2017-04-01")),
lockedIntervals.get(task2.getDataSource())
);
}
@Test
public void testGetLockedIntervalsForLowPriorityTask()
{
// Acquire lock for a low priority task
final Task lowPriorityTask = NoopTask.ofPriority(5);
lockbox.add(lowPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
lowPriorityTask,
Intervals.of("2017/2018")
);
final Map<String, Integer> minTaskPriority = new HashMap<>();
minTaskPriority.put(lowPriorityTask.getDataSource(), 10);
Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
Assert.assertTrue(lockedIntervals.isEmpty());
}
@Test
public void testGetLockedIntervalsForEqualPriorityTask()
{
// Acquire lock for a low priority task
final Task task = NoopTask.ofPriority(5);
lockbox.add(task);
taskStorage.insert(task, TaskStatus.running(task.getId()));
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
task,
Intervals.of("2017/2018")
);
final Map<String, Integer> minTaskPriority = new HashMap<>();
minTaskPriority.put(task.getDataSource(), 5);
Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
Assert.assertEquals(1, lockedIntervals.size());
Assert.assertEquals(
Collections.singletonList(Intervals.of("2017/2018")),
lockedIntervals.get(task.getDataSource())
);
}
@Test
public void testGetLockedIntervalsForHigherPriorityExclusiveLock()
{
@ -1282,6 +1188,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
75,
null,
null
);
@ -1305,6 +1212,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForExclusiveLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
25,
null,
null
);
@ -1332,6 +1240,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
25,
null,
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.REPLACE.name())
);
@ -1355,6 +1264,7 @@ public class TaskLockboxTest
LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy(
task.getDataSource(),
25,
null,
ImmutableMap.of(
Tasks.TASK_LOCK_TYPE,
TaskLockType.EXCLUSIVE.name(),
@ -1369,6 +1279,171 @@ public class TaskLockboxTest
}
@Test
public void testGetActiveLocks()
{
final Set<TaskLock> expectedLocks = new HashSet<>();
final TaskLock overlappingReplaceLock =
validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50);
expectedLocks.add(overlappingReplaceLock);
//Lower priority
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25);
final TaskLock overlappingAppendLock =
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75);
expectedLocks.add(overlappingAppendLock);
// Non-overlapping interval
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75);
final TaskLock overlappingExclusiveLock =
validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50);
expectedLocks.add(overlappingExclusiveLock);
LockFilterPolicy policy = new LockFilterPolicy(
"none",
50,
ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")),
null
);
LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
"nonExistent",
0,
null,
null
);
Map<String, List<TaskLock>> activeLocks =
lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource));
Assert.assertEquals(1, activeLocks.size());
Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
}
@Test
public void testGetActiveLocksWithAppendLockIgnoresAppendLocks()
{
final Set<TaskLock> expectedLocks = new HashSet<>();
final TaskLock overlappingReplaceLock =
validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50);
expectedLocks.add(overlappingReplaceLock);
//Lower priority
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25);
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75);
// Non-overlapping interval
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75);
final TaskLock overlappingExclusiveLock =
validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50);
expectedLocks.add(overlappingExclusiveLock);
LockFilterPolicy policy = new LockFilterPolicy(
"none",
50,
ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")),
ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name())
);
LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
"nonExistent",
0,
null,
null
);
Map<String, List<TaskLock>> activeLocks =
lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource));
Assert.assertEquals(1, activeLocks.size());
Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
}
@Test
public void testGetActiveLocksWithConcurrentLocksIgnoresAppendLocks()
{
final Set<TaskLock> expectedLocks = new HashSet<>();
final TaskLock overlappingReplaceLock =
validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50);
expectedLocks.add(overlappingReplaceLock);
//Lower priority
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25);
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75);
// Non-overlapping interval
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75);
final TaskLock overlappingExclusiveLock =
validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50);
expectedLocks.add(overlappingExclusiveLock);
LockFilterPolicy policy = new LockFilterPolicy(
"none",
50,
ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")),
ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true, Tasks.TASK_LOCK_TYPE, TaskLockType.EXCLUSIVE.name())
);
LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
"nonExistent",
0,
null,
null
);
Map<String, List<TaskLock>> activeLocks =
lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource));
Assert.assertEquals(1, activeLocks.size());
Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
}
@Test
public void testGetActiveLocksWithoutConcurrentLocksConsidersAppendLocks()
{
final Set<TaskLock> expectedLocks = new HashSet<>();
final TaskLock overlappingReplaceLock =
validator.expectLockCreated(TaskLockType.REPLACE, Intervals.of("2024/2025"), 50);
expectedLocks.add(overlappingReplaceLock);
//Lower priority
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024/2025"), 25);
final TaskLock overlappingAppendLock =
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-01-01/2024-02-01"), 75);
expectedLocks.add(overlappingAppendLock);
// Non-overlapping interval
validator.expectLockCreated(TaskLockType.APPEND, Intervals.of("2024-12-01/2025-01-01"), 75);
final TaskLock overlappingExclusiveLock =
validator.expectLockCreated(TaskLockType.EXCLUSIVE, Intervals.of("2020/2021"), 50);
expectedLocks.add(overlappingExclusiveLock);
LockFilterPolicy policy = new LockFilterPolicy(
"none",
50,
ImmutableList.of(Intervals.of("2020/2021"), Intervals.of("2024-01-01/2024-07-01")),
ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, false, Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name())
);
LockFilterPolicy policyForNonExistentDatasource = new LockFilterPolicy(
"nonExistent",
0,
null,
null
);
Map<String, List<TaskLock>> activeLocks =
lockbox.getActiveLocks(ImmutableList.of(policy, policyForNonExistentDatasource));
Assert.assertEquals(1, activeLocks.size());
Assert.assertEquals(expectedLocks, new HashSet<>(activeLocks.get("none")));
}
@Test
public void testExclusiveLockCompatibility()
{
@ -1770,50 +1845,6 @@ public class TaskLockboxTest
validator.expectLockNotGranted(TaskLockType.APPEND, otherGroupTask, Intervals.of("2024/2025"));
}
@Test
public void testGetLockedIntervalsForRevokedLocks()
{
// Acquire lock for a low priority task
final Task lowPriorityTask = NoopTask.ofPriority(5);
lockbox.add(lowPriorityTask);
taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
lowPriorityTask,
Intervals.of("2017/2018")
);
final Map<String, Integer> minTaskPriority = new HashMap<>();
minTaskPriority.put(lowPriorityTask.getDataSource(), 1);
Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
Assert.assertEquals(1, lockedIntervals.size());
Assert.assertEquals(
Collections.singletonList(
Intervals.of("2017/2018")),
lockedIntervals.get(lowPriorityTask.getDataSource())
);
// Revoke the lowPriorityTask
final Task highPriorityTask = NoopTask.ofPriority(10);
lockbox.add(highPriorityTask);
tryTimeChunkLock(
TaskLockType.EXCLUSIVE,
highPriorityTask,
Intervals.of("2017-05-01/2017-06-01")
);
// Verify the locked intervals
minTaskPriority.put(highPriorityTask.getDataSource(), 1);
lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
Assert.assertEquals(1, lockedIntervals.size());
Assert.assertEquals(
Collections.singletonList(
Intervals.of("2017-05-01/2017-06-01")),
lockedIntervals.get(highPriorityTask.getDataSource())
);
}
@Test
public void testFailedToReacquireTaskLock()
{

View File

@ -36,6 +36,9 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@ -61,6 +64,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
@ -1057,31 +1061,33 @@ public class OverlordResourceTest
@Test
public void testGetLockedIntervals() throws Exception
{
final Map<String, Integer> minTaskPriority = Collections.singletonMap("ds1", 0);
final Map<String, List<Interval>> expectedLockedIntervals = Collections.singletonMap(
final List<LockFilterPolicy> lockFilterPolicies = ImmutableList.of(
new LockFilterPolicy("ds1", 25, null, null)
);
final Map<String, List<Interval>> expectedIntervals = Collections.singletonMap(
"ds1",
Arrays.asList(
Intervals.of("2012-01-01/2012-01-02"),
Intervals.of("2012-01-02/2012-01-03")
Intervals.of("2012-01-01/2012-01-02")
)
);
EasyMock.expect(taskLockbox.getLockedIntervals(minTaskPriority))
.andReturn(expectedLockedIntervals);
EasyMock.expect(taskLockbox.getLockedIntervals(lockFilterPolicies))
.andReturn(expectedIntervals);
replayAll();
final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority);
final Response response = overlordResource.getDatasourceLockedIntervals(lockFilterPolicies);
Assert.assertEquals(200, response.getStatus());
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
Map<String, List<Interval>> observedLockedIntervals = jsonMapper.readValue(
Map<String, List<Interval>> observedIntervals = jsonMapper.readValue(
jsonMapper.writeValueAsString(response.getEntity()),
new TypeReference<Map<String, List<Interval>>>()
{
}
);
Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals);
Assert.assertEquals(expectedIntervals, observedIntervals);
}
@Test
@ -1092,7 +1098,65 @@ public class OverlordResourceTest
Response response = overlordResource.getDatasourceLockedIntervals(null);
Assert.assertEquals(400, response.getStatus());
response = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap());
response = overlordResource.getDatasourceLockedIntervals(Collections.emptyList());
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testGetActiveLocks() throws Exception
{
final List<LockFilterPolicy> lockFilterPolicies = ImmutableList.of(
new LockFilterPolicy("ds1", 25, null, null)
);
final Map<String, List<TaskLock>> expectedLocks = Collections.singletonMap(
"ds1",
Arrays.asList(
new TimeChunkLock(
TaskLockType.REPLACE,
"groupId",
"datasource",
Intervals.of("2012-01-01/2012-01-02"),
"version",
25
),
new TimeChunkLock(
TaskLockType.EXCLUSIVE,
"groupId",
"datasource",
Intervals.of("2012-01-02/2012-01-03"),
"version",
75
)
)
);
EasyMock.expect(taskLockbox.getActiveLocks(lockFilterPolicies))
.andReturn(expectedLocks);
replayAll();
final Response response = overlordResource.getActiveLocks(lockFilterPolicies);
Assert.assertEquals(200, response.getStatus());
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
Map<String, List<TaskLock>> observedLocks = jsonMapper.readValue(
jsonMapper.writeValueAsString(response.getEntity()),
new TypeReference<TaskLockResponse>()
{
}
).getDatasourceToLocks();
Assert.assertEquals(expectedLocks, observedLocks);
}
@Test
public void testGetActiveLocksWithEmptyBody()
{
replayAll();
Response response = overlordResource.getActiveLocks(null);
Assert.assertEquals(400, response.getStatus());
response = overlordResource.getActiveLocks(Collections.emptyList());
Assert.assertEquals(400, response.getStatus());
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
@ -343,12 +344,12 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
submitIndexTask(INDEX_TASK, datasourceName);
// Wait until it acquires a lock
final Map<String, Integer> minTaskPriority = Collections.singletonMap(datasourceName, 0);
final List<LockFilterPolicy> lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
ITRetryUtil.retryUntilFalse(
() -> {
lockedIntervals.clear();
lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
return lockedIntervals.isEmpty();
},
"Verify Intervals are Locked"

View File

@ -39,6 +39,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
@ -334,13 +335,13 @@ public class OverlordResourceTestClient
}
}
public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
{
try {
String jsonBody = jsonMapper.writeValueAsString(minTaskPriority);
String jsonBody = jsonMapper.writeValueAsString(lockFilterPolicies);
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals"))
new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals/v2"))
.setContent(
"application/json",
StringUtils.toUtf8(jsonBody)

View File

@ -19,11 +19,13 @@
package org.apache.druid.tests.coordinator.duty;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@ -53,7 +55,6 @@ import org.testng.annotations.Test;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -265,13 +266,13 @@ public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingSer
*/
private void ensureLockedIntervals(Interval... intervals)
{
final Map<String, Integer> minTaskPriority = Collections.singletonMap(fullDatasourceName, 0);
final LockFilterPolicy lockFilterPolicy = new LockFilterPolicy(fullDatasourceName, 0, null, null);
final List<Interval> lockedIntervals = new ArrayList<>();
ITRetryUtil.retryUntilTrue(
() -> {
lockedIntervals.clear();
Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(minTaskPriority);
Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(ImmutableList.of(lockFilterPolicy));
if (allIntervals.containsKey(fullDatasourceName)) {
lockedIntervals.addAll(allIntervals.get(fullDatasourceName));
}

View File

@ -24,6 +24,7 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
@ -342,12 +343,12 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
submitIndexTask(INDEX_TASK, datasourceName);
// Wait until it acquires a lock
final Map<String, Integer> minTaskPriority = Collections.singletonMap(datasourceName, 0);
final List<LockFilterPolicy> lockFilterPolicies = Collections.singletonList(new LockFilterPolicy(datasourceName, 0, null, null));
final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
ITRetryUtil.retryUntilFalse(
() -> {
lockedIntervals.clear();
lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
lockedIntervals.putAll(indexer.getLockedIntervals(lockFilterPolicies));
return lockedIntervals.isEmpty();
},
"Verify Intervals are Locked"

View File

@ -21,10 +21,12 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Specifies a policy to filter active locks held by a datasource
@ -33,17 +35,20 @@ public class LockFilterPolicy
{
private final String datasource;
private final int priority;
private final List<Interval> intervals;
private final Map<String, Object> context;
@JsonCreator
public LockFilterPolicy(
@JsonProperty("datasource") String datasource,
@JsonProperty("priority") int priority,
@JsonProperty("context") Map<String, Object> context
@JsonProperty("intervals") @Nullable List<Interval> intervals,
@JsonProperty("context") @Nullable Map<String, Object> context
)
{
this.datasource = datasource;
this.priority = priority;
this.intervals = intervals;
this.context = context == null ? Collections.emptyMap() : context;
}
@ -65,24 +70,10 @@ public class LockFilterPolicy
return context;
}
@Override
public boolean equals(Object o)
@Nullable
@JsonProperty
public List<Interval> getIntervals()
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LockFilterPolicy that = (LockFilterPolicy) o;
return Objects.equals(datasource, that.datasource)
&& priority == that.priority
&& Objects.equals(context, that.context);
}
@Override
public int hashCode()
{
return Objects.hash(datasource, priority, context);
return intervals;
}
}

View File

@ -278,7 +278,8 @@ public class CompactSegments implements CoordinatorCustomDuty
{
final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
.stream()
.map(config -> new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), config.getTaskContext()))
.map(config ->
new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext()))
.collect(Collectors.toList());
final Map<String, List<Interval>> datasourceToLockedIntervals =
new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true));

View File

@ -225,7 +225,7 @@ public class OverlordClientImplTest
final Map<String, List<Interval>> lockMap =
ImmutableMap.of("foo", Collections.singletonList(Intervals.of("2000/2001")));
final List<LockFilterPolicy> requests = ImmutableList.of(
new LockFilterPolicy("foo", 3, null)
new LockFilterPolicy("foo", 3, null, null)
);
serviceClient.expectAndRespond(
@ -246,7 +246,7 @@ public class OverlordClientImplTest
public void test_findLockedIntervals_nullReturn() throws Exception
{
final List<LockFilterPolicy> requests = ImmutableList.of(
new LockFilterPolicy("foo", 3, null)
new LockFilterPolicy("foo", 3, null, null)
);
serviceClient.expectAndRespond(