Merge pull request #1895 from druid-io/cleanup-codebase

Cleanup the Druid codebase
Himanshu 2015-11-04 17:16:33 -06:00
commit 3c3ea128cc
58 changed files with 351 additions and 2122 deletions

View File

@@ -103,11 +103,7 @@ The spec_file is a path to a file that contains JSON and an example looks like:
"overwriteFiles" : false, "overwriteFiles" : false,
"ignoreInvalidRows" : false, "ignoreInvalidRows" : false,
"jobProperties" : { }, "jobProperties" : { },
"combineText" : false, "combineText" : false,
"persistInHeap" : false,
"ingestOffheap" : false,
"bufferSize" : 134217728,
"aggregationBufferRatio" : 0.5,
"rowFlushBoundary" : 300000 "rowFlushBoundary" : 300000
} }
} }
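After this cleanup a spec simply omits the removed keys. A minimal sketch of the tuningConfig fields that survive in this example (values copied from the hunk above; illustrative, not a complete spec):

{
  "overwriteFiles" : false,
  "ignoreInvalidRows" : false,
  "jobProperties" : { },
  "combineText" : false,
  "rowFlushBoundary" : 300000
}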

View File

@@ -456,11 +456,6 @@ public class HadoopDruidIndexerConfig
}
}
public boolean isPersistInHeap()
{
return schema.getTuningConfig().isPersistInHeap();
}
public String getWorkingPath()
{
final String workingPath = schema.getTuningConfig().getWorkingPath();

View File

@@ -65,11 +65,7 @@ public class HadoopDruidIndexerJob implements Jobby
List<Jobby> jobs = Lists.newArrayList();
JobHelper.ensurePaths(config);
if (config.isPersistInHeap()) {
indexJob = new IndexGeneratorJob(config);
} else {
indexJob = new LegacyIndexGeneratorJob(config);
}
indexJob = new IndexGeneratorJob(config);
jobs.add(indexJob);
if (metadataStorageUpdaterJob != null) {

View File

@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.segment.IndexSpec;
import io.druid.segment.data.BitmapSerde;
import io.druid.segment.indexing.TuningConfig;
import org.joda.time.DateTime;
@@ -37,11 +36,9 @@ import java.util.Map;
public class HadoopTuningConfig implements TuningConfig
{
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
private static final boolean DEFAULT_USE_COMBINER = false;
public static HadoopTuningConfig makeDefaultTuningConfig()
@@ -59,11 +56,7 @@ public class HadoopTuningConfig implements TuningConfig
false,
null,
false,
false,
false,
DEFAULT_BUFFER_SIZE,
DEFAULT_AGG_BUFFER_RATIO,
DEFAULT_USE_COMBINER
false
);
}
@@ -79,10 +72,6 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean ignoreInvalidRows;
private final Map<String, String> jobProperties;
private final boolean combineText;
private final boolean persistInHeap;
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;
private final boolean useCombiner;
@JsonCreator
@@ -99,10 +88,6 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText,
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap,
final @JsonProperty("bufferSize") Integer bufferSize,
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio,
final @JsonProperty("useCombiner") Boolean useCombiner final @JsonProperty("useCombiner") Boolean useCombiner
) )
{ {
@@ -120,10 +105,6 @@ public class HadoopTuningConfig implements TuningConfig
? ImmutableMap.<String, String>of()
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
this.persistInHeap = persistInHeap;
this.ingestOffheap = ingestOffheap;
this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
}
@@ -199,28 +180,6 @@ public class HadoopTuningConfig implements TuningConfig
return combineText;
}
@JsonProperty
public boolean isPersistInHeap()
{
return persistInHeap;
}
@JsonProperty
public boolean isIngestOffheap(){
return ingestOffheap;
}
@JsonProperty
public int getBufferSize(){
return bufferSize;
}
@JsonProperty
public float getAggregationBufferRatio()
{
return aggregationBufferRatio;
}
@JsonProperty
public boolean getUseCombiner()
{
@@ -242,10 +201,6 @@ public class HadoopTuningConfig implements TuningConfig
ignoreInvalidRows,
jobProperties,
combineText,
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio,
useCombiner
);
}
@@ -265,10 +220,6 @@ public class HadoopTuningConfig implements TuningConfig
ignoreInvalidRows,
jobProperties,
combineText,
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio,
useCombiner
);
}
@@ -288,10 +239,6 @@ public class HadoopTuningConfig implements TuningConfig
ignoreInvalidRows,
jobProperties,
combineText,
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio,
useCombiner
);
}

View File

@@ -30,19 +30,16 @@ import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.indexer.hadoop.SegmentInputRow;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.LoggingProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
@@ -157,7 +154,7 @@ public class IndexGeneratorJob implements Jobby
throw new RuntimeException("No buckets?? seems there is no data to index.");
}
if(config.getSchema().getTuningConfig().getUseCombiner()) {
if (config.getSchema().getTuningConfig().getUseCombiner()) {
job.setCombinerClass(IndexGeneratorCombiner.class);
job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
}
@@ -200,9 +197,7 @@ public class IndexGeneratorJob implements Jobby
private static IncrementalIndex makeIncrementalIndex(
Bucket theBucket,
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config,
boolean isOffHeap,
StupidPool bufferPool
HadoopDruidIndexerConfig config
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
@@ -212,19 +207,11 @@ public class IndexGeneratorJob implements Jobby
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build();
if (isOffHeap) {
return new OffheapIncrementalIndex(
indexSchema,
bufferPool,
true,
tuningConfig.getBufferSize()
);
} else {
return new OnheapIncrementalIndex(
indexSchema,
tuningConfig.getRowFlushBoundary()
);
}
return new OnheapIncrementalIndex(
indexSchema,
tuningConfig.getRowFlushBoundary()
);
}
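With the off-heap branch removed, makeIncrementalIndex unconditionally builds an on-heap index bounded by rowFlushBoundary. A sketch of the surviving construction path, assembled only from calls visible in this diff (config, aggs, and tuningConfig are assumed to be in scope as in the surrounding method; the builder chain is abbreviated to the calls shown in the hunk):

// Sketch, not part of the diff: the helper now has a single code path.
IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
    .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
    .withMetrics(aggs)
    .build();
return new OnheapIncrementalIndex(indexSchema, tuningConfig.getRowFlushBoundary());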
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
@@ -320,20 +307,20 @@ public class IndexGeneratorJob implements Jobby
Iterator<BytesWritable> iter = values.iterator();
BytesWritable first = iter.next();
if(iter.hasNext()) {
if (iter.hasNext()) {
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));
while(iter.hasNext()) {
while (iter.hasNext()) {
context.progress();
InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);
if(!index.canAppendRow()) {
if (!index.canAppendRow()) {
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
index = makeIncrementalIndex(bucket, combiningAggs, config);
}
index.add(value);
@@ -345,10 +332,11 @@ public class IndexGeneratorJob implements Jobby
}
}
private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException
private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context)
throws IOException, InterruptedException
{
Iterator<Row> rows = index.iterator();
while(rows.hasNext()) {
while (rows.hasNext()) {
context.progress();
Row row = rows.next();
InputRow inputRow = getInputRowFromRow(row, index.getDimensions());
@@ -360,7 +348,8 @@ public class IndexGeneratorJob implements Jobby
index.close();
}
private InputRow getInputRowFromRow(final Row row, final List<String> dimensions) {
private InputRow getInputRowFromRow(final Row row, final List<String> dimensions)
{
return new InputRow()
{
@Override
@@ -467,14 +456,14 @@ public class IndexGeneratorJob implements Jobby
};
}
protected File persist(
private File persist(
final IncrementalIndex index,
final Interval interval,
final File file,
final ProgressIndicator progressIndicator
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MAKER.persist(
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
}
@@ -514,17 +503,11 @@ public class IndexGeneratorJob implements Jobby
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize();
final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
* config.getSchema().getTuningConfig().getAggregationBufferRatio());
final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
IncrementalIndex index = makeIncrementalIndex(
bucket,
combiningAggs,
config,
config.getSchema().getTuningConfig().isIngestOffheap(),
bufferPool
config
);
try {
File baseFlushFile = File.createTempFile("base", "flush");
@@ -570,9 +553,7 @@ public class IndexGeneratorJob implements Jobby
index = makeIncrementalIndex(
bucket,
combiningAggs,
config,
config.getSchema().getTuningConfig().isIngestOffheap(),
bufferPool
config
);
startTime = System.currentTimeMillis();
++indexCount;
@@ -602,7 +583,7 @@ public class IndexGeneratorJob implements Jobby
indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
}
mergedBase = mergeQueryableIndex(
indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
);
}
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())

View File

@@ -1,83 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.indexer;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.incremental.IncrementalIndex;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
/**
*/
public class LegacyIndexGeneratorJob extends IndexGeneratorJob
{
public LegacyIndexGeneratorJob(
HadoopDruidIndexerConfig config
)
{
super(config);
}
@Override
protected void setReducerClass(Job job)
{
job.setReducerClass(LegacyIndexGeneratorReducer.class);
}
public static class LegacyIndexGeneratorReducer extends IndexGeneratorJob.IndexGeneratorReducer
{
@Override
protected ProgressIndicator makeProgressIndicator(final Context context)
{
return new BaseProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
};
}
@Override
protected File persist(
IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(index, interval, file, null, config.getIndexSpec(), progressIndicator);
}
@Override
protected File mergeQueryableIndex(
List<QueryableIndex> indexes,
AggregatorFactory[] aggs,
File file,
ProgressIndicator progressIndicator
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(indexes, aggs, file, config.getIndexSpec(), progressIndicator);
}
}
}

View File

@@ -277,7 +277,7 @@ public class BatchDeltaIngestionTest
WindowedDataSegment windowedDataSegment
) throws Exception
{
IndexGeneratorJob job = new LegacyIndexGeneratorJob(config);
IndexGeneratorJob job = new IndexGeneratorJob(config);
JobHelper.runJobs(ImmutableList.<Jobby>of(job), config);
File segmentFolder = new File(
@@ -380,10 +380,6 @@ public class BatchDeltaIngestionTest
false,
null,
false,
false,
false,
null,
null,
false
)
)

View File

@@ -160,10 +160,6 @@ public class DetermineHashedPartitionsJobTest
false,
null,
false,
false,
false,
null,
null,
false
)
);

View File

@@ -263,10 +263,6 @@ public class DeterminePartitionsJobTest
false,
null,
false,
false,
false,
null,
null,
false
)
)

View File

@@ -204,10 +204,6 @@ public class HadoopDruidIndexerConfigTest
false,
null,
false,
false,
false,
null,
null,
false
)
);

View File

@@ -52,10 +52,6 @@ public class HadoopTuningConfigTest
true,
null,
true,
true,
true,
200,
0.1f,
true
);
@@ -73,10 +69,6 @@ public class HadoopTuningConfigTest
Assert.assertEquals(true, actual.isIgnoreInvalidRows());
Assert.assertEquals(ImmutableMap.<String, String>of(), actual.getJobProperties());
Assert.assertEquals(true, actual.isCombineText());
Assert.assertEquals(true, actual.isPersistInHeap());
Assert.assertEquals(true, actual.isIngestOffheap());
Assert.assertEquals(200, actual.getBufferSize());
Assert.assertEquals(0.1f, actual.getAggregationBufferRatio(), 0.0001);
Assert.assertEquals(true, actual.getUseCombiner());
}

View File

@@ -394,10 +394,6 @@ public class IndexGeneratorJobTest
false,
ImmutableMap.of(JobContext.NUM_REDUCES, "0"), //verifies that set num reducers is ignored
false,
false,
false,
null,
null,
useCombiner
)
)
@@ -453,12 +449,6 @@ public class IndexGeneratorJobTest
verifyJob(new IndexGeneratorJob(config));
}
@Test
public void testLegacyIndexGeneratorJob() throws IOException
{
verifyJob(new LegacyIndexGeneratorJob(config));
}
private void verifyJob(IndexGeneratorJob job) throws IOException
{
JobHelper.runJobs(ImmutableList.<Jobby>of(job), config);

View File

@@ -112,10 +112,6 @@ public class JobHelperTest
"THISISMYACCESSKEY"
),
false,
false,
false,
null,
null,
false
)
)

View File

@@ -200,10 +200,6 @@ public class HadoopConverterJobTest
false,
null,
false,
false,
false,
null,
null,
false
)
)

View File

@@ -34,7 +34,6 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
@@ -75,7 +74,6 @@ public class TaskToolbox
private final ObjectMapper objectMapper;
private final File taskWorkDir;
private final IndexMerger indexMerger;
private final IndexMaker indexMaker;
private final IndexIO indexIO;
public TaskToolbox(
@@ -96,7 +94,6 @@ public class TaskToolbox
ObjectMapper objectMapper,
File taskWorkDir,
IndexMerger indexMerger,
IndexMaker indexMaker,
IndexIO indexIO
)
{
@@ -117,7 +114,6 @@ public class TaskToolbox
this.objectMapper = objectMapper;
this.taskWorkDir = taskWorkDir;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
}
@@ -231,9 +227,4 @@ public class TaskToolbox
{
return indexMerger;
}
public IndexMaker getIndexMaker()
{
return indexMaker;
}
}

View File

@@ -29,7 +29,6 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
@@ -60,7 +59,6 @@ public class TaskToolboxFactory
private final SegmentLoaderFactory segmentLoaderFactory;
private final ObjectMapper objectMapper;
private final IndexMerger indexMerger;
private final IndexMaker indexMaker;
private final IndexIO indexIO;
@Inject
@@ -80,7 +78,6 @@ public class TaskToolboxFactory
SegmentLoaderFactory segmentLoaderFactory,
ObjectMapper objectMapper,
IndexMerger indexMerger,
IndexMaker indexMaker,
IndexIO indexIO
)
{
@@ -99,7 +96,6 @@ public class TaskToolboxFactory
this.segmentLoaderFactory = segmentLoaderFactory;
this.objectMapper = objectMapper;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
}
@@ -125,7 +121,6 @@ public class TaskToolboxFactory
objectMapper,
taskWorkDir,
indexMerger,
indexMaker,
indexIO
);
}

View File

@@ -132,11 +132,7 @@ public class IndexTask extends AbstractFixedIntervalTask
null,
null,
shardSpec,
indexSpec,
null,
null,
null,
null
indexSpec
);
}

View File

@@ -277,7 +277,6 @@ public class RealtimeIndexTask extends AbstractTask
toolbox.getNewSegmentServerView(),
toolbox.getQueryExecutorService(),
toolbox.getIndexMerger(),
toolbox.getIndexMaker(),
toolbox.getIndexIO()
);

View File

@@ -29,7 +29,6 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
@@ -74,7 +73,6 @@ public class TaskToolboxTest
private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
private Task task = EasyMock.createMock(Task.class);
private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class);
private IndexMaker mockIndexMaker = EasyMock.createMock(IndexMaker.class);
private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class);
@Rule
@@ -102,7 +100,6 @@ public class TaskToolboxTest
new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),
ObjectMapper,
mockIndexMerger,
mockIndexMaker,
mockIndexIO
);
}

View File

@@ -36,7 +36,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
@@ -69,7 +68,6 @@ public class IndexTaskTest
private final IndexSpec indexSpec;
private final ObjectMapper jsonMapper;
private IndexMerger indexMerger;
private IndexMaker indexMaker;
private IndexIO indexIO;
public IndexTaskTest()
@@ -78,7 +76,6 @@ public class IndexTaskTest
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexMerger = testUtils.getTestIndexMerger();
indexMaker = testUtils.getTestIndexMaker();
indexIO = testUtils.getTestIndexIO();
}
@@ -260,7 +257,7 @@ public class IndexTaskTest
return segment;
}
}, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(),
indexMerger, indexMaker, indexIO
indexMerger, indexIO
)
);

View File

@@ -307,11 +307,7 @@ public class TaskSerdeTest
null,
1,
new NoneShardSpec(),
indexSpec,
false,
false,
null,
0.3F
indexSpec
)
),
null
@@ -348,10 +344,6 @@ public class TaskSerdeTest
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(
task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f
);
}
@Test

View File

@@ -61,7 +61,6 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndexSchema;
@@ -106,14 +105,12 @@ public class IngestSegmentFirehoseFactoryTest
{
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexMaker INDEX_MAKER;
private static final IndexIO INDEX_IO;
static {
TestUtils testUtils = new TestUtils();
MAPPER = setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_MAKER = testUtils.getTestIndexMaker();
INDEX_IO = testUtils.getTestIndexIO();
}
@@ -263,7 +260,6 @@ public class IngestSegmentFirehoseFactoryTest
),
MAPPER,
INDEX_MERGER,
INDEX_MAKER,
INDEX_IO
);
Collection<Object[]> values = new LinkedList<>();

View File

@@ -54,7 +54,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.NoopDimFilter;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.incremental.IncrementalIndexSchema;
@@ -109,14 +108,12 @@ public class IngestSegmentFirehoseFactoryTimelineTest
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexMaker INDEX_MAKER;
private static final IndexIO INDEX_IO;
static {
TestUtils testUtils = new TestUtils();
MAPPER = IngestSegmentFirehoseFactoryTest.setupInjectablesInObjectMapper(testUtils.getTestObjectMapper());
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_MAKER = testUtils.getTestIndexMaker();
INDEX_IO = testUtils.getTestIndexIO();
}
@@ -331,7 +328,6 @@ public class IngestSegmentFirehoseFactoryTimelineTest
),
MAPPER,
INDEX_MERGER,
INDEX_MAKER,
INDEX_IO
);
final Injector injector = Guice.createInjector(

View File

@@ -79,7 +79,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
@@ -131,7 +130,6 @@ public class TaskLifecycleTest
{
private static final ObjectMapper MAPPER;
private static final IndexMerger INDEX_MERGER;
private static final IndexMaker INDEX_MAKER;
private static final IndexIO INDEX_IO;
static {
@@ -139,7 +137,6 @@ public class TaskLifecycleTest
MAPPER = testUtils.getTestObjectMapper();
INDEX_MERGER = testUtils.getTestIndexMerger();
INDEX_IO = testUtils.getTestIndexIO();
INDEX_MAKER = testUtils.getTestIndexMaker();
}
@Parameterized.Parameters(name = "taskStorageType={0}")
@@ -487,7 +484,6 @@ public class TaskLifecycleTest
),
MAPPER,
INDEX_MERGER,
INDEX_MAKER,
INDEX_IO
);
tr = new ThreadPoolTaskRunner(tb, null);
@@ -1034,10 +1030,6 @@ public class TaskLifecycleTest
null,
null,
null,
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);

View File

@@ -35,7 +35,6 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
@@ -73,7 +72,6 @@ public class WorkerTaskMonitorTest
private Worker worker;
private ObjectMapper jsonMapper;
private IndexMerger indexMerger;
private IndexMaker indexMaker;
private IndexIO indexIO;
public WorkerTaskMonitorTest()
@@ -81,7 +79,6 @@ public class WorkerTaskMonitorTest
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexMerger = testUtils.getTestIndexMerger();
indexMaker = testUtils.getTestIndexMaker();
indexIO = testUtils.getTestIndexIO();
}
@@ -159,7 +156,6 @@ public class WorkerTaskMonitorTest
),
jsonMapper,
indexMerger,
indexMaker,
indexIO
),
null

View File

@@ -25,18 +25,14 @@ import com.metamx.common.guava.Accumulator;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -79,29 +75,15 @@ public class GroupByQueryHelper
}
}
);
final IncrementalIndex index;
if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
bufferPool,
false,
Integer.MAX_VALUE
);
} else {
index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults()
);
}
final IncrementalIndex index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults()
);
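After this hunk GroupByQueryHelper always accumulates on heap, capped by config.getMaxResults(); the "useOffheap" query-context branch that chose OffheapIncrementalIndex is gone. For reference, a condensed view of the surviving construction (arguments taken verbatim from the added lines above):

final IncrementalIndex index = new OnheapIncrementalIndex(
    granTimeStart,                                    // granularity-truncated min timestamp
    gran,
    aggs.toArray(new AggregatorFactory[aggs.size()]),
    false,
    config.getMaxResults()                            // on-heap row cap
);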
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{
@@ -142,7 +124,7 @@ public class GroupByQueryHelper
@Override
public Queue accumulate(Queue accumulated, T in)
{
if(in == null){
if (in == null) {
throw new ISE("Cannot have null result");
}
accumulated.offer(in);

View File

@@ -101,7 +101,10 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@Deprecated
/**
* This class is not yet ready for production use and requires more work. This class provides a demonstration of how
* to build v9 segments directly.
*/
public class IndexMaker
{

View File

@@ -48,7 +48,9 @@ import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.atomic.AtomicInteger;
@Deprecated
/**
* This is not yet ready for production use and requires more work.
*/
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{

View File

@@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
@@ -24,7 +26,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.UOE;
import com.metamx.common.guava.Sequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;

View File

@@ -41,6 +41,7 @@ import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -54,6 +55,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@Ignore
/*
* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready.
*/
@RunWith(Parameterized.class)
public class IndexMakerParameterizedTest
{
@@ -128,7 +134,7 @@ public class IndexMakerParameterizedTest
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = temporaryFolder.newFolder();
@@ -154,7 +160,7 @@ public class IndexMakerParameterizedTest
public void testPersistMerge() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
@@ -312,7 +318,7 @@ public class IndexMakerParameterizedTest
public void testMergeRetainsValues() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@@ -371,7 +377,7 @@ public class IndexMakerParameterizedTest
public void testAppendRetainsValues() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@@ -424,7 +430,7 @@ public class IndexMakerParameterizedTest
public void testMergeSpecChange() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@@ -490,7 +496,7 @@ public class IndexMakerParameterizedTest
public void testConvertSame() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@@ -548,7 +554,7 @@ public class IndexMakerParameterizedTest
public void testConvertDifferent() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();

View File

@@ -40,6 +40,7 @@ import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -56,6 +57,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Ignore
/*
* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready.
*/
@RunWith(Parameterized.class)
public class IndexMakerTest
{

View File

@@ -128,7 +128,7 @@ public class IndexMergerTest
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = temporaryFolder.newFolder();
@@ -155,7 +155,7 @@ public class IndexMergerTest
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
@@ -185,7 +185,7 @@ public class IndexMergerTest
public void testPersistMerge() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
IncrementalIndex toPersist2 = new OnheapIncrementalIndex(
@@ -348,7 +348,7 @@ public class IndexMergerTest
public void testMergeRetainsValues() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@@ -407,7 +407,7 @@ public class IndexMergerTest
public void testAppendRetainsValues() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@@ -460,7 +460,7 @@ public class IndexMergerTest
public void testMergeSpecChange() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null);
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
IncrementalIndexTest.populateIndex(timestamp, toPersist1);
final File tempDir1 = temporaryFolder.newFolder();
@@ -527,7 +527,6 @@ public class IndexMergerTest
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(
true,
new AggregatorFactory[]{
new LongSumAggregatorFactory(
"longSum1",
@@ -586,7 +585,7 @@ public class IndexMergerTest
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(
true, new AggregatorFactory[]{
new AggregatorFactory[]{
new LongSumAggregatorFactory(
"longSum1",
"dim1"

View File

@@ -31,14 +31,12 @@ import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import io.druid.segment.serde.ComplexMetrics;
import org.joda.time.DateTime;
@@ -183,20 +181,10 @@ public class TestIndex
.withQueryGranularity(QueryGranularity.NONE)
.withMetrics(METRIC_AGGS)
.build();
final IncrementalIndex retVal;
if (useOffheap) {
retVal = new OffheapIncrementalIndex(
schema,
TestQueryRunners.pool,
true,
100 * 1024 * 1024
);
} else {
retVal = new OnheapIncrementalIndex(
schema,
10000
);
}
final IncrementalIndex retVal = new OnheapIncrementalIndex(
schema,
10000
);
final AtomicLong startTime = new AtomicLong();
int lineCount;

View File

@@ -39,7 +39,6 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -55,7 +54,6 @@ import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
@ -109,7 +107,7 @@ public class IncrementalIndexTest
@Override @Override
public IncrementalIndex createIndex(AggregatorFactory[] factories) public IncrementalIndex createIndex(AggregatorFactory[] factories)
{ {
return IncrementalIndexTest.createIndex(true, factories); return IncrementalIndexTest.createIndex(factories);
} }
} }
}, },
@ -119,7 +117,7 @@ public class IncrementalIndexTest
@Override @Override
public IncrementalIndex createIndex(AggregatorFactory[] factories) public IncrementalIndex createIndex(AggregatorFactory[] factories)
{ {
return IncrementalIndexTest.createIndex(false, factories); return IncrementalIndexTest.createIndex(factories);
} }
} }
} }
@ -128,25 +126,15 @@ public class IncrementalIndexTest
); );
} }
public static IncrementalIndex createIndex(boolean offheap, AggregatorFactory[] aggregatorFactories) public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories)
{ {
if (null == aggregatorFactories) { if (null == aggregatorFactories) {
aggregatorFactories = defaultAggregatorFactories; aggregatorFactories = defaultAggregatorFactories;
} }
if (offheap) {
return new OffheapIncrementalIndex( return new OnheapIncrementalIndex(
0L, 0L, QueryGranularity.NONE, aggregatorFactories, 1000000
QueryGranularity.NONE, );
aggregatorFactories,
TestQueryRunners.pool,
true,
100 * 1024 * 1024
);
} else {
return new OnheapIncrementalIndex(
0L, QueryGranularity.NONE, aggregatorFactories, 1000000
);
}
} }
public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
@ -479,28 +467,6 @@ public class IncrementalIndexTest
Assert.assertEquals(elementsPerThread, curr); Assert.assertEquals(elementsPerThread, curr);
} }
@Test
public void testOffheapIndexIsFull() throws IndexSizeExceededException
{
OffheapIncrementalIndex index = new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
TestQueryRunners.pool,
true,
(10 + 2) * 1024 * 1024
);
int rowCount = 0;
for (int i = 0; i < 500; i++) {
rowCount = index.add(getRow(System.currentTimeMillis(), i, 100));
if (!index.canAppendRow()) {
break;
}
}
Assert.assertTrue("rowCount : " + rowCount, rowCount > 200 && rowCount < 600);
}
@Test @Test
public void testgetDimensions() public void testgetDimensions()
{ {
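For illustration, a hypothetical JUnit case against the simplified helper; the createIndex/populateIndex signatures and the null-selects-default-aggregators behavior come from the hunks above, but this test is not part of the commit.

```java
@Test
public void onheapIndexSmokeTest() throws IndexSizeExceededException
{
  // null falls back to defaultAggregatorFactories, per createIndex() above.
  IncrementalIndex index = IncrementalIndexTest.createIndex(null);
  IncrementalIndexTest.populateIndex(System.currentTimeMillis(), index);
  Assert.assertTrue(index.size() > 0);
}
```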
---
@@ -30,7 +30,6 @@ import io.druid.data.input.MapBasedRow;
 import io.druid.data.input.Row;
 import io.druid.granularity.QueryGranularity;
 import io.druid.query.Result;
-import io.druid.query.TestQueryRunners;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.JavaScriptAggregatorFactory;
@@ -67,6 +66,7 @@ public class IncrementalIndexStorageAdapterTest
   {
     public IncrementalIndex createIndex();
   }
+
   private final IndexCreator indexCreator;

   public IncrementalIndexStorageAdapterTest(
@@ -81,36 +81,18 @@ public class IncrementalIndexStorageAdapterTest
   {
     return Arrays.asList(
         new Object[][]{
-            {
-                new IndexCreator()
-                {
-                  @Override
-                  public IncrementalIndex createIndex()
-                  {
-                    return new OnheapIncrementalIndex(
-                        0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
-                    );
-                  }
-                }
-            },
             {
                 new IndexCreator()
                 {
                   @Override
                   public IncrementalIndex createIndex()
                   {
-                    return new OffheapIncrementalIndex(
-                        0,
-                        QueryGranularity.MINUTE,
-                        new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
-                        TestQueryRunners.pool,
-                        true,
-                        100 * 1024 * 1024
+                    return new OnheapIncrementalIndex(
+                        0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
                     );
                   }
                 }
             }
         }
     );
   }
@@ -220,31 +202,32 @@ public class IncrementalIndexStorageAdapterTest
   private static GroupByQueryEngine makeGroupByQueryEngine()
   {
     return new GroupByQueryEngine(
         Suppliers.<GroupByQueryConfig>ofInstance(
             new GroupByQueryConfig()
             {
               @Override
               public int getMaxIntermediateRows()
               {
                 return 5;
               }
             }
         ),
         new StupidPool(
             new Supplier<ByteBuffer>()
             {
               @Override
               public ByteBuffer get()
               {
                 return ByteBuffer.allocate(50000);
               }
             }
         )
     );
   }

   @Test
-  public void testResetSanity() throws IOException{
+  public void testResetSanity() throws IOException
+  {
     IncrementalIndex index = indexCreator.createIndex();
     DateTime t = DateTime.now();
@@ -266,9 +249,11 @@ public class IncrementalIndexStorageAdapterTest
     );

     IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(index);
-    Sequence<Cursor> cursorSequence = adapter.makeCursors(new SelectorFilter("sally", "bo"),
-        interval,
-        QueryGranularity.NONE);
+    Sequence<Cursor> cursorSequence = adapter.makeCursors(
+        new SelectorFilter("sally", "bo"),
+        interval,
+        QueryGranularity.NONE
+    );

     Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.<Cursor>newArrayList()).get(0);
     DimensionSelector dimSelector;
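The same engine wiring works outside this test. A hedged sketch follows, assuming the io.druid.collections.StupidPool and io.druid.query.groupby package locations of the Druid tree in this era.

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import io.druid.collections.StupidPool;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;

import java.nio.ByteBuffer;

public class GroupByEngineSketch
{
  public static GroupByQueryEngine make()
  {
    return new GroupByQueryEngine(
        // Default config; the test above overrides getMaxIntermediateRows() to 5.
        Suppliers.ofInstance(new GroupByQueryConfig()),
        // Each pooled intermediate buffer is 50 KB, matching the test.
        new StupidPool<ByteBuffer>(
            new Supplier<ByteBuffer>()
            {
              @Override
              public ByteBuffer get()
              {
                return ByteBuffer.allocate(50000);
              }
            }
        )
    );
  }
}
```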
---
@@ -21,8 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.io.Files;
 import io.druid.segment.IndexSpec;
-import io.druid.segment.data.BitmapSerde;
-import io.druid.segment.data.BitmapSerdeFactory;
 import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
 import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
 import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
@@ -46,10 +44,6 @@ public class RealtimeTuningConfig implements TuningConfig
   private static final int defaultMaxPendingPersists = 0;
   private static final ShardSpec defaultShardSpec = new NoneShardSpec();
   private static final IndexSpec defaultIndexSpec = new IndexSpec();
-  private static final boolean defaultPersistInHeap = false;
-  private static final boolean defaultIngestOffheap = false;
-  private static final int defaultBufferSize = 128 * 1024 * 1024; // 128M
-  private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;

   // Might make sense for this to be a builder
   public static RealtimeTuningConfig makeDefaultTuningConfig()
@@ -63,11 +57,7 @@ public class RealtimeTuningConfig implements TuningConfig
         defaultRejectionPolicyFactory,
         defaultMaxPendingPersists,
         defaultShardSpec,
-        defaultIndexSpec,
-        defaultPersistInHeap,
-        defaultIngestOffheap,
-        defaultBufferSize,
-        DEFAULT_AGG_BUFFER_RATIO
+        defaultIndexSpec
     );
   }
@@ -80,10 +70,6 @@ public class RealtimeTuningConfig implements TuningConfig
   private final int maxPendingPersists;
   private final ShardSpec shardSpec;
   private final IndexSpec indexSpec;
-  private final boolean persistInHeap;
-  private final boolean ingestOffheap;
-  private final int bufferSize;
-  private final float aggregationBufferRatio;

   @JsonCreator
   public RealtimeTuningConfig(
@@ -95,11 +81,7 @@ public class RealtimeTuningConfig implements TuningConfig
       @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("shardSpec") ShardSpec shardSpec,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("persistInHeap") Boolean persistInHeap,
-      @JsonProperty("ingestOffheap") Boolean ingestOffheap,
-      @JsonProperty("buffersize") Integer bufferSize,
-      @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
+      @JsonProperty("indexSpec") IndexSpec indexSpec
   )
   {
     this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
@@ -115,10 +97,6 @@ public class RealtimeTuningConfig implements TuningConfig
     this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists;
     this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec;
     this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec;
-    this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
-    this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
-    this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;
-    this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
   }

   @JsonProperty
@@ -175,28 +153,6 @@ public class RealtimeTuningConfig implements TuningConfig
     return indexSpec;
   }

-  @JsonProperty
-  public boolean isPersistInHeap()
-  {
-    return persistInHeap;
-  }
-
-  @JsonProperty
-  public boolean isIngestOffheap()
-  {
-    return ingestOffheap;
-  }
-
-  @JsonProperty
-  public int getBufferSize()
-  {
-    return bufferSize;
-  }
-
-  @JsonProperty
-  public float getAggregationBufferRatio()
-  {
-    return aggregationBufferRatio;
-  }
-
   public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
   {
     return new RealtimeTuningConfig(
@@ -208,11 +164,7 @@ public class RealtimeTuningConfig implements TuningConfig
         rejectionPolicyFactory,
         maxPendingPersists,
         shardSpec,
-        indexSpec,
-        persistInHeap,
-        ingestOffheap,
-        bufferSize,
-        aggregationBufferRatio
+        indexSpec
     );
   }
@@ -227,11 +179,7 @@ public class RealtimeTuningConfig implements TuningConfig
         rejectionPolicyFactory,
         maxPendingPersists,
         shardSpec,
-        indexSpec,
-        persistInHeap,
-        ingestOffheap,
-        bufferSize,
-        aggregationBufferRatio
+        indexSpec
     );
   }
 }
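Callers that used to toggle the removed knobs now start from the defaults. A short sketch, assuming IntervalStartVersioningPolicy (imported above) has a no-argument constructor; makeDefaultTuningConfig() and withVersioningPolicy(...) are visible in this diff.

```java
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;

public class TuningDefaultsSketch
{
  public static RealtimeTuningConfig make()
  {
    // The removed persistInHeap/ingestOffheap/bufferSize/aggregationBufferRatio
    // knobs no longer exist on this type; defaults plus a versioning policy
    // are all that is needed here.
    return RealtimeTuningConfig
        .makeDefaultTuningConfig()
        .withVersioningPolicy(new IntervalStartVersioningPolicy());
  }
}
```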
---
@@ -26,7 +26,6 @@ import io.druid.common.guava.ThreadRenamingCallable;
 import io.druid.concurrent.Execs;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMaker;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -64,7 +63,6 @@ public class FlushingPlumber extends RealtimePlumber
       DataSegmentAnnouncer segmentAnnouncer,
       ExecutorService queryExecutorService,
       IndexMerger indexMerger,
-      IndexMaker indexMaker,
       IndexIO indexIO
   )
   {
@@ -80,7 +78,6 @@ public class FlushingPlumber extends RealtimePlumber
         null,
         null,
         indexMerger,
-        indexMaker,
         indexIO
     );
---
@@ -25,7 +25,6 @@ import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.guice.annotations.Processing;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMaker;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -50,7 +49,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
   private final DataSegmentAnnouncer segmentAnnouncer;
   private final ExecutorService queryExecutorService;
   private final IndexMerger indexMerger;
-  private final IndexMaker indexMaker;
   private final IndexIO indexIO;

   @JsonCreator
@@ -61,7 +59,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
       @JacksonInject DataSegmentAnnouncer segmentAnnouncer,
       @JacksonInject @Processing ExecutorService queryExecutorService,
       @JacksonInject IndexMerger indexMerger,
-      @JacksonInject IndexMaker indexMaker,
       @JacksonInject IndexIO indexIO
   )
   {
@@ -74,7 +71,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
         null,
         queryExecutorService,
         indexMerger,
-        indexMaker,
         indexIO
     );
@@ -84,7 +80,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
     this.segmentAnnouncer = segmentAnnouncer;
     this.queryExecutorService = queryExecutorService;
     this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
-    this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker");
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
   }
@@ -107,7 +102,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
         segmentAnnouncer,
         queryExecutorService,
         indexMerger,
-        indexMaker,
         indexIO
     );
   }
---
@@ -59,7 +59,6 @@ import io.druid.query.SegmentDescriptor;
 import io.druid.query.spec.SpecificSegmentQueryRunner;
 import io.druid.query.spec.SpecificSegmentSpec;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMaker;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.QueryableIndex;
@@ -78,8 +77,6 @@ import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
 import io.druid.timeline.VersionedIntervalTimeline;
 import io.druid.timeline.partition.SingleElementPartitionChunk;
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadMXBean;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -131,7 +128,6 @@ public class RealtimePlumber implements Plumber
   private volatile ExecutorService mergeExecutor = null;
   private volatile ScheduledExecutorService scheduledExecutor = null;
   private volatile IndexMerger indexMerger;
-  private volatile IndexMaker indexMaker;
   private volatile IndexIO indexIO;

   private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
@@ -149,7 +145,6 @@ public class RealtimePlumber implements Plumber
       SegmentPublisher segmentPublisher,
       FilteredServerView serverView,
       IndexMerger indexMerger,
-      IndexMaker indexMaker,
       IndexIO indexIO
   )
   {
@@ -165,7 +160,6 @@ public class RealtimePlumber implements Plumber
     this.segmentPublisher = segmentPublisher;
     this.serverView = serverView;
     this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
-    this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker");
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");

     log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
@@ -508,22 +502,13 @@ public class RealtimePlumber implements Plumber
                 indexes.add(queryableIndex);
               }

-              final File mergedFile;
-              if (config.isPersistInHeap()) {
-                mergedFile = indexMaker.mergeQueryableIndex(
-                    indexes,
-                    schema.getAggregators(),
-                    mergedTarget,
-                    config.getIndexSpec()
-                );
-              } else {
-                mergedFile = indexMerger.mergeQueryableIndex(
-                    indexes,
-                    schema.getAggregators(),
-                    mergedTarget,
-                    config.getIndexSpec()
-                );
-              }
+              final File mergedFile = indexMerger.mergeQueryableIndex(
+                  indexes,
+                  schema.getAggregators(),
+                  mergedTarget,
+                  config.getIndexSpec()
+              );

               // emit merge metrics before publishing segment
               metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
               metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));
@@ -980,24 +965,14 @@ public class RealtimePlumber implements Plumber
     try {
       int numRows = indexToPersist.getIndex().size();

-      final File persistedFile;
       final IndexSpec indexSpec = config.getIndexSpec();
-      if (config.isPersistInHeap()) {
-        persistedFile = indexMaker.persist(
-            indexToPersist.getIndex(),
-            new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
-            metaData,
-            indexSpec
-        );
-      } else {
-        persistedFile = indexMerger.persist(
-            indexToPersist.getIndex(),
-            new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
-            metaData,
-            indexSpec
-        );
-      }
+      final File persistedFile = indexMerger.persist(
+          indexToPersist.getIndex(),
+          new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
+          metaData,
+          indexSpec
+      );

       indexToPersist.swapSegment(
           new QueryableIndexSegment(
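Both persist and merge now go through IndexMerger unconditionally. The fragment below restates the two call shapes with comments; it is illustrative, not code added by this commit.

```java
// Persist one in-memory hydrant to a numbered directory under the persist dir.
final File persistedFile = indexMerger.persist(
    indexToPersist.getIndex(),                            // in-memory IncrementalIndex
    new File(computePersistDir(schema, interval),
             String.valueOf(indexToPersist.getCount())),  // numbered persist target
    metaData,                                             // commit metadata carried into the segment
    config.getIndexSpec()                                 // bitmap/compression settings
);

// Later, merge all persisted QueryableIndex chunks into the final segment file.
final File mergedFile = indexMerger.mergeQueryableIndex(
    indexes,                                              // List<QueryableIndex> of persisted chunks
    schema.getAggregators(),
    mergedTarget,
    config.getIndexSpec()
);
```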
---
@@ -25,7 +25,6 @@ import io.druid.client.FilteredServerView;
 import io.druid.guice.annotations.Processing;
 import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMaker;
 import io.druid.segment.IndexMerger;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -48,7 +47,6 @@ public class RealtimePlumberSchool implements PlumberSchool
   private final FilteredServerView serverView;
   private final ExecutorService queryExecutorService;
   private final IndexMerger indexMerger;
-  private final IndexMaker indexMaker;
   private final IndexIO indexIO;

   @JsonCreator
@@ -61,7 +59,6 @@ public class RealtimePlumberSchool implements PlumberSchool
       @JacksonInject FilteredServerView serverView,
       @JacksonInject @Processing ExecutorService executorService,
       @JacksonInject IndexMerger indexMerger,
-      @JacksonInject IndexMaker indexMaker,
       @JacksonInject IndexIO indexIO
   )
   {
@@ -73,7 +70,6 @@ public class RealtimePlumberSchool implements PlumberSchool
     this.serverView = serverView;
     this.queryExecutorService = executorService;
     this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
-    this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker");
     this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
   }
@@ -98,7 +94,6 @@ public class RealtimePlumberSchool implements PlumberSchool
         segmentPublisher,
         serverView,
         indexMerger,
-        indexMaker,
         indexIO
     );
   }
---
@@ -25,12 +25,10 @@ import com.google.common.collect.Lists;
 import com.metamx.common.IAE;
 import com.metamx.common.ISE;
 import io.druid.data.input.InputRow;
-import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.segment.incremental.IndexSizeExceededException;
-import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.incremental.OnheapIncrementalIndex;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
@@ -191,21 +189,10 @@ public class Sink implements Iterable<FireHydrant>
         .withDimensionsSpec(schema.getParser())
         .withMetrics(schema.getAggregators())
         .build();
-    final IncrementalIndex newIndex;
-    if (config.isIngestOffheap()) {
-      newIndex = new OffheapIncrementalIndex(
-          indexSchema,
-          // Assuming half space for aggregates
-          new OffheapBufferPool((int) ((double) config.getBufferSize() * config.getAggregationBufferRatio())),
-          true,
-          config.getBufferSize()
-      );
-    } else {
-      newIndex = new OnheapIncrementalIndex(
-          indexSchema,
-          config.getMaxRowsInMemory()
-      );
-    }
+    final IncrementalIndex newIndex = new OnheapIncrementalIndex(
+        indexSchema,
+        config.getMaxRowsInMemory()
+    );

     final FireHydrant old;
     synchronized (hydrantLock) {
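With ingestOffheap gone, a Sink sizes its in-memory index purely by row count. A minimal sketch of the construction above; the builder calls and constructor match the diff, and the schema/config instances are assumed to come from the task spec.

```java
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
    .withDimensionsSpec(schema.getParser())   // dimensions from the parse spec
    .withMetrics(schema.getAggregators())     // metric aggregators from the DataSchema
    .build();

// maxRowsInMemory is the sole remaining in-memory sizing knob for a hydrant.
final IncrementalIndex newIndex = new OnheapIncrementalIndex(
    indexSchema,
    config.getMaxRowsInMemory()
);
```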
---
@@ -1,34 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.bridge;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Bridge
{
}
---
@@ -1,29 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.bridge;
import io.druid.curator.CuratorConfig;
import org.skife.config.Config;
/**
*/
public abstract class BridgeCuratorConfig extends CuratorConfig
{
@Config("druid.bridge.zk.service.host")
public abstract String getParentZkHosts();
}
---
@@ -1,133 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.guice.annotations.Global;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
import javax.ws.rs.core.MediaType;
import java.net.URL;
import java.util.List;
import java.util.Map;
/**
*/
public class BridgeQuerySegmentWalker implements QuerySegmentWalker
{
private static final Logger log = new Logger(BridgeQuerySegmentWalker.class);
private final ServerDiscoverySelector brokerSelector;
private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
private final StatusResponseHandler responseHandler;
@Inject
public BridgeQuerySegmentWalker(
ServerDiscoverySelector brokerSelector,
@Global HttpClient httpClient,
ObjectMapper jsonMapper
)
{
this.brokerSelector = brokerSelector;
this.httpClient = httpClient;
this.jsonMapper = jsonMapper;
this.responseHandler = new StatusResponseHandler(Charsets.UTF_8);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(
Query<T> query, Iterable<Interval> intervals
)
{
return makeRunner();
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(
Query<T> query, Iterable<SegmentDescriptor> specs
)
{
return makeRunner();
}
private <T> QueryRunner<T> makeRunner()
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
try {
Server instance = brokerSelector.pick();
if (instance == null) {
return Sequences.empty();
}
final Server brokerServer = brokerSelector.pick();
final URL url = new URL(
brokerServer.getScheme(),
brokerServer.getAddress(),
brokerServer.getPort(),
"/druid/v2/"
);
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST,
url
).setContent(
MediaType.APPLICATION_JSON,
jsonMapper.writeValueAsBytes(query)
),
responseHandler
).get();
List<T> results = jsonMapper.readValue(
response.getContent(), new TypeReference<List<T>>()
{
}
);
return Sequences.simple(results);
}
catch (Exception e) {
log.error(e, "Exception with bridge query");
return Sequences.empty();
}
}
};
}
}
---
@@ -1,131 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.BaseZkCoordinator;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
/**
*/
public class BridgeZkCoordinator extends BaseZkCoordinator
{
private static final Logger log = new Logger(BaseZkCoordinator.class);
private final SegmentPublisher dbSegmentPublisher;
private final MetadataSegmentManager databaseSegmentManager;
private final ServerView serverView;
private final ExecutorService exec = Execs.singleThreaded("BridgeZkCoordinatorServerView-%s");
@Inject
public BridgeZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
SegmentLoaderConfig config,
DruidServerMetadata me,
@Bridge CuratorFramework curator,
SegmentPublisher dbSegmentPublisher,
MetadataSegmentManager databaseSegmentManager,
ServerView serverView
)
{
super(jsonMapper, zkPaths, config, me, curator);
this.dbSegmentPublisher = dbSegmentPublisher;
this.databaseSegmentManager = databaseSegmentManager;
this.serverView = serverView;
}
@Override
public void loadLocalCache()
{
// do nothing
}
@Override
public DataSegmentChangeHandler getDataSegmentChangeHandler()
{
return BridgeZkCoordinator.this;
}
@Override
public void addSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{
try {
log.info("Publishing segment %s", segment.getIdentifier());
dbSegmentPublisher.publishSegment(segment);
serverView.registerSegmentCallback(
exec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(
DruidServerMetadata server, DataSegment theSegment
)
{
if (theSegment.equals(segment)) {
callback.execute();
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{
databaseSegmentManager.removeSegment(segment.getDataSource(), segment.getIdentifier());
serverView.registerSegmentCallback(
exec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentRemoved(
DruidServerMetadata server, DataSegment theSegment
)
{
if (theSegment.equals(segment)) {
callback.execute();
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
}
---
@@ -1,394 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidServer;
import io.druid.client.ServerInventoryView;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.curator.announcement.Announcer;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ManageLifecycle
public class DruidClusterBridge
{
public static final String BRIDGE_OWNER_NODE = "_BRIDGE";
public static final String NODE_TYPE = "bridge";
private static final EmittingLogger log = new EmittingLogger(DruidClusterBridge.class);
private final ObjectMapper jsonMapper;
private final DruidClusterBridgeConfig config;
private final ScheduledExecutorService exec;
private final DruidNode self;
// Communicates to the ZK cluster that this bridge node is deployed at
private final CuratorFramework curator;
private final AtomicReference<LeaderLatch> leaderLatch;
// Communicates to the remote (parent) ZK cluster
private final BridgeZkCoordinator bridgeZkCoordinator;
private final Announcer announcer;
private final ServerInventoryView<Object> serverInventoryView;
private final ZkPathsConfig zkPathsConfig;
private final DruidServerMetadata druidServerMetadata;
private final Map<DataSegment, Integer> segments = Maps.newHashMap();
private final Object lock = new Object();
private volatile boolean started = false;
private volatile boolean leader = false;
@Inject
public DruidClusterBridge(
ObjectMapper jsonMapper,
DruidClusterBridgeConfig config,
ZkPathsConfig zkPathsConfig,
DruidServerMetadata druidServerMetadata,
ScheduledExecutorFactory scheduledExecutorFactory,
@Self DruidNode self,
CuratorFramework curator,
AtomicReference<LeaderLatch> leaderLatch,
BridgeZkCoordinator bridgeZkCoordinator,
@Bridge Announcer announcer,
@Bridge final AbstractDataSegmentAnnouncer dataSegmentAnnouncer,
ServerInventoryView serverInventoryView
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.bridgeZkCoordinator = bridgeZkCoordinator;
this.zkPathsConfig = zkPathsConfig;
this.announcer = announcer;
this.serverInventoryView = serverInventoryView;
this.curator = curator;
this.leaderLatch = leaderLatch;
this.druidServerMetadata = druidServerMetadata;
this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");
this.self = self;
ExecutorService serverInventoryViewExec = Execs.singleThreaded("DruidClusterBridge-ServerInventoryView-%d");
serverInventoryView.registerSegmentCallback(
serverInventoryViewExec,
new ServerView.BaseSegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(
DruidServerMetadata server, DataSegment segment
)
{
try {
synchronized (lock) {
Integer count = segments.get(segment);
if (count == null) {
segments.put(segment, 1);
dataSegmentAnnouncer.announceSegment(segment);
} else {
segments.put(segment, count + 1);
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment)
{
try {
synchronized (lock) {
serverRemovedSegment(dataSegmentAnnouncer, segment, server);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
serverInventoryView.registerServerCallback(
serverInventoryViewExec,
new ServerView.ServerCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
try {
for (DataSegment dataSegment : server.getSegments().values()) {
serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server.getMetadata());
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
public boolean isLeader()
{
return leader;
}
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
started = true;
createNewLeaderLatch();
try {
leaderLatch.get().start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
private LeaderLatch createNewLeaderLatch()
{
final LeaderLatch newLeaderLatch = new LeaderLatch(
curator, ZKPaths.makePath(zkPathsConfig.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort()
);
newLeaderLatch.addListener(
new LeaderLatchListener()
{
@Override
public void isLeader()
{
becomeLeader();
}
@Override
public void notLeader()
{
stopBeingLeader();
}
},
Execs.singleThreaded("CoordinatorLeader-%s")
);
return leaderLatch.getAndSet(newLeaderLatch);
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
stopBeingLeader();
try {
leaderLatch.get().close();
}
catch (IOException e) {
log.warn(e, "Unable to close leaderLatch, ignoring");
}
exec.shutdown();
started = false;
}
}
private void becomeLeader()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Go-Go Gadgetmobile! Starting bridge in %s", config.getStartDelay());
try {
bridgeZkCoordinator.start();
serverInventoryView.start();
ScheduledExecutors.scheduleWithFixedDelay(
exec,
config.getStartDelay(),
config.getPeriod(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
if (leader) {
Iterable<DruidServer> servers = FunctionalIterable
.create(serverInventoryView.getInventory())
.filter(
new Predicate<DruidServer>()
{
@Override
public boolean apply(
DruidServer input
)
{
return input.isAssignable();
}
}
);
long totalMaxSize = 0;
for (DruidServer server : servers) {
totalMaxSize += server.getMaxSize();
}
if (totalMaxSize == 0) {
log.warn("No servers founds!");
} else {
DruidServerMetadata me = new DruidServerMetadata(
self.getHostAndPort(),
self.getHostAndPort(),
totalMaxSize,
NODE_TYPE,
druidServerMetadata.getTier(),
druidServerMetadata.getPriority()
);
try {
final String path = ZKPaths.makePath(zkPathsConfig.getAnnouncementsPath(), self.getHostAndPort());
log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize);
announcer.update(path, jsonMapper.writeValueAsBytes(me));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
if (leader) { // (We might no longer be leader)
return ScheduledExecutors.Signal.REPEAT;
} else {
return ScheduledExecutors.Signal.STOP;
}
}
}
);
leader = true;
}
catch (Exception e) {
log.makeAlert(e, "Exception becoming leader")
.emit();
final LeaderLatch oldLatch = createNewLeaderLatch();
CloseQuietly.close(oldLatch);
try {
leaderLatch.get().start();
}
catch (Exception e1) {
// If an exception gets thrown out here, then the bridge will zombie out 'cause it won't be looking for
// the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but
// Curator likes to have "throws Exception" on methods so it might happen...
log.makeAlert(e1, "I am a zombie")
.emit();
}
}
}
}
private void stopBeingLeader()
{
synchronized (lock) {
try {
log.info("I'll get you next time, Gadget. Next time!");
bridgeZkCoordinator.stop();
serverInventoryView.stop();
leader = false;
}
catch (Exception e) {
log.makeAlert(e, "Unable to stopBeingLeader").emit();
}
}
}
private void serverRemovedSegment(
DataSegmentAnnouncer dataSegmentAnnouncer,
DataSegment segment,
DruidServerMetadata server
)
throws IOException
{
Integer count = segments.get(segment);
if (count != null) {
if (count == 1) {
dataSegmentAnnouncer.unannounceSegment(segment);
segments.remove(segment);
} else {
segments.put(segment, count - 1);
}
} else {
log.makeAlert("Trying to remove a segment that was never added?")
.addData("server", server.getHost())
.addData("segmentId", segment.getIdentifier())
.emit();
}
}
}
---
@@ -1,63 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
/**
*/
public abstract class DruidClusterBridgeConfig
{
@JsonProperty
private Duration startDelay = new Duration("PT300s");
@JsonProperty
private Duration period = new Duration("PT60s");
@JsonProperty
private String brokerServiceName = "broker";
public Duration getStartDelay()
{
return startDelay;
}
public void setStartDelay(Duration startDelay)
{
this.startDelay = startDelay;
}
public Duration getPeriod()
{
return period;
}
public void setPeriod(Duration period)
{
this.period = period;
}
public String getBrokerServiceName()
{
return brokerServiceName;
}
public void setBrokerServiceName(String brokerServiceName)
{
this.brokerServiceName = brokerServiceName;
}
}
---
@@ -1,210 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
/**
*/
public abstract class BaseZkCoordinator implements DataSegmentChangeHandler
{
private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPaths;
private final SegmentLoaderConfig config;
private final DruidServerMetadata me;
private final CuratorFramework curator;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
public BaseZkCoordinator(
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
SegmentLoaderConfig config,
DruidServerMetadata me,
CuratorFramework curator
)
{
this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.config = config;
this.me = me;
this.curator = curator;
}
@LifecycleStart
public void start() throws IOException
{
synchronized (lock) {
if (started) {
return;
}
log.info("Starting zkCoordinator for server[%s]", me.getName());
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
true,
true,
Execs.multiThreaded(config.getNumLoadingThreads(), "ZkCoordinator-%s")
);
try {
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadLocalCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest request = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
log.info("New request[%s] with zNode[%s].", request.asString(), path);
try {
request.go(
getDataSegmentChangeHandler(),
new DataSegmentChangeCallback()
{
boolean hasRun = false;
@Override
public void execute()
{
try {
if (!hasRun) {
curator.delete().guaranteed().forPath(path);
log.info("Completed request [%s]", request.asString());
hasRun = true;
}
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.error(e, "Exception while removing zNode[%s]", path);
throw Throwables.propagate(e);
}
}
}
);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", request)
.emit();
}
break;
case CHILD_REMOVED:
log.info("zNode[%s] was removed", event.getData().getPath());
break;
default:
log.info("Ignoring event[%s]", event);
}
}
}
);
loadQueueCache.start();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
log.info("Stopping ZkCoordinator for [%s]", me);
synchronized (lock) {
if (!started) {
return;
}
try {
loadQueueCache.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
loadQueueCache = null;
started = false;
}
}
}
public boolean isStarted()
{
return started;
}
public abstract void loadLocalCache();
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
}
---
@@ -25,7 +25,6 @@ import com.google.inject.Provider;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchDataSegmentAnnouncerProvider.class)
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "legacy", value = LegacyDataSegmentAnnouncerProvider.class),
     @JsonSubTypes.Type(name = "batch", value = BatchDataSegmentAnnouncerProvider.class)
 })
 public interface DataSegmentAnnouncerProvider extends Provider<DataSegmentAnnouncer>
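With the legacy subtype removed, batch is both the only named subtype and the defaultImpl, so untyped JSON still resolves. A hypothetical stand-alone sketch of how Jackson handles this annotation shape (the Announcer/Batch types below are illustrative, not Druid classes):

```java
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative stand-ins for the provider hierarchy.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Batch.class)
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "batch", value = Batch.class)
})
interface Announcer {}

class Batch implements Announcer {}

class Demo
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Both forms resolve to Batch: the named subtype and the defaultImpl.
    Announcer a = mapper.readValue("{\"type\": \"batch\"}", Announcer.class);
    Announcer b = mapper.readValue("{}", Announcer.class);
    System.out.println(a.getClass() + " " + b.getClass());
  }
}
```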
---
@@ -1,44 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.annotation.JacksonInject;
import javax.validation.constraints.NotNull;
import java.util.Arrays;
/**
*/
public class LegacyDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider
{
@JacksonInject
@NotNull
private SingleDataSegmentAnnouncer singleAnnouncer = null;
@JacksonInject
@NotNull
private BatchDataSegmentAnnouncer batchAnnouncer = null;
@Override
public DataSegmentAnnouncer get()
{
return new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
Arrays.<DataSegmentAnnouncer>asList(singleAnnouncer, batchAnnouncer)
);
}
}
---
@ -1,30 +1,33 @@
/* /*
* Druid - a distributed column store. * Licensed to Metamarkets Group Inc. (Metamarkets) under one
* Copyright 2012 - 2015 Metamarkets Group Inc. * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* Licensed under the Apache License, Version 2.0 (the "License"); * regarding copyright ownership. Metamarkets licenses this file
* you may not use this file except in compliance with the License. * to you under the Apache License, Version 2.0 (the
* You may obtain a copy of the License at * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0 *
* * http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software *
* distributed under the License is distributed on an "AS IS" BASIS, * Unless required by applicable law or agreed to in writing,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * software distributed under the License is distributed on an
* See the License for the specific language governing permissions and * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* limitations under the License. * KIND, either express or implied. See the License for the
*/ * specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination; package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Queues; import com.google.common.collect.Queues;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderConfig;
@ -32,6 +35,11 @@ import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -41,7 +49,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -50,17 +57,25 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
public class ZkCoordinator extends BaseZkCoordinator public class ZkCoordinator implements DataSegmentChangeHandler
{ {
private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class); private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPaths;
private final SegmentLoaderConfig config; private final SegmentLoaderConfig config;
private final DruidServerMetadata me;
private final CuratorFramework curator;
private final DataSegmentAnnouncer announcer; private final DataSegmentAnnouncer announcer;
private final ServerManager serverManager; private final ServerManager serverManager;
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
@Inject
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
public ZkCoordinator( public ZkCoordinator(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
SegmentLoaderConfig config, SegmentLoaderConfig config,
@ -72,17 +87,155 @@ public class ZkCoordinator extends BaseZkCoordinator
ScheduledExecutorFactory factory ScheduledExecutorFactory factory
) )
{ {
super(jsonMapper, zkPaths, config, me, curator);
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.config = config; this.config = config;
this.me = me;
this.curator = curator;
this.announcer = announcer; this.announcer = announcer;
this.serverManager = serverManager; this.serverManager = serverManager;
this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
} }
@Override @LifecycleStart
public void start() throws IOException
{
synchronized (lock) {
if (started) {
return;
}
log.info("Starting zkCoordinator for server[%s]", me.getName());
final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
loadQueueCache = new PathChildrenCache(
curator,
loadQueueLocation,
true,
true,
Execs.multiThreaded(config.getNumLoadingThreads(), "ZkCoordinator-%s")
);
try {
curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadLocalCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest request = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);
log.info("New request[%s] with zNode[%s].", request.asString(), path);
try {
request.go(
getDataSegmentChangeHandler(),
new DataSegmentChangeCallback()
{
boolean hasRun = false;
@Override
public void execute()
{
try {
if (!hasRun) {
curator.delete().guaranteed().forPath(path);
log.info("Completed request [%s]", request.asString());
hasRun = true;
}
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.error(e, "Exception while removing zNode[%s]", path);
throw Throwables.propagate(e);
}
}
}
);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", request)
.emit();
}
break;
case CHILD_REMOVED:
log.info("zNode[%s] was removed", event.getData().getPath());
break;
default:
log.info("Ignoring event[%s]", event);
}
}
}
);
loadQueueCache.start();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
started = true;
}
}
@LifecycleStop
public void stop()
{
log.info("Stopping ZkCoordinator for [%s]", me);
synchronized (lock) {
if (!started) {
return;
}
try {
loadQueueCache.close();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
loadQueueCache = null;
started = false;
}
}
}
public boolean isStarted()
{
return started;
}
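
The start/stop pair inlined above follows a guard pattern that the lifecycle annotations rely on: a lock plus a volatile started flag make both methods idempotent, so a duplicate @LifecycleStart or @LifecycleStop invocation is a no-op. A distilled sketch of just that pattern, assuming the com.metamx.common.lifecycle annotations imported above (the GuardedService class and its resource comments are illustrative, not Druid code):

import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;

public class GuardedService
{
  private final Object lock = new Object();
  private volatile boolean started = false;

  @LifecycleStart
  public void start()
  {
    synchronized (lock) {
      if (started) {
        return; // duplicate start() is a no-op
      }
      // ... acquire caches, executors, watches here ...
      started = true; // flip the flag only after setup succeeds
    }
  }

  @LifecycleStop
  public void stop()
  {
    synchronized (lock) {
      if (!started) {
        return; // stop() before start() is a no-op
      }
      try {
        // ... release resources here ...
      }
      finally {
        started = false; // reset even if teardown throws
      }
    }
  }
}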
public void loadLocalCache() public void loadLocalCache()
{ {
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
@ -129,7 +282,6 @@ public class ZkCoordinator extends BaseZkCoordinator
); );
} }
@Override
public DataSegmentChangeHandler getDataSegmentChangeHandler() public DataSegmentChangeHandler getDataSegmentChangeHandler()
{ {
return ZkCoordinator.this; return ZkCoordinator.this;
@ -168,7 +320,7 @@ public class ZkCoordinator extends BaseZkCoordinator
{ {
try { try {
log.info("Loading segment %s", segment.getIdentifier()); log.info("Loading segment %s", segment.getIdentifier());
if(loadSegment(segment, callback)) { if (loadSegment(segment, callback)) {
try { try {
announcer.announceSegment(segment); announcer.announceSegment(segment);
} }
@ -203,24 +355,34 @@ public class ZkCoordinator extends BaseZkCoordinator
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>(); final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) { for (final DataSegment segment : segments) {
loadingExecutor.submit( loadingExecutor.submit(
new Runnable() { new Runnable()
{
@Override @Override
public void run() { public void run()
{
try { try {
log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier()); log.info(
"Loading segment[%d/%d][%s]",
counter.getAndIncrement(),
numSegments,
segment.getIdentifier()
);
final boolean loaded = loadSegment(segment, callback); final boolean loaded = loadSegment(segment, callback);
if (loaded) { if (loaded) {
try { try {
backgroundSegmentAnnouncer.announceSegment(segment); backgroundSegmentAnnouncer.announceSegment(segment);
} catch (InterruptedException e) { }
catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted"); throw new SegmentLoadingException(e, "Loading Interrupted");
} }
} }
} catch (SegmentLoadingException e) { }
catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier()); log.error(e, "[%s] failed to load", segment.getIdentifier());
failedSegments.add(segment); failedSegments.add(segment);
} finally { }
finally {
latch.countDown(); latch.countDown();
} }
} }
@ -228,14 +390,15 @@ public class ZkCoordinator extends BaseZkCoordinator
); );
} }
try{ try {
latch.await(); latch.await();
if(failedSegments.size() > 0) { if (failedSegments.size() > 0) {
log.makeAlert("%,d errors seen while loading segments", failedSegments.size()) log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
.addData("failedSegments", failedSegments); .addData("failedSegments", failedSegments);
} }
} catch(InterruptedException e) { }
catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
log.makeAlert(e, "LoadingInterrupted"); log.makeAlert(e, "LoadingInterrupted");
} }
@ -244,8 +407,8 @@ public class ZkCoordinator extends BaseZkCoordinator
} }
catch (SegmentLoadingException e) { catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments -- likely problem with announcing.") log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
.addData("numSegments", segments.size()) .addData("numSegments", segments.size())
.emit(); .emit();
} }
finally { finally {
callback.execute(); callback.execute();
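
The bulk-load path above fans the per-segment work out to an executor and blocks on a CountDownLatch until every attempt finishes, collecting failures in a CopyOnWriteArrayList; counting down in finally is what keeps await() from hanging when a load throws. A self-contained sketch of that submit/count-down/await shape (ParallelLoader and its empty-string failure condition are illustrative stand-ins for loadSegment):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelLoader
{
  // Loads each segment on the pool, records failures, and blocks until all attempts finish.
  public static List<String> loadAll(List<String> segments, int threads) throws InterruptedException
  {
    final ExecutorService pool = Executors.newFixedThreadPool(threads);
    final CountDownLatch latch = new CountDownLatch(segments.size());
    final CopyOnWriteArrayList<String> failed = new CopyOnWriteArrayList<>();
    for (final String segment : segments) {
      pool.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              try {
                // Stand-in for loadSegment(segment, callback).
                if (segment.isEmpty()) {
                  throw new IllegalStateException("empty segment id");
                }
              }
              catch (Exception e) {
                failed.add(segment); // thread-safe append, no extra locking
              }
              finally {
                latch.countDown(); // always count down so await() cannot hang
              }
            }
          }
      );
    }
    latch.await();
    pool.shutdown();
    return failed;
  }
}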
@ -298,7 +461,8 @@ public class ZkCoordinator extends BaseZkCoordinator
} }
} }
private static class BackgroundSegmentAnnouncer implements AutoCloseable { private static class BackgroundSegmentAnnouncer implements AutoCloseable
{
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
private final int intervalMillis; private final int intervalMillis;
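
Taken together, the rewritten ZkCoordinator is a watch-process-delete loop over its per-server load queue: a Curator PathChildrenCache fires CHILD_ADDED for each queued zNode, the payload is deserialized with Jackson (a DataSegmentChangeRequest whose JSON resolves to a load or drop action), and deleting the zNode acknowledges completion back to the coordinator. A minimal standalone sketch of that loop, assuming a local ZooKeeper at localhost:2181; the queue path and the plain String payload are illustrative stand-ins, not the production types:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LoadQueueWatcher
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final CuratorFramework curator = CuratorFrameworkFactory.newClient(
        "localhost:2181", new ExponentialBackoffRetry(1000, 3)
    );
    curator.start();

    // Illustrative path; the real one is built from ZkPathsConfig and the server name.
    final String loadQueuePath = "/druid/loadQueue/localhost:8083";
    curator.newNamespaceAwareEnsurePath(loadQueuePath).ensure(curator.getZookeeperClient());

    // cacheData = true so event.getData().getData() carries the zNode payload bytes.
    final PathChildrenCache cache = new PathChildrenCache(curator, loadQueuePath, true);
    cache.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
          {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
              final String childPath = event.getData().getPath();
              // Payload assumed to be a JSON string here; in Druid it is a
              // DataSegmentChangeRequest describing a load or drop.
              final String request = mapper.readValue(event.getData().getData(), String.class);
              System.out.println("Handling " + request + " at " + childPath);
              // Deleting the zNode signals the coordinator that the request completed.
              client.delete().guaranteed().forPath(childPath);
            }
          }
        }
    );
    cache.start();
    Thread.currentThread().join(); // keep the watcher alive
  }
}

Note that the cacheData flag matters: passing true is what makes event.getData().getData() return the zNode bytes the listener deserializes, matching the PathChildrenCache constructed in the diff above.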

View File

@ -87,13 +87,12 @@ public class FireDepartmentTest
null, null,
null, null,
TestHelper.getTestIndexMerger(), TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexMaker(),
TestHelper.getTestIndexIO() TestHelper.getTestIndexIO()
), ),
null null
), ),
new RealtimeTuningConfig( new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, false, false, null, null null, null, null, null, null, null, null, null, null
) )
); );

View File

@ -150,10 +150,6 @@ public class RealtimeManagerTest
null, null,
null, null,
null, null,
null,
null,
null,
null,
null null
); );
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));

View File

@ -181,10 +181,6 @@ public class RealtimePlumberSchoolTest
rejectionPolicy, rejectionPolicy,
null, null,
null, null,
null,
null,
null,
null,
null null
); );
@ -197,7 +193,6 @@ public class RealtimePlumberSchoolTest
serverView, serverView,
MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor(),
TestHelper.getTestIndexMerger(), TestHelper.getTestIndexMerger(),
TestHelper.getTestIndexMaker(),
TestHelper.getTestIndexIO() TestHelper.getTestIndexIO()
); );

View File

@ -64,10 +64,6 @@ public class SinkTest
null, null,
null, null,
null, null,
null,
false,
false,
null,
null null
); );
final Sink sink = new Sink(interval, schema, tuningConfig, version); final Sink sink = new Sink(interval, schema, tuningConfig, version);

View File

@ -1,253 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.client.BatchServerInventoryView;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.DruidNode;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.easymock.EasyMock;
import org.joda.time.Duration;
import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class DruidClusterBridgeTest
{
public static final int WAIT_MAX_RETRY = 100;
public static final int WAIT_SLEEP_MILLIS = 200;
@Test
public void testRun() throws Exception
{
TestingCluster localCluster = new TestingCluster(1);
localCluster.start();
CuratorFramework localCf = CuratorFrameworkFactory.builder()
.connectString(localCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(
new PotentiallyGzippedCompressionProvider(
false
)
)
.build();
localCf.start();
TestingCluster remoteCluster = new TestingCluster(1);
remoteCluster.start();
CuratorFramework remoteCf = CuratorFrameworkFactory.builder()
.connectString(remoteCluster.getConnectString())
.retryPolicy(new ExponentialBackoffRetry(1, 10))
.compressionProvider(
new PotentiallyGzippedCompressionProvider(
false
)
)
.build();
remoteCf.start();
ObjectMapper jsonMapper = new DefaultObjectMapper();
DruidClusterBridgeConfig config = new DruidClusterBridgeConfig()
{
@Override
public Duration getStartDelay()
{
return new Duration(0);
}
@Override
public Duration getPeriod()
{
return new Duration(Long.MAX_VALUE);
}
};
ScheduledExecutorFactory factory = ScheduledExecutors.createFactory(new Lifecycle());
DruidNode me = new DruidNode(
"me",
"localhost",
8080
);
AtomicReference<LeaderLatch> leaderLatch = new AtomicReference<>(new LeaderLatch(localCf, "/test"));
ZkPathsConfig zkPathsConfig = new ZkPathsConfig()
{
@Override
public String getBase()
{
return "/druid";
}
};
DruidServerMetadata metadata = new DruidServerMetadata(
"test",
"localhost",
1000,
"bridge",
DruidServer.DEFAULT_TIER,
0
);
SegmentPublisher dbSegmentPublisher = EasyMock.createMock(SegmentPublisher.class);
EasyMock.replay(dbSegmentPublisher);
MetadataSegmentManager databaseSegmentManager = EasyMock.createMock(MetadataSegmentManager.class);
EasyMock.replay(databaseSegmentManager);
ServerView serverView = EasyMock.createMock(ServerView.class);
EasyMock.replay(serverView);
BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator(
jsonMapper,
zkPathsConfig,
new SegmentLoaderConfig(),
metadata,
remoteCf,
dbSegmentPublisher,
databaseSegmentManager,
serverView
);
Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor());
announcer.start();
announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHostAndPort(), jsonMapper.writeValueAsBytes(me));
BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class);
BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class);
EasyMock.expect(batchServerInventoryView.getInventory()).andReturn(
Arrays.asList(
new DruidServer("1", "localhost", 117, "historical", DruidServer.DEFAULT_TIER, 0),
new DruidServer("2", "localhost", 1, "historical", DruidServer.DEFAULT_TIER, 0)
)
);
batchServerInventoryView.registerSegmentCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.SegmentCallback>anyObject()
);
batchServerInventoryView.registerServerCallback(
EasyMock.<Executor>anyObject(),
EasyMock.<ServerView.ServerCallback>anyObject()
);
EasyMock.expectLastCall();
batchServerInventoryView.start();
EasyMock.expectLastCall();
batchServerInventoryView.stop();
EasyMock.expectLastCall();
EasyMock.replay(batchServerInventoryView);
DruidClusterBridge bridge = new DruidClusterBridge(
jsonMapper,
config,
zkPathsConfig,
metadata,
factory,
me,
localCf,
leaderLatch,
bridgeZkCoordinator,
announcer,
batchDataSegmentAnnouncer,
batchServerInventoryView
);
bridge.start();
int retry = 0;
while (!bridge.isLeader()) {
if (retry > WAIT_MAX_RETRY) {
throw new ISE("Unable to become leader");
}
Thread.sleep(WAIT_SLEEP_MILLIS);
retry++;
}
String path = "/druid/announcements/localhost:8080";
retry = 0;
while (remoteCf.checkExists().forPath(path) == null) {
if (retry > WAIT_MAX_RETRY) {
throw new ISE("Unable to announce");
}
Thread.sleep(WAIT_SLEEP_MILLIS);
retry++;
}
boolean verified = verifyUpdate(jsonMapper, path, remoteCf);
retry = 0;
while (!verified) {
if (retry > WAIT_MAX_RETRY) {
throw new ISE("No updates to bridge node occurred");
}
Thread.sleep(WAIT_SLEEP_MILLIS);
retry++;
verified = verifyUpdate(jsonMapper, path, remoteCf);
}
announcer.stop();
bridge.stop();
remoteCf.close();
remoteCluster.close();
localCf.close();
localCluster.close();
EasyMock.verify(batchServerInventoryView);
EasyMock.verify(dbSegmentPublisher);
EasyMock.verify(databaseSegmentManager);
EasyMock.verify(serverView);
}
private boolean verifyUpdate(ObjectMapper jsonMapper, String path, CuratorFramework remoteCf) throws Exception
{
DruidServerMetadata announced = jsonMapper.readValue(
remoteCf.getData().forPath(path),
DruidServerMetadata.class
);
return (118 == announced.getMaxSize());
}
}

View File

@ -1,200 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import io.airlift.airline.Command;
import io.druid.concurrent.Execs;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManagerConfig;
import io.druid.metadata.MetadataSegmentManagerProvider;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.ManageLifecycleLast;
import io.druid.guice.NodeTypeConfig;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.QueryResource;
import io.druid.server.bridge.Bridge;
import io.druid.server.bridge.BridgeCuratorConfig;
import io.druid.server.bridge.BridgeQuerySegmentWalker;
import io.druid.server.bridge.BridgeZkCoordinator;
import io.druid.server.bridge.DruidClusterBridge;
import io.druid.server.bridge.DruidClusterBridgeConfig;
import io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;
import org.eclipse.jetty.server.Server;
import java.util.List;
/**
*/
@Command(
name = "bridge",
description = "This is a highly experimental node to use at your own discretion"
)
public class CliBridge extends ServerRunnable
{
private static final Logger log = new Logger(CliBridge.class);
public CliBridge()
{
super(log);
}
@Override
protected List<? extends Module> getModules()
{
return ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/bridge");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8089);
ConfigProvider.bind(binder, BridgeCuratorConfig.class);
binder.bind(BridgeZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("bridge"));
JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class);
binder.bind(MetadataSegmentManager.class)
.toProvider(MetadataSegmentManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(BridgeQuerySegmentWalker.class).in(LazySingleton.class);
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
LifecycleModule.register(binder, QueryResource.class);
ConfigProvider.bind(binder, DruidClusterBridgeConfig.class);
binder.bind(DruidClusterBridge.class);
LifecycleModule.register(binder, DruidClusterBridge.class);
LifecycleModule.register(binder, BridgeZkCoordinator.class);
LifecycleModule.register(binder, Server.class);
}
@Provides
@LazySingleton
@Bridge
public CuratorFramework getBridgeCurator(final BridgeCuratorConfig bridgeCuratorConfig, Lifecycle lifecycle)
{
final CuratorFramework framework =
CuratorFrameworkFactory.builder()
.connectString(bridgeCuratorConfig.getParentZkHosts())
.sessionTimeoutMs(bridgeCuratorConfig.getZkSessionTimeoutMs())
.retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
.compressionProvider(
new PotentiallyGzippedCompressionProvider(
bridgeCuratorConfig.getEnableCompression()
)
)
.build();
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
log.info("Starting Curator for %s", bridgeCuratorConfig.getParentZkHosts());
framework.start();
}
@Override
public void stop()
{
log.info("Stopping Curator");
framework.close();
}
}
);
return framework;
}
@Provides
@ManageLifecycle
public ServerDiscoverySelector getServerDiscoverySelector(
DruidClusterBridgeConfig config,
ServerDiscoveryFactory factory
)
{
return factory.createSelector(config.getBrokerServiceName());
}
@Provides
@ManageLifecycle
@Bridge
public Announcer getBridgeAnnouncer(
@Bridge CuratorFramework curator
)
{
return new Announcer(curator, Execs.singleThreaded("BridgeAnnouncer-%s"));
}
@Provides
@ManageLifecycleLast
@Bridge
public AbstractDataSegmentAnnouncer getBridgeDataSegmentAnnouncer(
DruidServerMetadata metadata,
BatchDataSegmentAnnouncerConfig config,
ZkPathsConfig zkPathsConfig,
@Bridge Announcer announcer,
ObjectMapper jsonMapper
)
{
return new BatchDataSegmentAnnouncer(
metadata,
config,
zkPathsConfig,
announcer,
jsonMapper
);
}
}
);
}
}

View File

@ -48,7 +48,7 @@ public class Main
.withCommands( .withCommands(
CliCoordinator.class, CliHistorical.class, CliBroker.class, CliCoordinator.class, CliHistorical.class, CliBroker.class,
CliRealtime.class, CliOverlord.class, CliMiddleManager.class, CliRealtime.class, CliOverlord.class, CliMiddleManager.class,
CliBridge.class, CliRouter.class CliRouter.class
); );
builder.withGroup("example") builder.withGroup("example")