diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index 1b4dcbd3a27..fd0e4390fbd 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -278,6 +278,7 @@ public class Initialization final ServiceInstance serviceInstance = ServiceInstance.builder() .name(config.getServiceName().replace('/', ':')) + .address(addressFromHost(config.getHost())) .port(config.getPort()) .build(); final ServiceDiscovery serviceDiscovery = @@ -361,6 +362,16 @@ public class Initialization return String.format("%s/%s", basePath, PROP_SUBPATH); } + public static String addressFromHost(final String host) + { + final int colon = host.indexOf(':'); + if (colon < 0) { + return host; + } else { + return host.substring(0, colon); + } + } + /** * Validate and Resolve Properties. * Resolve zpaths with props like druid.zk.paths.*Path using druid.zk.paths.base value. diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java index 62cbfe44eb9..04776d6545a 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java @@ -28,6 +28,9 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig @Config("druid.service") public abstract String getServiceName(); + @Config("druid.host") + public abstract String getHost(); + @Config("druid.port") public abstract int getPort(); diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java index b8ab7a4747e..1e73b731353 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnector.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java @@ -71,6 +71,62 @@ public class DbConnector ); } + public static void createTaskTable(final DBI dbi, final String taskTableName) + { + createTable( + dbi, + taskTableName, + String.format( + "CREATE TABLE `%s` (\n" + + " `id` varchar(255) NOT NULL,\n" + + " `created_date` tinytext NOT NULL,\n" + + " `datasource` varchar(255) NOT NULL,\n" + + " `payload` longblob NOT NULL,\n" + + " `status_payload` longblob NOT NULL,\n" + + " `active` tinyint(1) NOT NULL DEFAULT '0',\n" + + " PRIMARY KEY (`id`)\n" + + ")", + taskTableName + ) + ); + } + + public static void createTaskLogTable(final DBI dbi, final String taskLogsTableName) + { + createTable( + dbi, + taskLogsTableName, + String.format( + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `log_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", + taskLogsTableName + ) + ); + } + + public static void createTaskLockTable(final DBI dbi, final String taskLocksTableName) + { + createTable( + dbi, + taskLocksTableName, + String.format( + "CREATE TABLE `%s` (\n" + + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + " `task_id` varchar(255) DEFAULT NULL,\n" + + " `lock_payload` longblob,\n" + + " PRIMARY KEY (`id`),\n" + + " KEY `task_id` (`task_id`)\n" + + ")", + taskLocksTableName + ) + ); + } + public static void createTable( final DBI dbi, final String tableName, @@ -125,6 +181,11 @@ public class DbConnector dataSource.setPassword(config.getDatabasePassword()); dataSource.setUrl(config.getDatabaseConnectURI()); + if (config.isValidationQuery()) { + dataSource.setValidationQuery(config.getValidationQuery()); + dataSource.setTestOnBorrow(true); + } + return dataSource; } } diff --git a/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java b/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java index b1a9a1b3e2e..ce1c14c08b0 100644 --- a/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java +++ b/common/src/main/java/com/metamx/druid/db/DbConnectorConfig.java @@ -41,4 +41,16 @@ public abstract class DbConnectorConfig @JsonProperty("segmentTable") @Config("druid.database.segmentTable") public abstract String getSegmentTable(); + + @JsonProperty("validationQuery") + @Config("druid.database.validation") + public boolean isValidationQuery() { + return false; + } + + @JsonProperty("validationQuery") + @Config("druid.database.validationQuery") + public String getValidationQuery() { + return "SELECT 1"; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java index 75f9958b89e..9dda1a98656 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/RealtimeIndexTask.java @@ -40,8 +40,10 @@ import com.metamx.druid.merger.common.actions.SegmentInsertAction; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.realtime.FireDepartmentConfig; import com.metamx.druid.realtime.FireDepartmentMetrics; +import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.GracefulShutdownFirehose; +import com.metamx.druid.realtime.MinTimeFirehose; import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; import com.metamx.druid.realtime.Schema; @@ -62,19 +64,22 @@ public class RealtimeIndexTask extends AbstractTask private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class); @JsonIgnore - final Schema schema; + private final Schema schema; @JsonIgnore - final FirehoseFactory firehoseFactory; + private final FirehoseFactory firehoseFactory; @JsonIgnore - final FireDepartmentConfig fireDepartmentConfig; + private final FireDepartmentConfig fireDepartmentConfig; @JsonIgnore - final Period windowPeriod; + private final Period windowPeriod; @JsonIgnore - final IndexGranularity segmentGranularity; + private final IndexGranularity segmentGranularity; + + @JsonIgnore + private final DateTime minTime; @JsonIgnore private volatile Plumber plumber = null; @@ -95,7 +100,8 @@ public class RealtimeIndexTask extends AbstractTask @JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename? @JsonProperty("windowPeriod") Period windowPeriod, - @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity + @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, + @JsonProperty("minTime") DateTime minTime ) { super( @@ -116,6 +122,7 @@ public class RealtimeIndexTask extends AbstractTask this.fireDepartmentConfig = fireDepartmentConfig; this.windowPeriod = windowPeriod; this.segmentGranularity = segmentGranularity; + this.minTime = minTime; } @Override @@ -156,7 +163,19 @@ public class RealtimeIndexTask extends AbstractTask if (shutdown) { return TaskStatus.success(getId()); } - firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod); + + Firehose wrappedFirehose = firehoseFactory.connect(); + if (minTime != null) { + log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime); + wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime); + } + + log.info( + "Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]", + segmentGranularity, + windowPeriod + ); + firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod); } // TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like @@ -347,6 +366,12 @@ public class RealtimeIndexTask extends AbstractTask return segmentGranularity; } + @JsonProperty + public DateTime getMinTime() + { + return minTime; + } + public static class TaskActionSegmentPublisher implements SegmentPublisher { final Task task; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java index 4d2e7416013..4e13adfdd55 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java @@ -82,15 +82,16 @@ public class DbTaskStorage implements TaskStorage { handle.createStatement( String.format( - "INSERT INTO %s (id, created_date, payload, status_code, status_payload) VALUES (:id, :created_date, :payload, :status_code, :status_payload)", + "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", dbConnectorConfig.getTaskTable() ) ) .bind("id", task.getId()) .bind("created_date", new DateTime().toString()) - .bind("payload", jsonMapper.writeValueAsString(task)) - .bind("status_code", status.getStatusCode().toString()) - .bind("status_payload", jsonMapper.writeValueAsString(status)) + .bind("datasource", task.getDataSource()) + .bind("payload", jsonMapper.writeValueAsBytes(task)) + .bind("active", status.isRunnable() ? 1 : 0) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) .execute(); return null; @@ -122,21 +123,20 @@ public class DbTaskStorage implements TaskStorage { return handle.createStatement( String.format( - "UPDATE %s SET status_code = :status_code, status_payload = :status_payload WHERE id = :id AND status_code = :old_status_code", + "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1", dbConnectorConfig.getTaskTable() ) ) .bind("id", status.getId()) - .bind("status_code", status.getStatusCode().toString()) - .bind("old_status_code", TaskStatus.Status.RUNNING.toString()) - .bind("status_payload", jsonMapper.writeValueAsString(status)) + .bind("active", status.isRunnable() ? 1 : 0) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) .execute(); } } ); if(updated != 1) { - throw new IllegalStateException(String.format("Running task not found: %s", status.getId())); + throw new IllegalStateException(String.format("Active task not found: %s", status.getId())); } } @@ -163,7 +163,7 @@ public class DbTaskStorage implements TaskStorage return Optional.absent(); } else { final Map dbStatus = Iterables.getOnlyElement(dbTasks); - return Optional.of(jsonMapper.readValue(dbStatus.get("payload").toString(), Task.class)); + return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("payload"), Task.class)); } } } @@ -193,7 +193,7 @@ public class DbTaskStorage implements TaskStorage return Optional.absent(); } else { final Map dbStatus = Iterables.getOnlyElement(dbStatuses); - return Optional.of(jsonMapper.readValue(dbStatus.get("status_payload").toString(), TaskStatus.class)); + return Optional.of(jsonMapper.readValue((byte[])dbStatus.get("status_payload"), TaskStatus.class)); } } } @@ -201,34 +201,40 @@ public class DbTaskStorage implements TaskStorage } @Override - public List getRunningTaskIds() + public List getRunningTasks() { return dbi.withHandle( - new HandleCallback>() + new HandleCallback>() { @Override - public List withHandle(Handle handle) throws Exception + public List withHandle(Handle handle) throws Exception { final List> dbTasks = handle.createQuery( String.format( - "SELECT id FROM %s WHERE status_code = :status_code", + "SELECT id, payload, status_payload FROM %s WHERE active = 1", dbConnectorConfig.getTaskTable() ) ) - .bind("status_code", TaskStatus.Status.RUNNING.toString()) .list(); - return Lists.transform( - dbTasks, new Function, String>() - { - @Override - public String apply(Map row) - { - return row.get("id").toString(); + final ImmutableList.Builder tasks = ImmutableList.builder(); + for (final Map row : dbTasks) { + final String id = row.get("id").toString(); + + try { + final Task task = jsonMapper.readValue((byte[])row.get("payload"), Task.class); + final TaskStatus status = jsonMapper.readValue((byte[])row.get("status_payload"), TaskStatus.class); + + if (status.isRunnable()) { + tasks.add(task); + } + } catch (Exception e) { + log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit(); } } - ); + + return tasks.build(); } } ); @@ -260,7 +266,7 @@ public class DbTaskStorage implements TaskStorage ) ) .bind("task_id", taskid) - .bind("lock_payload", jsonMapper.writeValueAsString(taskLock)) + .bind("lock_payload", jsonMapper.writeValueAsBytes(taskLock)) .execute(); } } @@ -340,7 +346,7 @@ public class DbTaskStorage implements TaskStorage ) ) .bind("task_id", task.getId()) - .bind("log_payload", jsonMapper.writeValueAsString(taskAction)) + .bind("log_payload", jsonMapper.writeValueAsBytes(taskAction)) .execute(); } } @@ -373,7 +379,7 @@ public class DbTaskStorage implements TaskStorage public TaskAction apply(Map row) { try { - return jsonMapper.readValue(row.get("log_payload").toString(), TaskAction.class); + return jsonMapper.readValue((byte[])row.get("log_payload"), TaskAction.class); } catch(Exception e) { throw Throwables.propagate(e); } @@ -405,7 +411,7 @@ public class DbTaskStorage implements TaskStorage final Map retMap = Maps.newHashMap(); for(final Map row : dbTaskLocks) { - retMap.put((Long)row.get("id"), jsonMapper.readValue(row.get("lock_payload").toString(), TaskLock.class)); + retMap.put((Long)row.get("id"), jsonMapper.readValue((byte[])row.get("lock_payload"), TaskLock.class)); } return retMap; } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java index 8d372c29000..895804bc7fd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java @@ -128,15 +128,15 @@ public class HeapMemoryTaskStorage implements TaskStorage } @Override - public List getRunningTaskIds() + public List getRunningTasks() { giant.lock(); try { - final ImmutableList.Builder listBuilder = ImmutableList.builder(); + final ImmutableList.Builder listBuilder = ImmutableList.builder(); for(final TaskStuff taskStuff : tasks.values()) { if(taskStuff.getStatus().isRunnable()) { - listBuilder.add(taskStuff.getTask().getId()); + listBuilder.add(taskStuff.getTask()); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java index 0b000e44faf..203983ef556 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java @@ -98,10 +98,8 @@ public class TaskQueue // Get all running tasks and their locks final Multimap tasksByLock = ArrayListMultimap.create(); - for (final String taskId : taskStorage.getRunningTaskIds()) { + for (final Task task : taskStorage.getRunningTasks()) { try { - // .get since TaskStorage semantics should mean this task is always found - final Task task = taskStorage.getTask(taskId).get(); final List taskLocks = taskStorage.getLocks(task.getId()); queue.add(task); @@ -111,16 +109,8 @@ public class TaskQueue } } catch (Exception e) { - log.makeAlert("Failed to bootstrap task").addData("task", taskId).emit(); - - // A bit goofy to special-case JsonProcessingException, but we don't want to suppress bootstrap problems on - // any old Exception or even IOException... - if (e instanceof JsonProcessingException || e.getCause() instanceof JsonProcessingException) { - // Mark this task a failure, and continue bootstrapping - taskStorage.setStatus(TaskStatus.failure(taskId)); - } else { - throw Throwables.propagate(e); - } + log.makeAlert("Failed to bootstrap task").addData("task", task.getId()).emit(); + throw Throwables.propagate(e); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java index ee633efffb9..c64dedfc508 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java @@ -77,9 +77,9 @@ public interface TaskStorage public List getAuditLogs(String taskid); /** - * Returns a list of currently-running task IDs as stored in the storage facility, in no particular order. + * Returns a list of currently-running tasks as stored in the storage facility, in no particular order. */ - public List getRunningTaskIds(); + public List getRunningTasks(); /** * Returns a list of locks for a particular task. diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 28602887c1b..edddadc12e1 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -606,6 +606,10 @@ public class IndexerCoordinatorNode extends RegisteringNode taskStorage = new HeapMemoryTaskStorage(); } else if (config.getStorageImpl().equals("db")) { final IndexerDbConnectorConfig dbConnectorConfig = configFactory.build(IndexerDbConnectorConfig.class); + DbConnector.createTaskTable(dbi, dbConnectorConfig.getTaskTable()); + DbConnector.createTaskLogTable(dbi, dbConnectorConfig.getTaskLogTable()); + DbConnector.createTaskLockTable(dbi, dbConnectorConfig.getTaskLockTable()); + taskStorage = new DbTaskStorage( getJsonMapper(), dbConnectorConfig, diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index 699fbe89fe6..8fb4211d7bd 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -30,11 +30,11 @@ import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.config.JacksonConfigManager; -import com.metamx.druid.merger.common.tasklogs.TaskLogProvider; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.actions.TaskActionClient; import com.metamx.druid.merger.common.actions.TaskActionHolder; import com.metamx.druid.merger.common.task.Task; +import com.metamx.druid.merger.common.tasklogs.TaskLogProvider; import com.metamx.druid.merger.coordinator.TaskMasterLifecycle; import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.TaskRunner; diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java index 220ce19a16e..871c712d345 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java @@ -205,7 +205,8 @@ public class TaskSerdeTest null, null, new Period("PT10M"), - IndexGranularity.HOUR + IndexGranularity.HOUR, + null ); final ObjectMapper jsonMapper = new DefaultObjectMapper(); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/MinTimeFirehose.java b/realtime/src/main/java/com/metamx/druid/realtime/MinTimeFirehose.java new file mode 100644 index 00000000000..5660cf0bc0b --- /dev/null +++ b/realtime/src/main/java/com/metamx/druid/realtime/MinTimeFirehose.java @@ -0,0 +1,68 @@ +package com.metamx.druid.realtime; + +import com.metamx.druid.input.InputRow; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Provides a view on a firehose that only returns rows at or after a certain minimum timestamp. + * Not thread-safe. + */ +public class MinTimeFirehose implements Firehose +{ + private final Firehose firehose; + private final DateTime minTime; + + private InputRow savedInputRow = null; + + public MinTimeFirehose(Firehose firehose, DateTime minTime) + { + this.firehose = firehose; + this.minTime = minTime; + } + + @Override + public boolean hasMore() + { + if (savedInputRow != null) { + return true; + } + + while (firehose.hasMore()) { + final InputRow row = firehose.nextRow(); + if (acceptable(row)) { + savedInputRow = row; + return true; + } + } + + return false; + } + + @Override + public InputRow nextRow() + { + final InputRow row = savedInputRow; + savedInputRow = null; + return row; + } + + @Override + public Runnable commit() + { + return firehose.commit(); + } + + @Override + public void close() throws IOException + { + firehose.close(); + } + + private boolean acceptable(InputRow row) + { + return row.getTimestampFromEpoch() >= minTime.getMillis(); + } +}