From b8c08f235a6553a37e0779b19d8e54214c4d864d Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 14 Mar 2013 12:56:25 -0700 Subject: [PATCH] Realtime: - Move VersioningPolicy, RetryPolicy outside of RealtimePlumberSchool - Move plumber stuff into its own package, since there's a lot of it --- .../common/index/YeOldePlumberSchool.java | 6 +- .../common/task/IndexGeneratorTask.java | 4 +- .../merger/common/task/RealtimeIndexTask.java | 9 +- .../metamx/druid/realtime/FireDepartment.java | 2 + .../druid/realtime/RealtimeManager.java | 2 + .../IntervalStartVersioningPolicy.java | 12 ++ .../MessageTimeRejectionPolicyFactory.java | 39 +++++++ .../druid/realtime/{ => plumber}/Plumber.java | 2 +- .../realtime/{ => plumber}/PlumberSchool.java | 4 +- .../{ => plumber}/RealtimePlumberSchool.java | 107 +----------------- .../realtime/plumber/RejectionPolicy.java | 9 ++ .../plumber/RejectionPolicyFactory.java | 15 +++ .../ServerTimeRejectionPolicyFactory.java | 34 ++++++ .../druid/realtime/{ => plumber}/Sink.java | 4 +- .../realtime/plumber/VersioningPolicy.java | 14 +++ 15 files changed, 150 insertions(+), 113 deletions(-) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java rename realtime/src/main/java/com/metamx/druid/realtime/{ => plumber}/Plumber.java (97%) rename realtime/src/main/java/com/metamx/druid/realtime/{ => plumber}/PlumberSchool.java (90%) rename realtime/src/main/java/com/metamx/druid/realtime/{ => plumber}/RealtimePlumberSchool.java (89%) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java rename realtime/src/main/java/com/metamx/druid/realtime/{ => plumber}/Sink.java (97%) create mode 100644 realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java diff --git a/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java b/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java index 777fc3dd378..c89122f49e3 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/index/YeOldePlumberSchool.java @@ -39,10 +39,10 @@ import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.FireHydrant; -import com.metamx.druid.realtime.Plumber; -import com.metamx.druid.realtime.PlumberSchool; +import com.metamx.druid.realtime.plumber.Plumber; +import com.metamx.druid.realtime.plumber.PlumberSchool; import com.metamx.druid.realtime.Schema; -import com.metamx.druid.realtime.Sink; +import com.metamx.druid.realtime.plumber.Sink; import org.apache.commons.io.FileUtils; diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index 6eb58ea91c6..8922a0473bf 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -39,9 +39,9 @@ import com.metamx.druid.merger.common.index.YeOldePlumberSchool; import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.FirehoseFactory; -import com.metamx.druid.realtime.Plumber; +import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.Schema; -import com.metamx.druid.realtime.Sink; +import com.metamx.druid.realtime.plumber.Sink; import org.joda.time.DateTime; import org.joda.time.Interval; diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index 999aeec6c6b..ac0c3a6b77b 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -22,12 +22,13 @@ import com.metamx.druid.realtime.FireDepartmentConfig; import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.FirehoseFactory; -import com.metamx.druid.realtime.Plumber; -import com.metamx.druid.realtime.RealtimePlumberSchool; +import com.metamx.druid.realtime.plumber.Plumber; +import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.SegmentAnnouncer; import com.metamx.druid.realtime.SegmentPublisher; -import com.metamx.druid.realtime.Sink; +import com.metamx.druid.realtime.plumber.Sink; +import com.metamx.druid.realtime.plumber.VersioningPolicy; import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -164,7 +165,7 @@ public class RealtimeIndexTask extends AbstractTask // Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the // realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in // the plumber such that waiting for the coordinator doesn't block data processing. - final RealtimePlumberSchool.VersioningPolicy versioningPolicy = new RealtimePlumberSchool.VersioningPolicy() + final VersioningPolicy versioningPolicy = new VersioningPolicy() { @Override public String getVersion(final Interval interval) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java b/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java index aab4509bbe5..b895cb21040 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/FireDepartment.java @@ -24,6 +24,8 @@ package com.metamx.druid.realtime; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.druid.realtime.plumber.Plumber; +import com.metamx.druid.realtime.plumber.PlumberSchool; import java.io.IOException; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java index 17dba60a847..26cb785fbc0 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimeManager.java @@ -36,6 +36,8 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.segment.QuerySegmentWalker; import com.metamx.druid.query.segment.SegmentDescriptor; +import com.metamx.druid.realtime.plumber.Plumber; +import com.metamx.druid.realtime.plumber.Sink; import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; import org.joda.time.Interval; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java new file mode 100644 index 00000000000..4ad3f123299 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/IntervalStartVersioningPolicy.java @@ -0,0 +1,12 @@ +package com.metamx.druid.realtime.plumber; + +import org.joda.time.Interval; + +public class IntervalStartVersioningPolicy implements VersioningPolicy +{ + @Override + public String getVersion(Interval interval) + { + return interval.getStart().toString(); + } +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java new file mode 100644 index 00000000000..117fa6a40eb --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/MessageTimeRejectionPolicyFactory.java @@ -0,0 +1,39 @@ +package com.metamx.druid.realtime.plumber; + +import org.joda.time.DateTime; +import org.joda.time.Period; + +public class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory +{ + @Override + public RejectionPolicy create(final Period windowPeriod) + { + final long windowMillis = windowPeriod.toStandardDuration().getMillis(); + + return new RejectionPolicy() + { + private volatile long maxTimestamp = Long.MIN_VALUE; + + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(maxTimestamp); + } + + @Override + public boolean accept(long timestamp) + { + maxTimestamp = Math.max(maxTimestamp, timestamp); + + return timestamp >= (maxTimestamp - windowMillis); + } + + @Override + public String toString() + { + return String.format("messageTime-%s", windowPeriod); + } + }; + } +} + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/Plumber.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Plumber.java similarity index 97% rename from realtime/src/main/java/com/metamx/druid/realtime/Plumber.java rename to realtime/src/main/java/com/metamx/druid/realtime/plumber/Plumber.java index 57366b5ee5b..3487c655efb 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/Plumber.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Plumber.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.realtime; +package com.metamx.druid.realtime.plumber; import com.metamx.druid.Query; import com.metamx.druid.query.QueryRunner; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/PlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/PlumberSchool.java similarity index 90% rename from realtime/src/main/java/com/metamx/druid/realtime/PlumberSchool.java rename to realtime/src/main/java/com/metamx/druid/realtime/plumber/PlumberSchool.java index 5fcc1f29f7d..7963c58a0d8 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/PlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/PlumberSchool.java @@ -17,11 +17,13 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.realtime; +package com.metamx.druid.realtime.plumber; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.metamx.druid.realtime.FireDepartmentMetrics; +import com.metamx.druid.realtime.Schema; /** */ diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java similarity index 89% rename from realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java rename to realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index 84d5058a3ac..d30ef7d7156 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -17,13 +17,11 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.realtime; +package com.metamx.druid.realtime.plumber; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -55,6 +53,11 @@ import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryToolChest; +import com.metamx.druid.realtime.FireDepartmentMetrics; +import com.metamx.druid.realtime.FireHydrant; +import com.metamx.druid.realtime.Schema; +import com.metamx.druid.realtime.SegmentAnnouncer; +import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -572,104 +575,6 @@ public class RealtimePlumberSchool implements PlumberSchool }; } - @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") - @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class) - }) - public static interface VersioningPolicy - { - public String getVersion(Interval interval); - } - - public static class IntervalStartVersioningPolicy implements VersioningPolicy - { - @Override - public String getVersion(Interval interval) - { - return interval.getStart().toString(); - } - } - - public interface RejectionPolicy - { - public DateTime getCurrMaxTime(); - public boolean accept(long timestamp); - } - - @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") - @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class), - @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class) - }) - public static interface RejectionPolicyFactory - { - public RejectionPolicy create(Period windowPeriod); - } - - public static class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory - { - @Override - public RejectionPolicy create(final Period windowPeriod) - { - final long windowMillis = windowPeriod.toStandardDuration().getMillis(); - - return new RejectionPolicy() - { - @Override - public DateTime getCurrMaxTime() - { - return new DateTime(); - } - - @Override - public boolean accept(long timestamp) - { - return timestamp >= (System.currentTimeMillis() - windowMillis); - } - - @Override - public String toString() - { - return String.format("serverTime-%s", windowPeriod); - } - }; - } - } - - public static class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory - { - @Override - public RejectionPolicy create(final Period windowPeriod) - { - final long windowMillis = windowPeriod.toStandardDuration().getMillis(); - - return new RejectionPolicy() - { - private volatile long maxTimestamp = Long.MIN_VALUE; - - @Override - public DateTime getCurrMaxTime() - { - return new DateTime(maxTimestamp); - } - - @Override - public boolean accept(long timestamp) - { - maxTimestamp = Math.max(maxTimestamp, timestamp); - - return timestamp >= (maxTimestamp - windowMillis); - } - - @Override - public String toString() - { - return String.format("messageTime-%s", windowPeriod); - } - }; - } - } - private File computeBaseDir(Schema schema) { return new File(basePersistDirectory, schema.getDataSource()); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java new file mode 100644 index 00000000000..847c917dc35 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicy.java @@ -0,0 +1,9 @@ +package com.metamx.druid.realtime.plumber; + +import org.joda.time.DateTime; + +public interface RejectionPolicy +{ + public DateTime getCurrMaxTime(); + public boolean accept(long timestamp); +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java new file mode 100644 index 00000000000..40e8e496bf6 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RejectionPolicyFactory.java @@ -0,0 +1,15 @@ +package com.metamx.druid.realtime.plumber; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.joda.time.Period; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class), + @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class) +}) +public interface RejectionPolicyFactory +{ + public RejectionPolicy create(Period windowPeriod); +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java new file mode 100644 index 00000000000..3557a8ba3bc --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/ServerTimeRejectionPolicyFactory.java @@ -0,0 +1,34 @@ +package com.metamx.druid.realtime.plumber; + +import org.joda.time.DateTime; +import org.joda.time.Period; + +public class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory +{ + @Override + public RejectionPolicy create(final Period windowPeriod) + { + final long windowMillis = windowPeriod.toStandardDuration().getMillis(); + + return new RejectionPolicy() + { + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(); + } + + @Override + public boolean accept(long timestamp) + { + return timestamp >= (System.currentTimeMillis() - windowMillis); + } + + @Override + public String toString() + { + return String.format("serverTime-%s", windowPeriod); + } + }; + } +} diff --git a/realtime/src/main/java/com/metamx/druid/realtime/Sink.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java similarity index 97% rename from realtime/src/main/java/com/metamx/druid/realtime/Sink.java rename to realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java index 70e305b67e4..d1985082622 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/Sink.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/Sink.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ -package com.metamx.druid.realtime; +package com.metamx.druid.realtime.plumber; import com.google.common.base.Function; import com.google.common.base.Predicate; @@ -32,6 +32,8 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.index.v1.IncrementalIndex; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.input.InputRow; +import com.metamx.druid.realtime.FireHydrant; +import com.metamx.druid.realtime.Schema; import org.joda.time.Interval; import javax.annotation.Nullable; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java new file mode 100644 index 00000000000..5fe790dd284 --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/VersioningPolicy.java @@ -0,0 +1,14 @@ +package com.metamx.druid.realtime.plumber; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.joda.time.Interval; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class) +}) +public interface VersioningPolicy +{ + public String getVersion(Interval interval); +}