diff --git a/client/pom.xml b/client/pom.xml
index 207458b37c1..1c1cf4b9a99 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.5.19-SNAPSHOT
+ 0.5.33-SNAPSHOT
@@ -68,6 +68,10 @@
commons-codec
commons-codec
+
+ commons-httpclient
+ commons-httpclient
+
org.skife.config
config-magic
diff --git a/client/src/main/java/com/metamx/druid/client/DruidServer.java b/client/src/main/java/com/metamx/druid/client/DruidServer.java
index e86efdad816..a3acc3c00d8 100644
--- a/client/src/main/java/com/metamx/druid/client/DruidServer.java
+++ b/client/src/main/java/com/metamx/druid/client/DruidServer.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap;
*/
public class DruidServer implements Comparable
{
+ public static final String DEFAULT_TIER = "_default_tier";
private static final Logger log = new Logger(DruidServer.class);
private final Object lock = new Object();
diff --git a/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java b/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java
index 89028021c18..d66ef146db4 100644
--- a/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java
+++ b/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java
@@ -33,9 +33,10 @@ public abstract class DruidServerConfig
public abstract String getHost();
@Config("druid.server.maxSize")
+ @Default("0")
public abstract long getMaxSize();
@Config("druid.server.tier")
- @Default("_default_tier")
+ @Default(DruidServer.DEFAULT_TIER)
public abstract String getTier();
}
diff --git a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java
index 4b345dc5a29..781c4ed0c77 100644
--- a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java
+++ b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java
@@ -62,9 +62,7 @@ public class SingleServerInventoryView extends ServerInventoryView
curator,
exec,
jsonMapper,
- new TypeReference()
- {
- }
+ new TypeReference(){}
);
}
diff --git a/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java b/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java
index 2808ce4d7b8..fb7a3044ee4 100644
--- a/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java
+++ b/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java
@@ -21,8 +21,11 @@ package com.metamx.druid.curator.cache;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.utils.ThreadUtils;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
/**
*/
@@ -48,4 +51,43 @@ public class SimplePathChildrenCacheFactory implements PathChildrenCacheFactory
{
return new PathChildrenCache(curator, path, cacheData, compressed, exec);
}
+
+ public static class Builder
+ {
+ private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
+
+ private boolean cacheData;
+ private boolean compressed;
+ private ExecutorService exec;
+
+ public Builder()
+ {
+ cacheData = true;
+ compressed = false;
+ exec = Executors.newSingleThreadExecutor(defaultThreadFactory);
+ }
+
+ public Builder withCacheData(boolean cacheData)
+ {
+ this.cacheData = cacheData;
+ return this;
+ }
+
+ public Builder withCompressed(boolean compressed)
+ {
+ this.compressed = compressed;
+ return this;
+ }
+
+ public Builder withExecutorService(ExecutorService exec)
+ {
+ this.exec = exec;
+ return this;
+ }
+
+ public SimplePathChildrenCacheFactory build()
+ {
+ return new SimplePathChildrenCacheFactory(cacheData, compressed, exec);
+ }
+ }
}
diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
index 741bc59d3d9..be46aea41f6 100644
--- a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
@@ -33,7 +33,7 @@ public abstract class CuratorConfig
@Default("30000")
public abstract int getZkSessionTimeoutMs();
- @Config("druid.curator.compression.enable")
+ @Config("druid.curator.compress")
@Default("false")
public abstract boolean enableCompression();
}
diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
index 04776d6545a..7d0c20cd7ef 100644
--- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
@@ -36,4 +36,11 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig
@Config("druid.zk.paths.discoveryPath")
public abstract String getDiscoveryPath();
+
+ @Override
+ @Config("druid.curator.discovery.compress")
+ public boolean enableCompression()
+ {
+ return false;
+ }
}
diff --git a/common/pom.xml b/common/pom.xml
index d9d6249b5f9..fccf2c2eeb7 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.5.19-SNAPSHOT
+ 0.5.33-SNAPSHOT
diff --git a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java
index 00dc3e18a43..79d1b5dd96e 100644
--- a/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java
+++ b/common/src/main/java/com/metamx/druid/partition/PartitionHolder.java
@@ -61,6 +61,11 @@ public class PartitionHolder implements Iterable>
holderSet.add(chunk);
}
+ public int size()
+ {
+ return holderSet.size();
+ }
+
public PartitionChunk remove(PartitionChunk chunk)
{
// Somewhat funky implementation in order to return the removed object as it exists in the set
diff --git a/examples/pom.xml b/examples/pom.xml
index 9669767f81e..968a63867bd 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
com.metamx
druid
- 0.5.19-SNAPSHOT
+ 0.5.33-SNAPSHOT
diff --git a/examples/src/test/java/druid/examples/web/InputSupplierUpdateStreamTest.java b/examples/src/test/java/druid/examples/web/InputSupplierUpdateStreamTest.java
index df00f598b40..a86d239806f 100644
--- a/examples/src/test/java/druid/examples/web/InputSupplierUpdateStreamTest.java
+++ b/examples/src/test/java/druid/examples/web/InputSupplierUpdateStreamTest.java
@@ -68,6 +68,7 @@ public class InputSupplierUpdateStreamTest
updateStream.start();
Map insertedRow = updateStream.pollFromQueue(waitTime, unit);
Assert.assertEquals(expectedAnswer, insertedRow);
+ updateStream.stop();
}
//If a timestamp is missing, we should throw away the event
@@ -85,6 +86,7 @@ public class InputSupplierUpdateStreamTest
);
updateStream.start();
Assert.assertEquals(updateStream.getQueueSize(), 0);
+ updateStream.stop();
}
//If any other value is missing, we should still add the event and process it properly
@@ -105,6 +107,7 @@ public class InputSupplierUpdateStreamTest
expectedAnswer.put("item1", "value1");
expectedAnswer.put("time", 1372121562);
Assert.assertEquals(expectedAnswer, insertedRow);
+ updateStream.stop();
}
diff --git a/examples/src/test/java/druid/examples/web/RenamingKeysUpdateStreamTest.java b/examples/src/test/java/druid/examples/web/RenamingKeysUpdateStreamTest.java
index e31b8a8e346..86ba0476c8e 100644
--- a/examples/src/test/java/druid/examples/web/RenamingKeysUpdateStreamTest.java
+++ b/examples/src/test/java/druid/examples/web/RenamingKeysUpdateStreamTest.java
@@ -47,7 +47,7 @@ public class RenamingKeysUpdateStreamTest
}
@Test
- public void testPollFromQueue() throws Exception
+ public void testPolFromQueue() throws Exception
{
InputSupplierUpdateStream updateStream = new InputSupplierUpdateStream(testCaseSupplier, timeDimension);
Map renamedKeys = new HashMap();
@@ -61,8 +61,6 @@ public class RenamingKeysUpdateStreamTest
expectedAnswer.put("i1", "value1");
expectedAnswer.put("i2", 2);
expectedAnswer.put("t", 1372121562);
-
-
Assert.assertEquals(expectedAnswer, renamer.pollFromQueue(waitTime, unit));
}
@@ -88,7 +86,6 @@ public class RenamingKeysUpdateStreamTest
renamedKeys.put("item2", "i2");
RenamingKeysUpdateStream renamer = new RenamingKeysUpdateStream(updateStream, renamedKeys);
Assert.assertEquals("time", renamer.getTimeDimension());
-
}
}
diff --git a/examples/src/test/java/druid/examples/web/WebJsonSupplierTest.java b/examples/src/test/java/druid/examples/web/WebJsonSupplierTest.java
index c1bc5c4dc34..5bb72e2af17 100644
--- a/examples/src/test/java/druid/examples/web/WebJsonSupplierTest.java
+++ b/examples/src/test/java/druid/examples/web/WebJsonSupplierTest.java
@@ -21,11 +21,11 @@ package druid.examples.web;
import org.junit.Test;
-import java.net.UnknownHostException;
+import java.io.IOException;
public class WebJsonSupplierTest
{
- @Test(expected = UnknownHostException.class)
+ @Test(expected = IOException.class)
public void checkInvalidUrl() throws Exception
{
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 5c50eac9b54..31dc874c4d2 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
com.metamx
druid
- 0.5.19-SNAPSHOT
+ 0.5.33-SNAPSHOT
diff --git a/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java b/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java
index 82cee9e54dd..8e429d45879 100644
--- a/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java
+++ b/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java
@@ -23,6 +23,9 @@ import com.metamx.druid.index.column.ColumnSelector;
import com.metamx.druid.kv.Indexed;
import org.joda.time.Interval;
+import java.io.Closeable;
+import java.io.IOException;
+
/**
*/
public interface QueryableIndex extends ColumnSelector
@@ -31,4 +34,11 @@ public interface QueryableIndex extends ColumnSelector
public int getNumRows();
public Indexed getColumnNames();
public Indexed getAvailableDimensions();
+
+ /**
+ * The close method shouldn't actually be here as this is nasty. We will adjust it in the future.
+ * @throws IOException
+ */
+ @Deprecated
+ public void close() throws IOException;
}
diff --git a/indexing-common/src/main/java/com/metamx/druid/index/SimpleQueryableIndex.java b/indexing-common/src/main/java/com/metamx/druid/index/SimpleQueryableIndex.java
index 2f60b73adc6..f9898d3a607 100644
--- a/indexing-common/src/main/java/com/metamx/druid/index/SimpleQueryableIndex.java
+++ b/indexing-common/src/main/java/com/metamx/druid/index/SimpleQueryableIndex.java
@@ -19,10 +19,12 @@
package com.metamx.druid.index;
+import com.metamx.common.io.smoosh.SmooshedFileMapper;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.kv.Indexed;
import org.joda.time.Interval;
+import java.io.IOException;
import java.util.Map;
/**
@@ -34,13 +36,15 @@ public class SimpleQueryableIndex implements QueryableIndex
private final Indexed availableDimensions;
private final Column timeColumn;
private final Map otherColumns;
+ private final SmooshedFileMapper fileMapper;
public SimpleQueryableIndex(
Interval dataInterval,
Indexed columnNames,
Indexed dimNames,
Column timeColumn,
- Map otherColumns
+ Map otherColumns,
+ SmooshedFileMapper fileMapper
)
{
this.dataInterval = dataInterval;
@@ -48,6 +52,7 @@ public class SimpleQueryableIndex implements QueryableIndex
this.availableDimensions = dimNames;
this.timeColumn = timeColumn;
this.otherColumns = otherColumns;
+ this.fileMapper = fileMapper;
}
@Override
@@ -85,4 +90,10 @@ public class SimpleQueryableIndex implements QueryableIndex
{
return otherColumns.get(columnName);
}
+
+ @Override
+ public void close() throws IOException
+ {
+ fileMapper.close();
+ }
}
diff --git a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
index 24f6750785f..07008e0a42b 100644
--- a/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
+++ b/indexing-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java
@@ -375,7 +375,8 @@ public class IndexIO
dimValueLookups,
dimColumns,
invertedIndexed,
- spatialIndexed
+ spatialIndexed,
+ smooshedFiles
);
log.debug("Mapped v8 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
@@ -761,7 +762,8 @@ public class IndexIO
.setType(ValueType.LONG)
.setGenericColumn(new LongGenericColumnSupplier(index.timestamps))
.build(),
- columns
+ columns,
+ index.getFileMapper()
);
}
}
@@ -795,7 +797,7 @@ public class IndexIO
}
final QueryableIndex index = new SimpleQueryableIndex(
- dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns
+ dataInterval, cols, dims, deserializeColumn(mapper, smooshedFiles.mapFile("__time")), columns, smooshedFiles
);
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
diff --git a/indexing-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java b/indexing-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java
index fcdcef4eddc..d792d80de01 100644
--- a/indexing-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java
+++ b/indexing-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java
@@ -24,18 +24,19 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
+import com.metamx.common.io.smoosh.SmooshedFileMapper;
import com.metamx.common.logger.Logger;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedList;
import com.metamx.druid.kv.IndexedLongs;
-import com.metamx.druid.kv.IndexedRTree;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
+import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
import java.util.Arrays;
@@ -57,6 +58,7 @@ public class MMappedIndex
final Map dimColumns;
final Map> invertedIndexes;
final Map spatialIndexes;
+ final SmooshedFileMapper fileMapper;
private final Map metricIndexes = Maps.newHashMap();
@@ -69,7 +71,8 @@ public class MMappedIndex
Map> dimValueLookups,
Map dimColumns,
Map> invertedIndexes,
- Map spatialIndexes
+ Map spatialIndexes,
+ SmooshedFileMapper fileMapper
)
{
this.availableDimensions = availableDimensions;
@@ -81,6 +84,7 @@ public class MMappedIndex
this.dimColumns = dimColumns;
this.invertedIndexes = invertedIndexes;
this.spatialIndexes = spatialIndexes;
+ this.fileMapper = fileMapper;
for (int i = 0; i < availableMetrics.size(); i++) {
metricIndexes.put(availableMetrics.get(i), i);
@@ -169,6 +173,18 @@ public class MMappedIndex
return (retVal == null) ? emptySet : retVal;
}
+ public SmooshedFileMapper getFileMapper()
+ {
+ return fileMapper;
+ }
+
+ public void close() throws IOException
+ {
+ if (fileMapper != null) {
+ fileMapper.close();
+ }
+ }
+
public static MMappedIndex fromIndex(Index index)
{
log.info("Converting timestamps");
@@ -273,7 +289,8 @@ public class MMappedIndex
dimValueLookups,
dimColumns,
invertedIndexes,
- spatialIndexes
+ spatialIndexes,
+ null
);
}
}
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
index 82a3c947bc3..231cbd44102 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/InputRowParser.java
@@ -1,9 +1,10 @@
package com.metamx.druid.indexer.data;
+import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
public interface InputRowParser
{
- public InputRow parse(T input);
+ public InputRow parse(T input) throws FormattedException;
public void addDimensionExclusion(String dimension);
}
diff --git a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java
index 60a97c131bf..b2d9586f272 100644
--- a/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java
+++ b/indexing-common/src/main/java/com/metamx/druid/indexer/data/MapInputRowParser.java
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.metamx.common.exception.FormattedException;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import org.joda.time.DateTime;
@@ -37,21 +38,30 @@ public class MapInputRowParser implements InputRowParser