Merge pull request #1811 from metamx/ServerDiscoverySelectorIPv6

Server discovery selector ipv6 friendly
This commit is contained in:
Nishant 2015-10-13 23:12:05 +05:30
commit 273ec21d3d
6 changed files with 112 additions and 17 deletions

View File

@ -155,7 +155,7 @@ public class RemoteTaskActionClient implements TaskActionClient
private URI makeServiceUri(final Server instance) throws URISyntaxException private URI makeServiceUri(final Server instance) throws URISyntaxException
{ {
return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action")); return new URI(instance.getScheme(), null, instance.getAddress(), instance.getPort(), "/druid/indexer/v1/action", null, null);
} }
private Server getServiceInstance() private Server getServiceInstance()

View File

@ -34,6 +34,7 @@ import org.joda.time.Interval;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -115,7 +116,15 @@ public class IndexingServiceClient
throw new ISE("Cannot find instance of indexingService"); throw new ISE("Cannot find instance of indexingService");
} }
return String.format("http://%s/druid/indexer/v1", instance.getHost()); return new URI(
instance.getScheme(),
null,
instance.getAddress(),
instance.getPort(),
"/druid/indexer/v1",
null,
null
).toString();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);

View File

@ -17,6 +17,7 @@
package io.druid.curator.discovery; package io.druid.curator.discovery;
import com.google.common.net.HostAndPort;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -62,7 +63,7 @@ public class ServerDiscoverySelector implements DiscoverySelector<Server>
@Override @Override
public String getHost() public String getHost()
{ {
return String.format("%s:%d", getAddress(), getPort()); return HostAndPort.fromParts(getAddress(), getPort()).toString();
} }
@Override @Override

View File

@ -95,16 +95,18 @@ public class BridgeQuerySegmentWalker implements QuerySegmentWalker
if (instance == null) { if (instance == null) {
return Sequences.empty(); return Sequences.empty();
} }
final Server brokerServer = brokerSelector.pick();
final String url = String.format( final URL url = new URL(
"http://%s/druid/v2/", brokerServer.getScheme(),
brokerSelector.pick().getHost() brokerServer.getAddress(),
brokerServer.getPort(),
"/druid/v2/"
); );
StatusResponseHolder response = httpClient.go( StatusResponseHolder response = httpClient.go(
new Request( new Request(
HttpMethod.POST, HttpMethod.POST,
new URL(url) url
).setContent( ).setContent(
MediaType.APPLICATION_JSON, MediaType.APPLICATION_JSON,
jsonMapper.writeValueAsBytes(query) jsonMapper.writeValueAsBytes(query)

View File

@ -42,6 +42,8 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration; import org.joda.time.Duration;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -197,7 +199,7 @@ public class CoordinatorRuleManager
return retVal; return retVal;
} }
private String getRuleURL() private String getRuleURL() throws URISyntaxException
{ {
Server server = selector.pick(); Server server = selector.pick();
@ -206,6 +208,14 @@ public class CoordinatorRuleManager
return null; return null;
} }
return String.format("http://%s%s", server.getHost(), config.get().getRulesEndpoint()); return new URI(
server.getScheme(),
null,
server.getAddress(),
server.getPort(),
config.get().getRulesEndpoint(),
null,
null
).toString();
} }
} }

View File

