Merge pull request #100 from metamx/indexing_console

A console to view tasks in the indexing service
This commit is contained in:
cheddar 2013-03-07 11:13:47 -08:00
commit e2552cc033
18 changed files with 290 additions and 179 deletions

View File

@ -35,8 +35,6 @@ import com.metamx.druid.index.v1.IndexableAdapter;
import com.metamx.druid.index.v1.QueryableIndexIndexableAdapter;
import com.metamx.druid.index.v1.Rowboat;
import com.metamx.druid.index.v1.RowboatFilteringIndexAdapter;
import org.joda.time.Interval;
import javax.annotation.Nullable;

View File

@ -31,8 +31,6 @@ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;

View File

@ -33,8 +33,6 @@ import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

View File

@ -1,18 +1,18 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentListUnusedAction;
import com.metamx.druid.merger.common.actions.SegmentNukeAction;
import com.metamx.druid.merger.common.TaskLock;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Interval;

View File

@ -52,7 +52,8 @@ public class TaskMasterLifecycle
private final TaskToolbox taskToolbox;
private volatile boolean leading = false;
private volatile TaskRunner theRunner;
private volatile TaskRunner taskRunner;
private volatile ResourceManagementScheduler resourceManagementScheduler;
private static final EmittingLogger log = new EmittingLogger(TaskMasterLifecycle.class);
@ -81,9 +82,8 @@ public class TaskMasterLifecycle
try {
log.info("By the power of Grayskull, I have the power!");
final TaskRunner taskRunner = runnerFactory.build();
theRunner = taskRunner;
final ResourceManagementScheduler scheduler = managementSchedulerFactory.build(theRunner);
taskRunner = runnerFactory.build();
resourceManagementScheduler = managementSchedulerFactory.build(taskRunner);
final TaskConsumer taskConsumer = new TaskConsumer(
taskQueue,
taskRunner,
@ -100,7 +100,7 @@ public class TaskMasterLifecycle
leaderLifecycle.addManagedInstance(taskRunner);
Initialization.makeServiceDiscoveryClient(curator, serviceDiscoveryConfig, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);
leaderLifecycle.addManagedInstance(scheduler);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
leading = true;
@ -209,7 +209,7 @@ public class TaskMasterLifecycle
public TaskRunner getTaskRunner()
{
return theRunner;
return taskRunner;
}
public TaskQueue getTaskQueue()
@ -221,4 +221,9 @@ public class TaskMasterLifecycle
{
return taskToolbox;
}
public ResourceManagementScheduler getResourceManagementScheduler()
{
return resourceManagementScheduler;
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.task.Task;
import org.joda.time.DateTime;
@ -49,6 +50,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
this.createdTime = createdTime;
}
@JsonProperty
public Task getTask()
{
return task;
@ -64,11 +66,13 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return retryPolicy;
}
@JsonProperty
public DateTime getCreatedTime()
{
return createdTime;
}
@JsonProperty
public DateTime getQueueInsertionTime()
{
return queueInsertionTime;

View File

@ -19,6 +19,8 @@
package com.metamx.druid.merger.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
@ -66,11 +68,13 @@ public class ZkWorker implements Closeable
};
}
@JsonProperty
public Worker getWorker()
{
return worker;
}
@JsonProperty
public Set<String> getRunningTasks()
{
return Sets.newHashSet(
@ -86,11 +90,13 @@ public class ZkWorker implements Closeable
return statusCache;
}
@JsonProperty
public DateTime getLastCompletedTaskTime()
{
return lastCompletedTaskTime;
}
@JsonProperty
public boolean isAtCapacity()
{
return statusCache.getCurrentData().size() >= worker.getCapacity();

View File

@ -1,67 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.merger.coordinator.http;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
/**
*/
@Path("/mmx/merger/v1/info")
public class IndexerCoordinatorInfoResource
{
private static final Logger log = new Logger(IndexerCoordinatorInfoResource.class);
private final TaskMasterLifecycle taskMasterLifecycle;
@Inject
public IndexerCoordinatorInfoResource(TaskMasterLifecycle taskMasterLifecycle)
{
this.taskMasterLifecycle = taskMasterLifecycle;
}
@GET
@Path("/pendingTasks}")
@Produces("application/json")
public Response getPendingTasks()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getPendingTasks()).build();
}
@GET
@Path("/runningTasks}")
@Produces("application/json")
public Response getRunningTasks()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getRunningTasks()).build();
}
}

