make ingestOffheap tuneable

This commit is contained in:
nishantmonu51 2014-09-30 15:30:02 +05:30
parent 3f66d3c167
commit 61c7fd2e6e
13 changed files with 55 additions and 19 deletions

View File

@ -166,6 +166,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
ignoreInvalidRows,
jobProperties,
combineText,
false,
false
);
}

View File

@ -54,6 +54,7 @@ public class HadoopTuningConfig implements TuningConfig
false,
null,
false,
false,
false
);
}
@ -70,6 +71,7 @@ public class HadoopTuningConfig implements TuningConfig
private final Map<String, String> jobProperties;
private final boolean combineText;
private final boolean persistInHeap;
private final boolean ingestOffheap;
@JsonCreator
public HadoopTuningConfig(
@ -84,7 +86,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText,
final @JsonProperty("persistInHeap") boolean persistInHeap
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap
)
{
this.workingPath = workingPath == null ? null : workingPath;
@ -101,6 +104,7 @@ public class HadoopTuningConfig implements TuningConfig
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
this.persistInHeap = persistInHeap;
this.ingestOffheap = ingestOffheap;
}
@JsonProperty
@ -175,6 +179,11 @@ public class HadoopTuningConfig implements TuningConfig
return persistInHeap;
}
@JsonProperty
public boolean isIngestOffheap(){
return ingestOffheap;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -189,7 +198,8 @@ public class HadoopTuningConfig implements TuningConfig
ignoreInvalidRows,
jobProperties,
combineText,
persistInHeap
persistInHeap,
ingestOffheap
);
}
@ -207,7 +217,8 @@ public class HadoopTuningConfig implements TuningConfig
ignoreInvalidRows,
jobProperties,
combineText,
persistInHeap
persistInHeap,
ingestOffheap
);
}
@ -225,7 +236,8 @@ public class HadoopTuningConfig implements TuningConfig
ignoreInvalidRows,
jobProperties,
combineText,
persistInHeap
persistInHeap,
ingestOffheap
);
}
}

View File

@ -634,7 +634,8 @@ public class IndexGeneratorJob implements Jobby
for (AggregatorFactory agg : aggs) {
aggsSize += agg.getMaxIntermediateSize();
}
int bufferSize = aggsSize * config.getSchema().getTuningConfig().getRowFlushBoundary();
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
return new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
@ -643,7 +644,7 @@ public class IndexGeneratorJob implements Jobby
.withMetrics(aggs)
.build(),
new OffheapBufferPool(bufferSize),
false
tuningConfig.isIngestOffheap()
);
}

View File

@ -403,7 +403,7 @@ public class IndexTask extends AbstractFixedIntervalTask
tmpDir
).findPlumber(
schema,
new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null),
new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null),
metrics
);

View File

@ -144,6 +144,7 @@ public class RealtimeIndexTask extends AbstractTask
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
spec.getShardSpec(),
false,
false
),
null, null, null, null

View File

@ -57,6 +57,7 @@ import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.mapdb.BTreeKeySerializer;
import org.mapdb.CC;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;
@ -326,9 +327,12 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
this.useOffheap = true;
this.useOffheap = useOffheap;
if (this.useOffheap) {
final DBMaker dbMaker = DBMaker.newMemoryDirectDB().transactionDisable().asyncWriteEnable().cacheSoftRefEnable();
final DBMaker dbMaker = DBMaker.newMemoryDirectDB()
.transactionDisable()
.asyncWriteEnable()
.cacheSoftRefEnable();
db = dbMaker.make();
factsDb = dbMaker.make();
final TimeAndDimsSerializer timeAndDimsSerializer = new TimeAndDimsSerializer(this);

View File

@ -45,6 +45,8 @@ public class RealtimeTuningConfig implements TuningConfig
private static final int defaultMaxPendingPersists = 0;
private static final ShardSpec defaultShardSpec = new NoneShardSpec();
private static final boolean defaultPersistInHeap = false;
private static final boolean defaultIngestOffheap = false;
// Might make sense for this to be a builder
public static RealtimeTuningConfig makeDefaultTuningConfig()
@ -58,7 +60,8 @@ public class RealtimeTuningConfig implements TuningConfig
defaultRejectionPolicyFactory,
defaultMaxPendingPersists,
defaultShardSpec,
defaultPersistInHeap
defaultPersistInHeap,
defaultIngestOffheap
);
}
@ -71,6 +74,7 @@ public class RealtimeTuningConfig implements TuningConfig
private final int maxPendingPersists;
private final ShardSpec shardSpec;
private final boolean persistInHeap;
private final boolean ingestOffheap;
@JsonCreator
public RealtimeTuningConfig(
@ -82,7 +86,8 @@ public class RealtimeTuningConfig implements TuningConfig
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec,
@JsonProperty("persistInHeap") Boolean persistInHeap
@JsonProperty("persistInHeap") Boolean persistInHeap,
@JsonProperty("ingestOffheap") Boolean ingestOffheap
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@ -98,6 +103,7 @@ public class RealtimeTuningConfig implements TuningConfig
this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
}
@JsonProperty
@ -154,6 +160,11 @@ public class RealtimeTuningConfig implements TuningConfig
return persistInHeap;
}
@JsonProperty
public boolean isIngestOffheap(){
return ingestOffheap;
}
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
@ -165,7 +176,8 @@ public class RealtimeTuningConfig implements TuningConfig
rejectionPolicyFactory,
maxPendingPersists,
shardSpec,
persistInHeap
persistInHeap,
ingestOffheap
);
}
@ -180,7 +192,8 @@ public class RealtimeTuningConfig implements TuningConfig
rejectionPolicyFactory,
maxPendingPersists,
shardSpec,
persistInHeap
persistInHeap,
ingestOffheap
);
}
}

View File

@ -97,6 +97,7 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
((RealtimePlumberSchool) plumberSchool).getRejectionPolicyFactory(),
((RealtimePlumberSchool) plumberSchool).getMaxPendingPersists(),
schema.getShardSpec(),
false,
false
);
} else {

View File

@ -189,7 +189,7 @@ public class Sink implements Iterable<FireHydrant>
.withMetrics(schema.getAggregators())
.build(),
new OffheapBufferPool(bufferSize),
false
config.isIngestOffheap()
);
FireHydrant old;

View File

@ -77,7 +77,7 @@ public class FireDepartmentTest
)
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, false
null, null, null, null, null, null, null, null, false, false
),
null, null, null, null
);

View File

@ -117,6 +117,7 @@ public class RealtimeManagerTest
null,
null,
null,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -163,6 +163,7 @@ public class RealtimePlumberSchoolTest
rejectionPolicy,
null,
null,
null,
null
);

View File

@ -64,6 +64,7 @@ public class SinkTest
null,
null,
null,
false,
false
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);