From b8ba9138ff2f781e3d2b25c7b8510aad37e32b92 Mon Sep 17 00:00:00 2001 From: cheddar Date: Tue, 23 Apr 2013 19:20:56 -0500 Subject: [PATCH] 1) Fix bug with Master not starting up without an indexer specified 2) Fix bug with PotentiallyGzippedCompressionProvider only catching ZipExceptions and not IOException (java 6 throws IO, java 7 throws Zip) 3) Create DruidServerMetadata and use that instead of DruidServer to represent the current server --- .../java/com/metamx/druid/QueryableNode.java | 45 ++++----- .../com/metamx/druid/client/DruidServer.java | 51 ++++------- .../CuratorDataSegmentAnnouncer.java | 4 +- .../coordination/DruidServerMetadata.java | 91 +++++++++++++++++++ ...PotentiallyGzippedCompressionProvider.java | 3 +- .../inventory/CuratorInventoryManager.java | 1 + .../druid/coordination/ZkCoordinator.java | 4 +- .../com/metamx/druid/http/ComputeNode.java | 8 +- .../com/metamx/druid/http/MasterMain.java | 17 ++-- .../druid/http/MasterServletModule.java | 8 +- .../druid/master/DruidMasterBalancer.java | 2 +- .../metamx/druid/metrics/ServerMonitor.java | 5 +- .../druid/coordination/ZkCoordinatorTest.java | 33 +------ 13 files changed, 159 insertions(+), 113 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index e7a7ca0595b..9d33ff963bc 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -33,13 +33,13 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; -import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServerConfig; import com.metamx.druid.client.ServerInventoryThingie; import com.metamx.druid.client.ServerInventoryThingieConfig; import com.metamx.druid.concurrent.Execs; import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; +import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.http.RequestLogger; import com.metamx.druid.initialization.CuratorConfig; @@ -85,7 +85,7 @@ public abstract class QueryableNode extends Registering private final ConfigurationObjectFactory configFactory; private final String nodeType; - private DruidServer druidServer = null; + private DruidServerMetadata druidServerMetadata = null; private ServiceEmitter emitter = null; private List monitors = null; private Server server = null; @@ -127,9 +127,9 @@ public abstract class QueryableNode extends Registering this.nodeType = nodeType; } - public T setDruidServer(DruidServer druidServer) + public T setDruidServerMetadata(DruidServerMetadata druidServerMetadata) { - checkFieldNotSetAndSet("druidServer", druidServer); + checkFieldNotSetAndSet("druidServerMetadata", druidServerMetadata); return (T) this; } @@ -146,7 +146,7 @@ public abstract class QueryableNode extends Registering checkFieldNotSetAndSet("announcer", announcer); return (T) this; } - + @SuppressWarnings("unchecked") public T setEmitter(ServiceEmitter emitter) { @@ -237,10 +237,10 @@ public abstract class QueryableNode extends Registering return configFactory; } - public DruidServer getDruidServer() + public DruidServerMetadata getDruidServerMetadata() { - initializeDruidServer(); - return druidServer; + initializeDruidServerMetadata(); + return druidServerMetadata; } public CuratorFramework getCuratorFramework() @@ -297,10 +297,18 @@ public abstract class QueryableNode extends Registering return serverInventoryThingie; } - private void initializeDruidServer() + private void initializeDruidServerMetadata() { - if (druidServer == null) { - setDruidServer(new DruidServer(getConfigFactory().build(DruidServerConfig.class), nodeType)); + if (druidServerMetadata == null) { + final DruidServerConfig serverConfig = getConfigFactory().build(DruidServerConfig.class); + setDruidServerMetadata( + new DruidServerMetadata( + serverConfig.getServerName(), + serverConfig.getHost(), + serverConfig.getMaxSize(), + nodeType, serverConfig.getTier() + ) + ); } } @@ -377,14 +385,10 @@ public abstract class QueryableNode extends Registering private void initializeAnnouncer() { if (announcer == null) { - setAnnouncer( - new CuratorDataSegmentAnnouncer( - getDruidServer(), - getZkPaths(), - new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")), - getJsonMapper() - ) - ); + final Announcer announcer = new Announcer(getCuratorFramework(), Execs.singleThreaded("Announcer-%s")); + lifecycle.addManagedInstance(announcer); + + setAnnouncer(new CuratorDataSegmentAnnouncer(getDruidServerMetadata(), getZkPaths(), announcer, getJsonMapper())); lifecycle.addManagedInstance(getAnnouncer()); } } @@ -436,8 +440,7 @@ public abstract class QueryableNode extends Registering private void initializeEmitter() { if (emitter == null) { - final HttpClientConfig.Builder configBuilder = HttpClientConfig.builder() - .withNumConnections(1); + final HttpClientConfig.Builder configBuilder = HttpClientConfig.builder().withNumConnections(1); final String emitterTimeoutDuration = props.getProperty("druid.emitter.timeOut"); if (emitterTimeoutDuration != null) { diff --git a/client/src/main/java/com/metamx/druid/client/DruidServer.java b/client/src/main/java/com/metamx/druid/client/DruidServer.java index 89de7ee7c70..e3491dd82d8 100644 --- a/client/src/main/java/com/metamx/druid/client/DruidServer.java +++ b/client/src/main/java/com/metamx/druid/client/DruidServer.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; import com.metamx.common.logger.Logger; +import com.metamx.druid.coordination.DruidServerMetadata; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -36,14 +37,10 @@ public class DruidServer implements Comparable private final Object lock = new Object(); - private final String name; private final ConcurrentMap dataSources; private final ConcurrentMap segments; - private final String host; - private final long maxSize; - private final String type; - private final String tier; + private final DruidServerMetadata metadata; private volatile long currSize; @@ -70,11 +67,7 @@ public class DruidServer implements Comparable @JsonProperty("tier") String tier ) { - this.name = name; - this.host = host; - this.maxSize = maxSize; - this.type = type; - this.tier = tier; + this.metadata = new DruidServerMetadata(name, host, maxSize, type, tier); this.dataSources = new ConcurrentHashMap(); this.segments = new ConcurrentHashMap(); @@ -82,24 +75,18 @@ public class DruidServer implements Comparable public String getName() { - return name; + return metadata.getName(); } - public Map getStringProps() + public DruidServerMetadata getMetadata() { - return ImmutableMap.of( - "name", name, - "host", host, - "maxSize", String.valueOf(maxSize), - "type", type, - "tier", tier - ); + return metadata; } @JsonProperty public String getHost() { - return host; + return metadata.getHost(); } @JsonProperty @@ -111,19 +98,19 @@ public class DruidServer implements Comparable @JsonProperty public long getMaxSize() { - return maxSize; + return metadata.getMaxSize(); } @JsonProperty public String getType() { - return type; + return metadata.getType(); } @JsonProperty public String getTier() { - return tier; + return metadata.getTier(); } @JsonProperty @@ -175,7 +162,7 @@ public class DruidServer implements Comparable DataSegment segment = segments.get(segmentName); if (segment == null) { - log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", name, segmentName); + log.warn("Asked to remove data segment that doesn't exist!? server[%s], segment[%s]", getName(), segmentName); return this; } @@ -186,7 +173,7 @@ public class DruidServer implements Comparable "Asked to remove data segment from dataSource[%s] that doesn't exist, but the segment[%s] exists!?!?!?! wtf? server[%s]", segment.getDataSource(), segmentName, - name + getName() ); return this; } @@ -224,7 +211,7 @@ public class DruidServer implements Comparable DruidServer that = (DruidServer) o; - if (name != null ? !name.equals(that.name) : that.name != null) { + if (getName() != null ? !getName().equals(that.getName()) : that.getName() != null) { return false; } @@ -234,19 +221,13 @@ public class DruidServer implements Comparable @Override public int hashCode() { - return name != null ? name.hashCode() : 0; + return getName() != null ? getName().hashCode() : 0; } @Override public String toString() { - return "DruidServer{" + - "name='" + name + '\'' + - ", host='" + host + '\'' + - ", maxSize=" + maxSize + - ", type=" + type + - ", tier=" + tier + - '}'; + return metadata.toString(); } @Override @@ -259,6 +240,6 @@ public class DruidServer implements Comparable return 1; } - return name.compareTo(((DruidServer) o).name); + return getName().compareTo(((DruidServer) o).getName()); } } diff --git a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java index 38d9981e29d..ff16b7e9025 100644 --- a/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/CuratorDataSegmentAnnouncer.java @@ -41,7 +41,7 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer private final Object lock = new Object(); - private final DruidServer server; + private final DruidServerMetadata server; private final ZkPathsConfig config; private final Announcer announcer; private final ObjectMapper jsonMapper; @@ -50,7 +50,7 @@ public class CuratorDataSegmentAnnouncer implements DataSegmentAnnouncer private volatile boolean started = false; public CuratorDataSegmentAnnouncer( - DruidServer server, + DruidServerMetadata server, ZkPathsConfig config, Announcer announcer, ObjectMapper jsonMapper diff --git a/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java new file mode 100644 index 00000000000..25c9c9875e8 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/coordination/DruidServerMetadata.java @@ -0,0 +1,91 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + */ +public class DruidServerMetadata +{ + private final String name; + private final String host; + private final long maxSize; + private final String tier; + private final String type; + + @JsonCreator + public DruidServerMetadata( + @JsonProperty("name") String name, + @JsonProperty("host") String host, + @JsonProperty("maxSize") long maxSize, + @JsonProperty("type") String type, @JsonProperty("tier") String tier + ) + { + this.name = name; + this.host = host; + this.maxSize = maxSize; + this.tier = tier; + this.type = type; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getHost() + { + return host; + } + + @JsonProperty + public long getMaxSize() + { + return maxSize; + } + + @JsonProperty + public String getTier() + { + return tier; + } + + @JsonProperty + public String getType() + { + return type; + } + + @Override + public String toString() + { + return "DruidServer{" + + "name='" + name + '\'' + + ", host='" + host + '\'' + + ", maxSize=" + maxSize + + ", tier='" + tier + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/client/src/main/java/com/metamx/druid/curator/PotentiallyGzippedCompressionProvider.java b/client/src/main/java/com/metamx/druid/curator/PotentiallyGzippedCompressionProvider.java index 34bfad8fe3c..676d1adcee6 100644 --- a/client/src/main/java/com/metamx/druid/curator/PotentiallyGzippedCompressionProvider.java +++ b/client/src/main/java/com/metamx/druid/curator/PotentiallyGzippedCompressionProvider.java @@ -22,6 +22,7 @@ package com.metamx.druid.curator; import com.netflix.curator.framework.api.CompressionProvider; import com.netflix.curator.framework.imps.GzipCompressionProvider; +import java.io.IOException; import java.util.zip.ZipException; /** @@ -48,7 +49,7 @@ public class PotentiallyGzippedCompressionProvider implements CompressionProvide try { return base.decompress(path, data); } - catch (ZipException e) { + catch (IOException e) { return data; } } diff --git a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java index d2f23380810..71f06e71456 100644 --- a/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java +++ b/client/src/main/java/com/metamx/druid/curator/inventory/CuratorInventoryManager.java @@ -209,6 +209,7 @@ public class CuratorInventoryManager } final String inventoryPath = String.format("%s/%s", config.getContainerPath(), containerKey); + log.info("Creating listener on inventory[%s]", inventoryPath); PathChildrenCache containerCache = cacheFactory.make(curatorFramework, inventoryPath); containerCache.getListenable().addListener(new InventoryCacheListener(containerKey, inventoryPath)); diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index 17b925bef37..5c2cf775934 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -53,7 +53,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final ObjectMapper jsonMapper; private final ZkCoordinatorConfig config; - private final DruidServer me; + private final DruidServerMetadata me; private final DataSegmentAnnouncer announcer; private final CuratorFramework curator; private final ServerManager serverManager; @@ -68,7 +68,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler public ZkCoordinator( ObjectMapper jsonMapper, ZkCoordinatorConfig config, - DruidServer me, + DruidServerMetadata me, DataSegmentAnnouncer announcer, CuratorFramework curator, ServerManager serverManager, diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 3b591f83bd0..57f2ef1db38 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -31,13 +31,9 @@ import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.BaseServerNode; -import com.metamx.druid.client.DruidServer; -import com.metamx.druid.client.DruidServerConfig; -import com.metamx.druid.concurrent.Execs; import com.metamx.druid.coordination.ServerManager; import com.metamx.druid.coordination.ZkCoordinator; import com.metamx.druid.coordination.ZkCoordinatorConfig; -import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.jackson.DefaultObjectMapper; @@ -121,7 +117,7 @@ public class ComputeNode extends BaseServerNode final ZkCoordinator coordinator = new ZkCoordinator( getJsonMapper(), getConfigFactory().build(ZkCoordinatorConfig.class), - getDruidServer(), + getDruidServerMetadata(), getAnnouncer(), getCuratorFramework(), serverManager, @@ -129,7 +125,7 @@ public class ComputeNode extends BaseServerNode ); lifecycle.addManagedInstance(coordinator); - monitors.add(new ServerMonitor(getDruidServer(), serverManager)); + monitors.add(new ServerMonitor(getDruidServerMetadata(), serverManager)); startMonitoring(monitors); final Context root = new Context(getServer(), "/", Context.SESSIONS); diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index b3cb5e99f3e..2643b657569 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -99,16 +99,13 @@ public class MasterMain final ConfigurationObjectFactory configFactory = Config.createFactory(props); final Lifecycle lifecycle = new Lifecycle(); - final HttpClient httpClient = HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).withReadTimeout( - new Duration( - PropUtils.getProperty( - props, - "druid.emitter.timeOut" - ) - ) - ).build(), lifecycle - ); + final HttpClientConfig.Builder httpClientConfigBuilder = HttpClientConfig.builder().withNumConnections(1); + + final String emitterTimeout = props.getProperty("druid.emitter.timeOut"); + if (emitterTimeout != null) { + httpClientConfigBuilder.withReadTimeout(new Duration(emitterTimeout)); + } + final HttpClient httpClient = HttpClientInit.createClient(httpClientConfigBuilder.build(), lifecycle); final ServiceEmitter emitter = new ServiceEmitter( PropUtils.getProperty(props, "druid.service"), diff --git a/server/src/main/java/com/metamx/druid/http/MasterServletModule.java b/server/src/main/java/com/metamx/druid/http/MasterServletModule.java index 63f51922b3f..02066ebf3ed 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterServletModule.java +++ b/server/src/main/java/com/metamx/druid/http/MasterServletModule.java @@ -22,6 +22,7 @@ package com.metamx.druid.http; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.google.inject.Provides; +import com.google.inject.util.Providers; import com.metamx.druid.client.ServerInventoryThingie; import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.db.DatabaseRuleManager; @@ -69,7 +70,12 @@ public class MasterServletModule extends JerseyServletModule bind(DatabaseSegmentManager.class).toInstance(segmentInventoryManager); bind(DatabaseRuleManager.class).toInstance(databaseRuleManager); bind(DruidMaster.class).toInstance(master); - bind(IndexingServiceClient.class).toInstance(indexingServiceClient); + if (indexingServiceClient == null) { + bind(IndexingServiceClient.class).toProvider(Providers.of(null)); + } + else { + bind(IndexingServiceClient.class).toInstance(indexingServiceClient); + } serve("/*").with(GuiceContainer.class); } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java index d2e548fd10f..9f031146cce 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java @@ -67,7 +67,7 @@ public class DruidMasterBalancer implements DruidMasterHelper if (holder.getLifetime() <= 0) { log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier) .addData("segment", holder.getSegment().getIdentifier()) - .addData("server", holder.getFromServer().getStringProps()) + .addData("server", holder.getFromServer().getMetadata()) .emit(); } } diff --git a/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java b/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java index 176f9e3d174..7db7e3ba922 100644 --- a/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java +++ b/server/src/main/java/com/metamx/druid/metrics/ServerMonitor.java @@ -20,6 +20,7 @@ package com.metamx.druid.metrics; import com.metamx.druid.client.DruidServer; +import com.metamx.druid.coordination.DruidServerMetadata; import com.metamx.druid.coordination.ServerManager; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -29,11 +30,11 @@ import java.util.Map; public class ServerMonitor extends AbstractMonitor { - private final DruidServer druidServer; + private final DruidServerMetadata druidServer; private final ServerManager serverManager; public ServerMonitor( - DruidServer druidServer, + DruidServerMetadata druidServer, ServerManager serverManager ) { diff --git a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java index 797253cbf02..addcda42378 100644 --- a/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/com/metamx/druid/coordination/ZkCoordinatorTest.java @@ -25,9 +25,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.client.DruidServer; -import com.metamx.druid.client.DruidServerConfig; -import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.CacheTestSegmentLoader; @@ -108,35 +105,7 @@ public class ZkCoordinatorTest return cacheDir; } }, - new DruidServer( - new DruidServerConfig() - { - @Override - public String getServerName() - { - return "dummyServer"; - } - - @Override - public String getHost() - { - return "dummyHost"; - } - - @Override - public long getMaxSize() - { - return 0; - } - - @Override - public String getTier() - { - return "normal"; - } - }, - "dummyType" - ), + new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal"), announcer, curator, serverManager,