diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 5e5c93d989b..01696f86bef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -160,7 +160,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer } final List command = Lists.newArrayList(); - final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort); + final String childHost = node.getHost(); final String taskClasspath; if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { taskClasspath = Joiner.on(File.pathSeparator).join( diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 5d2d2b92829..61a7b595437 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -177,7 +177,7 @@ public class TaskMaster } ); - leaderSelector.setId(node.getHost()); + leaderSelector.setId(node.getHostAndPort()); leaderSelector.autoRequeue(); } diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index b96c82361bb..3ee6746bbff 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -60,8 +60,8 @@ public class DruidServer implements Comparable ) { this( - node.getHost(), - node.getHost(), + node.getHostAndPort(), + node.getHostAndPort(), config.getMaxSize(), type, config.getTier(), diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java b/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java index 581f91a57c1..57f2e4a7eca 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorServiceAnnouncer.java @@ -62,7 +62,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer try { instance = ServiceInstance.builder() .name(serviceName) - .address(service.getHostNoPort()) + .address(service.getHost()) .port(service.getPort()) .build(); } diff --git a/server/src/main/java/io/druid/guice/StorageNodeModule.java b/server/src/main/java/io/druid/guice/StorageNodeModule.java index b983a74eb85..cb6c6829ac1 100644 --- a/server/src/main/java/io/druid/guice/StorageNodeModule.java +++ b/server/src/main/java/io/druid/guice/StorageNodeModule.java @@ -66,8 +66,8 @@ public class StorageNodeModule implements Module } return new DruidServerMetadata( - node.getHost(), - node.getHost(), + node.getHostAndPort(), + node.getHostAndPort(), config.getMaxSize(), nodeType.getNodeType(), config.getTier(), diff --git a/server/src/main/java/io/druid/server/DruidNode.java b/server/src/main/java/io/druid/server/DruidNode.java index 1928d6487d8..363a5614df4 100644 --- a/server/src/main/java/io/druid/server/DruidNode.java +++ b/server/src/main/java/io/druid/server/DruidNode.java @@ -22,7 +22,10 @@ package io.druid.server; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; import com.google.inject.name.Named; +import com.metamx.common.IAE; import io.druid.common.utils.SocketUtil; import javax.validation.constraints.Max; @@ -35,8 +38,6 @@ public class DruidNode { public static final String DEFAULT_HOST = "localhost"; - private String hostNoPort; - @JsonProperty("service") @NotNull private String serviceName; @@ -49,6 +50,22 @@ public class DruidNode @Min(0) @Max(0xffff) private int port = -1; + /** + * host = null , port = null -> host = _default_, port = -1 + * host = "abc:123", port = null -> host = abc, port = 123 + * host = "abc:fff", port = null -> throw IAE (invalid ipv6 host) + * host = "2001:db8:85a3::8a2e:370:7334", port = null -> host = 2001:db8:85a3::8a2e:370:7334, port = _auto_ + * host = "[2001:db8:85a3::8a2e:370:7334]", port = null -> host = 2001:db8:85a3::8a2e:370:7334, port = _auto_ + * host = "abc" , port = null -> host = abc, port = _auto_ + * host = "abc" , port = 123 -> host = abc, port = 123 + * host = "abc:123 , port = 123 -> host = abc, port = 123 + * host = "abc:123 , port = 456 -> throw IAE (conflicting port) + * host = "abc:fff , port = 456 -> throw IAE (invalid ipv6 host) + * host = "[2001:db8:85a3::8a2e:370:7334]:123", port = null -> host = 2001:db8:85a3::8a2e:370:7334, port = 123 + * host = "[2001:db8:85a3::8a2e:370:7334]", port = 123 -> host = 2001:db8:85a3::8a2e:370:7334, port = 123 + * host = "2001:db8:85a3::8a2e:370:7334", port = 123 -> host = 2001:db8:85a3::8a2e:370:7334, port = 123 + * host = null , port = 123 -> host = _default_, port = 123 + */ @JsonCreator public DruidNode( @JacksonInject @Named("serviceName") @JsonProperty("service") String serviceName, @@ -59,43 +76,40 @@ public class DruidNode init(serviceName, host, port); } + private void init(String serviceName, String host, Integer port) { + Preconditions.checkNotNull(serviceName); this.serviceName = serviceName; - if (port == null) { - if (host == null) { - setHostAndPort(DEFAULT_HOST, -1, DEFAULT_HOST); - } - else if (host.contains(":")) { - final String[] hostParts = host.split(":"); - try { - setHostAndPort(host, Integer.parseInt(hostParts[1]), hostParts[0]); - } - catch (NumberFormatException e) { - setHostAndPort(host, -1, hostParts[0]); - } - } - else { - final int openPort = SocketUtil.findOpenPort(8080); - setHostAndPort(String.format("%s:%d", host, openPort), openPort, host); - } + if(host == null && port == null) { + host = DEFAULT_HOST; + port = -1; } else { - if (host == null || host.contains(":")) { - setHostAndPort(host == null ? DEFAULT_HOST : host, port, host == null ? DEFAULT_HOST : host.split(":")[0]); + final HostAndPort hostAndPort; + if (host != null) { + hostAndPort = HostAndPort.fromString(host); + if (port != null && hostAndPort.hasPort() && port != hostAndPort.getPort()) { + throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, port); + } + } else { + hostAndPort = HostAndPort.fromParts(DEFAULT_HOST, port); } - else { - setHostAndPort(String.format("%s:%d", host, port), port, host); + + host = hostAndPort.getHostText(); + + if (hostAndPort.hasPort()) { + port = hostAndPort.getPort(); + } + + if (port == null) { + port = SocketUtil.findOpenPort(8080); } } - } - private void setHostAndPort(String host, int port, String hostNoPort) - { - this.host = host; this.port = port; - this.hostNoPort = hostNoPort; + this.host = host; } public String getServiceName() @@ -113,9 +127,11 @@ public class DruidNode return port; } - public String getHostNoPort() - { - return hostNoPort; + /** + * Returns host and port together as something that can be used as part of a URI. + */ + public String getHostAndPort() { + return HostAndPort.fromParts(host, port).toString(); } @Override diff --git a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java index 58827baf93d..b12aa3e5299 100644 --- a/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java +++ b/server/src/main/java/io/druid/server/bridge/DruidClusterBridge.java @@ -212,7 +212,7 @@ public class DruidClusterBridge private LeaderLatch createNewLeaderLatch() { final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHost() + curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort() ); newLeaderLatch.addListener( @@ -305,8 +305,8 @@ public class DruidClusterBridge log.warn("No servers founds!"); } else { DruidServerMetadata me = new DruidServerMetadata( - self.getHost(), - self.getHost(), + self.getHostAndPort(), + self.getHostAndPort(), totalMaxSize, NODE_TYPE, config.getTier(), @@ -314,8 +314,8 @@ public class DruidClusterBridge ); try { - final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHost()); - log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize); + final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHostAndPort()); + log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHostAndPort(), totalMaxSize); announcer.update(path, jsonMapper.writeValueAsBytes(me)); } catch (Exception e) { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 310ccf1979a..7749a246a5e 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -454,7 +454,7 @@ public class DruidCoordinator private LeaderLatch createNewLeaderLatch() { final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHost() + curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHostAndPort() ); newLeaderLatch.addListener( diff --git a/server/src/main/java/io/druid/server/initialization/EmitterModule.java b/server/src/main/java/io/druid/server/initialization/EmitterModule.java index 410bd70ac82..fbac24bd9cf 100644 --- a/server/src/main/java/io/druid/server/initialization/EmitterModule.java +++ b/server/src/main/java/io/druid/server/initialization/EmitterModule.java @@ -79,7 +79,7 @@ public class EmitterModule implements Module public ServiceEmitter getServiceEmitter(@Self Supplier configSupplier, Emitter emitter) { final DruidNode config = configSupplier.get(); - final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter); + final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHostAndPort(), emitter); EmittingLogger.registerEmitter(retVal); return retVal; } diff --git a/server/src/test/java/io/druid/server/DruidNodeTest.java b/server/src/test/java/io/druid/server/DruidNodeTest.java new file mode 100644 index 00000000000..96721dece00 --- /dev/null +++ b/server/src/test/java/io/druid/server/DruidNodeTest.java @@ -0,0 +1,108 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server; + +import org.junit.Assert; +import org.junit.Test; + +public class DruidNodeTest +{ + @Test + public void testDefaultsAndSanity() throws Exception + { + final String service = "test/service"; + + DruidNode node; + + node = new DruidNode(service, null, null); + Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost()); + Assert.assertEquals(-1, node.getPort()); + + node = new DruidNode(service, "abc:123", null); + Assert.assertEquals("abc", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("abc:123", node.getHostAndPort()); + + node = new DruidNode(service, "2001:db8:85a3::8a2e:370:7334", null); + Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost()); + Assert.assertTrue(8080 <= node.getPort()); + + node = new DruidNode(service, "[2001:db8:85a3::8a2e:370:7334]", null); + Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost()); + Assert.assertTrue(8080 <= node.getPort()); + + node = new DruidNode(service, "abc", null); + Assert.assertEquals("abc", node.getHost()); + Assert.assertTrue(8080 <= node.getPort()); + + node = new DruidNode(service, "abc", 123); + Assert.assertEquals("abc", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("abc:123", node.getHostAndPort()); + + node = new DruidNode(service, "abc:123", 123); + Assert.assertEquals("abc", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("abc:123", node.getHostAndPort()); + + node = new DruidNode(service, "[2001:db8:85a3::8a2e:370:7334]:123", null); + Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:123", node.getHostAndPort()); + + node = new DruidNode(service, "2001:db8:85a3::8a2e:370:7334", 123); + Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:123", node.getHostAndPort()); + + node = new DruidNode(service, "[2001:db8:85a3::8a2e:370:7334]", 123); + Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost()); + Assert.assertEquals(123, node.getPort()); + Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:123", node.getHostAndPort()); + + node = new DruidNode(service, null, 123); + Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost()); + Assert.assertEquals(123, node.getPort()); + } + + @Test(expected = IllegalArgumentException.class) + public void testConflictingPorts() throws Exception + { + new DruidNode("test/service", "abc:123", 456); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidIPv6WithPort() throws Exception + { + new DruidNode("test/service", "[abc:fff]:123", 456); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidIPv6() throws Exception + { + new DruidNode("test/service", "abc:fff", 456); + } + + @Test(expected = IllegalArgumentException.class) + public void testConflictingPortsNonsense() throws Exception + { + new DruidNode("test/service", "[2001:db8:85a3::8a2e:370:7334]:123", 456); + } +} diff --git a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java index 657d64f2d0f..7440367a562 100644 --- a/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java +++ b/server/src/test/java/io/druid/server/bridge/DruidClusterBridgeTest.java @@ -167,7 +167,7 @@ public class DruidClusterBridgeTest Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor()); announcer.start(); - announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHost(), jsonMapper.writeValueAsBytes(me)); + announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHostAndPort(), jsonMapper.writeValueAsBytes(me)); BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class); BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class); diff --git a/services/src/main/java/io/druid/cli/CliMiddleManager.java b/services/src/main/java/io/druid/cli/CliMiddleManager.java index 0414c2bd562..6999b262f7a 100644 --- a/services/src/main/java/io/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/io/druid/cli/CliMiddleManager.java @@ -103,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable public Worker getWorker(@Self DruidNode node, WorkerConfig config) { return new Worker( - node.getHost(), + node.getHostAndPort(), config.getIp(), config.getCapacity(), config.getVersion()