make maxPendingPersistPeriod configurable

This commit is contained in:
nishantmonu51 2014-01-17 11:01:55 +05:30
parent 6f5e1afbff
commit fb819abd6f
5 changed files with 22 additions and 6 deletions

View File

@ -194,7 +194,7 @@ public class RealtimeIndexTask extends AbstractTask
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity
segmentGranularity, fireDepartmentConfig.getMaxPendingPersistBatches()
);
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);

View File

@ -28,17 +28,23 @@ import org.joda.time.Period;
*/
public class FireDepartmentConfig
{
private static int MAX_PENDING_PERSIST_BATCHES_DEFAULT = 2;
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final int maxPendingPersistBatches;
@JsonCreator
public FireDepartmentConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("maxPendingPersistBatches") int maxPendingPersistBatches
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.maxPendingPersistBatches = maxPendingPersistBatches > 0
? maxPendingPersistBatches
: MAX_PENDING_PERSIST_BATCHES_DEFAULT;
Preconditions.checkArgument(maxRowsInMemory > 0, "maxRowsInMemory[%s] should be greater than 0", maxRowsInMemory);
Preconditions.checkNotNull(intermediatePersistPeriod, "intermediatePersistPeriod");
@ -55,4 +61,10 @@ public class FireDepartmentConfig
{
return intermediatePersistPeriod;
}
@JsonProperty
public int getMaxPendingPersistBatches()
{
return maxPendingPersistBatches;
}
}

View File

@ -95,6 +95,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final File basePersistDirectory;
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
private final int maxPendingPersistBatches;
private volatile boolean shuttingDown = false;
@JacksonInject
@NotNull
@ -125,7 +126,8 @@ public class RealtimePlumberSchool implements PlumberSchool
public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("maxPendingPersistBatches") int maxPendingPersistBatches
)
{
this.windowPeriod = windowPeriod;
@ -133,7 +135,9 @@ public class RealtimePlumberSchool implements PlumberSchool
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
this.maxPendingPersistBatches = maxPendingPersistBatches;
Preconditions.checkArgument(maxPendingPersistBatches > 0);
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
@ -476,7 +480,7 @@ public class RealtimePlumberSchool implements PlumberSchool
if (persistExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
persistExecutor = Execs.blockingSingleThreaded(
"plumber_persist_%d",2
"plumber_persist_%d", maxPendingPersistBatches
);
}
if (scheduledExecutor == null) {

View File

@ -78,7 +78,7 @@ public class RealtimeManagerTest
Arrays.<FireDepartment>asList(
new FireDepartment(
schema,
new FireDepartmentConfig(1, new Period("P1Y")),
new FireDepartmentConfig(1, new Period("P1Y"), 1),
new FirehoseFactory()
{
@Override

View File

@ -86,7 +86,7 @@ public class RealtimePlumberSchoolTest
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
new Period("PT10m"),
tmpDir,
IndexGranularity.HOUR
IndexGranularity.HOUR, 1
);
announcer = EasyMock.createMock(DataSegmentAnnouncer.class);