mirror of https://github.com/apache/druid.git
Merger: Tweaks to DB tables. Create tables automatically.
This commit is contained in:
parent 1dfe133554
commit 1a6594524f
@@ -71,6 +71,63 @@ public class DbConnector
     );
   }
 
+  public static void createTaskTable(final DBI dbi, final String taskTableName)
+  {
+    createTable(
+        dbi,
+        taskTableName,
+        String.format(
+            "CREATE TABLE `%s` (\n"
+            + "  `id` varchar(255) NOT NULL,\n"
+            + "  `created_date` tinytext NOT NULL,\n"
+            + "  `datasource` varchar(255) NOT NULL,\n"
+            + "  `payload` longtext NOT NULL,\n"
+            + "  `status_payload` longtext NOT NULL,\n"
+            + "  `version` tinytext,\n"
+            + "  `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+            + "  PRIMARY KEY (`id`)\n"
+            + ")",
+            taskTableName
+        )
+    );
+  }
+
+  public static void createTaskLogTable(final DBI dbi, final String taskLogsTableName)
+  {
+    createTable(
+        dbi,
+        taskLogsTableName,
+        String.format(
+            "CREATE TABLE `%s` (\n"
+            + "  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+            + "  `task_id` varchar(255) DEFAULT NULL,\n"
+            + "  `log_payload` longtext,\n"
+            + "  PRIMARY KEY (`id`),\n"
+            + "  KEY `task_id` (`task_id`)\n"
+            + ")",
+            taskLogsTableName
+        )
+    );
+  }
+
+  public static void createTaskLockTable(final DBI dbi, final String taskLocksTableName)
+  {
+    createTable(
+        dbi,
+        taskLocksTableName,
+        String.format(
+            "CREATE TABLE `%s` (\n"
+            + "  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+            + "  `task_id` varchar(255) DEFAULT NULL,\n"
+            + "  `lock_payload` longtext,\n"
+            + "  PRIMARY KEY (`id`),\n"
+            + "  KEY `task_id` (`task_id`)\n"
+            + ")",
+            taskLocksTableName
+        )
+    );
+  }
+
   public static void createTable(
       final DBI dbi,
       final String tableName,
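These helpers feed MySQL-flavored DDL into the existing createTable method, so a node can bring up its own schema at startup. A minimal wiring sketch, assuming a reachable MySQL-compatible database; the JDBC URL, credentials, and table names below are hypothetical, while the coordinator itself reads the names from IndexerDbConnectorConfig, as shown in the IndexerCoordinatorNode hunk further down:

    import com.metamx.druid.db.DbConnector;
    import org.skife.jdbi.v2.DBI;

    public class CreateMergerTablesSketch
    {
      public static void main(String[] args)
      {
        // Hypothetical connection details; substitute your own database.
        final DBI dbi = new DBI("jdbc:mysql://localhost:3306/druid", "druid", "diurd");

        // Create the three merger tables added in this commit.
        DbConnector.createTaskTable(dbi, "druid_tasks");
        DbConnector.createTaskLogTable(dbi, "druid_taskLogs");
        DbConnector.createTaskLockTable(dbi, "druid_taskLocks");
      }
    }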
@@ -82,14 +82,15 @@ public class DbTaskStorage implements TaskStorage
           {
             handle.createStatement(
                 String.format(
-                    "INSERT INTO %s (id, created_date, payload, status_code, status_payload) VALUES (:id, :created_date, :payload, :status_code, :status_payload)",
+                    "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
                     dbConnectorConfig.getTaskTable()
                 )
             )
                   .bind("id", task.getId())
                   .bind("created_date", new DateTime().toString())
+                  .bind("datasource", task.getDataSource())
                   .bind("payload", jsonMapper.writeValueAsString(task))
-                  .bind("status_code", status.getStatusCode().toString())
+                  .bind("active", status.isRunnable() ? 1 : 0)
                   .bind("status_payload", jsonMapper.writeValueAsString(status))
                   .execute();
 
@@ -122,13 +123,12 @@ public class DbTaskStorage implements TaskStorage
           {
             return handle.createStatement(
                 String.format(
-                    "UPDATE %s SET status_code = :status_code, status_payload = :status_payload WHERE id = :id AND status_code = :old_status_code",
+                    "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
                     dbConnectorConfig.getTaskTable()
                 )
             )
                 .bind("id", status.getId())
-                .bind("status_code", status.getStatusCode().toString())
-                .bind("old_status_code", TaskStatus.Status.RUNNING.toString())
+                .bind("active", status.isRunnable() ? 1 : 0)
                 .bind("status_payload", jsonMapper.writeValueAsString(status))
                 .execute();
           }
@@ -136,7 +136,7 @@ public class DbTaskStorage implements TaskStorage
     );
 
     if(updated != 1) {
-      throw new IllegalStateException(String.format("Running task not found: %s", status.getId()));
+      throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
    }
   }
 
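Taken together, the INSERT and UPDATE changes turn the new active column into a compare-and-set guard: a task row is written with active = 1 while its status is runnable, and a later status write only succeeds while the row is still active, with the updated != 1 check surfacing the "Active task not found" case. A standalone sketch of that pattern against a hypothetical druid_tasks table, using the same JDBI 2 Handle API (connection details, task id, and status JSON are hypothetical):

    import org.skife.jdbi.v2.DBI;
    import org.skife.jdbi.v2.Handle;

    public class ActiveFlagUpdateSketch
    {
      public static void main(String[] args)
      {
        final DBI dbi = new DBI("jdbc:mysql://localhost:3306/druid", "druid", "diurd");
        final String taskId = "example_task";

        final Handle handle = dbi.open();
        try {
          // Only a row still marked active accepts the new status; anything else
          // means the task was already finalized (or never inserted).
          final int updated = handle.createStatement(
              "UPDATE druid_tasks SET active = :active, status_payload = :status_payload"
              + " WHERE id = :id AND active = 1"
          )
              .bind("id", taskId)
              .bind("active", 0)
              .bind("status_payload", "{\"status\":\"SUCCESS\"}")
              .execute();

          if (updated != 1) {
            throw new IllegalStateException(String.format("Active task not found: %s", taskId));
          }
        }
        finally {
          handle.close();
        }
      }
    }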
@@ -201,34 +201,40 @@ public class DbTaskStorage implements TaskStorage
   }
 
   @Override
-  public List<String> getRunningTaskIds()
+  public List<Task> getRunningTasks()
   {
     return dbi.withHandle(
-        new HandleCallback<List<String>>()
+        new HandleCallback<List<Task>>()
         {
           @Override
-          public List<String> withHandle(Handle handle) throws Exception
+          public List<Task> withHandle(Handle handle) throws Exception
           {
             final List<Map<String, Object>> dbTasks =
                 handle.createQuery(
                     String.format(
-                        "SELECT id FROM %s WHERE status_code = :status_code",
+                        "SELECT id, payload, status_payload FROM %s WHERE active = 1",
                         dbConnectorConfig.getTaskTable()
                     )
                 )
-                .bind("status_code", TaskStatus.Status.RUNNING.toString())
                 .list();
 
-            return Lists.transform(
-                dbTasks, new Function<Map<String, Object>, String>()
-                {
-                  @Override
-                  public String apply(Map<String, Object> row)
-                  {
-                    return row.get("id").toString();
-                  }
-                }
-            );
+            final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
+            for (final Map<String, Object> row : dbTasks) {
+              final String id = row.get("id").toString();
+
+              try {
+                final Task task = jsonMapper.readValue(row.get("payload").toString(), Task.class);
+                final TaskStatus status = jsonMapper.readValue(row.get("status_payload").toString(), TaskStatus.class);
+
+                if (status.isRunnable()) {
+                  tasks.add(task);
+                }
+              } catch (Exception e) {
+                log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
+              }
+            }
+
+            return tasks.build();
           }
         }
     );
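getRunningTasks() now rebuilds full Task objects from the payload and status_payload JSON columns instead of returning bare ids, and a row whose payload cannot be parsed is alerted on and skipped rather than failing the whole query. A minimal sketch of that "deserialize, filter on runnable, skip bad rows" round trip in isolation; the stand-in MyTask/MyStatus types and JSON literals are hypothetical, and the com.fasterxml Jackson import is an assumption since the diff does not show which Jackson lineage jsonMapper uses:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;

    import java.util.List;
    import java.util.Map;

    public class PayloadRowMappingSketch
    {
      // Stand-in types; NOT Druid's real Task/TaskStatus wire format.
      static class MyTask { public String id; public String dataSource; }
      static class MyStatus { public String id; public boolean runnable; }

      public static void main(String[] args) throws Exception
      {
        final ObjectMapper jsonMapper = new ObjectMapper();

        // Pretend these rows came back from "SELECT id, payload, status_payload ... WHERE active = 1".
        final List<Map<String, Object>> dbTasks = ImmutableList.<Map<String, Object>>of(
            ImmutableMap.<String, Object>of(
                "id", "example_task",
                "payload", "{\"id\":\"example_task\",\"dataSource\":\"wikipedia\"}",
                "status_payload", "{\"id\":\"example_task\",\"runnable\":true}"
            )
        );

        final ImmutableList.Builder<MyTask> tasks = ImmutableList.builder();
        for (final Map<String, Object> row : dbTasks) {
          try {
            final MyTask task = jsonMapper.readValue(row.get("payload").toString(), MyTask.class);
            final MyStatus status = jsonMapper.readValue(row.get("status_payload").toString(), MyStatus.class);
            if (status.runnable) {
              tasks.add(task);
            }
          }
          catch (Exception e) {
            // A malformed payload is logged (alerted on, in the real code) and skipped.
            System.err.println("Failed to parse task payload for " + row.get("id") + ": " + e);
          }
        }

        System.out.println(tasks.build().size() + " runnable task(s) recovered");
      }
    }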
@@ -128,15 +128,15 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }
 
   @Override
-  public List<String> getRunningTaskIds()
+  public List<Task> getRunningTasks()
   {
     giant.lock();
 
     try {
-      final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+      final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
       for(final TaskStuff taskStuff : tasks.values()) {
         if(taskStuff.getStatus().isRunnable()) {
-          listBuilder.add(taskStuff.getTask().getId());
+          listBuilder.add(taskStuff.getTask());
         }
       }
 
@@ -98,10 +98,8 @@ public class TaskQueue
     // Get all running tasks and their locks
     final Multimap<TaskLock, Task> tasksByLock = ArrayListMultimap.create();
 
-    for (final String taskId : taskStorage.getRunningTaskIds()) {
+    for (final Task task : taskStorage.getRunningTasks()) {
       try {
-        // .get since TaskStorage semantics should mean this task is always found
-        final Task task = taskStorage.getTask(taskId).get();
         final List<TaskLock> taskLocks = taskStorage.getLocks(task.getId());
 
         queue.add(task);
@@ -111,16 +109,8 @@ public class TaskQueue
         }
       }
       catch (Exception e) {
-        log.makeAlert("Failed to bootstrap task").addData("task", taskId).emit();
-
-        // A bit goofy to special-case JsonProcessingException, but we don't want to suppress bootstrap problems on
-        // any old Exception or even IOException...
-        if (e instanceof JsonProcessingException || e.getCause() instanceof JsonProcessingException) {
-          // Mark this task a failure, and continue bootstrapping
-          taskStorage.setStatus(TaskStatus.failure(taskId));
-        } else {
-          throw Throwables.propagate(e);
-        }
+        log.makeAlert("Failed to bootstrap task").addData("task", task.getId()).emit();
+        throw Throwables.propagate(e);
       }
     }
 
@@ -77,9 +77,9 @@ public interface TaskStorage
   public List<TaskAction> getAuditLogs(String taskid);
 
   /**
-   * Returns a list of currently-running task IDs as stored in the storage facility, in no particular order.
+   * Returns a list of currently-running tasks as stored in the storage facility, in no particular order.
   */
-  public List<String> getRunningTaskIds();
+  public List<Task> getRunningTasks();
 
   /**
   * Returns a list of locks for a particular task.
@@ -46,7 +46,6 @@ import com.metamx.druid.config.ConfigManager;
 import com.metamx.druid.config.ConfigManagerConfig;
 import com.metamx.druid.config.JacksonConfigManager;
 import com.metamx.druid.db.DbConnector;
-import com.metamx.druid.db.DbConnectorConfig;
 import com.metamx.druid.http.GuiceServletConfig;
 import com.metamx.druid.http.RedirectFilter;
 import com.metamx.druid.http.RedirectInfo;
@@ -151,7 +150,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
   private RestS3Service s3Service = null;
   private List<Monitor> monitors = null;
   private ServiceEmitter emitter = null;
-  private DbConnectorConfig dbConnectorConfig = null;
+  private IndexerDbConnectorConfig dbConnectorConfig = null;
   private DBI dbi = null;
   private IndexerCoordinatorConfig config = null;
   private MergerDBCoordinator mergerDBCoordinator = null;
@@ -251,6 +250,10 @@ public class IndexerCoordinatorNode extends RegisteringNode
       final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
       initializeDB();
 
+      DbConnector.createTaskTable(dbi, dbConnectorConfig.getTaskTable());
+      DbConnector.createTaskLogTable(dbi, dbConnectorConfig.getTaskLogTable());
+      DbConnector.createTaskLockTable(dbi, dbConnectorConfig.getTaskLockTable());
+
       final ConfigManagerConfig managerConfig = configFactory.build(ConfigManagerConfig.class);
       DbConnector.createConfigTable(dbi, managerConfig.getConfigTable());
       JacksonConfigManager configManager =
@@ -541,7 +544,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
   private void initializeDB()
   {
     if (dbConnectorConfig == null) {
-      dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
+      dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
     }
     if (dbi == null) {
       dbi = new DbConnector(dbConnectorConfig).getDBI();
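The coordinator now builds an IndexerDbConnectorConfig rather than the base DbConnectorConfig, since the getTaskTable()/getTaskLogTable()/getTaskLockTable() accessors used above have to live somewhere. That class is not part of this diff; a plausible shape only, assuming it extends the existing DbConnectorConfig and follows the skife config-magic style used by the other *Config classes, with hypothetical property keys:

    import com.metamx.druid.db.DbConnectorConfig;
    import org.skife.config.Config;

    // Sketch only: the real IndexerDbConnectorConfig is not shown in this commit,
    // and the property keys below are hypothetical.
    public abstract class IndexerDbConnectorConfigSketch extends DbConnectorConfig
    {
      @Config("druid.database.taskTable")
      public abstract String getTaskTable();

      @Config("druid.database.taskLogTable")
      public abstract String getTaskLogTable();

      @Config("druid.database.taskLockTable")
      public abstract String getTaskLockTable();
    }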
@@ -30,11 +30,11 @@ import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.config.JacksonConfigManager;
-import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.actions.TaskActionClient;
 import com.metamx.druid.merger.common.actions.TaskActionHolder;
 import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
 import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
 import com.metamx.druid.merger.coordinator.TaskQueue;
 import com.metamx.druid.merger.coordinator.TaskRunner;