Merge pull request #917 from metamx/explicit-host-port

make host+port more explicit + ipv6 support
This commit is contained in:
Fangjin Yang 2014-12-01 13:57:00 -07:00
commit 07b6e6cbe9
12 changed files with 171 additions and 47 deletions

View File

@ -160,7 +160,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
} }
final List<String> command = Lists.newArrayList(); final List<String> command = Lists.newArrayList();
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort); final String childHost = node.getHost();
final String taskClasspath; final String taskClasspath;
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) { if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
taskClasspath = Joiner.on(File.pathSeparator).join( taskClasspath = Joiner.on(File.pathSeparator).join(

View File

@ -177,7 +177,7 @@ public class TaskMaster
} }
); );
leaderSelector.setId(node.getHost()); leaderSelector.setId(node.getHostAndPort());
leaderSelector.autoRequeue(); leaderSelector.autoRequeue();
} }

View File

@ -60,8 +60,8 @@ public class DruidServer implements Comparable
) )
{ {
this( this(
node.getHost(), node.getHostAndPort(),
node.getHost(), node.getHostAndPort(),
config.getMaxSize(), config.getMaxSize(),
type, type,
config.getTier(), config.getTier(),

View File

@ -62,7 +62,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer
try { try {
instance = ServiceInstance.<Void>builder() instance = ServiceInstance.<Void>builder()
.name(serviceName) .name(serviceName)
.address(service.getHostNoPort()) .address(service.getHost())
.port(service.getPort()) .port(service.getPort())
.build(); .build();
} }

View File

@ -66,8 +66,8 @@ public class StorageNodeModule implements Module
} }
return new DruidServerMetadata( return new DruidServerMetadata(
node.getHost(), node.getHostAndPort(),
node.getHost(), node.getHostAndPort(),
config.getMaxSize(), config.getMaxSize(),
nodeType.getNodeType(), nodeType.getNodeType(),
config.getTier(), config.getTier(),

View File

@ -22,7 +22,10 @@ package io.druid.server;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import com.metamx.common.IAE;
import io.druid.common.utils.SocketUtil; import io.druid.common.utils.SocketUtil;
import javax.validation.constraints.Max; import javax.validation.constraints.Max;
@ -35,8 +38,6 @@ public class DruidNode
{ {
public static final String DEFAULT_HOST = "localhost"; public static final String DEFAULT_HOST = "localhost";
private String hostNoPort;
@JsonProperty("service") @JsonProperty("service")
@NotNull @NotNull
private String serviceName; private String serviceName;
@ -49,6 +50,22 @@ public class DruidNode
@Min(0) @Max(0xffff) @Min(0) @Max(0xffff)
private int port = -1; private int port = -1;
/**
* host = null , port = null -> host = _default_, port = -1
* host = "abc:123", port = null -> host = abc, port = 123
* host = "abc:fff", port = null -> throw IAE (invalid ipv6 host)
* host = "2001:db8:85a3::8a2e:370:7334", port = null -> host = 2001:db8:85a3::8a2e:370:7334, port = _auto_
* host = "[2001:db8:85a3::8a2e:370:7334]", port = null -> host = 2001:db8:85a3::8a2e:370:7334, port = _auto_
* host = "abc" , port = null -> host = abc, port = _auto_
* host = "abc" , port = 123 -> host = abc, port = 123
* host = "abc:123 , port = 123 -> host = abc, port = 123
* host = "abc:123 , port = 456 -> throw IAE (conflicting port)
* host = "abc:fff , port = 456 -> throw IAE (invalid ipv6 host)
* host = "[2001:db8:85a3::8a2e:370:7334]:123", port = null -> host = 2001:db8:85a3::8a2e:370:7334, port = 123
* host = "[2001:db8:85a3::8a2e:370:7334]", port = 123 -> host = 2001:db8:85a3::8a2e:370:7334, port = 123
* host = "2001:db8:85a3::8a2e:370:7334", port = 123 -> host = 2001:db8:85a3::8a2e:370:7334, port = 123
* host = null , port = 123 -> host = _default_, port = 123
*/
@JsonCreator @JsonCreator
public DruidNode( public DruidNode(
@JacksonInject @Named("serviceName") @JsonProperty("service") String serviceName, @JacksonInject @Named("serviceName") @JsonProperty("service") String serviceName,
@ -59,43 +76,40 @@ public class DruidNode
init(serviceName, host, port); init(serviceName, host, port);
} }
private void init(String serviceName, String host, Integer port) private void init(String serviceName, String host, Integer port)
{ {
Preconditions.checkNotNull(serviceName);
this.serviceName = serviceName; this.serviceName = serviceName;
if (port == null) { if(host == null && port == null) {
if (host == null) { host = DEFAULT_HOST;
setHostAndPort(DEFAULT_HOST, -1, DEFAULT_HOST); port = -1;
}
else if (host.contains(":")) {
final String[] hostParts = host.split(":");
try {
setHostAndPort(host, Integer.parseInt(hostParts[1]), hostParts[0]);
}
catch (NumberFormatException e) {
setHostAndPort(host, -1, hostParts[0]);
}
}
else {
final int openPort = SocketUtil.findOpenPort(8080);
setHostAndPort(String.format("%s:%d", host, openPort), openPort, host);
}
} }
else { else {
if (host == null || host.contains(":")) { final HostAndPort hostAndPort;
setHostAndPort(host == null ? DEFAULT_HOST : host, port, host == null ? DEFAULT_HOST : host.split(":")[0]); if (host != null) {
hostAndPort = HostAndPort.fromString(host);
if (port != null && hostAndPort.hasPort() && port != hostAndPort.getPort()) {
throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, port);
}
} else {
hostAndPort = HostAndPort.fromParts(DEFAULT_HOST, port);
} }
else {
setHostAndPort(String.format("%s:%d", host, port), port, host); host = hostAndPort.getHostText();
if (hostAndPort.hasPort()) {
port = hostAndPort.getPort();
}
if (port == null) {
port = SocketUtil.findOpenPort(8080);
} }
} }
}
private void setHostAndPort(String host, int port, String hostNoPort)
{
this.host = host;
this.port = port; this.port = port;
this.hostNoPort = hostNoPort; this.host = host;
} }
public String getServiceName() public String getServiceName()
@ -113,9 +127,11 @@ public class DruidNode
return port; return port;
} }
public String getHostNoPort() /**
{ * Returns host and port together as something that can be used as part of a URI.
return hostNoPort; */
public String getHostAndPort() {
return HostAndPort.fromParts(host, port).toString();
} }
@Override @Override

View File

@ -212,7 +212,7 @@ public class DruidClusterBridge
private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatch()
{ {
final LeaderLatch newLeaderLatch = new LeaderLatch( final LeaderLatch newLeaderLatch = new LeaderLatch(
curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHost() curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort()
); );
newLeaderLatch.addListener( newLeaderLatch.addListener(
@ -305,8 +305,8 @@ public class DruidClusterBridge
log.warn("No servers founds!"); log.warn("No servers founds!");
} else { } else {
DruidServerMetadata me = new DruidServerMetadata( DruidServerMetadata me = new DruidServerMetadata(
self.getHost(), self.getHostAndPort(),
self.getHost(), self.getHostAndPort(),
totalMaxSize, totalMaxSize,
NODE_TYPE, NODE_TYPE,
config.getTier(), config.getTier(),
@ -314,8 +314,8 @@ public class DruidClusterBridge
); );
try { try {
final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHost()); final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHostAndPort());
log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize); log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHostAndPort(), totalMaxSize);
announcer.update(path, jsonMapper.writeValueAsBytes(me)); announcer.update(path, jsonMapper.writeValueAsBytes(me));
} }
catch (Exception e) { catch (Exception e) {

View File

@ -454,7 +454,7 @@ public class DruidCoordinator
private LeaderLatch createNewLeaderLatch() private LeaderLatch createNewLeaderLatch()
{ {
final LeaderLatch newLeaderLatch = new LeaderLatch( final LeaderLatch newLeaderLatch = new LeaderLatch(
curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHost() curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHostAndPort()
); );
newLeaderLatch.addListener( newLeaderLatch.addListener(

View File

@ -79,7 +79,7 @@ public class EmitterModule implements Module
public ServiceEmitter getServiceEmitter(@Self Supplier<DruidNode> configSupplier, Emitter emitter) public ServiceEmitter getServiceEmitter(@Self Supplier<DruidNode> configSupplier, Emitter emitter)
{ {
final DruidNode config = configSupplier.get(); final DruidNode config = configSupplier.get();
final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter); final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHostAndPort(), emitter);
EmittingLogger.registerEmitter(retVal); EmittingLogger.registerEmitter(retVal);
return retVal; return retVal;
} }

View File

@ -0,0 +1,108 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.server;
import org.junit.Assert;
import org.junit.Test;
public class DruidNodeTest
{
@Test
public void testDefaultsAndSanity() throws Exception
{
final String service = "test/service";
DruidNode node;
node = new DruidNode(service, null, null);
Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost());
Assert.assertEquals(-1, node.getPort());
node = new DruidNode(service, "abc:123", null);
Assert.assertEquals("abc", node.getHost());
Assert.assertEquals(123, node.getPort());
Assert.assertEquals("abc:123", node.getHostAndPort());
node = new DruidNode(service, "2001:db8:85a3::8a2e:370:7334", null);
Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost());
Assert.assertTrue(8080 <= node.getPort());
node = new DruidNode(service, "[2001:db8:85a3::8a2e:370:7334]", null);
Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost());
Assert.assertTrue(8080 <= node.getPort());
node = new DruidNode(service, "abc", null);
Assert.assertEquals("abc", node.getHost());
Assert.assertTrue(8080 <= node.getPort());
node = new DruidNode(service, "abc", 123);
Assert.assertEquals("abc", node.getHost());
Assert.assertEquals(123, node.getPort());
Assert.assertEquals("abc:123", node.getHostAndPort());
node = new DruidNode(service, "abc:123", 123);
Assert.assertEquals("abc", node.getHost());
Assert.assertEquals(123, node.getPort());
Assert.assertEquals("abc:123", node.getHostAndPort());
node = new DruidNode(service, "[2001:db8:85a3::8a2e:370:7334]:123", null);
Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost());
Assert.assertEquals(123, node.getPort());
Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:123", node.getHostAndPort());
node = new DruidNode(service, "2001:db8:85a3::8a2e:370:7334", 123);
Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost());
Assert.assertEquals(123, node.getPort());
Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:123", node.getHostAndPort());
node = new DruidNode(service, "[2001:db8:85a3::8a2e:370:7334]", 123);
Assert.assertEquals("2001:db8:85a3::8a2e:370:7334", node.getHost());
Assert.assertEquals(123, node.getPort());
Assert.assertEquals("[2001:db8:85a3::8a2e:370:7334]:123", node.getHostAndPort());
node = new DruidNode(service, null, 123);
Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost());
Assert.assertEquals(123, node.getPort());
}
@Test(expected = IllegalArgumentException.class)
public void testConflictingPorts() throws Exception
{
new DruidNode("test/service", "abc:123", 456);
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidIPv6WithPort() throws Exception
{
new DruidNode("test/service", "[abc:fff]:123", 456);
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidIPv6() throws Exception
{
new DruidNode("test/service", "abc:fff", 456);
}
@Test(expected = IllegalArgumentException.class)
public void testConflictingPortsNonsense() throws Exception
{
new DruidNode("test/service", "[2001:db8:85a3::8a2e:370:7334]:123", 456);
}
}

View File

@ -167,7 +167,7 @@ public class DruidClusterBridgeTest
Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor()); Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor());
announcer.start(); announcer.start();
announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHost(), jsonMapper.writeValueAsBytes(me)); announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHostAndPort(), jsonMapper.writeValueAsBytes(me));
BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class); BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class);
BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class); BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class);

View File

@ -103,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable
public Worker getWorker(@Self DruidNode node, WorkerConfig config) public Worker getWorker(@Self DruidNode node, WorkerConfig config)
{ {
return new Worker( return new Worker(
node.getHost(), node.getHostAndPort(),
config.getIp(), config.getIp(),
config.getCapacity(), config.getCapacity(),
config.getVersion() config.getVersion()