mirror of https://github.com/apache/druid.git
Use internal-discovery and http for talking to overlord/coordinator leaders (#4735)
* Use internal-discovery and http for talking to overlord/coordinator leaders * CuratorDruidNodeDiscovery.getAllNodes() best effort 30 sec wait for cache initialization * DruidLeaderClientProvider to eagerly instantiate DruidNodeDiscovery when needed so that DruidNodeDiscovery impl cache gets initialized well in time * Revert "DruidLeaderClientProvider to eagerly instantiate DruidNodeDiscovery when needed so that DruidNodeDiscovery impl cache gets initialized well in time" This reverts commit f1a2432614ba56ddc2d55fe47e990d17fcfd6129. * add lifecycle to DruidLeaderClient to early initialize DruidNodeDiscovery so that it has its cache update well in time
This commit is contained in:
parent
975a50fc81
commit
834e050bc4
|
@ -20,20 +20,14 @@
|
|||
package io.druid.indexing.common.actions;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.StatusResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHolder;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.indexing.common.RetryPolicy;
|
||||
import io.druid.indexing.common.RetryPolicyFactory;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import io.druid.java.util.common.IOE;
|
||||
import io.druid.java.util.common.jackson.JacksonUtils;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import org.jboss.netty.channel.ChannelException;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
|
@ -41,35 +35,30 @@ import org.joda.time.Duration;
|
|||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
public class RemoteTaskActionClient implements TaskActionClient
|
||||
{
|
||||
private final Task task;
|
||||
private final HttpClient httpClient;
|
||||
private final ServerDiscoverySelector selector;
|
||||
private final RetryPolicyFactory retryPolicyFactory;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final DruidLeaderClient druidLeaderClient;
|
||||
private final Random random = new Random();
|
||||
|
||||
private static final Logger log = new Logger(RemoteTaskActionClient.class);
|
||||
|
||||
public RemoteTaskActionClient(
|
||||
Task task,
|
||||
HttpClient httpClient,
|
||||
ServerDiscoverySelector selector,
|
||||
DruidLeaderClient druidLeaderClient,
|
||||
RetryPolicyFactory retryPolicyFactory,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.task = task;
|
||||
this.httpClient = httpClient;
|
||||
this.selector = selector;
|
||||
this.retryPolicyFactory = retryPolicyFactory;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.druidLeaderClient = druidLeaderClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -83,27 +72,16 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
|
||||
while (true) {
|
||||
try {
|
||||
final Server server;
|
||||
final URI serviceUri;
|
||||
try {
|
||||
server = getServiceInstance();
|
||||
serviceUri = makeServiceUri(server);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Want to retry, so throw an IOException.
|
||||
throw new IOException("Failed to locate service uri", e);
|
||||
}
|
||||
|
||||
final StatusResponseHolder response;
|
||||
final FullResponseHolder fullResponseHolder;
|
||||
|
||||
log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction);
|
||||
log.info("Submitting action for task[%s] to overlord: [%s].", task.getId(), taskAction);
|
||||
|
||||
try {
|
||||
response = httpClient.go(
|
||||
new Request(HttpMethod.POST, serviceUri.toURL())
|
||||
.setContent(MediaType.APPLICATION_JSON, dataToSend),
|
||||
new StatusResponseHandler(Charsets.UTF_8)
|
||||
).get();
|
||||
fullResponseHolder = druidLeaderClient.go(
|
||||
druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action")
|
||||
.setContent(MediaType.APPLICATION_JSON, dataToSend)
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
|
||||
|
@ -111,18 +89,17 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
throw Throwables.propagate(e);
|
||||
}
|
||||
|
||||
if (response.getStatus().getCode() / 100 == 2) {
|
||||
if (fullResponseHolder.getStatus().getCode() / 100 == 2) {
|
||||
final Map<String, Object> responseDict = jsonMapper.readValue(
|
||||
response.getContent(),
|
||||
fullResponseHolder.getContent(),
|
||||
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
|
||||
);
|
||||
return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
|
||||
} else {
|
||||
// Want to retry, so throw an IOException.
|
||||
throw new IOE(
|
||||
"Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.",
|
||||
response.getStatus(),
|
||||
server.getHost()
|
||||
"Scary HTTP status returned: %s. Check your overlord logs for exceptions.",
|
||||
fullResponseHolder.getStatus()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -152,19 +129,4 @@ public class RemoteTaskActionClient implements TaskActionClient
|
|||
long retval = input + (long) jitter;
|
||||
return retval < 0 ? 0 : retval;
|
||||
}
|
||||
|
||||
private URI makeServiceUri(final Server instance) throws URISyntaxException
|
||||
{
|
||||
return new URI(instance.getScheme(), null, instance.getAddress(), instance.getPort(), "/druid/indexer/v1/action", null, null);
|
||||
}
|
||||
|
||||
private Server getServiceInstance()
|
||||
{
|
||||
final Server instance = selector.pick();
|
||||
if (instance == null) {
|
||||
throw new ISE("Cannot find instance of indexer to talk to!");
|
||||
} else {
|
||||
return instance;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,10 +21,8 @@ package io.druid.indexing.common.actions;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.indexing.IndexingService;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.indexing.common.RetryPolicyFactory;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
|
||||
|
@ -32,21 +30,18 @@ import io.druid.indexing.common.task.Task;
|
|||
*/
|
||||
public class RemoteTaskActionClientFactory implements TaskActionClientFactory
|
||||
{
|
||||
private final HttpClient httpClient;
|
||||
private final ServerDiscoverySelector selector;
|
||||
private final DruidLeaderClient druidLeaderClient;
|
||||
private final RetryPolicyFactory retryPolicyFactory;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@Inject
|
||||
public RemoteTaskActionClientFactory(
|
||||
@Global HttpClient httpClient,
|
||||
@IndexingService ServerDiscoverySelector selector,
|
||||
@IndexingService DruidLeaderClient leaderHttpClient,
|
||||
RetryPolicyFactory retryPolicyFactory,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.httpClient = httpClient;
|
||||
this.selector = selector;
|
||||
this.druidLeaderClient = leaderHttpClient;
|
||||
this.retryPolicyFactory = retryPolicyFactory;
|
||||
this.jsonMapper = jsonMapper;
|
||||
}
|
||||
|
@ -54,6 +49,6 @@ public class RemoteTaskActionClientFactory implements TaskActionClientFactory
|
|||
@Override
|
||||
public TaskActionClient create(Task task)
|
||||
{
|
||||
return new RemoteTaskActionClient(task, httpClient, selector, retryPolicyFactory, jsonMapper);
|
||||
return new RemoteTaskActionClient(task, druidLeaderClient, retryPolicyFactory, jsonMapper);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,15 +19,10 @@
|
|||
|
||||
package io.druid.indexing.common.actions;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.StatusResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHolder;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.indexing.common.RetryPolicyConfig;
|
||||
import io.druid.indexing.common.RetryPolicyFactory;
|
||||
import io.druid.indexing.common.TaskLock;
|
||||
|
@ -36,64 +31,33 @@ import io.druid.indexing.common.task.Task;
|
|||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.java.util.common.Intervals;
|
||||
import org.easymock.EasyMock;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.easymock.EasyMock.anyObject;
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
|
||||
public class RemoteTaskActionClientTest
|
||||
{
|
||||
|
||||
private HttpClient httpClient;
|
||||
private ServerDiscoverySelector selector;
|
||||
private Server server;
|
||||
private DruidLeaderClient druidLeaderClient;
|
||||
List<TaskLock> result = null;
|
||||
private ObjectMapper objectMapper = new DefaultObjectMapper();
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
httpClient = createMock(HttpClient.class);
|
||||
selector = createMock(ServerDiscoverySelector.class);
|
||||
|
||||
server = new Server()
|
||||
{
|
||||
|
||||
@Override
|
||||
public String getScheme()
|
||||
{
|
||||
return "http";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPort()
|
||||
{
|
||||
return 8080;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHost()
|
||||
{
|
||||
return "localhost";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAddress()
|
||||
{
|
||||
return "localhost";
|
||||
}
|
||||
};
|
||||
druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
|
@ -106,29 +70,30 @@ public class RemoteTaskActionClientTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSubmitSimple() throws JsonProcessingException
|
||||
public void testSubmitSimple() throws Exception
|
||||
{
|
||||
Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx"));
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action"))
|
||||
.andReturn(request);
|
||||
|
||||
// return status code 200 and a list with size equals 1
|
||||
Map<String, Object> responseBody = new HashMap<String, Object>();
|
||||
responseBody.put("result", result);
|
||||
String strResult = objectMapper.writeValueAsString(responseBody);
|
||||
StatusResponseHolder responseHolder = new StatusResponseHolder(
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.OK,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
|
||||
// set up mocks
|
||||
expect(selector.pick()).andReturn(server);
|
||||
replay(selector);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
|
||||
expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn(
|
||||
Futures.immediateFuture(responseHolder)
|
||||
);
|
||||
replay(httpClient);
|
||||
|
||||
Task task = new NoopTask("id", 0, 0, null, null, null);
|
||||
RemoteTaskActionClient client = new RemoteTaskActionClient(
|
||||
task, httpClient, selector, new RetryPolicyFactory(
|
||||
task, druidLeaderClient, new RetryPolicyFactory(
|
||||
new RetryPolicyConfig()
|
||||
), objectMapper
|
||||
);
|
||||
|
@ -140,33 +105,35 @@ public class RemoteTaskActionClientTest
|
|||
}
|
||||
|
||||
Assert.assertEquals(1, result.size());
|
||||
EasyMock.verify(selector, httpClient);
|
||||
EasyMock.verify(druidLeaderClient);
|
||||
}
|
||||
|
||||
@Test(expected = IOException.class)
|
||||
public void testSubmitWithIllegalStatusCode() throws IOException
|
||||
public void testSubmitWithIllegalStatusCode() throws Exception
|
||||
{
|
||||
// return status code 400 and a list with size equals 1
|
||||
Request request = new Request(HttpMethod.POST, new URL("http://localhost:1234/xx"));
|
||||
expect(druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action"))
|
||||
.andReturn(request);
|
||||
|
||||
// return status code 200 and a list with size equals 1
|
||||
Map<String, Object> responseBody = new HashMap<String, Object>();
|
||||
responseBody.put("result", result);
|
||||
String strResult = objectMapper.writeValueAsString(responseBody);
|
||||
StatusResponseHolder responseHolder = new StatusResponseHolder(
|
||||
FullResponseHolder responseHolder = new FullResponseHolder(
|
||||
HttpResponseStatus.BAD_REQUEST,
|
||||
EasyMock.createNiceMock(HttpResponse.class),
|
||||
new StringBuilder().append(strResult)
|
||||
);
|
||||
|
||||
// set up mocks
|
||||
expect(selector.pick()).andReturn(server);
|
||||
replay(selector);
|
||||
expect(druidLeaderClient.go(request)).andReturn(responseHolder);
|
||||
replay(druidLeaderClient);
|
||||
|
||||
expect(httpClient.go(anyObject(Request.class), anyObject(StatusResponseHandler.class))).andReturn(
|
||||
Futures.immediateFuture(responseHolder)
|
||||
);
|
||||
replay(httpClient);
|
||||
|
||||
Task task = new NoopTask("id", 0, 0, null, null, null);
|
||||
RemoteTaskActionClient client = new RemoteTaskActionClient(
|
||||
task, httpClient, selector, new RetryPolicyFactory(
|
||||
task, druidLeaderClient, new RetryPolicyFactory(
|
||||
objectMapper.readValue("{\"maxRetryCount\":0}", RetryPolicyConfig.class)
|
||||
), objectMapper
|
||||
);
|
||||
|
|
|
@ -21,67 +21,48 @@ package io.druid.client.coordinator;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.StatusResponseHandler;
|
||||
import com.metamx.http.client.response.StatusResponseHolder;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.client.ImmutableSegmentLoadInfo;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.java.util.common.ISE;
|
||||
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
|
||||
public class CoordinatorClient
|
||||
{
|
||||
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
|
||||
|
||||
private final HttpClient client;
|
||||
private final DruidLeaderClient druidLeaderClient;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ServerDiscoverySelector selector;
|
||||
|
||||
@Inject
|
||||
public CoordinatorClient(
|
||||
@Global HttpClient client,
|
||||
ObjectMapper jsonMapper,
|
||||
@Coordinator ServerDiscoverySelector selector
|
||||
@Coordinator DruidLeaderClient druidLeaderClient
|
||||
)
|
||||
{
|
||||
this.client = client;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.selector = selector;
|
||||
this.druidLeaderClient = druidLeaderClient;
|
||||
}
|
||||
|
||||
|
||||
public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk)
|
||||
{
|
||||
try {
|
||||
StatusResponseHolder response = client.go(
|
||||
new Request(
|
||||
HttpMethod.GET,
|
||||
new URL(
|
||||
StringUtils.format(
|
||||
"%s/datasources/%s/intervals/%s/serverview?partial=%s",
|
||||
baseUrl(),
|
||||
dataSource,
|
||||
interval.toString().replace("/", "_"),
|
||||
incompleteOk
|
||||
)
|
||||
)
|
||||
),
|
||||
RESPONSE_HANDLER
|
||||
).get();
|
||||
FullResponseHolder response = druidLeaderClient.go(
|
||||
druidLeaderClient.makeRequest(HttpMethod.GET,
|
||||
StringUtils.format(
|
||||
"/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
|
||||
dataSource,
|
||||
interval.toString().replace("/", "_"),
|
||||
incompleteOk
|
||||
))
|
||||
);
|
||||
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
throw new ISE(
|
||||
"Error while fetching serverView status[%s] content[%s]",
|
||||
|
@ -100,28 +81,4 @@ public class CoordinatorClient
|
|||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private String baseUrl()
|
||||
{
|
||||
try {
|
||||
final Server instance = selector.pick();
|
||||
if (instance == null) {
|
||||
throw new ISE("Cannot find instance of coordinator.. Did you set `druid.selectors.coordinator.serviceName`?");
|
||||
}
|
||||
|
||||
return new URI(
|
||||
instance.getScheme(),
|
||||
null,
|
||||
instance.getAddress(),
|
||||
instance.getPort(),
|
||||
"/druid/coordinator/v1",
|
||||
null,
|
||||
null
|
||||
).toString();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,44 +22,29 @@ package io.druid.client.indexing;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.InputStreamResponseHandler;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class IndexingServiceClient
|
||||
{
|
||||
private static final InputStreamResponseHandler RESPONSE_HANDLER = new InputStreamResponseHandler();
|
||||
|
||||
private final HttpClient client;
|
||||
private final DruidLeaderClient druidLeaderClient;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final ServerDiscoverySelector selector;
|
||||
|
||||
@Inject
|
||||
public IndexingServiceClient(
|
||||
@Global HttpClient client,
|
||||
ObjectMapper jsonMapper,
|
||||
@IndexingService ServerDiscoverySelector selector
|
||||
@IndexingService DruidLeaderClient druidLeaderClient
|
||||
)
|
||||
{
|
||||
this.client = client;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.selector = selector;
|
||||
this.druidLeaderClient = druidLeaderClient;
|
||||
}
|
||||
|
||||
public void mergeSegments(List<DataSegment> segments)
|
||||
|
@ -95,39 +80,15 @@ public class IndexingServiceClient
|
|||
runQuery(new ClientConversionQuery(dataSource, interval));
|
||||
}
|
||||
|
||||
private InputStream runQuery(Object queryObject)
|
||||
private void runQuery(Object queryObject)
|
||||
{
|
||||
try {
|
||||
return client.go(
|
||||
new Request(
|
||||
druidLeaderClient.go(
|
||||
druidLeaderClient.makeRequest(
|
||||
HttpMethod.POST,
|
||||
new URL(StringUtils.format("%s/task", baseUrl()))
|
||||
).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject)),
|
||||
RESPONSE_HANDLER
|
||||
).get();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String baseUrl()
|
||||
{
|
||||
try {
|
||||
final Server instance = selector.pick();
|
||||
if (instance == null) {
|
||||
throw new ISE("Cannot find instance of indexingService");
|
||||
}
|
||||
|
||||
return new URI(
|
||||
instance.getScheme(),
|
||||
null,
|
||||
instance.getAddress(),
|
||||
instance.getPort(),
|
||||
"/druid/indexer/v1",
|
||||
null,
|
||||
null
|
||||
).toString();
|
||||
"/druid/indexer/v1/task"
|
||||
).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(queryObject))
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
|
|
|
@ -48,6 +48,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -168,7 +169,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
|
||||
private final Object lock = new Object();
|
||||
|
||||
private boolean cacheInitialized = false;
|
||||
private CountDownLatch cacheInitialized = new CountDownLatch(1);
|
||||
|
||||
NodeTypeWatcher(
|
||||
ExecutorService listenerExecutor,
|
||||
|
@ -197,6 +198,9 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
@Override
|
||||
public Collection<DiscoveryDruidNode> getAllNodes()
|
||||
{
|
||||
if (!isCacheInitialized(30, TimeUnit.SECONDS)) {
|
||||
log.info("cache is not initialized yet. getAllNodes() might not return full information.");
|
||||
}
|
||||
return Collections.unmodifiableCollection(nodes.values());
|
||||
}
|
||||
|
||||
|
@ -204,7 +208,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
public void registerListener(DruidNodeDiscovery.Listener listener)
|
||||
{
|
||||
synchronized (lock) {
|
||||
if (cacheInitialized) {
|
||||
if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
|
||||
ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
|
||||
safeSchedule(
|
||||
() -> {
|
||||
|
@ -278,15 +282,13 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
break;
|
||||
}
|
||||
case INITIALIZED: {
|
||||
if (cacheInitialized) {
|
||||
if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
|
||||
log.warn("cache is already initialized. ignoring [%s] event, nodeType [%s].", event.getType(), nodeType);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Received INITIALIZED in node watcher for type [%s].", nodeType);
|
||||
|
||||
cacheInitialized = true;
|
||||
|
||||
ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(nodes.values());
|
||||
for (Listener l : nodeListeners) {
|
||||
safeSchedule(
|
||||
|
@ -297,6 +299,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
);
|
||||
}
|
||||
|
||||
cacheInitialized.countDown();
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
|
@ -310,6 +313,17 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit)
|
||||
{
|
||||
try {
|
||||
return cacheInitialized.await(waitFor, timeUnit);
|
||||
}
|
||||
catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void safeSchedule(
|
||||
Runnable runnable,
|
||||
String errMsgFormat, Object... args
|
||||
|
@ -329,7 +343,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
{
|
||||
DiscoveryDruidNode prev = nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
|
||||
if (prev == null) {
|
||||
if (cacheInitialized) {
|
||||
if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
|
||||
List<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
|
||||
for (Listener l : nodeListeners) {
|
||||
safeSchedule(
|
||||
|
@ -354,7 +368,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
|
|||
return;
|
||||
}
|
||||
|
||||
if (cacheInitialized) {
|
||||
if (isCacheInitialized(1, TimeUnit.MICROSECONDS)) {
|
||||
List<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
|
||||
for (Listener l : nodeListeners) {
|
||||
safeSchedule(
|
||||
|
|
|
@ -0,0 +1,266 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.discovery;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.FullResponseHandler;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.concurrent.LifecycleLock;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStop;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected
|
||||
* via Guice with annotations @Coordinator or @IndexingService .
|
||||
* Usage:
|
||||
* Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath)
|
||||
* request.setXXX(..)
|
||||
* FullResponseHolder responseHolder = druidLeaderClient.go(request)
|
||||
*/
|
||||
public class DruidLeaderClient
|
||||
{
|
||||
private final Logger log = new Logger(DruidLeaderClient.class);
|
||||
|
||||
private static final int MAX_RETRIES = 3;
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
|
||||
private final String nodeTypeToWatch;
|
||||
|
||||
private final String leaderRequestPath;
|
||||
|
||||
//Note: This is kept for back compatibility with pre 0.11.0 releases and should be removed in future.
|
||||
private final ServerDiscoverySelector serverDiscoverySelector;
|
||||
|
||||
private LifecycleLock lifecycleLock = new LifecycleLock();
|
||||
private DruidNodeDiscovery druidNodeDiscovery;
|
||||
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
|
||||
|
||||
public DruidLeaderClient(
|
||||
HttpClient httpClient,
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
String nodeTypeToWatch,
|
||||
String leaderRequestPath,
|
||||
ServerDiscoverySelector serverDiscoverySelector
|
||||
)
|
||||
{
|
||||
this.httpClient = httpClient;
|
||||
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
|
||||
this.nodeTypeToWatch = nodeTypeToWatch;
|
||||
this.leaderRequestPath = leaderRequestPath;
|
||||
this.serverDiscoverySelector = serverDiscoverySelector;
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
public void start()
|
||||
{
|
||||
if (!lifecycleLock.canStart()) {
|
||||
throw new ISE("can't start.");
|
||||
}
|
||||
|
||||
try {
|
||||
druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch);
|
||||
lifecycleLock.started();
|
||||
log.info("Started.");
|
||||
}
|
||||
finally {
|
||||
lifecycleLock.exitStart();
|
||||
}
|
||||
}
|
||||
|
||||
@LifecycleStop
|
||||
public void stop()
|
||||
{
|
||||
if (!lifecycleLock.canStop()) {
|
||||
throw new ISE("can't stop.");
|
||||
}
|
||||
|
||||
log.info("Stopped.");
|
||||
}
|
||||
|
||||
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws MalformedURLException
|
||||
{
|
||||
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
|
||||
}
|
||||
|
||||
public FullResponseHolder go(Request request) throws InterruptedException
|
||||
{
|
||||
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
for (int counter = 0; counter < MAX_RETRIES; counter++) {
|
||||
|
||||
final FullResponseHolder fullResponseHolder;
|
||||
|
||||
try {
|
||||
fullResponseHolder = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get();
|
||||
}
|
||||
catch (ExecutionException ex) {
|
||||
// can happen if the node is stopped.
|
||||
log.info("Request[%s] failed with msg [%s].", request.getUrl(), ex.getMessage());
|
||||
log.debug(ex, "Request[%s] failed.", request.getUrl());
|
||||
|
||||
try {
|
||||
if (request.getUrl().getQuery() == null) {
|
||||
request = withUrl(
|
||||
request,
|
||||
new URL(StringUtils.format("%s%s", getCurrentKnownLeader(false), request.getUrl().getPath()))
|
||||
);
|
||||
} else {
|
||||
request = withUrl(
|
||||
request,
|
||||
new URL(StringUtils.format("%s%s?%s",
|
||||
getCurrentKnownLeader(false),
|
||||
request.getUrl().getPath(),
|
||||
request.getUrl().getQuery()
|
||||
))
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
catch (MalformedURLException e) {
|
||||
throw new ISE(
|
||||
e,
|
||||
"failed to build url with path[%] and query string [%s].",
|
||||
request.getUrl().getPath(),
|
||||
request.getUrl().getQuery()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(fullResponseHolder.getResponse().getStatus())) {
|
||||
String redirectUrlStr = fullResponseHolder.getResponse().headers().get("Location");
|
||||
if (redirectUrlStr == null) {
|
||||
throw new ISE("No redirect location is found in response from url[%s].", request.getUrl());
|
||||
}
|
||||
|
||||
log.info("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr);
|
||||
|
||||
final URL redirectUrl;
|
||||
try {
|
||||
redirectUrl = new URL(redirectUrlStr);
|
||||
}
|
||||
catch (MalformedURLException ex) {
|
||||
throw new ISE(
|
||||
ex,
|
||||
"Malformed redirect location is found in response from url[%s], new location[%s].",
|
||||
request.getUrl(),
|
||||
redirectUrlStr
|
||||
);
|
||||
}
|
||||
|
||||
//update known leader location
|
||||
currentKnownLeader.set(StringUtils.format(
|
||||
"%s://%s:%s",
|
||||
redirectUrl.getProtocol(),
|
||||
redirectUrl.getHost(),
|
||||
redirectUrl.getPort()
|
||||
));
|
||||
|
||||
request = withUrl(request, redirectUrl);
|
||||
} else {
|
||||
return fullResponseHolder;
|
||||
}
|
||||
}
|
||||
|
||||
throw new ISE("Retries exhausted, couldn't fulfill request to [%s].", request.getUrl());
|
||||
}
|
||||
|
||||
public String findCurrentLeader()
|
||||
{
|
||||
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
|
||||
final FullResponseHolder responseHolder;
|
||||
try {
|
||||
responseHolder = go(makeRequest(HttpMethod.GET, leaderRequestPath));
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new ISE(ex, "Couldn't find leader.");
|
||||
}
|
||||
|
||||
if (responseHolder.getStatus().getCode() == 200) {
|
||||
return responseHolder.getContent();
|
||||
} else {
|
||||
throw new ISE(
|
||||
"Couldn't find leader, failed response status is [%s] and content [%s].",
|
||||
responseHolder.getStatus().getCode(),
|
||||
responseHolder.getContent()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private String getCurrentKnownLeader(final boolean cached)
|
||||
{
|
||||
return currentKnownLeader.accumulateAndGet(
|
||||
null,
|
||||
(current, given) -> current == null || !cached ? pickOneHost() : current
|
||||
);
|
||||
}
|
||||
|
||||
private String pickOneHost()
|
||||
{
|
||||
Server server = serverDiscoverySelector.pick();
|
||||
if (server != null) {
|
||||
return StringUtils.format(
|
||||
"%s://%s:%s",
|
||||
server.getScheme(),
|
||||
server.getAddress(),
|
||||
server.getPort()
|
||||
);
|
||||
}
|
||||
|
||||
Iterator<DiscoveryDruidNode> iter = druidNodeDiscovery.getAllNodes().iterator();
|
||||
if (iter.hasNext()) {
|
||||
DiscoveryDruidNode node = iter.next();
|
||||
return StringUtils.format(
|
||||
"%s://%s",
|
||||
node.getDruidNode().getServiceScheme(),
|
||||
node.getDruidNode().getHostAndPortToUse()
|
||||
);
|
||||
}
|
||||
|
||||
throw new ISE("Couldn't find any servers.");
|
||||
}
|
||||
|
||||
private Request withUrl(Request old, URL url)
|
||||
{
|
||||
Request req = new Request(old.getMethod(), url);
|
||||
req.addHeaderValues(old.getHeaders());
|
||||
if (old.hasContent()) {
|
||||
req.setContent(old.getContent());
|
||||
}
|
||||
return req;
|
||||
}
|
||||
}
|
|
@ -22,10 +22,14 @@ package io.druid.guice;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.coordinator.Coordinator;
|
||||
import io.druid.client.coordinator.CoordinatorSelectorConfig;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import io.druid.guice.annotations.Global;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -47,4 +51,22 @@ public class CoordinatorDiscoveryModule implements Module
|
|||
{
|
||||
return serverDiscoveryFactory.createSelector(config.getServiceName());
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Coordinator
|
||||
@ManageLifecycle
|
||||
public DruidLeaderClient getLeaderHttpClient(
|
||||
@Global HttpClient httpClient,
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
@Coordinator ServerDiscoverySelector serverDiscoverySelector
|
||||
)
|
||||
{
|
||||
return new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
|
||||
"/druid/coordinator/v1/leader",
|
||||
serverDiscoverySelector
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,10 +22,14 @@ package io.druid.guice;
|
|||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.Provides;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.indexing.IndexingService;
|
||||
import io.druid.client.indexing.IndexingServiceSelectorConfig;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import io.druid.guice.annotations.Global;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -47,4 +51,22 @@ public class IndexingServiceDiscoveryModule implements Module
|
|||
{
|
||||
return serverDiscoveryFactory.createSelector(config.getServiceName());
|
||||
}
|
||||
|
||||
@Provides
|
||||
@IndexingService
|
||||
@ManageLifecycle
|
||||
public DruidLeaderClient getLeaderHttpClient(
|
||||
@Global HttpClient httpClient,
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
@IndexingService ServerDiscoverySelector serverDiscoverySelector
|
||||
)
|
||||
{
|
||||
return new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
|
||||
"/druid/indexer/v1/leader",
|
||||
serverDiscoverySelector
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,12 +21,9 @@ package io.druid.server.http;
|
|||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import io.druid.client.indexing.IndexingService;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.java.util.common.ISE;
|
||||
|
||||
import org.eclipse.jetty.proxy.ProxyServlet;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
@ -38,27 +35,28 @@ import java.net.URISyntaxException;
|
|||
*/
|
||||
public class OverlordProxyServlet extends ProxyServlet
|
||||
{
|
||||
private final ServerDiscoverySelector selector;
|
||||
private final DruidLeaderClient druidLeaderClient;
|
||||
|
||||
@Inject
|
||||
OverlordProxyServlet(
|
||||
@IndexingService ServerDiscoverySelector selector
|
||||
@IndexingService DruidLeaderClient druidLeaderClient
|
||||
)
|
||||
{
|
||||
this.selector = selector;
|
||||
this.druidLeaderClient = druidLeaderClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String rewriteTarget(HttpServletRequest request)
|
||||
{
|
||||
try {
|
||||
final Server indexer = selector.pick();
|
||||
if (indexer == null) {
|
||||
throw new ISE("Can't find indexingService, did you configure druid.selectors.indexing.serviceName same as druid.service at overlord?");
|
||||
final String overlordLeader = druidLeaderClient.findCurrentLeader();
|
||||
if (overlordLeader == null) {
|
||||
throw new ISE("Can't find Overlord leader.");
|
||||
}
|
||||
|
||||
return new URI(
|
||||
request.getScheme(),
|
||||
indexer.getHost(),
|
||||
overlordLeader,
|
||||
request.getRequestURI(),
|
||||
request.getQueryString(),
|
||||
null
|
||||
|
|
|
@ -21,20 +21,15 @@ package io.druid.server.router;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import com.metamx.http.client.response.FullResponseHandler;
|
||||
import com.metamx.http.client.response.FullResponseHolder;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.concurrent.Execs;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.guice.ManageLifecycle;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.concurrent.ScheduledExecutors;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStart;
|
||||
import io.druid.java.util.common.lifecycle.LifecycleStop;
|
||||
|
@ -44,9 +39,6 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
|
|||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -60,14 +52,13 @@ public class CoordinatorRuleManager
|
|||
{
|
||||
private static final Logger log = new Logger(CoordinatorRuleManager.class);
|
||||
|
||||
private final HttpClient httpClient;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final Supplier<TieredBrokerConfig> config;
|
||||
private final ServerDiscoverySelector selector;
|
||||
|
||||
private final FullResponseHandler responseHandler;
|
||||
private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
|
||||
|
||||
private final DruidLeaderClient druidLeaderClient;
|
||||
|
||||
private volatile ScheduledExecutorService exec;
|
||||
|
||||
private final Object lock = new Object();
|
||||
|
@ -76,18 +67,15 @@ public class CoordinatorRuleManager
|
|||
|
||||
@Inject
|
||||
public CoordinatorRuleManager(
|
||||
@Global HttpClient httpClient,
|
||||
@Json ObjectMapper jsonMapper,
|
||||
Supplier<TieredBrokerConfig> config,
|
||||
ServerDiscoverySelector selector
|
||||
DruidLeaderClient druidLeaderClient
|
||||
)
|
||||
{
|
||||
this.httpClient = httpClient;
|
||||
this.jsonMapper = jsonMapper;
|
||||
this.config = config;
|
||||
this.selector = selector;
|
||||
this.druidLeaderClient = druidLeaderClient;
|
||||
|
||||
this.responseHandler = new FullResponseHandler(Charsets.UTF_8);
|
||||
this.rules = new AtomicReference<>(
|
||||
new ConcurrentHashMap<String, List<Rule>>()
|
||||
);
|
||||
|
@ -146,29 +134,16 @@ public class CoordinatorRuleManager
|
|||
public void poll()
|
||||
{
|
||||
try {
|
||||
String url = getRuleURL();
|
||||
if (url == null) {
|
||||
return;
|
||||
}
|
||||
FullResponseHolder response = druidLeaderClient.go(
|
||||
druidLeaderClient.makeRequest(HttpMethod.GET, config.get().getRulesEndpoint())
|
||||
);
|
||||
|
||||
FullResponseHolder response = httpClient.go(
|
||||
new Request(
|
||||
HttpMethod.GET,
|
||||
new URL(url)
|
||||
),
|
||||
responseHandler
|
||||
).get();
|
||||
|
||||
if (response.getStatus().equals(HttpResponseStatus.FOUND)) {
|
||||
url = response.getResponse().headers().get("Location");
|
||||
log.info("Redirecting rule request to [%s]", url);
|
||||
response = httpClient.go(
|
||||
new Request(
|
||||
HttpMethod.GET,
|
||||
new URL(url)
|
||||
),
|
||||
responseHandler
|
||||
).get();
|
||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||
throw new ISE(
|
||||
"Error while polling rules, status[%s] content[%s]",
|
||||
response.getStatus(),
|
||||
response.getContent()
|
||||
);
|
||||
}
|
||||
|
||||
ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<>(
|
||||
|
@ -200,24 +175,4 @@ public class CoordinatorRuleManager
|
|||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private String getRuleURL() throws URISyntaxException
|
||||
{
|
||||
Server server = selector.pick();
|
||||
|
||||
if (server == null) {
|
||||
log.error("No instances found for [%s]!", config.get().getCoordinatorServiceName());
|
||||
return null;
|
||||
}
|
||||
|
||||
return new URI(
|
||||
server.getScheme(),
|
||||
null,
|
||||
server.getAddress(),
|
||||
server.getPort(),
|
||||
config.get().getRulesEndpoint(),
|
||||
null,
|
||||
null
|
||||
).toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,274 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.discovery;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.name.Named;
|
||||
import com.google.inject.name.Names;
|
||||
import com.google.inject.servlet.GuiceFilter;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import com.metamx.http.client.Request;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.guice.GuiceInjectors;
|
||||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
import io.druid.guice.LazySingleton;
|
||||
import io.druid.guice.LifecycleModule;
|
||||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.initialization.Initialization;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.server.DruidNode;
|
||||
import io.druid.server.initialization.BaseJettyTest;
|
||||
import io.druid.server.initialization.ServerConfig;
|
||||
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
||||
import org.easymock.EasyMock;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.handler.HandlerList;
|
||||
import org.eclipse.jetty.servlet.DefaultServlet;
|
||||
import org.eclipse.jetty.servlet.ServletContextHandler;
|
||||
import org.eclipse.jetty.servlet.ServletHolder;
|
||||
import org.jboss.netty.handler.codec.http.HttpMethod;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class DruidLeaderClientTest extends BaseJettyTest
|
||||
{
|
||||
private DiscoveryDruidNode discoveryDruidNode;
|
||||
private HttpClient httpClient;
|
||||
|
||||
@Override
|
||||
protected Injector setupInjector()
|
||||
{
|
||||
final DruidNode node = new DruidNode("test", "localhost", null, null, new ServerConfig());
|
||||
discoveryDruidNode = new DiscoveryDruidNode(node, "test", ImmutableMap.of());
|
||||
|
||||
Injector injector = Initialization.makeInjectorWithModules(
|
||||
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
|
||||
new Module()
|
||||
{
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
JsonConfigProvider.bindInstance(
|
||||
binder,
|
||||
Key.get(DruidNode.class, Self.class),
|
||||
node
|
||||
);
|
||||
binder.bind(Integer.class).annotatedWith(Names.named("port")).toInstance(node.getPlaintextPort());
|
||||
binder.bind(JettyServerInitializer.class).to(TestJettyServerInitializer.class).in(LazySingleton.class);
|
||||
Jerseys.addResource(binder, SimpleResource.class);
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
httpClient = injector.getInstance(ClientHolder.class).getClient();
|
||||
return injector;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimple() throws Exception
|
||||
{
|
||||
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
|
||||
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(
|
||||
ImmutableList.of(discoveryDruidNode)
|
||||
);
|
||||
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
|
||||
|
||||
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
|
||||
|
||||
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
"nodetype",
|
||||
"/simple/leader",
|
||||
EasyMock.createNiceMock(ServerDiscoverySelector.class)
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct");
|
||||
request.setContent("hello".getBytes("UTF-8"));
|
||||
Assert.assertEquals("hello", druidLeaderClient.go(request).getContent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedirection() throws Exception
|
||||
{
|
||||
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
|
||||
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(
|
||||
ImmutableList.of(discoveryDruidNode)
|
||||
);
|
||||
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
|
||||
|
||||
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
|
||||
|
||||
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
"nodetype",
|
||||
"/simple/leader",
|
||||
EasyMock.createNiceMock(ServerDiscoverySelector.class)
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect");
|
||||
request.setContent("hello".getBytes("UTF-8"));
|
||||
Assert.assertEquals("hello", druidLeaderClient.go(request).getContent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerFailureAndRedirect() throws Exception
|
||||
{
|
||||
ServerDiscoverySelector serverDiscoverySelector = EasyMock.createMock(ServerDiscoverySelector.class);
|
||||
EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes();
|
||||
|
||||
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
|
||||
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(
|
||||
ImmutableList.of(new DiscoveryDruidNode(
|
||||
new DruidNode("test", "dummyhost", 64231, null, new ServerConfig()),
|
||||
"test",
|
||||
ImmutableMap.of()
|
||||
))
|
||||
);
|
||||
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(
|
||||
ImmutableList.of(discoveryDruidNode)
|
||||
);
|
||||
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes();
|
||||
|
||||
EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider);
|
||||
|
||||
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
"nodetype",
|
||||
"/simple/leader",
|
||||
serverDiscoverySelector
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect");
|
||||
request.setContent("hello".getBytes("UTF-8"));
|
||||
Assert.assertEquals("hello", druidLeaderClient.go(request).getContent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFindCurrentLeader() throws Exception
|
||||
{
|
||||
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
|
||||
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(
|
||||
ImmutableList.of(discoveryDruidNode)
|
||||
);
|
||||
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
|
||||
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
|
||||
|
||||
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
|
||||
|
||||
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
"nodetype",
|
||||
"/simple/leader",
|
||||
EasyMock.createNiceMock(ServerDiscoverySelector.class)
|
||||
);
|
||||
druidLeaderClient.start();
|
||||
|
||||
Assert.assertEquals("http://localhost:1234/", druidLeaderClient.findCurrentLeader());
|
||||
}
|
||||
|
||||
private static class TestJettyServerInitializer implements JettyServerInitializer
|
||||
{
|
||||
@Override
|
||||
public void initialize(Server server, Injector injector)
|
||||
{
|
||||
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
|
||||
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
|
||||
root.addFilter(GuiceFilter.class, "/*", null);
|
||||
|
||||
final HandlerList handlerList = new HandlerList();
|
||||
handlerList.setHandlers(new Handler[]{root});
|
||||
server.setHandler(handlerList);
|
||||
}
|
||||
}
|
||||
|
||||
@Path("/simple")
|
||||
public static class SimpleResource
|
||||
{
|
||||
private final int port;
|
||||
|
||||
@Inject
|
||||
public SimpleResource(@Named("port") int port)
|
||||
{
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/direct")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response direct(String input)
|
||||
{
|
||||
if ("hello".equals(input)) {
|
||||
return Response.ok("hello").build();
|
||||
} else {
|
||||
return Response.serverError().build();
|
||||
}
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("/redirect")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response redirecting() throws Exception
|
||||
{
|
||||
return Response.temporaryRedirect(new URI(StringUtils.format("http://localhost:%s/simple/direct", port))).build();
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/leader")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
public Response leader()
|
||||
{
|
||||
return Response.ok("http://localhost:1234/").build();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -455,7 +455,7 @@ public class DruidCoordinatorSegmentMergerTest
|
|||
EasyMock.replay(configManager);
|
||||
|
||||
final List<List<DataSegment>> retVal = Lists.newArrayList();
|
||||
final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null)
|
||||
final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null)
|
||||
{
|
||||
@Override
|
||||
public void mergeSegments(List<DataSegment> segmentsToMerge)
|
||||
|
|
|
@ -19,8 +19,7 @@
|
|||
|
||||
package io.druid.server.http;
|
||||
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -33,17 +32,17 @@ public class OverlordProxyServletTest
|
|||
@Test
|
||||
public void testRewriteURI()
|
||||
{
|
||||
ServerDiscoverySelector selector = EasyMock.createMock(ServerDiscoverySelector.class);
|
||||
Server server = EasyMock.createMock(Server.class);
|
||||
EasyMock.expect(server.getHost()).andReturn("overlord:port");
|
||||
EasyMock.expect(selector.pick()).andReturn(server).anyTimes();
|
||||
DruidLeaderClient druidLeaderClient = EasyMock.createMock(DruidLeaderClient.class);
|
||||
EasyMock.expect(druidLeaderClient.findCurrentLeader()).andReturn("overlord:port");
|
||||
|
||||
HttpServletRequest request = EasyMock.createMock(HttpServletRequest.class);
|
||||
EasyMock.expect(request.getScheme()).andReturn("https").anyTimes();
|
||||
EasyMock.expect(request.getQueryString()).andReturn("param1=test¶m2=test2").anyTimes();
|
||||
EasyMock.expect(request.getRequestURI()).andReturn("/druid/overlord/worker").anyTimes();
|
||||
EasyMock.replay(server, selector, request);
|
||||
|
||||
URI uri = URI.create(new OverlordProxyServlet(selector).rewriteTarget(request));
|
||||
EasyMock.replay(druidLeaderClient, request);
|
||||
|
||||
URI uri = URI.create(new OverlordProxyServlet(druidLeaderClient).rewriteTarget(request));
|
||||
Assert.assertEquals("https://overlord:port/druid/overlord/worker?param1=test¶m2=test2", uri.toString());
|
||||
}
|
||||
|
||||
|
|
|
@ -27,13 +27,11 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.druid.client.DruidServer;
|
||||
import io.druid.client.selector.Server;
|
||||
import io.druid.discovery.DiscoveryDruidNode;
|
||||
import io.druid.discovery.DruidNodeDiscovery;
|
||||
import io.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Json;
|
||||
import io.druid.java.util.common.Intervals;
|
||||
import io.druid.java.util.common.Pair;
|
||||
|
@ -115,7 +113,7 @@ public class TieredBrokerHostSelectorTest
|
|||
EasyMock.replay(druidNodeDiscoveryProvider);
|
||||
|
||||
brokerSelector = new TieredBrokerHostSelector(
|
||||
new TestRuleManager(null, null, null),
|
||||
new TestRuleManager(null, null),
|
||||
new TieredBrokerConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -320,12 +318,11 @@ public class TieredBrokerHostSelectorTest
|
|||
private static class TestRuleManager extends CoordinatorRuleManager
|
||||
{
|
||||
public TestRuleManager(
|
||||
@Global HttpClient httpClient,
|
||||
@Json ObjectMapper jsonMapper,
|
||||
Supplier<TieredBrokerConfig> config
|
||||
)
|
||||
{
|
||||
super(httpClient, jsonMapper, config, null);
|
||||
super(jsonMapper, config, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -26,10 +26,12 @@ import com.google.inject.Module;
|
|||
import com.google.inject.Provides;
|
||||
import com.google.inject.TypeLiteral;
|
||||
import com.google.inject.name.Names;
|
||||
import com.metamx.http.client.HttpClient;
|
||||
import io.airlift.airline.Command;
|
||||
import io.druid.curator.discovery.DiscoveryModule;
|
||||
import io.druid.curator.discovery.ServerDiscoveryFactory;
|
||||
import io.druid.curator.discovery.ServerDiscoverySelector;
|
||||
import io.druid.discovery.DruidLeaderClient;
|
||||
import io.druid.discovery.DruidNodeDiscoveryProvider;
|
||||
import io.druid.guice.Jerseys;
|
||||
import io.druid.guice.JsonConfigProvider;
|
||||
|
@ -39,6 +41,7 @@ import io.druid.guice.ManageLifecycle;
|
|||
import io.druid.guice.QueryRunnerFactoryModule;
|
||||
import io.druid.guice.QueryableModule;
|
||||
import io.druid.guice.RouterProcessingModule;
|
||||
import io.druid.guice.annotations.Global;
|
||||
import io.druid.guice.annotations.Self;
|
||||
import io.druid.guice.http.JettyHttpClientModule;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
|
@ -131,6 +134,23 @@ public class CliRouter extends ServerRunnable
|
|||
{
|
||||
return factory.createSelector(config.getCoordinatorServiceName());
|
||||
}
|
||||
|
||||
@Provides
|
||||
@ManageLifecycle
|
||||
public DruidLeaderClient getLeaderHttpClient(
|
||||
@Global HttpClient httpClient,
|
||||
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
|
||||
ServerDiscoverySelector serverDiscoverySelector
|
||||
)
|
||||
{
|
||||
return new DruidLeaderClient(
|
||||
httpClient,
|
||||
druidNodeDiscoveryProvider,
|
||||
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
|
||||
"/druid/coordinator/v1/leader",
|
||||
serverDiscoverySelector
|
||||
);
|
||||
}
|
||||
},
|
||||
new LookupModule()
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue