Sync changes from branch new-ingestion PR #599

Sync and Resolve Conflicts
This commit is contained in:
nishantmonu51 2014-07-11 16:15:10 +05:30
parent 2d03888a2c
commit f5f05e3a9b
39 changed files with 2134 additions and 1963 deletions

View File

@ -36,8 +36,9 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.AbstractProgressIndicator;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
@ -99,7 +100,6 @@ public class IndexGeneratorJob implements Jobby
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
@ -297,7 +297,7 @@ public class IndexGeneratorJob implements Jobby
for (final Text value : values) {
context.progress();
final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(value.toString()));
final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);
@ -316,8 +316,8 @@ public class IndexGeneratorJob implements Jobby
toMerge.add(file);
context.progress();
IndexMerger.persist(
index, interval, file, new IndexMerger.ProgressIndicator()
IndexMaker.persist(
index, interval, file, new AbstractProgressIndicator()
{
@Override
public void progress()
@ -345,8 +345,8 @@ public class IndexGeneratorJob implements Jobby
}
mergedBase = new File(baseFlushFile, "merged");
IndexMerger.persist(
index, interval, mergedBase, new IndexMerger.ProgressIndicator()
IndexMaker.persist(
index, interval, mergedBase, new AbstractProgressIndicator()
{
@Override
public void progress()
@ -358,8 +358,8 @@ public class IndexGeneratorJob implements Jobby
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
IndexMerger.persist(
index, interval, finalFile, new IndexMerger.ProgressIndicator()
IndexMaker.persist(
index, interval, finalFile, new AbstractProgressIndicator()
{
@Override
public void progress()
@ -374,8 +374,8 @@ public class IndexGeneratorJob implements Jobby
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = IndexMerger.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new IndexMerger.ProgressIndicator()
mergedBase = IndexMaker.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new AbstractProgressIndicator()
{
@Override
public void progress()
@ -625,7 +625,7 @@ public class IndexGeneratorJob implements Jobby
return new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withSpatialDimensions(config.getSchema().getDataSchema().getParser())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build(),

View File

@ -35,7 +35,7 @@ import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.indexing.DataSchema;
@ -166,7 +166,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
}
// Map merged segment so we can extract dimensions
@ -211,8 +211,7 @@ public class YeOldePlumberSchool implements PlumberSchool
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
try {
IndexMerger.persist(
IndexMaker.persist(
indexToPersist.getIndex(),
dirToPersist
);

View File

@ -27,7 +27,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.QueryableIndexIndexableAdapter;
import io.druid.segment.Rowboat;
@ -106,7 +106,7 @@ public class AppendTask extends MergeTaskBase
);
}
return IndexMerger.append(adapters, outDir);
return IndexMaker.append(adapters, outDir);
}
@Override

View File

@ -1,122 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
public class DeleteTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(DeleteTask.class);
@JsonCreator
public DeleteTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
id != null ? id : String.format(
"delete_%s_%s_%s_%s",
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
),
dataSource,
Preconditions.checkNotNull(interval, "interval")
);
}
@Override
public String getType()
{
return "delete";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final IncrementalIndex empty = new IncrementalIndex(
0,
QueryGranularity.NONE,
new AggregatorFactory[0],
new OffheapBufferPool(0)
);
try {
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
// Create DataSegment
final DataSegment segment =
DataSegment.builder()
.dataSource(this.getDataSource())
.interval(getInterval())
.version(myLock.getVersion())
.shardSpec(new NoneShardSpec())
.build();
final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
// Upload the segment
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment);
log.info(
"Uploaded tombstone segment for[%s] interval[%s] with version[%s]",
segment.getDataSource(),
segment.getInterval(),
segment.getVersion()
);
toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
finally {
empty.close();
}
}
}

View File

@ -383,7 +383,11 @@ public class IndexTask extends AbstractFixedIntervalTask
version,
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
).findPlumber(
schema,
new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec),
metrics
);
// rowFlushBoundary for this job
final int myRowFlushBoundary = rowFlushBoundary > 0

View File

@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.timeline.DataSegment;
@ -60,7 +60,7 @@ public class MergeTask extends MergeTaskBase
public File merge(final Map<DataSegment, File> segments, final File outDir)
throws Exception
{
return IndexMerger.mergeQueryableIndex(
return IndexMaker.mergeQueryableIndex(
Lists.transform(
ImmutableList.copyOf(segments.values()),
new Function<File, QueryableIndex>()

View File

@ -41,7 +41,6 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexIO;

View File

@ -43,7 +43,6 @@ import io.druid.query.QueryRunner;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "append", value = AppendTask.class),
@JsonSubTypes.Type(name = "merge", value = MergeTask.class),
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),

View File

@ -46,7 +46,7 @@ public class MergeTaskBaseTest
final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments)
{
@Override
protected File merge(Map<DataSegment, File> segments, File outDir) throws Exception
protected File merge( Map<DataSegment, File> segments, File outDir) throws Exception
{
return null;
}

View File

@ -242,53 +242,6 @@ public class TaskSerdeTest
);
}
@Test
public void testDeleteTaskSerde() throws Exception
{
final DeleteTask task = new DeleteTask(
null,
"foo",
new Interval("2010-01-01/P1D")
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testDeleteTaskFromJson() throws Exception
{
final ObjectMapper jsonMapper = new DefaultObjectMapper();
final DeleteTask task = (DeleteTask) jsonMapper.readValue(
"{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
Task.class
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
Assert.assertNotNull(task.getId());
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testAppendTaskSerde() throws Exception
{

View File

@ -0,0 +1,61 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;
/**
*/
public abstract class AbstractProgressIndicator implements ProgressIndicator
{
@Override
public void progress()
{
// do nothing
}
@Override
public void start()
{
// do nothing
}
@Override
public void stop()
{
// do nothing
}
@Override
public void startSection(String section)
{
// do nothing
}
@Override
public void progressSection(String section, String message)
{
// do nothing
}
@Override
public void stopSection(String section)
{
// do nothing
}
}

View File

@ -202,7 +202,7 @@ public class IndexIO
case 6:
case 7:
log.info("Old version, re-persisting.");
IndexMerger.append(
IndexMaker.append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))),
converted
);
@ -689,7 +689,7 @@ public class IndexIO
return new SimpleQueryableIndex(
index.getDataInterval(),
new ArrayIndexed<String>(cols, String.class),
new ArrayIndexed<>(cols, String.class),
index.getAvailableDimensions(),
new ColumnBuilder()
.setType(ValueType.LONG)
@ -723,8 +723,6 @@ public class IndexIO
Map<String, Column> columns = Maps.newHashMap();
ObjectMapper mapper = new DefaultObjectMapper();
for (String columnName : cols) {
columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName)));
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -19,11 +19,14 @@
package io.druid.segment;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import org.joda.time.Interval;
/**
* An adapter to an index
*/
public interface IndexableAdapter
{
@ -31,9 +34,9 @@ public interface IndexableAdapter
int getNumRows();
Indexed<String> getAvailableDimensions();
Indexed<String> getDimensionNames();
Indexed<String> getAvailableMetrics();
Indexed<String> getMetricNames();
Indexed<String> getDimValueLookup(String dimension);
@ -42,4 +45,6 @@ public interface IndexableAdapter
IndexedInts getInverteds(String dimension, String value);
String getMetricType(String metric);
ColumnCapabilities getCapabilities(String column);
}

View File

@ -0,0 +1,100 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
*/
public class LoggingProgressIndicator extends AbstractProgressIndicator
{
private static Logger log = new Logger(LoggingProgressIndicator.class);
private final String progressName;
private final Stopwatch global;
private final Map<String, Stopwatch> sections = Maps.newHashMap();
public LoggingProgressIndicator(String progressName)
{
this.progressName = progressName;
this.global = new Stopwatch();
}
@Override
public void start()
{
log.info("Starting [%s]", progressName);
global.start();
}
@Override
public void stop()
{
long time = global.elapsed(TimeUnit.MILLISECONDS);
global.stop();
log.info("[%s] complete. Elapsed time: [%,d] millis", progressName, time);
}
@Override
public void startSection(String section)
{
log.info("[%s]: Starting [%s]", progressName, section);
Stopwatch sectionWatch = sections.get(section);
if (sectionWatch != null) {
throw new ISE("[%s]: Cannot start progress tracker for [%s]. It is already started.", progressName, section);
}
sectionWatch = new Stopwatch();
sections.put(section, sectionWatch);
sectionWatch.start();
}
@Override
public void progressSection(String section, String message)
{
Stopwatch sectionWatch = sections.remove(section);
if (sectionWatch == null) {
throw new ISE("[%s]: Cannot progress tracker for [%s]. Nothing started.", progressName, section);
}
long time = sectionWatch.elapsed(TimeUnit.MILLISECONDS);
log.info("[%s]: [%s] : %s. Elapsed time: [%,d] millis", progressName, section, message, time);
}
@Override
public void stopSection(String section)
{
Stopwatch sectionWatch = sections.remove(section);
if (sectionWatch == null) {
throw new ISE("[%s]: Cannot stop progress tracker for [%s]. Nothing started.", progressName, section);
}
long time = sectionWatch.elapsed(TimeUnit.MILLISECONDS);
sectionWatch.stop();
log.info("[%s]: [%s] has completed. Elapsed time: [%,d] millis", progressName, section, time);
}
}

View File

@ -1,197 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedLongs;
import org.joda.time.Interval;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
/**
*/
public class MMappedIndexAdapter implements IndexableAdapter
{
private final MMappedIndex index;
private final int numRows;
public MMappedIndexAdapter(MMappedIndex index)
{
this.index = index;
numRows = index.getReadOnlyTimestamps().size();
}
@Override
public Interval getDataInterval()
{
return index.getDataInterval();
}
@Override
public int getNumRows()
{
return numRows;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}
@Override
public Indexed<String> getAvailableMetrics()
{
return index.getAvailableMetrics();
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return index.getDimValueLookup(dimension);
}
@Override
public Iterable<Rowboat> getRows()
{
return new Iterable<Rowboat>()
{
@Override
public Iterator<Rowboat> iterator()
{
return new Iterator<Rowboat>()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final MetricHolder[] metrics;
final IndexedFloats[] floatMetrics;
final Map<String, Indexed<? extends IndexedInts>> dimensions;
final int numMetrics = index.getAvailableMetrics().size();
int currRow = 0;
boolean done = false;
{
dimensions = Maps.newLinkedHashMap();
for (String dim : index.getAvailableDimensions()) {
dimensions.put(dim, index.getDimColumn(dim));
}
final Indexed<String> availableMetrics = index.getAvailableMetrics();
metrics = new MetricHolder[availableMetrics.size()];
floatMetrics = new IndexedFloats[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
metrics[i] = index.getMetricHolder(availableMetrics.get(i));
if (metrics[i].getType() == MetricHolder.MetricType.FLOAT) {
floatMetrics[i] = metrics[i].getFloatType();
}
}
}
@Override
public boolean hasNext()
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
for (IndexedFloats floatMetric : floatMetrics) {
Closeables.closeQuietly(floatMetric);
}
done = true;
}
return hasNext;
}
@Override
public Rowboat next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
int[][] dims = new int[dimensions.size()][];
int dimIndex = 0;
for (String dim : dimensions.keySet()) {
IndexedInts dimVals = dimensions.get(dim).get(currRow);
int[] theVals = new int[dimVals.size()];
for (int j = 0; j < theVals.length; ++j) {
theVals[j] = dimVals.get(j);
}
dims[dimIndex++] = theVals;
}
Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
switch (metrics[i].getType()) {
case FLOAT:
metricArray[i] = floatMetrics[i].get(currRow);
break;
case COMPLEX:
metricArray[i] = metrics[i].getComplexType().get(currRow);
}
}
Map<String, String> descriptions = Maps.newHashMap();
for (String spatialDim : index.getSpatialIndexes().keySet()) {
descriptions.put(spatialDim, "spatial");
}
final Rowboat retVal = new Rowboat(timestamps.get(currRow), dims, metricArray, currRow, descriptions);
++currRow;
return retVal;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}
@Override
public IndexedInts getInverteds(String dimension, String value)
{
return new ConciseCompressedIndexedInts(index.getInvertedIndex(dimension, value));
}
@Override
public String getMetricType(String metric)
{
MetricHolder holder = index.getMetricHolder(metric);
if (holder == null) {
return null;
}
return holder.getTypeName();
}
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment;
/**
*/
public interface ProgressIndicator
{
public void progress();
public void start();
public void stop();
public void startSection(String section);
public void progressSection(String section, String message);
public void stopSection(String section);
}

View File

@ -27,6 +27,7 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.GenericColumn;
@ -94,18 +95,18 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
}
@Override
public Indexed<String> getAvailableDimensions()
public Indexed<String> getDimensionNames()
{
return new ListIndexed<String>(availableDimensions, String.class);
return new ListIndexed<>(availableDimensions, String.class);
}
@Override
public Indexed<String> getAvailableMetrics()
public Indexed<String> getMetricNames()
{
final Set<String> columns = Sets.newLinkedHashSet(input.getColumnNames());
final HashSet<String> dimensions = Sets.newHashSet(getAvailableDimensions());
final HashSet<String> dimensions = Sets.newHashSet(getDimensionNames());
return new ListIndexed<String>(
return new ListIndexed<>(
Lists.newArrayList(Sets.difference(columns, dimensions)),
String.class
);
@ -174,18 +175,18 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
final Object[] metrics;
final Map<String, DictionaryEncodedColumn> dimensions;
final int numMetrics = getAvailableMetrics().size();
final int numMetrics = getMetricNames().size();
int currRow = 0;
boolean done = false;
{
dimensions = Maps.newLinkedHashMap();
for (String dim : getAvailableDimensions()) {
for (String dim : getDimensionNames()) {
dimensions.put(dim, input.getColumn(dim).getDictionaryEncoding());
}
final Indexed<String> availableMetrics = getAvailableMetrics();
final Indexed<String> availableMetrics = getMetricNames();
metrics = new Object[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
final Column column = input.getColumn(availableMetrics.get(i));
@ -254,14 +255,8 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
}
}
Map<String, String> descriptions = Maps.newHashMap();
for (String columnName : input.getColumnNames()) {
if (input.getColumn(columnName).getSpatialIndex() != null) {
descriptions.put(columnName, "spatial");
}
}
final Rowboat retVal = new Rowboat(
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow, descriptions
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow
);
++currRow;
@ -311,4 +306,10 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
throw new ISE("Unknown type[%s]", type);
}
}
@Override
public ColumnCapabilities getCapabilities(String column)
{
return input.getColumn(column).getCapabilities();
}
}

View File

@ -37,21 +37,17 @@ public class Rowboat implements Comparable<Rowboat>
private final int rowNum;
private final Map<Integer, TreeSet<Integer>> comprisedRows;
private Map<String, String> columnDescriptor;
public Rowboat(
long timestamp,
int[][] dims,
Object[] metrics,
int rowNum,
Map<String, String> columnDescriptor
int rowNum
)
{
this.timestamp = timestamp;
this.dims = dims;
this.metrics = metrics;
this.rowNum = rowNum;
this.columnDescriptor = columnDescriptor;
this.comprisedRows = Maps.newHashMap();
}
@ -91,11 +87,6 @@ public class Rowboat implements Comparable<Rowboat>
return rowNum;
}
public Map<String, String> getDescriptions()
{
return columnDescriptor;
}
@Override
public int compareTo(Rowboat rhs)
{

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import org.joda.time.Interval;
@ -51,15 +52,15 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter
}
@Override
public Indexed<String> getAvailableDimensions()
public Indexed<String> getDimensionNames()
{
return baseAdapter.getAvailableDimensions();
return baseAdapter.getDimensionNames();
}
@Override
public Indexed<String> getAvailableMetrics()
public Indexed<String> getMetricNames()
{
return baseAdapter.getAvailableMetrics();
return baseAdapter.getMetricNames();
}
@Override
@ -85,4 +86,10 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter
{
return baseAdapter.getMetricType(metric);
}
@Override
public ColumnCapabilities getCapabilities(String column)
{
return baseAdapter.getCapabilities(column);
}
}

