diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 5e5c93d989b..01696f86bef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -160,7 +160,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer } final List command = Lists.newArrayList(); - final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort); + final String childHost = node.getHost(); final String taskClasspath; if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { taskClasspath = Joiner.on(File.pathSeparator).join( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 5d2d2b92829..61a7b595437 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -177,7 +177,7 @@ public class TaskMaster } ); - leaderSelector.setId(node.getHost()); + leaderSelector.setId(node.getHostAndPort()); leaderSelector.autoRequeue(); } diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index b96c82361bb..3ee6746bbff 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -60,8 +60,8 @@ public class DruidServer implements Comparable ) { this( - node.getHost(), - node.getHost(), + node.getHostAndPort(), + node.getHostAndPort(), config.getMaxSize(), type, config.getTier(), diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java b/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java index 581f91a57c1..57f2e4a7eca 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java @@ -62,7 +62,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer try { instance = ServiceInstance.builder() .name(serviceName) - .address(service.getHostNoPort()) + .address(service.getHost()) .port(service.getPort()) .build(); } diff --git a/server/src/main/java/io/druid/guice/StorageNodeModule.java b/server/src/main/java/io/druid/guice/StorageNodeModule.java index b983a74eb85..cb6c6829ac1 100644 --- a/server/src/main/java/io/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/io/druid/guice/StorageNodeModule.java @@ -66,8 +66,8 @@ public class StorageNodeModule implements Module } return new DruidServerMetadata( - node.getHost(), - node.getHost(), + node.getHostAndPort(), + node.getHostAndPort(), config.getMaxSize(), nodeType.getNodeType(), config.getTier(), diff --git a/server/src/main/java/io/druid/server/DruidNode.java b/server/src/main/java/io/druid/server/DruidNode.java index 1928d6487d8..cfe0a007d7b 100644 --- a/server/src/main/java/io/druid/server/DruidNode.java +++ b/server/src/main/java/io/druid/server/DruidNode.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.inject.name.Named; +import com.metamx.common.IAE; import io.druid.common.utils.SocketUtil; import javax.validation.constraints.Max; @@ -35,8 +36,6 @@ public class DruidNode { public static final String DEFAULT_HOST = "localhost"; - private String hostNoPort; - @JsonProperty("service") @NotNull private String serviceName; @@ -59,43 +58,43 @@ public class DruidNode init(serviceName, host, port); } + /** + * host = "abc:123", port = null -> host = abc, port = 123 + * host = "abc:fff", port = null -> host = abc, port = -1 + * host = "abc" , port = null -> host = abc, port = _auto_ + * host = null , port = null -> host = _default_, port = -1 + * host = "abc:123 , port = 456 -> throw IAE + * host = "abc:fff , port = 456 -> throw IAE + * host = "abc:123 , port = 123 -> host = abc, port = 123 + * host = "abc" , port = 123 -> host = abc, port = 123 + * host = null , port = 123 -> host = _default_, port = 123 + */ private void init(String serviceName, String host, Integer port) { this.serviceName = serviceName; - if (port == null) { - if (host == null) { - setHostAndPort(DEFAULT_HOST, -1, DEFAULT_HOST); + if (host != null && host.contains(":")) { + final String[] hostParts = host.split(":"); + int parsedPort = -1; + try { + parsedPort = Integer.parseInt(hostParts[1]); } - else if (host.contains(":")) { - final String[] hostParts = host.split(":"); - try { - setHostAndPort(host, Integer.parseInt(hostParts[1]), hostParts[0]); - } - catch (NumberFormatException e) { - setHostAndPort(host, -1, hostParts[0]); - } + catch (NumberFormatException e) { + // leave -1 } - else { - final int openPort = SocketUtil.findOpenPort(8080); - setHostAndPort(String.format("%s:%d", host, openPort), openPort, host); + if (port != null && port != parsedPort) { + throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, port); } + host = hostParts[0]; + port = parsedPort; } - else { - if (host == null || host.contains(":")) { - setHostAndPort(host == null ? DEFAULT_HOST : host, port, host == null ? DEFAULT_HOST : host.split(":")[0]); - } - else { - setHostAndPort(String.format("%s:%d", host, port), port, host); - } - } - } - private void setHostAndPort(String host, int port, String hostNoPort) - { - this.host = host; - this.port = port; - this.hostNoPort = hostNoPort; + if (port == null && host != null) { + port = SocketUtil.findOpenPort(8080); + } + + this.port = port != null ? port : -1; + this.host = host != null ? host : DEFAULT_HOST; } public String getServiceName() @@ -113,9 +112,8 @@ public class DruidNode return port; } - public String getHostNoPort() - { - return hostNoPort; + public String getHostAndPort() { + return String.format("%s:%d", host, port); } @Override diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java index 58827baf93d..b12aa3e5299 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java @@ -212,7 +212,7 @@ public class DruidClusterBridge private LeaderLatch createNewLeaderLatch() { final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHost() + curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort() ); newLeaderLatch.addListener( @@ -305,8 +305,8 @@ public class DruidClusterBridge log.warn("No servers founds!"); } else { DruidServerMetadata me = new DruidServerMetadata( - self.getHost(), - self.getHost(), + self.getHostAndPort(), + self.getHostAndPort(), totalMaxSize, NODE_TYPE, config.getTier(), @@ -314,8 +314,8 @@ public class DruidClusterBridge ); try { - final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHost()); - log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize); + final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHostAndPort()); + log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHostAndPort(), totalMaxSize); announcer.update(path, jsonMapper.writeValueAsBytes(me)); } catch (Exception e) { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 310ccf1979a..7749a246a5e 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -454,7 +454,7 @@ public class DruidCoordinator private LeaderLatch createNewLeaderLatch() { final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHost() + curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHostAndPort() ); newLeaderLatch.addListener( diff --git a/server/src/main/java/io/druid/server/initialization/EmitterModule.java b/server/src/main/java/io/druid/server/initialization/EmitterModule.java index 410bd70ac82..fbac24bd9cf 100644 --- a/server/src/main/java/io/druid/server/initialization/EmitterModule.java +++ b/server/src/main/java/io/druid/server/initialization/EmitterModule.java @@ -79,7 +79,7 @@ public class EmitterModule implements Module public ServiceEmitter getServiceEmitter(@Self Supplier configSupplier, Emitter emitter) { final DruidNode config = configSupplier.get(); - final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter); + final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHostAndPort(), emitter); EmittingLogger.registerEmitter(retVal); return retVal; } diff --git a/server/src/test/java/io/druid/server/DruidNodeTest.java b/server/src/test/java/io/druid/server/DruidNodeTest.java new file mode 100644 index 00000000000..083f9e5cce8 --- /dev/null +++ b/server/src/test/java/io/druid/server/DruidNodeTest.java @@ -0,0 +1,76 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server; + +import org.junit.Assert; +import org.junit.Test; + +public class DruidNodeTest +{ + @Test + public void testDefaultsAndSanity() throws Exception + { + final String service = "test/service"; + + DruidNode node; + node = new DruidNode(service, "abc:123", null); + Assert.assertEquals("abc", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("abc:123", node.getHostAndPort()); + + node = new DruidNode(service, "abc:fff", null); + Assert.assertEquals("abc", node.getHost()); + Assert.assertEquals(-1, node.getPort()); + + node = new DruidNode(service, "abc", null); + Assert.assertEquals("abc", node.getHost()); + Assert.assertTrue(8080 <= node.getPort()); + + node = new DruidNode(service, null, null); + Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost()); + Assert.assertEquals(-1, node.getPort()); + + node = new DruidNode(service, "abc:123", 123); + Assert.assertEquals("abc", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("abc:123", node.getHostAndPort()); + + node = new DruidNode(service, "abc", 123); + Assert.assertEquals("abc", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("abc:123", node.getHostAndPort()); + + node = new DruidNode(service, null, 123); + Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost()); + Assert.assertEquals(123, node.getPort()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConflictingPorts() throws Exception + { + new DruidNode("test/service", "abc:123", 456); + } + + @Test(expected = IllegalArgumentException.class) + public void testConflictingPortsNonsense() throws Exception + { + new DruidNode("test/service", "abc:fff", 456); + } +} diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java index 657d64f2d0f..7440367a562 100644 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -167,7 +167,7 @@ public class DruidClusterBridgeTest Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor()); announcer.start(); - announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHost(), jsonMapper.writeValueAsBytes(me)); + announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHostAndPort(), jsonMapper.writeValueAsBytes(me)); BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class); BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class); diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 0414c2bd562..6999b262f7a 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -103,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable public Worker getWorker(@Self DruidNode node, WorkerConfig config) { return new Worker( - node.getHost(), + node.getHostAndPort(), config.getIp(), config.getCapacity(), config.getVersion()