Realtime:

- Move VersioningPolicy, RetryPolicy outside of RealtimePlumberSchool
- Move plumber stuff into its own package, since there's a lot of it
This commit is contained in:
Gian Merlino 2013-03-14 12:56:25 -07:00
parent c34108418a
commit b8c08f235a
15 changed files with 150 additions and 113 deletions

View File

@ -39,10 +39,10 @@ import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Plumber;
import com.metamx.druid.realtime.PlumberSchool;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.Sink;
import com.metamx.druid.realtime.plumber.Sink;
import org.apache.commons.io.FileUtils;

View File

@ -39,9 +39,9 @@ import com.metamx.druid.merger.common.index.YeOldePlumberSchool;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.Plumber;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.Sink;
import com.metamx.druid.realtime.plumber.Sink;
import org.joda.time.DateTime;
import org.joda.time.Interval;

View File

@ -22,12 +22,13 @@ import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.Plumber;
import com.metamx.druid.realtime.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.Sink;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.druid.realtime.plumber.VersioningPolicy;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -164,7 +165,7 @@ public class RealtimeIndexTask extends AbstractTask
// Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the
// realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in
// the plumber such that waiting for the coordinator doesn't block data processing.
final RealtimePlumberSchool.VersioningPolicy versioningPolicy = new RealtimePlumberSchool.VersioningPolicy()
final VersioningPolicy versioningPolicy = new VersioningPolicy()
{
@Override
public String getVersion(final Interval interval)

View File

@ -24,6 +24,8 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool;
import java.io.IOException;

View File

@ -36,6 +36,8 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.joda.time.Interval;

View File

@ -0,0 +1,12 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.Interval;
public class IntervalStartVersioningPolicy implements VersioningPolicy
{
@Override
public String getVersion(Interval interval)
{
return interval.getStart().toString();
}
}

View File

@ -0,0 +1,39 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Period;
public class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(final Period windowPeriod)
{
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
return new RejectionPolicy()
{
private volatile long maxTimestamp = Long.MIN_VALUE;
@Override
public DateTime getCurrMaxTime()
{
return new DateTime(maxTimestamp);
}
@Override
public boolean accept(long timestamp)
{
maxTimestamp = Math.max(maxTimestamp, timestamp);
return timestamp >= (maxTimestamp - windowMillis);
}
@Override
public String toString()
{
return String.format("messageTime-%s", windowPeriod);
}
};
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
package com.metamx.druid.realtime.plumber;
import com.metamx.druid.Query;
import com.metamx.druid.query.QueryRunner;

View File

@ -17,11 +17,13 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Schema;
/**
*/

View File

@ -17,13 +17,11 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@ -55,6 +53,11 @@ import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentAnnouncer;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -572,104 +575,6 @@ public class RealtimePlumberSchool implements PlumberSchool
};
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class)
})
public static interface VersioningPolicy
{
public String getVersion(Interval interval);
}
public static class IntervalStartVersioningPolicy implements VersioningPolicy
{
@Override
public String getVersion(Interval interval)
{
return interval.getStart().toString();
}
}
public interface RejectionPolicy
{
public DateTime getCurrMaxTime();
public boolean accept(long timestamp);
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class)
})
public static interface RejectionPolicyFactory
{
public RejectionPolicy create(Period windowPeriod);
}
public static class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(final Period windowPeriod)
{
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
return new RejectionPolicy()
{
@Override
public DateTime getCurrMaxTime()
{
return new DateTime();
}
@Override
public boolean accept(long timestamp)
{
return timestamp >= (System.currentTimeMillis() - windowMillis);
}
@Override
public String toString()
{
return String.format("serverTime-%s", windowPeriod);
}
};
}
}
public static class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(final Period windowPeriod)
{
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
return new RejectionPolicy()
{
private volatile long maxTimestamp = Long.MIN_VALUE;
@Override
public DateTime getCurrMaxTime()
{
return new DateTime(maxTimestamp);
}
@Override
public boolean accept(long timestamp)
{
maxTimestamp = Math.max(maxTimestamp, timestamp);
return timestamp >= (maxTimestamp - windowMillis);
}
@Override
public String toString()
{
return String.format("messageTime-%s", windowPeriod);
}
};
}
}
private File computeBaseDir(Schema schema)
{
return new File(basePersistDirectory, schema.getDataSource());

View File

@ -0,0 +1,9 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
public interface RejectionPolicy
{
public DateTime getCurrMaxTime();
public boolean accept(long timestamp);
}

View File

@ -0,0 +1,15 @@
package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.joda.time.Period;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class)
})
public interface RejectionPolicyFactory
{
public RejectionPolicy create(Period windowPeriod);
}

View File

@ -0,0 +1,34 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Period;
public class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(final Period windowPeriod)
{
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
return new RejectionPolicy()
{
@Override
public DateTime getCurrMaxTime()
{
return new DateTime();
}
@Override
public boolean accept(long timestamp)
{
return timestamp >= (System.currentTimeMillis() - windowMillis);
}
@Override
public String toString()
{
return String.format("serverTime-%s", windowPeriod);
}
};
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.realtime;
package com.metamx.druid.realtime.plumber;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
@ -32,6 +32,8 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.FireHydrant;
import com.metamx.druid.realtime.Schema;
import org.joda.time.Interval;
import javax.annotation.Nullable;

View File

@ -0,0 +1,14 @@
package com.metamx.druid.realtime.plumber;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.joda.time.Interval;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "intervalStart", value = IntervalStartVersioningPolicy.class)
})
public interface VersioningPolicy
{
public String getVersion(Interval interval);
}