View File

@ -30,4 +30,6 @@ public interface ColumnCapabilities
public boolean hasBitmapIndexes();
public boolean hasSpatialIndexes();
public boolean hasMultipleValues();
public ColumnCapabilitiesImpl merge(ColumnCapabilities other);
}

View File

@ -20,6 +20,7 @@
package io.druid.segment.column;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.ISE;
/**
*/
@ -109,4 +110,28 @@ public class ColumnCapabilitiesImpl implements ColumnCapabilities
this.hasMultipleValues = hasMultipleValues;
return this;
}
@Override
public ColumnCapabilitiesImpl merge(ColumnCapabilities other)
{
if (other == null) {
return this;
}
if (type == null) {
type = other.getType();
}
if (!type.equals(other.getType())) {
throw new ISE("Cannot merge columns of type[%s] and [%s]", type, other.getType());
}
this.dictionaryEncoded |= other.isDictionaryEncoded();
this.runLengthEncoded |= other.isRunLengthEncoded();
this.hasInvertedIndexes |= other.hasBitmapIndexes();
this.hasSpatialIndexes |= other.hasSpatialIndexes();
this.hasMultipleValues |= other.hasMultipleValues();
return this;
}
}

View File

@ -38,8 +38,6 @@ import java.util.Arrays;
*/
public class GenericIndexedWriter<T> implements Closeable
{
private static final byte[] EMPTY_ARRAY = new byte[]{};
private final IOPeon ioPeon;
private final String filenameBase;
private final ObjectStrategy<T> strategy;

View File

@ -57,7 +57,7 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
if (val > maxValue) {
throw new IAE("val[%d] > maxValue[%d], please don't lie about maxValue. i[%d]", val, maxValue, i);
}
byte[] intAsBytes = Ints.toByteArray(val);
buffer.put(intAsBytes, intAsBytes.length - numBytes, numBytes);
++i;
@ -138,8 +138,8 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
if (retVal == 0) {
retVal = buffer.compareTo(o.buffer);
}
return retVal;
return retVal;
}
public int getNumBytes()
@ -149,6 +149,7 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
public int getSerializedSize()
{
// version, numBytes, size, remaining
return 1 + 1 + 4 + buffer.remaining();
}

