This commit is contained in:
fjy 2014-01-20 13:05:05 -08:00
parent 8d01a46d02
commit 6404967994
14 changed files with 90 additions and 121 deletions

View File

@ -44,6 +44,7 @@ import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer; import io.druid.client.DruidServer;
import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryView;
import io.druid.client.indexing.IndexingServiceClient; import io.druid.client.indexing.IndexingServiceClient;
import io.druid.collections.CountingMap;
import io.druid.common.config.JacksonConfigManager; import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs; import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer; import io.druid.curator.discovery.ServiceAnnouncer;
@ -73,6 +74,7 @@ import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -191,40 +193,41 @@ public class DruidCoordinator
public Map<String, Double> getReplicationStatus() public Map<String, Double> getReplicationStatus()
{ {
// find expected load // find expected load per datasource
final Map<String, Integer> expectedSegmentsInCluster = Maps.newHashMap(); final CountingMap<String> expectedSegmentsInCluster = new CountingMap<>();
final DateTime now = new DateTime(); final DateTime now = new DateTime();
for (DataSegment segment : getAvailableDataSegments()) { for (DataSegment segment : getAvailableDataSegments()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource()); List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
for (Rule rule : rules) { for (Rule rule : rules) {
if (rule instanceof LoadRule && rule.appliesTo(segment, now)) { if (rule instanceof LoadRule && rule.appliesTo(segment, now)) {
Integer count = expectedSegmentsInCluster.get(segment.getIdentifier()); expectedSegmentsInCluster.add(segment.getDataSource(), ((LoadRule) rule).getReplicants());
if (count == null) { //Integer count = expectedSegmentsInCluster.get(segment.getDataSource());
count = 0; //if (count == null) {
} // count = 0;
expectedSegmentsInCluster.put(segment.getIdentifier(), count + ((LoadRule) rule).getReplicants()); //}
//expectedSegmentsInCluster.put(segment.getDataSource(), count + ((LoadRule) rule).getReplicants());
break; break;
} }
} }
} }
// find segments currently loaded // find segments currently loaded per datasource
Map<String, Integer> segmentsInCluster = Maps.newHashMap(); Map<String, Integer> segmentsInCluster = Maps.newHashMap();
for (DruidServer druidServer : serverInventoryView.getInventory()) { for (DruidServer druidServer : serverInventoryView.getInventory()) {
for (DataSegment segment : druidServer.getSegments().values()) { for (DataSegment segment : druidServer.getSegments().values()) {
Integer count = segmentsInCluster.get(segment.getIdentifier()); Integer count = segmentsInCluster.get(segment.getDataSource());
if (count == null) { if (count == null) {
count = 0; count = 0;
} }
segmentsInCluster.put(segment.getIdentifier(), count + 1); segmentsInCluster.put(segment.getDataSource(), count + 1);
} }
} }
// compare available segments with currently loaded // compare available segments with currently loaded
Map<String, Double> loadStatus = Maps.newHashMap(); Map<String, Double> loadStatus = Maps.newHashMap();
for (Map.Entry<String, Integer> entry : expectedSegmentsInCluster.entrySet()) { for (Map.Entry<String, AtomicLong> entry : expectedSegmentsInCluster.entrySet()) {
Integer actual = segmentsInCluster.get(entry.getKey()); Integer actual = segmentsInCluster.get(entry.getKey());
loadStatus.put(entry.getKey(), 100 * (actual == null ? 0.0D : (double) actual) / entry.getValue()); loadStatus.put(entry.getKey(), 100 * (actual == null ? 0.0D : (double) actual) / entry.getValue().get());
} }
return loadStatus; return loadStatus;

View File

@ -32,7 +32,7 @@ import javax.ws.rs.core.Response;
/** /**
*/ */
@Path("/coordinator/config") @Path("/druid/coordinator/v1/config")
public class CoordinatorDynamicConfigsResource public class CoordinatorDynamicConfigsResource
{ {
private final JacksonConfigManager manager; private final JacksonConfigManager manager;

View File

@ -19,8 +19,13 @@
package io.druid.server.http; package io.druid.server.http;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.LoadQueuePeon;
import io.druid.timeline.DataSegment;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.Path; import javax.ws.rs.Path;
@ -30,7 +35,7 @@ import javax.ws.rs.core.Response;
/** /**
*/ */
@Path("/coordinator") @Path("/druid/coordinator/v1")
public class CoordinatorResource public class CoordinatorResource
{ {
private final DruidCoordinator coordinator; private final DruidCoordinator coordinator;
@ -67,8 +72,40 @@ public class CoordinatorResource
@GET @GET
@Path("loadqueue") @Path("loadqueue")
@Produces("application/json") @Produces("application/json")
public Response getLoadQueue() public Response getLoadQueue(
@QueryParam("simple") String simple
)
{ {
if (simple != null) {
return Response.ok(
Maps.transformValues(
coordinator.getLoadManagementPeons(),
new Function<LoadQueuePeon, Object>()
{
@Override
public Object apply(LoadQueuePeon input)
{
long loadSize = 0;
for (DataSegment dataSegment : input.getSegmentsToLoad()) {
loadSize += dataSegment.getSize();
}
long dropSize = 0;
for (DataSegment dataSegment : input.getSegmentsToDrop()) {
dropSize += dataSegment.getSize();
}
return new ImmutableMap.Builder<>()
.put("segmentsToLoad", input.getSegmentsToLoad().size())
.put("segmentsToDrop", input.getSegmentsToDrop().size())
.put("segmentsToLoadSize", loadSize)
.put("segmentsToDropSize", dropSize)
.build();
}
}
)
).build();
}
return Response.ok(coordinator.getLoadManagementPeons()).build(); return Response.ok(coordinator.getLoadManagementPeons()).build();
} }
} }

View File

@ -39,7 +39,7 @@ import java.util.List;
/** /**
*/ */
@Path("/db") @Path("/druid/coordinator/v1/db")
public class DBResource public class DBResource
{ {
private final DatabaseSegmentManager databaseSegmentManager; private final DatabaseSegmentManager databaseSegmentManager;

View File

@ -53,7 +53,7 @@ import java.util.TreeSet;
/** /**
*/ */
@Path("/datasources") @Path("/druid/coordinator/v1/datasources")
public class DatasourcesResource public class DatasourcesResource
{ {
private static Map<String, Object> makeSimpleDatasource(DruidDataSource input) private static Map<String, Object> makeSimpleDatasource(DruidDataSource input)

View File

@ -31,7 +31,7 @@ import javax.ws.rs.core.Response;
/** /**
*/ */
@Path("/rules") @Path("/druid/coordinator/v1/rules")
public class RulesResource public class RulesResource
{ {
private final DatabaseRuleManager databaseRuleManager; private final DatabaseRuleManager databaseRuleManager;

View File

@ -40,7 +40,7 @@ import java.util.Map;
/** /**
*/ */
@Path("/servers") @Path("/druid/coordinator/v1/servers")
public class ServersResource public class ServersResource
{ {
private static Map<String, Object> makeSimpleServer(DruidServer input) private static Map<String, Object> makeSimpleServer(DruidServer input)

View File

@ -19,6 +19,7 @@
package io.druid.server.http; package io.druid.server.http;
import com.google.api.client.util.Maps;
import com.google.common.collect.HashBasedTable; import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -37,19 +38,9 @@ import java.util.Set;
/** /**
*/ */
@Path("/tiers") @Path("/druid/coordinator/v1/tiers")
public class TiersResource public class TiersResource
{ {
private static Map<String, Object> makeSimpleTier(DruidServer input)
{
return new ImmutableMap.Builder<String, Object>()
.put("host", input.getHost())
.put("tier", input.getTier())
.put("currSize", input.getCurrSize())
.put("maxSize", input.getMaxSize())
.build();
}
private final InventoryView serverInventoryView; private final InventoryView serverInventoryView;
@Inject @Inject
@ -61,7 +52,6 @@ public class TiersResource
} }
@GET @GET
@Path("/tiers")
@Produces("application/json") @Produces("application/json")
public Response getTiers( public Response getTiers(
@QueryParam("simple") String simple @QueryParam("simple") String simple
@ -70,13 +60,20 @@ public class TiersResource
Response.ResponseBuilder builder = Response.status(Response.Status.OK); Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (simple != null) { if (simple != null) {
Table<String, String, Long> metadata = HashBasedTable.create(); Map<String, Map<String, Long>> metadata = Maps.newHashMap();
for (DruidServer druidServer : serverInventoryView.getInventory()) { for (DruidServer druidServer : serverInventoryView.getInventory()) {
Long currSize = metadata.get(druidServer.getTier(), "currSize"); Map<String, Long> tierMetadata = metadata.get(druidServer.getTier());
metadata.put(druidServer.getTier(), "currSize", (currSize == null) ? 0 : currSize + druidServer.getCurrSize());
Long maxSize = metadata.get(druidServer.getTier(), "maxSize"); if (tierMetadata == null) {
metadata.put(druidServer.getTier(), "maxSize", (maxSize == null) ? 0 : maxSize + druidServer.getMaxSize()); tierMetadata = Maps.newHashMap();
metadata.put(druidServer.getTier(), tierMetadata);
}
Long currSize = tierMetadata.get("currSize");
tierMetadata.put("currSize", (currSize == null) ? 0 : currSize + druidServer.getCurrSize());
Long maxSize = tierMetadata.get("maxSize");
tierMetadata.put("maxSize", (maxSize == null) ? 0 : maxSize + druidServer.getMaxSize());
} }
return builder.entity(metadata).build(); return builder.entity(metadata).build();
} }
@ -85,7 +82,7 @@ public class TiersResource
for (DruidServer server : serverInventoryView.getInventory()) { for (DruidServer server : serverInventoryView.getInventory()) {
tiers.add(server.getTier()); tiers.add(server.getTier());
} }
return builder.entity(tiers)
.build(); return builder.entity(tiers).build();
} }
} }

View File

@ -21,7 +21,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text(); var selected = $('#datasources option:selected').text();
$.ajax({ $.ajax({
type: 'POST', type: 'POST',
url:'/info/datasources/' + selected, url:'/druid/coordinator/v1/datasources/' + selected,
data: JSON.stringify(selected), data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8", contentType:"application/json; charset=utf-8",
dataType:"json", dataType:"json",
@ -50,7 +50,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text(); var selected = $('#datasources option:selected').text();
$.ajax({ $.ajax({
type: 'DELETE', type: 'DELETE',
url:'/info/datasources/' + selected, url:'/druid/coordinator/v1/datasources/' + selected,
data: JSON.stringify(selected), data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8", contentType:"application/json; charset=utf-8",
dataType:"json", dataType:"json",
@ -70,12 +70,12 @@ $(document).ready(function() {
} }
}); });
$.getJSON("/info/db/datasources", function(enabled_datasources) { $.getJSON("/druid/coordinator/v1/db/datasources", function(enabled_datasources) {
$.each(enabled_datasources, function(index, datasource) { $.each(enabled_datasources, function(index, datasource) {
$('#enabled_datasources').append($('<li>' + datasource + '</li>')); $('#enabled_datasources').append($('<li>' + datasource + '</li>'));
}); });
$.getJSON("/info/db/datasources?includeDisabled", function(db_datasources) { $.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(db_datasources) {
var disabled_datasources = _.difference(db_datasources, enabled_datasources); var disabled_datasources = _.difference(db_datasources, enabled_datasources);
$.each(disabled_datasources, function(index, datasource) { $.each(disabled_datasources, function(index, datasource) {
$('#disabled_datasources').append($('<li>' + datasource + '</li>')); $('#disabled_datasources').append($('<li>' + datasource + '</li>'));

View File

@ -2,7 +2,7 @@
$(document).ready(function() { $(document).ready(function() {
var basePath = "/info/"; var basePath = "/druid/coordinator/v1/";
var type = $('#select_type').attr('value') + ''; var type = $('#select_type').attr('value') + '';
var view = $('#select_view').attr('value') + ''; var view = $('#select_view').attr('value') + '';

View File

@ -100,8 +100,8 @@ $(document).ready(function() {
} }
// Execution stuff // Execution stuff
$.get('/info/coordinator', function(data) { $.get('/druid/coordinator/v1/leader', function(data) {
$("#coordinator").html('Current Cluster Coordinator: ' + data.host); $("#coordinator").html('Current Cluster Coordinator Leader: ' + data.host);
}); });
$('#move_segment').submit(function() { $('#move_segment').submit(function() {
@ -118,57 +118,10 @@ $(document).ready(function() {
}); });
} }
/*
$.ajax({
url:"/coordinator/move",
type: "POST",
data: JSON.stringify(data),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
alert(error + ": " + xhr.responseText);
},
success: function(data, status, xhr) {
for (seg in CONSOLE.selected_segments) {
CONSOLE.selected_segments[seg].children('.server_host').text($('#move_segment > .to').val());
}
}
});
*/
return false; return false;
}); });
$.get('/druid/coordinator/v1/servers?full', function(data) {
/*$
('#drop_segment').submit(function() {
var data = [];
if ($.isEmptyObject(CONSOLE.selected_segments)) {
alert("Please select at least one segment");
}
for (seg in CONSOLE.selected_segments) {
data.push({
'segmentName' : seg,
'from' : CONSOLE.selected_segments[seg]
});
}
$.ajax({
url:"/coordinator/drop",
type: "POST",
data: JSON.stringify(data),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
alert(error + ": " + xhr.responseText);
}
});
return false;
});
*/
$.get('/info/cluster', function(data) {
$('.loading').hide(); $('.loading').hide();
initTables(data); initTables(data);
@ -176,26 +129,5 @@ $(document).ready(function() {
var oTable = []; var oTable = [];
initDataTable($('#servers'), oTable); initDataTable($('#servers'), oTable);
initDataTable($('#segments'), oTable); initDataTable($('#segments'), oTable);
// init select segments
/*$("#segments tbody").click(function(event) {
var el = $(event.target.parentNode);
var key = el.children('.segment_name').text();
if (el.is("tr")) {
if (el.hasClass('row_selected')) {
el.removeClass('row_selected');
delete CONSOLE.selected_segments[key];
} else {
el.addClass('row_selected');
CONSOLE.selected_segments[key] = el;
}
var html ="";
for (segment in CONSOLE.selected_segments) {
html += segment + ' on ' + CONSOLE.selected_segments[segment].children('.server_host').text() + '<br/>';
}
$('#selected_segments').html(html);
}
});*/
}); });
}); });

View File

@ -22,7 +22,7 @@ $(document).ready(function() {
var interval = $('#interval').val(); var interval = $('#interval').val();
$.ajax({ $.ajax({
type: 'DELETE', type: 'DELETE',
url:'/info/datasources/' + selected +'?kill=true&interval=' + interval, url:'/druid/coordinator/v1/datasources/' + selected +'?kill=true&interval=' + interval,
contentType:"application/json; charset=utf-8", contentType:"application/json; charset=utf-8",
dataType:"json", dataType:"json",
error: function(xhr, status, error) { error: function(xhr, status, error) {
@ -41,7 +41,7 @@ $(document).ready(function() {
} }
}); });
$.getJSON("/info/db/datasources?includeDisabled", function(data) { $.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(data) {
$.each(data, function(index, datasource) { $.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource)); $('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
}); });

View File

@ -115,7 +115,7 @@ function makeTiersDropdown(rule) {
function getRules() { function getRules() {
var selected = $('#datasources option:selected').text(); var selected = $('#datasources option:selected').text();
if (selected !== "") { if (selected !== "") {
$.getJSON("/info/rules/" + selected, function(data) { $.getJSON("/druid/coordinator/v1/rules/" + selected, function(data) {
$('#rules_list').empty(); $('#rules_list').empty();
if (!$.isEmptyObject(data)) { if (!$.isEmptyObject(data)) {
$.each(data, function(index, rule) { $.each(data, function(index, rule) {
@ -189,7 +189,7 @@ $(document).ready(function() {
var selected = $('#datasources option:selected').text(); var selected = $('#datasources option:selected').text();
$.ajax({ $.ajax({
type: 'POST', type: 'POST',
url:'/info/rules/' + selected, url:'/druid/coordinator/v1/rules/' + selected,
data: JSON.stringify(rules), data: JSON.stringify(rules),
contentType:"application/json; charset=utf-8", contentType:"application/json; charset=utf-8",
dataType:"json", dataType:"json",
@ -209,11 +209,11 @@ $(document).ready(function() {
} }
}); });
$.getJSON("/info/tiers", function(theTiers) { $.getJSON("/druid/coordinator/v1/tiers", function(theTiers) {
tiers = theTiers; tiers = theTiers;
}); });
$.getJSON("/info/db/datasources", function(data) { $.getJSON("/druid/coordinator/v1/db/datasources", function(data) {
$.each(data, function(index, datasource) { $.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource)); $('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
}); });

View File

@ -53,7 +53,7 @@ class CoordinatorJettyServerInitializer implements JettyServerInitializer
// The coordinator really needs a standarized api path // The coordinator really needs a standarized api path
root.addFilter(GuiceFilter.class, "/status/*", null); root.addFilter(GuiceFilter.class, "/status/*", null);
root.addFilter(GuiceFilter.class, "/info/*", null); root.addFilter(GuiceFilter.class, "/info/*", null);
root.addFilter(GuiceFilter.class, "/coordinator/*", null); root.addFilter(GuiceFilter.class, "/druid/coordinator/*", null);
HandlerList handlerList = new HandlerList(); HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{root}); handlerList.setHandlers(new Handler[]{root});