@ -28,6 +28,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
public class ServerDiscoverySelectorTest public class ServerDiscoverySelectorTest
{ {
@ -52,14 +53,86 @@ public class ServerDiscoverySelectorTest
EasyMock.expect(serviceProvider.getInstance()).andReturn(instance).anyTimes(); EasyMock.expect(serviceProvider.getInstance()).andReturn(instance).anyTimes();
EasyMock.expect(instance.getAddress()).andReturn(ADDRESS).anyTimes(); EasyMock.expect(instance.getAddress()).andReturn(ADDRESS).anyTimes();
EasyMock.expect(instance.getPort()).andReturn(PORT).anyTimes(); EasyMock.expect(instance.getPort()).andReturn(PORT).anyTimes();
EasyMock.replay(instance,serviceProvider); EasyMock.replay(instance, serviceProvider);
Server server = serverDiscoverySelector.pick(); Server server = serverDiscoverySelector.pick();
Assert.assertEquals(PORT,server.getPort()); Assert.assertEquals(PORT, server.getPort());
Assert.assertEquals(ADDRESS,server.getAddress()); Assert.assertEquals(ADDRESS, server.getAddress());
Assert.assertTrue(server.getHost().contains(new Integer(PORT).toString())); Assert.assertTrue(server.getHost().contains(Integer.toString(PORT)));
Assert.assertTrue(server.getHost().contains(ADDRESS)); Assert.assertTrue(server.getHost().contains(ADDRESS));
Assert.assertEquals(new String("http"), server.getScheme()); Assert.assertEquals("http", server.getScheme());
EasyMock.verify(instance,serviceProvider); EasyMock.verify(instance, serviceProvider);
final URI uri = new URI(
server.getScheme(),
null,
server.getAddress(),
server.getPort(),
"/druid/indexer/v1/action",
null,
null
);
Assert.assertEquals(PORT, uri.getPort());
Assert.assertEquals(ADDRESS, uri.getHost());
Assert.assertEquals("http", uri.getScheme());
}
@Test
public void testPickIPv6() throws Exception
{
final String ADDRESS = "2001:0db8:0000:0000:0000:ff00:0042:8329";
EasyMock.expect(serviceProvider.getInstance()).andReturn(instance).anyTimes();
EasyMock.expect(instance.getAddress()).andReturn(ADDRESS).anyTimes();
EasyMock.expect(instance.getPort()).andReturn(PORT).anyTimes();
EasyMock.replay(instance, serviceProvider);
Server server = serverDiscoverySelector.pick();
Assert.assertEquals(PORT, server.getPort());
Assert.assertEquals(ADDRESS, server.getAddress());
Assert.assertTrue(server.getHost().contains(Integer.toString(PORT)));
Assert.assertTrue(server.getHost().contains(ADDRESS));
Assert.assertEquals("http", server.getScheme());
EasyMock.verify(instance, serviceProvider);
final URI uri = new URI(
server.getScheme(),
null,
server.getAddress(),
server.getPort(),
"/druid/indexer/v1/action",
null,
null
);
Assert.assertEquals(PORT, uri.getPort());
Assert.assertEquals(String.format("[%s]", ADDRESS), uri.getHost());
Assert.assertEquals("http", uri.getScheme());
}
@Test
public void testPickIPv6Bracket() throws Exception
{
final String ADDRESS = "[2001:0db8:0000:0000:0000:ff00:0042:8329]";
EasyMock.expect(serviceProvider.getInstance()).andReturn(instance).anyTimes();
EasyMock.expect(instance.getAddress()).andReturn(ADDRESS).anyTimes();
EasyMock.expect(instance.getPort()).andReturn(PORT).anyTimes();
EasyMock.replay(instance, serviceProvider);
Server server = serverDiscoverySelector.pick();
Assert.assertEquals(PORT, server.getPort());
Assert.assertEquals(ADDRESS, server.getAddress());
Assert.assertTrue(server.getHost().contains(Integer.toString(PORT)));
Assert.assertTrue(server.getHost().contains(ADDRESS));
Assert.assertEquals("http", server.getScheme());
EasyMock.verify(instance, serviceProvider);
final URI uri = new URI(
server.getScheme(),
null,
server.getAddress(),
server.getPort(),
"/druid/indexer/v1/action",
null,
null
);
Assert.assertEquals(PORT, uri.getPort());
Assert.assertEquals(ADDRESS, uri.getHost());
Assert.assertEquals("http", uri.getScheme());
} }
@Test @Test