View File

@ -1,145 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
/**
* Streams arrays of objects out in the binary format described by VSizeIndexed
*/
public class VSizeIndexedWriter implements Closeable
{
private static final byte version = 0x1;
private static final byte[] EMPTY_ARRAY = new byte[]{};
private final int maxId;
private CountingOutputStream headerOut = null;
private CountingOutputStream valuesOut = null;
int numWritten = 0;
private final IOPeon ioPeon;
private final String filenameBase;
public VSizeIndexedWriter(
IOPeon ioPeon,
String filenameBase,
int maxId
)
{
this.ioPeon = ioPeon;
this.filenameBase = filenameBase;
this.maxId = maxId;
}
public void open() throws IOException
{
headerOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("header")));
valuesOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("values")));
}
public void write(List<Integer> ints) throws IOException
{
byte[] bytesToWrite = ints == null ? EMPTY_ARRAY : VSizeIndexedInts.fromList(ints, maxId).getBytesNoPadding();
valuesOut.write(bytesToWrite);
headerOut.write(Ints.toByteArray((int) valuesOut.getCount()));
++numWritten;
}
private String makeFilename(String suffix)
{
return String.format("%s.%s", filenameBase, suffix);
}
@Override
public void close() throws IOException
{
final byte numBytesForMax = VSizeIndexedInts.getNumBytesForMax(maxId);
valuesOut.write(new byte[4 - numBytesForMax]);
Closeables.close(headerOut, false);
Closeables.close(valuesOut, false);
final long numBytesWritten = headerOut.getCount() + valuesOut.getCount();
Preconditions.checkState(
headerOut.getCount() == (numWritten * 4),
"numWritten[%s] number of rows should have [%s] bytes written to headerOut, had[%s]",
numWritten,
numWritten * 4,
headerOut.getCount()
);
Preconditions.checkState(
numBytesWritten < Integer.MAX_VALUE, "Wrote[%s] bytes, which is too many.", numBytesWritten
);
OutputStream metaOut = ioPeon.makeOutputStream(makeFilename("meta"));
try {
metaOut.write(new byte[]{version, numBytesForMax});
metaOut.write(Ints.toByteArray((int) numBytesWritten + 4));
metaOut.write(Ints.toByteArray(numWritten));
}
finally {
metaOut.close();
}
}
public InputSupplier<InputStream> combineStreams()
{
return ByteStreams.join(
Iterables.transform(
Arrays.asList("meta", "header", "values"),
new Function<String,InputSupplier<InputStream>>() {
@Override
public InputSupplier<InputStream> apply(final String input)
{
return new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
return ioPeon.makeInputStream(makeFilename(input));
}
};
}
}
)
);
}
}

