diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md index 9f621de9bb3..37394f714e9 100644 --- a/docs/content/ingestion/batch-ingestion.md +++ b/docs/content/ingestion/batch-ingestion.md @@ -103,11 +103,7 @@ The spec\_file is a path to a file that contains JSON and an example looks like: "overwriteFiles" : false, "ignoreInvalidRows" : false, "jobProperties" : { }, - "combineText" : false, - "persistInHeap" : false, - "ingestOffheap" : false, - "bufferSize" : 134217728, - "aggregationBufferRatio" : 0.5, + "combineText" : false, "rowFlushBoundary" : 300000 } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index fd1f2118e6d..1ece1d9aba0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -456,11 +456,6 @@ public class HadoopDruidIndexerConfig } } - public boolean isPersistInHeap() - { - return schema.getTuningConfig().isPersistInHeap(); - } - public String getWorkingPath() { final String workingPath = schema.getTuningConfig().getWorkingPath(); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java index 8020065d3c6..9f80c38cf0f 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerJob.java @@ -65,11 +65,7 @@ public class HadoopDruidIndexerJob implements Jobby List jobs = Lists.newArrayList(); JobHelper.ensurePaths(config); - if (config.isPersistInHeap()) { - indexJob = new IndexGeneratorJob(config); - } else { - indexJob = new LegacyIndexGeneratorJob(config); - } + indexJob = new IndexGeneratorJob(config); jobs.add(indexJob); if (metadataStorageUpdaterJob != null) { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 69a244ea84b..5e551cf6292 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap; import io.druid.indexer.partitions.HashedPartitionsSpec; import io.druid.indexer.partitions.PartitionsSpec; import io.druid.segment.IndexSpec; -import io.druid.segment.data.BitmapSerde; import io.druid.segment.indexing.TuningConfig; import org.joda.time.DateTime; @@ -37,11 +36,9 @@ import java.util.Map; public class HadoopTuningConfig implements TuningConfig { private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec(); - private static final Map> DEFAULT_SHARD_SPECS = ImmutableMap.>of(); + private static final Map> DEFAULT_SHARD_SPECS = ImmutableMap.of(); private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000; - private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024; - private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f; private static final boolean DEFAULT_USE_COMBINER = false; public static HadoopTuningConfig makeDefaultTuningConfig() @@ -59,11 +56,7 @@ public class HadoopTuningConfig implements TuningConfig false, null, false, - false, - false, - DEFAULT_BUFFER_SIZE, - DEFAULT_AGG_BUFFER_RATIO, - DEFAULT_USE_COMBINER + false ); } @@ -79,10 +72,6 @@ public class HadoopTuningConfig implements TuningConfig private final boolean ignoreInvalidRows; private final Map jobProperties; private final boolean combineText; - private final boolean persistInHeap; - private final boolean ingestOffheap; - private final int bufferSize; - private final float aggregationBufferRatio; private final boolean useCombiner; @JsonCreator @@ -99,10 +88,6 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, final @JsonProperty("jobProperties") Map jobProperties, final @JsonProperty("combineText") boolean combineText, - final @JsonProperty("persistInHeap") boolean persistInHeap, - final @JsonProperty("ingestOffheap") boolean ingestOffheap, - final @JsonProperty("bufferSize") Integer bufferSize, - final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio, final @JsonProperty("useCombiner") Boolean useCombiner ) { @@ -120,10 +105,6 @@ public class HadoopTuningConfig implements TuningConfig ? ImmutableMap.of() : ImmutableMap.copyOf(jobProperties)); this.combineText = combineText; - this.persistInHeap = persistInHeap; - this.ingestOffheap = ingestOffheap; - this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize; - this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio; this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue(); } @@ -199,28 +180,6 @@ public class HadoopTuningConfig implements TuningConfig return combineText; } - @JsonProperty - public boolean isPersistInHeap() - { - return persistInHeap; - } - - @JsonProperty - public boolean isIngestOffheap(){ - return ingestOffheap; - } - - @JsonProperty - public int getBufferSize(){ - return bufferSize; - } - - @JsonProperty - public float getAggregationBufferRatio() - { - return aggregationBufferRatio; - } - @JsonProperty public boolean getUseCombiner() { @@ -242,10 +201,6 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap, - ingestOffheap, - bufferSize, - aggregationBufferRatio, useCombiner ); } @@ -265,10 +220,6 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap, - ingestOffheap, - bufferSize, - aggregationBufferRatio, useCombiner ); } @@ -288,10 +239,6 @@ public class HadoopTuningConfig implements TuningConfig ignoreInvalidRows, jobProperties, combineText, - persistInHeap, - ingestOffheap, - bufferSize, - aggregationBufferRatio, useCombiner ); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 3c8ffed5612..97a33bf5b09 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -30,19 +30,16 @@ import com.google.common.primitives.Longs; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; -import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; import io.druid.indexer.hadoop.SegmentInputRow; -import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.LoggingProgressIndicator; import io.druid.segment.ProgressIndicator; import io.druid.segment.QueryableIndex; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; @@ -157,7 +154,7 @@ public class IndexGeneratorJob implements Jobby throw new RuntimeException("No buckets?? seems there is no data to index."); } - if(config.getSchema().getTuningConfig().getUseCombiner()) { + if (config.getSchema().getTuningConfig().getUseCombiner()) { job.setCombinerClass(IndexGeneratorCombiner.class); job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class); } @@ -200,9 +197,7 @@ public class IndexGeneratorJob implements Jobby private static IncrementalIndex makeIncrementalIndex( Bucket theBucket, AggregatorFactory[] aggs, - HadoopDruidIndexerConfig config, - boolean isOffHeap, - StupidPool bufferPool + HadoopDruidIndexerConfig config ) { final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); @@ -212,19 +207,11 @@ public class IndexGeneratorJob implements Jobby .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity()) .withMetrics(aggs) .build(); - if (isOffHeap) { - return new OffheapIncrementalIndex( - indexSchema, - bufferPool, - true, - tuningConfig.getBufferSize() - ); - } else { - return new OnheapIncrementalIndex( - indexSchema, - tuningConfig.getRowFlushBoundary() - ); - } + + return new OnheapIncrementalIndex( + indexSchema, + tuningConfig.getRowFlushBoundary() + ); } public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper @@ -320,20 +307,20 @@ public class IndexGeneratorJob implements Jobby Iterator iter = values.iterator(); BytesWritable first = iter.next(); - if(iter.hasNext()) { + if (iter.hasNext()) { SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; - IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, false, null); + IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config); index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); - while(iter.hasNext()) { + while (iter.hasNext()) { context.progress(); InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); - if(!index.canAppendRow()) { + if (!index.canAppendRow()) { log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason()); flushIndexToContextAndClose(key, index, context); - index = makeIncrementalIndex(bucket, combiningAggs, config, false, null); + index = makeIncrementalIndex(bucket, combiningAggs, config); } index.add(value); @@ -345,10 +332,11 @@ public class IndexGeneratorJob implements Jobby } } - private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException + private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) + throws IOException, InterruptedException { Iterator rows = index.iterator(); - while(rows.hasNext()) { + while (rows.hasNext()) { context.progress(); Row row = rows.next(); InputRow inputRow = getInputRowFromRow(row, index.getDimensions()); @@ -360,7 +348,8 @@ public class IndexGeneratorJob implements Jobby index.close(); } - private InputRow getInputRowFromRow(final Row row, final List dimensions) { + private InputRow getInputRowFromRow(final Row row, final List dimensions) + { return new InputRow() { @Override @@ -467,14 +456,14 @@ public class IndexGeneratorJob implements Jobby }; } - protected File persist( + private File persist( final IncrementalIndex index, final Interval interval, final File file, final ProgressIndicator progressIndicator ) throws IOException { - return HadoopDruidIndexerConfig.INDEX_MAKER.persist( + return HadoopDruidIndexerConfig.INDEX_MERGER.persist( index, interval, file, null, config.getIndexSpec(), progressIndicator ); } @@ -514,17 +503,11 @@ public class IndexGeneratorJob implements Jobby Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); - final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize(); - final int aggregationBufferSize = (int) ((double) maxTotalBufferSize - * config.getSchema().getTuningConfig().getAggregationBufferRatio()); - final StupidPool bufferPool = new OffheapBufferPool(aggregationBufferSize); IncrementalIndex index = makeIncrementalIndex( bucket, combiningAggs, - config, - config.getSchema().getTuningConfig().isIngestOffheap(), - bufferPool + config ); try { File baseFlushFile = File.createTempFile("base", "flush"); @@ -570,9 +553,7 @@ public class IndexGeneratorJob implements Jobby index = makeIncrementalIndex( bucket, combiningAggs, - config, - config.getSchema().getTuningConfig().isIngestOffheap(), - bufferPool + config ); startTime = System.currentTimeMillis(); ++indexCount; @@ -602,7 +583,7 @@ public class IndexGeneratorJob implements Jobby indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file)); } mergedBase = mergeQueryableIndex( - indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator + indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator ); } final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath()) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java deleted file mode 100644 index cd2ad0cabda..00000000000 --- a/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.indexer; - -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.BaseProgressIndicator; -import io.druid.segment.ProgressIndicator; -import io.druid.segment.QueryableIndex; -import io.druid.segment.incremental.IncrementalIndex; -import org.apache.hadoop.mapreduce.Job; -import org.joda.time.Interval; - -import java.io.File; -import java.io.IOException; -import java.util.List; - -/** - */ -public class LegacyIndexGeneratorJob extends IndexGeneratorJob -{ - public LegacyIndexGeneratorJob( - HadoopDruidIndexerConfig config - ) - { - super(config); - } - - @Override - protected void setReducerClass(Job job) - { - job.setReducerClass(LegacyIndexGeneratorReducer.class); - } - - public static class LegacyIndexGeneratorReducer extends IndexGeneratorJob.IndexGeneratorReducer - { - @Override - protected ProgressIndicator makeProgressIndicator(final Context context) - { - return new BaseProgressIndicator() - { - @Override - public void progress() - { - context.progress(); - } - }; - } - - @Override - protected File persist( - IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator - ) throws IOException - { - return HadoopDruidIndexerConfig.INDEX_MERGER.persist(index, interval, file, null, config.getIndexSpec(), progressIndicator); - } - - @Override - protected File mergeQueryableIndex( - List indexes, - AggregatorFactory[] aggs, - File file, - ProgressIndicator progressIndicator - ) throws IOException - { - return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(indexes, aggs, file, config.getIndexSpec(), progressIndicator); - } - } -} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index a2d8d7e4ba2..3350041abe5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -277,7 +277,7 @@ public class BatchDeltaIngestionTest WindowedDataSegment windowedDataSegment ) throws Exception { - IndexGeneratorJob job = new LegacyIndexGeneratorJob(config); + IndexGeneratorJob job = new IndexGeneratorJob(config); JobHelper.runJobs(ImmutableList.of(job), config); File segmentFolder = new File( @@ -380,10 +380,6 @@ public class BatchDeltaIngestionTest false, null, false, - false, - false, - null, - null, false ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 0ce605116ad..2219bb837a4 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -160,10 +160,6 @@ public class DetermineHashedPartitionsJobTest false, null, false, - false, - false, - null, - null, false ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index cf160158cbb..dc1e4722be1 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -263,10 +263,6 @@ public class DeterminePartitionsJobTest false, null, false, - false, - false, - null, - null, false ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index 5447b90c7f8..7bb88bc4e36 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -204,10 +204,6 @@ public class HadoopDruidIndexerConfigTest false, null, false, - false, - false, - null, - null, false ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java index e7bbd46f428..78d4e44e165 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java @@ -52,10 +52,6 @@ public class HadoopTuningConfigTest true, null, true, - true, - true, - 200, - 0.1f, true ); @@ -73,10 +69,6 @@ public class HadoopTuningConfigTest Assert.assertEquals(true, actual.isIgnoreInvalidRows()); Assert.assertEquals(ImmutableMap.of(), actual.getJobProperties()); Assert.assertEquals(true, actual.isCombineText()); - Assert.assertEquals(true, actual.isPersistInHeap()); - Assert.assertEquals(true, actual.isIngestOffheap()); - Assert.assertEquals(200, actual.getBufferSize()); - Assert.assertEquals(0.1f, actual.getAggregationBufferRatio(), 0.0001); Assert.assertEquals(true, actual.getUseCombiner()); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 000ab0728af..361ac646d75 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -394,10 +394,6 @@ public class IndexGeneratorJobTest false, ImmutableMap.of(JobContext.NUM_REDUCES, "0"), //verifies that set num reducers is ignored false, - false, - false, - null, - null, useCombiner ) ) @@ -453,12 +449,6 @@ public class IndexGeneratorJobTest verifyJob(new IndexGeneratorJob(config)); } - @Test - public void testLegacyIndexGeneratorJob() throws IOException - { - verifyJob(new LegacyIndexGeneratorJob(config)); - } - private void verifyJob(IndexGeneratorJob job) throws IOException { JobHelper.runJobs(ImmutableList.of(job), config); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 02b12a9bf5a..1c4294c2ca0 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -112,10 +112,6 @@ public class JobHelperTest "THISISMYACCESSKEY" ), false, - false, - false, - null, - null, false ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 5836f04f541..b7705c89d15 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -200,10 +200,6 @@ public class HadoopConverterJobTest false, null, false, - false, - false, - null, - null, false ) ) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index 4dcfb452928..fb5b7515a7e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -34,7 +34,6 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; @@ -75,7 +74,6 @@ public class TaskToolbox private final ObjectMapper objectMapper; private final File taskWorkDir; private final IndexMerger indexMerger; - private final IndexMaker indexMaker; private final IndexIO indexIO; public TaskToolbox( @@ -96,7 +94,6 @@ public class TaskToolbox ObjectMapper objectMapper, File taskWorkDir, IndexMerger indexMerger, - IndexMaker indexMaker, IndexIO indexIO ) { @@ -117,7 +114,6 @@ public class TaskToolbox this.objectMapper = objectMapper; this.taskWorkDir = taskWorkDir; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); - this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); } @@ -231,9 +227,4 @@ public class TaskToolbox { return indexMerger; } - - public IndexMaker getIndexMaker() - { - return indexMaker; - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 5c1f1d9aa23..9ddb303cde9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -29,7 +29,6 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; @@ -60,7 +59,6 @@ public class TaskToolboxFactory private final SegmentLoaderFactory segmentLoaderFactory; private final ObjectMapper objectMapper; private final IndexMerger indexMerger; - private final IndexMaker indexMaker; private final IndexIO indexIO; @Inject @@ -80,7 +78,6 @@ public class TaskToolboxFactory SegmentLoaderFactory segmentLoaderFactory, ObjectMapper objectMapper, IndexMerger indexMerger, - IndexMaker indexMaker, IndexIO indexIO ) { @@ -99,7 +96,6 @@ public class TaskToolboxFactory this.segmentLoaderFactory = segmentLoaderFactory; this.objectMapper = objectMapper; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); - this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); } @@ -125,7 +121,6 @@ public class TaskToolboxFactory objectMapper, taskWorkDir, indexMerger, - indexMaker, indexIO ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 2308d2ae07b..a36e63f891c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -132,11 +132,7 @@ public class IndexTask extends AbstractFixedIntervalTask null, null, shardSpec, - indexSpec, - null, - null, - null, - null + indexSpec ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index ffe30f3da6b..22000c4a540 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -277,7 +277,6 @@ public class RealtimeIndexTask extends AbstractTask toolbox.getNewSegmentServerView(), toolbox.getQueryExecutorService(), toolbox.getIndexMerger(), - toolbox.getIndexMaker(), toolbox.getIndexIO() ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 453daea4179..0197c5c346e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -29,7 +29,6 @@ import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; @@ -74,7 +73,6 @@ public class TaskToolboxTest private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class); private Task task = EasyMock.createMock(Task.class); private IndexMerger mockIndexMerger = EasyMock.createMock(IndexMerger.class); - private IndexMaker mockIndexMaker = EasyMock.createMock(IndexMaker.class); private IndexIO mockIndexIO = EasyMock.createMock(IndexIO.class); @Rule @@ -102,7 +100,6 @@ public class TaskToolboxTest new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager), ObjectMapper, mockIndexMerger, - mockIndexMaker, mockIndexIO ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 0188336252d..a01ffb62c48 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -36,7 +36,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; @@ -69,7 +68,6 @@ public class IndexTaskTest private final IndexSpec indexSpec; private final ObjectMapper jsonMapper; private IndexMerger indexMerger; - private IndexMaker indexMaker; private IndexIO indexIO; public IndexTaskTest() @@ -78,7 +76,6 @@ public class IndexTaskTest TestUtils testUtils = new TestUtils(); jsonMapper = testUtils.getTestObjectMapper(); indexMerger = testUtils.getTestIndexMerger(); - indexMaker = testUtils.getTestIndexMaker(); indexIO = testUtils.getTestIndexIO(); } @@ -260,7 +257,7 @@ public class IndexTaskTest return segment; } }, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(), - indexMerger, indexMaker, indexIO + indexMerger, indexIO ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 742f5552d37..ca4e267e208 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -307,11 +307,7 @@ public class TaskSerdeTest null, 1, new NoneShardSpec(), - indexSpec, - false, - false, - null, - 0.3F + indexSpec ) ), null @@ -348,10 +344,6 @@ public class TaskSerdeTest task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(), task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity() ); - Assert.assertEquals( - task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), - task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f - ); } @Test diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 3d1b4dc0e9e..b7a81d32d31 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -61,7 +61,6 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.SelectorDimFilter; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.incremental.IncrementalIndexSchema; @@ -106,14 +105,12 @@ public class IngestSegmentFirehoseFactoryTest { private static final ObjectMapper MAPPER; private static final IndexMerger INDEX_MERGER; - private static final IndexMaker INDEX_MAKER; private static final IndexIO INDEX_IO; static { TestUtils testUtils = new TestUtils(); MAPPER = setupInjectablesInObjectMapper(testUtils.getTestObjectMapper()); INDEX_MERGER = testUtils.getTestIndexMerger(); - INDEX_MAKER = testUtils.getTestIndexMaker(); INDEX_IO = testUtils.getTestIndexIO(); } @@ -263,7 +260,6 @@ public class IngestSegmentFirehoseFactoryTest ), MAPPER, INDEX_MERGER, - INDEX_MAKER, INDEX_IO ); Collection values = new LinkedList<>(); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 9837d186f8c..3bd5de2c6ae 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -54,7 +54,6 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.NoopDimFilter; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.incremental.IncrementalIndexSchema; @@ -109,14 +108,12 @@ public class IngestSegmentFirehoseFactoryTimelineTest private static final ObjectMapper MAPPER; private static final IndexMerger INDEX_MERGER; - private static final IndexMaker INDEX_MAKER; private static final IndexIO INDEX_IO; static { TestUtils testUtils = new TestUtils(); MAPPER = IngestSegmentFirehoseFactoryTest.setupInjectablesInObjectMapper(testUtils.getTestObjectMapper()); INDEX_MERGER = testUtils.getTestIndexMerger(); - INDEX_MAKER = testUtils.getTestIndexMaker(); INDEX_IO = testUtils.getTestIndexIO(); } @@ -331,7 +328,6 @@ public class IngestSegmentFirehoseFactoryTimelineTest ), MAPPER, INDEX_MERGER, - INDEX_MAKER, INDEX_IO ); final Injector injector = Guice.createInjector( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 5f661dae0c0..7d326d76567 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -79,7 +79,6 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; @@ -131,7 +130,6 @@ public class TaskLifecycleTest { private static final ObjectMapper MAPPER; private static final IndexMerger INDEX_MERGER; - private static final IndexMaker INDEX_MAKER; private static final IndexIO INDEX_IO; static { @@ -139,7 +137,6 @@ public class TaskLifecycleTest MAPPER = testUtils.getTestObjectMapper(); INDEX_MERGER = testUtils.getTestIndexMerger(); INDEX_IO = testUtils.getTestIndexIO(); - INDEX_MAKER = testUtils.getTestIndexMaker(); } @Parameterized.Parameters(name = "taskStorageType={0}") @@ -487,7 +484,6 @@ public class TaskLifecycleTest ), MAPPER, INDEX_MERGER, - INDEX_MAKER, INDEX_IO ); tr = new ThreadPoolTaskRunner(tb, null); @@ -1034,10 +1030,6 @@ public class TaskLifecycleTest null, null, null, - null, - null, - null, - null, null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index d18829dd576..9b36224fdaa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -35,7 +35,6 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; @@ -73,7 +72,6 @@ public class WorkerTaskMonitorTest private Worker worker; private ObjectMapper jsonMapper; private IndexMerger indexMerger; - private IndexMaker indexMaker; private IndexIO indexIO; public WorkerTaskMonitorTest() @@ -81,7 +79,6 @@ public class WorkerTaskMonitorTest TestUtils testUtils = new TestUtils(); jsonMapper = testUtils.getTestObjectMapper(); indexMerger = testUtils.getTestIndexMerger(); - indexMaker = testUtils.getTestIndexMaker(); indexIO = testUtils.getTestIndexIO(); } @@ -159,7 +156,6 @@ public class WorkerTaskMonitorTest ), jsonMapper, indexMerger, - indexMaker, indexIO ), null diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 5f07b6fcf39..821a2f697d9 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -25,18 +25,14 @@ import com.metamx.common.guava.Accumulator; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedRow; -import io.druid.data.input.Row; -import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -79,29 +75,15 @@ public class GroupByQueryHelper } } ); - final IncrementalIndex index; - if (query.getContextValue("useOffheap", false)) { - index = new OffheapIncrementalIndex( - // use granularity truncated min timestamp - // since incoming truncated timestamps may precede timeStart - granTimeStart, - gran, - aggs.toArray(new AggregatorFactory[aggs.size()]), - bufferPool, - false, - Integer.MAX_VALUE - ); - } else { - index = new OnheapIncrementalIndex( - // use granularity truncated min timestamp - // since incoming truncated timestamps may precede timeStart - granTimeStart, - gran, - aggs.toArray(new AggregatorFactory[aggs.size()]), - false, - config.getMaxResults() - ); - } + final IncrementalIndex index = new OnheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + false, + config.getMaxResults() + ); Accumulator accumulator = new Accumulator() { @@ -142,7 +124,7 @@ public class GroupByQueryHelper @Override public Queue accumulate(Queue accumulated, T in) { - if(in == null){ + if (in == null) { throw new ISE("Cannot have null result"); } accumulated.offer(in); diff --git a/processing/src/main/java/io/druid/segment/IndexMaker.java b/processing/src/main/java/io/druid/segment/IndexMaker.java index cbb832074d5..c754a0c2a1e 100644 --- a/processing/src/main/java/io/druid/segment/IndexMaker.java +++ b/processing/src/main/java/io/druid/segment/IndexMaker.java @@ -101,7 +101,10 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; +@Deprecated /** + * This class is not yet ready for production use and requires more work. This class provides a demonstration of how + * to build v9 segments directly. */ public class IndexMaker { diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index a7c850b5cd7..61055879862 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -48,7 +48,9 @@ import java.util.WeakHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.atomic.AtomicInteger; +@Deprecated /** + * This is not yet ready for production use and requires more work. */ public class OffheapIncrementalIndex extends IncrementalIndex { diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 2cec20404e5..318aeedd330 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.query; @@ -24,7 +26,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.UOE; import com.metamx.common.guava.Sequence; - import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; diff --git a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java index 422e70e2469..21d68cf1bb0 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java @@ -41,6 +41,7 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -54,6 +55,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +@Ignore +/* +* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready. +*/ + @RunWith(Parameterized.class) public class IndexMakerParameterizedTest { @@ -128,7 +134,7 @@ public class IndexMakerParameterizedTest { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist); final File tempDir = temporaryFolder.newFolder(); @@ -154,7 +160,7 @@ public class IndexMakerParameterizedTest public void testPersistMerge() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); IncrementalIndex toPersist2 = new OnheapIncrementalIndex( @@ -312,7 +318,7 @@ public class IndexMakerParameterizedTest public void testMergeRetainsValues() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -371,7 +377,7 @@ public class IndexMakerParameterizedTest public void testAppendRetainsValues() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -424,7 +430,7 @@ public class IndexMakerParameterizedTest public void testMergeSpecChange() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -490,7 +496,7 @@ public class IndexMakerParameterizedTest public void testConvertSame() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -548,7 +554,7 @@ public class IndexMakerParameterizedTest public void testConvertDifferent() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); diff --git a/processing/src/test/java/io/druid/segment/IndexMakerTest.java b/processing/src/test/java/io/druid/segment/IndexMakerTest.java index 02467447896..efadffa3fe4 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMakerTest.java @@ -40,6 +40,7 @@ import org.joda.time.DateTime; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -56,6 +57,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +@Ignore +/* +* IndexMaker is not yet ready for production. Enable this test when IndexMaker is ready. +*/ + @RunWith(Parameterized.class) public class IndexMakerTest { diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index d9e337ff28e..a33fa7656bc 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -128,7 +128,7 @@ public class IndexMergerTest { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist); final File tempDir = temporaryFolder.newFolder(); @@ -155,7 +155,7 @@ public class IndexMergerTest { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist); Map segmentMetadata = ImmutableMap.of("key", "value"); @@ -185,7 +185,7 @@ public class IndexMergerTest public void testPersistMerge() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); IncrementalIndex toPersist2 = new OnheapIncrementalIndex( @@ -348,7 +348,7 @@ public class IndexMergerTest public void testMergeRetainsValues() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -407,7 +407,7 @@ public class IndexMergerTest public void testAppendRetainsValues() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -460,7 +460,7 @@ public class IndexMergerTest public void testMergeSpecChange() throws Exception { final long timestamp = System.currentTimeMillis(); - IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(true, null); + IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); final File tempDir1 = temporaryFolder.newFolder(); @@ -527,7 +527,6 @@ public class IndexMergerTest { final long timestamp = System.currentTimeMillis(); IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex( - true, new AggregatorFactory[]{ new LongSumAggregatorFactory( "longSum1", @@ -586,7 +585,7 @@ public class IndexMergerTest { final long timestamp = System.currentTimeMillis(); IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex( - true, new AggregatorFactory[]{ + new AggregatorFactory[]{ new LongSumAggregatorFactory( "longSum1", "dim1" diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 1001f2a82fa..8a74909c6d7 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -31,14 +31,12 @@ import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularity; -import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; @@ -183,20 +181,10 @@ public class TestIndex .withQueryGranularity(QueryGranularity.NONE) .withMetrics(METRIC_AGGS) .build(); - final IncrementalIndex retVal; - if (useOffheap) { - retVal = new OffheapIncrementalIndex( - schema, - TestQueryRunners.pool, - true, - 100 * 1024 * 1024 - ); - } else { - retVal = new OnheapIncrementalIndex( - schema, - 10000 - ); - } + final IncrementalIndex retVal = new OnheapIncrementalIndex( + schema, + 10000 + ); final AtomicLong startTime = new AtomicLong(); int lineCount; diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 8b4a3c40af3..b28e493a60f 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -39,7 +39,6 @@ import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; -import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -55,7 +54,6 @@ import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.Interval; import org.junit.Assert; @@ -109,7 +107,7 @@ public class IncrementalIndexTest @Override public IncrementalIndex createIndex(AggregatorFactory[] factories) { - return IncrementalIndexTest.createIndex(true, factories); + return IncrementalIndexTest.createIndex(factories); } } }, @@ -119,7 +117,7 @@ public class IncrementalIndexTest @Override public IncrementalIndex createIndex(AggregatorFactory[] factories) { - return IncrementalIndexTest.createIndex(false, factories); + return IncrementalIndexTest.createIndex(factories); } } } @@ -128,25 +126,15 @@ public class IncrementalIndexTest ); } - public static IncrementalIndex createIndex(boolean offheap, AggregatorFactory[] aggregatorFactories) + public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories) { if (null == aggregatorFactories) { aggregatorFactories = defaultAggregatorFactories; } - if (offheap) { - return new OffheapIncrementalIndex( - 0L, - QueryGranularity.NONE, - aggregatorFactories, - TestQueryRunners.pool, - true, - 100 * 1024 * 1024 - ); - } else { - return new OnheapIncrementalIndex( - 0L, QueryGranularity.NONE, aggregatorFactories, 1000000 - ); - } + + return new OnheapIncrementalIndex( + 0L, QueryGranularity.NONE, aggregatorFactories, 1000000 + ); } public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException @@ -479,28 +467,6 @@ public class IncrementalIndexTest Assert.assertEquals(elementsPerThread, curr); } - @Test - public void testOffheapIndexIsFull() throws IndexSizeExceededException - { - OffheapIncrementalIndex index = new OffheapIncrementalIndex( - 0L, - QueryGranularity.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - TestQueryRunners.pool, - true, - (10 + 2) * 1024 * 1024 - ); - int rowCount = 0; - for (int i = 0; i < 500; i++) { - rowCount = index.add(getRow(System.currentTimeMillis(), i, 100)); - if (!index.canAppendRow()) { - break; - } - } - - Assert.assertTrue("rowCount : " + rowCount, rowCount > 200 && rowCount < 600); - } - @Test public void testgetDimensions() { diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 1919b326ebd..181b4d2a24c 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -30,7 +30,6 @@ import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; import io.druid.query.Result; -import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.JavaScriptAggregatorFactory; @@ -67,6 +66,7 @@ public class IncrementalIndexStorageAdapterTest { public IncrementalIndex createIndex(); } + private final IndexCreator indexCreator; public IncrementalIndexStorageAdapterTest( @@ -81,36 +81,18 @@ public class IncrementalIndexStorageAdapterTest { return Arrays.asList( new Object[][]{ - { new IndexCreator() - { - @Override - public IncrementalIndex createIndex() - { - return new OnheapIncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 - ); - } - } - - }, { new IndexCreator() { @Override public IncrementalIndex createIndex() { - return new OffheapIncrementalIndex( - 0, - QueryGranularity.MINUTE, - new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - TestQueryRunners.pool, - true, - 100 * 1024 * 1024 + return new OnheapIncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 ); } } } - } ); } @@ -220,31 +202,32 @@ public class IncrementalIndexStorageAdapterTest private static GroupByQueryEngine makeGroupByQueryEngine() { return new GroupByQueryEngine( - Suppliers.ofInstance( - new GroupByQueryConfig() + Suppliers.ofInstance( + new GroupByQueryConfig() + { + @Override + public int getMaxIntermediateRows() { - @Override - public int getMaxIntermediateRows() - { - return 5; - } + return 5; } - ), - new StupidPool( - new Supplier() + } + ), + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(50000); - } + return ByteBuffer.allocate(50000); } - ) - ); + } + ) + ); } @Test - public void testResetSanity() throws IOException{ + public void testResetSanity() throws IOException + { IncrementalIndex index = indexCreator.createIndex(); DateTime t = DateTime.now(); @@ -266,9 +249,11 @@ public class IncrementalIndexStorageAdapterTest ); IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(index); - Sequence cursorSequence = adapter.makeCursors(new SelectorFilter("sally", "bo"), - interval, - QueryGranularity.NONE); + Sequence cursorSequence = adapter.makeCursors( + new SelectorFilter("sally", "bo"), + interval, + QueryGranularity.NONE + ); Cursor cursor = Sequences.toList(Sequences.limit(cursorSequence, 1), Lists.newArrayList()).get(0); DimensionSelector dimSelector; diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 60cb36e1ab1..19bf663df54 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.io.Files; import io.druid.segment.IndexSpec; -import io.druid.segment.data.BitmapSerde; -import io.druid.segment.data.BitmapSerdeFactory; import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy; import io.druid.segment.realtime.plumber.RejectionPolicyFactory; import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; @@ -46,10 +44,6 @@ public class RealtimeTuningConfig implements TuningConfig private static final int defaultMaxPendingPersists = 0; private static final ShardSpec defaultShardSpec = new NoneShardSpec(); private static final IndexSpec defaultIndexSpec = new IndexSpec(); - private static final boolean defaultPersistInHeap = false; - private static final boolean defaultIngestOffheap = false; - private static final int defaultBufferSize = 128 * 1024* 1024; // 128M - private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f; // Might make sense for this to be a builder public static RealtimeTuningConfig makeDefaultTuningConfig() @@ -63,11 +57,7 @@ public class RealtimeTuningConfig implements TuningConfig defaultRejectionPolicyFactory, defaultMaxPendingPersists, defaultShardSpec, - defaultIndexSpec, - defaultPersistInHeap, - defaultIngestOffheap, - defaultBufferSize, - DEFAULT_AGG_BUFFER_RATIO + defaultIndexSpec ); } @@ -80,10 +70,6 @@ public class RealtimeTuningConfig implements TuningConfig private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; - private final boolean persistInHeap; - private final boolean ingestOffheap; - private final int bufferSize; - private final float aggregationBufferRatio; @JsonCreator public RealtimeTuningConfig( @@ -95,11 +81,7 @@ public class RealtimeTuningConfig implements TuningConfig @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, - @JsonProperty("indexSpec") IndexSpec indexSpec, - @JsonProperty("persistInHeap") Boolean persistInHeap, - @JsonProperty("ingestOffheap") Boolean ingestOffheap, - @JsonProperty("buffersize") Integer bufferSize, - @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio + @JsonProperty("indexSpec") IndexSpec indexSpec ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -115,10 +97,6 @@ public class RealtimeTuningConfig implements TuningConfig this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; - this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap; - this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap; - this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize; - this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio; } @JsonProperty @@ -175,28 +153,6 @@ public class RealtimeTuningConfig implements TuningConfig return indexSpec; } - @JsonProperty - public boolean isPersistInHeap() - { - return persistInHeap; - } - - @JsonProperty - public boolean isIngestOffheap(){ - return ingestOffheap; - } - - @JsonProperty - public int getBufferSize(){ - return bufferSize; - } - - @JsonProperty - public float getAggregationBufferRatio() - { - return aggregationBufferRatio; - } - public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -208,11 +164,7 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - indexSpec, - persistInHeap, - ingestOffheap, - bufferSize, - aggregationBufferRatio + indexSpec ); } @@ -227,11 +179,7 @@ public class RealtimeTuningConfig implements TuningConfig rejectionPolicyFactory, maxPendingPersists, shardSpec, - indexSpec, - persistInHeap, - ingestOffheap, - bufferSize, - aggregationBufferRatio + indexSpec ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java index 86f250fb460..cd260d0a2f5 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -26,7 +26,6 @@ import io.druid.common.guava.ThreadRenamingCallable; import io.druid.concurrent.Execs; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -64,7 +63,6 @@ public class FlushingPlumber extends RealtimePlumber DataSegmentAnnouncer segmentAnnouncer, ExecutorService queryExecutorService, IndexMerger indexMerger, - IndexMaker indexMaker, IndexIO indexIO ) { @@ -80,7 +78,6 @@ public class FlushingPlumber extends RealtimePlumber null, null, indexMerger, - indexMaker, indexIO ); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index a986745a124..d54ff38abc3 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -25,7 +25,6 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -50,7 +49,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool private final DataSegmentAnnouncer segmentAnnouncer; private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; - private final IndexMaker indexMaker; private final IndexIO indexIO; @JsonCreator @@ -61,7 +59,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool @JacksonInject DataSegmentAnnouncer segmentAnnouncer, @JacksonInject @Processing ExecutorService queryExecutorService, @JacksonInject IndexMerger indexMerger, - @JacksonInject IndexMaker indexMaker, @JacksonInject IndexIO indexIO ) { @@ -74,7 +71,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool null, queryExecutorService, indexMerger, - indexMaker, indexIO ); @@ -84,7 +80,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool this.segmentAnnouncer = segmentAnnouncer; this.queryExecutorService = queryExecutorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); - this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); } @@ -107,7 +102,6 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool segmentAnnouncer, queryExecutorService, indexMerger, - indexMaker, indexIO ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 1f2a5863751..cd9261057f8 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -59,7 +59,6 @@ import io.druid.query.SegmentDescriptor; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.QueryableIndex; @@ -78,8 +77,6 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.SingleElementPartitionChunk; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -131,7 +128,6 @@ public class RealtimePlumber implements Plumber private volatile ExecutorService mergeExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null; private volatile IndexMerger indexMerger; - private volatile IndexMaker indexMaker; private volatile IndexIO indexIO; private static final String COMMIT_METADATA_KEY = "%commitMetadata%"; @@ -149,7 +145,6 @@ public class RealtimePlumber implements Plumber SegmentPublisher segmentPublisher, FilteredServerView serverView, IndexMerger indexMerger, - IndexMaker indexMaker, IndexIO indexIO ) { @@ -165,7 +160,6 @@ public class RealtimePlumber implements Plumber this.segmentPublisher = segmentPublisher; this.serverView = serverView; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); - this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); @@ -508,22 +502,13 @@ public class RealtimePlumber implements Plumber indexes.add(queryableIndex); } - final File mergedFile; - if (config.isPersistInHeap()) { - mergedFile = indexMaker.mergeQueryableIndex( - indexes, - schema.getAggregators(), - mergedTarget, - config.getIndexSpec() - ); - } else { - mergedFile = indexMerger.mergeQueryableIndex( - indexes, - schema.getAggregators(), - mergedTarget, - config.getIndexSpec() - ); - } + final File mergedFile = indexMerger.mergeQueryableIndex( + indexes, + schema.getAggregators(), + mergedTarget, + config.getIndexSpec() + ); + // emit merge metrics before publishing segment metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime); metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS)); @@ -980,24 +965,14 @@ public class RealtimePlumber implements Plumber try { int numRows = indexToPersist.getIndex().size(); - final File persistedFile; final IndexSpec indexSpec = config.getIndexSpec(); - if (config.isPersistInHeap()) { - persistedFile = indexMaker.persist( - indexToPersist.getIndex(), - new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - metaData, - indexSpec - ); - } else { - persistedFile = indexMerger.persist( - indexToPersist.getIndex(), - new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - metaData, - indexSpec - ); - } + final File persistedFile = indexMerger.persist( + indexToPersist.getIndex(), + new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), + metaData, + indexSpec + ); indexToPersist.swapSegment( new QueryableIndexSegment( diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index ffe5d78b6b9..ee40c2058d4 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -25,7 +25,6 @@ import io.druid.client.FilteredServerView; import io.druid.guice.annotations.Processing; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.segment.IndexIO; -import io.druid.segment.IndexMaker; import io.druid.segment.IndexMerger; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -48,7 +47,6 @@ public class RealtimePlumberSchool implements PlumberSchool private final FilteredServerView serverView; private final ExecutorService queryExecutorService; private final IndexMerger indexMerger; - private final IndexMaker indexMaker; private final IndexIO indexIO; @JsonCreator @@ -61,7 +59,6 @@ public class RealtimePlumberSchool implements PlumberSchool @JacksonInject FilteredServerView serverView, @JacksonInject @Processing ExecutorService executorService, @JacksonInject IndexMerger indexMerger, - @JacksonInject IndexMaker indexMaker, @JacksonInject IndexIO indexIO ) { @@ -73,7 +70,6 @@ public class RealtimePlumberSchool implements PlumberSchool this.serverView = serverView; this.queryExecutorService = executorService; this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger"); - this.indexMaker = Preconditions.checkNotNull(indexMaker, "Null IndexMaker"); this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO"); } @@ -98,7 +94,6 @@ public class RealtimePlumberSchool implements PlumberSchool segmentPublisher, serverView, indexMerger, - indexMaker, indexIO ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index fceee9aaff5..f4d827661e7 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -25,12 +25,10 @@ import com.google.common.collect.Lists; import com.metamx.common.IAE; import com.metamx.common.ISE; import io.druid.data.input.InputRow; -import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; @@ -191,21 +189,10 @@ public class Sink implements Iterable .withDimensionsSpec(schema.getParser()) .withMetrics(schema.getAggregators()) .build(); - final IncrementalIndex newIndex; - if (config.isIngestOffheap()) { - newIndex = new OffheapIncrementalIndex( - indexSchema, - // Assuming half space for aggregates - new OffheapBufferPool((int) ((double) config.getBufferSize() * config.getAggregationBufferRatio())), - true, - config.getBufferSize() - ); - } else { - newIndex = new OnheapIncrementalIndex( - indexSchema, - config.getMaxRowsInMemory() - ); - } + final IncrementalIndex newIndex = new OnheapIncrementalIndex( + indexSchema, + config.getMaxRowsInMemory() + ); final FireHydrant old; synchronized (hydrantLock) { diff --git a/server/src/main/java/io/druid/server/bridge/Bridge.java b/server/src/main/java/io/druid/server/bridge/Bridge.java deleted file mode 100644 index 1345a5113b0..00000000000 --- a/server/src/main/java/io/druid/server/bridge/Bridge.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.bridge; - -import com.google.inject.BindingAnnotation; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - */ -@BindingAnnotation -@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) -@Retention(RetentionPolicy.RUNTIME) -public @interface Bridge -{ -} diff --git a/server/src/main/java/io/druid/server/bridge/BridgeCuratorConfig.java b/server/src/main/java/io/druid/server/bridge/BridgeCuratorConfig.java deleted file mode 100644 index ea215ca583b..00000000000 --- a/server/src/main/java/io/druid/server/bridge/BridgeCuratorConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.bridge; - -import io.druid.curator.CuratorConfig; -import org.skife.config.Config; - -/** - */ -public abstract class BridgeCuratorConfig extends CuratorConfig -{ - @Config("druid.bridge.zk.service.host") - public abstract String getParentZkHosts(); -} diff --git a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java b/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java deleted file mode 100644 index edc0c16fa83..00000000000 --- a/server/src/main/java/io/druid/server/bridge/BridgeQuerySegmentWalker.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.bridge; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; -import com.google.inject.Inject; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; -import com.metamx.common.logger.Logger; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; -import io.druid.client.selector.Server; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.guice.annotations.Global; -import io.druid.query.Query; -import io.druid.query.QueryRunner; -import io.druid.query.QuerySegmentWalker; -import io.druid.query.SegmentDescriptor; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.joda.time.Interval; - -import javax.ws.rs.core.MediaType; -import java.net.URL; -import java.util.List; -import java.util.Map; - -/** - */ -public class BridgeQuerySegmentWalker implements QuerySegmentWalker -{ - private static final Logger log = new Logger(BridgeQuerySegmentWalker.class); - - private final ServerDiscoverySelector brokerSelector; - private final HttpClient httpClient; - private final ObjectMapper jsonMapper; - private final StatusResponseHandler responseHandler; - - @Inject - public BridgeQuerySegmentWalker( - ServerDiscoverySelector brokerSelector, - @Global HttpClient httpClient, - ObjectMapper jsonMapper - ) - { - this.brokerSelector = brokerSelector; - this.httpClient = httpClient; - this.jsonMapper = jsonMapper; - this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); - } - - @Override - public QueryRunner getQueryRunnerForIntervals( - Query query, Iterable intervals - ) - { - return makeRunner(); - } - - @Override - public QueryRunner getQueryRunnerForSegments( - Query query, Iterable specs - ) - { - return makeRunner(); - } - - private QueryRunner makeRunner() - { - return new QueryRunner() - { - @Override - public Sequence run(Query query, Map responseContext) - { - try { - Server instance = brokerSelector.pick(); - if (instance == null) { - return Sequences.empty(); - } - final Server brokerServer = brokerSelector.pick(); - final URL url = new URL( - brokerServer.getScheme(), - brokerServer.getAddress(), - brokerServer.getPort(), - "/druid/v2/" - ); - - StatusResponseHolder response = httpClient.go( - new Request( - HttpMethod.POST, - url - ).setContent( - MediaType.APPLICATION_JSON, - jsonMapper.writeValueAsBytes(query) - ), - responseHandler - ).get(); - - List results = jsonMapper.readValue( - response.getContent(), new TypeReference>() - { - } - ); - - return Sequences.simple(results); - } - catch (Exception e) { - log.error(e, "Exception with bridge query"); - - return Sequences.empty(); - } - } - }; - } -} diff --git a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java b/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java deleted file mode 100644 index d0a828b8608..00000000000 --- a/server/src/main/java/io/druid/server/bridge/BridgeZkCoordinator.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.bridge; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; -import com.google.inject.Inject; -import com.metamx.common.logger.Logger; -import io.druid.client.ServerView; -import io.druid.concurrent.Execs; -import io.druid.metadata.MetadataSegmentManager; -import io.druid.segment.loading.SegmentLoaderConfig; -import io.druid.segment.realtime.SegmentPublisher; -import io.druid.server.coordination.BaseZkCoordinator; -import io.druid.server.coordination.DataSegmentChangeCallback; -import io.druid.server.coordination.DataSegmentChangeHandler; -import io.druid.server.coordination.DruidServerMetadata; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import java.util.concurrent.ExecutorService; - -/** - */ -public class BridgeZkCoordinator extends BaseZkCoordinator -{ - private static final Logger log = new Logger(BaseZkCoordinator.class); - - private final SegmentPublisher dbSegmentPublisher; - private final MetadataSegmentManager databaseSegmentManager; - private final ServerView serverView; - - private final ExecutorService exec = Execs.singleThreaded("BridgeZkCoordinatorServerView-%s"); - - @Inject - public BridgeZkCoordinator( - ObjectMapper jsonMapper, - ZkPathsConfig zkPaths, - SegmentLoaderConfig config, - DruidServerMetadata me, - @Bridge CuratorFramework curator, - SegmentPublisher dbSegmentPublisher, - MetadataSegmentManager databaseSegmentManager, - ServerView serverView - ) - { - super(jsonMapper, zkPaths, config, me, curator); - - this.dbSegmentPublisher = dbSegmentPublisher; - this.databaseSegmentManager = databaseSegmentManager; - this.serverView = serverView; - } - - @Override - public void loadLocalCache() - { - // do nothing - } - - @Override - public DataSegmentChangeHandler getDataSegmentChangeHandler() - { - return BridgeZkCoordinator.this; - } - - @Override - public void addSegment(final DataSegment segment, final DataSegmentChangeCallback callback) - { - try { - log.info("Publishing segment %s", segment.getIdentifier()); - dbSegmentPublisher.publishSegment(segment); - serverView.registerSegmentCallback( - exec, - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment theSegment - ) - { - if (theSegment.equals(segment)) { - callback.execute(); - } - return ServerView.CallbackAction.CONTINUE; - } - } - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback) - { - databaseSegmentManager.removeSegment(segment.getDataSource(), segment.getIdentifier()); - serverView.registerSegmentCallback( - exec, - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment theSegment - ) - { - if (theSegment.equals(segment)) { - callback.execute(); - } - return ServerView.CallbackAction.CONTINUE; - } - } - ); - } -} diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java deleted file mode 100644 index 40e14f67979..00000000000 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.bridge; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; -import com.google.common.collect.Maps; -import com.google.inject.Inject; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.guava.CloseQuietly; -import com.metamx.common.guava.FunctionalIterable; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.emitter.EmittingLogger; -import io.druid.client.DruidServer; -import io.druid.client.ServerInventoryView; -import io.druid.client.ServerView; -import io.druid.concurrent.Execs; -import io.druid.curator.announcement.Announcer; -import io.druid.guice.ManageLifecycle; -import io.druid.guice.annotations.Self; -import io.druid.server.DruidNode; -import io.druid.server.coordination.AbstractDataSegmentAnnouncer; -import io.druid.server.coordination.DataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderLatch; -import org.apache.curator.framework.recipes.leader.LeaderLatchListener; -import org.apache.curator.utils.ZKPaths; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicReference; - -/** - */ -@ManageLifecycle -public class DruidClusterBridge -{ - public static final String BRIDGE_OWNER_NODE = "_BRIDGE"; - public static final String NODE_TYPE = "bridge"; - - private static final EmittingLogger log = new EmittingLogger(DruidClusterBridge.class); - - private final ObjectMapper jsonMapper; - private final DruidClusterBridgeConfig config; - private final ScheduledExecutorService exec; - private final DruidNode self; - - // Communicates to the ZK cluster that this bridge node is deployed at - private final CuratorFramework curator; - private final AtomicReference leaderLatch; - - // Communicates to the remote (parent) ZK cluster - private final BridgeZkCoordinator bridgeZkCoordinator; - private final Announcer announcer; - private final ServerInventoryView serverInventoryView; - private final ZkPathsConfig zkPathsConfig; - - private final DruidServerMetadata druidServerMetadata; - - private final Map segments = Maps.newHashMap(); - private final Object lock = new Object(); - - private volatile boolean started = false; - private volatile boolean leader = false; - - @Inject - public DruidClusterBridge( - ObjectMapper jsonMapper, - DruidClusterBridgeConfig config, - ZkPathsConfig zkPathsConfig, - DruidServerMetadata druidServerMetadata, - ScheduledExecutorFactory scheduledExecutorFactory, - @Self DruidNode self, - CuratorFramework curator, - AtomicReference leaderLatch, - BridgeZkCoordinator bridgeZkCoordinator, - @Bridge Announcer announcer, - @Bridge final AbstractDataSegmentAnnouncer dataSegmentAnnouncer, - ServerInventoryView serverInventoryView - ) - { - this.jsonMapper = jsonMapper; - this.config = config; - this.bridgeZkCoordinator = bridgeZkCoordinator; - this.zkPathsConfig = zkPathsConfig; - this.announcer = announcer; - this.serverInventoryView = serverInventoryView; - this.curator = curator; - this.leaderLatch = leaderLatch; - this.druidServerMetadata = druidServerMetadata; - - this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); - this.self = self; - - ExecutorService serverInventoryViewExec = Execs.singleThreaded("DruidClusterBridge-ServerInventoryView-%d"); - - serverInventoryView.registerSegmentCallback( - serverInventoryViewExec, - new ServerView.BaseSegmentCallback() - { - @Override - public ServerView.CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) - { - try { - synchronized (lock) { - Integer count = segments.get(segment); - if (count == null) { - segments.put(segment, 1); - dataSegmentAnnouncer.announceSegment(segment); - } else { - segments.put(segment, count + 1); - } - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - return ServerView.CallbackAction.CONTINUE; - } - - @Override - public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) - { - try { - synchronized (lock) { - serverRemovedSegment(dataSegmentAnnouncer, segment, server); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - - return ServerView.CallbackAction.CONTINUE; - } - } - ); - - serverInventoryView.registerServerCallback( - serverInventoryViewExec, - new ServerView.ServerCallback() - { - @Override - public ServerView.CallbackAction serverRemoved(DruidServer server) - { - try { - for (DataSegment dataSegment : server.getSegments().values()) { - serverRemovedSegment(dataSegmentAnnouncer, dataSegment, server.getMetadata()); - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } - return ServerView.CallbackAction.CONTINUE; - } - } - - ); - } - - public boolean isLeader() - { - return leader; - } - - @LifecycleStart - public void start() - { - synchronized (lock) { - if (started) { - return; - } - started = true; - - createNewLeaderLatch(); - try { - leaderLatch.get().start(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - - private LeaderLatch createNewLeaderLatch() - { - final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(zkPathsConfig.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort() - ); - - newLeaderLatch.addListener( - new LeaderLatchListener() - { - @Override - public void isLeader() - { - becomeLeader(); - } - - @Override - public void notLeader() - { - stopBeingLeader(); - } - }, - Execs.singleThreaded("CoordinatorLeader-%s") - ); - - return leaderLatch.getAndSet(newLeaderLatch); - } - - @LifecycleStop - public void stop() - { - synchronized (lock) { - if (!started) { - return; - } - - stopBeingLeader(); - - try { - leaderLatch.get().close(); - } - catch (IOException e) { - log.warn(e, "Unable to close leaderLatch, ignoring"); - } - - exec.shutdown(); - - started = false; - } - } - - private void becomeLeader() - { - synchronized (lock) { - if (!started) { - return; - } - - log.info("Go-Go Gadgetmobile! Starting bridge in %s", config.getStartDelay()); - try { - bridgeZkCoordinator.start(); - serverInventoryView.start(); - - ScheduledExecutors.scheduleWithFixedDelay( - exec, - config.getStartDelay(), - config.getPeriod(), - new Callable() - { - @Override - public ScheduledExecutors.Signal call() - { - if (leader) { - Iterable servers = FunctionalIterable - .create(serverInventoryView.getInventory()) - .filter( - new Predicate() - { - @Override - public boolean apply( - DruidServer input - ) - { - return input.isAssignable(); - } - } - ); - - long totalMaxSize = 0; - for (DruidServer server : servers) { - totalMaxSize += server.getMaxSize(); - } - - if (totalMaxSize == 0) { - log.warn("No servers founds!"); - } else { - DruidServerMetadata me = new DruidServerMetadata( - self.getHostAndPort(), - self.getHostAndPort(), - totalMaxSize, - NODE_TYPE, - druidServerMetadata.getTier(), - druidServerMetadata.getPriority() - ); - - try { - final String path = ZKPaths.makePath(zkPathsConfig.getAnnouncementsPath(), self.getHostAndPort()); - log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize); - announcer.update(path, jsonMapper.writeValueAsBytes(me)); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - if (leader) { // (We might no longer be leader) - return ScheduledExecutors.Signal.REPEAT; - } else { - return ScheduledExecutors.Signal.STOP; - } - } - } - ); - - leader = true; - } - catch (Exception e) { - log.makeAlert(e, "Exception becoming leader") - .emit(); - final LeaderLatch oldLatch = createNewLeaderLatch(); - CloseQuietly.close(oldLatch); - try { - leaderLatch.get().start(); - } - catch (Exception e1) { - // If an exception gets thrown out here, then the bridge will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e1, "I am a zombie") - .emit(); - } - } - } - } - - private void stopBeingLeader() - { - synchronized (lock) { - try { - log.info("I'll get you next time, Gadget. Next time!"); - - bridgeZkCoordinator.stop(); - serverInventoryView.stop(); - - leader = false; - } - catch (Exception e) { - log.makeAlert(e, "Unable to stopBeingLeader").emit(); - } - } - } - - private void serverRemovedSegment( - DataSegmentAnnouncer dataSegmentAnnouncer, - DataSegment segment, - DruidServerMetadata server - ) - throws IOException - { - Integer count = segments.get(segment); - if (count != null) { - if (count == 1) { - dataSegmentAnnouncer.unannounceSegment(segment); - segments.remove(segment); - } else { - segments.put(segment, count - 1); - } - } else { - log.makeAlert("Trying to remove a segment that was never added?") - .addData("server", server.getHost()) - .addData("segmentId", segment.getIdentifier()) - .emit(); - } - } -} diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java deleted file mode 100644 index 2ea73bfa6bc..00000000000 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridgeConfig.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.bridge; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.joda.time.Duration; - -/** - */ -public abstract class DruidClusterBridgeConfig -{ - @JsonProperty - private Duration startDelay = new Duration("PT300s"); - @JsonProperty - private Duration period = new Duration("PT60s"); - @JsonProperty - private String brokerServiceName = "broker"; - - public Duration getStartDelay() - { - return startDelay; - } - - public void setStartDelay(Duration startDelay) - { - this.startDelay = startDelay; - } - - public Duration getPeriod() - { - return period; - } - - public void setPeriod(Duration period) - { - this.period = period; - } - - public String getBrokerServiceName() - { - return brokerServiceName; - } - - public void setBrokerServiceName(String brokerServiceName) - { - this.brokerServiceName = brokerServiceName; - } -} diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java deleted file mode 100644 index c1faea8a165..00000000000 --- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.coordination; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.emitter.EmittingLogger; -import io.druid.concurrent.Execs; -import io.druid.segment.loading.SegmentLoaderConfig; -import io.druid.server.initialization.ZkPathsConfig; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; -import org.apache.curator.utils.ZKPaths; - -import java.io.IOException; - -/** - */ -public abstract class BaseZkCoordinator implements DataSegmentChangeHandler -{ - private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class); - - private final Object lock = new Object(); - - private final ObjectMapper jsonMapper; - private final ZkPathsConfig zkPaths; - private final SegmentLoaderConfig config; - private final DruidServerMetadata me; - private final CuratorFramework curator; - - private volatile PathChildrenCache loadQueueCache; - private volatile boolean started = false; - - public BaseZkCoordinator( - ObjectMapper jsonMapper, - ZkPathsConfig zkPaths, - SegmentLoaderConfig config, - DruidServerMetadata me, - CuratorFramework curator - ) - { - this.jsonMapper = jsonMapper; - this.zkPaths = zkPaths; - this.config = config; - this.me = me; - this.curator = curator; - } - - @LifecycleStart - public void start() throws IOException - { - synchronized (lock) { - if (started) { - return; - } - - log.info("Starting zkCoordinator for server[%s]", me.getName()); - - final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName()); - final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName()); - final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName()); - - loadQueueCache = new PathChildrenCache( - curator, - loadQueueLocation, - true, - true, - Execs.multiThreaded(config.getNumLoadingThreads(), "ZkCoordinator-%s") - ); - - try { - curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient()); - curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); - curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); - - loadLocalCache(); - - loadQueueCache.getListenable().addListener( - new PathChildrenCacheListener() - { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception - { - final ChildData child = event.getData(); - switch (event.getType()) { - case CHILD_ADDED: - final String path = child.getPath(); - final DataSegmentChangeRequest request = jsonMapper.readValue( - child.getData(), DataSegmentChangeRequest.class - ); - - log.info("New request[%s] with zNode[%s].", request.asString(), path); - - try { - request.go( - getDataSegmentChangeHandler(), - new DataSegmentChangeCallback() - { - boolean hasRun = false; - - @Override - public void execute() - { - try { - if (!hasRun) { - curator.delete().guaranteed().forPath(path); - log.info("Completed request [%s]", request.asString()); - hasRun = true; - } - } - catch (Exception e) { - try { - curator.delete().guaranteed().forPath(path); - } - catch (Exception e1) { - log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); - } - log.error(e, "Exception while removing zNode[%s]", path); - throw Throwables.propagate(e); - } - } - } - ); - } - catch (Exception e) { - try { - curator.delete().guaranteed().forPath(path); - } - catch (Exception e1) { - log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); - } - - log.makeAlert(e, "Segment load/unload: uncaught exception.") - .addData("node", path) - .addData("nodeProperties", request) - .emit(); - } - - break; - case CHILD_REMOVED: - log.info("zNode[%s] was removed", event.getData().getPath()); - break; - default: - log.info("Ignoring event[%s]", event); - } - } - } - ); - loadQueueCache.start(); - } - catch (Exception e) { - Throwables.propagateIfPossible(e, IOException.class); - throw Throwables.propagate(e); - } - - started = true; - } - } - - @LifecycleStop - public void stop() - { - log.info("Stopping ZkCoordinator for [%s]", me); - synchronized (lock) { - if (!started) { - return; - } - - try { - loadQueueCache.close(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - loadQueueCache = null; - started = false; - } - } - } - - public boolean isStarted() - { - return started; - } - - public abstract void loadLocalCache(); - - public abstract DataSegmentChangeHandler getDataSegmentChangeHandler(); -} diff --git a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncerProvider.java b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncerProvider.java index 8fe19aea3e1..92304292670 100644 --- a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncerProvider.java +++ b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncerProvider.java @@ -25,7 +25,6 @@ import com.google.inject.Provider; */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchDataSegmentAnnouncerProvider.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "legacy", value = LegacyDataSegmentAnnouncerProvider.class), @JsonSubTypes.Type(name = "batch", value = BatchDataSegmentAnnouncerProvider.class) }) public interface DataSegmentAnnouncerProvider extends Provider diff --git a/server/src/main/java/io/druid/server/coordination/LegacyDataSegmentAnnouncerProvider.java b/server/src/main/java/io/druid/server/coordination/LegacyDataSegmentAnnouncerProvider.java deleted file mode 100644 index 3dc8df1dc35..00000000000 --- a/server/src/main/java/io/druid/server/coordination/LegacyDataSegmentAnnouncerProvider.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.coordination; - -import com.fasterxml.jackson.annotation.JacksonInject; - -import javax.validation.constraints.NotNull; -import java.util.Arrays; - -/** - */ -public class LegacyDataSegmentAnnouncerProvider implements DataSegmentAnnouncerProvider -{ - @JacksonInject - @NotNull - private SingleDataSegmentAnnouncer singleAnnouncer = null; - - @JacksonInject - @NotNull - private BatchDataSegmentAnnouncer batchAnnouncer = null; - - @Override - public DataSegmentAnnouncer get() - { - return new MultipleDataSegmentAnnouncerDataSegmentAnnouncer( - Arrays.asList(singleAnnouncer, batchAnnouncer) - ); - } -} diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 0a9523aaba5..427f4dc6a45 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -1,30 +1,33 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.server.coordination; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.Inject; import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; import io.druid.concurrent.Execs; import io.druid.segment.loading.SegmentLoaderConfig; @@ -32,6 +35,11 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; import java.io.File; import java.io.IOException; @@ -41,7 +49,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -50,17 +57,25 @@ import java.util.concurrent.atomic.AtomicInteger; /** */ -public class ZkCoordinator extends BaseZkCoordinator +public class ZkCoordinator implements DataSegmentChangeHandler { private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class); + private final Object lock = new Object(); + private final ObjectMapper jsonMapper; + private final ZkPathsConfig zkPaths; private final SegmentLoaderConfig config; + private final DruidServerMetadata me; + private final CuratorFramework curator; private final DataSegmentAnnouncer announcer; private final ServerManager serverManager; private final ScheduledExecutorService exec; - @Inject + + private volatile PathChildrenCache loadQueueCache; + private volatile boolean started = false; + public ZkCoordinator( ObjectMapper jsonMapper, SegmentLoaderConfig config, @@ -72,17 +87,155 @@ public class ZkCoordinator extends BaseZkCoordinator ScheduledExecutorFactory factory ) { - super(jsonMapper, zkPaths, config, me, curator); - this.jsonMapper = jsonMapper; + this.zkPaths = zkPaths; this.config = config; + this.me = me; + this.curator = curator; this.announcer = announcer; this.serverManager = serverManager; this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); } - @Override + @LifecycleStart + public void start() throws IOException + { + synchronized (lock) { + if (started) { + return; + } + + log.info("Starting zkCoordinator for server[%s]", me.getName()); + + final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName()); + final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName()); + final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName()); + + loadQueueCache = new PathChildrenCache( + curator, + loadQueueLocation, + true, + true, + Execs.multiThreaded(config.getNumLoadingThreads(), "ZkCoordinator-%s") + ); + + try { + curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient()); + curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); + curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); + + loadLocalCache(); + + loadQueueCache.getListenable().addListener( + new PathChildrenCacheListener() + { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception + { + final ChildData child = event.getData(); + switch (event.getType()) { + case CHILD_ADDED: + final String path = child.getPath(); + final DataSegmentChangeRequest request = jsonMapper.readValue( + child.getData(), DataSegmentChangeRequest.class + ); + + log.info("New request[%s] with zNode[%s].", request.asString(), path); + + try { + request.go( + getDataSegmentChangeHandler(), + new DataSegmentChangeCallback() + { + boolean hasRun = false; + + @Override + public void execute() + { + try { + if (!hasRun) { + curator.delete().guaranteed().forPath(path); + log.info("Completed request [%s]", request.asString()); + hasRun = true; + } + } + catch (Exception e) { + try { + curator.delete().guaranteed().forPath(path); + } + catch (Exception e1) { + log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); + } + log.error(e, "Exception while removing zNode[%s]", path); + throw Throwables.propagate(e); + } + } + } + ); + } + catch (Exception e) { + try { + curator.delete().guaranteed().forPath(path); + } + catch (Exception e1) { + log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path); + } + + log.makeAlert(e, "Segment load/unload: uncaught exception.") + .addData("node", path) + .addData("nodeProperties", request) + .emit(); + } + + break; + case CHILD_REMOVED: + log.info("zNode[%s] was removed", event.getData().getPath()); + break; + default: + log.info("Ignoring event[%s]", event); + } + } + } + ); + loadQueueCache.start(); + } + catch (Exception e) { + Throwables.propagateIfPossible(e, IOException.class); + throw Throwables.propagate(e); + } + + started = true; + } + } + + @LifecycleStop + public void stop() + { + log.info("Stopping ZkCoordinator for [%s]", me); + synchronized (lock) { + if (!started) { + return; + } + + try { + loadQueueCache.close(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + loadQueueCache = null; + started = false; + } + } + } + + public boolean isStarted() + { + return started; + } + public void loadLocalCache() { final long start = System.currentTimeMillis(); @@ -129,7 +282,6 @@ public class ZkCoordinator extends BaseZkCoordinator ); } - @Override public DataSegmentChangeHandler getDataSegmentChangeHandler() { return ZkCoordinator.this; @@ -168,7 +320,7 @@ public class ZkCoordinator extends BaseZkCoordinator { try { log.info("Loading segment %s", segment.getIdentifier()); - if(loadSegment(segment, callback)) { + if (loadSegment(segment, callback)) { try { announcer.announceSegment(segment); } @@ -203,24 +355,34 @@ public class ZkCoordinator extends BaseZkCoordinator final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>(); for (final DataSegment segment : segments) { loadingExecutor.submit( - new Runnable() { + new Runnable() + { @Override - public void run() { + public void run() + { try { - log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier()); + log.info( + "Loading segment[%d/%d][%s]", + counter.getAndIncrement(), + numSegments, + segment.getIdentifier() + ); final boolean loaded = loadSegment(segment, callback); if (loaded) { try { backgroundSegmentAnnouncer.announceSegment(segment); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SegmentLoadingException(e, "Loading Interrupted"); } } - } catch (SegmentLoadingException e) { + } + catch (SegmentLoadingException e) { log.error(e, "[%s] failed to load", segment.getIdentifier()); failedSegments.add(segment); - } finally { + } + finally { latch.countDown(); } } @@ -228,14 +390,15 @@ public class ZkCoordinator extends BaseZkCoordinator ); } - try{ + try { latch.await(); - if(failedSegments.size() > 0) { + if (failedSegments.size() > 0) { log.makeAlert("%,d errors seen while loading segments", failedSegments.size()) - .addData("failedSegments", failedSegments); + .addData("failedSegments", failedSegments); } - } catch(InterruptedException e) { + } + catch (InterruptedException e) { Thread.currentThread().interrupt(); log.makeAlert(e, "LoadingInterrupted"); } @@ -244,8 +407,8 @@ public class ZkCoordinator extends BaseZkCoordinator } catch (SegmentLoadingException e) { log.makeAlert(e, "Failed to load segments -- likely problem with announcing.") - .addData("numSegments", segments.size()) - .emit(); + .addData("numSegments", segments.size()) + .emit(); } finally { callback.execute(); @@ -298,7 +461,8 @@ public class ZkCoordinator extends BaseZkCoordinator } } - private static class BackgroundSegmentAnnouncer implements AutoCloseable { + private static class BackgroundSegmentAnnouncer implements AutoCloseable + { private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class); private final int intervalMillis; diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 824f07890f4..4a17b89d873 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -87,13 +87,12 @@ public class FireDepartmentTest null, null, TestHelper.getTestIndexMerger(), - TestHelper.getTestIndexMaker(), TestHelper.getTestIndexIO() ), null ), new RealtimeTuningConfig( - null, null, null, null, null, null, null, null, null, false, false, null, null + null, null, null, null, null, null, null, null, null ) ); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 2bb2c2ea8fa..3048f0df937 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -150,10 +150,6 @@ public class RealtimeManagerTest null, null, null, - null, - null, - null, - null, null ); plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString())); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 56c0494150e..cab23a1c84f 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -181,10 +181,6 @@ public class RealtimePlumberSchoolTest rejectionPolicy, null, null, - null, - null, - null, - null, null ); @@ -197,7 +193,6 @@ public class RealtimePlumberSchoolTest serverView, MoreExecutors.sameThreadExecutor(), TestHelper.getTestIndexMerger(), - TestHelper.getTestIndexMaker(), TestHelper.getTestIndexIO() ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 925b41baa01..b5caf135f84 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -64,10 +64,6 @@ public class SinkTest null, null, null, - null, - false, - false, - null, null ); final Sink sink = new Sink(interval, schema, tuningConfig, version); diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java deleted file mode 100644 index 6fe203dc0a8..00000000000 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.server.bridge; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.metamx.common.ISE; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.lifecycle.Lifecycle; -import io.druid.client.BatchServerInventoryView; -import io.druid.client.DruidServer; -import io.druid.client.ServerView; -import io.druid.curator.PotentiallyGzippedCompressionProvider; -import io.druid.curator.announcement.Announcer; -import io.druid.metadata.MetadataSegmentManager; -import io.druid.jackson.DefaultObjectMapper; -import io.druid.segment.loading.SegmentLoaderConfig; -import io.druid.segment.realtime.SegmentPublisher; -import io.druid.server.DruidNode; -import io.druid.server.coordination.BatchDataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; -import io.druid.server.initialization.ZkPathsConfig; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.leader.LeaderLatch; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.test.TestingCluster; -import org.easymock.EasyMock; -import org.joda.time.Duration; -import org.junit.Test; - -import java.util.Arrays; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; - -/** - */ -public class DruidClusterBridgeTest -{ - - public static final int WAIT_MAX_RETRY = 100; - public static final int WAIT_SLEEP_MILLIS = 200; - - @Test - public void testRun() throws Exception - { - TestingCluster localCluster = new TestingCluster(1); - localCluster.start(); - - CuratorFramework localCf = CuratorFrameworkFactory.builder() - .connectString(localCluster.getConnectString()) - .retryPolicy(new ExponentialBackoffRetry(1, 10)) - .compressionProvider( - new PotentiallyGzippedCompressionProvider( - false - ) - ) - .build(); - localCf.start(); - - - TestingCluster remoteCluster = new TestingCluster(1); - remoteCluster.start(); - - CuratorFramework remoteCf = CuratorFrameworkFactory.builder() - .connectString(remoteCluster.getConnectString()) - .retryPolicy(new ExponentialBackoffRetry(1, 10)) - .compressionProvider( - new PotentiallyGzippedCompressionProvider( - false - ) - ) - .build(); - remoteCf.start(); - - ObjectMapper jsonMapper = new DefaultObjectMapper(); - DruidClusterBridgeConfig config = new DruidClusterBridgeConfig() - { - @Override - public Duration getStartDelay() - { - return new Duration(0); - } - - @Override - public Duration getPeriod() - { - return new Duration(Long.MAX_VALUE); - } - }; - - ScheduledExecutorFactory factory = ScheduledExecutors.createFactory(new Lifecycle()); - - DruidNode me = new DruidNode( - "me", - "localhost", - 8080 - ); - - AtomicReference leaderLatch = new AtomicReference<>(new LeaderLatch(localCf, "/test")); - - ZkPathsConfig zkPathsConfig = new ZkPathsConfig() - { - @Override - public String getBase() - { - return "/druid"; - } - }; - DruidServerMetadata metadata = new DruidServerMetadata( - "test", - "localhost", - 1000, - "bridge", - DruidServer.DEFAULT_TIER, - 0 - ); - SegmentPublisher dbSegmentPublisher = EasyMock.createMock(SegmentPublisher.class); - EasyMock.replay(dbSegmentPublisher); - MetadataSegmentManager databaseSegmentManager = EasyMock.createMock(MetadataSegmentManager.class); - EasyMock.replay(databaseSegmentManager); - ServerView serverView = EasyMock.createMock(ServerView.class); - EasyMock.replay(serverView); - - BridgeZkCoordinator bridgeZkCoordinator = new BridgeZkCoordinator( - jsonMapper, - zkPathsConfig, - new SegmentLoaderConfig(), - metadata, - remoteCf, - dbSegmentPublisher, - databaseSegmentManager, - serverView - ); - - Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor()); - announcer.start(); - announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHostAndPort(), jsonMapper.writeValueAsBytes(me)); - - BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class); - BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class); - EasyMock.expect(batchServerInventoryView.getInventory()).andReturn( - Arrays.asList( - new DruidServer("1", "localhost", 117, "historical", DruidServer.DEFAULT_TIER, 0), - new DruidServer("2", "localhost", 1, "historical", DruidServer.DEFAULT_TIER, 0) - ) - ); - batchServerInventoryView.registerSegmentCallback( - EasyMock.anyObject(), - EasyMock.anyObject() - ); - batchServerInventoryView.registerServerCallback( - EasyMock.anyObject(), - EasyMock.anyObject() - ); - EasyMock.expectLastCall(); - batchServerInventoryView.start(); - EasyMock.expectLastCall(); - batchServerInventoryView.stop(); - EasyMock.expectLastCall(); - EasyMock.replay(batchServerInventoryView); - - DruidClusterBridge bridge = new DruidClusterBridge( - jsonMapper, - config, - zkPathsConfig, - metadata, - factory, - me, - localCf, - leaderLatch, - bridgeZkCoordinator, - announcer, - batchDataSegmentAnnouncer, - batchServerInventoryView - ); - - bridge.start(); - - int retry = 0; - while (!bridge.isLeader()) { - if (retry > WAIT_MAX_RETRY) { - throw new ISE("Unable to become leader"); - } - - Thread.sleep(WAIT_SLEEP_MILLIS); - retry++; - } - - String path = "/druid/announcements/localhost:8080"; - retry = 0; - while (remoteCf.checkExists().forPath(path) == null) { - if (retry > WAIT_MAX_RETRY) { - throw new ISE("Unable to announce"); - } - - Thread.sleep(WAIT_SLEEP_MILLIS); - retry++; - } - - boolean verified = verifyUpdate(jsonMapper, path, remoteCf); - retry = 0; - while (!verified) { - if (retry > WAIT_MAX_RETRY) { - throw new ISE("No updates to bridge node occurred"); - } - - Thread.sleep(WAIT_SLEEP_MILLIS); - retry++; - - verified = verifyUpdate(jsonMapper, path, remoteCf); - } - - announcer.stop(); - bridge.stop(); - - remoteCf.close(); - remoteCluster.close(); - localCf.close(); - localCluster.close(); - - EasyMock.verify(batchServerInventoryView); - EasyMock.verify(dbSegmentPublisher); - EasyMock.verify(databaseSegmentManager); - EasyMock.verify(serverView); - } - - private boolean verifyUpdate(ObjectMapper jsonMapper, String path, CuratorFramework remoteCf) throws Exception - { - DruidServerMetadata announced = jsonMapper.readValue( - remoteCf.getData().forPath(path), - DruidServerMetadata.class - ); - - return (118 == announced.getMaxSize()); - } -} diff --git a/services/src/main/java/io/druid/cli/CliBridge.java b/services/src/main/java/io/druid/cli/CliBridge.java deleted file mode 100644 index 505014038be..00000000000 --- a/services/src/main/java/io/druid/cli/CliBridge.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.cli; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.Module; -import com.google.inject.Provides; -import com.google.inject.name.Names; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.common.logger.Logger; -import io.airlift.airline.Command; -import io.druid.concurrent.Execs; -import io.druid.curator.PotentiallyGzippedCompressionProvider; -import io.druid.curator.announcement.Announcer; -import io.druid.curator.discovery.ServerDiscoveryFactory; -import io.druid.curator.discovery.ServerDiscoverySelector; -import io.druid.metadata.MetadataSegmentManager; -import io.druid.metadata.MetadataSegmentManagerConfig; -import io.druid.metadata.MetadataSegmentManagerProvider; -import io.druid.guice.ConfigProvider; -import io.druid.guice.Jerseys; -import io.druid.guice.JsonConfigProvider; -import io.druid.guice.LazySingleton; -import io.druid.guice.LifecycleModule; -import io.druid.guice.ManageLifecycle; -import io.druid.guice.ManageLifecycleLast; -import io.druid.guice.NodeTypeConfig; -import io.druid.query.QuerySegmentWalker; -import io.druid.server.QueryResource; -import io.druid.server.bridge.Bridge; -import io.druid.server.bridge.BridgeCuratorConfig; -import io.druid.server.bridge.BridgeQuerySegmentWalker; -import io.druid.server.bridge.BridgeZkCoordinator; -import io.druid.server.bridge.DruidClusterBridge; -import io.druid.server.bridge.DruidClusterBridgeConfig; -import io.druid.server.coordination.AbstractDataSegmentAnnouncer; -import io.druid.server.coordination.BatchDataSegmentAnnouncer; -import io.druid.server.coordination.DruidServerMetadata; -import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; -import io.druid.server.initialization.ZkPathsConfig; -import io.druid.server.initialization.jetty.JettyServerInitializer; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.BoundedExponentialBackoffRetry; -import org.eclipse.jetty.server.Server; - -import java.util.List; - -/** - */ -@Command( - name = "bridge", - description = "This is a highly experimental node to use at your own discretion" -) -public class CliBridge extends ServerRunnable -{ - private static final Logger log = new Logger(CliBridge.class); - - public CliBridge() - { - super(log); - } - - @Override - protected List getModules() - { - return ImmutableList.of( - new Module() - { - @Override - public void configure(Binder binder) - { - binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/bridge"); - binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8089); - - ConfigProvider.bind(binder, BridgeCuratorConfig.class); - - binder.bind(BridgeZkCoordinator.class).in(ManageLifecycle.class); - binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("bridge")); - - JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class); - binder.bind(MetadataSegmentManager.class) - .toProvider(MetadataSegmentManagerProvider.class) - .in(ManageLifecycle.class); - - binder.bind(QuerySegmentWalker.class).to(BridgeQuerySegmentWalker.class).in(LazySingleton.class); - binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); - Jerseys.addResource(binder, QueryResource.class); - LifecycleModule.register(binder, QueryResource.class); - - ConfigProvider.bind(binder, DruidClusterBridgeConfig.class); - binder.bind(DruidClusterBridge.class); - LifecycleModule.register(binder, DruidClusterBridge.class); - - LifecycleModule.register(binder, BridgeZkCoordinator.class); - - LifecycleModule.register(binder, Server.class); - } - - @Provides - @LazySingleton - @Bridge - public CuratorFramework getBridgeCurator(final BridgeCuratorConfig bridgeCuratorConfig, Lifecycle lifecycle) - { - final CuratorFramework framework = - CuratorFrameworkFactory.builder() - .connectString(bridgeCuratorConfig.getParentZkHosts()) - .sessionTimeoutMs(bridgeCuratorConfig.getZkSessionTimeoutMs()) - .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30)) - .compressionProvider( - new PotentiallyGzippedCompressionProvider( - bridgeCuratorConfig.getEnableCompression() - ) - ) - .build(); - - lifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() throws Exception - { - log.info("Starting Curator for %s", bridgeCuratorConfig.getParentZkHosts()); - framework.start(); - } - - @Override - public void stop() - { - log.info("Stopping Curator"); - framework.close(); - } - } - ); - - return framework; - } - - @Provides - @ManageLifecycle - public ServerDiscoverySelector getServerDiscoverySelector( - DruidClusterBridgeConfig config, - ServerDiscoveryFactory factory - - ) - { - return factory.createSelector(config.getBrokerServiceName()); - } - - @Provides - @ManageLifecycle - @Bridge - public Announcer getBridgeAnnouncer( - @Bridge CuratorFramework curator - ) - { - return new Announcer(curator, Execs.singleThreaded("BridgeAnnouncer-%s")); - } - - @Provides - @ManageLifecycleLast - @Bridge - public AbstractDataSegmentAnnouncer getBridgeDataSegmentAnnouncer( - DruidServerMetadata metadata, - BatchDataSegmentAnnouncerConfig config, - ZkPathsConfig zkPathsConfig, - @Bridge Announcer announcer, - ObjectMapper jsonMapper - ) - { - return new BatchDataSegmentAnnouncer( - metadata, - config, - zkPathsConfig, - announcer, - jsonMapper - ); - } - } - ); - } -} diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index 113bdf19c31..e38f0ccb0c0 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -48,7 +48,7 @@ public class Main .withCommands( CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliOverlord.class, CliMiddleManager.class, - CliBridge.class, CliRouter.class + CliRouter.class ); builder.withGroup("example")