Remove task action audit logging and druid_taskLog metadata table (#16309)

Description:
Task action audit logging was first deprecated and disabled by default in Druid 0.13, #6368.

As called out in the original discussion #5859, there are several drawbacks to persisting task action audit logs. 
- Only usage of the task audit logs is to serve the API `/indexer/v1/task/{taskId}/segments`
which returns the list of segments created by a task.
- The use case is really narrow and no prod clusters really use this information.
- There can be better ways of obtaining this information, such as the metric
`segment/added/bytes` which reports both the segment ID and task ID
when a segment is committed by a task. We could also include committed segment IDs in task reports.
- A task persisting several segments would bloat up the audit logs table putting unnecessary strain
on metadata storage.

Changes:
- Remove `TaskAuditLogConfig`
- Remove method `TaskAction.isAudited()`. No task action is audited anymore.
- Remove `SegmentInsertAction` as it is not used anymore. `SegmentTransactionalInsertAction`
is the new incarnation which has been in use for a while.
- Deprecate `MetadataStorageActionHandler.addLog()` and `getLogs()`. These are not used anymore
but need to be retained for backward compatibility of extensions.
- Do not create `druid_taskLog` metadata table anymore.
This commit is contained in:
Kashif Faraz 2024-07-17 04:39:00 -07:00 committed by GitHub
parent ebf216829d
commit 9f6ce6ddc0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
52 changed files with 388 additions and 1259 deletions

View File

@ -914,13 +914,10 @@ Host: http://ROUTER_IP:ROUTER_PORT
### Get task segments
:::info
This API is deprecated and will be removed in future releases.
This API is not supported anymore and always returns a 404 response.
Use the metric `segment/added/bytes` instead to identify the segment IDs committed by a task.
:::
Retrieves information about segments generated by the task given the task ID. To hit this endpoint, make sure to enable the audit log config on the Overlord with `druid.indexer.auditLog.enabled = true`.
In addition to enabling audit logs, configure a cleanup strategy to prevent overloading the metadata store with old audit logs which may cause performance issues. To enable automated cleanup of audit logs on the Coordinator, set `druid.coordinator.kill.audit.on`. You may also manually export the audit logs to external storage. For more information, see [Audit records](../operations/clean-metadata-store.md#audit-records).
#### URL
`GET` `/druid/indexer/v1/task/{taskId}/segments`
@ -929,12 +926,14 @@ In addition to enabling audit logs, configure a cleanup strategy to prevent over
<Tabs>
<TabItem value="27" label="200 SUCCESS">
<TabItem value="27" label="404 NOT FOUND">
<br/>
*Successfully retrieved task segments*
```json
{
"error": "Segment IDs committed by a task action are not persisted anymore. Use the metric 'segment/added/bytes' to identify the segments created by a task."
}
```
</TabItem>
</Tabs>

View File

@ -44,7 +44,7 @@ This applies to all metadata entities in this topic except compaction configurat
You can configure the retention period for each metadata type, when available, through the record's `durationToRetain` property.
Certain records may require additional conditions be satisfied before clean up occurs.
See the [example](#example) for how you can customize the automated metadata cleanup for a specific use case.
See the [example](#example-configuration-for-automated-metadata-cleanup) for how you can customize the automated metadata cleanup for a specific use case.
## Automated cleanup strategies
@ -62,13 +62,12 @@ You can configure cleanup for each entity separately, as described in this secti
Define the properties in the `coordinator/runtime.properties` file.
The cleanup of one entity may depend on the cleanup of another entity as follows:
- You have to configure a [kill task for segment records](#kill-task) before you can configure automated cleanup for [rules](#rules-records) or [compaction configuration](#compaction-configuration-records).
- You have to configure a [kill task for segment records](#segment-records-and-segments-in-deep-storage-kill-task) before you can configure automated cleanup for [rules](#rules-records) or [compaction configuration](#compaction-configuration-records).
- You have to schedule the metadata management tasks to run at the same or higher frequency as your most frequent cleanup job. For example, if your most frequent cleanup job is every hour, set the metadata store management period to one hour or less: `druid.coordinator.period.metadataStoreManagementPeriod=P1H`.
For details on configuration properties, see [Metadata management](../configuration/index.md#metadata-management).
If you want to skip the details, check out the [example](#example) for configuring automated metadata cleanup.
If you want to skip the details, check out the [example](#example-configuration-for-automated-metadata-cleanup) for configuring automated metadata cleanup.
<a name="kill-task"></a>
### Segment records and segments in deep storage (kill task)
:::info
@ -110,7 +109,7 @@ Supervisor cleanup uses the following configuration:
### Rules records
Rule records become eligible for deletion when all segments for the datasource have been killed by the kill task and the `durationToRetain` time has passed since their creation. Automated cleanup for rules requires a [kill task](#kill-task).
Rule records become eligible for deletion when all segments for the datasource have been killed by the kill task and the `durationToRetain` time has passed since their creation. Automated cleanup for rules requires a [kill task](#segment-records-and-segments-in-deep-storage-kill-task).
Rule cleanup uses the following configuration:
- `druid.coordinator.kill.rule.on`: When `true`, enables cleanup for rules records.
@ -129,7 +128,7 @@ To prevent the configuration from being prematurely removed, wait for the dataso
Unlike other metadata records, compaction configuration records do not have a retention period set by `durationToRetain`. Druid deletes compaction configuration records at every cleanup cycle for inactive datasources, which do not have segments either used or unused.
Compaction configuration records in the `druid_config` table become eligible for deletion after all segments for the datasource have been killed by the kill task. Automated cleanup for compaction configuration requires a [kill task](#kill-task).
Compaction configuration records in the `druid_config` table become eligible for deletion after all segments for the datasource have been killed by the kill task. Automated cleanup for compaction configuration requires a [kill task](#segment-records-and-segments-in-deep-storage-kill-task).
Compaction configuration cleanup uses the following configuration:
- `druid.coordinator.kill.compaction.on`: When `true`, enables cleanup for compaction configuration records.
@ -153,7 +152,7 @@ Datasource cleanup uses the following configuration:
You can configure the Overlord to periodically delete indexer task logs and associated metadata. During cleanup, the Overlord removes the following:
* Indexer task logs from deep storage.
* Indexer task log metadata from the tasks and tasklogs tables in [metadata storage](../configuration/index.md#metadata-storage) (named `druid_tasks` and `druid_tasklogs` by default). Druid no longer uses the tasklogs table, and the table is always empty.
* Indexer task log metadata from the tasks table in [metadata storage](../configuration/index.md#metadata-storage) (named `druid_tasks` by default).
To configure cleanup of task logs by the Overlord, set the following properties in the `overlord/runtime.properties` file.
@ -188,7 +187,6 @@ druid.coordinator.kill.rule.on=false
druid.coordinator.kill.datasource.on=false
```
<a name="example"></a>
## Example configuration for automated metadata cleanup
Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days:

View File

@ -121,12 +121,6 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -22,8 +22,6 @@ package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@ -37,45 +35,21 @@ public class LocalTaskActionClient implements TaskActionClient
private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class);
private final Task task;
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
private final TaskAuditLogConfig auditLogConfig;
public LocalTaskActionClient(
Task task,
TaskStorage storage,
TaskActionToolbox toolbox,
TaskAuditLogConfig auditLogConfig
TaskActionToolbox toolbox
)
{
this.task = task;
this.storage = storage;
this.toolbox = toolbox;
this.auditLogConfig = auditLogConfig;
}
@Override
public <RetType> RetType submit(TaskAction<RetType> taskAction)
{
log.debug("Performing action for task[%s]: %s", task.getId(), taskAction);
if (auditLogConfig.isEnabled() && taskAction.isAudited()) {
// Add audit log
try {
final long auditLogStartTime = System.currentTimeMillis();
storage.addAuditLog(task, taskAction);
emitTimerMetric("task/action/log/time", taskAction, System.currentTimeMillis() - auditLogStartTime);
}
catch (Exception e) {
final String actionClass = taskAction.getClass().getName();
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", task.getId())
.addData("actionClass", actionClass)
.emit();
throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
}
}
final long performStartTime = System.currentTimeMillis();
final RetType result = performAction(taskAction);
emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime);

View File

@ -21,27 +21,22 @@ package org.apache.druid.indexing.common.actions;
import com.google.inject.Inject;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskStorage;
/**
*/
public class LocalTaskActionClientFactory implements TaskActionClientFactory
{
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
private final TaskAuditLogConfig auditLogConfig;
@Inject
public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox, TaskAuditLogConfig auditLogConfig)
public LocalTaskActionClientFactory(TaskActionToolbox toolbox)
{
this.storage = storage;
this.toolbox = toolbox;
this.auditLogConfig = auditLogConfig;
}
@Override
public TaskActionClient create(Task task)
{
return new LocalTaskActionClient(task, storage, toolbox, auditLogConfig);
return new LocalTaskActionClient(task, toolbox);
}
}

View File

@ -39,12 +39,6 @@ public class LockListAction implements TaskAction<List<TaskLock>>
return toolbox.getTaskLockbox().findLocksForTask(task);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -56,12 +56,6 @@ public class LockReleaseAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -67,9 +67,4 @@ public class MarkSegmentsAsUnusedAction implements TaskAction<Integer>
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
}
@Override
public boolean isAudited()
{
return true;
}
}

View File

@ -64,12 +64,6 @@ public class ResetDataSourceMetadataAction implements TaskAction<Boolean>
return toolbox.getSupervisorManager().resetSupervisor(dataSource, resetMetadata);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -74,12 +74,6 @@ public class RetrieveSegmentsByIdAction implements TaskAction<Set<DataSegment>>
.retrieveSegmentsById(dataSource, segmentIds);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public boolean equals(Object o)
{

View File

@ -101,12 +101,6 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
.retrieveUnusedSegmentsForInterval(dataSource, interval, versions, limit, maxUsedStatusLastUpdatedTime);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -186,12 +186,6 @@ public class RetrieveUsedSegmentsAction implements TaskAction<Collection<DataSeg
.retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility);
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public boolean equals(Object o)
{

View File

@ -386,12 +386,6 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Set;
/**
* Word of warning: Very large "segments" sets can cause oversized audit log entries, which is bad because it means
* that the task cannot actually complete. Callers should avoid this by avoiding inserting too many segments in the
* same action.
*/
public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
{
private final Set<DataSegment> segments;
@Nullable
private final SegmentSchemaMapping segmentSchemaMapping;
@JsonCreator
public SegmentInsertAction(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("segmentSchemaMapping") @Nullable SegmentSchemaMapping segmentSchemaMapping
)
{
this.segments = ImmutableSet.copyOf(segments);
this.segmentSchemaMapping = segmentSchemaMapping;
}
@JsonProperty
public Set<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
@Nullable
public SegmentSchemaMapping getSegmentSchemaMapping()
{
return segmentSchemaMapping;
}
@Override
public TypeReference<Set<DataSegment>> getReturnTypeReference()
{
return new TypeReference<Set<DataSegment>>()
{
};
}
/**
* Behaves similarly to
* {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#commitSegments},
* with startMetadata and endMetadata both null.
*/
@Override
public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return SegmentTransactionalInsertAction.appendAction(segments, null, null, segmentSchemaMapping).perform(task, toolbox).getSegments();
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{
return "SegmentInsertAction{" +
"segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
'}';
}
}

View File

@ -119,12 +119,6 @@ public class SegmentLockAcquireAction implements TaskAction<LockResult>
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -102,12 +102,6 @@ public class SegmentLockTryAcquireAction implements TaskAction<List<LockResult>>
.collect(Collectors.toList());
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -99,12 +99,6 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -101,12 +101,6 @@ public class SegmentNukeAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -44,7 +44,6 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
*
* Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*
@ -209,12 +208,6 @@ public class SegmentTransactionalAppendAction implements TaskAction<SegmentPubli
return retVal;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -304,12 +304,6 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
return segmentsMap;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -187,12 +187,6 @@ public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPubl
);
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -73,12 +73,6 @@ public class SurrogateAction<ReturnType, ActionType extends TaskAction<ReturnTyp
}
}
@Override
public boolean isAudited()
{
return taskAction.isAudited();
}
@Override
public String toString()
{

View File

@ -34,7 +34,6 @@ import java.util.concurrent.Future;
@JsonSubTypes.Type(name = "segmentLockAcquire", value = SegmentLockAcquireAction.class),
@JsonSubTypes.Type(name = "lockList", value = LockListAction.class),
@JsonSubTypes.Type(name = "lockRelease", value = LockReleaseAction.class),
@JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@ -61,8 +60,6 @@ public interface TaskAction<RetType>
RetType perform(Task task, TaskActionToolbox toolbox);
boolean isAudited();
default boolean canPerformAsync(Task task, TaskActionToolbox toolbox)
{
return false;

View File

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* The configuration for task audit logging.
* This class will be removed in future releases. See https://github.com/apache/druid/issues/5859.
*/
@Deprecated
public class TaskAuditLogConfig
{
@JsonProperty
private final boolean enabled;
@JsonCreator
public TaskAuditLogConfig(@JsonProperty("enabled") boolean enabled)
{
this.enabled = enabled;
}
@JsonProperty("enabled")
public boolean isEnabled()
{
return enabled;
}
}

View File

@ -97,12 +97,6 @@ public class TimeChunkLockAcquireAction implements TaskAction<TaskLock>
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -81,12 +81,6 @@ public class TimeChunkLockTryAcquireAction implements TaskAction<TaskLock>
return result.getTaskLock();
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{

View File

@ -63,12 +63,6 @@ public class UpdateLocationAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -85,12 +85,6 @@ public class UpdateStatusAction implements TaskAction<Void>
return null;
}
@Override
public boolean isAudited()
{
return true;
}
@Override
public String toString()
{

View File

@ -327,24 +327,6 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
}
@Deprecated
@Override
public <T> void addAuditLog(Task task, TaskAction<T> taskAction)
{
synchronized (taskActions) {
taskActions.put(task.getId(), taskAction);
}
}
@Deprecated
@Override
public List<TaskAction> getAuditLogs(String taskid)
{
synchronized (taskActions) {
return ImmutableList.copyOf(taskActions.get(taskid));
}
}
private static class TaskStuff
{
final Task task;

View File

@ -75,14 +75,6 @@ public class MetadataTaskStorage implements TaskStorage
};
}
@Override
public TypeReference<TaskAction> getLogType()
{
return new TypeReference<TaskAction>()
{
};
}
@Override
public TypeReference<TaskLock> getLockType()
{
@ -319,24 +311,6 @@ public class MetadataTaskStorage implements TaskStorage
);
}
@Deprecated
@Override
public <T> void addAuditLog(final Task task, final TaskAction<T> taskAction)
{
Preconditions.checkNotNull(taskAction, "taskAction");
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
handler.addLog(task.getId(), taskAction);
}
@Deprecated
@Override
public List<TaskAction> getAuditLogs(final String taskId)
{
return handler.getLogs(taskId);
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
return handler.getLocks(taskid);

View File

@ -24,7 +24,6 @@ import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@ -110,26 +109,6 @@ public interface TaskStorage
@Nullable
TaskInfo<Task, TaskStatus> getTaskInfo(String taskId);
/**
* Add an action taken by a task to the audit log.
*
* @param task task to record action for
* @param taskAction task action to record
* @param <T> task action return type
*/
@Deprecated
<T> void addAuditLog(Task task, TaskAction<T> taskAction);
/**
* Returns all actions taken by a task.
*
* @param taskid task ID
*
* @return list of task actions
*/
@Deprecated
List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently running or pending tasks as stored in the storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.

View File

@ -24,22 +24,16 @@ import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Wraps a {@link TaskStorage}, providing a useful collection of read-only methods.
@ -126,27 +120,4 @@ public class TaskStorageQueryAdapter
return storage.getTaskInfo(taskId);
}
/**
* Returns all segments created by this task.
*
* This method is useful when you want to figure out all of the things a single task spawned. It does pose issues
* with the result set perhaps growing boundlessly and we do not do anything to protect against that. Use at your
* own risk and know that at some point, we might adjust this to actually enforce some sort of limits.
*
* @param taskid task ID
* @return set of segments created by the specified task
*/
@Deprecated
public Set<DataSegment> getInsertedSegments(final String taskid)
{
final Set<DataSegment> segments = new HashSet<>();
for (final TaskAction action : storage.getAuditLogs(taskid)) {
if (action instanceof SegmentInsertAction) {
segments.addAll(((SegmentInsertAction) action).getSegments());
} else if (action instanceof SegmentTransactionalInsertAction) {
segments.addAll(((SegmentTransactionalInsertAction) action).getSegments());
}
}
return segments;
}
}

View File

@ -84,7 +84,6 @@ import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -400,8 +399,12 @@ public class OverlordResource
@ResourceFilters(TaskResourceFilter.class)
public Response getTaskSegments(@PathParam("taskid") String taskid)
{
final Set<DataSegment> segments = taskStorageQueryAdapter.getInsertedSegments(taskid);
return Response.ok().entity(segments).build();
final String errorMsg =
"Segment IDs committed by a task action are not persisted anymore."
+ " Use the metric 'segment/added/bytes' to identify the segments created by a task.";
return Response.status(Status.NOT_FOUND)
.entity(Collections.singletonMap("error", errorMsg))
.build();
}
@POST

View File

@ -1,154 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.assertj.core.api.Assertions;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.Set;
public class SegmentInsertActionTest
{
@Rule
public ExpectedException thrown = ExpectedException.none();
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
private static final String DATA_SOURCE = "none";
private static final Interval INTERVAL = Intervals.of("2020/2020T01");
private static final String PARTY_YEAR = "1999";
private static final String THE_DISTANT_FUTURE = "3000";
private static final DataSegment SEGMENT1 = new DataSegment(
DATA_SOURCE,
INTERVAL,
PARTY_YEAR,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new LinearShardSpec(0),
9,
1024
);
private static final DataSegment SEGMENT2 = new DataSegment(
DATA_SOURCE,
INTERVAL,
PARTY_YEAR,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new LinearShardSpec(1),
9,
1024
);
private static final DataSegment SEGMENT3 = new DataSegment(
DATA_SOURCE,
INTERVAL,
THE_DISTANT_FUTURE,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of(),
new LinearShardSpec(1),
9,
1024
);
private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs)
throws InterruptedException
{
return actionTestKit.getTaskLockbox().lock(task, new TimeChunkLockRequest(lockType, task, interval, null), timeoutMs);
}
@Test
public void testSimple() throws Exception
{
final Task task = NoopTask.create();
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2), null);
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singleton(INTERVAL),
CriticalAction.builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
Assertions.assertThat(
actionTestKit.getMetadataStorageCoordinator()
.retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE)
).containsExactlyInAnyOrder(SEGMENT1, SEGMENT2);
}
@Test
public void testFailBadVersion() throws Exception
{
final Task task = NoopTask.create();
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3), null);
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.containsString("are not covered by locks"));
final Set<DataSegment> segments = actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singleton(INTERVAL),
CriticalAction.<Set<DataSegment>>builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
() -> {
Assert.fail();
return null;
}
)
.build()
);
Assert.assertEquals(ImmutableSet.of(SEGMENT3), segments);
}
}

View File

@ -24,7 +24,6 @@ import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.overlord.TaskStorage;
import java.util.concurrent.ConcurrentHashMap;
@ -42,7 +41,7 @@ public class CountingLocalTaskActionClientForTest implements TaskActionClient
TaskActionToolbox toolbox
)
{
delegate = new LocalTaskActionClient(task, storage, toolbox, new TaskAuditLogConfig(false));
delegate = new LocalTaskActionClient(task, toolbox);
}
@Override

View File

@ -43,7 +43,6 @@ import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@ -353,7 +352,8 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
public class TestLocalTaskActionClient extends CountingLocalTaskActionClientForTest
{
private final Set<DataSegment> publishedSegments = new HashSet<>();
private SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
private final SegmentSchemaMapping segmentSchemaMapping
= new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
private TestLocalTaskActionClient(Task task)
{
@ -365,11 +365,9 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
{
final RetType result = super.submit(taskAction);
if (taskAction instanceof SegmentTransactionalInsertAction) {
publishedSegments.addAll(((SegmentTransactionalInsertAction) taskAction).getSegments());
segmentSchemaMapping.merge(((SegmentTransactionalInsertAction) taskAction).getSegmentSchemaMapping());
} else if (taskAction instanceof SegmentInsertAction) {
publishedSegments.addAll(((SegmentInsertAction) taskAction).getSegments());
segmentSchemaMapping.merge(((SegmentInsertAction) taskAction).getSegmentSchemaMapping());
SegmentTransactionalInsertAction insertAction = (SegmentTransactionalInsertAction) taskAction;
publishedSegments.addAll(insertAction.getSegments());
segmentSchemaMapping.merge(insertAction.getSegmentSchemaMapping());
}
return result;
}

View File

@ -20,14 +20,13 @@
package org.apache.druid.indexing.overlord;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
@ -38,6 +37,7 @@ import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import java.util.Collections;
import java.util.List;
/**
@ -97,18 +97,7 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2);
// Push first segment
SegmentInsertAction firstSegmentInsertAction = new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval1)
.version(lock1.getVersion())
.size(0)
.build()
),
null
);
toolbox.getTaskActionClient().submit(firstSegmentInsertAction);
toolbox.getTaskActionClient().submit(createSegmentInsertAction(interval1, lock1.getVersion()));
// Release first lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
@ -118,18 +107,7 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3);
// Push second segment
SegmentInsertAction secondSegmentInsertAction = new SegmentInsertAction(
ImmutableSet.of(
DataSegment.builder()
.dataSource("foo")
.interval(interval2)
.version(lock2.getVersion())
.size(0)
.build()
),
null
);
toolbox.getTaskActionClient().submit(secondSegmentInsertAction);
toolbox.getTaskActionClient().submit(createSegmentInsertAction(interval2, lock2.getVersion()));
// Release second lock
toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
@ -141,4 +119,17 @@ public class RealtimeishTask extends AbstractTask
// Exit
return TaskStatus.success(getId());
}
private SegmentTransactionalInsertAction createSegmentInsertAction(Interval interval, String version)
{
final DataSegment segmentToInsert
= DataSegment.builder()
.dataSource("foo")
.interval(interval)
.version(version)
.size(0)
.build();
return SegmentTransactionalInsertAction
.appendAction(Collections.singleton(segmentToInsert), null, null, null);
}
}

View File

@ -63,11 +63,10 @@ import org.apache.druid.indexing.common.TaskToolboxFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@ -592,7 +591,6 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
taskLockbox = new TaskLockbox(taskStorage, mdc);
tac = new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(
taskLockbox,
taskStorage,
@ -600,8 +598,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
emitter,
EasyMock.createMock(SupervisorManager.class),
mapper
),
new TaskAuditLogConfig(true)
)
);
taskConfig = new TaskConfigBuilder()
.setBaseDir(temporaryFolder.newFolder().toString())
@ -747,12 +744,10 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
final TaskStatus mergedStatus = runTask(indexTask);
final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(mdc.getPublished());
final List<DataSegment> loggedSegments = BY_INTERVAL_ORDERING.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, mergedStatus.getStatusCode());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
@ -1103,7 +1098,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.size(0)
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null));
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null)
);
return TaskStatus.success(getId());
}
};
@ -1144,7 +1141,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.size(0)
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null));
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null)
);
return TaskStatus.success(getId());
}
};
@ -1186,7 +1185,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
.size(0)
.build();
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null));
toolbox.getTaskActionClient().submit(
SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null)
);
return TaskStatus.success(getId());
}
};
@ -1244,11 +1245,9 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(mdc.getPublished());
final List<DataSegment> loggedSegments = BY_INTERVAL_ORDERING.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());

View File

@ -197,10 +197,8 @@ public class OverlordResourceTest
);
}
@Test
public void testLeader()
private void replayAll()
{
EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once();
EasyMock.replay(
taskRunner,
taskMaster,
@ -208,8 +206,18 @@ public class OverlordResourceTest
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
authConfig,
configManager,
auditManager,
provisioningStrategy
);
}
@Test
public void testLeader()
{
EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once();
replayAll();
final Response response = overlordResource.getLeader();
Assert.assertEquals("boz", response.getEntity());
@ -221,15 +229,7 @@ public class OverlordResourceTest
{
EasyMock.expect(taskMaster.isLeader()).andReturn(true).once();
EasyMock.expect(taskMaster.isLeader()).andReturn(false).once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
// true
final Response response1 = overlordResource.isLeader();
@ -267,15 +267,7 @@ public class OverlordResourceTest
)
);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource.getWaitingTasks(req)
.getEntity();
@ -299,15 +291,7 @@ public class OverlordResourceTest
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow")
)
);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List) overlordResource
.getCompleteTasks(null, req).getEntity();
@ -341,15 +325,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List) overlordResource.getRunningTasks(null, req)
.getEntity();
@ -390,15 +366,7 @@ public class OverlordResourceTest
)
).atLeastOnce();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(null, null, null, null, null, req)
.getEntity();
@ -437,15 +405,7 @@ public class OverlordResourceTest
new MockTaskRunnerWorkItem("id_4")
)
).atLeastOnce();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(null, "allow", null, null, null, req)
@ -484,15 +444,7 @@ public class OverlordResourceTest
)
);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks(
"waiting",
@ -537,15 +489,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andReturn(RunnerTaskState.RUNNING);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List) overlordResource
.getTasks("running", "allow", null, null, null, req)
@ -587,15 +531,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getRunnerTaskState("id_3")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andStubReturn(RunnerTaskState.RUNNING);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("pending", null, null, null, null, req)
@ -622,15 +558,7 @@ public class OverlordResourceTest
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow")
)
);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, null, null, null, req)
.getEntity();
@ -657,15 +585,7 @@ public class OverlordResourceTest
)
);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
String interval = "2010-01-01_P1D";
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, interval, null, null, req)
@ -712,16 +632,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andReturn(RunnerTaskState.PENDING);
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING);
// Replay all mocks
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
// Verify that only the tasks of read access datasource are returned
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@ -769,16 +680,7 @@ public class OverlordResourceTest
EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING);
// Replay all mocks
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
// Verify that only the tasks of read access datasource are returned
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
@ -797,16 +699,7 @@ public class OverlordResourceTest
// and no access to "buzzfeed"
expectAuthorizationTokenCheck(Users.WIKI_READER);
// Replay all mocks
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
// Verify that only the tasks of read access datasource are returned
expectedException.expect(WebApplicationException.class);
@ -832,15 +725,7 @@ public class OverlordResourceTest
createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow")
)
);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
List<TaskStatusPlus> responseObjects = (List<TaskStatusPlus>) overlordResource
.getTasks("complete", null, null, null, null, req)
.getEntity();
@ -852,15 +737,7 @@ public class OverlordResourceTest
@Test
public void testGetTasksNegativeState()
{
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Object responseObject = overlordResource
.getTasks("blah", "ds_test", null, null, null, req)
.getEntity();
@ -877,15 +754,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck();
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Task task = NoopTask.create();
overlordResource.taskPost(task, req);
}
@ -914,17 +783,7 @@ public class OverlordResourceTest
auditManager.doAudit(EasyMock.capture(auditEntryCapture));
EasyMock.expectLastCall().once();
EasyMock.replay(
taskRunner,
taskMaster,
taskQueue,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig,
auditManager
);
replayAll();
Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, null, 10, null, null);
overlordResource.taskPost(task, req);
@ -943,15 +802,7 @@ public class OverlordResourceTest
expectAuthorizationTokenCheck(Users.WIKI_READER);
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
// Verify that taskPost fails for user who has only datasource read access
Task task = NoopTask.forDatasource(Datasources.WIKIPEDIA);
@ -975,15 +826,7 @@ public class OverlordResourceTest
)
.andReturn(2);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
@ -1008,15 +851,7 @@ public class OverlordResourceTest
.andThrow(InvalidInput.exception(exceptionMsg))
.once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
@ -1042,15 +877,7 @@ public class OverlordResourceTest
.andThrow(DruidException.defensive(exceptionMsg))
.once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
@ -1076,15 +903,7 @@ public class OverlordResourceTest
.andThrow(new IllegalStateException(exceptionMsg))
.once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
@ -1100,15 +919,7 @@ public class OverlordResourceTest
EasyMock.expect(taskMaster.isLeader()).andReturn(false);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Response response = overlordResource
.killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req);
@ -1131,15 +942,7 @@ public class OverlordResourceTest
EasyMock.expect(taskStorageQueryAdapter.getTask("othertask"))
.andReturn(Optional.absent());
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response1 = overlordResource.getTaskPayload("mytask");
final TaskPayloadResponse taskPayloadResponse1 = TestHelper.makeJsonMapper().readValue(
@ -1182,15 +985,7 @@ public class OverlordResourceTest
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getKnownTasks())
.andReturn(ImmutableList.of());
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response1 = overlordResource.getTaskStatus(taskId);
final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue(
@ -1241,15 +1036,7 @@ public class OverlordResourceTest
EasyMock.expect(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority))
.andReturn(expectedLockedIntervals);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority);
Assert.assertEquals(200, response.getStatus());
@ -1268,15 +1055,7 @@ public class OverlordResourceTest
@Test
public void testGetLockedIntervalsWithEmptyBody()
{
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
Response response = overlordResource.getDatasourceLockedIntervals(null);
Assert.assertEquals(400, response.getStatus());
@ -1302,16 +1081,8 @@ public class OverlordResourceTest
mockQueue.shutdown("id_1", "Shutdown request from user");
EasyMock.expectLastCall();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
mockQueue,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
EasyMock.replay(mockQueue);
final Map<String, Integer> response = (Map<String, Integer>) overlordResource
.doShutdown("id_1")
@ -1354,16 +1125,8 @@ public class OverlordResourceTest
mockQueue.shutdown("id_2", "Shutdown request from user");
EasyMock.expectLastCall();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
mockQueue,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
EasyMock.replay(mockQueue);
final Map<String, Integer> response = (Map<String, Integer>) overlordResource
.shutdownTasksForDataSource("datasource")
@ -1378,15 +1141,7 @@ public class OverlordResourceTest
EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList());
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response = overlordResource.shutdownTasksForDataSource("notExisting");
Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
@ -1400,15 +1155,7 @@ public class OverlordResourceTest
workerTaskRunnerQueryAdapter.enableWorker(host);
EasyMock.expectLastCall().once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response = overlordResource.enableWorker(host);
@ -1424,15 +1171,7 @@ public class OverlordResourceTest
workerTaskRunnerQueryAdapter.disableWorker(host);
EasyMock.expectLastCall().once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response = overlordResource.disableWorker(host);
@ -1448,15 +1187,7 @@ public class OverlordResourceTest
workerTaskRunnerQueryAdapter.enableWorker(host);
EasyMock.expectLastCall().andThrow(new RE("Worker API returns error!")).once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response = overlordResource.enableWorker(host);
@ -1472,15 +1203,7 @@ public class OverlordResourceTest
workerTaskRunnerQueryAdapter.disableWorker(host);
EasyMock.expectLastCall().andThrow(new RE("Worker API returns error!")).once();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
authConfig
);
replayAll();
final Response response = overlordResource.disableWorker(host);
@ -1495,16 +1218,7 @@ public class OverlordResourceTest
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
Optional.absent()
).anyTimes();
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
authConfig
);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus());
}
@ -1517,16 +1231,7 @@ public class OverlordResourceTest
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
authConfig
);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1541,16 +1246,7 @@ public class OverlordResourceTest
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
authConfig
);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1566,16 +1262,7 @@ public class OverlordResourceTest
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
EasyMock.replay(
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
authConfig
);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1617,17 +1304,9 @@ public class OverlordResourceTest
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
workerTaskRunner,
autoScaler,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
provisioningStrategy,
authConfig
autoScaler
);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1670,17 +1349,9 @@ public class OverlordResourceTest
EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
EasyMock.replay(
workerTaskRunner,
autoScaler,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter,
configManager,
provisioningStrategy,
authConfig
autoScaler
);
replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
@ -1706,16 +1377,8 @@ public class OverlordResourceTest
Action.READ
)));
EasyMock.replay(
task,
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
EasyMock.replay(task);
replayAll();
Set<ResourceAction> expectedResourceActions = ImmutableSet.of(
new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE),
@ -1728,7 +1391,6 @@ public class OverlordResourceTest
@Test
public void testResourceActionsForTaskWithFirehoseAndInputSecurityEnabled()
{
final String dataSource = "dataSourceTest";
final UOE expectedException = new UOE("unsupported");
Task task = EasyMock.createMock(Task.class);
@ -1739,17 +1401,8 @@ public class OverlordResourceTest
EasyMock.expect(task.getDestinationResource()).andReturn(java.util.Optional.of(new Resource(dataSource, ResourceType.DATASOURCE)));
EasyMock.expect(task.getInputSourceResources()).andThrow(expectedException);
EasyMock.replay(
task,
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
EasyMock.replay(task);
replayAll();
final UOE e = Assert.assertThrows(
UOE.class,
@ -1762,7 +1415,6 @@ public class OverlordResourceTest
@Test
public void testResourceActionsForTaskWithInputTypeAndInputSecurityDisabled()
{
final String dataSource = "dataSourceTest";
final String inputSourceType = "local";
Task task = EasyMock.createMock(Task.class);
@ -1776,16 +1428,8 @@ public class OverlordResourceTest
Action.READ
)));
EasyMock.replay(
task,
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
EasyMock.replay(task);
replayAll();
Set<ResourceAction> expectedResourceActions = ImmutableSet.of(
new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE)
@ -1797,24 +1441,14 @@ public class OverlordResourceTest
@Test
public void testGetMultipleTaskStatuses_presentTaskQueue()
{
// Needed for teardown
EasyMock.replay(
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
replayAll();
TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
EasyMock.expect(taskQueue.getTaskStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
EasyMock.replay(taskQueue);
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
EasyMock.replay(taskMaster);
EasyMock.replay(taskMaster, taskQueue);
OverlordResource overlordResource = new OverlordResource(
taskMaster,
null,
@ -1835,24 +1469,14 @@ public class OverlordResourceTest
@Test
public void testGetMultipleTaskStatuses_absentTaskQueue()
{
// Needed for teardown
EasyMock.replay(
authConfig,
taskRunner,
taskMaster,
taskStorageQueryAdapter,
indexerMetadataStorageAdapter,
req,
workerTaskRunnerQueryAdapter
);
replayAll();
TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class);
EasyMock.expect(taskStorageQueryAdapter.getStatus("task"))
.andReturn(Optional.of(TaskStatus.running("task")));
EasyMock.replay(taskStorageQueryAdapter);
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent());
EasyMock.replay(taskMaster);
EasyMock.replay(taskMaster, taskStorageQueryAdapter);
OverlordResource overlordResource = new OverlordResource(
taskMaster,
taskStorageQueryAdapter,
@ -1870,6 +1494,24 @@ public class OverlordResourceTest
Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response);
}
@Test
public void testGetTaskSegmentsReturns404()
{
replayAll();
OverlordResource overlordResource =
new OverlordResource(null, null, null, null, null, null, null, null, null, null);
final Response response = overlordResource.getTaskSegments("taskId");
Assert.assertEquals(404, response.getStatus());
Assert.assertEquals(
Collections.singletonMap(
"error",
"Segment IDs committed by a task action are not persisted anymore."
+ " Use the metric 'segment/added/bytes' to identify the segments created by a task."
),
response.getEntity()
);
}
private void expectAuthorizationTokenCheck()
{
expectAuthorizationTokenCheck(Users.DRUID);

View File

@ -62,7 +62,6 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@ -631,9 +630,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport
objectMapper
);
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
taskActionToolbox,
new TaskAuditLogConfig(false)
taskActionToolbox
);
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
{

View File

@ -20,6 +20,8 @@
package org.apache.druid.metadata;
import com.google.common.base.Optional;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@ -31,6 +33,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
@ExtensionPoint
public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
{
/**
@ -161,21 +164,34 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
void removeTasksOlderThan(long timestamp);
/**
* Add a log to the entry with the given id.
* Task logs are not used anymore and this method is never called by Druid code.
* It has been retained only for backwards compatibility with older extensions.
* New extensions must not implement this method.
*
* @param entryId entry id
* @param log log to add
* @return true if the log was added
* @throws DruidException of category UNSUPPORTED whenever called.
*/
boolean addLog(String entryId, LogType log);
@Deprecated
default boolean addLog(String entryId, LogType log)
{
throw DruidException.defensive()
.ofCategory(DruidException.Category.UNSUPPORTED)
.build("Task actions are not logged anymore.");
}
/**
* Returns the logs for the entry with the given id.
* Task logs are not used anymore and this method is never called by Druid code.
* It has been retained only for backwards compatibility with older extensions.
* New extensions must not implement this method.
*
* @param entryId entry id
* @return list of logs
* @throws DruidException of category UNSUPPORTED whenever called.
*/
List<LogType> getLogs(String entryId);
@Deprecated
default List<LogType> getLogs(String entryId)
{
throw DruidException.defensive()
.ofCategory(DruidException.Category.UNSUPPORTED)
.build("Task actions are not logged anymore.");
}
/**
* Returns the locks for the given entry
@ -188,7 +204,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
/**
* Returns the lock id for the given entry and the lock.
*
* @return lock id if found. Otherwise null.
* @return lock id if found, otherwise null.
*/
@Nullable
Long getLockId(String entryId, LockType lock);

View File

@ -25,6 +25,5 @@ public interface MetadataStorageActionHandlerTypes<EntryType, StatusType, LogTyp
{
TypeReference<EntryType> getEntryType();
TypeReference<StatusType> getStatusType();
TypeReference<LogType> getLogType();
TypeReference<LockType> getLockType();
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.google.common.base.Optional;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Tests the default methods of the interface {@link MetadataStorageActionHandler}.
* Required only for coverage as these methods are already being tested in
* {@code SQLMetadataStorageActionHandlerTest}.
*/
public class MetadataStorageActionHandlerTest
{
private MetadataStorageActionHandler<String, String, String, String> handler;
@Before
public void setup()
{
this.handler = new MetadataStorageActionHandler<String, String, String, String>()
{
@Override
public void insert(
String id,
DateTime timestamp,
String dataSource,
String entry,
boolean active,
@Nullable String status,
String type,
String groupId
)
{
}
@Override
public boolean setStatus(String entryId, boolean active, String status)
{
return false;
}
@Override
public Optional<String> getEntry(String entryId)
{
return null;
}
@Override
public Optional<String> getStatus(String entryId)
{
return null;
}
@Nullable
@Override
public TaskInfo<String, String> getTaskInfo(String entryId)
{
return null;
}
@Override
public List<TaskInfo<String, String>> getTaskInfos(
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
{
return Collections.emptyList();
}
@Override
public List<TaskInfo<TaskIdentifier, String>> getTaskStatusList(
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
)
{
return Collections.emptyList();
}
@Override
public boolean addLock(String entryId, String lock)
{
return false;
}
@Override
public boolean replaceLock(String entryId, long oldLockId, String newLock)
{
return false;
}
@Override
public void removeLock(long lockId)
{
}
@Override
public void removeTasksOlderThan(long timestamp)
{
}
@Override
public Map<Long, String> getLocks(String entryId)
{
return Collections.emptyMap();
}
@Override
public Long getLockId(String entryId, String lock)
{
return 0L;
}
@Override
public void populateTaskTypeAndGroupIdAsync()
{
}
};
}
@Test
public void testAddLogThrowsUnsupportedException()
{
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.addLog("abcd", "logentry")
);
Assert.assertEquals(
"Task actions are not logged anymore.",
exception.getMessage()
);
}
@Test
public void testGetLogsThrowsUnsupportedException()
{
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.getLogs("abcd")
);
Assert.assertEquals(
"Task actions are not logged anymore.",
exception.getMessage()
);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.StringUtils;
public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -46,12 +45,4 @@ public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, L
return sql + " FETCH FIRST :n ROWS ONLY";
}
@Deprecated
@Override
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE FROM %s WHERE %s_id in ("
+ " SELECT id FROM %s WHERE created_date < :date_time and active = false)",
getLogTable(), getEntryTypeName(), getEntryTable());
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
@ -44,13 +43,4 @@ public class PostgreSQLMetadataStorageActionHandler<EntryType, StatusType, LogTy
return sql + " LIMIT :n";
}
@Deprecated
@Override
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format("DELETE FROM %s USING %s "
+ "WHERE %s_id = %s.id AND created_date < :date_time and active = false",
getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable());
}
}

View File

@ -518,25 +518,6 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
);
}
public void createLogTable(final String tableName, final String entryTypeName)
{
createTable(
tableName,
ImmutableList.of(
StringUtils.format(
"CREATE TABLE %1$s (\n"
+ " id %2$s NOT NULL,\n"
+ " %4$s_id VARCHAR(255) DEFAULT NULL,\n"
+ " log_payload %3$s,\n"
+ " PRIMARY KEY (id)\n"
+ ")",
tableName, getSerialType(), getPayloadType(), entryTypeName
),
StringUtils.format("CREATE INDEX idx_%1$s_%2$s_id ON %1$s(%2$s_id)", tableName, entryTypeName)
)
);
}
public void createLockTable(final String tableName, final String entryTypeName)
{
createTable(
@ -814,7 +795,6 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get();
final String entryType = tablesConfig.getTaskEntryType();
prepareTaskEntryTable(tablesConfig.getEntryTable(entryType));
createLogTable(tablesConfig.getLogTable(entryType), entryType);
createLockTable(tablesConfig.getLockTable(entryType), entryType);
}
}

View File

@ -75,12 +75,10 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
private final ObjectMapper jsonMapper;
private final TypeReference<EntryType> entryType;
private final TypeReference<StatusType> statusType;
private final TypeReference<LogType> logType;
private final TypeReference<LockType> lockType;
private final String entryTypeName;
private final String entryTable;
private final String logTable;
private final String lockTable;
private final TaskInfoMapper<EntryType, StatusType> taskInfoMapper;
@ -90,7 +88,11 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
private Future<Boolean> taskMigrationCompleteFuture;
@SuppressWarnings("PMD.UnnecessaryFullyQualifiedName")
/**
* @deprecated Use the other constructor without {@code logTable} argument
* since this argument is now unused.
*/
@Deprecated
public SQLMetadataStorageActionHandler(
final SQLMetadataConnector connector,
final ObjectMapper jsonMapper,
@ -100,6 +102,19 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
final String logTable,
final String lockTable
)
{
this(connector, jsonMapper, types, entryTypeName, entryTable, lockTable);
}
@SuppressWarnings("PMD.UnnecessaryFullyQualifiedName")
public SQLMetadataStorageActionHandler(
final SQLMetadataConnector connector,
final ObjectMapper jsonMapper,
final MetadataStorageActionHandlerTypes<EntryType, StatusType, LogType, LockType> types,
final String entryTypeName,
final String entryTable,
final String lockTable
)
{
this.connector = connector;
//fully qualified references required below due to identical package names across project modules.
@ -108,11 +123,9 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
org.apache.druid.metadata.PasswordProviderRedactionMixIn.class);
this.entryType = types.getEntryType();
this.statusType = types.getStatusType();
this.logType = types.getLogType();
this.lockType = types.getLockType();
this.entryTypeName = entryTypeName;
this.entryTable = entryTable;
this.logTable = logTable;
this.lockTable = lockTable;
this.taskInfoMapper = new TaskInfoMapper<>(jsonMapper, entryType, statusType);
this.taskStatusMapper = new TaskStatusMapper(jsonMapper);
@ -142,7 +155,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
protected String getLogTable()
{
return logTable;
throw new UnsupportedOperationException("'tasklogs' table is not used anymore");
}
protected String getEntryTypeName()
@ -430,7 +443,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
}
/**
* Wraps the given error in a user friendly DruidException.
* Wraps the given error in a user-friendly DruidException.
*/
private DruidException wrapInDruidException(String taskId, Throwable t)
{
@ -855,21 +868,13 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
{
DateTime dateTime = DateTimes.utc(timestamp);
connector.retryWithHandle(
handle -> {
handle.createStatement(getSqlRemoveLogsOlderThan())
.bind("date_time", dateTime.toString())
.execute();
handle ->
handle.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE created_date < :date_time AND active = false",
entryTable
)
)
.bind("date_time", dateTime.toString())
.execute();
return null;
}
).bind("date_time", dateTime.toString()).execute()
);
}
@ -880,78 +885,6 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
.execute();
}
@Override
public boolean addLog(final String entryId, final LogType log)
{
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (%2$s_id, log_payload) VALUES (:entryId, :payload)",
logTable, entryTypeName
)
)
.bind("entryId", entryId)
.bind("payload", jsonMapper.writeValueAsBytes(log))
.execute() == 1;
}
}
);
}
@Override
public List<LogType> getLogs(final String entryId)
{
return connector.retryWithHandle(
new HandleCallback<List<LogType>>()
{
@Override
public List<LogType> withHandle(Handle handle)
{
return handle
.createQuery(
StringUtils.format(
"SELECT log_payload FROM %1$s WHERE %2$s_id = :entryId",
logTable, entryTypeName
)
)
.bind("entryId", entryId)
.map(ByteArrayMapper.FIRST)
.fold(
new ArrayList<>(),
(List<LogType> list, byte[] bytes, FoldController control, StatementContext ctx) -> {
try {
list.add(jsonMapper.readValue(bytes, logType));
return list;
}
catch (IOException e) {
log.makeAlert(e, "Failed to deserialize log")
.addData("entryId", entryId)
.addData("payload", StringUtils.fromUtf8(bytes))
.emit();
throw new SQLException(e);
}
}
);
}
}
);
}
@Deprecated
public String getSqlRemoveLogsOlderThan()
{
return StringUtils.format(
"DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id "
+ "WHERE b.created_date < :date_time and b.active = false",
logTable, entryTable, entryTypeName
);
}
@Override
public Map<Long, LockType> getLocks(final String entryId)
{

View File

@ -56,7 +56,6 @@ public class SQLMetadataConnectorSchemaPersistenceTest
tables.add(tablesConfig.getSegmentsTable());
tables.add(tablesConfig.getRulesTable());
tables.add(tablesConfig.getLockTable(entryType));
tables.add(tablesConfig.getLogTable(entryType));
tables.add(tablesConfig.getEntryTable(entryType));
tables.add(tablesConfig.getAuditTable());
tables.add(tablesConfig.getSupervisorTable());
@ -67,7 +66,6 @@ public class SQLMetadataConnectorSchemaPersistenceTest
dropSequence.add(tablesConfig.getSegmentSchemasTable());
dropSequence.add(tablesConfig.getRulesTable());
dropSequence.add(tablesConfig.getLockTable(entryType));
dropSequence.add(tablesConfig.getLogTable(entryType));
dropSequence.add(tablesConfig.getEntryTable(entryType));
dropSequence.add(tablesConfig.getAuditTable());
dropSequence.add(tablesConfig.getSupervisorTable());

View File

@ -75,7 +75,6 @@ public class SQLMetadataConnectorTest
tables.add(tablesConfig.getSegmentsTable());
tables.add(tablesConfig.getRulesTable());
tables.add(tablesConfig.getLockTable(entryType));
tables.add(tablesConfig.getLogTable(entryType));
tables.add(tablesConfig.getEntryTable(entryType));
tables.add(tablesConfig.getAuditTable());
tables.add(tablesConfig.getSupervisorTable());

View File

@ -34,7 +34,6 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.joda.time.DateTime;
@ -73,12 +72,10 @@ public class SQLMetadataStorageActionHandlerTest
TestDerbyConnector connector = derbyConnectorRule.getConnector();
final String entryType = "entry";
final String logTable = "logs";
final String lockTable = "locks";
connector.prepareTaskEntryTable(entryTable);
connector.createLockTable(lockTable, entryType);
connector.createLogTable(logTable, entryType);
handler = new DerbyMetadataStorageActionHandler<>(
connector,
@ -101,12 +98,6 @@ public class SQLMetadataStorageActionHandlerTest
};
}
@Override
public TypeReference<Map<String, String>> getLogType()
{
return JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING;
}
@Override
public TypeReference<Map<String, Object>> getLockType()
{
@ -117,7 +108,7 @@ public class SQLMetadataStorageActionHandlerTest
},
entryType,
entryTable,
logTable,
null,
lockTable
);
}
@ -247,36 +238,30 @@ public class SQLMetadataStorageActionHandlerTest
}
@Test
public void testLogs()
public void testAddLogThrowsUnsupportedException()
{
final String entryId = "abcd";
Map<String, Object> entry = ImmutableMap.of("a", 1);
Map<String, Object> status = ImmutableMap.of("count", 42);
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
Assert.assertEquals(
ImmutableList.of(),
handler.getLogs("non_exist_entry")
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.addLog("abcd", ImmutableMap.of("logentry", "created"))
);
Assert.assertEquals(
ImmutableMap.of(),
handler.getLocks(entryId)
);
final ImmutableMap<String, String> log1 = ImmutableMap.of("logentry", "created");
final ImmutableMap<String, String> log2 = ImmutableMap.of("logentry", "updated");
Assert.assertTrue(handler.addLog(entryId, log1));
Assert.assertTrue(handler.addLog(entryId, log2));
Assert.assertEquals(
ImmutableList.of(log1, log2),
handler.getLogs(entryId)
"Task actions are not logged anymore.",
exception.getMessage()
);
}
@Test
public void testGetLogsThrowsUnsupportedException()
{
Exception exception = Assert.assertThrows(
DruidException.class,
() -> handler.getLogs("abcd")
);
Assert.assertEquals(
"Task actions are not logged anymore.",
exception.getMessage()
);
}
@Test
public void testLocks()
@ -388,19 +373,16 @@ public class SQLMetadataStorageActionHandlerTest
Map<String, Object> entry1 = ImmutableMap.of("numericId", 1234);
Map<String, Object> status1 = ImmutableMap.of("count", 42, "temp", 1);
handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1, "type", "group");
Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created")));
final String entryId2 = "ABC123";
Map<String, Object> entry2 = ImmutableMap.of("a", 1);
Map<String, Object> status2 = ImmutableMap.of("count", 42);
handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2, "type", "group");
Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created")));
final String entryId3 = "DEF5678";
Map<String, Object> entry3 = ImmutableMap.of("numericId", 5678);
Map<String, Object> status3 = ImmutableMap.of("count", 21, "temp", 2);
handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3, "type", "group");
Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created")));
Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1));
Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2));
@ -438,10 +420,6 @@ public class SQLMetadataStorageActionHandlerTest
.collect(Collectors.toList())
);
// tasklogs
Assert.assertEquals(0, handler.getLogs(entryId1).size());
Assert.assertEquals(1, handler.getLogs(entryId2).size());
Assert.assertEquals(1, handler.getLogs(entryId3).size());
}
@Test

View File

@ -58,7 +58,6 @@ import org.apache.druid.indexing.common.TaskStorageDirTracker;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
@ -209,7 +208,6 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.tasklock", TaskLockConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
binder.bind(TaskMaster.class).in(ManageLifecycle.class);

View File

@ -78,7 +78,6 @@ import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
@ -483,7 +482,6 @@ public class CliPeon extends GuiceRunnable
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);
configureTaskActionClient(binder);