View File

@ -20,7 +20,6 @@
package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
@ -29,11 +28,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
@ -49,6 +48,9 @@ import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
@ -76,10 +78,11 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class IncrementalIndex implements Iterable<Row>, Closeable
{
private static final Logger log = new Logger(IncrementalIndex.class);
private static final Joiner JOINER = Joiner.on(",");
private final long minTimestamp;
private final QueryGranularity gran;
private final Set<Function<InputRow, InputRow>> rowTransformers;
private final AggregatorFactory[] metrics;
private final Map<String, Integer> metricIndexes;
private final Map<String, String> metricTypes;
@ -89,9 +92,10 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
private final int totalAggSize;
private final LinkedHashMap<String, Integer> dimensionOrder;
private final CopyOnWriteArrayList<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final SpatialDimensionRowFormatter spatialDimensionRowFormatter;
private final DimensionHolder dimValues;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final ConcurrentSkipListMap<TimeAndDims, Integer> facts;
private final ResourceHolder<ByteBuffer> bufferHolder;
private volatile AtomicInteger numEntries = new AtomicInteger();
@ -116,6 +120,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
this.rowTransformers = Sets.newHashSet();
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
@ -280,17 +285,44 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
this.totalAggSize = currAggSize;
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<String>();
this.dimensions = new CopyOnWriteArrayList<>();
int index = 0;
for (String dim : incrementalIndexSchema.getDimensions()) {
for (String dim : incrementalIndexSchema.getDimensionsSpec().getDimensions()) {
dimensionOrder.put(dim, index++);
dimensions.add(dim);
}
this.spatialDimensions = incrementalIndexSchema.getSpatialDimensions();
this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(spatialDimensions);
// This should really be more generic
List<SpatialDimensionSchema> spatialDimensions = incrementalIndexSchema.getDimensionsSpec().getSpatialDimensions();
if (!spatialDimensions.isEmpty()) {
this.rowTransformers.add(new SpatialDimensionRowTransformer(spatialDimensions));
}
this.columnCapabilities = Maps.newHashMap();
for (Map.Entry<String, String> entry : metricTypes.entrySet()) {
ValueType type;
if (entry.getValue().equalsIgnoreCase("float")) {
type = ValueType.FLOAT;
} else {
type = ValueType.COMPLEX;
}
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(type);
columnCapabilities.put(entry.getKey(), capabilities);
}
for (String dimension : dimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
capabilities.setHasSpatialIndexes(true);
columnCapabilities.put(spatialDimension.getDimName(), capabilities);
}
this.bufferHolder = bufferPool.take();
this.dimValues = new DimensionHolder();
this.facts = new ConcurrentSkipListMap<TimeAndDims, Integer>();
this.facts = new ConcurrentSkipListMap<>();
}
public IncrementalIndex(
@ -336,6 +368,18 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
);
}
public InputRow formatRow(InputRow row)
{
for (Function<InputRow, InputRow> rowTransformer : rowTransformers) {
row = rowTransformer.apply(row);
}
if (row == null) {
throw new IAE("Row is null? How can this be?!");
}
return row;
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
@ -350,7 +394,7 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
*/
public int add(InputRow row)
{
row = spatialDimensionRowFormatter.formatRow(row);
row = formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
}
@ -364,6 +408,18 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
for (String dimension : rowDimensions) {
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
// Set column capabilities as data is coming in
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dimension, capabilities);
}
if (dimensionValues.size() > 1) {
capabilities.setHasMultipleValues(true);
}
Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
@ -466,16 +522,6 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return dimensions;
}
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
public SpatialDimensionRowFormatter getSpatialDimensionRowFormatter()
{
return spatialDimensionRowFormatter;
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
@ -541,6 +587,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
return aggs[metricIndex];
}
ColumnCapabilities getCapabilities(String column)
{
return columnCapabilities.get(column);
}
ConcurrentSkipListMap<TimeAndDims, Integer> getFacts()
{
return facts;

View File

@ -26,6 +26,8 @@ import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.Rowboat;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
@ -111,13 +113,13 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
@Override
public Indexed<String> getAvailableDimensions()
public Indexed<String> getDimensionNames()
{
return new ListIndexed<String>(index.getDimensions(), String.class);
}
@Override
public Indexed<String> getAvailableMetrics()
public Indexed<String> getMetricNames()
{
return new ListIndexed<String>(index.getMetricNames(), String.class);
}
@ -208,16 +210,11 @@ public class IncrementalIndexAdapter implements IndexableAdapter
.get(index.getMetricBuffer(), index.getMetricPosition(rowOffset, i));
}
Map<String, String> description = Maps.newHashMap();
for (SpatialDimensionSchema spatialDimensionSchema : index.getSpatialDimensions()) {
description.put(spatialDimensionSchema.getDimName(), "spatial");
}
return new Rowboat(
timeAndDims.getTimestamp(),
dims,
metrics,
count++,
description
count++
);
}
}
@ -287,4 +284,10 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{
return index.getMetricType(metric);
}
@Override
public ColumnCapabilities getCapabilities(String column)
{
return index.getCapabilities(column);
}
}

