mirror of https://github.com/apache/druid.git
Merge branch 'master' of github.com:metamx/druid into fjy
This commit is contained in:
commit
bcf57b58b9
|
@ -83,11 +83,11 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
private final Period windowPeriod;
|
||||
private final File basePersistDirectory;
|
||||
private final IndexGranularity segmentGranularity;
|
||||
private final RejectionPolicyFactory rejectionPolicyFactory;
|
||||
|
||||
private volatile Executor persistExecutor = null;
|
||||
private volatile ScheduledExecutorService scheduledExecutor = null;
|
||||
|
||||
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
|
||||
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
|
||||
private volatile SegmentPusher segmentPusher = null;
|
||||
private volatile MetadataUpdater metadataUpdater = null;
|
||||
|
@ -98,20 +98,25 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
public RealtimePlumberSchool(
|
||||
@JsonProperty("windowPeriod") Period windowPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
|
||||
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
|
||||
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
|
||||
)
|
||||
{
|
||||
this.windowPeriod = windowPeriod;
|
||||
this.basePersistDirectory = basePersistDirectory;
|
||||
this.segmentGranularity = segmentGranularity;
|
||||
this.rejectionPolicyFactory = rejectionPolicyFactory == null ? new ServerTimeRejectionPolicyFactory() : rejectionPolicyFactory;
|
||||
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
|
||||
|
||||
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
|
||||
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
|
||||
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
|
||||
}
|
||||
|
||||
@JsonProperty("rejectionPolicy")
|
||||
public void setRejectionPolicyFactory(RejectionPolicyFactory factory)
|
||||
{
|
||||
this.rejectionPolicyFactory = factory;
|
||||
}
|
||||
|
||||
@JacksonInject("queryRunnerFactoryConglomerate")
|
||||
public void setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
|
||||
{
|
||||
|
@ -506,7 +511,7 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
|
||||
private void verifyState()
|
||||
{
|
||||
Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do action.");
|
||||
Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");
|
||||
Preconditions.checkNotNull(segmentPusher, "must specify a segmentPusher to do this action.");
|
||||
Preconditions.checkNotNull(metadataUpdater, "must specify a metadataUpdater to do this action.");
|
||||
Preconditions.checkNotNull(serverView, "must specify a serverView to do this action.");
|
||||
|
@ -546,12 +551,12 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
|
||||
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class)
|
||||
})
|
||||
public interface RejectionPolicyFactory
|
||||
public static interface RejectionPolicyFactory
|
||||
{
|
||||
public RejectionPolicy create(Period windowPeriod);
|
||||
}
|
||||
|
||||
public class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory
|
||||
public static class ServerTimeRejectionPolicyFactory implements RejectionPolicyFactory
|
||||
{
|
||||
@Override
|
||||
public RejectionPolicy create(final Period windowPeriod)
|
||||
|
@ -581,7 +586,7 @@ public class RealtimePlumberSchool implements PlumberSchool
|
|||
}
|
||||
}
|
||||
|
||||
public class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory
|
||||
public static class MessageTimeRejectionPolicyFactory implements RejectionPolicyFactory
|
||||
{
|
||||
@Override
|
||||
public RejectionPolicy create(final Period windowPeriod)
|
||||
|
|
Loading…
Reference in New Issue