Merge pull request #148 from metamx/refactor-indexing
Refactor indexing
|
@ -18,7 +18,8 @@
|
|||
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
<artifactId>druid-client</artifactId>
|
||||
|
@ -28,7 +29,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.4.33-SNAPSHOT</version>
|
||||
<version>0.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
@ -39,7 +40,7 @@
|
|||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
<artifactId>druid-index-common</artifactId>
|
||||
<artifactId>druid-indexing-common</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
@ -177,6 +178,10 @@
|
|||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>bytebuffer-collections</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
|
|
@ -108,7 +108,7 @@ public class IndexingServiceClient
|
|||
throw new ISE("Cannot find instance of indexingService");
|
||||
}
|
||||
|
||||
return String.format("http://%s:%s/mmx/merger/v1", instance.getAddress(), instance.getPort());
|
||||
return String.format("http://%s:%s/druid/indexer/v1", instance.getAddress(), instance.getPort());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.4.33-SNAPSHOT</version>
|
||||
<version>0.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
Before Width: | Height: | Size: 239 KiB After Width: | Height: | Size: 239 KiB |
Before Width: | Height: | Size: 78 KiB After Width: | Height: | Size: 78 KiB |
Before Width: | Height: | Size: 28 KiB After Width: | Height: | Size: 28 KiB |
Before Width: | Height: | Size: 66 KiB After Width: | Height: | Size: 66 KiB |
Before Width: | Height: | Size: 35 KiB After Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 95 KiB After Width: | Height: | Size: 95 KiB |
|
@ -9,7 +9,7 @@
|
|||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.4.33-SNAPSHOT</version>
|
||||
<version>0.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
|
|
@ -21,14 +21,14 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
<artifactId>druid-index-common</artifactId>
|
||||
<name>druid-index-common</name>
|
||||
<description>Druid Indexer</description>
|
||||
<artifactId>druid-indexing-common</artifactId>
|
||||
<name>druid-indexing-common</name>
|
||||
<description>Druid Indexing Common</description>
|
||||
|
||||
<parent>
|
||||
<groupId>com.metamx</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.4.33-SNAPSHOT</version>
|
||||
<version>0.5.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
|
@ -27,15 +27,13 @@ import com.metamx.druid.kv.GenericIndexed;
|
|||
*/
|
||||
public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
|
||||
{
|
||||
private static final ImmutableRTree EMPTY_SET = new ImmutableRTree();
|
||||
|
||||
private final ImmutableRTree indexedTree;
|
||||
|
||||
public SpatialIndexColumnPartSupplier(
|
||||
ImmutableRTree indexedTree
|
||||
)
|
||||
{
|
||||
this.indexedTree = (indexedTree == null) ? EMPTY_SET : indexedTree;
|
||||
this.indexedTree = indexedTree;
|
||||
}
|
||||
|
||||
@Override
|
|
@ -692,25 +692,29 @@ public class IndexIO
|
|||
Map<String, Column> columns = Maps.newHashMap();
|
||||
|
||||
for (String dimension : index.getAvailableDimensions()) {
|
||||
ColumnBuilder builder = new ColumnBuilder()
|
||||
.setType(ValueType.STRING)
|
||||
.setHasMultipleValues(true)
|
||||
.setDictionaryEncodedColumn(
|
||||
new DictionaryEncodedColumnSupplier(
|
||||
index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension))
|
||||
)
|
||||
)
|
||||
.setBitmapIndex(
|
||||
new BitmapIndexColumnPartSupplier(
|
||||
index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension)
|
||||
)
|
||||
);
|
||||
if (index.getSpatialIndexes().get(dimension) != null) {
|
||||
builder.setSpatialIndex(
|
||||
new SpatialIndexColumnPartSupplier(
|
||||
index.getSpatialIndexes().get(dimension)
|
||||
)
|
||||
);
|
||||
}
|
||||
columns.put(
|
||||
dimension.toLowerCase(),
|
||||
new ColumnBuilder()
|
||||
.setType(ValueType.STRING)
|
||||
.setHasMultipleValues(true)
|
||||
.setDictionaryEncodedColumn(
|
||||
new DictionaryEncodedColumnSupplier(
|
||||
index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension))
|
||||
)
|
||||
)
|
||||
.setBitmapIndex(
|
||||
new BitmapIndexColumnPartSupplier(
|
||||
index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension)
|
||||
)
|
||||
).setSpatialIndex(
|
||||
new SpatialIndexColumnPartSupplier(
|
||||
index.getSpatialIndexes().get(dimension)
|
||||
)
|
||||
).build()
|
||||
builder.build()
|
||||
);
|
||||
}
|
||||
|
|
@ -705,6 +705,11 @@ public class IndexMerger
|
|||
final File invertedFile = new File(v8OutDir, "inverted.drd");
|
||||
Files.touch(invertedFile);
|
||||
out = Files.newOutputStreamSupplier(invertedFile, true);
|
||||
|
||||
final File geoFile = new File(v8OutDir, "spatial.drd");
|
||||
Files.touch(geoFile);
|
||||
OutputSupplier<FileOutputStream> spatialOut = Files.newOutputStreamSupplier(geoFile, true);
|
||||
|
||||
for (int i = 0; i < mergedDimensions.size(); ++i) {
|
||||
long dimStartTime = System.currentTimeMillis();
|
||||
String dimension = mergedDimensions.get(i);
|
||||
|
@ -723,6 +728,18 @@ public class IndexMerger
|
|||
);
|
||||
writer.open();
|
||||
|
||||
boolean isSpatialDim = "spatial".equals(descriptions.get(dimension));
|
||||
ByteBufferWriter<ImmutableRTree> spatialWriter = null;
|
||||
RTree tree = null;
|
||||
IOPeon spatialIoPeon = new TmpFileIOPeon();
|
||||
if (isSpatialDim) {
|
||||
spatialWriter = new ByteBufferWriter<ImmutableRTree>(
|
||||
spatialIoPeon, dimension, IndexedRTree.objectStrategy
|
||||
);
|
||||
spatialWriter.open();
|
||||
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
|
||||
}
|
||||
|
||||
for (String dimVal : IndexedIterable.create(dimVals)) {
|
||||
progress.progress();
|
||||
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
|
||||
|
@ -745,6 +762,15 @@ public class IndexMerger
|
|||
}
|
||||
|
||||
writer.write(ImmutableConciseSet.newImmutableFromMutable(bitset));
|
||||
|
||||
if (isSpatialDim && dimVal != null) {
|
||||
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
|
||||
float[] coords = new float[stringCoords.size()];
|
||||
for (int j = 0; j < coords.length; j++) {
|
||||
coords[j] = Float.valueOf(stringCoords.get(j));
|
||||
}
|
||||
tree.insert(coords, bitset);
|
||||
}
|
||||
}
|
||||
writer.close();
|
||||
|
||||
|
@ -753,64 +779,16 @@ public class IndexMerger
|
|||
ioPeon.cleanup();
|
||||
|
||||
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
|
||||
}
|
||||
|
||||
/************ Create Geographical Indexes *************/
|
||||
// FIXME: Rewrite when indexing is updated
|
||||
Stopwatch stopwatch = new Stopwatch();
|
||||
stopwatch.start();
|
||||
if (isSpatialDim) {
|
||||
spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
|
||||
spatialWriter.close();
|
||||
|
||||
final File geoFile = new File(v8OutDir, "spatial.drd");
|
||||
Files.touch(geoFile);
|
||||
out = Files.newOutputStreamSupplier(geoFile, true);
|
||||
|
||||
for (int i = 0; i < mergedDimensions.size(); ++i) {
|
||||
String dimension = mergedDimensions.get(i);
|
||||
|
||||
if (!"spatial".equals(descriptions.get(dimension))) {
|
||||
continue;
|
||||
serializerUtils.writeString(spatialOut, dimension);
|
||||
ByteStreams.copy(spatialWriter.combineStreams(), spatialOut);
|
||||
spatialIoPeon.cleanup();
|
||||
}
|
||||
|
||||
File dimOutFile = dimOuts.get(i).getFile();
|
||||
final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
|
||||
|
||||
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
|
||||
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
|
||||
}
|
||||
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
|
||||
log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
|
||||
|
||||
ByteBufferWriter<ImmutableRTree> writer = new ByteBufferWriter<ImmutableRTree>(
|
||||
ioPeon, dimension, IndexedRTree.objectStrategy
|
||||
);
|
||||
writer.open();
|
||||
|
||||
RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
|
||||
|
||||
int count = 0;
|
||||
for (String dimVal : IndexedIterable.create(dimVals)) {
|
||||
progress.progress();
|
||||
if (dimVal == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
|
||||
float[] coords = new float[stringCoords.size()];
|
||||
for (int j = 0; j < coords.length; j++) {
|
||||
coords[j] = Float.valueOf(stringCoords.get(j));
|
||||
}
|
||||
tree.insert(coords, count);
|
||||
count++;
|
||||
}
|
||||
|
||||
writer.write(ImmutableRTree.newImmutableFromMutable(tree));
|
||||
writer.close();
|
||||
|
||||
serializerUtils.writeString(out, dimension);
|
||||
ByteStreams.copy(writer.combineStreams(), out);
|
||||
ioPeon.cleanup();
|
||||
|
||||
log.info("Completed spatial dimension[%s] in %,d millis.", dimension, stopwatch.elapsedMillis());
|
||||
}
|
||||
|
||||
log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
|