Add /isLeader call to overlord and coordinator. (#4282)

This is useful for putting them behind load balancers or proxies, as it lets
the load balancer know which server is currently active through an http health
check.

Also makes the method naming a little more consistent between coordinator and
overlord code.
This commit is contained in:
Gian Merlino 2017-05-19 10:46:13 +09:00 committed by Himanshu
parent 7479cbde68
commit adeecc0e72
12 changed files with 246 additions and 27 deletions

View File

@ -55,6 +55,13 @@ Returns the Druid version, loaded extensions, memory used, total memory and othe
Returns the current leader coordinator of the cluster.
* `/druid/coordinator/v1/isLeader`
Returns a JSON object with field "leader", either true or false, indicating if this server is the current leader
coordinator of the cluster. In addition, returns HTTP 200 if the server is the current leader and HTTP 404 if not.
This is suitable for use as a load balancer status check if you only want the active leader to be considered in-service
at the load balancer.
* `/druid/coordinator/v1/loadstatus`
Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster.

View File

@ -33,6 +33,26 @@ In local mode overlord is also responsible for creating peons for executing task
Local mode is typically used for simple workflows. In remote mode, the overlord and middle manager are run in separate processes and you can run each on a different server.
This mode is recommended if you intend to use the indexing service as the single endpoint for all Druid indexing.
#### Leadership status
If you have multiple overlords, just one is leading at any given time. The others are on standby. To get the current
leader overlord of the cluster, call:
```
http://<OVERLORD_IP>:<port>/druid/indexer/v1/leader
```
To see if a given server is the current leader overlord of the cluster, call:
```
http://<OVERLORD_IP>:<port>/druid/indexer/v1/isLeader
```
This returns a JSON object with field "leader", either true or false. In addition, this call returns HTTP 200 if the
server is the current leader and HTTP 404 if not. This is suitable for use as a load balancer status check if you
only want the active leader to be considered in-service at the load balancer.
#### Submitting Tasks and Querying Task Status
Tasks are submitted to the overlord node in the form of JSON objects. Tasks can be submitted via POST requests to:

View File

@ -242,12 +242,12 @@ public class TaskMaster
}
}
public boolean isLeading()
public boolean isLeader()
{
return leading;
}
public String getLeader()
public String getCurrentLeader()
{
try {
final Participant leader = leaderSelector.getLeader();

View File

@ -20,17 +20,24 @@
package io.druid.indexing.overlord.http;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.server.http.RedirectInfo;
import java.net.URI;
import java.net.URL;
import java.util.Set;
/**
*/
public class OverlordRedirectInfo implements RedirectInfo
{
private static final Set<String> LOCAL_PATHS = ImmutableSet.of(
"/druid/indexer/v1/leader",
"/druid/indexer/v1/isLeader"
);
private final TaskMaster taskMaster;
@Inject
@ -42,14 +49,14 @@ public class OverlordRedirectInfo implements RedirectInfo
@Override
public boolean doLocal(String requestURI)
{
return taskMaster.isLeading();
return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || taskMaster.isLeader();
}
@Override
public URL getRedirectURL(String queryString, String requestURI)
{
try {
final String leader = taskMaster.getLeader();
final String leader = taskMaster.getCurrentLeader();
if (leader == null || leader.isEmpty()) {
return null;
} else {

View File

@ -177,7 +177,22 @@ public class OverlordResource
@Produces(MediaType.APPLICATION_JSON)
public Response getLeader()
{
return Response.ok(taskMaster.getLeader()).build();
return Response.ok(taskMaster.getCurrentLeader()).build();
}
@GET
@Path("/isLeader")
@ResourceFilters(StateResourceFilter.class)
@Produces(MediaType.APPLICATION_JSON)
public Response isLeader()
{
final boolean leading = taskMaster.isLeader();
final Map<String, Boolean> response = ImmutableMap.of("leader", leading);
if (leading) {
return Response.ok(response).build();
} else {
return Response.status(Response.Status.NOT_FOUND).entity(response).build();
}
}
@GET

View File

@ -40,18 +40,33 @@ public class OverlordRedirectInfoTest
}
@Test
public void testDoLocal()
public void testDoLocalWhenLeading()
{
EasyMock.expect(taskMaster.isLeading()).andReturn(true).anyTimes();
EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes();
EasyMock.replay(taskMaster);
Assert.assertTrue(redirectInfo.doLocal(null));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/other/path"));
EasyMock.verify(taskMaster);
}
@Test
public void testDoLocalWhenNotLeading()
{
EasyMock.expect(taskMaster.isLeader()).andReturn(false).anyTimes();
EasyMock.replay(taskMaster);
Assert.assertFalse(redirectInfo.doLocal(null));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/leader"));
Assert.assertTrue(redirectInfo.doLocal("/druid/indexer/v1/isLeader"));
Assert.assertFalse(redirectInfo.doLocal("/druid/indexer/v1/other/path"));
EasyMock.verify(taskMaster);
}
@Test
public void testGetRedirectURLNull()
{
EasyMock.expect(taskMaster.getLeader()).andReturn(null).anyTimes();
EasyMock.expect(taskMaster.getCurrentLeader()).andReturn(null).anyTimes();
EasyMock.replay(taskMaster);
URL url = redirectInfo.getRedirectURL("query", "/request");
Assert.assertNull(url);
@ -61,7 +76,7 @@ public class OverlordRedirectInfoTest
@Test
public void testGetRedirectURLEmpty()
{
EasyMock.expect(taskMaster.getLeader()).andReturn("").anyTimes();
EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("").anyTimes();
EasyMock.replay(taskMaster);
URL url = redirectInfo.getRedirectURL("query", "/request");
Assert.assertNull(url);
@ -74,7 +89,7 @@ public class OverlordRedirectInfoTest
String host = "localhost";
String query = "foo=bar&x=y";
String request = "/request";
EasyMock.expect(taskMaster.getLeader()).andReturn(host).anyTimes();
EasyMock.expect(taskMaster.getCurrentLeader()).andReturn(host).anyTimes();
EasyMock.replay(taskMaster);
URL url = redirectInfo.getRedirectURL(query, request);
Assert.assertEquals("http://localhost/request?foo=bar&x=y", url.toString());

View File

@ -22,6 +22,7 @@ package io.druid.indexing.overlord.http;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.indexing.common.TaskLocation;
@ -79,7 +80,10 @@ public class OverlordResourceTest
null,
new AuthConfig(true)
);
}
public void expectAuthorizationTokenCheck()
{
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTH_TOKEN)).andReturn(
new AuthorizationInfo()
{
@ -98,9 +102,40 @@ public class OverlordResourceTest
);
}
@Test
public void testLeader()
{
EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once();
EasyMock.replay(taskRunner, taskMaster, tsqa, req);
final Response response = overlordResource.getLeader();
Assert.assertEquals("boz", response.getEntity());
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testIsLeader()
{
EasyMock.expect(taskMaster.isLeader()).andReturn(true).once();
EasyMock.expect(taskMaster.isLeader()).andReturn(false).once();
EasyMock.replay(taskRunner, taskMaster, tsqa, req);
// true
final Response response1 = overlordResource.isLeader();
Assert.assertEquals(ImmutableMap.of("leader", true), response1.getEntity());
Assert.assertEquals(200, response1.getStatus());
// false
final Response response2 = overlordResource.isLeader();
Assert.assertEquals(ImmutableMap.of("leader", false), response2.getEntity());
Assert.assertEquals(404, response2.getStatus());
}
@Test
public void testSecuredGetWaitingTask() throws Exception
{
expectAuthorizationTokenCheck();
EasyMock.expect(tsqa.getActiveTasks()).andReturn(
ImmutableList.of(
getTaskWithIdAndDatasource("id_1", "allow"),
@ -128,6 +163,8 @@ public class OverlordResourceTest
@Test
public void testSecuredGetCompleteTasks()
{
expectAuthorizationTokenCheck();
List<String> tasksIds = ImmutableList.of("id_1", "id_2", "id_3");
EasyMock.expect(tsqa.getRecentlyFinishedTaskStatuses()).andReturn(
Lists.transform(
@ -165,6 +202,8 @@ public class OverlordResourceTest
@Test
public void testSecuredGetRunningTasks()
{
expectAuthorizationTokenCheck();
List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
@ -191,6 +230,8 @@ public class OverlordResourceTest
@Test
public void testSecuredTaskPost()
{
expectAuthorizationTokenCheck();
EasyMock.replay(taskRunner, taskMaster, tsqa, req);
Task task = NoopTask.create();
Response response = overlordResource.taskPost(task, req);

View File

@ -198,11 +198,11 @@ public class OverlordTest
// basic task master lifecycle test
taskMaster.start();
announcementLatch.await();
while (!taskMaster.isLeading()) {
while (!taskMaster.isLeader()) {
// I believe the control will never reach here and thread will never sleep but just to be on safe side
Thread.sleep(10);
}
Assert.assertEquals(taskMaster.getLeader(), druidNode.getHostAndPort());
Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort());
// Test Overlord resource stuff
overlordResource = new OverlordResource(
taskMaster,
@ -271,7 +271,7 @@ public class OverlordTest
response = overlordResource.getCompleteTasks(req);
Assert.assertEquals(2, (((List) response.getEntity()).size()));
taskMaster.stop();
Assert.assertFalse(taskMaster.isLeading());
Assert.assertFalse(taskMaster.isLeader());
EasyMock.verify(taskLockbox, taskActionClientFactory);
}

View File

@ -20,26 +20,34 @@
package io.druid.server.http;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.druid.server.coordinator.DruidCoordinator;
import java.net.URL;
import java.util.Set;
/**
*/
*/
public class CoordinatorRedirectInfo implements RedirectInfo
{
private static final Set<String> LOCAL_PATHS = ImmutableSet.of(
"/druid/coordinator/v1/leader",
"/druid/coordinator/v1/isLeader"
);
private final DruidCoordinator coordinator;
@Inject
public CoordinatorRedirectInfo(DruidCoordinator coordinator) {
public CoordinatorRedirectInfo(DruidCoordinator coordinator)
{
this.coordinator = coordinator;
}
@Override
public boolean doLocal(String requestURI)
{
return coordinator.isLeader();
return (requestURI != null && LOCAL_PATHS.contains(requestURI)) || coordinator.isLeader();
}
@Override

View File

@ -36,6 +36,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Map;
/**
*/
@ -61,6 +62,20 @@ public class CoordinatorResource
return Response.ok(coordinator.getCurrentLeader()).build();
}
@GET
@Path("/isLeader")
@Produces(MediaType.APPLICATION_JSON)
public Response isLeader()
{
final boolean leading = coordinator.isLeader();
final Map<String, Boolean> response = ImmutableMap.of("leader", leading);
if (leading) {
return Response.ok(response).build();
} else {
return Response.status(Response.Status.NOT_FOUND).entity(response).build();
}
}
@GET
@Path("/loadstatus")
@Produces(MediaType.APPLICATION_JSON)
@ -147,16 +162,16 @@ public class CoordinatorResource
)
.put(
"segmentsToDrop", Collections2.transform(
input.getSegmentsToDrop(),
new Function<DataSegment, Object>()
{
@Override
public String apply(DataSegment segment)
{
return segment.getIdentifier();
}
}
)
input.getSegmentsToDrop(),
new Function<DataSegment, Object>()
{
@Override
public String apply(DataSegment segment)
{
return segment.getIdentifier();
}
}
)
)
.build();
}

View File

@ -40,11 +40,26 @@ public class CoordinatorRedirectInfoTest
}
@Test
public void testDoLocal()
public void testDoLocalWhenLeading()
{
EasyMock.expect(druidCoordinator.isLeader()).andReturn(true).anyTimes();
EasyMock.replay(druidCoordinator);
Assert.assertTrue(coordinatorRedirectInfo.doLocal(null));
Assert.assertTrue(coordinatorRedirectInfo.doLocal("/druid/coordinator/v1/leader"));
Assert.assertTrue(coordinatorRedirectInfo.doLocal("/druid/coordinator/v1/isLeader"));
Assert.assertTrue(coordinatorRedirectInfo.doLocal("/druid/coordinator/v1/other/path"));
EasyMock.verify(druidCoordinator);
}
@Test
public void testDoLocalWhenNotLeading()
{
EasyMock.expect(druidCoordinator.isLeader()).andReturn(false).anyTimes();
EasyMock.replay(druidCoordinator);
Assert.assertFalse(coordinatorRedirectInfo.doLocal(null));
Assert.assertTrue(coordinatorRedirectInfo.doLocal("/druid/coordinator/v1/leader"));
Assert.assertTrue(coordinatorRedirectInfo.doLocal("/druid/coordinator/v1/isLeader"));
Assert.assertFalse(coordinatorRedirectInfo.doLocal("/druid/coordinator/v1/other/path"));
EasyMock.verify(druidCoordinator);
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.http;
import com.google.common.collect.ImmutableMap;
import io.druid.server.coordinator.DruidCoordinator;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.Response;
public class CoordinatorResourceTest
{
private DruidCoordinator mock;
@Before
public void setUp()
{
mock = EasyMock.createStrictMock(DruidCoordinator.class);
}
@After
public void tearDown()
{
EasyMock.verify(mock);
}
@Test
public void testLeader()
{
EasyMock.expect(mock.getCurrentLeader()).andReturn("boz").once();
EasyMock.replay(mock);
final Response response = new CoordinatorResource(mock).getLeader();
Assert.assertEquals("boz", response.getEntity());
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testIsLeader()
{
EasyMock.expect(mock.isLeader()).andReturn(true).once();
EasyMock.expect(mock.isLeader()).andReturn(false).once();
EasyMock.replay(mock);
// true
final Response response1 = new CoordinatorResource(mock).isLeader();
Assert.assertEquals(ImmutableMap.of("leader", true), response1.getEntity());
Assert.assertEquals(200, response1.getStatus());
// false
final Response response2 = new CoordinatorResource(mock).isLeader();
Assert.assertEquals(ImmutableMap.of("leader", false), response2.getEntity());
Assert.assertEquals(404, response2.getStatus());
}
}