diff --git a/.gitignore b/.gitignore
index 31985c78113..845a2ddc007 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@ target
examples/rand/RealtimeNode.out
examples/twitter/RealtimeNode.out
*.log
+*.DS_Store
diff --git a/build.sh b/build.sh
index fe534e319f7..158a0cd6153 100755
--- a/build.sh
+++ b/build.sh
@@ -10,8 +10,6 @@ SCRIPT_DIR=`pwd`
popd
VERSION=`cat pom.xml | grep version | head -4 | tail -1 | sed 's_.*\([^<]*\).*_\1_'`
-#TAR_FILE=${SCRIPT_DIR}/${PROJECT}-${VERSION}.tar.gz
-#rm -f ${TAR_FILE}
echo Using Version[${VERSION}]
diff --git a/client/pom.xml b/client/pom.xml
index 14632df35df..daf8fbd96b2 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -18,8 +18,7 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
-
+
4.0.0
com.metamx.druid
druid-client
@@ -29,7 +28,7 @@
com.metamx
druid
- 0.5.0-SNAPSHOT
+ 0.5.23-SNAPSHOT
@@ -69,6 +68,10 @@
commons-codec
commons-codec
+
+ commons-httpclient
+ commons-httpclient
+
org.skife.config
config-magic
@@ -186,6 +189,10 @@
com.metamx
bytebuffer-collections
+
+ net.jpountz.lz4
+ lz4
+
diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java
index 5e119c33fd4..0b8f4b6e5f0 100644
--- a/client/src/main/java/com/metamx/druid/QueryableNode.java
+++ b/client/src/main/java/com/metamx/druid/QueryableNode.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
@@ -32,21 +34,30 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.BatchServerInventoryView;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.ServerView;
+import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.concurrent.Execs;
-import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
+import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
+import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.curator.CuratorConfig;
+import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
+import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.JsonConfigurator;
import com.metamx.druid.http.log.RequestLogger;
+import com.metamx.druid.http.NoopRequestLogger;
+import com.metamx.druid.http.RequestLogger;
+import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
@@ -356,12 +367,32 @@ public abstract class QueryableNode extends Registering
private void initializeServerInventoryView()
{
if (serverInventoryView == null) {
- serverInventoryView = new ServerInventoryView(
- getConfigFactory().build(ServerInventoryViewConfig.class),
- getZkPaths(),
- getCuratorFramework(),
- getJsonMapper()
+ final ExecutorService exec = Executors.newFixedThreadPool(
+ 1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
);
+
+ final ServerInventoryViewConfig serverInventoryViewConfig = getConfigFactory().build(ServerInventoryViewConfig.class);
+ final String announcerType = serverInventoryViewConfig.getAnnouncerType();
+
+ if ("legacy".equalsIgnoreCase(announcerType)) {
+ serverInventoryView = new SingleServerInventoryView(
+ serverInventoryViewConfig,
+ getZkPaths(),
+ getCuratorFramework(),
+ exec,
+ getJsonMapper()
+ );
+ } else if ("batch".equalsIgnoreCase(announcerType)) {
+ serverInventoryView = new BatchServerInventoryView(
+ serverInventoryViewConfig,
+ getZkPaths(),
+ getCuratorFramework(),
+ exec,
+ getJsonMapper()
+ );
+ } else {
+ throw new IAE("Unknown type %s", announcerType);
+ }
lifecycle.addManagedInstance(serverInventoryView);
}
}
@@ -371,13 +402,23 @@ public abstract class QueryableNode extends Registering
if (requestLogger == null) {
try {
final String loggingType = props.getProperty("druid.request.logging.type");
- if("emitter".equals(loggingType)) {
- setRequestLogger(Initialization.makeEmittingRequestLogger(getProps(), getEmitter()));
- }
- else {
+ if ("emitter".equals(loggingType)) {
setRequestLogger(
- Initialization.makeFileRequestLogger(getJsonMapper(), getScheduledExecutorFactory(), getProps())
+ Initialization.makeEmittingRequestLogger(
+ getProps(),
+ getEmitter()
+ )
);
+ } else if ("file".equalsIgnoreCase(loggingType)) {
+ setRequestLogger(
+ Initialization.makeFileRequestLogger(
+ getJsonMapper(),
+ getScheduledExecutorFactory(),
+ getProps()
+ )
+ );
+ } else {
+ setRequestLogger(new NoopRequestLogger());
}
}
catch (IOException e) {
@@ -419,7 +460,40 @@ public abstract class QueryableNode extends Registering
final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
lifecycle.addManagedInstance(announcer);
- setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper()));
+ final ZkDataSegmentAnnouncerConfig config = getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class);
+ final String announcerType = config.getAnnouncerType();
+
+ final DataSegmentAnnouncer dataSegmentAnnouncer;
+ if ("batch".equalsIgnoreCase(announcerType)) {
+ dataSegmentAnnouncer = new BatchDataSegmentAnnouncer(
+ getDruidServerMetadata(),
+ config,
+ announcer,
+ getJsonMapper()
+ );
+ } else if ("legacy".equalsIgnoreCase(announcerType)) {
+ dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
+ Arrays.asList(
+ new BatchDataSegmentAnnouncer(
+ getDruidServerMetadata(),
+ config,
+ announcer,
+ getJsonMapper()
+ ),
+ new SingleDataSegmentAnnouncer(
+ getDruidServerMetadata(),
+ getZkPaths(),
+ announcer,
+ getJsonMapper()
+ )
+ )
+ );
+ } else {
+ throw new ISE("Unknown announcer type [%s]", announcerType);
+ }
+
+ setAnnouncer(dataSegmentAnnouncer);
+
lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
}
}
diff --git a/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java
new file mode 100644
index 00000000000..58497e293db
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java
@@ -0,0 +1,129 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
+import com.metamx.common.ISE;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.emitter.EmittingLogger;
+import org.apache.curator.framework.CuratorFramework;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ */
+public class BatchServerInventoryView extends ServerInventoryView>
+{
+ private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
+
+ final ConcurrentMap> zNodes = new MapMaker().makeMap();
+
+ public BatchServerInventoryView(
+ final ServerInventoryViewConfig config,
+ final ZkPathsConfig zkPaths,
+ final CuratorFramework curator,
+ final ExecutorService exec,
+ final ObjectMapper jsonMapper
+ )
+ {
+ super(
+ config,
+ log,
+ new InventoryManagerConfig()
+ {
+ @Override
+ public String getContainerPath()
+ {
+ return zkPaths.getAnnouncementsPath();
+ }
+
+ @Override
+ public String getInventoryPath()
+ {
+ return zkPaths.getLiveSegmentsPath();
+ }
+ },
+ curator,
+ exec,
+ jsonMapper,
+ new TypeReference>()
+ {
+ }
+ );
+ }
+
+ @Override
+ protected DruidServer addInnerInventory(
+ final DruidServer container,
+ String inventoryKey,
+ final Set inventory
+ )
+ {
+ zNodes.put(inventoryKey, inventory);
+ for (DataSegment segment : inventory) {
+ addSingleInventory(container, segment);
+ }
+ return container;
+ }
+
+ @Override
+ protected DruidServer updateInnerInventory(
+ DruidServer container, String inventoryKey, Set inventory
+ )
+ {
+ Set existing = zNodes.get(inventoryKey);
+ if (existing == null) {
+ throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
+ }
+
+ for (DataSegment segment : Sets.difference(inventory, existing)) {
+ addSingleInventory(container, segment);
+ }
+ for (DataSegment segment : Sets.difference(existing, inventory)) {
+ removeSingleInventory(container, segment.getIdentifier());
+ }
+ zNodes.put(inventoryKey, inventory);
+
+ return container;
+ }
+
+ @Override
+ protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
+ {
+ log.info("Server[%s] removed container[%s]", container.getName(), inventoryKey);
+ Set segments = zNodes.remove(inventoryKey);
+
+ if (segments == null) {
+ log.warn("Told to remove container[%s], which didn't exist", inventoryKey);
+ return container;
+ }
+
+ for (DataSegment segment : segments) {
+ removeSingleInventory(container, segment.getIdentifier());
+ }
+ return container;
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
index 44c35d78b13..72519cbeb8c 100644
--- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
+++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java
@@ -49,6 +49,7 @@ import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.MetricManipulationFn;
+import com.metamx.druid.query.Queries;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.QueryToolChestWarehouse;
@@ -125,12 +126,18 @@ public class CachingClusteredClient implements QueryRunner
&& strategy != null;
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
- final Query rewrittenQuery;
+
+ ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder();
+
+ final String priority = query.getContextValue("priority", "0");
+ contextBuilder.put("priority", priority);
+
if (populateCache) {
- rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("bySegment", "true", "intermediate", "true"));
- } else {
- rewrittenQuery = query.withOverriddenContext(ImmutableMap.of("intermediate", "true"));
+ contextBuilder.put("bySegment", "true");
}
+ contextBuilder.put("intermediate", "true");
+
+ final Query rewrittenQuery = query.withOverriddenContext(contextBuilder.build());
VersionedIntervalTimeline timeline = serverView.getTimeline(query.getDataSource());
if (timeline == null) {
diff --git a/client/src/main/java/com/metamx/druid/client/DruidServer.java b/client/src/main/java/com/metamx/druid/client/DruidServer.java
index d93003c9ca3..0f10a2d8e8b 100644
--- a/client/src/main/java/com/metamx/druid/client/DruidServer.java
+++ b/client/src/main/java/com/metamx/druid/client/DruidServer.java
@@ -26,6 +26,7 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.initialization.DruidNode;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -118,7 +119,8 @@ public class DruidServer implements Comparable
@JsonProperty
public Map getSegments()
{
- return ImmutableMap.copyOf(segments);
+ // Copying the map slows things down a lot here, don't use Immutable Map here
+ return Collections.unmodifiableMap(segments);
}
public DataSegment getSegment(String segmentName)
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
index dc4d0f79460..ef2dfb19257 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
@@ -20,6 +20,7 @@
package com.metamx.druid.client;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
@@ -28,6 +29,7 @@ import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.concurrent.Execs;
+import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.inventory.CuratorInventoryManager;
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
@@ -45,28 +47,32 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
*/
-@ManageLifecycle
-public class ServerInventoryView implements ServerView, InventoryView
+public abstract class ServerInventoryView implements ServerView, InventoryView
{
- private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
-
- private final CuratorInventoryManager inventoryManager;
+ private final ServerInventoryViewConfig config;
+ private final Logger log;
+ private final CuratorInventoryManager inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ConcurrentMap serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap segmentCallbacks = new MapMaker().makeMap();
- private static final Map removedSegments = new MapMaker().makeMap();
+ private final Map removedSegments = new MapMaker().makeMap();
@Inject
public ServerInventoryView(
final ServerInventoryViewConfig config,
- final ZkPathsConfig zkPaths,
+ final Logger log,
+ final InventoryManagerConfig inventoryManagerConfig,
final CuratorFramework curator,
- final ObjectMapper jsonMapper
+ final ExecutorService exec,
+ final ObjectMapper jsonMapper,
+ final TypeReference typeReference
)
{
- inventoryManager = new CuratorInventoryManager(
+ this.config = config;
+ this.log = log;
+ this.inventoryManager = new CuratorInventoryManager(
curator,
new InventoryManagerConfig()
{
@@ -108,10 +114,10 @@ public class ServerInventoryView implements ServerView, InventoryView
}
@Override
- public DataSegment deserializeInventory(byte[] bytes)
+ public InventoryType deserializeInventory(byte[] bytes)
{
try {
- return jsonMapper.readValue(bytes, DataSegment.class);
+ return jsonMapper.readValue(bytes, typeReference);
}
catch (IOException e) {
throw Throwables.propagate(e);
@@ -119,7 +125,7 @@ public class ServerInventoryView implements ServerView, InventoryView
}
@Override
- public byte[] serializeInventory(DataSegment inventory)
+ public byte[] serializeInventory(InventoryType inventory)
{
try {
return jsonMapper.writeValueAsBytes(inventory);
@@ -149,67 +155,27 @@ public class ServerInventoryView implements ServerView, InventoryView
}
@Override
- public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
+ public DruidServer addInventory(
+ final DruidServer container,
+ String inventoryKey,
+ final InventoryType inventory
+ )
{
- log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
+ return addInnerInventory(container, inventoryKey, inventory);
+ }
- if (container.getSegment(inventoryKey) != null) {
- log.warn(
- "Not adding or running callbacks for existing segment[%s] on server[%s]",
- inventoryKey,
- container.getName()
- );
-
- return container;
- }
-
- final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);
-
- runSegmentCallbacks(
- new Function()
- {
- @Override
- public CallbackAction apply(SegmentCallback input)
- {
- return input.segmentAdded(container, inventory);
- }
- }
- );
-
- return retVal;
+ @Override
+ public DruidServer updateInventory(
+ DruidServer container, String inventoryKey, InventoryType inventory
+ )
+ {
+ return updateInnerInventory(container, inventoryKey, inventory);
}
@Override
public DruidServer removeInventory(final DruidServer container, String inventoryKey)
{
- log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
- final DataSegment segment = container.getSegment(inventoryKey);
-
- if (segment == null) {
- log.warn(
- "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
- inventoryKey,
- container.getName()
- );
-
- return container;
- }
-
- final DruidServer retVal = container.removeDataSegment(inventoryKey);
-
- runSegmentCallbacks(
- new Function()
- {
- @Override
- public CallbackAction apply(SegmentCallback input)
- {
- return input.segmentRemoved(container, segment);
- }
- }
- );
-
- removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
- return retVal;
+ return removeInnerInventory(container, inventoryKey);
}
}
);
@@ -285,7 +251,12 @@ public class ServerInventoryView implements ServerView, InventoryView
segmentCallbacks.put(callback, exec);
}
- private void runSegmentCallbacks(
+ public InventoryManagerConfig getInventoryManagerConfig()
+ {
+ return inventoryManager.getConfig();
+ }
+
+ protected void runSegmentCallbacks(
final Function fn
)
{
@@ -305,7 +276,7 @@ public class ServerInventoryView implements ServerView, InventoryView
}
}
- private void runServerCallbacks(final DruidServer server)
+ protected void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
@@ -322,4 +293,83 @@ public class ServerInventoryView implements ServerView, InventoryView
);
}
}
+
+ protected void addSingleInventory(
+ final DruidServer container,
+ final DataSegment inventory
+ )
+ {
+ log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
+
+ if (container.getSegment(inventory.getIdentifier()) != null) {
+ log.warn(
+ "Not adding or running callbacks for existing segment[%s] on server[%s]",
+ inventory.getIdentifier(),
+ container.getName()
+ );
+
+ return;
+ }
+
+ container.addDataSegment(inventory.getIdentifier(), inventory);
+
+ runSegmentCallbacks(
+ new Function()
+ {
+ @Override
+ public CallbackAction apply(SegmentCallback input)
+ {
+ return input.segmentAdded(container, inventory);
+ }
+ }
+ );
+ }
+
+ protected void removeSingleInventory(final DruidServer container, String inventoryKey)
+ {
+ log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
+ final DataSegment segment = container.getSegment(inventoryKey);
+
+ if (segment == null) {
+ log.warn(
+ "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
+ inventoryKey,
+ container.getName()
+ );
+
+ return;
+ }
+
+ container.removeDataSegment(inventoryKey);
+
+ runSegmentCallbacks(
+ new Function()
+ {
+ @Override
+ public CallbackAction apply(SegmentCallback input)
+ {
+ return input.segmentRemoved(container, segment);
+ }
+ }
+ );
+
+ removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
+ }
+
+ protected abstract DruidServer addInnerInventory(
+ final DruidServer container,
+ String inventoryKey,
+ final InventoryType inventory
+ );
+
+ protected abstract DruidServer updateInnerInventory(
+ final DruidServer container,
+ String inventoryKey,
+ final InventoryType inventory
+ );
+
+ protected abstract DruidServer removeInnerInventory(
+ final DruidServer container,
+ String inventoryKey
+ );
}
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java
index 68de158cbf0..6130a96a66c 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java
@@ -29,4 +29,8 @@ public abstract class ServerInventoryViewConfig
@Config("druid.master.removedSegmentLifetime")
@Default("1")
public abstract int getRemovedSegmentLifetime();
+
+ @Config("druid.announcer.type")
+ @Default("legacy")
+ public abstract String getAnnouncerType();
}
\ No newline at end of file
diff --git a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java
new file mode 100644
index 00000000000..4b345dc5a29
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java
@@ -0,0 +1,94 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.emitter.EmittingLogger;
+import org.apache.curator.framework.CuratorFramework;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ */
+public class SingleServerInventoryView extends ServerInventoryView
+{
+ private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
+
+ public SingleServerInventoryView(
+ final ServerInventoryViewConfig config,
+ final ZkPathsConfig zkPaths,
+ final CuratorFramework curator,
+ final ExecutorService exec,
+ final ObjectMapper jsonMapper
+ )
+ {
+ super(
+ config,
+ log,
+ new InventoryManagerConfig()
+ {
+ @Override
+ public String getContainerPath()
+ {
+ return zkPaths.getAnnouncementsPath();
+ }
+
+ @Override
+ public String getInventoryPath()
+ {
+ return zkPaths.getServedSegmentsPath();
+ }
+ },
+ curator,
+ exec,
+ jsonMapper,
+ new TypeReference()
+ {
+ }
+ );
+ }
+
+ @Override
+ protected DruidServer addInnerInventory(
+ DruidServer container, String inventoryKey, DataSegment inventory
+ )
+ {
+ addSingleInventory(container, inventory);
+ return container;
+ }
+
+ @Override
+ protected DruidServer updateInnerInventory(
+ DruidServer container, String inventoryKey, DataSegment inventory
+ )
+ {
+ return addInnerInventory(container, inventoryKey, inventory);
+ }
+
+ @Override
+ protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
+ {
+ removeSingleInventory(container, inventoryKey);
+ return container;
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java
new file mode 100644
index 00000000000..4728430b4e7
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/cache/LZ4Transcoder.java
@@ -0,0 +1,81 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client.cache;
+
+import com.google.common.primitives.Ints;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Decompressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+
+import java.nio.ByteBuffer;
+
+public class LZ4Transcoder extends SerializingTranscoder
+{
+
+ private final LZ4Factory lz4Factory;
+
+ public LZ4Transcoder()
+ {
+ super();
+ lz4Factory = LZ4Factory.fastestJavaInstance();
+ }
+
+ public LZ4Transcoder(int max)
+ {
+ super(max);
+ lz4Factory = LZ4Factory.fastestJavaInstance();
+ }
+
+ @Override
+ protected byte[] compress(byte[] in)
+ {
+ if (in == null) {
+ throw new NullPointerException("Can't compress null");
+ }
+
+ LZ4Compressor compressor = lz4Factory.fastCompressor();
+
+ byte[] out = new byte[compressor.maxCompressedLength(in.length)];
+ int compressedLength = compressor.compress(in, 0, in.length, out, 0);
+
+ getLogger().debug("Compressed %d bytes to %d", in.length, compressedLength);
+
+ return ByteBuffer.allocate(Ints.BYTES + compressedLength)
+ .putInt(in.length)
+ .put(out, 0, compressedLength)
+ .array();
+ }
+
+ @Override
+ protected byte[] decompress(byte[] in)
+ {
+ byte[] out = null;
+ if(in != null) {
+ LZ4Decompressor decompressor = lz4Factory.decompressor();
+
+ int size = ByteBuffer.wrap(in).getInt();
+
+ out = new byte[size];
+ decompressor.decompress(in, Ints.BYTES, out, 0, out.length);
+ }
+ return out == null ? null : out;
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
index 9dca20d9fb8..fb6fa72ce46 100644
--- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
+++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java
@@ -53,9 +53,10 @@ public class MemcachedCache implements Cache
public static MemcachedCache create(final MemcachedCacheConfig config)
{
try {
- SerializingTranscoder transcoder = new SerializingTranscoder(config.getMaxObjectSize());
- // disable compression
- transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+ LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize());
+
+ // always use compression
+ transcoder.setCompressionThreshold(0);
return new MemcachedCache(
new MemcachedClient(
diff --git a/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java
new file mode 100644
index 00000000000..1bb0b8114ee
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/coordination/AbstractDataSegmentAnnouncer.java
@@ -0,0 +1,100 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.coordination;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.metamx.common.lifecycle.LifecycleStart;
+import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import org.apache.curator.utils.ZKPaths;
+
+/**
+ */
+public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer
+{
+ private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class);
+
+ private final DruidServerMetadata server;
+ private final ZkPathsConfig config;
+ private final Announcer announcer;
+ private final ObjectMapper jsonMapper;
+
+ private final Object lock = new Object();
+
+ private volatile boolean started = false;
+
+ protected AbstractDataSegmentAnnouncer(
+ DruidServerMetadata server,
+ ZkPathsConfig config,
+ Announcer announcer,
+ ObjectMapper jsonMapper
+ )
+ {
+ this.server = server;
+ this.config = config;
+ this.announcer = announcer;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ synchronized (lock) {
+ if (started) {
+ return;
+ }
+
+ try {
+ final String path = makeAnnouncementPath();
+ log.info("Announcing self[%s] at [%s]", server, path);
+ announcer.announce(path, jsonMapper.writeValueAsBytes(server));
+ }
+ catch (JsonProcessingException e) {
+ throw Throwables.propagate(e);
+ }
+
+ started = true;
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ synchronized (lock) {
+ if (!started) {
+ return;
+ }
+
+ log.info("Stopping %s with config[%s]", getClass(), config);
+ announcer.unannounce(makeAnnouncementPath());
+
+ started = false;
+ }
+ }
+
+ private String makeAnnouncementPath()
+ {
+ return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
new file mode 100644
index 00000000000..5911ec2a642
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
@@ -0,0 +1,290 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.coordination;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
+import org.apache.curator.utils.ZKPaths;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
+{
+ private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
+
+ private final ZkDataSegmentAnnouncerConfig config;
+ private final Announcer announcer;
+ private final ObjectMapper jsonMapper;
+ private final String liveSegmentLocation;
+
+ private final Set availableZNodes = Sets.newHashSet();
+ private final Map segmentLookup = Maps.newHashMap();
+
+ public BatchDataSegmentAnnouncer(
+ DruidServerMetadata server,
+ ZkDataSegmentAnnouncerConfig config,
+ Announcer announcer,
+ ObjectMapper jsonMapper
+ )
+ {
+ super(server, config, announcer, jsonMapper);
+
+ this.config = config;
+ this.announcer = announcer;
+ this.jsonMapper = jsonMapper;
+ this.liveSegmentLocation = ZKPaths.makePath(config.getLiveSegmentsPath(), server.getName());
+ }
+
+ @Override
+ public void announceSegment(DataSegment segment) throws IOException
+ {
+ int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
+ if (newBytesLen > config.getMaxNumBytes()) {
+ throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
+ }
+
+ // create new batch
+ if (availableZNodes.isEmpty()) {
+ SegmentZNode availableZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+ availableZNode.addSegment(segment);
+
+ log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+ announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
+ segmentLookup.put(segment, availableZNode);
+ availableZNodes.add(availableZNode);
+ } else { // update existing batch
+ Iterator iter = availableZNodes.iterator();
+ boolean done = false;
+ while (iter.hasNext() && !done) {
+ SegmentZNode availableZNode = iter.next();
+ if (availableZNode.getBytes().length + newBytesLen < config.getMaxNumBytes()) {
+ availableZNode.addSegment(segment);
+
+ log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), availableZNode.getPath());
+ announcer.update(availableZNode.getPath(), availableZNode.getBytes());
+ segmentLookup.put(segment, availableZNode);
+
+ if (availableZNode.getCount() >= config.getSegmentsPerNode()) {
+ availableZNodes.remove(availableZNode);
+ }
+
+ done = true;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unannounceSegment(DataSegment segment) throws IOException
+ {
+ final SegmentZNode segmentZNode = segmentLookup.remove(segment);
+ segmentZNode.removeSegment(segment);
+
+ log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
+ if (segmentZNode.getCount() == 0) {
+ availableZNodes.remove(segmentZNode);
+ announcer.unannounce(segmentZNode.getPath());
+ } else {
+ announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
+ availableZNodes.add(segmentZNode);
+ }
+ }
+
+ @Override
+ public void announceSegments(Iterable segments) throws IOException
+ {
+ SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+ Set batch = Sets.newHashSet();
+ int byteSize = 0;
+ int count = 0;
+
+ for (DataSegment segment : segments) {
+ int newBytesLen = jsonMapper.writeValueAsBytes(segment).length;
+
+ if (newBytesLen > config.getMaxNumBytes()) {
+ throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxNumBytes());
+ }
+
+ if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxNumBytes()) {
+ segmentZNode.addSegments(batch);
+ announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
+
+ segmentZNode = new SegmentZNode(makeServedSegmentPath(new DateTime().toString()));
+ batch = Sets.newHashSet();
+ count = 0;
+ byteSize = 0;
+ }
+
+ log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
+ segmentLookup.put(segment, segmentZNode);
+ batch.add(segment);
+ count++;
+ byteSize += newBytesLen;
+ }
+
+ segmentZNode.addSegments(batch);
+ announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
+ }
+
+ @Override
+ public void unannounceSegments(Iterable segments) throws IOException
+ {
+ for (DataSegment segment : segments) {
+ unannounceSegment(segment);
+ }
+ }
+
+ private String makeServedSegmentPath(String zNode)
+ {
+ return ZKPaths.makePath(liveSegmentLocation, zNode);
+ }
+
+ private class SegmentZNode
+ {
+ private final String path;
+
+ private byte[] bytes = new byte[]{};
+ private int count = 0;
+
+ public SegmentZNode(String path)
+ {
+ this.path = path;
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ public int getCount()
+ {
+ return count;
+ }
+
+ public byte[] getBytes()
+ {
+ return bytes;
+ }
+
+ public Set getSegments()
+ {
+ if (bytes.length == 0) {
+ return Sets.newHashSet();
+ }
+ try {
+ return jsonMapper.readValue(
+ bytes, new TypeReference>()
+ {
+ }
+ );
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public void addSegment(DataSegment segment)
+ {
+ Set zkSegments = getSegments();
+ zkSegments.add(segment);
+
+ try {
+ bytes = jsonMapper.writeValueAsBytes(zkSegments);
+ }
+ catch (Exception e) {
+ zkSegments.remove(segment);
+ throw Throwables.propagate(e);
+ }
+
+ count++;
+ }
+
+ public void addSegments(Set segments)
+ {
+ Set zkSegments = getSegments();
+ zkSegments.addAll(segments);
+
+ try {
+ bytes = jsonMapper.writeValueAsBytes(zkSegments);
+ }
+ catch (Exception e) {
+ zkSegments.removeAll(segments);
+ throw Throwables.propagate(e);
+ }
+
+ count += segments.size();
+ }
+
+ public void removeSegment(DataSegment segment)
+ {
+ Set zkSegments = getSegments();
+ zkSegments.remove(segment);
+
+ try {
+ bytes = jsonMapper.writeValueAsBytes(zkSegments);
+ }
+ catch (Exception e) {
+ zkSegments.add(segment);
+ throw Throwables.propagate(e);
+ }
+
+ count--;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SegmentZNode that = (SegmentZNode) o;
+
+ if (!path.equals(that.path)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return path.hashCode();
+ }
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java
index 699c4b1e8ce..71eaaa37276 100644
--- a/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java
+++ b/client/src/main/java/com/metamx/druid/coordination/DataSegmentAnnouncer.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.coordination;
import com.metamx.druid.client.DataSegment;
@@ -7,5 +26,10 @@ import java.io.IOException;
public interface DataSegmentAnnouncer
{
public void announceSegment(DataSegment segment) throws IOException;
+
public void unannounceSegment(DataSegment segment) throws IOException;
+
+ public void announceSegments(Iterable segments) throws IOException;
+
+ public void unannounceSegments(Iterable segments) throws IOException;
}
diff --git a/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java
index 7c4e55cdd8e..ce38bd6b0a0 100644
--- a/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java
+++ b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java
@@ -81,7 +81,7 @@ public class DruidServerMetadata
@Override
public String toString()
{
- return "DruidServer{" +
+ return "DruidServerMetadata{" +
"name='" + name + '\'' +
", host='" + host + '\'' +
", maxSize=" + maxSize +
diff --git a/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java
new file mode 100644
index 00000000000..053a3d85fac
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java
@@ -0,0 +1,92 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.coordination;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.common.lifecycle.LifecycleStart;
+import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkPathsConfig;
+
+import java.io.IOException;
+
+/**
+ * This class has the greatest name ever
+ */
+public class MultipleDataSegmentAnnouncerDataSegmentAnnouncer implements DataSegmentAnnouncer
+{
+ private final Iterable dataSegmentAnnouncers;
+
+ public MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
+ Iterable dataSegmentAnnouncers
+ )
+ {
+ this.dataSegmentAnnouncers = dataSegmentAnnouncers;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+ dataSegmentAnnouncer.start();
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ for (AbstractDataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+ dataSegmentAnnouncer.stop();
+ }
+ }
+
+ @Override
+ public void announceSegment(DataSegment segment) throws IOException
+ {
+ for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+ dataSegmentAnnouncer.announceSegment(segment);
+ }
+ }
+
+ @Override
+ public void unannounceSegment(DataSegment segment) throws IOException
+ {
+ for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+ dataSegmentAnnouncer.unannounceSegment(segment);
+ }
+ }
+
+ @Override
+ public void announceSegments(Iterable segments) throws IOException
+ {
+ for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+ dataSegmentAnnouncer.announceSegments(segments);
+ }
+ }
+
+ @Override
+ public void unannounceSegments(Iterable segments) throws IOException
+ {
+ for (DataSegmentAnnouncer dataSegmentAnnouncer : dataSegmentAnnouncers) {
+ dataSegmentAnnouncer.unannounceSegments(segments);
+ }
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java
similarity index 68%
rename from client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java
rename to client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java
index f420d90bbbd..e238e3b91a3 100644
--- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java
+++ b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java
@@ -19,7 +19,6 @@
package com.metamx.druid.coordination;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
@@ -33,14 +32,10 @@ import org.apache.curator.utils.ZKPaths;
import java.io.IOException;
-public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
+public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
{
- private static final Logger log = new Logger(CuratorDataSegmentAnnouncer.class);
+ private static final Logger log = new Logger(SingleDataSegmentAnnouncer.class);
- private final Object lock = new Object();
-
- private final DruidServerMetadata server;
- private final ZkPathsConfig config;
private final Announcer announcer;
private final ObjectMapper jsonMapper;
private final String servedSegmentsLocation;
@@ -49,55 +44,20 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
@Inject
public CuratorDataSegmentAnnouncer(
+ public SingleDataSegmentAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,
Announcer announcer,
ObjectMapper jsonMapper
)
{
- this.server = server;
- this.config = config;
+ super(server, config, announcer, jsonMapper);
+
this.announcer = announcer;
this.jsonMapper = jsonMapper;
this.servedSegmentsLocation = ZKPaths.makePath(config.getServedSegmentsPath(), server.getName());
}
- @LifecycleStart
- public void start()
- {
- synchronized (lock) {
- if (started) {
- return;
- }
-
- try {
- final String path = makeAnnouncementPath();
- log.info("Announcing self[%s] at [%s]", server, path);
- announcer.announce(path, jsonMapper.writeValueAsBytes(server));
- }
- catch (JsonProcessingException e) {
- throw Throwables.propagate(e);
- }
-
- started = true;
- }
- }
-
- @LifecycleStop
- public void stop()
- {
- synchronized (lock) {
- if (!started) {
- return;
- }
-
- log.info("Stopping CuratorDataSegmentAnnouncer with config[%s]", config);
- announcer.unannounce(makeAnnouncementPath());
-
- started = false;
- }
- }
-
public void announceSegment(DataSegment segment) throws IOException
{
final String path = makeServedSegmentPath(segment);
@@ -112,8 +72,20 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer
announcer.unannounce(path);
}
- private String makeAnnouncementPath() {
- return ZKPaths.makePath(config.getAnnouncementsPath(), server.getName());
+ @Override
+ public void announceSegments(Iterable segments) throws IOException
+ {
+ for (DataSegment segment : segments) {
+ announceSegment(segment);
+ }
+ }
+
+ @Override
+ public void unannounceSegments(Iterable segments) throws IOException
+ {
+ for (DataSegment segment : segments) {
+ unannounceSegment(segment);
+ }
}
private String makeServedSegmentPath(DataSegment segment)
diff --git a/client/src/main/java/com/metamx/druid/curator/CuratorConfig.java b/client/src/main/java/com/metamx/druid/curator/CuratorConfig.java
index 10e7e381278..2f396e02824 100644
--- a/client/src/main/java/com/metamx/druid/curator/CuratorConfig.java
+++ b/client/src/main/java/com/metamx/druid/curator/CuratorConfig.java
@@ -32,4 +32,8 @@ public abstract class CuratorConfig
@Config("druid.zk.service.sessionTimeoutMs")
@Default("30000")
public abstract int getZkSessionTimeoutMs();
+
+ @Config("druid.curator.compress")
+ @Default("false")
+ public abstract boolean enableCompression();
}
diff --git a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
index f7c074938c7..727f7704771 100644
--- a/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
+++ b/client/src/main/java/com/metamx/druid/curator/announcement/Announcer.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.curator.announcement;
import com.google.common.base.Throwables;
@@ -6,6 +25,7 @@ import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.IAE;
+import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
@@ -21,11 +41,14 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,6 +65,7 @@ public class Announcer
private final List> toAnnounce = Lists.newArrayList();
private final ConcurrentMap listeners = new MapMaker().makeMap();
private final ConcurrentMap> announcements = new MapMaker().makeMap();
+ private final List parentsIBuilt = new CopyOnWriteArrayList();
private boolean started = false;
@@ -92,6 +116,15 @@ public class Announcer
unannounce(ZKPaths.makePath(basePath, announcementPath));
}
}
+
+ for (String parent : parentsIBuilt) {
+ try {
+ curator.delete().forPath(parent);
+ }
+ catch (Exception e) {
+ log.info(e, "Unable to delete parent[%s], boooo.", parent);
+ }
+ }
}
}
@@ -99,7 +132,7 @@ public class Announcer
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
* and monitor it to make sure that it always exists until it is unannounced or this object is closed.
*
- * @param path The path to announce at
+ * @param path The path to announce at
* @param bytes The payload to announce
*/
public void announce(String path, byte[] bytes)
@@ -114,10 +147,19 @@ public class Announcer
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();
+ boolean buildParentPath = false;
ConcurrentMap subPaths = announcements.get(parentPath);
if (subPaths == null) {
+ try {
+ if (curator.checkExists().forPath(parentPath) == null) {
+ buildParentPath = true;
+ }
+ }
+ catch (Exception e) {
+ log.debug(e, "Problem checking if the parent existed, ignoring.");
+ }
// I don't have a watcher on this path yet, create a Map and start watching.
announcements.putIfAbsent(parentPath, new MapMaker().makeMap());
@@ -127,7 +169,7 @@ public class Announcer
// Synchronize to make sure that I only create a listener once.
synchronized (finalSubPaths) {
- if (! listeners.containsKey(parentPath)) {
+ if (!listeners.containsKey(parentPath)) {
final PathChildrenCache cache = factory.make(curator, parentPath);
cache.getListenable().addListener(
new PathChildrenCacheListener()
@@ -186,17 +228,15 @@ public class Announcer
}
);
- try {
- synchronized (toAnnounce) {
- if (started) {
- cache.start();
- listeners.put(parentPath, cache);
+ synchronized (toAnnounce) {
+ if (started) {
+ if (buildParentPath) {
+ createPath(parentPath);
}
+ startCache(cache);
+ listeners.put(parentPath, cache);
}
}
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
}
}
@@ -208,11 +248,11 @@ public class Announcer
if (started) {
byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);
- if (oldBytes != null) {
- throw new IAE("Already announcing[%s], cannot announce it twice.", path);
+ if (oldBytes == null) {
+ created = true;
+ } else if (!Arrays.equals(oldBytes, bytes)) {
+ throw new IAE("Cannot reannounce different values under the same path");
}
-
- created = true;
}
}
@@ -226,15 +266,48 @@ public class Announcer
}
}
+ public void update(final String path, final byte[] bytes)
+ {
+ final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
+
+ final String parentPath = pathAndNode.getPath();
+ final String nodePath = pathAndNode.getNode();
+
+ ConcurrentMap subPaths = announcements.get(parentPath);
+
+ if (subPaths == null || subPaths.get(nodePath) == null) {
+ throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
+ }
+
+ synchronized (toAnnounce) {
+ try {
+ byte[] oldBytes = subPaths.get(nodePath);
+
+ if (!Arrays.equals(oldBytes, bytes)) {
+ subPaths.put(nodePath, bytes);
+ updateAnnouncement(path, bytes);
+ }
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+
private String createAnnouncement(final String path, byte[] value) throws Exception
{
return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value);
}
+ private Stat updateAnnouncement(final String path, final byte[] value) throws Exception
+ {
+ return curator.setData().compressed().inBackground().forPath(path, value);
+ }
+
/**
* Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
* will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
- *
+ *
* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
*
* @param path
@@ -265,4 +338,26 @@ public class Announcer
throw Throwables.propagate(e);
}
}
+
+ private void startCache(PathChildrenCache cache)
+ {
+ try {
+ cache.start();
+ }
+ catch (Exception e) {
+ Closeables.closeQuietly(cache);
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private void createPath(String parentPath)
+ {
+ try {
+ curator.create().creatingParentsIfNeeded().forPath(parentPath);
+ parentsIBuilt.add(parentPath);
+ }
+ catch (Exception e) {
+ log.info(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
+ }
+ }
}
diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
index ab1e31bbc49..7c35ad12618 100644
--- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
+++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
@@ -135,6 +135,11 @@ public class CuratorInventoryManager
}
}
+ public InventoryManagerConfig getConfig()
+ {
+ return config;
+ }
+
public ContainerClass getInventoryValue(String containerKey)
{
final ContainerHolder containerHolder = containers.get(containerKey);
@@ -290,11 +295,18 @@ public class CuratorInventoryManager
switch (event.getType()) {
case CHILD_ADDED:
- case CHILD_UPDATED:
- final InventoryClass inventory = strategy.deserializeInventory(child.getData());
+ final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());
synchronized (holder) {
- holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, inventory));
+ holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
+ }
+
+ break;
+ case CHILD_UPDATED:
+ final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
+
+ synchronized (holder) {
+ holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
}
break;
diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java
index 516045537ac..8cab619e16f 100644
--- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java
+++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java
@@ -33,5 +33,6 @@ public interface CuratorInventoryManagerStrategy
public void deadContainer(ContainerClass deadContainer);
public ContainerClass updateContainer(ContainerClass oldContainer, ContainerClass newContainer);
public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
+ public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
}
diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
index e6460c0f147..c4660cc2bfe 100644
--- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java
+++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
@@ -86,13 +86,13 @@ public class Initialization
/**
* Load properties.
* Properties are layered:
- *
+ *
* # stored in zookeeper
* # runtime.properties file,
* # cmdLine -D
- *
+ *
* command line overrides runtime.properties which overrides zookeeper
- *
+ *
* Idempotent. Thread-safe. Properties are only loaded once.
* If property druid.zk.service.host is not set then do not load properties from zookeeper.
*
@@ -210,10 +210,9 @@ public class Initialization
CuratorFrameworkFactory.builder()
.connectString(curatorConfig.getZkHosts())
.sessionTimeoutMs(curatorConfig.getZkSessionTimeoutMs())
- .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
- // Don't compress stuff written just yet, need to get code deployed first.
- .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
- .build();
+ .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
+ .compressionProvider(new PotentiallyGzippedCompressionProvider(curatorConfig.enableCompression()))
+ .build();
lifecycle.addHandler(
new Lifecycle.Handler()
@@ -346,9 +345,9 @@ public class Initialization
}
public static RequestLogger makeFileRequestLogger(
- ObjectMapper objectMapper,
- ScheduledExecutorFactory factory,
- Properties props
+ ObjectMapper objectMapper,
+ ScheduledExecutorFactory factory,
+ Properties props
) throws IOException
{
return new FileRequestLogger(
diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java
new file mode 100644
index 00000000000..e14c65027eb
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java
@@ -0,0 +1,21 @@
+package com.metamx.druid.initialization;
+
+import org.skife.config.Config;
+import org.skife.config.Default;
+
+/**
+ */
+public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
+{
+ @Config("druid.zk.segmentsPerNode")
+ @Default("50")
+ public abstract int getSegmentsPerNode();
+
+ @Config("druid.zk.maxNumBytesPerNode")
+ @Default("512000")
+ public abstract long getMaxNumBytes();
+
+ @Config("druid.announcer.type")
+ @Default("legacy")
+ public abstract String getAnnouncerType();
+}
diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java
index 2f04e61b309..065829fd9ea 100644
--- a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java
@@ -21,6 +21,7 @@ package com.metamx.druid.initialization;
import org.apache.curator.utils.ZKPaths;
import org.skife.config.Config;
+import org.skife.config.Default;
public abstract class ZkPathsConfig
{
@@ -45,6 +46,12 @@ public abstract class ZkPathsConfig
return defaultPath("servedSegments");
}
+ @Config("druid.zk.paths.liveSegmentsPath")
+ public String getLiveSegmentsPath()
+ {
+ return defaultPath("segments");
+ }
+
@Config("druid.zk.paths.loadQueuePath")
public String getLoadQueuePath()
{
diff --git a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java
index 52fc99a35b1..8acc43a7585 100644
--- a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java
+++ b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java
@@ -20,6 +20,7 @@
package com.metamx.druid.query;
import com.google.common.base.Function;
+import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -34,7 +35,6 @@ import com.metamx.druid.Query;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -77,12 +77,14 @@ public class ChainedExecutionQueryRunner implements QueryRunner
{
this.exec = exec;
this.ordering = ordering;
- this.queryables = Iterables.unmodifiableIterable(queryables);
+ this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
}
@Override
public Sequence run(final Query query)
{
+ final int priority = Integer.parseInt(query.getContextValue("priority", "0"));
+
return new BaseSequence>(
new BaseSequence.IteratorMaker>()
{
@@ -99,7 +101,7 @@ public class ChainedExecutionQueryRunner implements QueryRunner
public Future> apply(final QueryRunner input)
{
return exec.submit(
- new Callable>()
+ new PrioritizedCallable>(priority)
{
@Override
public List call() throws Exception
diff --git a/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java b/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java
new file mode 100644
index 00000000000..f9d377694e8
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/DelegatingExecutorService.java
@@ -0,0 +1,127 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.query;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ */
+public class DelegatingExecutorService implements ExecutorService
+{
+ private final ExecutorService delegate;
+
+ public DelegatingExecutorService(ExecutorService delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ delegate.shutdown();
+ }
+
+ @Override
+ public List shutdownNow()
+ {
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
+ {
+ return delegate.awaitTermination(l, timeUnit);
+ }
+
+ @Override
+ public Future submit(Callable tCallable)
+ {
+ return delegate.submit(tCallable);
+ }
+
+ @Override
+ public Future submit(Runnable runnable, T t)
+ {
+ return delegate.submit(runnable, t);
+ }
+
+ @Override
+ public Future> submit(Runnable runnable)
+ {
+ return delegate.submit(runnable);
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> callables) throws InterruptedException
+ {
+ return delegate.invokeAll(callables);
+ }
+
+ @Override
+ public List> invokeAll(
+ Collection extends Callable> callables,
+ long l,
+ TimeUnit timeUnit
+ ) throws InterruptedException
+ {
+ return delegate.invokeAll(callables, l, timeUnit);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> callables) throws InterruptedException, ExecutionException
+ {
+ return delegate.invokeAny(callables);
+ }
+
+ @Override
+ public T invokeAny(
+ Collection extends Callable> callables,
+ long l,
+ TimeUnit timeUnit
+ ) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return delegate.invokeAny(callables, l, timeUnit);
+ }
+
+ @Override
+ public void execute(Runnable runnable)
+ {
+ delegate.execute(runnable);
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java
index 1aee5cb48a5..b981fde0975 100644
--- a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java
+++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java
@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
package com.metamx.druid.query;
import com.metamx.common.lifecycle.LifecycleStop;
@@ -5,14 +24,19 @@ import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-@ManageLifecycle
-public class MetricsEmittingExecutorService extends AbstractExecutorService
+public class MetricsEmittingExecutorService extends DelegatingExecutorService
{
private final ExecutorService base;
private final ServiceEmitter emitter;
@@ -24,42 +48,13 @@ public class MetricsEmittingExecutorService extends AbstractExecutorService
ServiceMetricEvent.Builder metricBuilder
)
{
+ super(base);
+
this.base = base;
this.emitter = emitter;
this.metricBuilder = metricBuilder;
}
- @Override
- public void shutdown()
- {
- base.shutdown();
- }
-
- @Override
- @LifecycleStop
- public List shutdownNow()
- {
- return base.shutdownNow();
- }
-
- @Override
- public boolean isShutdown()
- {
- return base.isShutdown();
- }
-
- @Override
- public boolean isTerminated()
- {
- return base.isTerminated();
- }
-
- @Override
- public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
- {
- return base.awaitTermination(l, timeUnit);
- }
-
@Override
public void execute(Runnable runnable)
{
diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java b/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java
new file mode 100644
index 00000000000..3771b8a03c2
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/PrioritizedCallable.java
@@ -0,0 +1,39 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.query;
+
+import java.util.concurrent.Callable;
+
+/**
+ */
+public abstract class PrioritizedCallable implements Callable
+{
+ final int priority;
+
+ public PrioritizedCallable(int priority)
+ {
+ this.priority = priority;
+ }
+
+ public int getPriority()
+ {
+ return priority;
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java
new file mode 100644
index 00000000000..f943a0c112f
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java
@@ -0,0 +1,158 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.query;
+
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.concurrent.ExecutorServiceConfig;
+import com.metamx.common.lifecycle.Lifecycle;
+
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class PrioritizedExecutorService extends AbstractExecutorService
+{
+ public static PrioritizedExecutorService create(Lifecycle lifecycle, ExecutorServiceConfig config)
+ {
+ final PrioritizedExecutorService service = new PrioritizedExecutorService(
+ new ThreadPoolExecutor(
+ config.getNumThreads(),
+ config.getNumThreads(),
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new PriorityBlockingQueue(),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat(config.getFormatString()).build()
+ )
+ );
+
+ lifecycle.addHandler(
+ new Lifecycle.Handler()
+ {
+ @Override
+ public void start() throws Exception
+ {
+ }
+
+ @Override
+ public void stop()
+ {
+ service.shutdownNow();
+ }
+ }
+ );
+
+ return service;
+ }
+ private static final int DEFAULT_PRIORITY = 0;
+
+
+ private final ThreadPoolExecutor threadPoolExecutor;
+
+ public PrioritizedExecutorService(
+ ThreadPoolExecutor threadPoolExecutor
+ )
+ {
+ this.threadPoolExecutor = threadPoolExecutor;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ threadPoolExecutor.shutdown();
+ }
+
+ @Override
+ public List shutdownNow()
+ {
+ return threadPoolExecutor.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return threadPoolExecutor.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return threadPoolExecutor.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException
+ {
+ return threadPoolExecutor.awaitTermination(l, timeUnit);
+ }
+
+ @Override
+ public void execute(Runnable runnable)
+ {
+ threadPoolExecutor.execute(runnable);
+ }
+
+ @Override
+ protected RunnableFuture newTaskFor(final Callable tCallable)
+ {
+ Callable theCallable = tCallable;
+ if (!(tCallable instanceof PrioritizedCallable)) {
+ theCallable = new PrioritizedCallable(DEFAULT_PRIORITY)
+ {
+ @Override
+ public T call() throws Exception
+ {
+ return tCallable.call();
+ }
+ };
+ }
+ return new PrioritizedFuture((PrioritizedCallable) theCallable);
+ }
+
+ private static class PrioritizedFuture extends FutureTask implements Comparable
+ {
+ private final PrioritizedCallable callable;
+
+ public PrioritizedFuture(PrioritizedCallable callable)
+ {
+ super(callable);
+ this.callable = callable;
+ }
+
+ public int getPriority()
+ {
+ return callable.getPriority();
+ }
+
+ @Override
+ public int compareTo(PrioritizedFuture future)
+ {
+ return -Ints.compare(getPriority(), future.getPriority());
+ }
+ }
+}
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 2210a0f5a7b..1cfceac4db8 100644
--- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -46,13 +46,16 @@ import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class SegmentMetadataQueryQueryToolChest extends QueryToolChest
{
- private static final TypeReference TYPE_REFERENCE = new TypeReference(){};
+ private static final TypeReference TYPE_REFERENCE = new TypeReference()
+ {
+ };
private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
@Override
@@ -228,6 +231,6 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest List filterSegments(TimeBoundaryQuery query, List input)
+ public List filterSegments(TimeBoundaryQuery query, List segments)
{
- if(input.size() <= 1) {
- return input;
+ if (segments.size() <= 1) {
+ return segments;
}
- return Lists.newArrayList(input.get(0), input.get(input.size() - 1));
+ final T first = segments.get(0);
+ final T second = segments.get(segments.size() - 1);
+
+ return Lists.newArrayList(
+ Iterables.filter(
+ segments,
+ new Predicate()
+ {
+ @Override
+ public boolean apply(T input)
+ {
+ return input.getInterval().overlaps(first.getInterval()) || input.getInterval()
+ .overlaps(second.getInterval());
+ }
+ }
+ )
+ );
}
@Override
diff --git a/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java b/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java
new file mode 100644
index 00000000000..7690b0ab06b
--- /dev/null
+++ b/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java
@@ -0,0 +1,231 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.ISE;
+import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
+import com.metamx.druid.coordination.DruidServerMetadata;
+import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.druid.jackson.DefaultObjectMapper;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class BatchServerInventoryViewTest
+{
+ private static final String testBasePath = "/test";
+ private static final Joiner joiner = Joiner.on("/");
+
+ private TestingCluster testingCluster;
+ private CuratorFramework cf;
+ private ObjectMapper jsonMapper;
+ private Announcer announcer;
+ private BatchDataSegmentAnnouncer segmentAnnouncer;
+ private Set testSegments;
+ private BatchServerInventoryView batchServerInventoryView;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ testingCluster = new TestingCluster(1);
+ testingCluster.start();
+
+ cf = CuratorFrameworkFactory.builder()
+ .connectString(testingCluster.getConnectString())
+ .retryPolicy(new ExponentialBackoffRetry(1, 10))
+ .compressionProvider(new PotentiallyGzippedCompressionProvider(true))
+ .build();
+ cf.start();
+ cf.create().creatingParentsIfNeeded().forPath(testBasePath);
+
+ jsonMapper = new DefaultObjectMapper();
+
+ announcer = new Announcer(
+ cf,
+ MoreExecutors.sameThreadExecutor()
+ );
+ announcer.start();
+
+ segmentAnnouncer = new BatchDataSegmentAnnouncer(
+ new DruidServerMetadata(
+ "id",
+ "host",
+ Long.MAX_VALUE,
+ "type",
+ "tier"
+ ),
+ new ZkDataSegmentAnnouncerConfig()
+ {
+ @Override
+ public String getZkBasePath()
+ {
+ return testBasePath;
+ }
+
+ @Override
+ public int getSegmentsPerNode()
+ {
+ return 50;
+ }
+
+ @Override
+ public long getMaxNumBytes()
+ {
+ return 100000;
+ }
+
+ @Override
+ public String getAnnouncerType()
+ {
+ return "batch";
+ }
+ },
+ announcer,
+ jsonMapper
+ );
+ segmentAnnouncer.start();
+
+ testSegments = Sets.newHashSet();
+ for (int i = 0; i < 100; i++) {
+ testSegments.add(makeSegment(i));
+ }
+
+ batchServerInventoryView = new BatchServerInventoryView(
+ new ServerInventoryViewConfig()
+ {
+ @Override
+ public int getRemovedSegmentLifetime()
+ {
+ return 0;
+ }
+
+ @Override
+ public String getAnnouncerType()
+ {
+ return "batch";
+ }
+ },
+ new ZkPathsConfig()
+ {
+ @Override
+ public String getZkBasePath()
+ {
+ return testBasePath;
+ }
+ },
+ cf,
+ Executors.newSingleThreadExecutor(),
+ jsonMapper
+ );
+
+ batchServerInventoryView.start();
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ batchServerInventoryView.stop();
+ segmentAnnouncer.stop();
+ announcer.stop();
+ cf.close();
+ testingCluster.stop();
+ }
+
+ @Test
+ public void testRun() throws Exception
+ {
+ segmentAnnouncer.announceSegments(testSegments);
+
+ waitForSync();
+
+ DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
+ Set segments = Sets.newHashSet(server.getSegments().values());
+
+ Assert.assertEquals(testSegments, segments);
+
+ DataSegment segment1 = makeSegment(101);
+ DataSegment segment2 = makeSegment(102);
+
+ segmentAnnouncer.announceSegment(segment1);
+ segmentAnnouncer.announceSegment(segment2);
+ testSegments.add(segment1);
+ testSegments.add(segment2);
+
+ waitForSync();
+
+ Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
+
+ segmentAnnouncer.unannounceSegment(segment1);
+ segmentAnnouncer.unannounceSegment(segment2);
+ testSegments.remove(segment1);
+ testSegments.remove(segment2);
+
+ waitForSync();
+
+ Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
+ }
+
+ private DataSegment makeSegment(int offset)
+ {
+ return DataSegment.builder()
+ .dataSource("foo")
+ .interval(
+ new Interval(
+ new DateTime("2013-01-01").plusDays(offset),
+ new DateTime("2013-01-02").plusDays(offset)
+ )
+ )
+ .version(new DateTime().toString())
+ .build();
+ }
+
+ private void waitForSync() throws Exception
+ {
+ Stopwatch stopwatch = new Stopwatch().start();
+ while (Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
+ Thread.sleep(500);
+ if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 5000) {
+ throw new ISE("BatchServerInventoryView is not updating");
+ }
+ }
+ }
+}
diff --git a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java
index 287d208db62..23ca0ea9693 100644
--- a/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java
+++ b/client/src/test/java/com/metamx/druid/client/cache/MemcachedCacheTest.java
@@ -52,8 +52,8 @@ import java.util.concurrent.TimeoutException;
*/
public class MemcachedCacheTest
{
- private static final byte[] HI = "hi".getBytes();
- private static final byte[] HO = "ho".getBytes();
+ private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes();
+ private static final byte[] HO = "hooooooooooooooooooo".getBytes();
private MemcachedCache cache;
@Before
@@ -124,7 +124,13 @@ public class MemcachedCacheTest
class MockMemcachedClient implements MemcachedClientIF
{
private final ConcurrentMap theMap = new ConcurrentHashMap();
- private final Transcoder