Merge pull request #125 from metamx/indexing-service-stuff

Indexing service stuff
cheddar 2013-04-19 17:03:56 -07:00
commit 42c3f278ed
13 changed files with 236 additions and 55 deletions

View File

@@ -278,6 +278,7 @@ public class Initialization
     final ServiceInstance serviceInstance =
         ServiceInstance.builder()
                        .name(config.getServiceName().replace('/', ':'))
+                       .address(addressFromHost(config.getHost()))
                        .port(config.getPort())
                        .build();
     final ServiceDiscovery serviceDiscovery =
@@ -361,6 +362,16 @@ public class Initialization
     return String.format("%s/%s", basePath, PROP_SUBPATH);
   }

+  public static String addressFromHost(final String host)
+  {
+    final int colon = host.indexOf(':');
+    if (colon < 0) {
+      return host;
+    } else {
+      return host.substring(0, colon);
+    }
+  }
+
   /**
    * Validate and Resolve Properties.
    * Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value.
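The new addressFromHost helper exists so service discovery registers a bare address even when druid.host carries a host:port value. A minimal standalone sketch of the same logic (the class name and sample inputs are illustrative, not part of the patch):

    // AddressFromHostDemo.java -- hypothetical demo of the helper's behavior
    public class AddressFromHostDemo
    {
      public static String addressFromHost(final String host)
      {
        final int colon = host.indexOf(':');
        // No colon: already a bare address; otherwise strip the port suffix.
        return colon < 0 ? host : host.substring(0, colon);
      }

      public static void main(String[] args)
      {
        System.out.println(addressFromHost("10.1.2.3:8080")); // prints 10.1.2.3
        System.out.println(addressFromHost("example.com"));   // prints example.com
      }
    }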

View File

@@ -28,6 +28,9 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig
   @Config("druid.service")
   public abstract String getServiceName();

+  @Config("druid.host")
+  public abstract String getHost();
+
   @Config("druid.port")
   public abstract int getPort();

View File

@@ -71,6 +71,62 @@ public class DbConnector
     );
   }

+  public static void createTaskTable(final DBI dbi, final String taskTableName)
+  {
+    createTable(
+        dbi,
+        taskTableName,
+        String.format(
+            "CREATE TABLE `%s` (\n"
+            + " `id` varchar(255) NOT NULL,\n"
+            + " `created_date` tinytext NOT NULL,\n"
+            + " `datasource` varchar(255) NOT NULL,\n"
+            + " `payload` longblob NOT NULL,\n"
+            + " `status_payload` longblob NOT NULL,\n"
+            + " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+            + " PRIMARY KEY (`id`)\n"
+            + ")",
+            taskTableName
+        )
+    );
+  }
+
+  public static void createTaskLogTable(final DBI dbi, final String taskLogsTableName)
+  {
+    createTable(
+        dbi,
+        taskLogsTableName,
+        String.format(
+            "CREATE TABLE `%s` (\n"
+            + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+            + " `task_id` varchar(255) DEFAULT NULL,\n"
+            + " `log_payload` longblob,\n"
+            + " PRIMARY KEY (`id`),\n"
+            + " KEY `task_id` (`task_id`)\n"
+            + ")",
+            taskLogsTableName
+        )
+    );
+  }
+
+  public static void createTaskLockTable(final DBI dbi, final String taskLocksTableName)
+  {
+    createTable(
+        dbi,
+        taskLocksTableName,
+        String.format(
+            "CREATE TABLE `%s` (\n"
+            + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+            + " `task_id` varchar(255) DEFAULT NULL,\n"
+            + " `lock_payload` longblob,\n"
+            + " PRIMARY KEY (`id`),\n"
+            + " KEY `task_id` (`task_id`)\n"
+            + ")",
+            taskLocksTableName
+        )
+    );
+  }
+
   public static void createTable(
       final DBI dbi,
       final String tableName,
@@ -125,6 +181,11 @@ public class DbConnector
     dataSource.setPassword(config.getDatabasePassword());
     dataSource.setUrl(config.getDatabaseConnectURI());

+    if (config.isValidationQuery()) {
+      dataSource.setValidationQuery(config.getValidationQuery());
+      dataSource.setTestOnBorrow(true);
+    }
+
     return dataSource;
   }
 }
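Enabling a validation query makes the DBCP pool test each connection before lending it out, which protects long-lived services against the database silently dropping idle connections. A self-contained sketch of the equivalent setup, with illustrative connection details (the config-driven wiring above is the real path):

    import org.apache.commons.dbcp.BasicDataSource;

    public class ValidatedPoolDemo
    {
      public static void main(String[] args)
      {
        final BasicDataSource dataSource = new BasicDataSource();
        dataSource.setUsername("druid");                        // illustrative credentials
        dataSource.setPassword("secret");
        dataSource.setUrl("jdbc:mysql://localhost:3306/druid"); // illustrative URI
        dataSource.setValidationQuery("SELECT 1");              // cheap liveness probe
        dataSource.setTestOnBorrow(true);                       // run the probe on every borrow
      }
    }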

View File

@@ -41,4 +41,16 @@ public abstract class DbConnectorConfig
   @JsonProperty("segmentTable")
   @Config("druid.database.segmentTable")
   public abstract String getSegmentTable();

+  @JsonProperty("validationQuery")
+  @Config("druid.database.validation")
+  public boolean isValidationQuery() {
+    return false;
+  }
+
+  @JsonProperty("validationQuery")
+  @Config("druid.database.validationQuery")
+  public String getValidationQuery() {
+    return "SELECT 1";
+  }
 }
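Both knobs are off by default and are driven by runtime properties per the @Config annotations; an illustrative fragment:

    druid.database.validation=true
    druid.database.validationQuery=SELECT 1

One caveat worth flagging: both accessors declare @JsonProperty("validationQuery"), so the boolean flag and the query string collide under the same JSON name; the flag presumably wants a distinct property name for JSON-configured deployments.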

View File

@@ -40,8 +40,10 @@ import com.metamx.druid.merger.common.actions.SegmentInsertAction;
 import com.metamx.druid.query.QueryRunner;
 import com.metamx.druid.realtime.FireDepartmentConfig;
 import com.metamx.druid.realtime.FireDepartmentMetrics;
+import com.metamx.druid.realtime.Firehose;
 import com.metamx.druid.realtime.FirehoseFactory;
 import com.metamx.druid.realtime.GracefulShutdownFirehose;
+import com.metamx.druid.realtime.MinTimeFirehose;
 import com.metamx.druid.realtime.plumber.Plumber;
 import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
 import com.metamx.druid.realtime.Schema;
@@ -62,19 +64,22 @@ public class RealtimeIndexTask extends AbstractTask
   private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);

   @JsonIgnore
-  final Schema schema;
+  private final Schema schema;

   @JsonIgnore
-  final FirehoseFactory firehoseFactory;
+  private final FirehoseFactory firehoseFactory;

   @JsonIgnore
-  final FireDepartmentConfig fireDepartmentConfig;
+  private final FireDepartmentConfig fireDepartmentConfig;

   @JsonIgnore
-  final Period windowPeriod;
+  private final Period windowPeriod;

   @JsonIgnore
-  final IndexGranularity segmentGranularity;
+  private final IndexGranularity segmentGranularity;

+  @JsonIgnore
+  private final DateTime minTime;
+
   @JsonIgnore
   private volatile Plumber plumber = null;
@@ -95,7 +100,8 @@ public class RealtimeIndexTask extends AbstractTask
       @JsonProperty("firehose") FirehoseFactory firehoseFactory,
       @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename?
       @JsonProperty("windowPeriod") Period windowPeriod,
-      @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
+      @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
+      @JsonProperty("minTime") DateTime minTime
   )
   {
     super(
@@ -116,6 +122,7 @@ public class RealtimeIndexTask extends AbstractTask
     this.fireDepartmentConfig = fireDepartmentConfig;
     this.windowPeriod = windowPeriod;
     this.segmentGranularity = segmentGranularity;
+    this.minTime = minTime;
   }

   @Override
@@ -156,7 +163,19 @@ public class RealtimeIndexTask extends AbstractTask
       if (shutdown) {
         return TaskStatus.success(getId());
       }
-      firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
+
+      Firehose wrappedFirehose = firehoseFactory.connect();
+      if (minTime != null) {
+        log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime);
+        wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime);
+      }
+
+      log.info(
+          "Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
+          segmentGranularity,
+          windowPeriod
+      );
+      firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod);
     }

     // TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like
@@ -347,6 +366,12 @@ public class RealtimeIndexTask extends AbstractTask
     return segmentGranularity;
   }

+  @JsonProperty
+  public DateTime getMinTime()
+  {
+    return minTime;
+  }
+
   public static class TaskActionSegmentPublisher implements SegmentPublisher
   {
     final Task task;
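Since minTime is a constructor @JsonProperty, a realtime task spec can now carry it directly; a hypothetical fragment (the type name and surrounding fields follow the existing serde conventions and are not introduced by this patch):

    {
      "type": "index_realtime",
      "windowPeriod": "PT10M",
      "segmentGranularity": "HOUR",
      "minTime": "2013-04-19T00:00:00.000Z"
    }

Omitting minTime (or passing null, as TaskSerdeTest does below) skips the MinTimeFirehose wrapping entirely.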

View File

@@ -82,15 +82,16 @@ public class DbTaskStorage implements TaskStorage
         {
           handle.createStatement(
               String.format(
-                  "INSERT INTO %s (id, created_date, payload, status_code, status_payload) VALUES (:id, :created_date, :payload, :status_code, :status_payload)",
+                  "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
                  dbConnectorConfig.getTaskTable()
              )
          )
                .bind("id", task.getId())
                .bind("created_date", new DateTime().toString())
-               .bind("payload", jsonMapper.writeValueAsString(task))
-               .bind("status_code", status.getStatusCode().toString())
-               .bind("status_payload", jsonMapper.writeValueAsString(status))
+               .bind("datasource", task.getDataSource())
+               .bind("payload", jsonMapper.writeValueAsBytes(task))
+               .bind("active", status.isRunnable() ? 1 : 0)
+               .bind("status_payload", jsonMapper.writeValueAsBytes(status))
                .execute();

           return null;
@@ -122,21 +123,20 @@ public class DbTaskStorage implements TaskStorage
        {
          return handle.createStatement(
              String.format(
-                 "UPDATE %s SET status_code = :status_code, status_payload = :status_payload WHERE id = :id AND status_code = :old_status_code",
+                 "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
                  dbConnectorConfig.getTaskTable()
              )
          )
              .bind("id", status.getId())
-             .bind("status_code", status.getStatusCode().toString())
-             .bind("old_status_code", TaskStatus.Status.RUNNING.toString())
-             .bind("status_payload", jsonMapper.writeValueAsString(status))
+             .bind("active", status.isRunnable() ? 1 : 0)
+             .bind("status_payload", jsonMapper.writeValueAsBytes(status))
              .execute();
        }
      }
  );

  if(updated != 1) {
-   throw new IllegalStateException(String.format("Running task not found: %s", status.getId()));
+   throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
  }
@@ -163,7 +163,7 @@ public class DbTaskStorage implements TaskStorage
            return Optional.absent();
          } else {
            final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
-           return Optional.of(jsonMapper.readValue(dbStatus.get("payload").toString(), Task.class));
+           return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("payload"), Task.class));
          }
        }
      }
@@ -193,7 +193,7 @@ public class DbTaskStorage implements TaskStorage
            return Optional.absent();
          } else {
            final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
-           return Optional.of(jsonMapper.readValue(dbStatus.get("status_payload").toString(), TaskStatus.class));
+           return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("status_payload"), TaskStatus.class));
          }
        }
      }
@@ -201,34 +201,40 @@ public class DbTaskStorage implements TaskStorage
   }

   @Override
-  public List<String> getRunningTaskIds()
+  public List<Task> getRunningTasks()
   {
     return dbi.withHandle(
-        new HandleCallback<List<String>>()
+        new HandleCallback<List<Task>>()
         {
           @Override
-          public List<String> withHandle(Handle handle) throws Exception
+          public List<Task> withHandle(Handle handle) throws Exception
           {
             final List<Map<String, Object>> dbTasks =
                 handle.createQuery(
                     String.format(
-                        "SELECT id FROM %s WHERE status_code = :status_code",
+                        "SELECT id, payload, status_payload FROM %s WHERE active = 1",
                         dbConnectorConfig.getTaskTable()
                     )
                 )
-                      .bind("status_code", TaskStatus.Status.RUNNING.toString())
                       .list();

-            return Lists.transform(
-                dbTasks, new Function<Map<String, Object>, String>()
-                {
-                  @Override
-                  public String apply(Map<String, Object> row)
-                  {
-                    return row.get("id").toString();
-                  }
-                }
-            );
+            final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
+            for (final Map<String, Object> row : dbTasks) {
+              final String id = row.get("id").toString();
+
+              try {
+                final Task task = jsonMapper.readValue((byte[])row.get("payload"), Task.class);
+                final TaskStatus status = jsonMapper.readValue((byte[])row.get("status_payload"), TaskStatus.class);
+
+                if (status.isRunnable()) {
+                  tasks.add(task);
+                }
+              } catch (Exception e) {
+                log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
+              }
+            }
+
+            return tasks.build();
           }
         }
     );
@@ -260,7 +266,7 @@ public class DbTaskStorage implements TaskStorage
              )
          )
              .bind("task_id", taskid)
-             .bind("lock_payload", jsonMapper.writeValueAsString(taskLock))
+             .bind("lock_payload", jsonMapper.writeValueAsBytes(taskLock))
              .execute();
        }
      }
@@ -340,7 +346,7 @@ public class DbTaskStorage implements TaskStorage
              )
          )
              .bind("task_id", task.getId())
-             .bind("log_payload", jsonMapper.writeValueAsString(taskAction))
+             .bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
              .execute();
        }
      }
@@ -373,7 +379,7 @@ public class DbTaskStorage implements TaskStorage
          public TaskAction apply(Map<String, Object> row)
          {
            try {
-             return jsonMapper.readValue(row.get("log_payload").toString(), TaskAction.class);
+             return jsonMapper.readValue((byte[])row.get("log_payload"), TaskAction.class);
            } catch(Exception e) {
              throw Throwables.propagate(e);
            }
@@ -405,7 +411,7 @@ public class DbTaskStorage implements TaskStorage
        final Map<Long, TaskLock> retMap = Maps.newHashMap();
        for(final Map<String, Object> row : dbTaskLocks) {
-         retMap.put((Long)row.get("id"), jsonMapper.readValue(row.get("lock_payload").toString(), TaskLock.class));
+         retMap.put((Long)row.get("id"), jsonMapper.readValue((byte[])row.get("lock_payload"), TaskLock.class));
        }
        return retMap;
      }
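The recurring change in this file is that payloads are now written and read as raw bytes instead of strings: JDBC hands a longblob column back as byte[], so the old .toString() call would yield the array's identity string rather than JSON once the columns became blobs. A minimal sketch of the symmetric Jackson calls, assuming a configured ObjectMapper named jsonMapper:

    // Write side: serialize straight to bytes for the longblob column.
    final byte[] payload = jsonMapper.writeValueAsBytes(task);
    // Read side: deserialize the byte[] handed back by JDBC.
    final Task roundTripped = jsonMapper.readValue(payload, Task.class);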

View File

@@ -128,15 +128,15 @@ public class HeapMemoryTaskStorage implements TaskStorage
   }

   @Override
-  public List<String> getRunningTaskIds()
+  public List<Task> getRunningTasks()
   {
     giant.lock();

     try {
-      final ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+      final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
       for(final TaskStuff taskStuff : tasks.values()) {
         if(taskStuff.getStatus().isRunnable()) {
-          listBuilder.add(taskStuff.getTask().getId());
+          listBuilder.add(taskStuff.getTask());
         }
       }

View File

@@ -98,10 +98,8 @@ public class TaskQueue
     // Get all running tasks and their locks
     final Multimap<TaskLock, Task> tasksByLock = ArrayListMultimap.create();

-    for (final String taskId : taskStorage.getRunningTaskIds()) {
+    for (final Task task : taskStorage.getRunningTasks()) {
       try {
-        // .get since TaskStorage semantics should mean this task is always found
-        final Task task = taskStorage.getTask(taskId).get();
         final List<TaskLock> taskLocks = taskStorage.getLocks(task.getId());

         queue.add(task);
@@ -111,18 +109,10 @@ public class TaskQueue
         }
       }
       catch (Exception e) {
-        log.makeAlert("Failed to bootstrap task").addData("task", taskId).emit();
-
-        // A bit goofy to special-case JsonProcessingException, but we don't want to suppress bootstrap problems on
-        // any old Exception or even IOException...
-        if (e instanceof JsonProcessingException || e.getCause() instanceof JsonProcessingException) {
-          // Mark this task a failure, and continue bootstrapping
-          taskStorage.setStatus(TaskStatus.failure(taskId));
-        } else {
-          throw Throwables.propagate(e);
-        }
+        log.makeAlert("Failed to bootstrap task").addData("task", task.getId()).emit();
+        throw Throwables.propagate(e);
       }
     }

     // Sort locks by version
     final Ordering<Map.Entry<TaskLock, Task>> byVersionOrdering = new Ordering<Map.Entry<TaskLock, Task>>()
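The removed JsonProcessingException special-case is no longer needed here: DbTaskStorage.getRunningTasks now catches per-row parse failures itself, emits an alert, and skips the row, so any exception that still reaches this loop is presumably a genuine bootstrap problem worth propagating.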

View File

@@ -77,9 +77,9 @@ public interface TaskStorage
   public List<TaskAction> getAuditLogs(String taskid);

   /**
-   * Returns a list of currently-running task IDs as stored in the storage facility, in no particular order.
+   * Returns a list of currently-running tasks as stored in the storage facility, in no particular order.
    */
-  public List<String> getRunningTaskIds();
+  public List<Task> getRunningTasks();

   /**
    * Returns a list of locks for a particular task.

View File

@@ -606,6 +606,10 @@ public class IndexerCoordinatorNode extends RegisteringNode
       taskStorage = new HeapMemoryTaskStorage();
     } else if (config.getStorageImpl().equals("db")) {
       final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class);
+      DbConnector.createTaskTable(dbi, dbConnectorConfig.getTaskTable());
+      DbConnector.createTaskLogTable(dbi, dbConnectorConfig.getTaskLogTable());
+      DbConnector.createTaskLockTable(dbi, dbConnectorConfig.getTaskLockTable());
+
       taskStorage = new DbTaskStorage(
           getJsonMapper(),
           dbConnectorConfig,
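With these calls in place, a db-backed coordinator presumably bootstraps its own task, task-log, and task-lock tables on startup rather than requiring them to be created by hand, with the table names taken from the IndexerDbConnectorConfig getters.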

View File

@@ -30,11 +30,11 @@ import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.config.JacksonConfigManager;
-import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
 import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.actions.TaskActionClient;
 import com.metamx.druid.merger.common.actions.TaskActionHolder;
 import com.metamx.druid.merger.common.task.Task;
+import com.metamx.druid.merger.common.tasklogs.TaskLogProvider;
 import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
 import com.metamx.druid.merger.coordinator.TaskQueue;
 import com.metamx.druid.merger.coordinator.TaskRunner;

View File

@@ -205,7 +205,8 @@ public class TaskSerdeTest
         null,
         null,
         new Period("PT10M"),
-        IndexGranularity.HOUR
+        IndexGranularity.HOUR,
+        null
     );

     final ObjectMapper jsonMapper = new DefaultObjectMapper();
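Passing null for the new minTime argument keeps this serde test exercising the pre-patch behavior; per RealtimeIndexTask.run above, a null minTime means no MinTimeFirehose wrapping.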

View File

@@ -0,0 +1,68 @@
+package com.metamx.druid.realtime;
+
+import com.metamx.druid.input.InputRow;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Provides a view on a firehose that only returns rows at or after a certain minimum timestamp.
+ * Not thread-safe.
+ */
+public class MinTimeFirehose implements Firehose
+{
+  private final Firehose firehose;
+  private final DateTime minTime;
+
+  private InputRow savedInputRow = null;
+
+  public MinTimeFirehose(Firehose firehose, DateTime minTime)
+  {
+    this.firehose = firehose;
+    this.minTime = minTime;
+  }
+
+  @Override
+  public boolean hasMore()
+  {
+    if (savedInputRow != null) {
+      return true;
+    }
+
+    while (firehose.hasMore()) {
+      final InputRow row = firehose.nextRow();
+      if (acceptable(row)) {
+        savedInputRow = row;
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public InputRow nextRow()
+  {
+    final InputRow row = savedInputRow;
+    savedInputRow = null;
+    return row;
+  }
+
+  @Override
+  public Runnable commit()
+  {
+    return firehose.commit();
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    firehose.close();
+  }
+
+  private boolean acceptable(InputRow row)
+  {
+    return row.getTimestampFromEpoch() >= minTime.getMillis();
+  }
+}
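The look-ahead in hasMore() is the heart of the class: it drains the wrapped firehose until a row passes the minTime filter, parks that row in savedInputRow, and nextRow() then hands it back and clears the slot. A hypothetical consumption loop (delegate and process are stand-ins, not part of the patch):

    final Firehose firehose = new MinTimeFirehose(delegate, new DateTime("2013-04-19T00:00:00Z"));
    while (firehose.hasMore()) {
      final InputRow row = firehose.nextRow(); // timestamp is guaranteed >= minTime
      process(row);
    }
    firehose.close();

Note that the contract requires calling hasMore() before each nextRow(): the filtered row is only staged by hasMore(), so a bare nextRow() returns null (despite the imported NoSuchElementException, which is unused).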