Merge pull request #1504 from metamx/fix-1447

fix for #1447
This commit is contained in:
Fangjin Yang 2015-07-14 08:50:08 -07:00
commit 3f7ba58227
9 changed files with 28 additions and 9 deletions

View File

@ -132,6 +132,7 @@ public class IndexTask extends AbstractFixedIntervalTask
config.getIndexSpec(), config.getIndexSpec(),
null, null,
null, null,
null,
null null
); );
} }

View File

@ -246,7 +246,8 @@ public class TaskSerdeTest
indexSpec, indexSpec,
false, false,
false, false,
null null,
0.3F
) )
) )
); );
@ -282,6 +283,8 @@ public class TaskSerdeTest
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(), task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity() task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
); );
Assert.assertEquals(task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f);
} }
@Test @Test

View File

@ -689,6 +689,7 @@ public class TaskLifecycleTest
null, null,
null, null,
null, null,
null,
null null
); );
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);

View File

@ -49,7 +49,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final boolean defaultPersistInHeap = false; private static final boolean defaultPersistInHeap = false;
private static final boolean defaultIngestOffheap = false; private static final boolean defaultIngestOffheap = false;
private static final int defaultBufferSize = 128 * 1024* 1024; // 128M private static final int defaultBufferSize = 128 * 1024* 1024; // 128M
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
// Might make sense for this to be a builder // Might make sense for this to be a builder
public static RealtimeTuningConfig makeDefaultTuningConfig() public static RealtimeTuningConfig makeDefaultTuningConfig()
@ -66,7 +66,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultIndexSpec, defaultIndexSpec,
defaultPersistInHeap, defaultPersistInHeap,
defaultIngestOffheap, defaultIngestOffheap,
defaultBufferSize defaultBufferSize,
DEFAULT_AGG_BUFFER_RATIO
); );
} }
@ -82,6 +83,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final boolean persistInHeap; private final boolean persistInHeap;
private final boolean ingestOffheap; private final boolean ingestOffheap;
private final int bufferSize; private final int bufferSize;
private final float aggregationBufferRatio;
@JsonCreator @JsonCreator
public RealtimeTuningConfig( public RealtimeTuningConfig(
@ -96,7 +98,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("persistInHeap") Boolean persistInHeap, @JsonProperty("persistInHeap") Boolean persistInHeap,
@JsonProperty("ingestOffheap") Boolean ingestOffheap, @JsonProperty("ingestOffheap") Boolean ingestOffheap,
@JsonProperty("buffersize") Integer bufferSize @JsonProperty("buffersize") Integer bufferSize,
@JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
) )
{ {
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -115,7 +118,7 @@ public class RealtimeTuningConfig implements TuningConfig
this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap; this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap; this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize; this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
} }
@JsonProperty @JsonProperty
@ -188,6 +191,12 @@ public class RealtimeTuningConfig implements TuningConfig
return bufferSize; return bufferSize;
} }
@JsonProperty
public float getAggregationBufferRatio()
{
return aggregationBufferRatio;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{ {
return new RealtimeTuningConfig( return new RealtimeTuningConfig(
@ -202,7 +211,8 @@ public class RealtimeTuningConfig implements TuningConfig
indexSpec, indexSpec,
persistInHeap, persistInHeap,
ingestOffheap, ingestOffheap,
bufferSize bufferSize,
aggregationBufferRatio
); );
} }
@ -220,7 +230,8 @@ public class RealtimeTuningConfig implements TuningConfig
indexSpec, indexSpec,
persistInHeap, persistInHeap,
ingestOffheap, ingestOffheap,
bufferSize bufferSize,
aggregationBufferRatio
); );
} }
} }

View File

@ -194,7 +194,7 @@ public class Sink implements Iterable<FireHydrant>
newIndex = new OffheapIncrementalIndex( newIndex = new OffheapIncrementalIndex(
indexSchema, indexSchema,
// Assuming half space for aggregates // Assuming half space for aggregates
new OffheapBufferPool(config.getBufferSize()), new OffheapBufferPool((int) ((double) config.getBufferSize() * config.getAggregationBufferRatio())),
true, true,
config.getBufferSize() config.getBufferSize()
); );

View File

@ -75,7 +75,7 @@ public class FireDepartmentTest
) )
), ),
new RealtimeTuningConfig( new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, false, false, null null, null, null, null, null, null, null, null, null, false, false, null, null
) )
); );

View File

@ -110,6 +110,7 @@ public class RealtimeManagerTest
null, null,
null, null,
null, null,
null,
null null
); );
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -165,6 +165,7 @@ public class RealtimePlumberSchoolTest
null, null,
null, null,
null, null,
null,
null null
); );

View File

@ -65,6 +65,7 @@ public class SinkTest
null, null,
false, false,
false, false,
null,
null null
); );
final Sink sink = new Sink(interval, schema, tuningConfig, version); final Sink sink = new Sink(interval, schema, tuningConfig, version);