From 1e9e61f78676b1971055dab580b6eeeb3decad1c Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 6 Mar 2014 11:57:08 -0800 Subject: [PATCH] fix some broken coordinator endpoints --- .../server/coordinator/DruidCoordinator.java | 63 ++++++++++++------- .../server/http/CoordinatorResource.java | 56 ++++++++++++++++- .../java/io/druid/server/http/DBResource.java | 1 - .../server/http/DatasourcesResource.java | 44 ++++++++----- .../io/druid/server/http/TiersResource.java | 5 +- 5 files changed, 126 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index f5f202b5904..4b241060b0a 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -81,7 +81,6 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** @@ -95,10 +94,6 @@ public class DruidCoordinator private final Object lock = new Object(); - private volatile boolean started = false; - private volatile int leaderCounter = 0; - private volatile boolean leader = false; - private final DruidCoordinatorConfig config; private final ZkPathsConfig zkPaths; private final JacksonConfigManager configManager; @@ -115,6 +110,12 @@ public class DruidCoordinator private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; + private volatile boolean started = false; + private volatile int leaderCounter = 0; + private volatile boolean leader = false; + private volatile SegmentReplicantLookup segmentReplicantLookup = null; + + @Inject public DruidCoordinator( DruidCoordinatorConfig config, @@ -197,39 +198,55 @@ public class DruidCoordinator return loadManagementPeons; } - public Map getReplicationStatus() + public Map> getReplicationStatus() { - // find expected load per datasource - final CountingMap expectedSegmentsInCluster = new CountingMap<>(); + final Map> retVal = Maps.newHashMap(); + + if (segmentReplicantLookup == null) { + return retVal; + } + final DateTime now = new DateTime(); for (DataSegment segment : getAvailableDataSegments()) { List rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource()); for (Rule rule : rules) { if (rule instanceof LoadRule && rule.appliesTo(segment, now)) { - for (Integer numReplicants : ((LoadRule) rule).getTieredReplicants().values()) { - expectedSegmentsInCluster.add(segment.getDataSource(), numReplicants); + for (Map.Entry entry : ((LoadRule) rule).getTieredReplicants().entrySet()) { + CountingMap dataSourceMap = retVal.get(entry.getKey()); + if (dataSourceMap == null) { + dataSourceMap = new CountingMap<>(); + retVal.put(entry.getKey(), dataSourceMap); + } + + int diff = Math.max( + entry.getValue() - segmentReplicantLookup.getTotalReplicants(segment.getIdentifier(), entry.getKey()), + 0 + ); + dataSourceMap.add(segment.getDataSource(), diff); } break; } } } - // find segments currently loaded per datasource - CountingMap segmentsInCluster = new CountingMap<>(); - for (DruidServer druidServer : serverInventoryView.getInventory()) { - for (DataSegment segment : druidServer.getSegments().values()) { - segmentsInCluster.add(segment.getDataSource(), 1); - } + return retVal; + } + + + public CountingMap getSegmentAvailability() + { + final CountingMap retVal = new CountingMap<>(); + + if (segmentReplicantLookup == null) { + return retVal; } - // compare available segments with currently loaded - Map loadStatus = Maps.newHashMap(); - for (Map.Entry entry : expectedSegmentsInCluster.entrySet()) { - Long actual = segmentsInCluster.get(entry.getKey()).get(); - loadStatus.put(entry.getKey(), 100 * (actual == null ? 0.0D : (double) actual) / entry.getValue().get()); + for (DataSegment segment : getAvailableDataSegments()) { + int available = (segmentReplicantLookup.getTotalReplicants(segment.getIdentifier()) == 0) ? 0 : 1; + retVal.add(segment.getDataSource(), 1 - available); } - return loadStatus; + return retVal; } public Map getLoadStatus() @@ -808,7 +825,7 @@ public class DruidCoordinator cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName()))); } - SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(cluster); + segmentReplicantLookup = SegmentReplicantLookup.make(cluster); // Stop peons for servers that aren't there anymore. final Set disappeared = Sets.newHashSet(loadManagementPeons.keySet()); diff --git a/server/src/main/java/io/druid/server/http/CoordinatorResource.java b/server/src/main/java/io/druid/server/http/CoordinatorResource.java index aea61681183..218cd81904b 100644 --- a/server/src/main/java/io/druid/server/http/CoordinatorResource.java +++ b/server/src/main/java/io/druid/server/http/CoordinatorResource.java @@ -20,6 +20,7 @@ package io.druid.server.http; import com.google.common.base.Function; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -60,9 +61,14 @@ public class CoordinatorResource @Path("/loadstatus") @Produces("application/json") public Response getLoadStatus( + @QueryParam("simple") String simple, @QueryParam("full") String full ) { + if (simple != null) { + return Response.ok(coordinator.getSegmentAvailability()).build(); + } + if (full != null) { return Response.ok(coordinator.getReplicationStatus()).build(); } @@ -73,7 +79,8 @@ public class CoordinatorResource @Path("/loadqueue") @Produces("application/json") public Response getLoadQueue( - @QueryParam("simple") String simple + @QueryParam("simple") String simple, + @QueryParam("simple") String full ) { if (simple != null) { @@ -106,6 +113,51 @@ public class CoordinatorResource ) ).build(); } - return Response.ok(coordinator.getLoadManagementPeons()).build(); + + if (full != null) { + return Response.ok(coordinator.getLoadManagementPeons()).build(); + } + + return Response.ok( + Maps.transformValues( + coordinator.getLoadManagementPeons(), + new Function() + { + @Override + public Object apply(LoadQueuePeon input) + { + return new ImmutableMap.Builder<>() + .put( + "segmentsToLoad", + Collections2.transform( + input.getSegmentsToLoad(), + new Function() + { + @Override + public String apply(DataSegment segment) + { + return segment.getIdentifier(); + } + } + ) + ) + .put( + "segmentsToDrop", Collections2.transform( + input.getSegmentsToDrop(), + new Function() + { + @Override + public String apply(DataSegment segment) + { + return segment.getIdentifier(); + } + } + ) + ) + .build(); + } + } + ) + ).build(); } } \ No newline at end of file diff --git a/server/src/main/java/io/druid/server/http/DBResource.java b/server/src/main/java/io/druid/server/http/DBResource.java index f979b76961b..02277f9e79d 100644 --- a/server/src/main/java/io/druid/server/http/DBResource.java +++ b/server/src/main/java/io/druid/server/http/DBResource.java @@ -52,7 +52,6 @@ public class DBResource this.databaseSegmentManager = databaseSegmentManager; } - @GET @Path("/datasources") @Produces("application/json") diff --git a/server/src/main/java/io/druid/server/http/DatasourcesResource.java b/server/src/main/java/io/druid/server/http/DatasourcesResource.java index c77e6fee9bb..3a1a316e658 100644 --- a/server/src/main/java/io/druid/server/http/DatasourcesResource.java +++ b/server/src/main/java/io/druid/server/http/DatasourcesResource.java @@ -31,7 +31,6 @@ import io.druid.client.DruidServer; import io.druid.client.InventoryView; import io.druid.client.indexing.IndexingServiceClient; import io.druid.db.DatabaseSegmentManager; -import io.druid.segment.IndexGranularity; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -125,6 +124,35 @@ public class DatasourcesResource ).build(); } + @GET + @Path("/{dataSourceName}") + @Consumes("application/json") + public Response getTheDataSource( + @PathParam("dataSourceName") final String dataSourceName + ) + { + DruidDataSource dataSource = getDataSource(dataSourceName.toLowerCase()); + if (dataSource == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + return Response.ok(dataSource).build(); + } + + @POST + @Path("/{dataSourceName}") + @Consumes("application/json") + public Response enableDataSource( + @PathParam("dataSourceName") final String dataSourceName + ) + { + if (!databaseSegmentManager.enableDatasource(dataSourceName)) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + + return Response.status(Response.Status.OK).build(); + } + @DELETE @Path("/{dataSourceName}") @Produces("application/json") @@ -160,20 +188,6 @@ public class DatasourcesResource return Response.status(Response.Status.OK).build(); } - @POST - @Path("/{dataSourceName}") - @Consumes("application/json") - public Response enableDataSource( - @PathParam("dataSourceName") final String dataSourceName - ) - { - if (!databaseSegmentManager.enableDatasource(dataSourceName)) { - return Response.status(Response.Status.NOT_FOUND).build(); - } - - return Response.status(Response.Status.OK).build(); - } - @GET @Path("/{dataSourceName}/segments") @Produces("application/json") diff --git a/server/src/main/java/io/druid/server/http/TiersResource.java b/server/src/main/java/io/druid/server/http/TiersResource.java index 2ec84d79e32..81698e4ff3d 100644 --- a/server/src/main/java/io/druid/server/http/TiersResource.java +++ b/server/src/main/java/io/druid/server/http/TiersResource.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.collect.Table; import com.google.inject.Inject; +import com.metamx.common.MapUtils; import io.druid.client.DruidServer; import io.druid.client.InventoryView; @@ -70,10 +71,10 @@ public class TiersResource } Long currSize = tierMetadata.get("currSize"); - tierMetadata.put("currSize", (currSize == null) ? 0 : currSize + druidServer.getCurrSize()); + tierMetadata.put("currSize", ((currSize == null) ? 0 : currSize) + druidServer.getCurrSize()); Long maxSize = tierMetadata.get("maxSize"); - tierMetadata.put("maxSize", (maxSize == null) ? 0 : maxSize + druidServer.getMaxSize()); + tierMetadata.put("maxSize", ((maxSize == null) ? 0 : maxSize) + druidServer.getMaxSize()); } return builder.entity(metadata).build(); }