Use NodeType enum instead of Strings (#6377)

* Use NodeType enum instead of Strings

* Make NodeType constants uppercase

* Fix CommonCacheNotifier and NodeType/ServerType comments

* Reconsidering comment

* Fix import

* Add a comment to CommonCacheNotifier.NODE_TYPES
This commit is contained in:
Roman Leventov 2018-10-15 00:49:38 -03:00 committed by Slim Bouguerra
parent 95ab1ea737
commit aa121da25f
31 changed files with 268 additions and 259 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -57,13 +58,17 @@ public class CommonCacheNotifier
{
private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class);
private static final List<String> NODE_TYPES = Arrays.asList(
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER,
DruidNodeDiscoveryProvider.NODE_TYPE_MM
/**
* {@link NodeType#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly
* from metadata storage.
*/
private static final List<NodeType> NODE_TYPES = Arrays.asList(
NodeType.BROKER,
NodeType.OVERLORD,
NodeType.HISTORICAL,
NodeType.PEON,
NodeType.ROUTER,
NodeType.MIDDLE_MANAGER
);
private final DruidNodeDiscoveryProvider discoveryProvider;
@ -154,7 +159,7 @@ public class CommonCacheNotifier
private List<ListenableFuture<StatusResponseHolder>> sendUpdate(String updatedAuthorizerPrefix, byte[] serializedUserMap)
{
List<ListenableFuture<StatusResponseHolder>> futures = new ArrayList<>();
for (String nodeType : NODE_TYPES) {
for (NodeType nodeType : NODE_TYPES) {
DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeType(nodeType);
Collection<DiscoveryDruidNode> nodes = nodeDiscovery.getAllNodes();
for (DiscoveryDruidNode node : nodes) {

View File

@ -40,8 +40,8 @@ import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@ -310,7 +310,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
new LookupNodeService(lookupTier);
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService

View File

@ -31,8 +31,8 @@ import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
@ -255,7 +255,7 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner
new LookupNodeService(lookupTier);
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService

View File

@ -37,8 +37,8 @@ import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
@ -672,7 +672,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
new LookupNodeService(getContextValue(CTX_KEY_LOOKUP_TIER));
return new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService

View File

@ -32,8 +32,8 @@ import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@ -364,7 +364,7 @@ public class RealtimeIndexTask extends AbstractTask
new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService

View File

@ -41,6 +41,7 @@ import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
@ -445,7 +446,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private void startWorkersHandling() throws InterruptedException
{
final CountDownLatch workerViewInitialized = new CountDownLatch(1);
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM);
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{

View File

@ -33,6 +33,7 @@ import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
@ -84,7 +85,7 @@ public class HttpRemoteTaskRunnerTest
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
@ -129,7 +130,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
@ -137,7 +138,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0")
)
@ -172,7 +173,7 @@ public class HttpRemoteTaskRunnerTest
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
@ -221,7 +222,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
@ -229,7 +230,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0")
)
@ -259,7 +260,7 @@ public class HttpRemoteTaskRunnerTest
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
@ -324,7 +325,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 1234, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
@ -404,7 +405,7 @@ public class HttpRemoteTaskRunnerTest
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
@ -458,7 +459,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 1234, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
@ -567,7 +568,7 @@ public class HttpRemoteTaskRunnerTest
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
@ -621,7 +622,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 1234, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0")
)
@ -730,7 +731,7 @@ public class HttpRemoteTaskRunnerTest
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
@ -788,7 +789,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0")
)
@ -832,7 +833,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0")
)
@ -865,7 +866,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
new DruidNode("service", "host3", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
NodeType.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0")
)
@ -1159,7 +1160,7 @@ public class HttpRemoteTaskRunnerTest
{
TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_MM))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

View File

@ -42,11 +42,7 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
private final ObjectMapper jsonMapper;
@Inject
public CuratorDruidNodeAnnouncer(
Announcer announcer,
ZkPathsConfig config,
@Json ObjectMapper jsonMapper
)
public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper)
{
this.announcer = announcer;
this.config = config;
@ -59,14 +55,12 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
try {
log.info("Announcing [%s].", discoveryDruidNode);
announcer.announce(
ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
),
jsonMapper.writeValueAsBytes(discoveryDruidNode)
String path = ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType().toString(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
);
announcer.announce(path, jsonMapper.writeValueAsBytes(discoveryDruidNode));
log.info("Announced [%s].", discoveryDruidNode);
}
@ -80,13 +74,12 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
{
log.info("Unannouncing [%s].", discoveryDruidNode);
announcer.unannounce(
ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
)
String path = ZKPaths.makePath(
config.getInternalDiscoveryPath(),
discoveryDruidNode.getNodeType().toString(),
discoveryDruidNode.getDruidNode().getHostAndPortToUse()
);
announcer.unannounce(path);
log.info("Unannounced [%s].", discoveryDruidNode);
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
@ -65,7 +66,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private ExecutorService listenerExecutor;
private final Map<String, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
private final Map<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
private final LifecycleLock lifecycleLock = new LifecycleLock();
@ -82,27 +83,23 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
@Override
public DruidNodeDiscovery getForNodeType(String nodeType)
public DruidNodeDiscovery getForNodeType(NodeType nodeType)
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return nodeTypeWatchers.compute(
return nodeTypeWatchers.computeIfAbsent(
nodeType,
(k, v) -> {
if (v != null) {
return v;
}
log.info("Creating NodeTypeWatcher for nodeType [%s].", nodeType);
nType -> {
log.info("Creating NodeTypeWatcher for nodeType [%s].", nType);
NodeTypeWatcher nodeTypeWatcher = new NodeTypeWatcher(
listenerExecutor,
curatorFramework,
config.getInternalDiscoveryPath(),
jsonMapper,
nodeType
nType
);
nodeTypeWatcher.start();
log.info("Created NodeTypeWatcher for nodeType [%s].", nodeType);
log.info("Created NodeTypeWatcher for nodeType [%s].", nType);
return nodeTypeWatcher;
}
);
@ -154,7 +151,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final CuratorFramework curatorFramework;
private final String nodeType;
private final NodeType nodeType;
private final ObjectMapper jsonMapper;
// hostAndPort -> DiscoveryDruidNode
@ -165,7 +162,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final ExecutorService listenerExecutor;
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList();
private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<>();
private final Object lock = new Object();
@ -176,7 +173,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
CuratorFramework curatorFramework,
String basePath,
ObjectMapper jsonMapper,
String nodeType
NodeType nodeType
)
{
this.listenerExecutor = listenerExecutor;
@ -188,7 +185,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeTypeWatcher[%s]", nodeType));
this.cache = new PathChildrenCache(
curatorFramework,
ZKPaths.makePath(basePath, nodeType),
ZKPaths.makePath(basePath, nodeType.toString()),
true,
true,
cacheExecutor
@ -241,10 +238,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
return;
}
DiscoveryDruidNode druidNode = jsonMapper.readValue(
data,
DiscoveryDruidNode.class
);
DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class);
if (!nodeType.equals(druidNode.getNodeType())) {
log.warn(
@ -255,11 +249,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
return;
}
log.info(
"Node[%s:%s] appeared.",
druidNode.getDruidNode().getHostAndPortToUse(),
druidNode
);
log.info("Node[%s:%s] appeared.", druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
addNode(druidNode);
@ -330,10 +320,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
}
private void safeSchedule(
Runnable runnable,
String errMsgFormat, Object... args
)
private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
{
listenerExecutor.submit(() -> {
try {

View File

@ -36,7 +36,7 @@ import java.util.Objects;
public class DiscoveryDruidNode
{
private final DruidNode druidNode;
private final String nodeType;
private final NodeType nodeType;
// Other metadata associated with the node e.g.
// if its a historical node then lookup information, segment loading capacity etc.
@ -45,7 +45,7 @@ public class DiscoveryDruidNode
@JsonCreator
public DiscoveryDruidNode(
@JsonProperty("druidNode") DruidNode druidNode,
@JsonProperty("nodeType") String nodeType,
@JsonProperty("nodeType") NodeType nodeType,
@JsonProperty("services") Map<String, DruidService> services
)
{
@ -64,7 +64,7 @@ public class DiscoveryDruidNode
}
@JsonProperty
public String getNodeType()
public NodeType getNodeType()
{
return nodeType;
}

View File

@ -67,7 +67,7 @@ public class DruidLeaderClient
private final HttpClient httpClient;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final String nodeTypeToWatch;
private final NodeType nodeTypeToWatch;
private final String leaderRequestPath;
@ -81,7 +81,7 @@ public class DruidLeaderClient
public DruidLeaderClient(
HttpClient httpClient,
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
String nodeTypeToWatch,
NodeType nodeTypeToWatch,
String leaderRequestPath,
ServerDiscoverySelector serverDiscoverySelector
)

View File

@ -35,63 +35,40 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Provider of DruidNodeDiscovery instances.
* Provider of {@link DruidNodeDiscovery} instances.
*/
public abstract class DruidNodeDiscoveryProvider
{
private static final Logger log = new Logger(DruidNodeDiscoveryProvider.class);
public static final String NODE_TYPE_COORDINATOR = "coordinator";
public static final String NODE_TYPE_HISTORICAL = "historical";
public static final String NODE_TYPE_BROKER = "broker";
public static final String NODE_TYPE_OVERLORD = "overlord";
public static final String NODE_TYPE_PEON = "peon";
public static final String NODE_TYPE_ROUTER = "router";
public static final String NODE_TYPE_MM = "middleManager";
public static final Set<String> ALL_NODE_TYPES = ImmutableSet.of(
NODE_TYPE_COORDINATOR,
NODE_TYPE_HISTORICAL,
NODE_TYPE_BROKER,
NODE_TYPE_OVERLORD,
NODE_TYPE_PEON,
NODE_TYPE_ROUTER,
NODE_TYPE_MM
private static final Map<String, Set<NodeType>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.BROKER, NodeType.HISTORICAL, NodeType.PEON),
DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.HISTORICAL, NodeType.PEON),
WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.PEON)
);
private static final Map<String, Set<String>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_BROKER, NODE_TYPE_HISTORICAL, NODE_TYPE_PEON),
DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_HISTORICAL, NODE_TYPE_PEON),
WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NODE_TYPE_MM)
);
private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery> serviceDiscoveryMap = new ConcurrentHashMap<>(
SERVICE_TO_NODE_TYPES.size());
private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery> serviceDiscoveryMap =
new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size());
/**
* Get DruidNodeDiscovery instance to discover nodes of given nodeType.
*/
public abstract DruidNodeDiscovery getForNodeType(String nodeType);
public abstract DruidNodeDiscovery getForNodeType(NodeType nodeType);
/**
* Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata.
*/
public DruidNodeDiscovery getForService(String serviceName)
{
return serviceDiscoveryMap.compute(
return serviceDiscoveryMap.computeIfAbsent(
serviceName,
(k, v) -> {
if (v != null) {
return v;
}
service -> {
Set<String> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName);
Set<NodeType> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service);
if (nodeTypesToWatch == null) {
throw new IAE("Unknown service [%s].", serviceName);
throw new IAE("Unknown service [%s].", service);
}
ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(serviceName);
for (String nodeType : nodeTypesToWatch) {
ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service);
for (NodeType nodeType : nodeTypesToWatch) {
getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener());
}
return serviceDiscovery;

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JsonValue;
/**
 * The type of a Druid node as seen by the node discovery mechanism: {@link DiscoveryDruidNode} carries a
 * {@code NodeType}, and {@link DruidNodeDiscoveryProvider#getForNodeType} looks nodes up by it.
 *
 * For historical reasons, this enum differs from {@link
 * org.apache.druid.server.coordination.ServerType} even though they are essentially the same abstraction; merging them
 * would only increase complexity and reduce code safety, because they name the same types differently ("peon" -
 * "indexer-executor" and "middleManager" - "realtime") and both expose them via JSON APIs.
 */
public enum NodeType
{
  COORDINATOR("coordinator"),
  HISTORICAL("historical"),
  BROKER("broker"),
  OVERLORD("overlord"),
  PEON("peon"),
  ROUTER("router"),
  MIDDLE_MANAGER("middleManager");

  // Wire name used in JSON; matches the pre-enum string constants for compatibility.
  private final String jsonName;

  NodeType(String jsonName)
  {
    this.jsonName = jsonName;
  }

  /**
   * Lowercase for backward compatibility, as a part of the {@link DiscoveryDruidNode}'s JSON format.
   *
   * Don't need to define {@link com.fasterxml.jackson.annotation.JsonCreator} because for enum types {@link JsonValue}
   * serves for both serialization and deserialization, see the Javadoc comment of {@link JsonValue}.
   */
  @JsonValue
  public String getJsonName()
  {
    return jsonName;
  }
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.java.util.http.client.HttpClient;
@ -64,7 +65,7 @@ public class CoordinatorDiscoveryModule implements Module
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
NodeType.COORDINATOR,
"/druid/coordinator/v1/leader",
serverDiscoverySelector
);

View File

@ -28,6 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.java.util.http.client.HttpClient;
@ -64,7 +65,7 @@ public class IndexingServiceDiscoveryModule implements Module
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
NodeType.OVERLORD,
"/druid/indexer/v1/leader",
serverDiscoverySelector
);

View File

@ -26,7 +26,8 @@ import org.apache.druid.java.util.common.StringUtils;
/**
* This enum represents types of druid services that hold segments.
* <p>
* These types are externally visible (e.g., from the output of /druid/coordinator/v1/servers).
* These types are externally visible (e.g., from the output of {@link
* org.apache.druid.server.http.ServersResource#makeSimpleServer}).
* <p>
* For backwards compatibility, when presenting these types externally, the toString() representation
* of the enum should be used.
@ -34,6 +35,11 @@ import org.apache.druid.java.util.common.StringUtils;
* The toString() method converts the enum name() to lowercase and replaces underscores with hyphens,
* which is the format expected for the server type string prior to the patch that introduced ServerType:
* https://github.com/apache/incubator-druid/pull/4148
*
* For historical reasons, this enum differs from {@link org.apache.druid.discovery.NodeType} even though
* they are essentially the same abstraction; merging them would only increase complexity and reduce code
* safety, because they name the same types differently ("indexer-executor" - "peon" and "realtime" - "middleManager")
* and both expose them via JSON APIs.
*/
public enum ServerType
{

View File

@ -28,8 +28,8 @@ import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.security.StateResourceFilter;
@ -40,6 +40,7 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.Collection;
/**
@ -59,33 +60,23 @@ public class ClusterResource
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterServers(
@QueryParam("full") boolean full
)
public Response getClusterServers(@QueryParam("full") boolean full)
{
ImmutableMap.Builder<String, Object> entityBuilder = new ImmutableMap.Builder<>();
ImmutableMap.Builder<NodeType, Object> entityBuilder = new ImmutableMap.Builder<>();
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR, full)
);
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD, full)
);
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER, full)
);
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL, full)
);
entityBuilder.put(NodeType.COORDINATOR, getNodes(NodeType.COORDINATOR, full));
entityBuilder.put(NodeType.OVERLORD, getNodes(NodeType.OVERLORD, full));
entityBuilder.put(NodeType.BROKER, getNodes(NodeType.BROKER, full));
entityBuilder.put(NodeType.HISTORICAL, getNodes(NodeType.HISTORICAL, full));
Collection<Object> mmNodes = getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_MM, full);
Collection<Object> mmNodes = getNodes(NodeType.MIDDLE_MANAGER, full);
if (!mmNodes.isEmpty()) {
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_MM, mmNodes);
entityBuilder.put(NodeType.MIDDLE_MANAGER, mmNodes);
}
Collection<Object> routerNodes = getNodes(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, full);
Collection<Object> routerNodes = getNodes(NodeType.ROUTER, full);
if (!routerNodes.isEmpty()) {
entityBuilder.put(DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER, routerNodes);
entityBuilder.put(NodeType.ROUTER, routerNodes);
}
return Response.status(Response.Status.OK).entity(entityBuilder.build()).build();
@ -94,28 +85,19 @@ public class ClusterResource
@GET
@Produces({MediaType.APPLICATION_JSON})
@Path("/{nodeType}")
public Response getClusterServers(
@PathParam("nodeType") String nodeType,
@QueryParam("full") boolean full
)
public Response getClusterServers(@PathParam("nodeType") NodeType nodeType, @QueryParam("full") boolean full)
{
if (nodeType == null || !DruidNodeDiscoveryProvider.ALL_NODE_TYPES.contains(nodeType)) {
if (nodeType == null) {
return Response.serverError()
.status(Response.Status.BAD_REQUEST)
.entity(StringUtils.format(
"Invalid nodeType [%s]. Valid node types are %s .",
nodeType,
DruidNodeDiscoveryProvider.ALL_NODE_TYPES
))
.entity("Invalid nodeType of null. Valid node types are " + Arrays.toString(NodeType.values()))
.build();
} else {
return Response.status(Response.Status.OK).entity(
getNodes(nodeType, full)
).build();
return Response.status(Response.Status.OK).entity(getNodes(nodeType, full)).build();
}
}
private Collection<Object> getNodes(String nodeType, boolean full)
private Collection<Object> getNodes(NodeType nodeType, boolean full)
{
Collection<DiscoveryDruidNode> discoveryDruidNodes = druidNodeDiscoveryProvider.getForNodeType(nodeType)
.getAllNodes();

View File

@ -29,6 +29,7 @@ import org.apache.druid.client.selector.Server;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@ -125,7 +126,7 @@ public class TieredBrokerHostSelector<T>
servers.put(entry.getValue(), new NodesHolder());
}
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER);
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{

View File

@ -29,6 +29,7 @@ import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.http.client.HttpClient;
@ -62,7 +63,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class HttpServerInventoryViewTest
@ -168,7 +168,7 @@ public class HttpServerInventoryViewTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
NodeType.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)

View File

@ -28,7 +28,7 @@ import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ServerConfig;
@ -81,25 +81,25 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
NodeType.COORDINATOR,
ImmutableMap.of()
);
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
NodeType.COORDINATOR,
ImmutableMap.of()
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
NodeType.OVERLORD,
ImmutableMap.of()
);
DiscoveryDruidNode node4 = new DiscoveryDruidNode(
new DruidNode("s4", "h4", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
NodeType.OVERLORD,
ImmutableMap.of()
);
@ -113,8 +113,8 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
);
druidNodeDiscoveryProvider.start();
DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR);
DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD);
DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR);
DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.OVERLORD);
while (!checkNodes(ImmutableSet.of(node1), coordDiscovery.getAllNodes())) {
Thread.sleep(100);

View File

@ -64,6 +64,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
/**
*/
@ -79,7 +80,7 @@ public class DruidLeaderClientTest extends BaseJettyTest
protected Injector setupInjector()
{
final DruidNode node = new DruidNode("test", "localhost", null, null, true, false);
discoveryDruidNode = new DiscoveryDruidNode(node, "test", ImmutableMap.of());
discoveryDruidNode = new DiscoveryDruidNode(node, NodeType.PEON, ImmutableMap.of());
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
@ -114,21 +115,21 @@ public class DruidLeaderClientTest extends BaseJettyTest
);
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
"nodetype",
NodeType.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();
Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct");
request.setContent("hello".getBytes("UTF-8"));
request.setContent("hello".getBytes(StandardCharsets.UTF_8));
Assert.assertEquals("hello", druidLeaderClient.go(request).getContent());
}
@ -139,14 +140,14 @@ public class DruidLeaderClientTest extends BaseJettyTest
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of());
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
"nodetype",
NodeType.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
@ -166,21 +167,21 @@ public class DruidLeaderClientTest extends BaseJettyTest
);
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
"nodetype",
NodeType.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();
Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect");
request.setContent("hello".getBytes("UTF-8"));
request.setContent("hello".getBytes(StandardCharsets.UTF_8));
Assert.assertEquals("hello", druidLeaderClient.go(request).getContent());
}
@ -191,33 +192,30 @@ public class DruidLeaderClientTest extends BaseJettyTest
EasyMock.expect(serverDiscoverySelector.pick()).andReturn(null).anyTimes();
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(
ImmutableList.of(new DiscoveryDruidNode(
new DruidNode("test", "dummyhost", 64231, null, true, false),
"test",
ImmutableMap.of()
))
);
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(
ImmutableList.of(discoveryDruidNode)
DiscoveryDruidNode dummyNode = new DiscoveryDruidNode(
new DruidNode("test", "dummyhost", 64231, null, true, false),
NodeType.PEON,
ImmutableMap.of()
);
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(dummyNode));
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode));
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery).anyTimes();
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery).anyTimes();
EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
"nodetype",
NodeType.PEON,
"/simple/leader",
serverDiscoverySelector
);
druidLeaderClient.start();
Request request = druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/redirect");
request.setContent("hello".getBytes("UTF-8"));
request.setContent("hello".getBytes(StandardCharsets.UTF_8));
Assert.assertEquals("hello", druidLeaderClient.go(request).getContent());
}
@ -230,14 +228,14 @@ public class DruidLeaderClientTest extends BaseJettyTest
);
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
"nodetype",
NodeType.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);