View File

@ -42,6 +42,7 @@ import com.metamx.druid.RegisteringNode;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.MasterMain;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
@ -111,6 +112,7 @@ import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder;
import org.mortbay.resource.ResourceCollection;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
@ -281,8 +283,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
)
);
final Context root = new Context(server, "/", Context.SESSIONS);
final Context staticContext = new Context(server, "/static", Context.SESSIONS);
staticContext.addServlet(new ServletHolder(new DefaultServlet()), "/*");
ResourceCollection resourceCollection = new ResourceCollection(new String[] {
IndexerCoordinatorNode.class.getClassLoader().getResource("static").toExternalForm(),
IndexerCoordinatorNode.class.getClassLoader().getResource("indexer_static").toExternalForm()
});
staticContext.setBaseResource(resourceCollection);
final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*");
root.addEventListener(new GuiceServletConfig(injector));

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
@ -31,6 +32,8 @@ import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
import com.metamx.druid.merger.coordinator.scaling.ScalingStats;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupManager;
import com.metamx.emitter.service.ServiceEmitter;
@ -144,7 +147,11 @@ public class IndexerCoordinatorResource
final Optional<TaskStatus> status = taskStorageQueryAdapter.getSameGroupMergedStatus(taskid);
final Set<DataSegment> segments = taskStorageQueryAdapter.getSameGroupNewSegments(taskid);
final Map<String, Object> ret = jsonMapper.convertValue(status, new TypeReference<Map<String, Object>>(){});
final Map<String, Object> ret = jsonMapper.convertValue(
status, new TypeReference<Map<String, Object>>()
{
}
);
ret.put("segments", segments);
return Response.ok().entity(ret).build();
@ -182,4 +189,48 @@ public class IndexerCoordinatorResource
return Response.ok().entity(retMap).build();
}
@GET
@Path("/pendingTasks")
@Produces("application/json")
public Response getPendingTasks()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getPendingTasks()).build();
}
@GET
@Path("/runningTasks")
@Produces("application/json")
public Response getRunningTasks()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getRunningTasks()).build();
}
@GET
@Path("/workers")
@Produces("application/json")
public Response getWorkers()
{
if (taskMasterLifecycle.getTaskRunner() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getTaskRunner().getWorkers()).build();
}
@GET
@Path("/scaling")
@Produces("application/json")
public Response getScalingState()
{
if (taskMasterLifecycle.getResourceManagementScheduler() == null) {
return Response.noContent().build();
}
return Response.ok(taskMasterLifecycle.getResourceManagementScheduler().getStats()).build();
}
}

View File

