mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Concurrent replace should work with supervisors using concurrent locks (#15995)
* Concurrent replace should work with supervisors using concurrent locks * Ignore supervisors with useConcurrentLocks set to false * Apply feedback
This commit is contained in:
parent
d6f59d1999
commit
7c42e87db9
@ -87,23 +87,38 @@ public class SupervisorManager
|
||||
final Supervisor supervisor = entry.getValue().lhs;
|
||||
final SupervisorSpec supervisorSpec = entry.getValue().rhs;
|
||||
|
||||
TaskLockType taskLockType = null;
|
||||
boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
|
||||
if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
|
||||
SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec;
|
||||
Map<String, Object> context = seekableStreamSupervisorSpec.getContext();
|
||||
if (context != null) {
|
||||
taskLockType = QueryContexts.getAsEnum(
|
||||
Tasks.TASK_LOCK_TYPE,
|
||||
context.get(Tasks.TASK_LOCK_TYPE),
|
||||
TaskLockType.class
|
||||
Boolean useConcurrentLocks = QueryContexts.getAsBoolean(
|
||||
Tasks.USE_CONCURRENT_LOCKS,
|
||||
context.get(Tasks.USE_CONCURRENT_LOCKS)
|
||||
);
|
||||
if (useConcurrentLocks == null) {
|
||||
TaskLockType taskLockType = QueryContexts.getAsEnum(
|
||||
Tasks.TASK_LOCK_TYPE,
|
||||
context.get(Tasks.TASK_LOCK_TYPE),
|
||||
TaskLockType.class
|
||||
);
|
||||
if (taskLockType == null) {
|
||||
hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS;
|
||||
} else if (taskLockType == TaskLockType.APPEND) {
|
||||
hasAppendLock = true;
|
||||
} else {
|
||||
hasAppendLock = false;
|
||||
}
|
||||
} else {
|
||||
hasAppendLock = useConcurrentLocks;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (supervisor instanceof SeekableStreamSupervisor
|
||||
&& !supervisorSpec.isSuspended()
|
||||
&& supervisorSpec.getDataSources().contains(datasource)
|
||||
&& TaskLockType.APPEND.equals(taskLockType)) {
|
||||
&& (hasAppendLock)) {
|
||||
return Optional.of(supervisorId);
|
||||
}
|
||||
}
|
||||
|
@ -468,6 +468,21 @@ public class SupervisorManagerTest extends EasyMockSupport
|
||||
EasyMock.replay(activeSpec);
|
||||
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
|
||||
|
||||
SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisorSpec.class);
|
||||
Supervisor activeSupervisorWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisor.class);
|
||||
EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes();
|
||||
EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes();
|
||||
EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources())
|
||||
.andReturn(ImmutableList.of("activeConcurrentLocksDS")).anyTimes();
|
||||
EasyMock.expect(activeSpecWithConcurrentLocks.createSupervisor())
|
||||
.andReturn(activeSupervisorWithConcurrentLocks).anyTimes();
|
||||
EasyMock.expect(activeSpecWithConcurrentLocks.createAutoscaler(activeSupervisorWithConcurrentLocks))
|
||||
.andReturn(null).anyTimes();
|
||||
EasyMock.expect(activeSpecWithConcurrentLocks.getContext())
|
||||
.andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)).anyTimes();
|
||||
EasyMock.replay(activeSpecWithConcurrentLocks);
|
||||
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
|
||||
|
||||
SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
|
||||
Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
|
||||
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
|
||||
@ -482,6 +497,25 @@ public class SupervisorManagerTest extends EasyMockSupport
|
||||
EasyMock.replay(activeAppendSpec);
|
||||
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
|
||||
|
||||
// A supervisor with useConcurrentLocks set to false explicitly must not use an append lock
|
||||
SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisorSpec.class);
|
||||
Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisor.class);
|
||||
EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes();
|
||||
EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes();
|
||||
EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources())
|
||||
.andReturn(ImmutableList.of("dsWithuseConcurrentLocksFalse")).anyTimes();
|
||||
EasyMock.expect(specWithUseConcurrentLocksFalse.createSupervisor()).andReturn(supervisorWithUseConcurrentLocksFalse).anyTimes();
|
||||
EasyMock.expect(specWithUseConcurrentLocksFalse.createAutoscaler(supervisorWithUseConcurrentLocksFalse))
|
||||
.andReturn(null).anyTimes();
|
||||
EasyMock.expect(specWithUseConcurrentLocksFalse.getContext()).andReturn(ImmutableMap.of(
|
||||
Tasks.USE_CONCURRENT_LOCKS,
|
||||
false,
|
||||
Tasks.TASK_LOCK_TYPE,
|
||||
TaskLockType.APPEND.name()
|
||||
)).anyTimes();
|
||||
EasyMock.replay(specWithUseConcurrentLocksFalse);
|
||||
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());
|
||||
|
||||
replayAll();
|
||||
manager.start();
|
||||
|
||||
@ -499,6 +533,14 @@ public class SupervisorManagerTest extends EasyMockSupport
|
||||
manager.createOrUpdateAndStartSupervisor(activeAppendSpec);
|
||||
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent());
|
||||
|
||||
manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks);
|
||||
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent());
|
||||
|
||||
manager.createOrUpdateAndStartSupervisor(specWithUseConcurrentLocksFalse);
|
||||
Assert.assertFalse(
|
||||
manager.getActiveSupervisorIdForDatasourceWithAppendLock("dsWithUseConcurrentLocksFalse").isPresent()
|
||||
);
|
||||
|
||||
verifyAll();
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user