View File

@ -87,7 +87,7 @@ public class DruidNodeDiscoveryProviderTest
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
NodeType.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0),
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
@ -95,21 +95,21 @@ public class DruidNodeDiscoveryProviderTest
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
NodeType.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
NodeType.HISTORICAL,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node4 = new DiscoveryDruidNode(
new DruidNode("s4", "h4", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0),
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
@ -117,35 +117,35 @@ public class DruidNodeDiscoveryProviderTest
DiscoveryDruidNode node5 = new DiscoveryDruidNode(
new DruidNode("s5", "h5", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
DiscoveryDruidNode node6 = new DiscoveryDruidNode(
new DruidNode("s6", "h6", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node7 = new DiscoveryDruidNode(
new DruidNode("s7", "h7", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
NodeType.BROKER,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node7Clone = new DiscoveryDruidNode(
new DruidNode("s7", "h7", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
NodeType.BROKER,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node8 = new DiscoveryDruidNode(
new DruidNode("s8", "h8", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
NodeType.COORDINATOR,
ImmutableMap.of()
);
@ -183,7 +183,7 @@ public class DruidNodeDiscoveryProviderTest
private List<DruidNodeDiscovery.Listener> listeners = new ArrayList<>();
@Override
public DruidNodeDiscovery getForNodeType(String nodeType)
public DruidNodeDiscovery getForNodeType(NodeType nodeType)
{
return new DruidNodeDiscovery()
{

View File

@ -26,6 +26,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.HostAndPortWithScheme;
import org.easymock.EasyMock;
@ -53,21 +54,21 @@ public class LookupNodeDiscoveryTest
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
NodeType.HISTORICAL,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1"))
);
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1"))
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_PEON,
NodeType.PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier2"))
);

View File

@ -32,6 +32,7 @@ import org.apache.druid.client.selector.Server;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@ -74,19 +75,19 @@ public class TieredBrokerHostSelectorTest
node1 = new DiscoveryDruidNode(
new DruidNode("hotBroker", "hotHost", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
NodeType.BROKER,
ImmutableMap.of()
);
node2 = new DiscoveryDruidNode(
new DruidNode("coldBroker", "coldHost1", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
NodeType.BROKER,
ImmutableMap.of()
);
node3 = new DiscoveryDruidNode(
new DruidNode("coldBroker", "coldHost2", 8080, null, true, false),
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
NodeType.BROKER,
ImmutableMap.of()
);
@ -105,7 +106,7 @@ public class TieredBrokerHostSelectorTest
}
};
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(DruidNodeDiscoveryProvider.NODE_TYPE_BROKER))
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);

View File

@ -34,8 +34,8 @@ import org.apache.druid.client.cache.CacheMonitor;
import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
import org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.Jerseys;
@ -125,12 +125,10 @@ public class CliBroker extends ServerRunnable
LifecycleModule.register(binder, Server.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_BROKER,
ImmutableList.of(LookupNodeService.class)
)
).in(LazySingleton.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.BROKER, ImmutableList.of(LookupNodeService.class)))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
},
new LookupModule(),

View File

@ -36,7 +36,7 @@ import org.apache.druid.client.HttpServerInventoryViewResource;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.ConditionalMultibind;
import org.apache.druid.guice.ConfigProvider;
import org.apache.druid.guice.Jerseys;
@ -217,12 +217,11 @@ public class CliCoordinator extends ServerRunnable
DruidCoordinatorCleanupPendingSegments.class
);
binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(Coordinator.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
ImmutableList.of()
)
).in(LazySingleton.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.annotatedWith(Coordinator.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.COORDINATOR, ImmutableList.of()))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class));
}

