consider mapped size in limit calculation & review comments
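OffheapIncrementalIndex now takes an explicit sizeLimit and reports itself full once the mapped size of its MapDB stores plus the aggregate buffer exceeds that limit, instead of checking aggregate-buffer capacity alone. Ingestion call sites pass the configured bufferSize as the limit and the tests pin it at 100 MB; the HadoopTuningConfig default constants are also renamed to UPPER_SNAKE_CASE per review comments.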

nishantmonu51 2014-12-03 23:47:30 +05:30
parent da8bd7836b
commit 4dc0fdba8a
8 changed files with 46 additions and 27 deletions

View File

@@ -36,19 +36,19 @@ import java.util.Map;
@JsonTypeName("hadoop")
public class HadoopTuningConfig implements TuningConfig
{
private static final PartitionsSpec defaultPartitionsSpec = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> defaultShardSpecs = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int defaultRowFlushBoundary = 80000;
private static final int defaultBufferSize = 128 * 1024 * 1024;
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
return new HadoopTuningConfig(
null,
new DateTime().toString(),
defaultPartitionsSpec,
defaultShardSpecs,
defaultRowFlushBoundary,
DEFAULT_PARTITIONS_SPEC,
DEFAULT_SHARD_SPECS,
DEFAULT_ROW_FLUSH_BOUNDARY,
false,
true,
false,
@@ -57,7 +57,7 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
false,
defaultBufferSize
DEFAULT_BUFFER_SIZE
);
}
@@ -96,9 +96,9 @@ public class HadoopTuningConfig implements TuningConfig
{
this.workingPath = workingPath == null ? null : workingPath;
this.version = version == null ? new DateTime().toString() : version;
this.partitionsSpec = partitionsSpec == null ? defaultPartitionsSpec : partitionsSpec;
this.shardSpecs = shardSpecs == null ? defaultShardSpecs : shardSpecs;
this.rowFlushBoundary = rowFlushBoundary == null ? defaultRowFlushBoundary : rowFlushBoundary;
this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;
this.rowFlushBoundary = rowFlushBoundary == null ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
this.overwriteFiles = overwriteFiles;
@@ -109,7 +109,7 @@ public class HadoopTuningConfig implements TuningConfig
this.combineText = combineText;
this.persistInHeap = persistInHeap;
this.ingestOffheap = ingestOffheap;
this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
}
@JsonProperty

View File

@@ -642,7 +642,8 @@ public class IndexGeneratorJob implements Jobby
return new OffheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(tuningConfig.getBufferSize()),
true
true,
tuningConfig.getBufferSize()
);
} else {
return new OnheapIncrementalIndex(

View File

@@ -84,7 +84,8 @@ public class GroupByQueryHelper
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
bufferPool,
false
false,
Integer.MAX_VALUE
);
} else {
index = new OnheapIncrementalIndex(
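Note that the group-by path passes Integer.MAX_VALUE as the sizeLimit, so for query-time offheap indexes the new mapped-size check is effectively disabled and fullness is still governed by the aggregate buffer capacity alone.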

View File

@@ -31,6 +31,7 @@ import org.mapdb.BTreeKeySerializer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
import org.mapdb.Store;
import java.io.DataInput;
import java.io.DataOutput;
@@ -57,11 +58,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private final int[] aggPositionOffsets;
private final int totalAggSize;
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final int sizeLimit;
public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics
boolean deserializeComplexMetrics,
int sizeLimit
)
{
super(incrementalIndexSchema, deserializeComplexMetrics);
@@ -91,6 +94,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
.comparator(timeAndDimsSerializer.getComparator())
.valueSerializer(Serializer.INTEGER)
.make();
this.sizeLimit = sizeLimit;
}
public OffheapIncrementalIndex(
@@ -98,7 +102,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
QueryGranularity gran,
final AggregatorFactory[] metrics,
StupidPool<ByteBuffer> bufferPool,
boolean deserializeComplexMetrics
boolean deserializeComplexMetrics,
int sizeLimit
)
{
this(
@@ -107,7 +112,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
.withMetrics(metrics)
.build(),
bufferPool,
deserializeComplexMetrics
deserializeComplexMetrics,
sizeLimit
);
}
@@ -220,12 +226,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
/**
 * @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows.
 */
public boolean isFull()
{
return (size() + 1) * totalAggSize > bufferHolder.get().limit();
}
/**
 * @return true if the underlying buffer for IncrementalIndex is full and cannot accommodate more rows.
 */
public boolean isFull()
{
return (size() + 1) * totalAggSize > bufferHolder.get().limit() || getCurrentSize() > sizeLimit;
}
private int getMetricPosition(int rowOffset, int metricIndex)
{
@@ -416,4 +423,9 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
return s1.equals(s2);
}
}
private long getCurrentSize()
{
return Store.forDB(db).getCurrSize() + Store.forDB(factsDb).getCurrSize() + bufferHolder.get().limit();
}
}
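Taken together, the change makes isFull() trip on either condition: one more row would overflow the aggregate buffer, or the combined mapped footprint has crossed sizeLimit. Below is a minimal stand-alone sketch of that arithmetic, assuming MapDB 1.x; Store.forDB(...).getCurrSize() is the API used in the diff above, while the DB setup, field values, and class name are hypothetical stand-ins for what OffheapIncrementalIndex builds internally.

import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Store;

import java.nio.ByteBuffer;

// Sketch only: mirrors the isFull()/getCurrentSize() arithmetic above with
// hypothetical values; it is not the real OffheapIncrementalIndex.
public class SizeLimitSketch
{
  private final DB factsDb = DBMaker.newMemoryDirectDB().transactionDisable().make(); // hypothetical setup
  private final DB db = DBMaker.newMemoryDirectDB().transactionDisable().make();      // hypothetical setup
  private final ByteBuffer aggBuffer = ByteBuffer.allocateDirect(128 * 1024 * 1024);  // stands in for bufferHolder.get()
  private final int totalAggSize = 64;              // bytes of aggregator state per row (hypothetical)
  private final int sizeLimit = 256 * 1024 * 1024;  // cap on total mapped footprint
  private int size = 0;                             // current row count (hypothetical)

  // Mapped size of both MapDB stores plus the aggregate buffer, as in getCurrentSize().
  private long getCurrentSize()
  {
    return Store.forDB(db).getCurrSize()
           + Store.forDB(factsDb).getCurrSize()
           + aggBuffer.limit();
  }

  // Full when one more row would overflow the aggregate buffer,
  // or when the total mapped footprint exceeds sizeLimit.
  public boolean isFull()
  {
    return (size + 1) * totalAggSize > aggBuffer.limit() || getCurrentSize() > sizeLimit;
  }
}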

View File

@@ -166,7 +166,8 @@ public class TestIndex
retVal = new OffheapIncrementalIndex(
schema,
TestQueryRunners.pool,
true
true,
100 * 1024 * 1024
);
} else {
retVal = new OnheapIncrementalIndex(

View File

@@ -102,7 +102,8 @@ public class IncrementalIndexTest
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool,
true
true,
100 * 1024 * 1024
);
} else {
return new OnheapIncrementalIndex(

View File

@@ -107,7 +107,8 @@ public class IncrementalIndexStorageAdapterTest
QueryGranularity.MINUTE,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
TestQueryRunners.pool,
true
true,
100 * 1024 * 1024
);
}
}

View File

@@ -187,8 +187,10 @@ public class Sink implements Iterable<FireHydrant>
if (config.isIngestOffheap()) {
newIndex = new OffheapIncrementalIndex(
indexSchema,
// Assuming half space for aggregates
new OffheapBufferPool(config.getBufferSize()),
true
true,
config.getBufferSize()
);
} else {
newIndex = new OnheapIncrementalIndex(
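In both ingestion paths (IndexGeneratorJob above and Sink here), config.getBufferSize() does double duty: it sizes the OffheapBufferPool and serves as the sizeLimit that caps the index's total mapped footprint.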