diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java index 4e2f08a992d..d1d494f5c83 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractFixedIntervalTask.java @@ -28,7 +28,7 @@ import org.joda.time.Interval; public abstract class AbstractFixedIntervalTask extends AbstractTask { @JsonIgnore - final Interval interval; + private final Interval interval; protected AbstractFixedIntervalTask( String id, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java index 8f5a0da8f46..872ac3507bd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java @@ -30,9 +30,7 @@ import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.SegmentInsertAction; -import io.druid.indexing.common.actions.TaskActionClient; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexMerger; import io.druid.segment.IndexableAdapter; @@ -81,13 +79,13 @@ public class DeleteTask extends AbstractFixedIntervalTask // Strategy: Create an empty segment covering the interval to be deleted final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); - final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty); + final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty); // Create DataSegment final DataSegment segment = DataSegment.builder() .dataSource(this.getDataSource()) - .interval(interval) + .interval(getInterval()) .version(myLock.getVersion()) .shardSpec(new NoneShardSpec()) .build(); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index a92f893ccd6..468ff68bb0a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -160,7 +160,7 @@ public class IndexTask extends AbstractFixedIntervalTask indexGranularity, shardSpec ), - interval, + getInterval(), myLock.getVersion() ); segments.add(segment); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java index ef266b65f21..c77ddb21d96 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java @@ -28,10 +28,8 @@ import com.metamx.common.logger.Logger; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.SegmentListUnusedAction; import io.druid.indexing.common.actions.SegmentNukeAction; -import io.druid.indexing.common.actions.TaskActionClient; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -63,12 +61,6 @@ public class KillTask extends AbstractFixedIntervalTask return "kill"; } - @Override - public boolean isReady(TaskActionClient taskActionClient) throws Exception - { - return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent(); - } - @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { @@ -79,8 +71,8 @@ public class KillTask extends AbstractFixedIntervalTask throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource()); } - if (!myLock.getInterval().equals(interval)) { - throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), interval); + if (!myLock.getInterval().equals(getInterval())) { + throw new ISE("WTF?! Lock interval[%s] != task interval[%s]", myLock.getInterval(), getInterval()); } // List unused segments diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 4f43afccc65..d49d74b355b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -40,7 +40,6 @@ import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -52,7 +51,6 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.File; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -202,7 +200,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask final Set current = ImmutableSet.copyOf( Iterables.transform( - taskActionClient.submit(new SegmentListUsedAction(getDataSource(), interval)), + taskActionClient.submit(new SegmentListUsedAction(getDataSource(), getInterval())), toIdentifier ) ); @@ -243,7 +241,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask return Objects.toStringHelper(this) .add("id", getId()) .add("dataSource", getDataSource()) - .add("interval", interval) + .add("interval", getInterval()) .add("segments", segments) .toString(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java index 0d9dfc8f7ec..52807861a12 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/VersionConverterTask.java @@ -29,7 +29,6 @@ import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.actions.LockTryAcquireAction; import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; @@ -221,12 +220,6 @@ public class VersionConverterTask extends AbstractFixedIntervalTask return "version_converter_sub"; } - @Override - public boolean isReady(TaskActionClient taskActionClient) throws Exception - { - return taskActionClient.submit(new LockTryAcquireAction(segment.getInterval())).isPresent(); - } - @Override public TaskStatus run(TaskToolbox toolbox) throws Exception {