View File

@ -27,8 +27,8 @@ import io.airlift.airline.Command;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CacheMonitor;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.Jerseys;
@ -103,12 +103,15 @@ public class CliHistorical extends ServerRunnable
binder.install(new CacheModule());
MetricsModule.register(binder, CacheMonitor.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_HISTORICAL,
ImmutableList.of(DataNodeService.class, LookupNodeService.class)
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(
new DiscoverySideEffectsProvider(
NodeType.HISTORICAL,
ImmutableList.of(DataNodeService.class, LookupNodeService.class)
)
)
).in(LazySingleton.class);
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
},
new LookupModule()

View File

@ -30,7 +30,7 @@ import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceModuleHelper;
@ -129,12 +129,12 @@ public class CliMiddleManager extends ServerRunnable
LifecycleModule.register(binder, Server.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_MM,
ImmutableList.of(WorkerNodeService.class)
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(
new DiscoverySideEffectsProvider(NodeType.MIDDLE_MANAGER, ImmutableList.of(WorkerNodeService.class))
)
).in(LazySingleton.class);
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}

View File

@ -38,7 +38,7 @@ import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceSelectorConfig;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
@ -234,12 +234,11 @@ public class CliOverlord extends ServerRunnable
LifecycleModule.register(binder, Server.class);
}
binder.bind(DiscoverySideEffectsProvider.Child.class).annotatedWith(IndexingService.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_OVERLORD,
ImmutableList.of()
)
).in(LazySingleton.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.annotatedWith(IndexingService.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.OVERLORD, ImmutableList.of()))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class));
}