View File

@ -19,39 +19,30 @@
package io.druid.segment.incremental;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import java.util.Collections;
import java.util.List;
/**
*/
public class IncrementalIndexSchema
{
private final long minTimestamp;
private final QueryGranularity gran;
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final DimensionsSpec dimensionsSpec;
private final AggregatorFactory[] metrics;
public IncrementalIndexSchema(
long minTimestamp,
QueryGranularity gran,
List<String> dimensions,
List<SpatialDimensionSchema> spatialDimensions,
DimensionsSpec dimensionsSpec,
AggregatorFactory[] metrics
)
{
this.minTimestamp = minTimestamp;
this.gran = gran;
this.dimensions = dimensions;
this.spatialDimensions = spatialDimensions;
this.dimensionsSpec = dimensionsSpec;
this.metrics = metrics;
}
@ -65,14 +56,9 @@ public class IncrementalIndexSchema
return gran;
}
public List<String> getDimensions()
public DimensionsSpec getDimensionsSpec()
{
return dimensions;
}
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
return dimensionsSpec;
}
public AggregatorFactory[] getMetrics()
@ -84,16 +70,14 @@ public class IncrementalIndexSchema
{
private long minTimestamp;
private QueryGranularity gran;
private List<String> dimensions;
private List<SpatialDimensionSchema> spatialDimensions;
private DimensionsSpec dimensionsSpec;
private AggregatorFactory[] metrics;
public Builder()
{
this.minTimestamp = 0L;
this.gran = QueryGranularity.NONE;
this.dimensions = Lists.newArrayList();
this.spatialDimensions = Lists.newArrayList();
this.dimensionsSpec = new DimensionsSpec(null, null, null);
this.metrics = new AggregatorFactory[]{};
}
@ -109,44 +93,25 @@ public class IncrementalIndexSchema
return this;
}
public Builder withDimensions(Iterable<String> dimensions)
public Builder withDimensionsSpec(DimensionsSpec dimensionsSpec)
{
this.dimensions = Lists.newArrayList(
Iterables.transform(
dimensions, new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
)
);
Collections.sort(this.dimensions);
this.dimensionsSpec = dimensionsSpec;
return this;
}
public Builder withSpatialDimensions(InputRowParser parser)
public Builder withDimensionsSpec(InputRowParser parser)
{
if (parser != null
&& parser.getParseSpec() != null
&& parser.getParseSpec().getDimensionsSpec() != null
&& parser.getParseSpec().getDimensionsSpec().getSpatialDimensions() != null) {
this.spatialDimensions = parser.getParseSpec().getDimensionsSpec().getSpatialDimensions();
&& parser.getParseSpec().getDimensionsSpec() != null) {
this.dimensionsSpec = parser.getParseSpec().getDimensionsSpec();
} else {
this.spatialDimensions = Lists.newArrayList();
this.dimensionsSpec = new DimensionsSpec(null, null, null);
}
return this;
}
public Builder withSpatialDimensions(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
return this;
}
public Builder withMetrics(AggregatorFactory[] metrics)
{
this.metrics = metrics;
@ -156,7 +121,7 @@ public class IncrementalIndexSchema
public IncrementalIndexSchema build()
{
return new IncrementalIndexSchema(
minTimestamp, gran, dimensions, spatialDimensions, metrics
minTimestamp, gran, dimensionsSpec, metrics
);
}
}

