fixes #1447
This commit is contained in:
Nishant 2015-07-09 19:05:48 +05:30
parent 66d105940d
commit 5fe27fe4ad
9 changed files with 28 additions and 9 deletions

View File

@ -132,6 +132,7 @@ public class IndexTask extends AbstractFixedIntervalTask
config.getIndexSpec(),
null,
null,
null,
null
);
}

View File

@ -246,7 +246,8 @@ public class TaskSerdeTest
indexSpec,
false,
false,
null
null,
0.3F
)
)
);
@ -282,6 +283,8 @@ public class TaskSerdeTest
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f);
}
@Test

View File

@ -689,6 +689,7 @@ public class TaskLifecycleTest
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);

View File

@ -49,7 +49,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final boolean defaultPersistInHeap = false;
private static final boolean defaultIngestOffheap = false;
private static final int defaultBufferSize = 128 * 1024* 1024; // 128M
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
// Might make sense for this to be a builder
public static RealtimeTuningConfig makeDefaultTuningConfig()
@ -66,7 +66,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultIndexSpec,
defaultPersistInHeap,
defaultIngestOffheap,
defaultBufferSize
defaultBufferSize,
DEFAULT_AGG_BUFFER_RATIO
);
}
@ -82,6 +83,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final boolean persistInHeap;
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;
@JsonCreator
public RealtimeTuningConfig(
@ -96,7 +98,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("persistInHeap") Boolean persistInHeap,
@JsonProperty("ingestOffheap") Boolean ingestOffheap,
@JsonProperty("buffersize") Integer bufferSize
@JsonProperty("buffersize") Integer bufferSize,
@JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -115,7 +118,7 @@ public class RealtimeTuningConfig implements TuningConfig
this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
}
@JsonProperty
@ -188,6 +191,12 @@ public class RealtimeTuningConfig implements TuningConfig
return bufferSize;
}
@JsonProperty
public float getAggregationBufferRatio()
{
return aggregationBufferRatio;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -202,7 +211,8 @@ public class RealtimeTuningConfig implements TuningConfig
indexSpec,
persistInHeap,
ingestOffheap,
bufferSize
bufferSize,
aggregationBufferRatio
);
}
@ -220,7 +230,8 @@ public class RealtimeTuningConfig implements TuningConfig
indexSpec,
persistInHeap,
ingestOffheap,
bufferSize
bufferSize,
aggregationBufferRatio
);
}
}

View File

@ -194,7 +194,7 @@ public class Sink implements Iterable<FireHydrant>
newIndex = new OffheapIncrementalIndex(
indexSchema,
// Assuming half space for aggregates
new OffheapBufferPool(config.getBufferSize()),
new OffheapBufferPool((int) ((double) config.getBufferSize() * config.getAggregationBufferRatio())),
true,
config.getBufferSize()
);

View File

@ -75,7 +75,7 @@ public class FireDepartmentTest
)
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, false, false, null
null, null, null, null, null, null, null, null, null, false, false, null, null
)
);

View File

@ -106,6 +106,7 @@ public class RealtimeManagerTest
null,
null,
null,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -165,6 +165,7 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
null,
null
);

View File

@ -65,6 +65,7 @@ public class SinkTest
null,
false,
false,
null,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);