@ -64,7 +64,6 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
protected void configureServlets()
{
bind(IndexerCoordinatorResource.class);
bind(IndexerCoordinatorInfoResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
bind(ServiceEmitter.class).toInstance(emitter);

View File

@ -19,6 +19,8 @@
package com.metamx.druid.merger.coordinator.scaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
/**
@ -34,11 +36,13 @@ public class AutoScalingData<T>
this.nodes = nodes;
}
@JsonProperty
public List<String> getNodeIds()
{
return nodeIds;
}
@JsonProperty
public List<T> getNodes()
{
return nodes;

View File

@ -1,5 +1,7 @@
package com.metamx.druid.merger.coordinator.scaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import org.joda.time.DateTime;
@ -19,7 +21,6 @@ public class ScalingStats
}
private static final Comparator<ScalingEvent> comparator = new Comparator<ScalingEvent>()
{
@Override
public int compare(ScalingEvent s1, ScalingEvent s2)
@ -28,11 +29,13 @@ public class ScalingStats
}
};
private final MinMaxPriorityQueue<ScalingEvent> recentNodes;
private final Object lock = new Object();
private final MinMaxPriorityQueue<ScalingEvent> recentEvents;
public ScalingStats(int capacity)
{
this.recentNodes = MinMaxPriorityQueue
this.recentEvents = MinMaxPriorityQueue
.orderedBy(comparator)
.maximumSize(capacity)
.create();
@ -40,7 +43,8 @@ public class ScalingStats
public void addProvisionEvent(AutoScalingData data)
{
recentNodes.add(
synchronized (lock) {
recentEvents.add(
new ScalingEvent(
data,
new DateTime(),
@ -48,10 +52,12 @@ public class ScalingStats
)
);
}
}
public void addTerminateEvent(AutoScalingData data)
{
recentNodes.add(
synchronized (lock) {
recentEvents.add(
new ScalingEvent(
data,
new DateTime(),
@ -59,13 +65,17 @@ public class ScalingStats
)
);
}
}
@JsonValue
public List<ScalingEvent> toList()
{
List<ScalingEvent> retVal = Lists.newArrayList(recentNodes);
synchronized (lock) {
List<ScalingEvent> retVal = Lists.newArrayList(recentEvents);
Collections.sort(retVal, comparator);
return retVal;
}
}
public static class ScalingEvent
{
@ -84,16 +94,19 @@ public class ScalingStats
this.event = event;
}
@JsonProperty
public AutoScalingData getData()
{
return data;
}
@JsonProperty
public DateTime getTimestamp()
{
return timestamp;
}
@JsonProperty
public EVENT getEvent()
{
return event;

View File

@ -0,0 +1,59 @@
<!DOCTYPE HTML>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<html>
<head>
<title>Druid Indexer Coordinator Console</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Indexer Coordinator Console"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/demo_table.css";</style>
<script type="text/javascript" src="js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="js/jquery-1.8.3.js"></script>
<script type="text/javascript" src="js/jquery.dataTables-1.8.2.js"></script>
<script type="text/javascript" src="js/druidTable-0.0.1.js"></script>
<script type="text/javascript" src="js/tablehelper-0.0.2.js"></script>
<script type="text/javascript" src="js/console-0.0.1.js"></script>
</head>
<body>
<div class="container">
<div class="heading">Coordinator Console</div>
<h2>Running Tasks</h2>
<div class="running_loading">Loading Running Tasks... this may take a few minutes</div>
<table id="runningTable"></table>
<h2>Pending Tasks</h2>
<div class="pending_loading">Loading Pending Tasks... this may take a few minutes</div>
<table id="pendingTable"></table>
<h2>Workers</h2>
<div class="workers_loading">Loading Workers... this may take a few minutes</div>
<table id="workerTable"></table>
<h2>Event Log</h2>
<div class="events_loading">Loading Event Log... this may take a few minutes</div>
<table id="eventTable"></table>
</div>
</body>
</html>

View File

@ -0,0 +1,25 @@
// requires tableHelper
var oTable = [];
$(document).ready(function() {
$.get('/mmx/merger/v1/runningTasks', function(data) {
$('.running_loading').hide();
buildTable(data, $('#runningTable'), ["segments"]);
});
$.get('/mmx/merger/v1/pendingTasks', function(data) {
$('.pending_loading').hide();
buildTable(data, $('#pendingTable'), ["segments"]);
});
$.get('/mmx/merger/v1/workers', function(data) {
$('.workers_loading').hide();
buildTable(data, $('#workerTable'));
});
$.get('/mmx/merger/v1/scaling', function(data) {
$('.events_loading').hide();
buildTable(data, $('#eventTable'));
});
});

View File

@ -1,73 +0,0 @@
// requires dataTables
var oTable = null;
// flattens JSON from Druid and builds a table row per segment
function initTables(data) {
var resultTable = new DruidTable();
var row = 0;
// parse JSON
for (var entry in data) {
// build server table
for (var field in data[entry]) {
if (!(data[entry][field] instanceof Object)) {
resultTable.setCell(row, field, data[entry][field]);
}
}
row++;
}
resultTable.toHTMLTable($('#result_table'));
}
function initDataTable(el) {
// dataTable stuff (http://www.datatables.net/)
var asInitVals = [];
oTable = el.dataTable({
"oLanguage": {
"sSearch": "Search all columns:"
},
"oSearch": {
"sSearch": "",
"bRegex": true
},
"sPaginationType": "full_numbers",
"bProcessing": true,
"bDeferRender": true
});
$("thead input").keyup(function() {
oTable.fnFilter(this.value, oTable.children("thead").find("input").index(this), true);
});
$("thead input").each(function(i) {
asInitVals[i] = this.value;
});
$("thead input").focus(function() {
if (this.className === "search_init" ) {
this.className = "";
this.value = "";
}
});
$("thead input").blur(function(i) {
if (this.value === "" ) {
this.className = "search_init";
this.value = asInitVals[$("thead input").index(this)];
}
});
}
function buildTable(data, el) {
if (oTable != null) {
oTable.fnDestroy();
el.empty();
}
initTables(data);
initDataTable(el);
}

View File

@ -0,0 +1,81 @@
// requires jQuery, druidTable, dataTables
var oTable = [];
// flattens JSON from Druid and builds a table row per segment
function buildTable(data, el, dontDisplay, table, row) {
table = typeof table !== 'undefined' ? table : new DruidTable();
row = typeof row !== 'undefined' ? row : 0;
dontDisplay = typeof dontDisplay !== 'undefined' ? dontDisplay : [];
if (!Array.isArray(data) || data.length == 0) {
return;
}
if (oTable[el.attr('id')] != null) {
oTable[el.attr('id')].fnDestroy();
el.empty();
}
// parse JSON
for (var item in data) {
setTable(data[item], el, dontDisplay, table, row);
row++;
}
table.toHTMLTable(el);
initDataTable(el);
}
function setTable(data, el, dontDisplay, table, row) {
for (var field in data) {
if (_.contains(dontDisplay, field)) {
// do nothing
} else if (Array.isArray(data[field])) {
table.setCell(row, field, JSON.stringify(data[field]));
} else if (!(data[field] instanceof Object)) {
table.setCell(row, field, data[field]);
} else {
setTable(data[field], el, dontDisplay, table, row);
}
}
}
function initDataTable(el) {
// dataTable stuff (http://www.datatables.net/)
var asInitVals = [];
oTable[el.attr('id')] = el.dataTable({
"oLanguage": {
"sSearch": "Search all columns:"
},
"oSearch": {
"sSearch": "",
"bRegex": true
},
"sPaginationType": "full_numbers",
"bProcessing": true
});
$("thead input").keyup(function() {
var tbl = oTable[$(this).parents('table').attr('id')];
tbl.fnFilter(this.value, tbl.children("thead").find("input").index(this), true);
});
$("thead input").each(function(i) {
asInitVals[i] = this.value;
});
$("thead input").focus(function() {
if (this.className === "search_init" ) {
this.className = "";
this.value = "";
}
});
$("thead input").blur(function(i) {
if (this.value === "" ) {
this.className = "search_init";
this.value = asInitVals[$("thead input").index(this)];
}
});
}

View File

@ -33,7 +33,7 @@
<script type="text/javascript" src="js/jquery-1.8.3.js"></script>
<script type="text/javascript" src="js/jquery.dataTables-1.8.2.js"></script>
<script type="text/javascript" src="js/druidTable-0.0.1.js"></script>
<script type="text/javascript" src="js/tablehelper-0.0.1.js"></script>
<script type="text/javascript" src="js/tablehelper-0.0.2.js"></script>
<script type="text/javascript" src="js/handlers-0.0.1.js"></script>
</head>