View File

@ -32,6 +32,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@ -120,12 +121,10 @@ public class CliRouter extends ServerRunnable
LifecycleModule.register(binder, Server.class);
DiscoveryModule.register(binder, Self.class);
binder.bind(DiscoverySideEffectsProvider.Child.class).toProvider(
new DiscoverySideEffectsProvider(
DruidNodeDiscoveryProvider.NODE_TYPE_ROUTER,
ImmutableList.of()
)
).in(LazySingleton.class);
binder
.bind(DiscoverySideEffectsProvider.Child.class)
.toProvider(new DiscoverySideEffectsProvider(NodeType.ROUTER, ImmutableList.of()))
.in(LazySingleton.class);
LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
}
@ -134,7 +133,6 @@ public class CliRouter extends ServerRunnable
public ServerDiscoverySelector getCoordinatorServerDiscoverySelector(
TieredBrokerConfig config,
ServerDiscoveryFactory factory
)
{
return factory.createSelector(config.getCoordinatorServiceName());
@ -151,7 +149,7 @@ public class CliRouter extends ServerRunnable
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
DruidNodeDiscoveryProvider.NODE_TYPE_COORDINATOR,
NodeType.COORDINATOR,
"/druid/coordinator/v1/leader",
serverDiscoverySelector
);

