diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index 6744f2280f0..999aeec6c6b 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -158,11 +158,12 @@ public class RealtimeIndexTask extends AbstractTask } }; - // TODO -- This can block if there is lock contention, which will block plumber.getSink (and thus the firehose) - // TODO -- Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the - // TODO -- realtime window, but if they do it can be problematic. - // TODO -- If we decide to care, we can use more threads in the plumber such that waiting for the coordinator - // TODO -- doesn't block data processing. + // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink + // NOTE: (and thus the firehose) + + // Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the + // realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in + // the plumber such that waiting for the coordinator doesn't block data processing. final RealtimePlumberSchool.VersioningPolicy versioningPolicy = new RealtimePlumberSchool.VersioningPolicy() { @Override @@ -180,8 +181,10 @@ public class RealtimeIndexTask extends AbstractTask } }; - // TODO - Might need to have task id in segmentPusher path or some other way of making redundant realtime - // TODO - workers not step on each other's toes when pushing segments to S3 + // NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means + // NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and + // NOTE: descriptor.json to mismatch, or it can cause compute nodes to load different instances of the + // NOTE: "same" segment. realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher()); realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate()); realtimePlumberSchool.setVersioningPolicy(versioningPolicy);