From 33fdd770f7048f4209f01441086847e336d7fe94 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Sun, 22 Oct 2023 14:12:36 +0530 Subject: [PATCH] Consider only supervisors with append lock for concurrent transactional replace (#15220) A SegmentTransactionReplaceAction must only update the mapping of tasks with append locks that are running concurrently. To ensure this, we return the supervisor id only if it has the taskLockType as APPEND in its context. --- .../SegmentTransactionalReplaceAction.java | 11 ++- .../supervisor/SupervisorManager.java | 27 +++++++- .../supervisor/SupervisorManagerTest.java | 67 +++++++++++++++++++ 3 files changed, 100 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 5a2b3ceec8f..e6ad0426e46 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -139,8 +139,9 @@ public class SegmentTransactionalReplaceAction implements TaskAction activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource()); - if (!activeSupervisorId.isPresent()) { + final Optional activeSupervisorIdWithAppendLock = + supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); + if (!activeSupervisorIdWithAppendLock.isPresent()) { return; } @@ -153,7 +154,11 @@ public class SegmentTransactionalReplaceAction implements TaskAction toolbox.getSupervisorManager() - .registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId) + .registerNewVersionOfPendingSegmentOnSupervisor( + activeSupervisorIdWithAppendLock.get(), + oldId, + newId + ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index d55f3cc8bd0..df454c1011a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -22,14 +22,18 @@ package org.apache.druid.indexing.overlord.supervisor; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -71,15 +75,34 @@ public class SupervisorManager return supervisors.keySet(); } - public Optional getActiveSupervisorIdForDatasource(String datasource) + /** + * @param datasource Datasource to find active supervisor id with append lock for. + * @return An optional with the active appending supervisor id if it exists. + */ + public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String datasource) { for (Map.Entry> entry : supervisors.entrySet()) { final String supervisorId = entry.getKey(); final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; + + TaskLockType taskLockType = null; + if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { + SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; + Map context = seekableStreamSupervisorSpec.getContext(); + if (context != null) { + taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + } + } + if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() - && supervisorSpec.getDataSources().contains(datasource)) { + && supervisorSpec.getDataSources().contains(datasource) + && TaskLockType.APPEND.equals(taskLockType)) { return Optional.of(supervisorId); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index ba4b963e9b3..e8c5d839cf1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -23,9 +23,13 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.MetadataSupervisorManager; import org.easymock.Capture; @@ -434,6 +438,69 @@ public class SupervisorManagerTest extends EasyMockSupport Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } + @Test + public void testGetActiveSupervisorIdForDatasourceWithAppendLock() + { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap()); + + NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS")); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec suspendedSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor suspendedSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes(); + EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes(); + EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes(); + EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes(); + EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(suspendedSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec activeSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes(); + EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes(); + EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes(); + EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(activeSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes(); + EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes(); + EasyMock.expect(activeAppendSpec.createSupervisor()).andReturn(activeAppendSupervisor).anyTimes(); + EasyMock.expect(activeAppendSpec.createAutoscaler(activeAppendSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(activeAppendSpec.getContext()).andReturn(ImmutableMap.of( + Tasks.TASK_LOCK_TYPE, + TaskLockType.APPEND.name() + )).anyTimes(); + EasyMock.replay(activeAppendSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + replayAll(); + manager.start(); + + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("nonExistent").isPresent()); + + manager.createOrUpdateAndStartSupervisor(noopSupervisorSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("noopDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(suspendedSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("suspendedDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(activeSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(activeAppendSpec); + Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent()); + + verifyAll(); + } private static class TestSupervisorSpec implements SupervisorSpec {