View File

@ -27,6 +27,7 @@ import com.google.inject.Provider;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.DruidService;
import org.apache.druid.discovery.NodeType;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
@ -58,8 +59,8 @@ public abstract class ServerRunnable extends GuiceRunnable
}
/**
* This is a helper class used by CliXXX classes to announce DiscoveryDruidNode
* as part of lifecycle Stage.LAST .
* This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode}
* as part of {@link Lifecycle.Stage#LAST}.
*/
protected static class DiscoverySideEffectsProvider implements Provider<DiscoverySideEffectsProvider.Child>
{
@ -77,10 +78,10 @@ public abstract class ServerRunnable extends GuiceRunnable
@Inject
private Injector injector;
private final String nodeType;
private final NodeType nodeType;
private final List<Class<? extends DruidService>> serviceClasses;
public DiscoverySideEffectsProvider(String nodeType, List<Class<? extends DruidService>> serviceClasses)
public DiscoverySideEffectsProvider(NodeType nodeType, List<Class<? extends DruidService>> serviceClasses)
{
this.nodeType = nodeType;
this.serviceClasses = serviceClasses;
@ -95,10 +96,7 @@ public abstract class ServerRunnable extends GuiceRunnable
builder.put(service.getName(), service);
}
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode,
nodeType,
builder.build()
);
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeType, builder.build());
lifecycle.addHandler(
new Lifecycle.Handler()