View File

@ -40,7 +40,7 @@ import java.util.Set;
/**
* We throw away all invalid spatial dimensions
*/
public class SpatialDimensionRowFormatter
public class SpatialDimensionRowTransformer implements Function<InputRow, InputRow>
{
private static final Joiner JOINER = Joiner.on(",");
private static final Splitter SPLITTER = Splitter.on(",");
@ -49,7 +49,7 @@ public class SpatialDimensionRowFormatter
private final Set<String> spatialDimNames;
private final Set<String> spatialPartialDimNames;
public SpatialDimensionRowFormatter(List<SpatialDimensionSchema> spatialDimensions)
public SpatialDimensionRowTransformer(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
this.spatialDimNames = Sets.newHashSet(
@ -82,7 +82,8 @@ public class SpatialDimensionRowFormatter
);
}
public InputRow formatRow(final InputRow row)
@Override
public InputRow apply(final InputRow row)
{
final Map<String, List<String>> spatialLookup = Maps.newHashMap();

View File

@ -42,12 +42,7 @@ import java.nio.channels.WritableByteChannel;
*/
public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
{
@JsonCreator
public static DictionaryEncodedColumnPartSerde createDeserializer(boolean singleValued)
{
return new DictionaryEncodedColumnPartSerde();
}
private final boolean isSingleValued;
private final GenericIndexed<String> dictionary;
private final VSizeIndexedInts singleValuedColumn;
private final VSizeIndexed multiValuedColumn;
@ -64,6 +59,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
ImmutableRTree spatialIndex
)
{
this.isSingleValued = multiValCol == null;
this.dictionary = dictionary;
this.singleValuedColumn = singleValCol;
this.multiValuedColumn = multiValCol;
@ -86,20 +82,25 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
this.size = size;
}
private DictionaryEncodedColumnPartSerde()
@JsonCreator
public DictionaryEncodedColumnPartSerde(
@JsonProperty("isSingleValued") boolean isSingleValued
)
{
dictionary = null;
singleValuedColumn = null;
multiValuedColumn = null;
bitmaps = null;
spatialIndex = null;
size = 0;
this.isSingleValued = isSingleValued;
this.dictionary = null;
this.singleValuedColumn = null;
this.multiValuedColumn = null;
this.bitmaps = null;
this.spatialIndex = null;
this.size = 0;
}
@JsonProperty
private boolean isSingleValued()
{
return singleValuedColumn != null;
return isSingleValued;
}
@Override
@ -111,14 +112,26 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
@Override
public void write(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{(byte) (isSingleValued() ? 0x0 : 0x1)}));
dictionary.writeToChannel(channel);
if (isSingleValued()) {
singleValuedColumn.writeToChannel(channel);
} else {
multiValuedColumn.writeToChannel(channel);
channel.write(ByteBuffer.wrap(new byte[]{(byte) (isSingleValued ? 0x0 : 0x1)}));
if (dictionary != null) {
dictionary.writeToChannel(channel);
}
bitmaps.writeToChannel(channel);
if (isSingleValued()) {
if (singleValuedColumn != null) {
singleValuedColumn.writeToChannel(channel);
}
} else {
if (multiValuedColumn != null) {
multiValuedColumn.writeToChannel(channel);
}
}
if (bitmaps != null) {
bitmaps.writeToChannel(channel);
}
if (spatialIndex != null) {
ByteBufferSerializer.writeToChannel(spatialIndex, IndexedRTree.objectStrategy, channel);
}

View File

@ -48,12 +48,16 @@ public class EmptyIndexTest
IncrementalIndex emptyIndex = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0], TestQueryRunners.pool);
IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(new Interval("2012-08-01/P3D"), emptyIndex);
IndexMerger.merge(Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir);
IndexMaker.merge(
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
new AggregatorFactory[0],
tmpDir
);
QueryableIndex emptyQueryableIndex = IndexIO.loadIndex(tmpDir);
Assert.assertEquals("getAvailableDimensions", 0, Iterables.size(emptyQueryableIndex.getAvailableDimensions()));
Assert.assertEquals("getAvailableMetrics", 0, Iterables.size(emptyQueryableIndex.getColumnNames()));
Assert.assertEquals("getDimensionNames", 0, Iterables.size(emptyQueryableIndex.getAvailableDimensions()));
Assert.assertEquals("getMetricNames", 0, Iterables.size(emptyQueryableIndex.getColumnNames()));
Assert.assertEquals("getDataInterval", new Interval("2012-08-01/P3D"), emptyQueryableIndex.getDataInterval());
Assert.assertEquals("getReadOnlyTimestamps", 0, emptyQueryableIndex.getTimeColumn().getLength());
}

