Merge branch 'master' into query-prioritization
@@ -18,7 +18,8 @@
  ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  -->

-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.metamx.druid</groupId>
     <artifactId>druid-client</artifactId>
@@ -28,7 +29,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.4.31-SNAPSHOT</version>
+        <version>0.5.0-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -39,7 +40,7 @@
         </dependency>
         <dependency>
             <groupId>com.metamx.druid</groupId>
-            <artifactId>druid-index-common</artifactId>
+            <artifactId>druid-indexing-common</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
@@ -91,7 +92,6 @@
         <dependency>
             <groupId>org.mortbay.jetty</groupId>
             <artifactId>jetty-util</artifactId>
-            <version>6.1.26</version>
         </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
@@ -178,6 +178,10 @@
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.metamx</groupId>
+            <artifactId>bytebuffer-collections</artifactId>
+        </dependency>

         <!-- Tests -->
         <dependency>
@@ -145,11 +145,10 @@ public class BrokerServerView implements TimelineServerView

   private QueryableDruidServer removeServer(DruidServer server)
   {
-    QueryableDruidServer retVal = clients.remove(server.getName());
     for (DataSegment segment : server.getSegments().values()) {
       serverRemovedSegment(server, segment);
     }
-    return retVal;
+    return clients.remove(server.getName());
   }

   private void serverAddedSegment(final DruidServer server, final DataSegment segment)
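The BrokerServerView hunk reorders removeServer: the server's segments are now unannounced while the server still has an entry in the clients map, and the client is removed only at the end. For reference, the method as it reads after this change (names exactly as in the hunk; the surrounding class is elided):

  private QueryableDruidServer removeServer(DruidServer server)
  {
    // Fire serverRemovedSegment callbacks first, while bookkeeping for the
    // server is still intact, then remove and return the client in one step.
    for (DataSegment segment : server.getSegments().values()) {
      serverRemovedSegment(server, segment);
    }
    return clients.remove(server.getName());
  }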
@@ -57,6 +57,7 @@ import com.metamx.druid.query.segment.MultipleSpecificSegmentSpec;
 import com.metamx.druid.query.segment.SegmentDescriptor;
 import com.metamx.druid.result.BySegmentResultValueClass;
 import com.metamx.druid.result.Result;
+import com.metamx.emitter.EmittingLogger;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;

@@ -74,7 +75,7 @@ import java.util.concurrent.Executors;
  */
 public class CachingClusteredClient<T> implements QueryRunner<T>
 {
-  private static final Logger log = new Logger(CachingClusteredClient.class);
+  private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);

   private final QueryToolChestWarehouse warehouse;
   private final TimelineServerView serverView;
@@ -307,7 +308,8 @@ public class CachingClusteredClient<T> implements QueryRunner<T>

       final QueryRunner clientQueryable = serverView.getQueryRunner(server);
       if (clientQueryable == null) {
-        throw new ISE("WTF!? server[%s] doesn't have a client Queryable?", server);
+        log.makeAlert("WTF!? server[%s] doesn't have a client Queryable?", server).emit();
+        continue;
       }

       final Sequence<T> resultSeqToAdd;
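In CachingClusteredClient the logger becomes an EmittingLogger, and a server with no registered QueryRunner no longer aborts the whole query with an ISE: the broker emits an alert and moves on to the remaining servers. A minimal sketch of the resulting fail-soft pattern (the loop shape and the relevantServers collection are illustrative, not from the diff):

  for (DruidServer server : relevantServers) {
    final QueryRunner clientQueryable = serverView.getQueryRunner(server);
    if (clientQueryable == null) {
      // Page an operator via the emitter, but keep serving the query
      // from whatever servers are still usable.
      log.makeAlert("WTF!? server[%s] doesn't have a client Queryable?", server).emit();
      continue;
    }
    // ... dispatch this server's segment descriptors to clientQueryable ...
  }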
@@ -108,7 +108,7 @@ public class IndexingServiceClient
         throw new ISE("Cannot find instance of indexingService");
       }

-      return String.format("http://%s:%s/mmx/merger/v1", instance.getAddress(), instance.getPort());
+      return String.format("http://%s:%s/druid/indexer/v1", instance.getAddress(), instance.getPort());
     }
     catch (Exception e) {
       throw Throwables.propagate(e);
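IndexingServiceClient now targets the renamed indexing-service endpoint: the legacy /mmx/merger/v1 prefix becomes /druid/indexer/v1. A tiny illustration with hypothetical discovery results standing in for instance.getAddress() and instance.getPort():

  String url = String.format("http://%s:%s/druid/indexer/v1", "indexer01.example.com", 8087);
  // url -> "http://indexer01.example.com:8087/druid/indexer/v1"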
@@ -58,6 +58,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.servlet.GzipFilter;
 import org.skife.config.ConfigurationObjectFactory;

 import java.util.List;
@@ -214,6 +215,7 @@ public class BrokerNode extends QueryableNode<BrokerNode>
         new ServletHolder(new QueryServlet(getJsonMapper(), getSmileMapper(), texasRanger, getEmitter(), getRequestLogger())),
         "/druid/v2/*"
     );
+    root.addFilter(GzipFilter.class, "/*", 0);

     root.addEventListener(new GuiceServletConfig(injector));
     root.addFilter(GuiceFilter.class, "/druid/v2/datasources/*", 0);
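BrokerNode now runs every response from the broker's root context through Jetty's GzipFilter. A self-contained sketch of the same registration against the Jetty 6 (org.mortbay) API used here; the port and context options are illustrative:

  import org.mortbay.jetty.Server;
  import org.mortbay.jetty.servlet.Context;
  import org.mortbay.servlet.GzipFilter;

  public class GzipExample
  {
    public static void main(String[] args) throws Exception
    {
      Server server = new Server(8080); // illustrative port
      Context root = new Context(server, "/", Context.SESSIONS);
      // "/*" applies the filter to every request; 0 is the Jetty 6 dispatch
      // mask (Handler.DEFAULT), i.e. plain request dispatches.
      root.addFilter(GzipFilter.class, "/*", 0);
      server.start();
    }
  }

Responses are compressed only when the client sends Accept-Encoding: gzip, so applying the filter to all paths is safe.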
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.4.31-SNAPSHOT</version>
+        <version>0.5.0-SNAPSHOT</version>
     </parent>

     <dependencies>
[Binary image diff: six images changed; widths, heights, and sizes are identical before and after (239 KiB, 78 KiB, 28 KiB, 66 KiB, 35 KiB, 95 KiB), so the files were likely moved or re-committed unchanged.]

@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.4.31-SNAPSHOT</version>
+        <version>0.5.0-SNAPSHOT</version>
     </parent>

     <dependencies>
@@ -21,14 +21,14 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.metamx.druid</groupId>
-    <artifactId>druid-index-common</artifactId>
-    <name>druid-index-common</name>
-    <description>Druid Indexer</description>
+    <artifactId>druid-indexing-common</artifactId>
+    <name>druid-indexing-common</name>
+    <description>Druid Indexing Common</description>

     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.4.31-SNAPSHOT</version>
+        <version>0.5.0-SNAPSHOT</version>
     </parent>

     <dependencies>
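Taken together, the pom hunks in this merge do three things: bump every module's parent version from 0.4.31-SNAPSHOT to 0.5.0-SNAPSHOT, rename the druid-index-common module to druid-indexing-common (updating dependent modules to match), and adjust the client's dependencies, dropping the explicit jetty-util version (presumably now managed by the parent pom) and adding com.metamx:bytebuffer-collections, the library that provides the ImmutableRTree used by the spatial-indexing changes below.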
@@ -27,15 +27,13 @@ import com.metamx.druid.kv.GenericIndexed;
  */
 public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
 {
-  private static final ImmutableRTree EMPTY_SET = new ImmutableRTree();
-
   private final ImmutableRTree indexedTree;

   public SpatialIndexColumnPartSupplier(
       ImmutableRTree indexedTree
   )
   {
-    this.indexedTree = (indexedTree == null) ? EMPTY_SET : indexedTree;
+    this.indexedTree = indexedTree;
   }

   @Override
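SpatialIndexColumnPartSupplier loses its internal null guard: it no longer substitutes an empty ImmutableRTree when handed null. Responsibility for the null case moves to the caller; as the IndexIO hunk below shows, setSpatialIndex is now invoked only for dimensions that actually have a spatial index, so the supplier can assume a non-null tree.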
@@ -692,25 +692,29 @@ public class IndexIO
     Map<String, Column> columns = Maps.newHashMap();

     for (String dimension : index.getAvailableDimensions()) {
+      ColumnBuilder builder = new ColumnBuilder()
+          .setType(ValueType.STRING)
+          .setHasMultipleValues(true)
+          .setDictionaryEncodedColumn(
+              new DictionaryEncodedColumnSupplier(
+                  index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension))
+              )
+          )
+          .setBitmapIndex(
+              new BitmapIndexColumnPartSupplier(
+                  index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension)
+              )
+          );
+      if (index.getSpatialIndexes().get(dimension) != null) {
+        builder.setSpatialIndex(
+            new SpatialIndexColumnPartSupplier(
+                index.getSpatialIndexes().get(dimension)
+            )
+        );
+      }
       columns.put(
           dimension.toLowerCase(),
-          new ColumnBuilder()
-              .setType(ValueType.STRING)
-              .setHasMultipleValues(true)
-              .setDictionaryEncodedColumn(
-                  new DictionaryEncodedColumnSupplier(
-                      index.getDimValueLookup(dimension), null, (index.getDimColumn(dimension))
-                  )
-              )
-              .setBitmapIndex(
-                  new BitmapIndexColumnPartSupplier(
-                      index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension)
-                  )
-              ).setSpatialIndex(
-                  new SpatialIndexColumnPartSupplier(
-                      index.getSpatialIndexes().get(dimension)
-                  )
-              ).build()
+          builder.build()
       );
     }
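The IndexIO change is the caller-side half of that contract: the ColumnBuilder is assembled into a local builder variable, and the spatial part is attached only when index.getSpatialIndexes().get(dimension) is non-null. Previously .setSpatialIndex(...) was chained unconditionally, passing a possibly-null tree into the supplier for every non-spatial dimension.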
@@ -705,6 +705,11 @@ public class IndexMerger
     final File invertedFile = new File(v8OutDir, "inverted.drd");
     Files.touch(invertedFile);
     out = Files.newOutputStreamSupplier(invertedFile, true);
+
+    final File geoFile = new File(v8OutDir, "spatial.drd");
+    Files.touch(geoFile);
+    OutputSupplier<FileOutputStream> spatialOut = Files.newOutputStreamSupplier(geoFile, true);
+
     for (int i = 0; i < mergedDimensions.size(); ++i) {
       long dimStartTime = System.currentTimeMillis();
       String dimension = mergedDimensions.get(i);
@@ -723,6 +728,18 @@ public class IndexMerger
       );
       writer.open();

+      boolean isSpatialDim = "spatial".equals(descriptions.get(dimension));
+      ByteBufferWriter<ImmutableRTree> spatialWriter = null;
+      RTree tree = null;
+      IOPeon spatialIoPeon = new TmpFileIOPeon();
+      if (isSpatialDim) {
+        spatialWriter = new ByteBufferWriter<ImmutableRTree>(
+            spatialIoPeon, dimension, IndexedRTree.objectStrategy
+        );
+        spatialWriter.open();
+        tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
+      }
+
       for (String dimVal : IndexedIterable.create(dimVals)) {
         progress.progress();
         List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
@@ -745,6 +762,15 @@ public class IndexMerger
         }

         writer.write(ImmutableConciseSet.newImmutableFromMutable(bitset));
+
+        if (isSpatialDim && dimVal != null) {
+          List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
+          float[] coords = new float[stringCoords.size()];
+          for (int j = 0; j < coords.length; j++) {
+            coords[j] = Float.valueOf(stringCoords.get(j));
+          }
+          tree.insert(coords, bitset);
+        }
       }
       writer.close();
@@ -753,64 +779,16 @@ public class IndexMerger
       ioPeon.cleanup();

       log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
-    }
-
-    /************ Create Geographical Indexes *************/
-    // FIXME: Rewrite when indexing is updated
-    Stopwatch stopwatch = new Stopwatch();
-    stopwatch.start();
+      if (isSpatialDim) {
+        spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
+        spatialWriter.close();

-    final File geoFile = new File(v8OutDir, "spatial.drd");
-    Files.touch(geoFile);
-    out = Files.newOutputStreamSupplier(geoFile, true);
-
-    for (int i = 0; i < mergedDimensions.size(); ++i) {
-      String dimension = mergedDimensions.get(i);
-
-      if (!"spatial".equals(descriptions.get(dimension))) {
-        continue;
+        serializerUtils.writeString(spatialOut, dimension);
+        ByteStreams.copy(spatialWriter.combineStreams(), spatialOut);
+        spatialIoPeon.cleanup();
       }
-
-      File dimOutFile = dimOuts.get(i).getFile();
-      final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
-
-      if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
-        throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
-      }
-      Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
-      log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
-
-      ByteBufferWriter<ImmutableRTree> writer = new ByteBufferWriter<ImmutableRTree>(
-          ioPeon, dimension, IndexedRTree.objectStrategy
-      );
-      writer.open();
-
-      RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
-
-      int count = 0;
-      for (String dimVal : IndexedIterable.create(dimVals)) {
-        progress.progress();
-        if (dimVal == null) {
-          continue;
-        }
-
-        List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
-        float[] coords = new float[stringCoords.size()];
-        for (int j = 0; j < coords.length; j++) {
-          coords[j] = Float.valueOf(stringCoords.get(j));
-        }
-        tree.insert(coords, count);
-        count++;
-      }
-
-      writer.write(ImmutableRTree.newImmutableFromMutable(tree));
-      writer.close();
-
-      serializerUtils.writeString(out, dimension);
-      ByteStreams.copy(writer.combineStreams(), out);
-      ioPeon.cleanup();
-
-      log.info("Completed spatial dimension[%s] in %,d millis.", dimension, stopwatch.elapsedMillis());
     }

     log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
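Net effect of the IndexMerger hunks: the FIXME'd "Create Geographical Indexes" post-pass, which re-read every dimension's values and inserted them by a running ordinal count, is gone. Spatial R-trees are now built inline in the per-dimension merge loop: a dimension is spatial when its descriptions entry is "spatial", each non-null value's coordinates go into the tree together with that value's row bitmap (the same bitset written to the inverted index), and the tree is serialized to spatial.drd alongside inverted.drd. A self-contained sketch of the per-value coordinate parsing used above; the delimiter behind SPLITTER is not visible in the diff, so a comma is assumed for illustration:

  import java.util.Arrays;

  public class CoordParseExample
  {
    // Stands in for Lists.newArrayList(SPLITTER.split(dimVal)) plus the
    // Float.valueOf loop in the hunk above.
    static float[] parseCoords(String dimVal)
    {
      String[] stringCoords = dimVal.split(",");
      float[] coords = new float[stringCoords.length];
      for (int j = 0; j < coords.length; j++) {
        coords[j] = Float.valueOf(stringCoords[j]);
      }
      return coords;
    }

    public static void main(String[] args)
    {
      // e.g. a "latitude,longitude" value stored in a spatial dimension
      System.out.println(Arrays.toString(parseCoords("37.7749,-122.4194")));
      // -> [37.7749, -122.4194]
    }
  }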