mirror of https://github.com/apache/druid.git
Indexing service: Break up segment actions

Each one now operates on at most a collection of segments that comprise a
single partition set. The main purpose of this change is to keep audit log
payload sizes from growing without bound.

parent 2d2fa319bd
commit 566a3a6112
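The grouping idea at the heart of the change can be seen in isolation. The sketch below is illustrative only and is not part of the commit: a hypothetical Segment class stands in for io.druid.timeline.DataSegment, and intervals are plain strings. It shows how Guava's Multimaps.index splits one large batch into per-interval groups, so each submitted action carries a bounded payload.

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;

import java.util.Collection;
import java.util.List;

public class GroupByIntervalExample
{
  // Hypothetical stand-in for io.druid.timeline.DataSegment.
  static class Segment
  {
    final String interval;
    final String id;

    Segment(String interval, String id)
    {
      this.interval = interval;
      this.id = id;
    }
  }

  public static void main(String[] args)
  {
    final List<Segment> segments = ImmutableList.of(
        new Segment("2014-01-01/2014-01-02", "a"),
        new Segment("2014-01-01/2014-01-02", "b"),
        new Segment("2014-01-02/2014-01-03", "c")
    );

    // Same idiom as the new TaskToolbox.pushSegments below: index by interval,
    // then act on each group separately instead of on all segments at once.
    final Multimap<String, Segment> byInterval = Multimaps.index(
        segments,
        new Function<Segment, String>()
        {
          @Override
          public String apply(Segment segment)
          {
            return segment.interval;
          }
        }
    );

    for (final Collection<Segment> group : byInterval.asMap().values()) {
      // In the real code this is one SegmentInsertAction per group.
      System.out.println("one action for " + group.size() + " segment(s)");
    }
  }
}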
io/druid/indexing/common/TaskToolbox.java

@@ -20,10 +20,15 @@
 package io.druid.indexing.common;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.ServerView;
+import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskConfig;

@@ -37,10 +42,14 @@ import io.druid.segment.loading.SegmentLoader;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
+import org.joda.time.Interval;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 /**

@@ -167,7 +176,7 @@ public class TaskToolbox
     return objectMapper;
   }
 
-  public Map<DataSegment, File> getSegments(List<DataSegment> segments)
+  public Map<DataSegment, File> fetchSegments(List<DataSegment> segments)
       throws SegmentLoadingException
   {
     Map<DataSegment, File> retVal = Maps.newLinkedHashMap();

@@ -178,6 +187,25 @@ public class TaskToolbox
     return retVal;
   }
 
+  public void pushSegments(Iterable<DataSegment> segments) throws IOException
+  {
+    // Request segment pushes for each set
+    final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
+        segments,
+        new Function<DataSegment, Interval>()
+        {
+          @Override
+          public Interval apply(DataSegment segment)
+          {
+            return segment.getInterval();
+          }
+        }
+    );
+    for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
+      getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
+    }
+  }
+
   public File getTaskWorkDir()
   {
     return taskWorkDir;
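With pushSegments in place, tasks no longer build SegmentInsertActions themselves. A condensed view of the call-site change, taken from the DeleteTask and MergeTaskBase hunks below:

// Before: one action submitted directly by the task.
toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));

// After: the toolbox fans segments out into one action per interval group.
toolbox.pushSegments(ImmutableList.of(uploadedSegment));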
io/druid/indexing/common/actions/LocalTaskActionClient.java

@@ -19,6 +19,7 @@
 
 package io.druid.indexing.common.actions;
 
+import com.metamx.common.ISE;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.TaskStorage;

@@ -45,21 +46,21 @@ public class LocalTaskActionClient implements TaskActionClient
   {
     log.info("Performing action for task[%s]: %s", task.getId(), taskAction);
 
-    final RetType ret = taskAction.perform(task, toolbox);
-
     if (taskAction.isAudited()) {
       // Add audit log
       try {
         storage.addAuditLog(task, taskAction);
       }
       catch (Exception e) {
+        final String actionClass = taskAction.getClass().getName();
         log.makeAlert(e, "Failed to record action in audit log")
            .addData("task", task.getId())
-           .addData("actionClass", taskAction.getClass().getName())
+           .addData("actionClass", actionClass)
            .emit();
+        throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
       }
     }
 
-    return ret;
+    return taskAction.perform(task, toolbox);
   }
 }
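The LocalTaskActionClient change also reorders submit(): the audit write used to happen after the action ran, and an audit failure only emitted an alert. A condensed view of the new flow, simplified from the hunk above:

if (taskAction.isAudited()) {
  // May throw; the failure is alerted and rethrown as an ISE, so the action
  // below is never performed without a corresponding audit entry.
  storage.addAuditLog(task, taskAction);
}
return taskAction.perform(task, toolbox);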
io/druid/indexing/common/actions/SegmentInsertAction.java

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
 import io.druid.timeline.DataSegment;

@@ -80,9 +79,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
   @Override
   public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
-    if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
-      throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
-    }
+    toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
 
     final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
 
io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java

@@ -5,7 +5,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
 import io.druid.timeline.DataSegment;

@@ -42,10 +41,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
       Task task, TaskActionToolbox toolbox
   ) throws IOException
   {
-    if(!toolbox.taskLockCoversSegments(task, segments, true)) {
-      throw new ISE("Segments not covered by locks for task: %s", task.getId());
-    }
-
+    toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
     toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
 
     // Emit metrics
io/druid/indexing/common/actions/SegmentNukeAction.java

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
 import io.druid.timeline.DataSegment;

@@ -59,10 +58,7 @@ public class SegmentNukeAction implements TaskAction<Void>
   @Override
   public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
-    if(!toolbox.taskLockCoversSegments(task, segments, true)) {
-      throw new ISE("Segments not covered by locks for task: %s", task.getId());
-    }
-
+    toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
     toolbox.getIndexerDBCoordinator().deleteSegments(segments);
 
     // Emit metrics
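SegmentInsertAction, SegmentMetadataUpdateAction, and SegmentNukeAction now share one precondition instead of three hand-rolled lock checks. A hedged sketch of the caller-visible behavior (TaskActionToolbox and ISE are the real classes; task and segments are placeholders):

try {
  toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
}
catch (ISE e) {
  // Thrown either when a task lock does not cover the segments, or when the
  // segments span more than one (dataSource, interval, version) partition set.
}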
io/druid/indexing/common/actions/TaskActionToolbox.java

@@ -19,9 +19,11 @@
 
 package io.druid.indexing.common.actions;
 
+import com.google.api.client.repackaged.com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
+import com.metamx.common.ISE;
 import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.task.Task;

@@ -65,6 +67,42 @@ public class TaskActionToolbox
     return emitter;
   }
 
+  public boolean segmentsAreFromSamePartitionSet(
+      final Set<DataSegment> segments
+  )
+  {
+    // Verify that these segments are all in the same partition set
+
+    Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
+    final DataSegment firstSegment = segments.iterator().next();
+    for (final DataSegment segment : segments) {
+      if (!segment.getDataSource().equals(firstSegment.getDataSource())) {
+        return false;
+      }
+      if (!segment.getInterval().equals(firstSegment.getInterval())) {
+        return false;
+      }
+      if (!segment.getVersion().equals(firstSegment.getVersion())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public void verifyTaskLocksAndSinglePartitionSettitude(
+      final Task task,
+      final Set<DataSegment> segments,
+      final boolean allowOlderVersions
+  )
+  {
+    if (!taskLockCoversSegments(task, segments, allowOlderVersions)) {
+      throw new ISE("Segments not covered by locks for task: %s", task.getId());
+    }
+    if (!segmentsAreFromSamePartitionSet(segments)) {
+      throw new ISE("Segments are not in the same partition set: %s", segments);
+    }
+  }
+
   public boolean taskLockCoversSegments(
       final Task task,
       final Set<DataSegment> segments,
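What counts as one partition set follows directly from segmentsAreFromSamePartitionSet: all segments must agree on dataSource, interval, and version, and only partition numbers may differ. A minimal self-contained sketch of that check (not Druid code; segment identity is reduced to a hypothetical string triple):

import java.util.Arrays;
import java.util.List;

public class PartitionSetCheck
{
  // Each entry is a hypothetical {dataSource, interval, version} triple.
  // Assumes a nonempty list, as the real code enforces via Preconditions.
  public static boolean samePartitionSet(List<String[]> segmentKeys)
  {
    final String[] first = segmentKeys.get(0);
    for (String[] key : segmentKeys) {
      if (!Arrays.equals(key, first)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    // Two shards of one partition set: same triple, different partition numbers.
    System.out.println(samePartitionSet(Arrays.asList(
        new String[]{"wiki", "2014-01-01/2014-01-02", "v1"},
        new String[]{"wiki", "2014-01-01/2014-01-02", "v1"}
    ))); // true

    // A second interval breaks the invariant.
    System.out.println(samePartitionSet(Arrays.asList(
        new String[]{"wiki", "2014-01-01/2014-01-02", "v1"},
        new String[]{"wiki", "2014-01-02/2014-01-03", "v1"}
    ))); // false
  }
}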
io/druid/indexing/common/task/DeleteTask.java

@@ -22,6 +22,7 @@ package io.druid.indexing.common.task;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;

@@ -103,7 +104,7 @@ public class DeleteTask extends AbstractFixedIntervalTask
         segment.getVersion()
     );
 
-    toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+    toolbox.pushSegments(ImmutableList.of(uploadedSegment));
 
     return TaskStatus.success(getId());
   }
io/druid/indexing/common/task/HadoopIndexTask.java

@@ -24,10 +24,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.api.client.util.Lists;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.metamx.common.logger.Logger;
 import io.druid.common.utils.JodaUtils;
 import io.druid.indexer.HadoopDruidIndexerConfig;

@@ -47,12 +51,15 @@ import io.tesla.aether.internal.DefaultTeslaAether;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 public class HadoopIndexTask extends AbstractFixedIntervalTask
 {

@@ -180,14 +187,10 @@ public class HadoopIndexTask extends AbstractFixedIntervalTask
 
     if (segments != null) {
       List<DataSegment> publishedSegments = toolbox.getObjectMapper().readValue(
-          segments, new TypeReference<List<DataSegment>>()
-          {
-          }
+          segments,
+          new TypeReference<List<DataSegment>>() {}
       );
-      // Request segment pushes
-      toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
-
-      // Done
+      toolbox.pushSegments(publishedSegments);
       return TaskStatus.success(getId());
     } else {
       return TaskStatus.failure(getId());
io/druid/indexing/common/task/IndexTask.java

@@ -156,7 +156,7 @@ public class IndexTask extends AbstractFixedIntervalTask
         segments.add(segment);
       }
     }
-    toolbox.getTaskActionClient().submit(new SegmentInsertAction(segments));
+    toolbox.pushSegments(segments);
     return TaskStatus.success(getId());
   }
 
io/druid/indexing/common/task/KillTask.java

@@ -97,11 +97,9 @@ public class KillTask extends AbstractFixedIntervalTask
     // Kill segments
     for (DataSegment segment : unusedSegments) {
       toolbox.getDataSegmentKiller().kill(segment);
+      toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment)));
     }
 
-    // Remove metadata for these segments
-    toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
-
     return TaskStatus.success(getId());
   }
 }
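The KillTask hunk inverts the batching: metadata removal used to be one action over every unused segment, and is now interleaved with the deep-storage kill, one single-segment action at a time. Condensed from the hunk above:

for (DataSegment segment : unusedSegments) {
  toolbox.getDataSegmentKiller().kill(segment);  // remove from deep storage
  toolbox.getTaskActionClient().submit(
      new SegmentNukeAction(ImmutableSet.of(segment))  // then drop its metadata
  );
}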
io/druid/indexing/common/task/MergeTaskBase.java

@@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;

@@ -142,7 +143,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
     );
 
     // download segments to merge
-    final Map<DataSegment, File> gettedSegments = toolbox.getSegments(segments);
+    final Map<DataSegment, File> gettedSegments = toolbox.fetchSegments(segments);
 
     // merge files together
     final File fileToUpload = merge(gettedSegments, new File(taskDir, "merged"));

@@ -165,7 +166,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
     emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
     emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
 
-    toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+    toolbox.pushSegments(ImmutableList.of(uploadedSegment));
 
     return TaskStatus.success(getId());
   }
io/druid/indexing/common/task/MoveTask.java

@@ -98,18 +98,12 @@ public class MoveTask extends AbstractFixedIntervalTask
       log.info("OK to move segment: %s", unusedSegment.getIdentifier());
     }
 
-    List<DataSegment> movedSegments = Lists.newLinkedList();
-
     // Move segments
     for (DataSegment segment : unusedSegments) {
-      movedSegments.add(toolbox.getDataSegmentMover().move(segment, targetLoadSpec));
+      final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
+      toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
     }
 
-    // Update metadata for moved segments
-    toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(
-        ImmutableSet.copyOf(movedSegments)
-    ));
-
     return TaskStatus.success(getId());
   }
 
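MoveTask gets the same treatment: the movedSegments accumulator and the single batched SegmentMetadataUpdateAction are gone, and each segment's metadata is updated right after it is moved, so no action ever references more than one segment. Condensed from the hunk above:

for (DataSegment segment : unusedSegments) {
  final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
  toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
}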
io/druid/indexing/common/task/RealtimeIndexTask.java

@@ -418,7 +418,7 @@ public class RealtimeIndexTask extends AbstractTask
       @Override
       public void publishSegment(DataSegment segment) throws IOException
       {
-        taskToolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+        taskToolbox.pushSegments(ImmutableList.of(segment));
       }
     }
 }
io/druid/indexing/common/task/VersionConverterTask.java

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;

@@ -248,7 +249,7 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
       }
     }
 
-    final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
+    final Map<DataSegment, File> localSegments = toolbox.fetchSegments(Arrays.asList(segment));
 
     final File location = localSegments.get(segment);
     final File outLocation = new File(location, "v9_out");