View File

@ -38,7 +38,7 @@ import java.util.Arrays;
/**
*/
public class IndexMergerTest
public class IndexMakerTest
{
@Test
public void testPersistCaseInsensitive() throws Exception
@ -49,7 +49,7 @@ public class IndexMergerTest
final File tempDir = Files.createTempDir();
try {
QueryableIndex index = IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir));
QueryableIndex index = IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir));
Assert.assertEquals(2, index.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
@ -88,20 +88,24 @@ public class IndexMergerTest
final File tempDir2 = Files.createTempDir();
final File mergedDir = Files.createTempDir();
try {
QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1));
QueryableIndex index1 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1));
Assert.assertEquals(2, index1.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(2, index1.getColumnNames().size());
QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2));
QueryableIndex index2 = IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2));
Assert.assertEquals(2, index2.getTimeColumn().getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
Assert.assertEquals(2, index2.getColumnNames().size());
QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, mergedDir)
IndexMaker.mergeQueryableIndex(
Arrays.asList(index1, index2),
new AggregatorFactory[]{},
mergedDir
)
);
Assert.assertEquals(3, merged.getTimeColumn().getLength());
@ -141,10 +145,10 @@ public class IndexMergerTest
)
);
final QueryableIndex index1 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMerger.persist(toPersist1, tmpDir2));
final QueryableIndex index1 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tmpDir1));
final QueryableIndex index2 = IndexIO.loadIndex(IndexMaker.persist(toPersist1, tmpDir2));
final QueryableIndex merged = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
IndexMaker.mergeQueryableIndex(Arrays.asList(index1, index2), new AggregatorFactory[]{}, tmpDir3)
);
Assert.assertEquals(1, index1.getTimeColumn().getLength());
@ -155,7 +159,8 @@ public class IndexMergerTest
Assert.assertEquals(1, merged.getTimeColumn().getLength());
Assert.assertEquals(ImmutableList.of("dim2"), ImmutableList.copyOf(merged.getAvailableDimensions()));
} finally {
}
finally {
FileUtils.deleteQuietly(tmpDir1);
FileUtils.deleteQuietly(tmpDir2);
FileUtils.deleteQuietly(tmpDir3);

View File

@ -131,11 +131,11 @@ public class TestIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(top, DATA_INTERVAL, topFile);
IndexMerger.persist(bottom, DATA_INTERVAL, bottomFile);
IndexMaker.persist(top, DATA_INTERVAL, topFile);
IndexMaker.persist(bottom, DATA_INTERVAL, bottomFile);
mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
IndexMaker.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)),
METRIC_AGGS,
mergedFile
@ -229,7 +229,7 @@ public class TestIndex
someTmpFile.mkdirs();
someTmpFile.deleteOnExit();
IndexMerger.persist(index, someTmpFile);
IndexMaker.persist(index, someTmpFile);
return IndexIO.loadIndex(someTmpFile);
}
catch (IOException e) {

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
@ -45,7 +46,7 @@ import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -71,13 +72,17 @@ import java.util.Random;
public class SpatialFilterBonusTest
{
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "dim.geo");
private final Segment segment;
public SpatialFilterBonusTest(Segment segment)
{
this.segment = segment;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
@ -106,11 +111,15 @@ public class SpatialFilterBonusTest
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build(),
@ -221,7 +230,7 @@ public class SpatialFilterBonusTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
IndexMaker.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile);
}
@ -232,13 +241,18 @@ public class SpatialFilterBonusTest
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build(),
TestQueryRunners.pool
);
@ -246,29 +260,40 @@ public class SpatialFilterBonusTest
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build(),
TestQueryRunners.pool
);
IncrementalIndex third = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
)
).build(),
TestQueryRunners.pool
);
@ -384,12 +409,12 @@ public class SpatialFilterBonusTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
IndexMaker.persist(first, DATA_INTERVAL, firstFile);
IndexMaker.persist(second, DATA_INTERVAL, secondFile);
IndexMaker.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
IndexMaker.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
@ -403,13 +428,6 @@ public class SpatialFilterBonusTest
}
}
private final Segment segment;
public SpatialFilterBonusTest(Segment segment)
{
this.segment = segment;
}
@Test
public void testSpatialQuery()
{

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
@ -45,7 +46,7 @@ import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -71,13 +72,17 @@ import java.util.Random;
public class SpatialFilterTest
{
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "lat", "long");
private final Segment segment;
public SpatialFilterTest(Segment segment)
{
this.segment = segment;
}
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
@ -106,11 +111,15 @@ public class SpatialFilterTest
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
)
).build(),
@ -236,7 +245,7 @@ public class SpatialFilterTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
IndexMaker.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile);
}
@ -247,11 +256,15 @@ public class SpatialFilterTest
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
)
).build(),
@ -261,13 +274,18 @@ public class SpatialFilterTest
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
)
).build(),
TestQueryRunners.pool
);
@ -275,13 +293,18 @@ public class SpatialFilterTest
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
.withDimensionsSpec(
new DimensionsSpec(
null,
null,
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
)
).build(),
TestQueryRunners.pool
);
@ -414,12 +437,12 @@ public class SpatialFilterTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
IndexMaker.persist(first, DATA_INTERVAL, firstFile);
IndexMaker.persist(second, DATA_INTERVAL, secondFile);
IndexMaker.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
IndexMaker.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
@ -433,13 +456,6 @@ public class SpatialFilterTest
}
}
private final Segment segment;
public SpatialFilterTest(Segment segment)
{
this.segment = segment;
}
@Test
public void testSpatialQuery()
{
@ -454,7 +470,7 @@ public class SpatialFilterTest
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
Arrays.asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
@ -462,7 +478,7 @@ public class SpatialFilterTest
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -505,7 +521,7 @@ public class SpatialFilterTest
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
Arrays.asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
@ -513,7 +529,7 @@ public class SpatialFilterTest
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -522,7 +538,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2013-01-02T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -531,7 +547,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2013-01-03T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -540,7 +556,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2013-01-04T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
@ -549,7 +565,7 @@ public class SpatialFilterTest
.build()
)
),
new Result<TimeseriesResultValue>(
new Result<>(
new DateTime("2013-01-05T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()

View File

@ -100,8 +100,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
}
catch (Exception e) {
synchronized (lock) {
try {
stop();
try { stop();
}
catch (IOException e1) {
log.error(e1, "Exception when stopping InventoryManager that couldn't start.");

View File

@ -33,7 +33,7 @@ import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
@ -349,7 +349,7 @@ public class RealtimePlumber implements Plumber
indexes.add(queryableIndex);
}
mergedFile = IndexMerger.mergeQueryableIndex(
mergedFile = IndexMaker.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget
@ -714,7 +714,7 @@ public class RealtimePlumber implements Plumber
try {
int numRows = indexToPersist.getIndex().size();
File persistedFile = IndexMerger.persist(
File persistedFile = IndexMaker.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);

View File

@ -27,7 +27,6 @@ import com.google.common.collect.Lists;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.data.input.InputRow;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
@ -40,7 +39,6 @@ import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@ -187,7 +185,7 @@ public class Sink implements Iterable<FireHydrant>
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(schema.getGranularitySpec().getQueryGranularity())
.withSpatialDimensions(schema.getParser())
.withDimensionsSpec(schema.getParser())
.withMetrics(schema.getAggregators())
.build(),
new OffheapBufferPool(bufferSize)