mirror of
https://github.com/apache/druid.git
synced 2025-02-09 19:44:57 +00:00
additional lookup status discovery http endpoints at coordinator (#4228)
* additional lookup status discovery http endpoints at coordinator * more changes * jsonize the error msgs as well * fix tests
This commit is contained in:
parent
8277284d67
commit
417714d228
@ -40,4 +40,12 @@ public class ServletResourceUtils
|
|||||||
t == null ? "null" : (t.getMessage() == null ? t.toString() : t.getMessage())
|
t == null ? "null" : (t.getMessage() == null ? t.toString() : t.getMessage())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts String errorMsg into a Map so that it produces valid json on serialization into response.
|
||||||
|
*/
|
||||||
|
public static Map<String, String> jsonize(String msgFormat, Object... args)
|
||||||
|
{
|
||||||
|
return ImmutableMap.of("error", StringUtils.safeFormat(msgFormat, args));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,7 @@ The configuration is propagated to the query serving nodes (broker / router / pe
|
|||||||
The query serving nodes have an internal API for managing lookups on the node and those are used by the coordinator.
|
The query serving nodes have an internal API for managing lookups on the node and those are used by the coordinator.
|
||||||
The coordinator periodically checks if any of the nodes need to load/drop lookups and updates them appropriately.
|
The coordinator periodically checks if any of the nodes need to load/drop lookups and updates them appropriately.
|
||||||
|
|
||||||
|
# API for configuring lookups
|
||||||
|
|
||||||
## Bulk update
|
## Bulk update
|
||||||
Lookups can be updated in bulk by posting a JSON object to `/druid/coordinator/v1/lookups`. The format of the json object is as follows:
|
Lookups can be updated in bulk by posting a JSON object to `/druid/coordinator/v1/lookups`. The format of the json object is as follows:
|
||||||
@ -232,6 +233,27 @@ To discover a list of tiers currently active in the cluster **instead of** ones
|
|||||||
## List lookup names
|
## List lookup names
|
||||||
A `GET` to `/druid/coordinator/v1/lookups/{tier}` will return a list of known lookup names for that tier.
|
A `GET` to `/druid/coordinator/v1/lookups/{tier}` will return a list of known lookup names for that tier.
|
||||||
|
|
||||||
|
# Additional API related to status of configured lookups
|
||||||
|
These end points can be used to get the propagation status of configured lookups to lookup nodes such as historicals.
|
||||||
|
|
||||||
|
## List load status of all lookups
|
||||||
|
`GET /druid/coordinator/v1/lookups/status` with optional query parameter `detailed`.
|
||||||
|
|
||||||
|
## List load status of lookups in a tier
|
||||||
|
`GET /druid/coordinator/v1/lookups/status/{tier}` with optional query parameter `detailed`.
|
||||||
|
|
||||||
|
## List load status of single lookup
|
||||||
|
`GET /druid/coordinator/v1/lookups/status/{tier}/{lookup}` with optional query parameter `detailed`.
|
||||||
|
|
||||||
|
## List lookup state of all nodes
|
||||||
|
`GET /druid/coordinator/v1/lookups/nodeStatus` with optional query parameter `discover` to discover tiers from zookeeper or configured lookup tiers are listed.
|
||||||
|
|
||||||
|
## List lookup state of nodes in a tier
|
||||||
|
`GET /druid/coordinator/v1/lookups/nodeStatus/{tier}`
|
||||||
|
|
||||||
|
## List lookup state of single node
|
||||||
|
`GET /druid/coordinator/v1/lookups/nodeStatus/{tier}/{host:port}`
|
||||||
|
|
||||||
# Internal API
|
# Internal API
|
||||||
|
|
||||||
The Peon, Router, Broker, and Historical nodes all have the ability to consume lookup configuration.
|
The Peon, Router, Broker, and Historical nodes all have the ability to consume lookup configuration.
|
||||||
|
@ -19,10 +19,14 @@
|
|||||||
|
|
||||||
package io.druid.server.http;
|
package io.druid.server.http;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
import com.google.common.net.HostAndPort;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import io.druid.audit.AuditInfo;
|
import io.druid.audit.AuditInfo;
|
||||||
import io.druid.audit.AuditManager;
|
import io.druid.audit.AuditManager;
|
||||||
@ -32,6 +36,7 @@ import io.druid.guice.annotations.Smile;
|
|||||||
import io.druid.java.util.common.IAE;
|
import io.druid.java.util.common.IAE;
|
||||||
import io.druid.java.util.common.RE;
|
import io.druid.java.util.common.RE;
|
||||||
import io.druid.java.util.common.logger.Logger;
|
import io.druid.java.util.common.logger.Logger;
|
||||||
|
import io.druid.query.lookup.LookupsState;
|
||||||
import io.druid.server.lookup.cache.LookupCoordinatorManager;
|
import io.druid.server.lookup.cache.LookupCoordinatorManager;
|
||||||
import io.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
|
import io.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
|
||||||
|
|
||||||
@ -51,7 +56,12 @@ import javax.ws.rs.core.MediaType;
|
|||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains information about lookups exposed through the coordinator
|
* Contains information about lookups exposed through the coordinator
|
||||||
@ -284,4 +294,310 @@ public class LookupCoordinatorResource
|
|||||||
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
|
return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Produces({MediaType.APPLICATION_JSON})
|
||||||
|
@Path("/status")
|
||||||
|
public Response getAllLookupsStatus(
|
||||||
|
@QueryParam("detailed") boolean detailed
|
||||||
|
)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
Map<String, Map<String, LookupExtractorFactoryMapContainer>> configuredLookups = lookupCoordinatorManager.getKnownLookups();
|
||||||
|
if (configuredLookups == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("No lookups found"))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> lookupsStateOnNodes = lookupCoordinatorManager.getLastKnownLookupsStateOnNodes();
|
||||||
|
|
||||||
|
Map<String, Map<String, LookupStatus>> result = new HashMap<>();
|
||||||
|
|
||||||
|
for (Map.Entry<String, Map<String, LookupExtractorFactoryMapContainer>> tierEntry : configuredLookups.entrySet()) {
|
||||||
|
String tier = tierEntry.getKey();
|
||||||
|
Map<String, LookupStatus> lookupStatusMap = new HashMap<>();
|
||||||
|
result.put(tier, lookupStatusMap);
|
||||||
|
Collection<HostAndPort> hosts = lookupCoordinatorManager.discoverNodesInTier(tier);
|
||||||
|
|
||||||
|
for (Map.Entry<String, LookupExtractorFactoryMapContainer> lookupsEntry : tierEntry.getValue().entrySet()) {
|
||||||
|
lookupStatusMap.put(
|
||||||
|
lookupsEntry.getKey(),
|
||||||
|
getLookupStatus(
|
||||||
|
lookupsEntry.getKey(),
|
||||||
|
lookupsEntry.getValue(),
|
||||||
|
hosts,
|
||||||
|
lookupsStateOnNodes,
|
||||||
|
detailed
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Response.ok(result).build();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error(ex, "Error getting lookups status");
|
||||||
|
return Response.serverError().entity(ServletResourceUtils.sanitizeException(ex)).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Produces({MediaType.APPLICATION_JSON})
|
||||||
|
@Path("/status/{tier}")
|
||||||
|
public Response getLookupStatusForTier(
|
||||||
|
@PathParam("tier") String tier,
|
||||||
|
@QueryParam("detailed") boolean detailed
|
||||||
|
)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
Map<String, Map<String, LookupExtractorFactoryMapContainer>> configuredLookups = lookupCoordinatorManager.getKnownLookups();
|
||||||
|
if (configuredLookups == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("No lookups found"))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, LookupExtractorFactoryMapContainer> tierLookups = configuredLookups.get(tier);
|
||||||
|
if (tierLookups == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("No lookups found for tier [%s].", tier))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Map<String, LookupStatus> lookupStatusMap = new HashMap<>();
|
||||||
|
Collection<HostAndPort> hosts = lookupCoordinatorManager.discoverNodesInTier(tier);
|
||||||
|
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> lookupsStateOnNodes = lookupCoordinatorManager.getLastKnownLookupsStateOnNodes();
|
||||||
|
|
||||||
|
for (Map.Entry<String, LookupExtractorFactoryMapContainer> lookupsEntry : tierLookups.entrySet()) {
|
||||||
|
lookupStatusMap.put(
|
||||||
|
lookupsEntry.getKey(),
|
||||||
|
getLookupStatus(lookupsEntry.getKey(), lookupsEntry.getValue(), hosts, lookupsStateOnNodes, detailed)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Response.ok(lookupStatusMap).build();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error(ex, "Error getting lookups status for tier [%s].", tier);
|
||||||
|
return Response.serverError().entity(ServletResourceUtils.sanitizeException(ex)).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Produces({MediaType.APPLICATION_JSON})
|
||||||
|
@Path("/status/{tier}/{lookup}")
|
||||||
|
public Response getSpecificLookupStatus(
|
||||||
|
@PathParam("tier") String tier,
|
||||||
|
@PathParam("lookup") String lookup,
|
||||||
|
@QueryParam("detailed") boolean detailed
|
||||||
|
)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
Map<String, Map<String, LookupExtractorFactoryMapContainer>> configuredLookups = lookupCoordinatorManager.getKnownLookups();
|
||||||
|
if (configuredLookups == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("No lookups found"))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, LookupExtractorFactoryMapContainer> tierLookups = configuredLookups.get(tier);
|
||||||
|
if (tierLookups == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("No lookups found for tier [%s].", tier))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
LookupExtractorFactoryMapContainer lookupDef = tierLookups.get(lookup);
|
||||||
|
if (lookupDef == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("Lookup [%s] not found for tier [%s].", lookup, tier))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Response.ok(
|
||||||
|
getLookupStatus(
|
||||||
|
lookup,
|
||||||
|
lookupDef,
|
||||||
|
lookupCoordinatorManager.discoverNodesInTier(tier),
|
||||||
|
lookupCoordinatorManager.getLastKnownLookupsStateOnNodes(),
|
||||||
|
detailed
|
||||||
|
)
|
||||||
|
).build();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error(ex, "Error getting lookups status for tier [%s] and lookup [%s].", tier, lookup);
|
||||||
|
return Response.serverError().entity(ServletResourceUtils.sanitizeException(ex)).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
LookupStatus getLookupStatus(
|
||||||
|
String lookupName,
|
||||||
|
LookupExtractorFactoryMapContainer lookupDef,
|
||||||
|
Collection<HostAndPort> nodes,
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> lastKnownLookupsState,
|
||||||
|
boolean detailed
|
||||||
|
)
|
||||||
|
{
|
||||||
|
boolean isReady = true;
|
||||||
|
List<HostAndPort> pendingHosts = detailed ? new ArrayList<>() : null;
|
||||||
|
|
||||||
|
for (HostAndPort node : nodes) {
|
||||||
|
LookupsState<LookupExtractorFactoryMapContainer> hostState = lastKnownLookupsState.get(node);
|
||||||
|
LookupExtractorFactoryMapContainer loadedOnHost = hostState != null
|
||||||
|
? hostState.getCurrent().get(lookupName)
|
||||||
|
: null;
|
||||||
|
if (loadedOnHost == null || lookupDef.replaces(loadedOnHost)) {
|
||||||
|
isReady = false;
|
||||||
|
if (detailed) {
|
||||||
|
pendingHosts.add(node);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new LookupStatus(isReady, pendingHosts);
|
||||||
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Produces({MediaType.APPLICATION_JSON})
|
||||||
|
@Path("/nodeStatus")
|
||||||
|
public Response getAllNodesStatus(
|
||||||
|
@QueryParam("discover") boolean discover
|
||||||
|
)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
Collection<String> tiers = null;
|
||||||
|
if (discover) {
|
||||||
|
tiers = lookupCoordinatorManager.discoverTiers();
|
||||||
|
} else {
|
||||||
|
Map<String, Map<String, LookupExtractorFactoryMapContainer>> configuredLookups = lookupCoordinatorManager.getKnownLookups();
|
||||||
|
if (configuredLookups == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("No lookups configured."))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
tiers = configuredLookups.keySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> lookupsStateOnHosts = lookupCoordinatorManager.getLastKnownLookupsStateOnNodes();
|
||||||
|
|
||||||
|
Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> result = new HashMap<>();
|
||||||
|
|
||||||
|
for (String tier : tiers) {
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> tierNodesStatus = new HashMap<>();
|
||||||
|
result.put(tier, tierNodesStatus);
|
||||||
|
|
||||||
|
Collection<HostAndPort> nodes = lookupCoordinatorManager.discoverNodesInTier(tier);
|
||||||
|
|
||||||
|
for (HostAndPort node : nodes) {
|
||||||
|
LookupsState<LookupExtractorFactoryMapContainer> lookupsState = lookupsStateOnHosts.get(node);
|
||||||
|
if (lookupsState == null) {
|
||||||
|
tierNodesStatus.put(node, new LookupsState<>(null,null,null));
|
||||||
|
} else {
|
||||||
|
tierNodesStatus.put(node, lookupsState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Response.ok(result).build();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error(ex, "Error getting node status.");
|
||||||
|
return Response.serverError().entity(ServletResourceUtils.sanitizeException(ex)).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Produces({MediaType.APPLICATION_JSON})
|
||||||
|
@Path("/nodeStatus/{tier}")
|
||||||
|
public Response getNodesStatusInTier(
|
||||||
|
@PathParam("tier") String tier
|
||||||
|
)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> lookupsStateOnHosts = lookupCoordinatorManager.getLastKnownLookupsStateOnNodes();
|
||||||
|
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> tierNodesStatus = new HashMap<>();
|
||||||
|
|
||||||
|
Collection<HostAndPort> nodes = lookupCoordinatorManager.discoverNodesInTier(tier);
|
||||||
|
|
||||||
|
for (HostAndPort node : nodes) {
|
||||||
|
LookupsState<LookupExtractorFactoryMapContainer> lookupsState = lookupsStateOnHosts.get(node);
|
||||||
|
if (lookupsState == null) {
|
||||||
|
tierNodesStatus.put(node, new LookupsState<>(null,null,null));
|
||||||
|
} else {
|
||||||
|
tierNodesStatus.put(node, lookupsState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Response.ok(tierNodesStatus).build();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error(ex, "Error getting node status for tier [%s].", tier);
|
||||||
|
return Response.serverError().entity(ServletResourceUtils.sanitizeException(ex)).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@GET
|
||||||
|
@Produces({MediaType.APPLICATION_JSON})
|
||||||
|
@Path("/nodeStatus/{tier}/{hostAndPort}")
|
||||||
|
public Response getSpecificNodeStatus(
|
||||||
|
@PathParam("tier") String tier,
|
||||||
|
@PathParam("hostAndPort") HostAndPort hostAndPort
|
||||||
|
)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> lookupsStateOnHosts = lookupCoordinatorManager.getLastKnownLookupsStateOnNodes();
|
||||||
|
|
||||||
|
LookupsState<LookupExtractorFactoryMapContainer> lookupsState = lookupsStateOnHosts.get(hostAndPort);
|
||||||
|
if (lookupsState == null) {
|
||||||
|
return Response.status(Response.Status.NOT_FOUND)
|
||||||
|
.entity(ServletResourceUtils.jsonize("Node [%s] status is unknown.", hostAndPort))
|
||||||
|
.build();
|
||||||
|
} else {
|
||||||
|
return Response.ok(lookupsState).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error(ex, "Error getting node status for [%s].", hostAndPort);
|
||||||
|
return Response.serverError().entity(ServletResourceUtils.sanitizeException(ex)).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static class LookupStatus
|
||||||
|
{
|
||||||
|
@JsonProperty
|
||||||
|
private boolean loaded;
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
private List<HostAndPort> pendingNodes;
|
||||||
|
|
||||||
|
public LookupStatus(boolean loaded, List<HostAndPort> pendingHosts)
|
||||||
|
{
|
||||||
|
this.loaded = loaded;
|
||||||
|
this.pendingNodes = pendingHosts;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (o == null || getClass() != o.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
LookupStatus that = (LookupStatus) o;
|
||||||
|
return Objects.equals(loaded, that.loaded) &&
|
||||||
|
Objects.equals(pendingNodes, that.pendingNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return Objects.hash(loaded, pendingNodes);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -116,7 +116,7 @@ public class LookupCoordinatorManager
|
|||||||
// management loop, then they get discarded automatically.
|
// management loop, then they get discarded automatically.
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
final AtomicReference<Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> knownOldState =
|
final AtomicReference<Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> knownOldState =
|
||||||
new AtomicReference<>();
|
new AtomicReference<>(ImmutableMap.of());
|
||||||
|
|
||||||
// Updated by config watching service
|
// Updated by config watching service
|
||||||
private AtomicReference<Map<String, Map<String, LookupExtractorFactoryMapContainer>>> lookupMapConfigRef;
|
private AtomicReference<Map<String, Map<String, LookupExtractorFactoryMapContainer>>> lookupMapConfigRef;
|
||||||
@ -275,6 +275,7 @@ public class LookupCoordinatorManager
|
|||||||
public Collection<String> discoverTiers()
|
public Collection<String> discoverTiers()
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started");
|
||||||
return listenerDiscoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY);
|
return listenerDiscoverer.discoverChildren(LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
@ -282,6 +283,22 @@ public class LookupCoordinatorManager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Collection<HostAndPort> discoverNodesInTier(String tier)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started");
|
||||||
|
return listenerDiscoverer.getNodes(LookupModule.getTierListenerPath(tier));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> getLastKnownLookupsStateOnNodes()
|
||||||
|
{
|
||||||
|
Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started");
|
||||||
|
return knownOldState.get();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to find a lookupName spec for the specified lookupName.
|
* Try to find a lookupName spec for the specified lookupName.
|
||||||
*
|
*
|
||||||
|
@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.io.ByteSource;
|
import com.google.common.io.ByteSource;
|
||||||
|
import com.google.common.net.HostAndPort;
|
||||||
import io.druid.audit.AuditInfo;
|
import io.druid.audit.AuditInfo;
|
||||||
import io.druid.jackson.DefaultObjectMapper;
|
import io.druid.jackson.DefaultObjectMapper;
|
||||||
import io.druid.java.util.common.StringUtils;
|
import io.druid.java.util.common.StringUtils;
|
||||||
|
import io.druid.query.lookup.LookupsState;
|
||||||
import io.druid.server.lookup.cache.LookupCoordinatorManager;
|
import io.druid.server.lookup.cache.LookupCoordinatorManager;
|
||||||
import io.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
|
import io.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
|
||||||
import org.easymock.Capture;
|
import org.easymock.Capture;
|
||||||
@ -77,6 +79,16 @@ public class LookupCoordinatorResourceTest
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private static final HostAndPort LOOKUP_NODE = HostAndPort.fromParts("localhost", 1111);
|
||||||
|
|
||||||
|
private static final LookupsState<LookupExtractorFactoryMapContainer> LOOKUP_STATE = new LookupsState(
|
||||||
|
ImmutableMap.of(LOOKUP_NAME, SINGLE_LOOKUP), null, null
|
||||||
|
);
|
||||||
|
|
||||||
|
private static final Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> NODES_LOOKUP_STATE = ImmutableMap.of(
|
||||||
|
LOOKUP_NODE, LOOKUP_STATE
|
||||||
|
);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleGet()
|
public void testSimpleGet()
|
||||||
{
|
{
|
||||||
@ -830,4 +842,225 @@ public class LookupCoordinatorResourceTest
|
|||||||
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
|
Assert.assertEquals(ImmutableMap.of("error", errMsg), response.getEntity());
|
||||||
EasyMock.verify(lookupCoordinatorManager);
|
EasyMock.verify(lookupCoordinatorManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAllLookupsStatus() throws Exception
|
||||||
|
{
|
||||||
|
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
|
||||||
|
LookupCoordinatorManager.class
|
||||||
|
);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(SINGLE_TIER_MAP);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getLastKnownLookupsStateOnNodes()).andReturn(NODES_LOOKUP_STATE);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.discoverNodesInTier(LOOKUP_TIER)).andReturn(ImmutableList.of(LOOKUP_NODE));
|
||||||
|
|
||||||
|
EasyMock.replay(lookupCoordinatorManager);
|
||||||
|
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
lookupCoordinatorManager,
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
final Response response = lookupCoordinatorResource.getAllLookupsStatus(false);
|
||||||
|
Assert.assertEquals(200, response.getStatus());
|
||||||
|
Assert.assertEquals(
|
||||||
|
ImmutableMap.of(
|
||||||
|
LOOKUP_TIER,
|
||||||
|
ImmutableMap.of(
|
||||||
|
LOOKUP_NAME,
|
||||||
|
new LookupCoordinatorResource.LookupStatus(true, null)
|
||||||
|
)
|
||||||
|
), response.getEntity()
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.verify(lookupCoordinatorManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLookupStatusForTier() throws Exception
|
||||||
|
{
|
||||||
|
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
|
||||||
|
LookupCoordinatorManager.class
|
||||||
|
);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(SINGLE_TIER_MAP);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.discoverNodesInTier(LOOKUP_TIER)).andReturn(ImmutableList.of(LOOKUP_NODE));
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getLastKnownLookupsStateOnNodes()).andReturn(NODES_LOOKUP_STATE);
|
||||||
|
|
||||||
|
EasyMock.replay(lookupCoordinatorManager);
|
||||||
|
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
lookupCoordinatorManager,
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
final Response response = lookupCoordinatorResource.getLookupStatusForTier(LOOKUP_TIER, false);
|
||||||
|
Assert.assertEquals(200, response.getStatus());
|
||||||
|
Assert.assertEquals(
|
||||||
|
ImmutableMap.of(
|
||||||
|
LOOKUP_NAME,
|
||||||
|
new LookupCoordinatorResource.LookupStatus(true, null)
|
||||||
|
), response.getEntity()
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.verify(lookupCoordinatorManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetSpecificLookupStatus() throws Exception
|
||||||
|
{
|
||||||
|
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
|
||||||
|
LookupCoordinatorManager.class
|
||||||
|
);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(SINGLE_TIER_MAP);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.discoverNodesInTier(LOOKUP_TIER)).andReturn(ImmutableList.of(LOOKUP_NODE));
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getLastKnownLookupsStateOnNodes()).andReturn(NODES_LOOKUP_STATE);
|
||||||
|
|
||||||
|
EasyMock.replay(lookupCoordinatorManager);
|
||||||
|
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
lookupCoordinatorManager,
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
final Response response = lookupCoordinatorResource.getSpecificLookupStatus(LOOKUP_TIER, LOOKUP_NAME, false);
|
||||||
|
Assert.assertEquals(200, response.getStatus());
|
||||||
|
Assert.assertEquals(
|
||||||
|
new LookupCoordinatorResource.LookupStatus(true, null), response.getEntity()
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.verify(lookupCoordinatorManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLookupStatusDetailedTrue()
|
||||||
|
{
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
EasyMock.createStrictMock(LookupCoordinatorManager.class),
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
HostAndPort newNode = HostAndPort.fromParts("localhost", 4352);
|
||||||
|
Assert.assertEquals(
|
||||||
|
new LookupCoordinatorResource.LookupStatus(false, ImmutableList.of(newNode)),
|
||||||
|
lookupCoordinatorResource.getLookupStatus(
|
||||||
|
LOOKUP_NAME,
|
||||||
|
SINGLE_LOOKUP,
|
||||||
|
ImmutableList.of(LOOKUP_NODE, newNode),
|
||||||
|
NODES_LOOKUP_STATE,
|
||||||
|
true
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetLookupStatusDetailedFalse()
|
||||||
|
{
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
EasyMock.createStrictMock(LookupCoordinatorManager.class),
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
HostAndPort newNode = HostAndPort.fromParts("localhost", 4352);
|
||||||
|
Assert.assertEquals(
|
||||||
|
new LookupCoordinatorResource.LookupStatus(false, null),
|
||||||
|
lookupCoordinatorResource.getLookupStatus(
|
||||||
|
LOOKUP_NAME,
|
||||||
|
SINGLE_LOOKUP,
|
||||||
|
ImmutableList.of(LOOKUP_NODE, newNode),
|
||||||
|
NODES_LOOKUP_STATE,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAllNodesStatus() throws Exception
|
||||||
|
{
|
||||||
|
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
|
||||||
|
LookupCoordinatorManager.class
|
||||||
|
);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getKnownLookups()).andReturn(SINGLE_TIER_MAP);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getLastKnownLookupsStateOnNodes()).andReturn(NODES_LOOKUP_STATE);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.discoverNodesInTier(LOOKUP_TIER)).andReturn(ImmutableList.of(LOOKUP_NODE));
|
||||||
|
|
||||||
|
EasyMock.replay(lookupCoordinatorManager);
|
||||||
|
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
lookupCoordinatorManager,
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
final Response response = lookupCoordinatorResource.getAllNodesStatus(false);
|
||||||
|
Assert.assertEquals(200, response.getStatus());
|
||||||
|
Assert.assertEquals(
|
||||||
|
ImmutableMap.of(
|
||||||
|
LOOKUP_TIER,
|
||||||
|
ImmutableMap.of(
|
||||||
|
LOOKUP_NODE,
|
||||||
|
LOOKUP_STATE
|
||||||
|
)
|
||||||
|
), response.getEntity()
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.verify(lookupCoordinatorManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetNodesStatusInTier() throws Exception
|
||||||
|
{
|
||||||
|
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
|
||||||
|
LookupCoordinatorManager.class
|
||||||
|
);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getLastKnownLookupsStateOnNodes()).andReturn(NODES_LOOKUP_STATE);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.discoverNodesInTier(LOOKUP_TIER)).andReturn(ImmutableList.of(LOOKUP_NODE));
|
||||||
|
|
||||||
|
EasyMock.replay(lookupCoordinatorManager);
|
||||||
|
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
lookupCoordinatorManager,
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
final Response response = lookupCoordinatorResource.getNodesStatusInTier(LOOKUP_TIER);
|
||||||
|
Assert.assertEquals(200, response.getStatus());
|
||||||
|
Assert.assertEquals(
|
||||||
|
ImmutableMap.of(
|
||||||
|
LOOKUP_NODE,
|
||||||
|
LOOKUP_STATE
|
||||||
|
), response.getEntity()
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.verify(lookupCoordinatorManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetSpecificNodeStatus() throws Exception
|
||||||
|
{
|
||||||
|
final LookupCoordinatorManager lookupCoordinatorManager = EasyMock.createStrictMock(
|
||||||
|
LookupCoordinatorManager.class
|
||||||
|
);
|
||||||
|
EasyMock.expect(lookupCoordinatorManager.getLastKnownLookupsStateOnNodes()).andReturn(NODES_LOOKUP_STATE);
|
||||||
|
|
||||||
|
EasyMock.replay(lookupCoordinatorManager);
|
||||||
|
|
||||||
|
final LookupCoordinatorResource lookupCoordinatorResource = new LookupCoordinatorResource(
|
||||||
|
lookupCoordinatorManager,
|
||||||
|
mapper,
|
||||||
|
mapper
|
||||||
|
);
|
||||||
|
|
||||||
|
final Response response = lookupCoordinatorResource.getSpecificNodeStatus(LOOKUP_TIER, LOOKUP_NODE);
|
||||||
|
Assert.assertEquals(200, response.getStatus());
|
||||||
|
Assert.assertEquals(
|
||||||
|
LOOKUP_STATE, response.getEntity()
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.verify(lookupCoordinatorManager);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1135,7 +1135,7 @@ public class LookupCoordinatorManagerTest
|
|||||||
lookupsCommunicator
|
lookupsCommunicator
|
||||||
);
|
);
|
||||||
|
|
||||||
Assert.assertNull(manager.knownOldState.get());
|
Assert.assertTrue(manager.knownOldState.get().isEmpty());
|
||||||
|
|
||||||
manager.start();
|
manager.start();
|
||||||
|
|
||||||
@ -1335,6 +1335,7 @@ public class LookupCoordinatorManagerTest
|
|||||||
configManager,
|
configManager,
|
||||||
lookupCoordinatorManagerConfig
|
lookupCoordinatorManagerConfig
|
||||||
);
|
);
|
||||||
|
manager.start();
|
||||||
Assert.assertEquals(fakeChildren, manager.discoverTiers());
|
Assert.assertEquals(fakeChildren, manager.discoverTiers());
|
||||||
EasyMock.verify(discoverer);
|
EasyMock.verify(discoverer);
|
||||||
}
|
}
|
||||||
@ -1371,13 +1372,11 @@ public class LookupCoordinatorManagerTest
|
|||||||
configManager,
|
configManager,
|
||||||
lookupCoordinatorManagerConfig
|
lookupCoordinatorManagerConfig
|
||||||
);
|
);
|
||||||
try {
|
|
||||||
|
manager.start();
|
||||||
manager.discoverTiers();
|
manager.discoverTiers();
|
||||||
}
|
|
||||||
finally {
|
|
||||||
EasyMock.verify(discoverer);
|
EasyMock.verify(discoverer);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
//tests that lookups stored in db from 0.10.0 are converted and restored.
|
//tests that lookups stored in db from 0.10.0 are converted and restored.
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
x
Reference in New Issue
Block a user