mirror of https://github.com/apache/druid.git
make host+port more explicit
- document the behavior for node host/port initialization - throw exception if settings make no sense - fixes announcement for nodes without host/port defaults - makes code clearer as to when host vs. host+port are used
This commit is contained in:
parent
3699a41a54
commit
d23fd1e1ab
|
@ -160,7 +160,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
|
|||
}
|
||||
|
||||
final List<String> command = Lists.newArrayList();
|
||||
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
|
||||
final String childHost = node.getHost();
|
||||
final String taskClasspath;
|
||||
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
|
||||
taskClasspath = Joiner.on(File.pathSeparator).join(
|
||||
|
|
|
@ -177,7 +177,7 @@ public class TaskMaster
|
|||
}
|
||||
);
|
||||
|
||||
leaderSelector.setId(node.getHost());
|
||||
leaderSelector.setId(node.getHostAndPort());
|
||||
leaderSelector.autoRequeue();
|
||||
}
|
||||
|
||||
|
|
|
@ -60,8 +60,8 @@ public class DruidServer implements Comparable
|
|||
)
|
||||
{
|
||||
this(
|
||||
node.getHost(),
|
||||
node.getHost(),
|
||||
node.getHostAndPort(),
|
||||
node.getHostAndPort(),
|
||||
config.getMaxSize(),
|
||||
type,
|
||||
config.getTier(),
|
||||
|
|
|
@ -62,7 +62,7 @@ public class CuratorServiceAnnouncer implements ServiceAnnouncer
|
|||
try {
|
||||
instance = ServiceInstance.<Void>builder()
|
||||
.name(serviceName)
|
||||
.address(service.getHostNoPort())
|
||||
.address(service.getHost())
|
||||
.port(service.getPort())
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -66,8 +66,8 @@ public class StorageNodeModule implements Module
|
|||
}
|
||||
|
||||
return new DruidServerMetadata(
|
||||
node.getHost(),
|
||||
node.getHost(),
|
||||
node.getHostAndPort(),
|
||||
node.getHostAndPort(),
|
||||
config.getMaxSize(),
|
||||
nodeType.getNodeType(),
|
||||
config.getTier(),
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.inject.name.Named;
|
||||
import com.metamx.common.IAE;
|
||||
import io.druid.common.utils.SocketUtil;
|
||||
|
||||
import javax.validation.constraints.Max;
|
||||
|
@ -35,8 +36,6 @@ public class DruidNode
|
|||
{
|
||||
public static final String DEFAULT_HOST = "localhost";
|
||||
|
||||
private String hostNoPort;
|
||||
|
||||
@JsonProperty("service")
|
||||
@NotNull
|
||||
private String serviceName;
|
||||
|
@ -59,43 +58,43 @@ public class DruidNode
|
|||
init(serviceName, host, port);
|
||||
}
|
||||
|
||||
/**
|
||||
* host = "abc:123", port = null -> host = abc, port = 123
|
||||
* host = "abc:fff", port = null -> host = abc, port = -1
|
||||
* host = "abc" , port = null -> host = abc, port = _auto_
|
||||
* host = null , port = null -> host = _default_, port = -1
|
||||
* host = "abc:123 , port = 456 -> throw IAE
|
||||
* host = "abc:fff , port = 456 -> throw IAE
|
||||
* host = "abc:123 , port = 123 -> host = abc, port = 123
|
||||
* host = "abc" , port = 123 -> host = abc, port = 123
|
||||
* host = null , port = 123 -> host = _default_, port = 123
|
||||
*/
|
||||
private void init(String serviceName, String host, Integer port)
|
||||
{
|
||||
this.serviceName = serviceName;
|
||||
|
||||
if (port == null) {
|
||||
if (host == null) {
|
||||
setHostAndPort(DEFAULT_HOST, -1, DEFAULT_HOST);
|
||||
if (host != null && host.contains(":")) {
|
||||
final String[] hostParts = host.split(":");
|
||||
int parsedPort = -1;
|
||||
try {
|
||||
parsedPort = Integer.parseInt(hostParts[1]);
|
||||
}
|
||||
else if (host.contains(":")) {
|
||||
final String[] hostParts = host.split(":");
|
||||
try {
|
||||
setHostAndPort(host, Integer.parseInt(hostParts[1]), hostParts[0]);
|
||||
}
|
||||
catch (NumberFormatException e) {
|
||||
setHostAndPort(host, -1, hostParts[0]);
|
||||
}
|
||||
catch (NumberFormatException e) {
|
||||
// leave -1
|
||||
}
|
||||
else {
|
||||
final int openPort = SocketUtil.findOpenPort(8080);
|
||||
setHostAndPort(String.format("%s:%d", host, openPort), openPort, host);
|
||||
if (port != null && port != parsedPort) {
|
||||
throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, port);
|
||||
}
|
||||
host = hostParts[0];
|
||||
port = parsedPort;
|
||||
}
|
||||
else {
|
||||
if (host == null || host.contains(":")) {
|
||||
setHostAndPort(host == null ? DEFAULT_HOST : host, port, host == null ? DEFAULT_HOST : host.split(":")[0]);
|
||||
}
|
||||
else {
|
||||
setHostAndPort(String.format("%s:%d", host, port), port, host);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setHostAndPort(String host, int port, String hostNoPort)
|
||||
{
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.hostNoPort = hostNoPort;
|
||||
if (port == null && host != null) {
|
||||
port = SocketUtil.findOpenPort(8080);
|
||||
}
|
||||
|
||||
this.port = port != null ? port : -1;
|
||||
this.host = host != null ? host : DEFAULT_HOST;
|
||||
}
|
||||
|
||||
public String getServiceName()
|
||||
|
@ -113,9 +112,8 @@ public class DruidNode
|
|||
return port;
|
||||
}
|
||||
|
||||
public String getHostNoPort()
|
||||
{
|
||||
return hostNoPort;
|
||||
public String getHostAndPort() {
|
||||
return String.format("%s:%d", host, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -212,7 +212,7 @@ public class DruidClusterBridge
|
|||
private LeaderLatch createNewLeaderLatch()
|
||||
{
|
||||
final LeaderLatch newLeaderLatch = new LeaderLatch(
|
||||
curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHost()
|
||||
curator, ZKPaths.makePath(config.getConnectorPath(), BRIDGE_OWNER_NODE), self.getHostAndPort()
|
||||
);
|
||||
|
||||
newLeaderLatch.addListener(
|
||||
|
@ -305,8 +305,8 @@ public class DruidClusterBridge
|
|||
log.warn("No servers founds!");
|
||||
} else {
|
||||
DruidServerMetadata me = new DruidServerMetadata(
|
||||
self.getHost(),
|
||||
self.getHost(),
|
||||
self.getHostAndPort(),
|
||||
self.getHostAndPort(),
|
||||
totalMaxSize,
|
||||
NODE_TYPE,
|
||||
config.getTier(),
|
||||
|
@ -314,8 +314,8 @@ public class DruidClusterBridge
|
|||
);
|
||||
|
||||
try {
|
||||
final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHost());
|
||||
log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHost(), totalMaxSize);
|
||||
final String path = ZKPaths.makePath(config.getAnnouncementsPath(), self.getHostAndPort());
|
||||
log.info("Updating [%s] to have a maxSize of[%,d] bytes", self.getHostAndPort(), totalMaxSize);
|
||||
announcer.update(path, jsonMapper.writeValueAsBytes(me));
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -454,7 +454,7 @@ public class DruidCoordinator
|
|||
private LeaderLatch createNewLeaderLatch()
|
||||
{
|
||||
final LeaderLatch newLeaderLatch = new LeaderLatch(
|
||||
curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHost()
|
||||
curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHostAndPort()
|
||||
);
|
||||
|
||||
newLeaderLatch.addListener(
|
||||
|
|
|
@ -79,7 +79,7 @@ public class EmitterModule implements Module
|
|||
public ServiceEmitter getServiceEmitter(@Self Supplier<DruidNode> configSupplier, Emitter emitter)
|
||||
{
|
||||
final DruidNode config = configSupplier.get();
|
||||
final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHost(), emitter);
|
||||
final ServiceEmitter retVal = new ServiceEmitter(config.getServiceName(), config.getHostAndPort(), emitter);
|
||||
EmittingLogger.registerEmitter(retVal);
|
||||
return retVal;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.server;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class DruidNodeTest
|
||||
{
|
||||
@Test
|
||||
public void testDefaultsAndSanity() throws Exception
|
||||
{
|
||||
final String service = "test/service";
|
||||
|
||||
DruidNode node;
|
||||
node = new DruidNode(service, "abc:123", null);
|
||||
Assert.assertEquals("abc", node.getHost());
|
||||
Assert.assertEquals(123, node.getPort());
|
||||
Assert.assertEquals("abc:123", node.getHostAndPort());
|
||||
|
||||
node = new DruidNode(service, "abc:fff", null);
|
||||
Assert.assertEquals("abc", node.getHost());
|
||||
Assert.assertEquals(-1, node.getPort());
|
||||
|
||||
node = new DruidNode(service, "abc", null);
|
||||
Assert.assertEquals("abc", node.getHost());
|
||||
Assert.assertTrue(8080 <= node.getPort());
|
||||
|
||||
node = new DruidNode(service, null, null);
|
||||
Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost());
|
||||
Assert.assertEquals(-1, node.getPort());
|
||||
|
||||
node = new DruidNode(service, "abc:123", 123);
|
||||
Assert.assertEquals("abc", node.getHost());
|
||||
Assert.assertEquals(123, node.getPort());
|
||||
Assert.assertEquals("abc:123", node.getHostAndPort());
|
||||
|
||||
node = new DruidNode(service, "abc", 123);
|
||||
Assert.assertEquals("abc", node.getHost());
|
||||
Assert.assertEquals(123, node.getPort());
|
||||
Assert.assertEquals("abc:123", node.getHostAndPort());
|
||||
|
||||
node = new DruidNode(service, null, 123);
|
||||
Assert.assertEquals(DruidNode.DEFAULT_HOST, node.getHost());
|
||||
Assert.assertEquals(123, node.getPort());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testConflictingPorts() throws Exception
|
||||
{
|
||||
new DruidNode("test/service", "abc:123", 456);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testConflictingPortsNonsense() throws Exception
|
||||
{
|
||||
new DruidNode("test/service", "abc:fff", 456);
|
||||
}
|
||||
}
|
|
@ -167,7 +167,7 @@ public class DruidClusterBridgeTest
|
|||
|
||||
Announcer announcer = new Announcer(remoteCf, Executors.newSingleThreadExecutor());
|
||||
announcer.start();
|
||||
announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHost(), jsonMapper.writeValueAsBytes(me));
|
||||
announcer.announce(zkPathsConfig.getAnnouncementsPath() + "/" + me.getHostAndPort(), jsonMapper.writeValueAsBytes(me));
|
||||
|
||||
BatchDataSegmentAnnouncer batchDataSegmentAnnouncer = EasyMock.createMock(BatchDataSegmentAnnouncer.class);
|
||||
BatchServerInventoryView batchServerInventoryView = EasyMock.createMock(BatchServerInventoryView.class);
|
||||
|
|
|
@ -103,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable
|
|||
public Worker getWorker(@Self DruidNode node, WorkerConfig config)
|
||||
{
|
||||
return new Worker(
|
||||
node.getHost(),
|
||||
node.getHostAndPort(),
|
||||
config.getIp(),
|
||||
config.getCapacity(),
|
||||
config.getVersion()
|
||||
|
|
Loading…
Reference in New Issue