diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java
index 3236d1aa336..ca638f44727 100644
--- a/client/src/main/java/com/metamx/druid/QueryableNode.java
+++ b/client/src/main/java/com/metamx/druid/QueryableNode.java
@@ -26,6 +26,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.metamx.common.IAE;
 import com.metamx.common.ISE;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.concurrent.ScheduledExecutors;
@@ -33,14 +34,16 @@
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.BatchServerInventoryView;
 import com.metamx.druid.client.DruidServerConfig;
 import com.metamx.druid.client.InventoryView;
 import com.metamx.druid.client.ServerInventoryView;
 import com.metamx.druid.client.ServerInventoryViewConfig;
 import com.metamx.druid.client.ServerView;
+import com.metamx.druid.client.SingleServerInventoryView;
 import com.metamx.druid.concurrent.Execs;
 import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
-import com.metamx.druid.coordination.BatchingDataSegmentAnnouncer;
+import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
 import com.metamx.druid.coordination.DruidServerMetadata;
 import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
@@ -357,13 +360,29 @@ public abstract class QueryableNode<T extends QueryableNode> extends RegisteringNode
       final ExecutorService exec = Executors.newFixedThreadPool(
           1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
       );
-      serverInventoryView = new ServerInventoryView(
-          getConfigFactory().build(ServerInventoryViewConfig.class),
-          getZkPaths(),
-          getCuratorFramework(),
-          exec,
-          getJsonMapper()
-      );
+
+      final ServerInventoryViewConfig serverInventoryViewConfig = getConfigFactory().build(ServerInventoryViewConfig.class);
+      final String announcerType = serverInventoryViewConfig.getAnnouncerType();
+
+      if ("legacy".equalsIgnoreCase(announcerType)) {
+        serverInventoryView = new SingleServerInventoryView(
+            serverInventoryViewConfig,
+            getZkPaths(),
+            getCuratorFramework(),
+            exec,
+            getJsonMapper()
+        );
+      } else if ("batch".equalsIgnoreCase(announcerType)) {
+        serverInventoryView = new BatchServerInventoryView(
+            serverInventoryViewConfig,
+            getZkPaths(),
+            getCuratorFramework(),
+            exec,
+            getJsonMapper()
+        );
+      } else {
+        throw new IAE("Unknown type %s", announcerType);
+      }
       lifecycle.addManagedInstance(serverInventoryView);
     }
   }
@@ -373,18 +392,21 @@
     if (requestLogger == null) {
       try {
         final String loggingType = props.getProperty("druid.request.logging.type");
-        if("emitter".equals(loggingType)) {
-          setRequestLogger(Initialization.makeEmittingRequestLogger(
-              getProps(),
-              getEmitter()
-          ));
-        }
-        else if ("file".equalsIgnoreCase(loggingType)) {
-          setRequestLogger(Initialization.makeFileRequestLogger(
-              getJsonMapper(),
-              getScheduledExecutorFactory(),
-              getProps()
-          ));
+        if ("emitter".equals(loggingType)) {
+          setRequestLogger(
+              Initialization.makeEmittingRequestLogger(
+                  getProps(),
+                  getEmitter()
+              )
+          );
+        } else if ("file".equalsIgnoreCase(loggingType)) {
+          setRequestLogger(
+              Initialization.makeFileRequestLogger(
+                  getJsonMapper(),
+                  getScheduledExecutorFactory(),
+                  getProps()
+              )
+          );
         } else {
           setRequestLogger(new NoopRequestLogger());
         }
@@ -428,19 +450,39 @@
       final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s"));
       lifecycle.addManagedInstance(announcer);

-      setAnnouncer(
-          new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
-              Arrays.asList(
-                  new BatchingDataSegmentAnnouncer(
-                      getDruidServerMetadata(),
-                      getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class),
-                      announcer,
-                      getJsonMapper()
-                  ),
-                  new SingleDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())
-              )
-          )
-      );
+      final ZkDataSegmentAnnouncerConfig config = getConfigFactory().build(ZkDataSegmentAnnouncerConfig.class);
+      final String announcerType = config.getAnnouncerType();
+
+      final DataSegmentAnnouncer dataSegmentAnnouncer;
+      if ("batch".equalsIgnoreCase(announcerType)) {
+        dataSegmentAnnouncer = new BatchDataSegmentAnnouncer(
+            getDruidServerMetadata(),
+            config,
+            announcer,
+            getJsonMapper()
+        );
+      } else if ("legacy".equalsIgnoreCase(announcerType)) {
+        dataSegmentAnnouncer = new MultipleDataSegmentAnnouncerDataSegmentAnnouncer(
+            Arrays.asList(
+                new BatchDataSegmentAnnouncer(
+                    getDruidServerMetadata(),
+                    config,
+                    announcer,
+                    getJsonMapper()
+                ),
+                new SingleDataSegmentAnnouncer(
+                    getDruidServerMetadata(),
+                    getZkPaths(),
+                    announcer,
+                    getJsonMapper()
+                )
+            )
+        );
+      } else {
+        throw new ISE("Unknown announcer type [%s]", announcerType);
+      }
+
+      setAnnouncer(dataSegmentAnnouncer);
       lifecycle.addManagedInstance(getAnnouncer(), Lifecycle.Stage.LAST);
     }
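Both hunks above key off the new druid.announcer.type property: it selects the inventory-view implementation on the reading side and the segment announcer on the writing side, and in legacy mode segments are still announced in both the single-znode and batch formats, so batch readers can be enabled later without re-announcing. A minimal sketch of the selection rule, using only JDK classes and the property key added in this patch (the values shown are illustrative, not part of the patch):

import java.util.Properties;

public class AnnouncerTypeExample
{
  public static void main(String[] args)
  {
    // Normally these keys live in runtime.properties; "legacy" is the default.
    Properties props = new Properties();
    props.setProperty("druid.announcer.type", "batch");

    // Matching is case-insensitive; anything other than "legacy" or "batch"
    // makes startup fail fast (IAE/ISE in the hunks above).
    String announcerType = props.getProperty("druid.announcer.type", "legacy");
    if (!"legacy".equalsIgnoreCase(announcerType) && !"batch".equalsIgnoreCase(announcerType)) {
      throw new IllegalArgumentException(String.format("Unknown type %s", announcerType));
    }
    System.out.println("announcer type: " + announcerType);
  }
}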
diff --git a/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java
new file mode 100644
index 00000000000..58497e293db
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java
@@ -0,0 +1,129 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
+import com.metamx.common.ISE;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.emitter.EmittingLogger;
+import org.apache.curator.framework.CuratorFramework;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ */
+public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
+{
+  private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
+
+  final ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
+
+  public BatchServerInventoryView(
+      final ServerInventoryViewConfig config,
+      final ZkPathsConfig zkPaths,
+      final CuratorFramework curator,
+      final ExecutorService exec,
+      final ObjectMapper jsonMapper
+  )
+  {
+    super(
+        config,
+        log,
+        new InventoryManagerConfig()
+        {
+          @Override
+          public String getContainerPath()
+          {
+            return zkPaths.getAnnouncementsPath();
+          }
+
+          @Override
+          public String getInventoryPath()
+          {
+            return zkPaths.getLiveSegmentsPath();
+          }
+        },
+        curator,
+        exec,
+        jsonMapper,
+        new TypeReference<Set<DataSegment>>()
+        {
+        }
+    );
+  }
+
+  @Override
+  protected DruidServer addInnerInventory(
+      final DruidServer container,
+      String inventoryKey,
+      final Set<DataSegment> inventory
+  )
+  {
+    zNodes.put(inventoryKey, inventory);
+    for (DataSegment segment : inventory) {
+      addSingleInventory(container, segment);
+    }
+    return container;
+  }
+
+  @Override
+  protected DruidServer updateInnerInventory(
+      DruidServer container, String inventoryKey, Set<DataSegment> inventory
+  )
+  {
+    Set<DataSegment> existing = zNodes.get(inventoryKey);
+    if (existing == null) {
+      throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
+    }
+
+    for (DataSegment segment : Sets.difference(inventory, existing)) {
+      addSingleInventory(container, segment);
+    }
+    for (DataSegment segment : Sets.difference(existing, inventory)) {
+      removeSingleInventory(container, segment.getIdentifier());
+    }
+    zNodes.put(inventoryKey, inventory);
+
+    return container;
+  }
+
+  @Override
+  protected DruidServer removeInnerInventory(final DruidServer container, String inventoryKey)
+  {
+    log.info("Server[%s] removed container[%s]", container.getName(), inventoryKey);
+    Set<DataSegment> segments = zNodes.remove(inventoryKey);
+
+    if (segments == null) {
+      log.warn("Told to remove container[%s], which didn't exist", inventoryKey);
+      return container;
+    }
+
+    for (DataSegment segment : segments) {
+      removeSingleInventory(container, segment.getIdentifier());
+    }
+    return container;
+  }
+}
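With the batch view, one znode no longer maps to one segment: each child of the liveSegments path deserializes to a whole Set of DataSegments (note the TypeReference above), and updates are applied as a set difference against the previously seen contents of that znode. A rough sketch of the payload round trip, assuming DataSegment serializes through Druid's DefaultObjectMapper the same way the test further down relies on (the segment values here are made up):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Set;

public class BatchPayloadSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper jsonMapper = new DefaultObjectMapper();

    // A batch znode carries a JSON set of segments rather than a single segment.
    Set<DataSegment> segments = Sets.newHashSet(
        DataSegment.builder()
                   .dataSource("foo")
                   .interval(new Interval(new DateTime("2013-01-01"), new DateTime("2013-01-02")))
                   .version(new DateTime().toString())
                   .build()
    );

    byte[] payload = jsonMapper.writeValueAsBytes(segments);        // serializeInventory(...)
    Set<DataSegment> roundTripped = jsonMapper.readValue(
        payload,
        new TypeReference<Set<DataSegment>>() {}                    // deserializeInventory(...)
    );
    System.out.println(roundTripped.size() + " segment(s) round-tripped");
  }
}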
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
index c98091e0560..bd5cd70c668 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java
@@ -20,16 +20,17 @@
 package com.metamx.druid.client;

 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.MapMaker;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.common.logger.Logger;
 import com.metamx.druid.curator.inventory.CuratorInventoryManager;
 import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
 import com.metamx.druid.curator.inventory.InventoryManagerConfig;
-import com.metamx.druid.initialization.ZkPathsConfig;
 import com.metamx.emitter.EmittingLogger;
 import org.apache.curator.framework.CuratorFramework;

@@ -43,44 +44,35 @@
 import java.util.concurrent.atomic.AtomicBoolean;

 /**
  */
-public class ServerInventoryView implements ServerView, InventoryView
+public abstract class ServerInventoryView<InventoryType> implements ServerView, InventoryView
 {
-  private static final EmittingLogger log = new EmittingLogger(ServerInventoryView.class);
-
-  private final CuratorInventoryManager<DruidServer, DataSegment> inventoryManager;
+  private final ServerInventoryViewConfig config;
+  private final Logger log;
+  private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
   private final AtomicBoolean started = new AtomicBoolean(false);

   private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
   private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();

-  private static final Map<String, Integer> removedSegments = new MapMaker().makeMap();
+  private final Map<String, Integer> removedSegments = new MapMaker().makeMap();

   public ServerInventoryView(
       final ServerInventoryViewConfig config,
-      final ZkPathsConfig zkPaths,
+      final Logger log,
+      final InventoryManagerConfig inventoryManagerConfig,
       final CuratorFramework curator,
       final ExecutorService exec,
-      final ObjectMapper jsonMapper
+      final ObjectMapper jsonMapper,
+      final TypeReference<InventoryType> typeReference
   )
   {
-    inventoryManager = new CuratorInventoryManager(
+    this.config = config;
+    this.log = log;
+    this.inventoryManager = new CuratorInventoryManager(
         curator,
-        new InventoryManagerConfig()
-        {
-          @Override
-          public String getContainerPath()
-          {
-            return zkPaths.getAnnouncementsPath();
-          }
-
-          @Override
-          public String getInventoryPath()
-          {
-            return zkPaths.getServedSegmentsPath();
-          }
-        },
+        inventoryManagerConfig,
         exec,
-        new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
+        new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
         {
           @Override
           public DruidServer deserializeContainer(byte[] bytes)
@@ -105,10 +97,10 @@
           }

           @Override
-          public DataSegment deserializeInventory(byte[] bytes)
+          public InventoryType deserializeInventory(byte[] bytes)
           {
             try {
-              return jsonMapper.readValue(bytes, DataSegment.class);
+              return jsonMapper.readValue(bytes, typeReference);
             }
             catch (IOException e) {
               throw Throwables.propagate(e);
@@ -116,7 +108,7 @@
           }

           @Override
-          public byte[] serializeInventory(DataSegment inventory)
+          public byte[] serializeInventory(InventoryType inventory)
           {
             try {
               return jsonMapper.writeValueAsBytes(inventory);
@@ -146,67 +138,27 @@
           }

           @Override
-          public DruidServer addInventory(final DruidServer container, String inventoryKey, final DataSegment inventory)
+          public DruidServer addInventory(
+              final DruidServer container,
+              String inventoryKey,
+              final InventoryType inventory
+          )
           {
-            log.info("Server[%s] added segment[%s]", container.getName(), inventoryKey);
+            return addInnerInventory(container, inventoryKey, inventory);
+          }

-            if (container.getSegment(inventoryKey) != null) {
-              log.warn(
-                  "Not adding or running callbacks for existing segment[%s] on server[%s]",
-                  inventoryKey,
-                  container.getName()
-              );
-
-              return container;
-            }
-
-            final DruidServer retVal = container.addDataSegment(inventoryKey, inventory);
-
-            runSegmentCallbacks(
-                new Function<SegmentCallback, CallbackAction>()
-                {
-                  @Override
-                  public CallbackAction apply(SegmentCallback input)
-                  {
-                    return input.segmentAdded(container, inventory);
-                  }
-                }
-            );
-
-            return retVal;
+          @Override
+          public DruidServer updateInventory(
+              DruidServer container, String inventoryKey, InventoryType inventory
+          )
+          {
+            return updateInnerInventory(container, inventoryKey, inventory);
           }

           @Override
           public DruidServer removeInventory(final DruidServer container, String inventoryKey)
           {
-            log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
-            final DataSegment segment = container.getSegment(inventoryKey);
-
-            if (segment == null) {
-              log.warn(
-                  "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
-                  inventoryKey,
-                  container.getName()
-              );
-
-              return container;
-            }
-
-            final DruidServer retVal = container.removeDataSegment(inventoryKey);
-
-            runSegmentCallbacks(
-                new Function<SegmentCallback, CallbackAction>()
-                {
-                  @Override
-                  public CallbackAction apply(SegmentCallback input)
-                  {
-                    return input.segmentRemoved(container, segment);
-                  }
-                }
-            );
-
-            removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
-            return retVal;
+            return removeInnerInventory(container, inventoryKey);
           }
         }
     );
@@ -282,7 +234,12 @@
     segmentCallbacks.put(callback, exec);
   }

-  private void runSegmentCallbacks(
+  public InventoryManagerConfig getInventoryManagerConfig()
+  {
+    return inventoryManager.getConfig();
+  }
+
+  protected void runSegmentCallbacks(
       final Function<SegmentCallback, CallbackAction> fn
   )
   {
@@ -302,7 +259,7 @@
     }
   }

-  private void runServerCallbacks(final DruidServer server)
+  protected void runServerCallbacks(final DruidServer server)
   {
     for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
       entry.getValue().execute(
@@ -319,4 +276,83 @@
       );
     }
   }
+
+  protected void addSingleInventory(
+      final DruidServer container,
+      final DataSegment inventory
+  )
+  {
+    log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
+
+    if (container.getSegment(inventory.getIdentifier()) != null) {
+      log.warn(
+          "Not adding or running callbacks for existing segment[%s] on server[%s]",
+          inventory.getIdentifier(),
+          container.getName()
+      );
+
+      return;
+    }
+
+    container.addDataSegment(inventory.getIdentifier(), inventory);
+
+    runSegmentCallbacks(
+        new Function<SegmentCallback, CallbackAction>()
+        {
+          @Override
+          public CallbackAction apply(SegmentCallback input)
+          {
+            return input.segmentAdded(container, inventory);
+          }
+        }
+    );
+  }
+
+  protected void removeSingleInventory(final DruidServer container, String inventoryKey)
+  {
+    log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
+    final DataSegment segment = container.getSegment(inventoryKey);
+
+    if (segment == null) {
+      log.warn(
+          "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]",
+          inventoryKey,
+          container.getName()
+      );
+
+      return;
+    }
+
+    container.removeDataSegment(inventoryKey);
+
+    runSegmentCallbacks(
+        new Function<SegmentCallback, CallbackAction>()
+        {
+          @Override
+          public CallbackAction apply(SegmentCallback input)
+          {
+            return input.segmentRemoved(container, segment);
+          }
+        }
+    );
+
+    removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
+  }
+
+  protected abstract DruidServer addInnerInventory(
+      final DruidServer container,
+      String inventoryKey,
+      final InventoryType inventory
+  );
+
+  protected abstract DruidServer updateInnerInventory(
+      final DruidServer container,
+      String inventoryKey,
+      final InventoryType inventory
+  );
+
+  protected abstract DruidServer removeInnerInventory(
+      final DruidServer container,
+      String inventoryKey
+  );
 }
diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java
index 68de158cbf0..6130a96a66c 100644
--- a/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java
+++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryViewConfig.java
@@ -29,4 +29,8 @@ public abstract class ServerInventoryViewConfig
   @Config("druid.master.removedSegmentLifetime")
   @Default("1")
   public abstract int getRemovedSegmentLifetime();
+
+  @Config("druid.announcer.type")
+  @Default("legacy")
+  public abstract String getAnnouncerType();
 }
\ No newline at end of file
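ServerInventoryViewConfig (the reading side) and ZkDataSegmentAnnouncerConfig (the writing side, further down) both bind the same druid.announcer.type key with a "legacy" default, so one property flips a node's inventory view and its announcer together. A sketch of how the value is resolved, assuming the skife ConfigurationObjectFactory behind the @Config/@Default annotations and the getConfigFactory().build(...) calls above (the property value is for illustration only):

import com.metamx.druid.client.ServerInventoryViewConfig;
import org.skife.config.ConfigurationObjectFactory;

import java.util.Properties;

public class AnnouncerConfigSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("druid.announcer.type", "batch");   // omit the key to get the "legacy" default

    ConfigurationObjectFactory configFactory = new ConfigurationObjectFactory(props);
    ServerInventoryViewConfig viewConfig = configFactory.build(ServerInventoryViewConfig.class);

    // Both halves of the patch consult the same value.
    System.out.println("announcer type: " + viewConfig.getAnnouncerType());
  }
}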
diff --git a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java
new file mode 100644
index 00000000000..4b345dc5a29
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java
@@ -0,0 +1,94 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.emitter.EmittingLogger;
+import org.apache.curator.framework.CuratorFramework;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ */
+public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
+{
+  private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
+
+  public SingleServerInventoryView(
+      final ServerInventoryViewConfig config,
+      final ZkPathsConfig zkPaths,
+      final CuratorFramework curator,
+      final ExecutorService exec,
+      final ObjectMapper jsonMapper
+  )
+  {
+    super(
+        config,
+        log,
+        new InventoryManagerConfig()
+        {
+          @Override
+          public String getContainerPath()
+          {
+            return zkPaths.getAnnouncementsPath();
+          }
+
+          @Override
+          public String getInventoryPath()
+          {
+            return zkPaths.getServedSegmentsPath();
+          }
+        },
+        curator,
+        exec,
+        jsonMapper,
+        new TypeReference<DataSegment>()
+        {
+        }
+    );
+  }
+
+  @Override
+  protected DruidServer addInnerInventory(
+      DruidServer container, String inventoryKey, DataSegment inventory
+  )
+  {
+    addSingleInventory(container, inventory);
+    return container;
+  }
+
+  @Override
+  protected DruidServer updateInnerInventory(
+      DruidServer container, String inventoryKey, DataSegment inventory
+  )
+  {
+    return addInnerInventory(container, inventoryKey, inventory);
+  }
+
+  @Override
+  protected DruidServer removeInnerInventory(DruidServer container, String inventoryKey)
+  {
+    removeSingleInventory(container, inventoryKey);
+    return container;
+  }
+}
diff --git a/client/src/main/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
similarity index 97%
rename from client/src/main/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncer.java
rename to client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
index 6b098d41b7f..5911ec2a642 100644
--- a/client/src/main/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncer.java
+++ b/client/src/main/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncer.java
@@ -39,9 +39,9 @@
 import java.util.Set;

 /**
  */
-public class BatchingDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
+public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
 {
-  private static final Logger log = new Logger(BatchingDataSegmentAnnouncer.class);
+  private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);

   private final ZkDataSegmentAnnouncerConfig config;
   private final Announcer announcer;
@@ -51,7 +51,7 @@
   private final Set<String> availableZNodes = Sets.newHashSet();
   private final Map<DataSegment, String> segmentLookup = Maps.newHashMap();

-  public BatchingDataSegmentAnnouncer(
+  public BatchDataSegmentAnnouncer(
       DruidServerMetadata server,
       ZkDataSegmentAnnouncerConfig config,
       Announcer announcer,
diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
index ab1e31bbc49..7c35ad12618 100644
--- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
+++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java
@@ -135,6 +135,11 @@
     }
   }

+  public InventoryManagerConfig getConfig()
+  {
+    return config;
+  }
+
   public ContainerClass getInventoryValue(String containerKey)
   {
     final ContainerHolder containerHolder = containers.get(containerKey);
@@ -290,11 +295,18 @@
       switch (event.getType()) {
         case CHILD_ADDED:
-        case CHILD_UPDATED:
-          final InventoryClass inventory = strategy.deserializeInventory(child.getData());
+          final InventoryClass addedInventory = strategy.deserializeInventory(child.getData());

           synchronized (holder) {
-            holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, inventory));
+            holder.setContainer(strategy.addInventory(holder.getContainer(), inventoryKey, addedInventory));
+          }
+
+          break;
+        case CHILD_UPDATED:
+          final InventoryClass updatedInventory = strategy.deserializeInventory(child.getData());
+
+          synchronized (holder) {
+            holder.setContainer(strategy.updateInventory(holder.getContainer(), inventoryKey, updatedInventory));
           }

           break;
diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java
index 516045537ac..8cab619e16f 100644
--- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java
+++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerStrategy.java
@@ -33,5 +33,6 @@
   public void deadContainer(ContainerClass deadContainer);
   public ContainerClass updateContainer(ContainerClass oldContainer, ContainerClass newContainer);
   public ContainerClass addInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
+  public ContainerClass updateInventory(ContainerClass container, String inventoryKey, InventoryClass inventory);
   public ContainerClass removeInventory(ContainerClass container, String inventoryKey);
 }
diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
index d09c36f369e..741bc59d3d9 100644
--- a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
@@ -32,4 +32,8 @@ public abstract class CuratorConfig
   @Config("druid.zk.service.sessionTimeoutMs")
   @Default("30000")
   public abstract int getZkSessionTimeoutMs();
+
+  @Config("druid.curator.compression.enable")
+  @Default("false")
+  public abstract boolean enableCompression();
 }
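The new druid.curator.compression.enable flag feeds straight into the Curator client builder in Initialization.java below, and the new tests turn it on explicitly; batch announcement znodes hold many segments, so gzipping the payload is presumably what keeps them under druid.zk.maxNumBytesPerNode. A sketch of the wiring with placeholder connection and retry values (the real ones come from CuratorConfig):

import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.BoundedExponentialBackoffRetry;

public class CompressedCuratorSketch
{
  public static void main(String[] args)
  {
    boolean enableCompression = true;   // druid.curator.compression.enable=true

    // Placeholder connection string; druid.zk.service.host / sessionTimeoutMs supply these in practice.
    CuratorFramework curator =
        CuratorFrameworkFactory.builder()
                               .connectString("localhost:2181")
                               .sessionTimeoutMs(30000)
                               .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
                               .compressionProvider(new PotentiallyGzippedCompressionProvider(enableCompression))
                               .build();

    curator.start();
    curator.close();
  }
}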
diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
index 0d49ff0cefe..de29eabd550 100644
--- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java
+++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java
@@ -72,13 +72,13 @@ public class Initialization
   /**
    * Load properties.
    * Properties are layered:
-   *
+   * <p/>
    * # stored in zookeeper
    * # runtime.properties file,
    * # cmdLine -D
-   *
+   * <p/>
   * command line overrides runtime.properties which overrides zookeeper
-   *
+   * <p/>
   * Idempotent. Thread-safe. Properties are only loaded once.
   * If property druid.zk.service.host is not set then do not load properties from zookeeper.
   *
@@ -196,10 +196,9 @@
        CuratorFrameworkFactory.builder()
                               .connectString(curatorConfig.getZkHosts())
                               .sessionTimeoutMs(curatorConfig.getZkSessionTimeoutMs())
-                               .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
-                               // Don't compress stuff written just yet, need to get code deployed first.
-                               .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
-                               .build();
+                               .retryPolicy(new BoundedExponentialBackoffRetry(1000, 45000, 30))
+                               .compressionProvider(new PotentiallyGzippedCompressionProvider(curatorConfig.enableCompression()))
+                               .build();

     lifecycle.addHandler(
         new Lifecycle.Handler()
@@ -335,9 +334,9 @@
   }

   public static RequestLogger makeFileRequestLogger(
-      ObjectMapper objectMapper,
-      ScheduledExecutorFactory factory,
-      Properties props
+      ObjectMapper objectMapper,
+      ScheduledExecutorFactory factory,
+      Properties props
   ) throws IOException
   {
     return new FileRequestLogger(
diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java
index 2ff9f9172ca..e14c65027eb 100644
--- a/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/ZkDataSegmentAnnouncerConfig.java
@@ -14,4 +14,8 @@ public abstract class ZkDataSegmentAnnouncerConfig extends ZkPathsConfig
   @Config("druid.zk.maxNumBytesPerNode")
   @Default("512000")
   public abstract long getMaxNumBytes();
+
+  @Config("druid.announcer.type")
+  @Default("legacy")
+  public abstract String getAnnouncerType();
 }
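ZkDataSegmentAnnouncerConfig now carries the announcer type alongside the existing batching knobs. In the test that follows, getSegmentsPerNode() is 50 and 100 segments are announced, so batch mode should need only a couple of znodes instead of one per segment. A back-of-the-envelope sketch, assuming the announcer simply packs up to getSegmentsPerNode() segments per znode (subject to getMaxNumBytes()); the packing rule is an assumption from the config names, not a statement about BatchDataSegmentAnnouncer internals:

public class BatchNodeCountSketch
{
  public static void main(String[] args)
  {
    // Values taken from BatchServerInventoryViewTest below.
    int numSegments = 100;
    int segmentsPerNode = 50;

    int batchZnodes = (numSegments + segmentsPerNode - 1) / segmentsPerNode;
    System.out.println(batchZnodes + " batch znode(s) vs " + numSegments + " legacy znodes");  // 2 vs 100
  }
}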
diff --git a/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java b/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java
new file mode 100644
index 00000000000..7690b0ab06b
--- /dev/null
+++ b/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java
@@ -0,0 +1,231 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.ISE;
+import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
+import com.metamx.druid.coordination.DruidServerMetadata;
+import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
+import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
+import com.metamx.druid.initialization.ZkPathsConfig;
+import com.metamx.druid.jackson.DefaultObjectMapper;
+import junit.framework.Assert;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingCluster;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public class BatchServerInventoryViewTest
+{
+  private static final String testBasePath = "/test";
+  private static final Joiner joiner = Joiner.on("/");
+
+  private TestingCluster testingCluster;
+  private CuratorFramework cf;
+  private ObjectMapper jsonMapper;
+  private Announcer announcer;
+  private BatchDataSegmentAnnouncer segmentAnnouncer;
+  private Set<DataSegment> testSegments;
+  private BatchServerInventoryView batchServerInventoryView;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    testingCluster = new TestingCluster(1);
+    testingCluster.start();
+
+    cf = CuratorFrameworkFactory.builder()
+                                .connectString(testingCluster.getConnectString())
+                                .retryPolicy(new ExponentialBackoffRetry(1, 10))
+                                .compressionProvider(new PotentiallyGzippedCompressionProvider(true))
+                                .build();
+    cf.start();
+    cf.create().creatingParentsIfNeeded().forPath(testBasePath);
+
+    jsonMapper = new DefaultObjectMapper();
+
+    announcer = new Announcer(
+        cf,
+        MoreExecutors.sameThreadExecutor()
+    );
+    announcer.start();
+
+    segmentAnnouncer = new BatchDataSegmentAnnouncer(
+        new DruidServerMetadata(
+            "id",
+            "host",
+            Long.MAX_VALUE,
+            "type",
+            "tier"
+        ),
+        new ZkDataSegmentAnnouncerConfig()
+        {
+          @Override
+          public String getZkBasePath()
+          {
+            return testBasePath;
+          }
+
+          @Override
+          public int getSegmentsPerNode()
+          {
+            return 50;
+          }
+
+          @Override
+          public long getMaxNumBytes()
+          {
+            return 100000;
+          }
+
+          @Override
+          public String getAnnouncerType()
+          {
+            return "batch";
+          }
+        },
+        announcer,
+        jsonMapper
+    );
+    segmentAnnouncer.start();
+
+    testSegments = Sets.newHashSet();
+    for (int i = 0; i < 100; i++) {
+      testSegments.add(makeSegment(i));
+    }
+
+    batchServerInventoryView = new BatchServerInventoryView(
+        new ServerInventoryViewConfig()
+        {
+          @Override
+          public int getRemovedSegmentLifetime()
+          {
+            return 0;
+          }
+
+          @Override
+          public String getAnnouncerType()
+          {
+            return "batch";
+          }
+        },
+        new ZkPathsConfig()
+        {
+          @Override
+          public String getZkBasePath()
+          {
+            return testBasePath;
+          }
+        },
+        cf,
+        Executors.newSingleThreadExecutor(),
+        jsonMapper
+    );
+
+    batchServerInventoryView.start();
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    batchServerInventoryView.stop();
+    segmentAnnouncer.stop();
+    announcer.stop();
+    cf.close();
+    testingCluster.stop();
+  }
+
+  @Test
+  public void testRun() throws Exception
+  {
+    segmentAnnouncer.announceSegments(testSegments);
+
+    waitForSync();
+
+    DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
+    Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
+
+    Assert.assertEquals(testSegments, segments);
+
+    DataSegment segment1 = makeSegment(101);
+    DataSegment segment2 = makeSegment(102);
+
+    segmentAnnouncer.announceSegment(segment1);
+    segmentAnnouncer.announceSegment(segment2);
+    testSegments.add(segment1);
+    testSegments.add(segment2);
+
+    waitForSync();
+
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
+
+    segmentAnnouncer.unannounceSegment(segment1);
+    segmentAnnouncer.unannounceSegment(segment2);
+    testSegments.remove(segment1);
+    testSegments.remove(segment2);
+
+    waitForSync();
+
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
+  }
+
+  private DataSegment makeSegment(int offset)
+  {
+    return DataSegment.builder()
+                      .dataSource("foo")
+                      .interval(
+                          new Interval(
+                              new DateTime("2013-01-01").plusDays(offset),
+                              new DateTime("2013-01-02").plusDays(offset)
+                          )
+                      )
+                      .version(new DateTime().toString())
+                      .build();
+  }
+
+  private void waitForSync() throws Exception
+  {
+    Stopwatch stopwatch = new Stopwatch().start();
+    while (Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
+      Thread.sleep(500);
+      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 5000) {
+        throw new ISE("BatchServerInventoryView is not updating");
+      }
+    }
+  }
+}
diff --git a/client/src/test/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncerTest.java b/client/src/test/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncerTest.java
similarity index 96%
rename from client/src/test/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncerTest.java
rename to client/src/test/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncerTest.java
index 72b32f26c98..608fe850a74 100644
--- a/client/src/test/java/com/metamx/druid/coordination/BatchingDataSegmentAnnouncerTest.java
+++ b/client/src/test/java/com/metamx/druid/coordination/BatchDataSegmentAnnouncerTest.java
@@ -47,7 +47,7 @@
 /**
  */
-public class BatchingDataSegmentAnnouncerTest
+public class BatchDataSegmentAnnouncerTest
 {
   private static final String testBasePath = "/test";
   private static final String testSegmentsPath = "/test/segments/id";
@@ -58,7 +58,7 @@
   private ObjectMapper jsonMapper;
   private Announcer announcer;
   private SegmentReader segmentReader;
-  private BatchingDataSegmentAnnouncer segmentAnnouncer;
+  private BatchDataSegmentAnnouncer segmentAnnouncer;
   private Set<DataSegment> testSegments;

   @Before
@@ -84,7 +84,7 @@
     announcer.start();

     segmentReader = new SegmentReader(cf, jsonMapper);
-    segmentAnnouncer = new BatchingDataSegmentAnnouncer(
+    segmentAnnouncer = new BatchDataSegmentAnnouncer(
         new DruidServerMetadata(
             "id",
            "host",
@@ -111,6 +111,12 @@
          {
            return 100000;
          }
+
+          @Override
+          public String getAnnouncerType()
+          {
+            return "batch";
+          }
         },
         announcer,
         jsonMapper
diff --git a/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java b/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java
index 2d520418824..da72db06048 100644
--- a/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java
+++ b/client/src/test/java/com/metamx/druid/curator/inventory/CuratorInventoryManagerTest.java
@@ -209,6 +209,14 @@ public class CuratorInventoryManagerTest extends CuratorTestBase
       return container;
     }

+    @Override
+    public Map<String, Integer> updateInventory(
+        Map<String, Integer> container, String inventoryKey, Integer inventory
+    )
+    {
+      return addInventory(container, inventoryKey, inventory);
+    }
+
     @Override
     public Map<String, Integer> removeInventory(Map<String, Integer> container, String inventoryKey)
     {
diff --git a/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java b/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java
index b807f931c5d..864a0087831 100644
--- a/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java
+++ b/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java
@@ -143,10 +143,7 @@ public class SpatialDimensionRowFormatter
       @Override
       public String toString()
       {
-        return "InputRow{" +
-               "timestamp=" + row.getTimestampFromEpoch() +
-               ", dimensions=" + row.getDimensions() +
-               '}';
+        return row.toString();
       }
     };

diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index 0146af28dd4..cb5c32730ca 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -50,14 +50,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler

   private final ObjectMapper jsonMapper;
   private final ZkCoordinatorConfig config;
+  private final ZkPathsConfig zkPaths;
   private final DruidServerMetadata me;
   private final DataSegmentAnnouncer announcer;
   private final CuratorFramework curator;
   private final ServerManager serverManager;

-  private final String loadQueueLocation;
-  private final String servedSegmentsLocation;
-
   private volatile PathChildrenCache loadQueueCache;
   private volatile boolean started;

@@ -73,13 +71,11 @@
   {
     this.jsonMapper = jsonMapper;
     this.config = config;
+    this.zkPaths = zkPaths;
     this.me = me;
     this.announcer = announcer;
     this.curator = curator;
     this.serverManager = serverManager;
-
-    this.loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
-    this.servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
   }

   @LifecycleStart
@@ -91,6 +87,10 @@
       return;
     }

+    final String loadQueueLocation = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName());
+    final String servedSegmentsLocation = ZKPaths.makePath(zkPaths.getServedSegmentsPath(), me.getName());
+    final String liveSegmentsLocation = ZKPaths.makePath(zkPaths.getLiveSegmentsPath(), me.getName());
+
     loadQueueCache = new PathChildrenCache(
         curator,
         loadQueueLocation,
@@ -104,6 +104,7 @@

       curator.newNamespaceAwareEnsurePath(loadQueueLocation).ensure(curator.getZookeeperClient());
       curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
+      curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());

       if (config.isLoadFromSegmentCacheEnabled()) {
         loadCache();
diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java
index 6c48aba9021..63521717e79 100644
--- a/server/src/main/java/com/metamx/druid/http/MasterMain.java
+++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java
@@ -27,13 +27,16 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.servlet.GuiceFilter;
+import com.metamx.common.IAE;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.concurrent.ScheduledExecutors;
 import com.metamx.common.config.Config;
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.common.logger.Logger;
+import com.metamx.druid.client.BatchServerInventoryView;
 import com.metamx.druid.client.ServerInventoryView;
 import com.metamx.druid.client.ServerInventoryViewConfig;
+import com.metamx.druid.client.SingleServerInventoryView;
 import com.metamx.druid.client.indexing.IndexingServiceClient;
 import com.metamx.druid.concurrent.Execs;
 import com.metamx.druid.config.ConfigManager;
@@ -131,9 +134,31 @@
     final ExecutorService exec = Executors.newFixedThreadPool(
         1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ServerInventoryView-%s").build()
     );
-    ServerInventoryView serverInventoryView = new ServerInventoryView(
-        configFactory.build(ServerInventoryViewConfig.class), zkPaths, curatorFramework, exec, jsonMapper
-    );
+
+    final ServerInventoryViewConfig serverInventoryViewConfig = configFactory.build(ServerInventoryViewConfig.class);
+    final String announcerType = serverInventoryViewConfig.getAnnouncerType();
+
+    final ServerInventoryView serverInventoryView;
+    if ("legacy".equalsIgnoreCase(announcerType)) {
+      serverInventoryView = new SingleServerInventoryView(
+          serverInventoryViewConfig,
+          zkPaths,
+          curatorFramework,
+          exec,
+          jsonMapper
+      );
+    } else if ("batch".equalsIgnoreCase(announcerType)) {
+      serverInventoryView = new BatchServerInventoryView(
+          serverInventoryViewConfig,
+          zkPaths,
+          curatorFramework,
+          exec,
+          jsonMapper
+      );
+    } else {
+      throw new IAE("Unknown type %s", announcerType);
+    }
+
     lifecycle.addManagedInstance(serverInventoryView);

     final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
index e0f7b5841ae..592e76f0d06 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
@@ -85,7 +85,7 @@ public class DruidMaster
   private final ZkPathsConfig zkPaths;
   private final JacksonConfigManager configManager;
   private final DatabaseSegmentManager databaseSegmentManager;
-  private final ServerInventoryView serverInventoryView;
+  private final ServerInventoryView serverInventoryView;
   private final DatabaseRuleManager databaseRuleManager;
   private final CuratorFramework curator;
   private final ServiceEmitter emitter;
@@ -291,7 +291,7 @@
     final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
     final String toServedSegPath = ZKPaths.makePath(
-        ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
+        ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), to), segmentName
     );

     loadPeon.loadSegment(
diff --git a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java
index 88f26351991..ef5185ac703 100644
--- a/server/src/main/java/com/metamx/druid/utils/DruidSetup.java
+++ b/server/src/main/java/com/metamx/druid/utils/DruidSetup.java
@@ -296,6 +296,7 @@ public class DruidSetup
     createPath(curator, zkPaths.getMasterPath(), out);
     createPath(curator, zkPaths.getLoadQueuePath(), out);
     createPath(curator, zkPaths.getServedSegmentsPath(), out);
+    createPath(curator, zkPaths.getLiveSegmentsPath(), out);
     createPath(curator, zkPaths.getPropertiesPath(), out);
   }
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
index a497811e066..a49dc85a582 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
@@ -23,7 +23,8 @@
 import com.google.common.collect.MapMaker;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.DruidServer;
-import com.metamx.druid.client.ServerInventoryView;
+import com.metamx.druid.client.SingleServerInventoryView;
+import com.metamx.druid.curator.inventory.InventoryManagerConfig;
 import com.metamx.druid.db.DatabaseSegmentManager;
 import com.metamx.druid.initialization.ZkPathsConfig;
 import com.metamx.druid.metrics.NoopServiceEmitter;
@@ -44,7 +45,7 @@ public class DruidMasterTest
   private CuratorFramework curator;
   private LoadQueueTaskMaster taskMaster;
   private DatabaseSegmentManager databaseSegmentManager;
-  private ServerInventoryView serverInventoryView;
+  private SingleServerInventoryView serverInventoryView;
   private ScheduledExecutorFactory scheduledExecutorFactory;
   private DruidServer druidServer;
   private DataSegment segment;
@@ -58,7 +59,7 @@
     segment = EasyMock.createNiceMock(DataSegment.class);
     loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class);
     loadManagementPeons = new MapMaker().makeMap();
-    serverInventoryView = EasyMock.createMock(ServerInventoryView.class);
+    serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);

     databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
     EasyMock.replay(databaseSegmentManager);
@@ -169,6 +170,20 @@
     EasyMock.expect(serverInventoryView.getInventoryValue("from")).andReturn(druidServer);
     EasyMock.expect(serverInventoryView.getInventoryValue("to")).andReturn(druidServer);
+    EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn(new InventoryManagerConfig()
+    {
+      @Override
+      public String getContainerPath()
+      {
+        return "";
+      }
+
+      @Override
+      public String getInventoryPath()
+      {
+        return "";
+      }
+    });
     EasyMock.replay(serverInventoryView);

     master.moveSegment("from", "to", "dummySegment", null);