fix some broken coordinator endpoints

fjy 2014-03-06 11:57:08 -08:00
parent 6137c374a7
commit 1e9e61f786
5 changed files with 126 additions and 43 deletions

View File: DruidCoordinator.java

@@ -81,7 +81,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -95,10 +94,6 @@ public class DruidCoordinator
private final Object lock = new Object();
private volatile boolean started = false;
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
private final JacksonConfigManager configManager;
@@ -115,6 +110,12 @@ public class DruidCoordinator
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private volatile boolean started = false;
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
@Inject
public DruidCoordinator(
DruidCoordinatorConfig config,
@@ -197,39 +198,55 @@ public class DruidCoordinator
return loadManagementPeons;
}
public Map<String, Double> getReplicationStatus()
public Map<String, CountingMap<String>> getReplicationStatus()
{
// find expected load per datasource
final CountingMap<String> expectedSegmentsInCluster = new CountingMap<>();
final Map<String, CountingMap<String>> retVal = Maps.newHashMap();
if (segmentReplicantLookup == null) {
return retVal;
}
final DateTime now = new DateTime();
for (DataSegment segment : getAvailableDataSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
for (Rule rule : rules) {
if (rule instanceof LoadRule && rule.appliesTo(segment, now)) {
for (Integer numReplicants : ((LoadRule) rule).getTieredReplicants().values()) {
expectedSegmentsInCluster.add(segment.getDataSource(), numReplicants);
for (Map.Entry<String, Integer> entry : ((LoadRule) rule).getTieredReplicants().entrySet()) {
CountingMap<String> dataSourceMap = retVal.get(entry.getKey());
if (dataSourceMap == null) {
dataSourceMap = new CountingMap<>();
retVal.put(entry.getKey(), dataSourceMap);
}
int diff = Math.max(
entry.getValue() - segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), entry.getKey()),
0
);
dataSourceMap.add(segment.getDataSource(), diff);
}
break;
}
}
}
// find segments currently loaded per datasource
CountingMap<String> segmentsInCluster = new CountingMap<>();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
for (DataSegment segment : druidServer.getSegments().values()) {
segmentsInCluster.add(segment.getDataSource(), 1);
}
return retVal;
}
// compare available segments with currently loaded
Map<String, Double> loadStatus = Maps.newHashMap();
for (Map.Entry<String, AtomicLong> entry : expectedSegmentsInCluster.entrySet()) {
Long actual = segmentsInCluster.get(entry.getKey()).get();
loadStatus.put(entry.getKey(), 100 * (actual == null ? 0.0D : (double) actual) / entry.getValue().get());
public CountingMap<String> getSegmentAvailability()
{
final CountingMap<String> retVal = new CountingMap<>();
if (segmentReplicantLookup == null) {
return retVal;
}
return loadStatus;
for (DataSegment segment : getAvailableDataSegments()) {
int available = (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) ? 0 : 1;
retVal.add(segment.getDataSource(), 1 - available);
}
return retVal;
}
public Map<String, Double> getLoadStatus()
@@ -808,7 +825,7 @@ public class DruidCoordinator
cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
}
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
// Stop peons for servers that aren't there anymore.
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());

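The root cause of the broken endpoints is visible in this file: segmentReplicantLookup used to be a local variable inside the coordinator's run loop, so the HTTP layer could never read it. It is promoted to a volatile field here, and both new methods return an empty result until the first coordinator run populates it. getReplicationStatus() now reports, per tier, how many replicants of each datasource are still missing (max(expected - actual, 0)) instead of a single percentage, and getSegmentAvailability() counts the segments per datasource with zero loaded replicants. A minimal sketch of the per-tier accumulation, with plain JDK maps standing in for Druid's CountingMap (the helper name and parameters are illustrative):

import java.util.HashMap;
import java.util.Map;

public class ReplicationStatusSketch
{
  // retVal maps tier -> (datasource -> missing replicant count), mirroring
  // the new return shape of getReplicationStatus(). "expected" stands in for
  // a LoadRule's tiered replicant count for one segment, "actual" for what
  // SegmentReplicantLookup reports for the same segment and tier.
  public static void addMissing(
      Map<String, Map<String, Long>> retVal,
      String tier,
      String dataSource,
      int expected,
      int actual
  )
  {
    Map<String, Long> dataSourceMap = retVal.get(tier);
    if (dataSourceMap == null) {
      dataSourceMap = new HashMap<>();
      retVal.put(tier, dataSourceMap);
    }
    // Never report a negative: over-replication counts as zero missing.
    long diff = Math.max(expected - actual, 0);
    Long current = dataSourceMap.get(dataSource);
    dataSourceMap.put(dataSource, (current == null ? 0L : current) + diff);
  }
}
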
View File: CoordinatorResource.java

@@ -20,6 +20,7 @@
package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
@@ -60,9 +61,14 @@ public class CoordinatorResource
@Path("/loadstatus")
@Produces("application/json")
public Response getLoadStatus(
@QueryParam("simple") String simple,
@QueryParam("full") String full
)
{
if (simple != null) {
return Response.ok(coordinator.getSegmentAvailability()).build();
}
if (full != null) {
return Response.ok(coordinator.getReplicationStatus()).build();
}
@@ -73,7 +79,8 @@ public class CoordinatorResource
@Path("/loadqueue")
@Produces("application/json")
public Response getLoadQueue(
@QueryParam("simple") String simple
@QueryParam("simple") String simple,
@QueryParam("simple") String full
)
{
if (simple != null) {
@@ -106,6 +113,51 @@ public class CoordinatorResource
)
).build();
}
if (full != null) {
return Response.ok(coordinator.getLoadManagementPeons()).build();
}
return Response.ok(
Maps.transformValues(
coordinator.getLoadManagementPeons(),
new Function<LoadQueuePeon, Object>()
{
@Override
public Object apply(LoadQueuePeon input)
{
return new ImmutableMap.Builder<>()
.put(
"segmentsToLoad",
Collections2.transform(
input.getSegmentsToLoad(),
new Function<DataSegment, Object>()
{
@Override
public String apply(DataSegment segment)
{
return segment.getIdentifier();
}
}
)
)
.put(
"segmentsToDrop", Collections2.transform(
input.getSegmentsToDrop(),
new Function<DataSegment, Object>()
{
@Override
public String apply(DataSegment segment)
{
return segment.getIdentifier();
}
}
)
)
.build();
}
}
)
).build();
}
}

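With these resource changes, /loadstatus answers three ways: no query parameter keeps the percent-loaded map from getLoadStatus(), ?simple returns unavailable-segment counts per datasource, and ?full returns the per-tier missing-replicant map. /loadqueue likewise gains ?full for the raw load-management peons, while the default response is trimmed down to segment identifiers per peon. A hypothetical smoke test; the host, port, and path prefix are assumptions, so check the resource's @Path annotation for the actual mount point:

import java.net.URL;
import java.util.Scanner;

public class LoadStatusCheck
{
  public static void main(String[] args) throws Exception
  {
    // "" -> percent loaded, "?simple" -> unavailable segment counts,
    // "?full" -> per-tier missing-replicant counts.
    for (String query : new String[]{"", "?simple", "?full"}) {
      URL url = new URL("http://localhost:8081/coordinator/v1/loadstatus" + query);
      try (Scanner scanner = new Scanner(url.openStream(), "UTF-8").useDelimiter("\\A")) {
        System.out.println(query + " -> " + (scanner.hasNext() ? scanner.next() : "(empty)"));
      }
    }
  }
}
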
View File: DBResource.java

@@ -52,7 +52,6 @@ public class DBResource
this.databaseSegmentManager = databaseSegmentManager;
}
@GET
@Path("/datasources")
@Produces("application/json")

View File: DatasourcesResource.java

@@ -31,7 +31,6 @@ import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.DatabaseSegmentManager;
import io.druid.segment.IndexGranularity;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@@ -125,6 +124,35 @@ public class DatasourcesResource
).build();
}
@GET
@Path("/{dataSourceName}")
@Consumes("application/json")
public Response getTheDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase());
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.ok(dataSource).build();
}
@POST
@Path("/{dataSourceName}")
@Consumes("application/json")
public Response enableDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
if (!databaseSegmentManager.enableDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
@DELETE
@Path("/{dataSourceName}")
@Produces("application/json")
@@ -160,20 +188,6 @@ public class DatasourcesResource
return Response.status(Response.Status.OK).build();
}
@POST
@Path("/{dataSourceName}")
@Consumes("application/json")
public Response enableDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
if (!databaseSegmentManager.enableDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.status(Response.Status.OK).build();
}
@GET
@Path("/{dataSourceName}/segments")
@Produces("application/json")

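The datasources resource gains a single-datasource GET (404 for unknown names) and moves the existing enable handler up next to it; enable and disable remain POST and DELETE on the same path. A hypothetical round trip; the datasource name, host, port, and path prefix are placeholders:

import java.net.HttpURLConnection;
import java.net.URL;

public class DatasourceToggle
{
  public static void main(String[] args) throws Exception
  {
    // "wikipedia" is a placeholder datasource name.
    String base = "http://localhost:8081/coordinator/v1/datasources/wikipedia";
    System.out.println("GET    -> " + send(base, "GET"));    // 200, or 404 if unknown
    System.out.println("DELETE -> " + send(base, "DELETE")); // disable
    System.out.println("POST   -> " + send(base, "POST"));   // re-enable
  }

  private static int send(String url, String method) throws Exception
  {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    conn.setRequestMethod(method);
    int code = conn.getResponseCode();
    conn.disconnect();
    return code;
  }
}
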
View File: TiersResource.java

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
@@ -70,10 +71,10 @@ public class TiersResource
}
Long currSize = tierMetadata.get("currSize");
tierMetadata.put("currSize", (currSize == null) ? 0 : currSize + druidServer.getCurrSize());
tierMetadata.put("currSize", ((currSize == null) ? 0 : currSize) + druidServer.getCurrSize());
Long maxSize = tierMetadata.get("maxSize");
tierMetadata.put("maxSize", (maxSize == null) ? 0 : maxSize + druidServer.getMaxSize());
tierMetadata.put("maxSize", ((maxSize == null) ? 0 : maxSize) + druidServer.getMaxSize());
}
return builder.entity(metadata).build();
}
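
The TiersResource change is a pure operator-precedence fix: '+' binds tighter than '?:', so the unparenthesized version returned 0 for the first server seen in a tier and silently dropped its size from the total. A standalone demonstration:

public class TernaryPrecedence
{
  public static void main(String[] args)
  {
    Long currSize = null;   // no entry recorded for this tier yet
    long serverSize = 100L; // illustrative size of the tier's first server

    // Parses as: (currSize == null) ? 0 : (currSize + serverSize)
    long buggy = (currSize == null) ? 0 : currSize + serverSize;
    // Fixed: the ternary only supplies the running total, then adds.
    long fixed = ((currSize == null) ? 0 : currSize) + serverSize;

    System.out.println(buggy); // 0   -- the first server's size is lost
    System.out.println(fixed); // 100
  }
}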