From cfa2a901b3a9b1b070831ddbb5887951f4c63e1d Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Sat, 23 Mar 2024 14:22:11 +0530 Subject: [PATCH] Redact passwords from tasks fetched from the TaskQueue (#16182) * Redact passwords from tasks fetched from the TaskQueue --- .../druid/indexing/overlord/TaskMaster.java | 7 +- .../druid/indexing/overlord/TaskQueue.java | 31 +++- .../ConcurrentReplaceAndAppendTest.java | 3 +- .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../indexing/overlord/TaskLockConfigTest.java | 4 +- .../indexing/overlord/TaskQueueScaleTest.java | 3 +- .../indexing/overlord/TaskQueueTest.java | 138 ++++++++++++++++-- .../indexing/overlord/http/OverlordTest.java | 4 +- 8 files changed, 172 insertions(+), 20 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 4798513aafd..9829e1ffa19 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.inject.Inject; import org.apache.druid.client.indexing.IndexingService; @@ -94,7 +95,8 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro final SupervisorManager supervisorManager, final OverlordDutyExecutor overlordDutyExecutor, @IndexingService final DruidLeaderSelector overlordLeaderSelector, - final SegmentAllocationQueue segmentAllocationQueue + final SegmentAllocationQueue segmentAllocationQueue, + final ObjectMapper mapper ) { this.supervisorManager = supervisorManager; @@ -125,7 +127,8 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro taskRunner, taskActionClientFactory, taskLockbox, - emitter + emitter, + mapper ); // Sensible order to start stuff: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 6dbf6e70798..d3ca29d3abc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.overlord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -56,6 +58,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.metadata.PasswordProvider; +import org.apache.druid.metadata.PasswordProviderRedactionMixIn; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.utils.CollectionUtils; @@ -117,6 +121,7 @@ public class TaskQueue private final TaskActionClientFactory taskActionClientFactory; private final TaskLockbox taskLockbox; private final ServiceEmitter emitter; + private final ObjectMapper passwordRedactingMapper; private final ReentrantLock giant = new ReentrantLock(true); @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") @@ -160,7 +165,8 @@ public class TaskQueue TaskRunner taskRunner, TaskActionClientFactory taskActionClientFactory, TaskLockbox taskLockbox, - ServiceEmitter emitter + ServiceEmitter emitter, + ObjectMapper mapper ) { this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig"); @@ -175,6 +181,8 @@ public class TaskQueue config.getTaskCompleteHandlerNumThreads(), "TaskQueue-OnComplete-%d" ); + this.passwordRedactingMapper = mapper.copy() + .addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class); } @VisibleForTesting @@ -970,15 +978,34 @@ public class TaskQueue return stats; } + /** + * Returns an optional containing the task payload after successfully redacting credentials. + * Returns an absent optional if there is no task payload corresponding to the taskId in memory. + * Throws DruidException if password could not be redacted due to serialization / deserialization failure + */ public Optional getActiveTask(String id) { + Task task; giant.lock(); try { - return Optional.fromNullable(tasks.get(id)); + task = tasks.get(id); } finally { giant.unlock(); } + if (task != null) { + try { + // Write and read the value using a mapper with password redaction mixin. + task = passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(task), Task.class); + } + catch (JsonProcessingException e) { + log.error(e, "Failed to serialize or deserialize task with id [%s].", task.getId()); + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Failed to serialize or deserialize task[%s].", task.getId()); + } + } + return Optional.fromNullable(task); } @VisibleForTesting diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 91c7d6a7175..864d84dbfa1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -142,7 +142,8 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase taskRunner, taskActionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); runningTasks.clear(); taskQueue.start(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 852ea0df02d..b60bc10a8de 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -696,7 +696,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest TaskQueueConfig.class ); - return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter); + return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter, mapper); } @After diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java index 7d2fbcd7923..1c90802f480 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; @@ -120,7 +121,8 @@ public class TaskLockConfigTest taskRunner, actionClientFactory, lockbox, - emitter + emitter, + new DefaultObjectMapper() ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index ade1314884e..1d478d9cd14 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -122,7 +122,8 @@ public class TaskQueueScaleTest taskRunner, unsupportedTaskActionFactory, // Not used for anything serious new TaskLockbox(taskStorage, storageCoordinator), - new NoopServiceEmitter() + new NoopServiceEmitter(), + jsonMapper ); taskQueue.start(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 8ee341c5db1..6a45cbb4c79 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -19,6 +19,9 @@ package org.apache.druid.indexing.overlord; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -26,6 +29,11 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.curator.framework.CuratorFramework; import org.apache.druid.common.guava.DSuppliers; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.HttpInputSource; +import org.apache.druid.data.input.impl.HttpInputSourceConfig; +import org.apache.druid.data.input.impl.NoopInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.WorkerNodeService; import org.apache.druid.error.DruidException; @@ -38,10 +46,14 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.IngestionTestBase; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner; import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; @@ -53,9 +65,11 @@ import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner; import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerTest; import org.apache.druid.indexing.overlord.hrtr.WorkerHolder; import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -64,7 +78,12 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.DefaultPasswordProvider; +import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory; +import org.apache.druid.metadata.SQLMetadataConnector; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; import org.apache.druid.server.initialization.IndexerZkConfig; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -77,6 +96,7 @@ import org.junit.Test; import org.mockito.Mockito; import javax.annotation.Nullable; +import java.net.URI; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -109,7 +129,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); // task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting @@ -154,7 +175,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); @@ -194,7 +216,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); @@ -219,7 +242,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -254,7 +278,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -279,7 +304,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); final Task task = new TestTask( @@ -321,7 +347,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")); @@ -344,7 +371,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); final Task task = new TestTask( @@ -374,7 +402,8 @@ public class TaskQueueTest extends IngestionTestBase new SimpleTaskRunner(actionClientFactory), actionClientFactory, getLockbox(), - new NoopServiceEmitter() + new NoopServiceEmitter(), + getObjectMapper() ); taskQueue.setActive(true); final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D")) @@ -421,7 +450,8 @@ public class TaskQueueTest extends IngestionTestBase taskRunner, actionClientFactory, getLockbox(), - metricsVerifier + metricsVerifier, + getObjectMapper() ); taskQueue.setActive(true); final Task task = new TestTask( @@ -507,7 +537,8 @@ public class TaskQueueTest extends IngestionTestBase taskRunner, createActionClientFactory(), getLockbox(), - new StubServiceEmitter("druid/overlord", "testHost") + new StubServiceEmitter("druid/overlord", "testHost"), + getObjectMapper() ); taskQueue.setActive(true); @@ -519,6 +550,91 @@ public class TaskQueueTest extends IngestionTestBase Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), taskQueue.getTaskStatus(failedTask).get()); } + @Test + public void testGetActiveTaskRedactsPassword() throws JsonProcessingException + { + final String password = "AbCd_1234"; + final ObjectMapper mapper = getObjectMapper(); + + final HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(Collections.singleton("http")); + mapper.setInjectableValues(new InjectableValues.Std() + .addValue(HttpInputSourceConfig.class, httpInputSourceConfig) + .addValue(ObjectMapper.class, new DefaultObjectMapper()) + ); + + final SQLMetadataConnector derbyConnector = derbyConnectorRule.getConnector(); + final TaskStorage taskStorage = new MetadataTaskStorage( + derbyConnector, + new TaskStorageConfig(null), + new DerbyMetadataStorageActionHandlerFactory( + derbyConnector, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + mapper + ) + ); + + final TaskQueue taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, null, null, null, null), + new DefaultTaskConfig(), + taskStorage, + EasyMock.createMock(HttpRemoteTaskRunner.class), + createActionClientFactory(), + new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()), + new StubServiceEmitter("druid/overlord", "testHost"), + mapper + ); + + final DataSchema dataSchema = new DataSchema( + "DS", + new TimestampSpec(null, null, null), + new DimensionsSpec(null), + null, + new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY, null), + null + ); + final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( + null, + new HttpInputSource(Collections.singletonList(URI.create("http://host.org")), + "user", + new DefaultPasswordProvider(password), + null, + httpInputSourceConfig), + new NoopInputFormat(), + null, + null + ); + final ParallelIndexSupervisorTask taskWithPassword = new ParallelIndexSupervisorTask( + "taskWithPassword", + "taskWithPassword", + null, + new ParallelIndexIngestionSpec( + dataSchema, + ioConfig, + null + ), + null, + null, + false + ); + Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password)); + + taskQueue.start(); + taskQueue.add(taskWithPassword); + + final Optional taskInStorage = taskStorage.getTask(taskWithPassword.getId()); + Assert.assertTrue(taskInStorage.isPresent()); + final String taskInStorageAsString = mapper.writeValueAsString(taskInStorage.get()); + Assert.assertFalse(taskInStorageAsString.contains(password)); + + final Optional taskInQueue = taskQueue.getActiveTask(taskWithPassword.getId()); + Assert.assertTrue(taskInQueue.isPresent()); + final String taskInQueueAsString = mapper.writeValueAsString(taskInQueue.get()); + Assert.assertFalse(taskInQueueAsString.contains(password)); + + Assert.assertEquals(taskInStorageAsString, taskInQueueAsString); + } + private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List runningTasks) { HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index b0df15ce4ef..066e4c6c58a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -65,6 +65,7 @@ import org.apache.druid.indexing.overlord.config.TaskQueueConfig; import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; @@ -247,7 +248,8 @@ public class OverlordTest supervisorManager, EasyMock.createNiceMock(OverlordDutyExecutor.class), new TestDruidLeaderSelector(), - EasyMock.createNiceMock(SegmentAllocationQueue.class) + EasyMock.createNiceMock(SegmentAllocationQueue.class), + new DefaultObjectMapper() ); EmittingLogger.registerEmitter(serviceEmitter); }