mirror of https://github.com/apache/druid.git
Redact passwords from tasks fetched from the TaskQueue (#16182)
* Redact passwords from tasks fetched from the TaskQueue
This commit is contained in:
parent
6e19ce5e69
commit
cfa2a901b3
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.inject.Inject;
|
||||
import org.apache.druid.client.indexing.IndexingService;
|
||||
|
@ -94,7 +95,8 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
final SupervisorManager supervisorManager,
|
||||
final OverlordDutyExecutor overlordDutyExecutor,
|
||||
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
|
||||
final SegmentAllocationQueue segmentAllocationQueue
|
||||
final SegmentAllocationQueue segmentAllocationQueue,
|
||||
final ObjectMapper mapper
|
||||
)
|
||||
{
|
||||
this.supervisorManager = supervisorManager;
|
||||
|
@ -125,7 +127,8 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
|
|||
taskRunner,
|
||||
taskActionClientFactory,
|
||||
taskLockbox,
|
||||
emitter
|
||||
emitter,
|
||||
mapper
|
||||
);
|
||||
|
||||
// Sensible order to start stuff:
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -56,6 +58,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
|
|||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.apache.druid.metadata.PasswordProvider;
|
||||
import org.apache.druid.metadata.PasswordProviderRedactionMixIn;
|
||||
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
|
||||
import org.apache.druid.utils.CollectionUtils;
|
||||
|
||||
|
@ -117,6 +121,7 @@ public class TaskQueue
|
|||
private final TaskActionClientFactory taskActionClientFactory;
|
||||
private final TaskLockbox taskLockbox;
|
||||
private final ServiceEmitter emitter;
|
||||
private final ObjectMapper passwordRedactingMapper;
|
||||
|
||||
private final ReentrantLock giant = new ReentrantLock(true);
|
||||
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
|
||||
|
@ -160,7 +165,8 @@ public class TaskQueue
|
|||
TaskRunner taskRunner,
|
||||
TaskActionClientFactory taskActionClientFactory,
|
||||
TaskLockbox taskLockbox,
|
||||
ServiceEmitter emitter
|
||||
ServiceEmitter emitter,
|
||||
ObjectMapper mapper
|
||||
)
|
||||
{
|
||||
this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig");
|
||||
|
@ -175,6 +181,8 @@ public class TaskQueue
|
|||
config.getTaskCompleteHandlerNumThreads(),
|
||||
"TaskQueue-OnComplete-%d"
|
||||
);
|
||||
this.passwordRedactingMapper = mapper.copy()
|
||||
.addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -970,15 +978,34 @@ public class TaskQueue
|
|||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an optional containing the task payload after successfully redacting credentials.
|
||||
* Returns an absent optional if there is no task payload corresponding to the taskId in memory.
|
||||
* Throws DruidException if password could not be redacted due to serialization / deserialization failure
|
||||
*/
|
||||
public Optional<Task> getActiveTask(String id)
|
||||
{
|
||||
Task task;
|
||||
giant.lock();
|
||||
try {
|
||||
return Optional.fromNullable(tasks.get(id));
|
||||
task = tasks.get(id);
|
||||
}
|
||||
finally {
|
||||
giant.unlock();
|
||||
}
|
||||
if (task != null) {
|
||||
try {
|
||||
// Write and read the value using a mapper with password redaction mixin.
|
||||
task = passwordRedactingMapper.readValue(passwordRedactingMapper.writeValueAsString(task), Task.class);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
log.error(e, "Failed to serialize or deserialize task with id [%s].", task.getId());
|
||||
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
|
||||
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
|
||||
.build(e, "Failed to serialize or deserialize task[%s].", task.getId());
|
||||
}
|
||||
}
|
||||
return Optional.fromNullable(task);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
|
@ -142,7 +142,8 @@ public class ConcurrentReplaceAndAppendTest extends IngestionTestBase
|
|||
taskRunner,
|
||||
taskActionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
runningTasks.clear();
|
||||
taskQueue.start();
|
||||
|
|
|
@ -696,7 +696,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
TaskQueueConfig.class
|
||||
);
|
||||
|
||||
return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter);
|
||||
return new TaskQueue(lockConfig, tqc, new DefaultTaskConfig(), ts, tr, tac, taskLockbox, emitter, mapper);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.config.DefaultTaskConfig;
|
|||
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
|
||||
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
|
||||
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.server.metrics.NoopServiceEmitter;
|
||||
import org.easymock.EasyMock;
|
||||
|
@ -120,7 +121,8 @@ public class TaskLockConfigTest
|
|||
taskRunner,
|
||||
actionClientFactory,
|
||||
lockbox,
|
||||
emitter
|
||||
emitter,
|
||||
new DefaultObjectMapper()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -122,7 +122,8 @@ public class TaskQueueScaleTest
|
|||
taskRunner,
|
||||
unsupportedTaskActionFactory, // Not used for anything serious
|
||||
new TaskLockbox(taskStorage, storageCoordinator),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
jsonMapper
|
||||
);
|
||||
|
||||
taskQueue.start();
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.InjectableValues;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
@ -26,6 +29,11 @@ import com.google.common.util.concurrent.Futures;
|
|||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.druid.common.guava.DSuppliers;
|
||||
import org.apache.druid.data.input.impl.DimensionsSpec;
|
||||
import org.apache.druid.data.input.impl.HttpInputSource;
|
||||
import org.apache.druid.data.input.impl.HttpInputSourceConfig;
|
||||
import org.apache.druid.data.input.impl.NoopInputFormat;
|
||||
import org.apache.druid.data.input.impl.TimestampSpec;
|
||||
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import org.apache.druid.discovery.WorkerNodeService;
|
||||
import org.apache.druid.error.DruidException;
|
||||
|
@ -38,10 +46,14 @@ import org.apache.druid.indexing.common.TaskLockType;
|
|||
import org.apache.druid.indexing.common.TaskToolbox;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClient;
|
||||
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
|
||||
import org.apache.druid.indexing.common.config.TaskStorageConfig;
|
||||
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
|
||||
import org.apache.druid.indexing.common.task.IngestionTestBase;
|
||||
import org.apache.druid.indexing.common.task.Task;
|
||||
import org.apache.druid.indexing.common.task.Tasks;
|
||||
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
|
||||
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
|
||||
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
|
||||
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
|
||||
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
|
||||
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
|
||||
|
@ -53,9 +65,11 @@ import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
|
|||
import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerTest;
|
||||
import org.apache.druid.indexing.overlord.hrtr.WorkerHolder;
|
||||
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
|
||||
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
|
||||
import org.apache.druid.indexing.worker.TaskAnnouncement;
|
||||
import org.apache.druid.indexing.worker.Worker;
|
||||
import org.apache.druid.indexing.worker.config.WorkerConfig;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
@ -64,7 +78,12 @@ import org.apache.druid.java.util.common.granularity.Granularities;
|
|||
import org.apache.druid.java.util.common.granularity.Granularity;
|
||||
import org.apache.druid.java.util.http.client.HttpClient;
|
||||
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
||||
import org.apache.druid.metadata.DefaultPasswordProvider;
|
||||
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
|
||||
import org.apache.druid.metadata.SQLMetadataConnector;
|
||||
import org.apache.druid.segment.TestHelper;
|
||||
import org.apache.druid.segment.indexing.DataSchema;
|
||||
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
|
||||
import org.apache.druid.server.initialization.IndexerZkConfig;
|
||||
import org.apache.druid.server.initialization.ZkPathsConfig;
|
||||
|
@ -77,6 +96,7 @@ import org.junit.Test;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
@ -109,7 +129,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
// task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting
|
||||
|
@ -154,7 +175,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
|
||||
|
@ -194,7 +216,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
|
||||
|
@ -219,7 +242,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
|
||||
|
@ -254,7 +278,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
|
||||
|
@ -279,7 +304,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
final Task task = new TestTask(
|
||||
|
@ -321,7 +347,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"));
|
||||
|
@ -344,7 +371,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
final Task task = new TestTask(
|
||||
|
@ -374,7 +402,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
new SimpleTaskRunner(actionClientFactory),
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
new NoopServiceEmitter()
|
||||
new NoopServiceEmitter(),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
final Task task = new TestTask("t1", Intervals.of("2021-01-01/P1D"))
|
||||
|
@ -421,7 +450,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
taskRunner,
|
||||
actionClientFactory,
|
||||
getLockbox(),
|
||||
metricsVerifier
|
||||
metricsVerifier,
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
final Task task = new TestTask(
|
||||
|
@ -507,7 +537,8 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
taskRunner,
|
||||
createActionClientFactory(),
|
||||
getLockbox(),
|
||||
new StubServiceEmitter("druid/overlord", "testHost")
|
||||
new StubServiceEmitter("druid/overlord", "testHost"),
|
||||
getObjectMapper()
|
||||
);
|
||||
taskQueue.setActive(true);
|
||||
|
||||
|
@ -519,6 +550,91 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), taskQueue.getTaskStatus(failedTask).get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetActiveTaskRedactsPassword() throws JsonProcessingException
|
||||
{
|
||||
final String password = "AbCd_1234";
|
||||
final ObjectMapper mapper = getObjectMapper();
|
||||
|
||||
final HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(Collections.singleton("http"));
|
||||
mapper.setInjectableValues(new InjectableValues.Std()
|
||||
.addValue(HttpInputSourceConfig.class, httpInputSourceConfig)
|
||||
.addValue(ObjectMapper.class, new DefaultObjectMapper())
|
||||
);
|
||||
|
||||
final SQLMetadataConnector derbyConnector = derbyConnectorRule.getConnector();
|
||||
final TaskStorage taskStorage = new MetadataTaskStorage(
|
||||
derbyConnector,
|
||||
new TaskStorageConfig(null),
|
||||
new DerbyMetadataStorageActionHandlerFactory(
|
||||
derbyConnector,
|
||||
derbyConnectorRule.metadataTablesConfigSupplier().get(),
|
||||
mapper
|
||||
)
|
||||
);
|
||||
|
||||
final TaskQueue taskQueue = new TaskQueue(
|
||||
new TaskLockConfig(),
|
||||
new TaskQueueConfig(null, null, null, null, null),
|
||||
new DefaultTaskConfig(),
|
||||
taskStorage,
|
||||
EasyMock.createMock(HttpRemoteTaskRunner.class),
|
||||
createActionClientFactory(),
|
||||
new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()),
|
||||
new StubServiceEmitter("druid/overlord", "testHost"),
|
||||
mapper
|
||||
);
|
||||
|
||||
final DataSchema dataSchema = new DataSchema(
|
||||
"DS",
|
||||
new TimestampSpec(null, null, null),
|
||||
new DimensionsSpec(null),
|
||||
null,
|
||||
new UniformGranularitySpec(Granularities.YEAR, Granularities.DAY, null),
|
||||
null
|
||||
);
|
||||
final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
|
||||
null,
|
||||
new HttpInputSource(Collections.singletonList(URI.create("http://host.org")),
|
||||
"user",
|
||||
new DefaultPasswordProvider(password),
|
||||
null,
|
||||
httpInputSourceConfig),
|
||||
new NoopInputFormat(),
|
||||
null,
|
||||
null
|
||||
);
|
||||
final ParallelIndexSupervisorTask taskWithPassword = new ParallelIndexSupervisorTask(
|
||||
"taskWithPassword",
|
||||
"taskWithPassword",
|
||||
null,
|
||||
new ParallelIndexIngestionSpec(
|
||||
dataSchema,
|
||||
ioConfig,
|
||||
null
|
||||
),
|
||||
null,
|
||||
null,
|
||||
false
|
||||
);
|
||||
Assert.assertTrue(mapper.writeValueAsString(taskWithPassword).contains(password));
|
||||
|
||||
taskQueue.start();
|
||||
taskQueue.add(taskWithPassword);
|
||||
|
||||
final Optional<Task> taskInStorage = taskStorage.getTask(taskWithPassword.getId());
|
||||
Assert.assertTrue(taskInStorage.isPresent());
|
||||
final String taskInStorageAsString = mapper.writeValueAsString(taskInStorage.get());
|
||||
Assert.assertFalse(taskInStorageAsString.contains(password));
|
||||
|
||||
final Optional<Task> taskInQueue = taskQueue.getActiveTask(taskWithPassword.getId());
|
||||
Assert.assertTrue(taskInQueue.isPresent());
|
||||
final String taskInQueueAsString = mapper.writeValueAsString(taskInQueue.get());
|
||||
Assert.assertFalse(taskInQueueAsString.contains(password));
|
||||
|
||||
Assert.assertEquals(taskInStorageAsString, taskInQueueAsString);
|
||||
}
|
||||
|
||||
private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String> runningTasks)
|
||||
{
|
||||
HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery();
|
||||
|
|
|
@ -65,6 +65,7 @@ import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
|
|||
import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
|
||||
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -247,7 +248,8 @@ public class OverlordTest
|
|||
supervisorManager,
|
||||
EasyMock.createNiceMock(OverlordDutyExecutor.class),
|
||||
new TestDruidLeaderSelector(),
|
||||
EasyMock.createNiceMock(SegmentAllocationQueue.class)
|
||||
EasyMock.createNiceMock(SegmentAllocationQueue.class),
|
||||
new DefaultObjectMapper()
|
||||
);
|
||||
EmittingLogger.registerEmitter(serviceEmitter);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue