Merge branch 'master' into subquery

This commit is contained in:
Yuval Oren 2014-01-24 16:48:35 -08:00
commit c99e468ccd
12 changed files with 49 additions and 37 deletions

View File

@ -13,6 +13,8 @@ We provide a brief description of the example to exemplify the types of things t
* `windowPeriod` is the amount of lag time to allow for events. It is configured here as a 10 minute window, meaning that any event whose timestamp is more than 10 minutes in the past will be thrown away and not included in the segment generated by the realtime server.
* `basePersistDirectory` is the directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.
* `maxPendingPersists` is the maximum number of persists a plumber can have in progress concurrently before it starts to block.
Available Plumbers
------------------

View File

@ -86,6 +86,9 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore
private final Period windowPeriod;
@JsonIgnore
private final int maxPendingPersists;
@JsonIgnore
private final IndexGranularity segmentGranularity;
@ -106,6 +109,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
@ -113,7 +117,7 @@ public class RealtimeIndexTask extends AbstractTask
super(
id == null
? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString())
:id,
: id,
String.format(
"index_realtime_%s",
@ -135,6 +139,9 @@ public class RealtimeIndexTask extends AbstractTask
this.firehoseFactory = firehoseFactory;
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.maxPendingPersists = (maxPendingPersists == 0)
? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS
: maxPendingPersists;
this.segmentGranularity = segmentGranularity;
this.rejectionPolicyFactory = rejectionPolicyFactory;
}
@ -194,8 +201,9 @@ public class RealtimeIndexTask extends AbstractTask
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity, fireDepartmentConfig.getMaxPendingPersists()
segmentGranularity
);
realtimePlumberSchool.setDefaultMaxPendingPersists(maxPendingPersists);
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);

View File

@ -51,6 +51,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
null,
null,
1,
null,
null
);

View File

@ -198,6 +198,7 @@ public class TaskSerdeTest
null,
null,
new Period("PT10M"),
1,
IndexGranularity.HOUR,
null
);

View File

@ -46,6 +46,7 @@ public class TaskAnnouncementTest
null,
null,
new Period("PT10M"),
1,
IndexGranularity.HOUR,
null
);

View File

@ -39,7 +39,7 @@
<dependency>
<groupId>kafka</groupId>
<artifactId>core-kafka</artifactId>
<version>0.7.2-mmx1</version>
<version>0.7.2-mmx4</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>

18
pom.xml
View File

@ -218,47 +218,47 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>2.2.2</version>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.11</version>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.11</version>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>

View File

@ -28,23 +28,17 @@ import org.joda.time.Period;
*/
public class FireDepartmentConfig
{
private static int MAX_PENDING_PERSIST_BATCHES_DEFAULT = 2;
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final int maxPendingPersists;
@JsonCreator
public FireDepartmentConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.maxPendingPersists = maxPendingPersists > 0
? maxPendingPersists
: MAX_PENDING_PERSIST_BATCHES_DEFAULT;
Preconditions.checkArgument(maxRowsInMemory > 0, "maxRowsInMemory[%s] should be greater than 0", maxRowsInMemory);
Preconditions.checkNotNull(intermediatePersistPeriod, "intermediatePersistPeriod");
@ -61,10 +55,4 @@ public class FireDepartmentConfig
{
return intermediatePersistPeriod;
}
@JsonProperty
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
}

View File

@ -77,8 +77,7 @@ public class FlushingPlumberSchool implements PlumberSchool
@JsonProperty("flushDuration") Duration flushDuration,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersists") int maxPendingPersists
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
)
{
this.flushDuration = flushDuration;
@ -87,9 +86,9 @@ public class FlushingPlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = maxPendingPersists;
// Workaround for Jackson issue where if maxPendingPersists is null, all JacksonInjects fail
this.maxPendingPersists = RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS;
Preconditions.checkArgument(maxPendingPersists > 0, "FlushingPlumberSchool requires maxPendingPersists > 0");
Preconditions.checkNotNull(flushDuration, "FlushingPlumberSchool requires a flushDuration.");
Preconditions.checkNotNull(windowPeriod, "FlushingPlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "FlushingPlumberSchool requires a basePersistDirectory.");

View File

@ -44,36 +44,44 @@ import java.util.concurrent.ExecutorService;
*/
public class RealtimePlumberSchool implements PlumberSchool
{
public static final int DEFAULT_MAX_PENDING_PERSISTS = 2;
private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class);
private static final int defaultPending = 2;
private final Period windowPeriod;
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final int maxPendingPersists;
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
@JacksonInject
@NotNull
@Processing
private volatile ExecutorService queryExecutorService = null;
private volatile int maxPendingPersists;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@ -81,8 +89,7 @@ public class RealtimePlumberSchool implements PlumberSchool
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersists") int maxPendingPersists
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
)
{
this.windowPeriod = windowPeriod;
@ -90,9 +97,9 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersists = (maxPendingPersists > 0) ? maxPendingPersists : defaultPending;
// Workaround for Jackson issue where if maxPendingPersists is null, all JacksonInjects fail
this.maxPendingPersists = RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS;
Preconditions.checkArgument(maxPendingPersists > 0, "RealtimePlumberSchool requires maxPendingPersists > 0");
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
@ -145,6 +152,11 @@ public class RealtimePlumberSchool implements PlumberSchool
this.queryExecutorService = executorService;
}
/**
 * Overrides the {@code maxPendingPersists} value that the constructor initialises to
 * {@link #DEFAULT_MAX_PENDING_PERSISTS}.
 *
 * @param maxPendingPersists the requested limit; a value that is zero or negative is
 *                           replaced by {@link #DEFAULT_MAX_PENDING_PERSISTS}
 */
public void setDefaultMaxPendingPersists(int maxPendingPersists)
{
  // The constructor checks "maxPendingPersists > 0"; a plain assignment here would let a
  // caller silently break that invariant after construction, so non-positive values fall
  // back to the default instead of being stored.
  this.maxPendingPersists = (maxPendingPersists > 0)
                            ? maxPendingPersists
                            : DEFAULT_MAX_PENDING_PERSISTS;
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{

View File

@ -78,7 +78,7 @@ public class RealtimeManagerTest
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y"), 1),
new FireDepartmentConfig(1, new Period("P1Y")),
new FirehoseFactory()
{
@Override

View File

@ -86,7 +86,7 @@ public class RealtimePlumberSchoolTest
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR, 1
IndexGranularity.HOUR
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);