Task interval, isReady hygiene

This commit is contained in:
Gian Merlino 2013-12-11 22:41:27 -08:00
parent 05e24bd85c
commit b17dc6f744
6 changed files with 8 additions and 27 deletions

View File

@ -28,7 +28,7 @@ import org.joda.time.Interval;
public abstract class AbstractFixedIntervalTask extends AbstractTask public abstract class AbstractFixedIntervalTask extends AbstractTask
{ {
@JsonIgnore @JsonIgnore
final Interval interval; private final Interval interval;
protected AbstractFixedIntervalTask( protected AbstractFixedIntervalTask(
String id, String id,

View File

@ -30,9 +30,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger; import io.druid.segment.IndexMerger;
import io.druid.segment.IndexableAdapter; import io.druid.segment.IndexableAdapter;
@ -81,13 +79,13 @@ public class DeleteTask extends AbstractFixedIntervalTask
// Strategy: Create an empty segment covering the interval to be deleted // Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty); final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
// Create DataSegment // Create DataSegment
final DataSegment segment = final DataSegment segment =
DataSegment.builder() DataSegment.builder()
.dataSource(this.getDataSource()) .dataSource(this.getDataSource())
.interval(interval) .interval(getInterval())
.version(myLock.getVersion()) .version(myLock.getVersion())
.shardSpec(new NoneShardSpec()) .shardSpec(new NoneShardSpec())
.build(); .build();

View File

@ -160,7 +160,7 @@ public class IndexTask extends AbstractFixedIntervalTask
indexGranularity, indexGranularity,
shardSpec shardSpec
), ),
interval, getInterval(),
myLock.getVersion() myLock.getVersion()
); );
segments.add(segment); segments.add(segment);

View File

@ -28,10 +28,8 @@ import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentListUnusedAction;
import io.druid.indexing.common.actions.SegmentNukeAction; import io.druid.indexing.common.actions.SegmentNukeAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -63,12 +61,6 @@ public class KillTask extends AbstractFixedIntervalTask
return "kill"; return "kill";
} }
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
}
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {
@ -79,8 +71,8 @@ public class KillTask extends AbstractFixedIntervalTask
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
} }
if (!myLock.getInterval().equals(interval)) { if (!myLock.getInterval().equals(getInterval())) {
throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), interval); throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval());
} }
// List unused segments // List unused segments

View File

@ -40,7 +40,6 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
@ -52,7 +51,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -202,7 +200,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
final Set<String> current = ImmutableSet.copyOf( final Set<String> current = ImmutableSet.copyOf(
Iterables.transform( Iterables.transform(
taskActionClient.submit(new SegmentListUsedAction(getDataSource(), interval)), taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval())),
toIdentifier toIdentifier
) )
); );
@ -243,7 +241,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
return Objects.toStringHelper(this) return Objects.toStringHelper(this)
.add("id", getId()) .add("id", getId())
.add("dataSource", getDataSource()) .add("dataSource", getDataSource())
.add("interval", interval) .add("interval", getInterval())
.add("segments", segments) .add("segments", segments)
.toString(); .toString();
} }

View File

@ -29,7 +29,6 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClient;
@ -221,12 +220,6 @@ public class VersionConverterTask extends AbstractFixedIntervalTask
return "version_converter_sub"; return "version_converter_sub";
} }
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return taskActionClient.submit(new LockTryAcquireAction(segment.getInterval())).isPresent();
}
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
{ {