diff --git a/cassandra-storage/pom.xml b/cassandra-storage/pom.xml
index 57150629f14..c8fec800e3e 100644
--- a/cassandra-storage/pom.xml
+++ b/cassandra-storage/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
diff --git a/common/pom.xml b/common/pom.xml
index b4bbaf0f4f6..d54a3486cbf 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
- <version>0.6.161-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
@@ -46,9 +46,8 @@
<artifactId>commons-codec</artifactId>
- <groupId>commons-dbcp</groupId>
- <artifactId>commons-dbcp</artifactId>
- <version>1.4</version>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
<artifactId>commons-pool</artifactId>
diff --git a/common/src/main/java/io/druid/common/config/ConfigManager.java b/common/src/main/java/io/druid/common/config/ConfigManager.java
index dac69a1543f..094d579aad0 100644
--- a/common/src/main/java/io/druid/common/config/ConfigManager.java
+++ b/common/src/main/java/io/druid/common/config/ConfigManager.java
@@ -27,8 +27,8 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
-import io.druid.db.DbConnector;
-import io.druid.db.DbTablesConfig;
+import io.druid.metadata.MetadataStorageConnector;
+import io.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.Duration;
import java.util.Arrays;
@@ -48,7 +48,7 @@ public class ConfigManager
private final Object lock = new Object();
private boolean started = false;
- private final DbConnector dbConnector;
+ private final MetadataStorageConnector dbConnector;
private final Supplier<ConfigManagerConfig> config;
private final ScheduledExecutorService exec;
@@ -58,7 +58,7 @@ public class ConfigManager
private volatile ConfigManager.PollingCallable poller;
@Inject
- public ConfigManager(DbConnector dbConnector, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
+ public ConfigManager(MetadataStorageConnector dbConnector, Supplier<MetadataStorageTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{
this.dbConnector = dbConnector;
this.config = config;
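
Reviewer note: ConfigManager now depends on the storage-agnostic MetadataStorageConnector instead of the JDBC-specific DbConnector, so the same polling logic can run against any metadata store. A minimal sketch of a config lookup through the new interface; the lookup(table, keyColumn, valueColumn, key) signature and the getConfigTable() accessor are assumptions for illustration, not something this diff confirms:

```java
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageTablesConfig;

// Sketch only: assumes MetadataStorageConnector exposes a generic
// key/value lookup of the form lookup(table, keyColumn, valueColumn, key).
public class ConfigLookupSketch
{
  public static byte[] fetchConfig(
      final MetadataStorageConnector connector,
      final MetadataStorageTablesConfig tables,
      final String key
  )
  {
    // The config table stores one row per config name, with the
    // serialized value in a "payload" column.
    return connector.lookup(tables.getConfigTable(), "name", "payload", key);
  }
}
```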
diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java
index 1f6edb43f4e..fff67a9c339 100644
--- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java
+++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java
@@ -21,6 +21,7 @@ package io.druid.common.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
+import com.google.common.primitives.Ints;
import io.druid.collections.IntList;
import java.io.IOException;
@@ -262,4 +263,9 @@ public class SerializerUtils
return retVal;
}
+
+ public int getSerializedStringByteSize(String str)
+ {
+ return Ints.BYTES + str.getBytes(UTF8).length;
+ }
}
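
Reviewer note: the new getSerializedStringByteSize helper mirrors the framing used by writeString, assuming a 4-byte length prefix followed by the string's UTF-8 bytes. A self-contained check of the arithmetic:

```java
import com.google.common.primitives.Ints;
import java.nio.charset.StandardCharsets;

public class SerializedStringSizeCheck
{
  public static void main(String[] args)
  {
    final String str = "druid";
    // 4-byte length prefix + UTF-8 payload, matching the new helper.
    final int size = Ints.BYTES + str.getBytes(StandardCharsets.UTF_8).length;
    System.out.println(size); // prints 9 (4 + 5)
  }
}
```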
diff --git a/common/src/main/java/io/druid/db/DbConnector.java b/common/src/main/java/io/druid/db/DbConnector.java
deleted file mode 100644
index 1e783399270..00000000000
--- a/common/src/main/java/io/druid/db/DbConnector.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.db;
-
-import com.google.common.base.Supplier;
-import com.google.inject.Inject;
-import com.metamx.common.ISE;
-import com.metamx.common.logger.Logger;
-import org.apache.commons.dbcp.BasicDataSource;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.StatementContext;
-import org.skife.jdbi.v2.tweak.HandleCallback;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
-
-import javax.sql.DataSource;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class DbConnector
-{
- private static final Logger log = new Logger(DbConnector.class);
-
- public static void createSegmentTable(final IDBI dbi, final String segmentTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- segmentTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
- "CREATE INDEX ON %1$s(dataSource);"+
- "CREATE INDEX ON %1$s(used);":
- "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
- segmentTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createRuleTable(final IDBI dbi, final String ruleTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- ruleTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
- "CREATE INDEX ON %1$s(dataSource);":
- "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
- ruleTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createConfigTable(final IDBI dbi, final String configTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- configTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
- "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
- configTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTaskTable(final IDBI dbi, final String taskTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- taskTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (\n"
- + " id varchar(255) NOT NULL,\n"
- + " created_date TEXT NOT NULL,\n"
- + " datasource varchar(255) NOT NULL,\n"
- + " payload bytea NOT NULL,\n"
- + " status_payload bytea NOT NULL,\n"
- + " active SMALLINT NOT NULL DEFAULT '0',\n"
- + " PRIMARY KEY (id)\n"
- + ");\n" +
- "CREATE INDEX ON %1$s(active, created_date);":
- "CREATE TABLE `%s` (\n"
- + " `id` varchar(255) NOT NULL,\n"
- + " `created_date` tinytext NOT NULL,\n"
- + " `datasource` varchar(255) NOT NULL,\n"
- + " `payload` longblob NOT NULL,\n"
- + " `status_payload` longblob NOT NULL,\n"
- + " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
- + " PRIMARY KEY (`id`),\n"
- + " KEY (active, created_date(100))\n"
- + ")",
- taskTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- taskLogsTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (\n"
- + " id bigserial NOT NULL,\n"
- + " task_id varchar(255) DEFAULT NULL,\n"
- + " log_payload bytea,\n"
- + " PRIMARY KEY (id)\n"
- + ");\n"+
- "CREATE INDEX ON %1$s(task_id);":
- "CREATE TABLE `%s` (\n"
- + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
- + " `task_id` varchar(255) DEFAULT NULL,\n"
- + " `log_payload` longblob,\n"
- + " PRIMARY KEY (`id`),\n"
- + " KEY `task_id` (`task_id`)\n"
- + ")",
- taskLogsTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName, boolean isPostgreSQL)
- {
- createTable(
- dbi,
- taskLocksTableName,
- String.format(
- isPostgreSQL ?
- "CREATE TABLE %1$s (\n"
- + " id bigserial NOT NULL,\n"
- + " task_id varchar(255) DEFAULT NULL,\n"
- + " lock_payload bytea,\n"
- + " PRIMARY KEY (id)\n"
- + ");\n"+
- "CREATE INDEX ON %1$s(task_id);":
- "CREATE TABLE `%s` (\n"
- + " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
- + " `task_id` varchar(255) DEFAULT NULL,\n"
- + " `lock_payload` longblob,\n"
- + " PRIMARY KEY (`id`),\n"
- + " KEY `task_id` (`task_id`)\n"
- + ")",
- taskLocksTableName
- ),
- isPostgreSQL
- );
- }
-
- public static void createTable(
- final IDBI dbi,
- final String tableName,
- final String sql,
- final boolean isPostgreSQL
- )
- {
- try {
- dbi.withHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle) throws Exception
- {
- List
-
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>5.1.18</version>
-
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
index 915fa5a004f..40ab0936488 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
@@ -26,7 +26,8 @@ import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.response.ToStringResponseHandler;
+import com.metamx.http.client.response.StatusResponseHandler;
+import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.client.selector.Server;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.indexing.common.RetryPolicy;
@@ -37,6 +38,7 @@ import org.joda.time.Duration;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Map;
public class RemoteTaskActionClient implements TaskActionClient
@@ -75,22 +77,25 @@ public class RemoteTaskActionClient implements TaskActionClient
while (true) {
try {
+ final Server server;
final URI serviceUri;
try {
- serviceUri = getServiceUri();
+ server = getServiceInstance();
+ serviceUri = makeServiceUri(server);
}
catch (Exception e) {
+ // Want to retry, so throw an IOException.
throw new IOException("Failed to locate service uri", e);
}
- final String response;
+ final StatusResponseHolder response;
log.info("Submitting action for task[%s] to overlord[%s]: %s", task.getId(), serviceUri, taskAction);
try {
response = httpClient.post(serviceUri.toURL())
.setContent("application/json", dataToSend)
- .go(new ToStringResponseHandler(Charsets.UTF_8))
+ .go(new StatusResponseHandler(Charsets.UTF_8))
.get();
}
catch (Exception e) {
@@ -99,13 +104,24 @@ public class RemoteTaskActionClient implements TaskActionClient
throw Throwables.propagate(e);
}
- final Map<String, Object> responseDict = jsonMapper.readValue(
- response, new TypeReference<Map<String, Object>>()
- {
+ if (response.getStatus().getCode() / 200 == 1) {
+ final Map<String, Object> responseDict = jsonMapper.readValue(
+ response.getContent(),
+ new TypeReference<Map<String, Object>>()
+ {
+ }
+ );
+ return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
+ } else {
+ // Want to retry, so throw an IOException.
+ throw new IOException(
+ String.format(
+ "Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.",
+ response.getStatus(),
+ server.getHost()
+ )
+ );
}
- );
-
- return jsonMapper.convertValue(responseDict.get("result"), taskAction.getReturnTypeReference());
}
catch (IOException | ChannelException e) {
log.warn(e, "Exception submitting action for task[%s]", task.getId());
@@ -127,13 +143,18 @@ public class RemoteTaskActionClient implements TaskActionClient
}
}
- private URI getServiceUri() throws Exception
+ private URI makeServiceUri(final Server instance) throws URISyntaxException
+ {
+ return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
+ }
+
+ private Server getServiceInstance()
{
final Server instance = selector.pick();
if (instance == null) {
throw new ISE("Cannot find instance of indexer to talk to!");
+ } else {
+ return instance;
}
-
- return new URI(String.format("%s://%s%s", instance.getScheme(), instance.getHost(), "/druid/indexer/v1/action"));
}
}
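
Reviewer note: the success test above, response.getStatus().getCode() / 200 == 1, relies on integer division, so it accepts every status from 200 through 399 (3xx included), not only 2xx. A standalone illustration:

```java
public class StatusCodeCheckDemo
{
  public static void main(String[] args)
  {
    // Integer division: codes 200..399 divide to 1; everything else does not.
    for (int code : new int[]{200, 204, 302, 400, 404, 500}) {
      System.out.println(code + " -> accepted: " + (code / 200 == 1));
    }
    // 200, 204 and 302 print true; 400, 404 and 500 print false.
  }
}
```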
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
index 5280e394f6f..d76461ab171 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentInsertAction.java
@@ -81,7 +81,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
- final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
+ final Set<DataSegment> retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java
index fb296871d2d..3aaca159ccd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUnusedAction.java
@@ -68,7 +68,7 @@ public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
- return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
+ return toolbox.getIndexerMetadataStorageCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java
index ff9cfc492ca..82d16daf3ae 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentListUsedAction.java
@@ -68,7 +68,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
- return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
+ return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForInterval(dataSource, interval);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
index 4356c80dc59..2abd53fea08 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentMetadataUpdateAction.java
@@ -42,7 +42,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
- toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
+ toolbox.getIndexerMetadataStorageCoordinator().updateSegmentMetadata(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
index 54258df1c2d..62bc6142c6d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SegmentNukeAction.java
@@ -59,7 +59,7 @@ public class SegmentNukeAction implements TaskAction<Void>
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
- toolbox.getIndexerDBCoordinator().deleteSegments(segments);
+ toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
index d9b0520f40b..5b055b4ba90 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
@@ -27,7 +27,7 @@ import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
-import io.druid.indexing.overlord.IndexerDBCoordinator;
+import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.timeline.DataSegment;
@@ -37,18 +37,18 @@ import java.util.Set;
public class TaskActionToolbox
{
private final TaskLockbox taskLockbox;
- private final IndexerDBCoordinator indexerDBCoordinator;
+ private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
- IndexerDBCoordinator indexerDBCoordinator,
+ IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter
)
{
this.taskLockbox = taskLockbox;
- this.indexerDBCoordinator = indexerDBCoordinator;
+ this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter;
}
@@ -57,9 +57,9 @@ public class TaskActionToolbox
return taskLockbox;
}
- public IndexerDBCoordinator getIndexerDBCoordinator()
+ public IndexerMetadataStorageCoordinator getIndexerMetadataStorageCoordinator()
{
- return indexerDBCoordinator;
+ return indexerMetadataStorageCoordinator;
}
public ServiceEmitter getEmitter()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
index 11f6bb2264d..0f7859f72a6 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
@@ -35,7 +35,7 @@ import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.indexing.DataSchema;
@@ -166,7 +166,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
- IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
+ IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
}
// Map merged segment so we can extract dimensions
@@ -211,8 +211,7 @@ public class YeOldePlumberSchool implements PlumberSchool
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
try {
-
- IndexMerger.persist(
+ IndexMaker.persist(
indexToPersist.getIndex(),
dirToPersist
);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
index 51fb358ac8c..15f89e8cfbd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java
@@ -37,7 +37,6 @@ import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
-import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Map;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java
deleted file mode 100644
index 39d443d1330..00000000000
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/DeleteTask.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.metamx.common.logger.Logger;
-import io.druid.granularity.QueryGranularity;
-import io.druid.indexing.common.TaskLock;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.TaskToolbox;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.segment.IndexMerger;
-import io.druid.segment.IndexableAdapter;
-import io.druid.segment.incremental.IncrementalIndex;
-import io.druid.segment.incremental.IncrementalIndexAdapter;
-import io.druid.timeline.DataSegment;
-import io.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-
-import java.io.File;
-
-public class DeleteTask extends AbstractFixedIntervalTask
-{
- private static final Logger log = new Logger(DeleteTask.class);
-
- @JsonCreator
- public DeleteTask(
- @JsonProperty("id") String id,
- @JsonProperty("dataSource") String dataSource,
- @JsonProperty("interval") Interval interval
- )
- {
- super(
- id != null ? id : String.format(
- "delete_%s_%s_%s_%s",
- dataSource,
- interval.getStart(),
- interval.getEnd(),
- new DateTime().toString()
- ),
- dataSource,
- Preconditions.checkNotNull(interval, "interval")
- );
- }
-
- @Override
- public String getType()
- {
- return "delete";
- }
-
- @Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
- {
- // Strategy: Create an empty segment covering the interval to be deleted
- final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
- final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
- final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
-
- // Create DataSegment
- final DataSegment segment =
- DataSegment.builder()
- .dataSource(this.getDataSource())
- .interval(getInterval())
- .version(myLock.getVersion())
- .shardSpec(new NoneShardSpec())
- .build();
-
- final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
- final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
-
- // Upload the segment
- final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment);
-
- log.info(
- "Uploaded tombstone segment for[%s] interval[%s] with version[%s]",
- segment.getDataSource(),
- segment.getInterval(),
- segment.getVersion()
- );
-
- toolbox.pushSegments(ImmutableList.of(uploadedSegment));
-
- return TaskStatus.success(getId());
- }
-}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index a0ebfa98dce..b46867b0414 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -29,6 +29,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.guice.ExtensionsConfig;
@@ -38,6 +39,7 @@ import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.Jobby;
+import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@@ -63,8 +65,9 @@ public class HadoopIndexTask extends AbstractTask
private static final Logger log = new Logger(HadoopIndexTask.class);
private static final ExtensionsConfig extensionsConfig;
+ final static Injector injector = GuiceInjectors.makeStartupInjector();
static {
- extensionsConfig = GuiceInjectors.makeStartupInjector().getInstance(ExtensionsConfig.class);
+ extensionsConfig = injector.getInstance(ExtensionsConfig.class);
}
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
@@ -288,7 +291,10 @@ public class HadoopIndexTask extends AbstractTask
.withTuningConfig(theSchema.getTuningConfig().withVersion(version))
);
- HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
+ HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(
+ config,
+ injector.getInstance(MetadataStorageUpdaterJobHandler.class)
+ );
log.info("Starting a hadoop index generator job...");
if (job.run()) {
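
Reviewer note: the startup injector is now built once in a static field and reused both for ExtensionsConfig and for the MetadataStorageUpdaterJobHandler handed to HadoopDruidIndexerJob. A minimal sketch of that create-once, reuse-everywhere pattern in plain Guice; ExampleHandler is a hypothetical stand-in type:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

public class InjectorReuseSketch
{
  static class ExampleHandler {}

  // Built once at class-load time, like the task's startup injector.
  static final Injector INJECTOR = Guice.createInjector(
      new AbstractModule()
      {
        @Override
        protected void configure()
        {
          bind(ExampleHandler.class).toInstance(new ExampleHandler());
        }
      }
  );

  public static void main(String[] args)
  {
    // Every lookup reuses the same injector instead of rebuilding it.
    final ExampleHandler a = INJECTOR.getInstance(ExampleHandler.class);
    final ExampleHandler b = INJECTOR.getInstance(ExampleHandler.class);
    System.out.println(a == b); // true: toInstance binds a single instance
  }
}
```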
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index f04736a66e3..d1495e6b6d2 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -147,7 +147,7 @@ public class IndexTask extends AbstractFixedIntervalTask
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
),
new IndexIOConfig(firehoseFactory),
- new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
+ new IndexTuningConfig(targetPartitionSize, 0, null)
);
}
this.jsonMapper = jsonMapper;
@@ -401,7 +401,11 @@ public class IndexTask extends AbstractFixedIntervalTask
version,
wrappedDataSegmentPusher,
tmpDir
- ).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
+ ).findPlumber(
+ schema,
+ new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null),
+ metrics
+ );
// rowFlushBoundary for this job
final int myRowFlushBoundary = rowFlushBoundary > 0
@@ -557,7 +561,7 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
@JsonProperty("numShards") @Nullable Integer numShards
- )
+ )
{
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;
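
Reviewer note: the constructor treats 0 as "use the default", which is why the IndexTask constructor earlier in this diff can pass a literal 0 for rowFlushBoundary. A tiny sketch of the sentinel logic; the default value below is assumed for illustration, the real constant lives in IndexTask:

```java
public class ZeroMeansDefaultSketch
{
  // Hypothetical default for illustration only.
  static final int DEFAULT_ROW_FLUSH_BOUNDARY = 500000;

  static int resolveRowFlushBoundary(int requested)
  {
    return requested == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : requested;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveRowFlushBoundary(0));     // falls back to the default
    System.out.println(resolveRowFlushBoundary(75000)); // explicit value wins
  }
}
```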
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 231febf66df..906e8e7a901 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -143,7 +143,9 @@ public class RealtimeIndexTask extends AbstractTask
null,
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
- spec.getShardSpec()
+ spec.getShardSpec(),
+ false,
+ false
),
null, null, null, null
);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index e5db129128c..a4c093179b3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -43,7 +43,6 @@ import io.druid.query.QueryRunner;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "append", value = AppendTask.class),
@JsonSubTypes.Type(name = "merge", value = MergeTask.class),
- @JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 5744fa7f006..f0dacac6fa8 100644
--- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -51,10 +51,11 @@ import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
+import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
-import io.druid.segment.TimestampColumnSelector;
+import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.loading.SegmentLoadingException;
@@ -250,7 +251,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
public Sequence<InputRow> apply(final Cursor cursor)
{
- final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
+ final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
@@ -287,7 +288,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
- final long timestamp = timestampColumnSelector.getTimestamp();
+ final long timestamp = timestampColumnSelector.get();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
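
Reviewer note: the dedicated TimestampColumnSelector is gone; the timestamp is now read like any other long column, keyed by Column.TIME_COLUMN_NAME. A short sketch of the replacement call sequence, using only the API already visible in this hunk:

```java
import io.druid.segment.Cursor;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.column.Column;

public class TimeColumnReadSketch
{
  public static long readTimestamp(final Cursor cursor)
  {
    // The time column is addressed by name and read as a plain long.
    final LongColumnSelector selector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
    return selector.get(); // epoch millis for the cursor's current row
  }
}
```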
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
deleted file mode 100644
index 05576184655..00000000000
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/DbTaskStorage.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Druid - a distributed column store.
- * Copyright (C) 2012, 2013 Metamarkets Group Inc.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- */
-
-package io.druid.indexing.overlord;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.inject.Inject;
-import com.metamx.common.RetryUtils;
-import com.metamx.common.lifecycle.LifecycleStart;
-import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.emitter.EmittingLogger;
-import com.mysql.jdbc.exceptions.MySQLTransientException;
-import io.druid.db.DbConnector;
-import io.druid.db.DbTablesConfig;
-import io.druid.indexing.common.TaskLock;
-import io.druid.indexing.common.TaskStatus;
-import io.druid.indexing.common.actions.TaskAction;
-import io.druid.indexing.common.config.TaskStorageConfig;
-import io.druid.indexing.common.task.Task;
-import org.joda.time.DateTime;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.exceptions.CallbackFailedException;
-import org.skife.jdbi.v2.exceptions.DBIException;
-import org.skife.jdbi.v2.exceptions.StatementException;
-import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
-import org.skife.jdbi.v2.tweak.HandleCallback;
-
-import java.sql.SQLException;
-import java.sql.SQLRecoverableException;
-import java.sql.SQLTransientException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-public class DbTaskStorage implements TaskStorage
-{
- private final ObjectMapper jsonMapper;
- private final DbConnector dbConnector;
- private final DbTablesConfig dbTables;
- private final IDBI dbi;
- private final TaskStorageConfig config;
-
- private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
-
- @Inject
- public DbTaskStorage(
- final ObjectMapper jsonMapper,
- final DbConnector dbConnector,
- final DbTablesConfig dbTables,
- final IDBI dbi,
- final TaskStorageConfig config
- )
- {
- this.jsonMapper = jsonMapper;
- this.dbConnector = dbConnector;
- this.dbTables = dbTables;
- this.dbi = dbi;
- this.config = config;
- }
-
- @LifecycleStart
- public void start()
- {
- dbConnector.createTaskTables();
- }
-
- @LifecycleStop
- public void stop()
- {
- // do nothing
- }
-
- @Override
- public void insert(final Task task, final TaskStatus status) throws TaskExistsException
- {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkNotNull(status, "status");
- Preconditions.checkArgument(
- task.getId().equals(status.getId()),
- "Task/Status ID mismatch[%s/%s]",
- task.getId(),
- status.getId()
- );
-
- log.info("Inserting task %s with status: %s", task.getId(), status);
-
- try {
- retryingHandle(
- new HandleCallback<Void>()
- {
- @Override
- public Void withHandle(Handle handle) throws Exception
- {
- handle.createStatement(
- String.format(
- "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
- dbTables.getTasksTable()
- )
- )
- .bind("id", task.getId())
- .bind("created_date", new DateTime().toString())
- .bind("datasource", task.getDataSource())
- .bind("payload", jsonMapper.writeValueAsBytes(task))
- .bind("active", status.isRunnable() ? 1 : 0)
- .bind("status_payload", jsonMapper.writeValueAsBytes(status))
- .execute();
-
- return null;
- }
- }
- );
- }
- catch (Exception e) {
- final boolean isStatementException = e instanceof StatementException ||
- (e instanceof CallbackFailedException
- && e.getCause() instanceof StatementException);
- if (isStatementException && getTask(task.getId()).isPresent()) {
- throw new TaskExistsException(task.getId(), e);
- } else {
- throw e;
- }
- }
- }
-
- @Override
- public void setStatus(final TaskStatus status)
- {
- Preconditions.checkNotNull(status, "status");
-
- log.info("Updating task %s to status: %s", status.getId(), status);
-
- int updated = retryingHandle(
- new HandleCallback<Integer>()
- {
- @Override
- public Integer withHandle(Handle handle) throws Exception
- {
- return handle.createStatement(
- String.format(
- "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
- dbTables.getTasksTable()
- )
- )
- .bind("id", status.getId())
- .bind("active", status.isRunnable() ? 1 : 0)
- .bind("status_payload", jsonMapper.writeValueAsBytes(status))
- .execute();
- }
- }
- );
-
- if (updated != 1) {
- throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
- }
- }
-
- @Override
- public Optional<Task> getTask(final String taskid)
- {
- return retryingHandle(
- new HandleCallback<Optional<Task>>()
- {
- @Override
- public Optional<Task> withHandle(Handle handle) throws Exception
- {
- final List<Map<String, Object>> dbTasks =
- handle.createQuery(
- String.format(
- "SELECT payload FROM %s WHERE id = :id",
- dbTables.getTasksTable()
- )
- )
- .bind("id", taskid)
- .list();
-
- if (dbTasks.size() == 0) {
- return Optional.absent();
- } else {
- final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
- return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("payload"), Task.class));
- }
- }
- }
- );
- }
-
- @Override
- public Optional<TaskStatus> getStatus(final String taskid)
- {
- return retryingHandle(
- new HandleCallback<Optional<TaskStatus>>()
- {
- @Override
- public Optional<TaskStatus> withHandle(Handle handle) throws Exception
- {
- final List<Map<String, Object>> dbStatuses =
- handle.createQuery(
- String.format(
- "SELECT status_payload FROM %s WHERE id = :id",
- dbTables.getTasksTable()
- )
- )
- .bind("id", taskid)
- .list();
-
- if (dbStatuses.size() == 0) {
- return Optional.absent();
- } else {
- final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
- return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("status_payload"), TaskStatus.class));
- }
- }
- }
- );
- }
-
- @Override
- public List<Task> getActiveTasks()
- {
- return retryingHandle(
- new HandleCallback<List<Task>>()
- {
- @Override
- public List<Task> withHandle(Handle handle) throws Exception
- {
- final List<Map<String, Object>> dbTasks =
- handle.createQuery(
- String.format(
- "SELECT id, payload, status_payload FROM %s WHERE active = 1 ORDER BY created_date",
- dbTables.getTasksTable()
- )
- )
- .list();
-
- final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
- for (final Map<String, Object> row : dbTasks) {
- final String id = row.get("id").toString();
-
- try {
- final Task task = jsonMapper.readValue((byte[]) row.get("payload"), Task.class);
- final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
-
- if (status.isRunnable()) {
- tasks.add(task);
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
- }
- }
-
- return tasks.build();
- }
- }
- );
- }
-
- @Override
- public List<TaskStatus> getRecentlyFinishedTaskStatuses()
- {
- final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
- return retryingHandle(
- new HandleCallback<List<TaskStatus>>()
- {
- @Override
- public List<TaskStatus> withHandle(Handle handle) throws Exception
- {
- final List<Map<String, Object>> dbTasks =
- handle.createQuery(
- String.format(
- "SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC",
- dbTables.getTasksTable()
- )
- ).bind("recent", recent.toString()).list();
-
- final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
- for (final Map<String, Object> row : dbTasks) {
- final String id = row.get("id").toString();
-
- try {
- final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
- if (status.isComplete()) {
- statuses.add(status);
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
- }
- }
-
- return statuses.build();
- }
- }
- );
- }
-
- @Override
- public void addLock(final String taskid, final TaskLock taskLock)
- {
- Preconditions.checkNotNull(taskid, "taskid");
- Preconditions.checkNotNull(taskLock, "taskLock");
-
- log.info(
- "Adding lock on interval[%s] version[%s] for task: %s",
- taskLock.getInterval(),
- taskLock.getVersion(),
- taskid
- );
-
- retryingHandle(
- new HandleCallback<Integer>()
- {
- @Override
- public Integer withHandle(Handle handle) throws Exception
- {
- return handle.createStatement(
- String.format(
- "INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)",
- dbTables.getTaskLockTable()
- )
- )
- .bind("task_id", taskid)
- .bind("lock_payload", jsonMapper.writeValueAsBytes(taskLock))
- .execute();
- }
- }
- );
- }
-
- @Override
- public void removeLock(String taskid, TaskLock taskLockToRemove)
- {
- Preconditions.checkNotNull(taskid, "taskid");
- Preconditions.checkNotNull(taskLockToRemove, "taskLockToRemove");
-
- final Map<Long, TaskLock> taskLocks = getLocksWithIds(taskid);
-
- for (final Map.Entry<Long, TaskLock> taskLockWithId : taskLocks.entrySet()) {
- final long id = taskLockWithId.getKey();
- final TaskLock taskLock = taskLockWithId.getValue();
-
- if (taskLock.equals(taskLockToRemove)) {
- log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
-
- retryingHandle(
- new HandleCallback<Integer>()
- {
- @Override
- public Integer withHandle(Handle handle) throws Exception
- {
- return handle.createStatement(
- String.format(
- "DELETE FROM %s WHERE id = :id",
- dbTables.getTaskLockTable()
- )
- )
- .bind("id", id)
- .execute();
- }
- }
- );
- }
- }
- }
-
- @Override
- public List<TaskLock> getLocks(String taskid)
- {
- return ImmutableList.copyOf(
- Iterables.transform(
- getLocksWithIds(taskid).entrySet(), new Function<Map.Entry<Long, TaskLock>, TaskLock>()
- {
- @Override
- public TaskLock apply(Map.Entry<Long, TaskLock> e)
- {
- return e.getValue();
- }
- }
- )
- );
- }
-
- @Override
- public void addAuditLog(final Task task, final TaskAction taskAction)
- {
- Preconditions.checkNotNull(taskAction, "taskAction");
-
- log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
-
- retryingHandle(
- new HandleCallback<Integer>()
- {
- @Override
- public Integer withHandle(Handle handle) throws Exception
- {
- return handle.createStatement(
- String.format(
- "INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)",
- dbTables.getTaskLogTable()
- )
- )
- .bind("task_id", task.getId())
- .bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
- .execute();
- }
- }
- );
- }
-
- @Override
- public List<TaskAction> getAuditLogs(final String taskid)
- {
- return retryingHandle(
- new HandleCallback<List<TaskAction>>()
- {
- @Override
- public List<TaskAction> withHandle(Handle handle) throws Exception
- {
- final List<Map<String, Object>> dbTaskLogs =
- handle.createQuery(
- String.format(
- "SELECT log_payload FROM %s WHERE task_id = :task_id",
- dbTables.getTaskLogTable()
- )
- )
- .bind("task_id", taskid)
- .list();
-
- final List<TaskAction> retList = Lists.newArrayList();
- for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
- try {
- retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to deserialize TaskLog")
- .addData("task", taskid)
- .addData("logPayload", dbTaskLog)
- .emit();
- }
- }
- return retList;
- }
- }
- );
- }
-
- private Map<Long, TaskLock> getLocksWithIds(final String taskid)
- {
- return retryingHandle(
- new HandleCallback<Map<Long, TaskLock>>()
- {
- @Override
- public Map<Long, TaskLock> withHandle(Handle handle) throws Exception
- {
- final List<Map<String, Object>> dbTaskLocks =
- handle.createQuery(
- String.format(
- "SELECT id, lock_payload FROM %s WHERE task_id = :task_id",
- dbTables.getTaskLockTable()
- )
- )
- .bind("task_id", taskid)
- .list();
-
- final Map<Long, TaskLock> retMap = Maps.newHashMap();
- for (final Map<String, Object> row : dbTaskLocks) {
- try {
- retMap.put(
- (Long) row.get("id"),
- jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
- );
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to deserialize TaskLock")
- .addData("task", taskid)
- .addData("lockPayload", row)
- .emit();
- }
- }
- return retMap;
- }
- }
- );
- }
-
- /**
- * Retry SQL operations
- */
- private <T> T retryingHandle(final HandleCallback<T> callback)
- {
- final Callable<T> call = new Callable<T>()
- {
- @Override
- public T call() throws Exception
- {
- return dbi.withHandle(callback);
- }
- };
- final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
- {
- @Override
- public boolean apply(Throwable e)
- {
- return shouldRetryException(e);
- }
- };
- final int maxTries = 10;
- try {
- return RetryUtils.retry(call, shouldRetry, maxTries);
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- private static boolean shouldRetryException(final Throwable e)
- {
- return e != null && (e instanceof SQLTransientException
- || e instanceof MySQLTransientException
- || e instanceof SQLRecoverableException
- || e instanceof UnableToObtainConnectionException
- || (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317)
- || (e instanceof SQLException && shouldRetryException(e.getCause()))
- || (e instanceof DBIException && shouldRetryException(e.getCause())));
- }
-}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
index d7c68491e4d..1de7d11aaec 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
@@ -56,8 +56,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.RandomAccessFile;
-import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.Map;
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
index f279f4fad22..55c86af6210 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -35,6 +35,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
+import io.druid.metadata.EntryExistsException;
import org.joda.time.DateTime;
import java.util.List;
@@ -63,7 +64,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
- public void insert(Task task, TaskStatus status) throws TaskExistsException
+ public void insert(Task task, TaskStatus status) throws EntryExistsException
{
giant.lock();
@@ -78,7 +79,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
);
if(tasks.containsKey(task.getId())) {
- throw new TaskExistsException(task.getId());
+ throw new EntryExistsException(task.getId());
}
log.info("Inserting task %s with status: %s", task.getId(), status);
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
new file mode 100644
index 00000000000..6adc6566f14
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
@@ -0,0 +1,304 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012, 2013 Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+package io.druid.indexing.overlord;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.inject.Inject;
+import com.metamx.common.Pair;
+import com.metamx.common.lifecycle.LifecycleStart;
+import com.metamx.common.lifecycle.LifecycleStop;
+import com.metamx.emitter.EmittingLogger;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.metadata.EntryExistsException;
+import io.druid.metadata.MetadataStorageActionHandler;
+import io.druid.metadata.MetadataStorageActionHandlerFactory;
+import io.druid.metadata.MetadataStorageActionHandlerTypes;
+import io.druid.metadata.MetadataStorageConnector;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.actions.TaskAction;
+import io.druid.indexing.common.config.TaskStorageConfig;
+import io.druid.indexing.common.task.Task;
+import io.druid.metadata.MetadataStorageTablesConfig;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+
+public class MetadataTaskStorage implements TaskStorage
+{
+ private static final MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock> TASK_TYPES = new MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock>()
+ {
+ @Override
+ public TypeReference<Task> getEntryType()
+ {
+ return new TypeReference<Task>()
+ {
+ };
+ }
+
+ @Override
+ public TypeReference<TaskStatus> getStatusType()
+ {
+ return new TypeReference<TaskStatus>()
+ {
+ };
+ }
+
+ @Override
+ public TypeReference<TaskAction> getLogType()
+ {
+ return new TypeReference<TaskAction>()
+ {
+ };
+ }
+
+ @Override
+ public TypeReference<TaskLock> getLockType()
+ {
+ return new TypeReference<TaskLock>()
+ {
+ };
+ }
+ };
+
+ private final MetadataStorageConnector metadataStorageConnector;
+ private final TaskStorageConfig config;
+ private final MetadataStorageActionHandler<Task, TaskStatus, TaskAction, TaskLock> handler;
+
+ private static final EmittingLogger log = new EmittingLogger(MetadataTaskStorage.class);
+
+ @Inject
+ public MetadataTaskStorage(
+ final MetadataStorageConnector metadataStorageConnector,
+ final TaskStorageConfig config,
+ final MetadataStorageActionHandlerFactory factory
+ )
+ {
+ this.metadataStorageConnector = metadataStorageConnector;
+ this.config = config;
+ this.handler = factory.create(MetadataStorageTablesConfig.TASK_ENTRY_TYPE, TASK_TYPES);
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ metadataStorageConnector.createTaskTables();
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ // do nothing
+ }
+
+ @Override
+ public void insert(final Task task, final TaskStatus status) throws EntryExistsException
+ {
+ Preconditions.checkNotNull(task, "task");
+ Preconditions.checkNotNull(status, "status");
+ Preconditions.checkArgument(
+ task.getId().equals(status.getId()),
+ "Task/Status ID mismatch[%s/%s]",
+ task.getId(),
+ status.getId()
+ );
+
+ log.info("Inserting task %s with status: %s", task.getId(), status);
+
+ try {
+ handler.insert(
+ task.getId(),
+ new DateTime(),
+ task.getDataSource(),
+ task,
+ status.isRunnable(),
+ status
+ );
+ }
+ catch (Exception e) {
+ if(e instanceof EntryExistsException) {
+ throw (EntryExistsException) e;
+ } else {
+ Throwables.propagate(e);
+ }
+ }
+ }
+
+ @Override
+ public void setStatus(final TaskStatus status)
+ {
+ Preconditions.checkNotNull(status, "status");
+
+ log.info("Updating task %s to status: %s", status.getId(), status);
+
+ final boolean set = handler.setStatus(
+ status.getId(),
+ status.isRunnable(),
+ status
+ );
+ if (!set) {
+ throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
+ }
+ }
+
+ @Override
+ public Optional<Task> getTask(final String taskId)
+ {
+ return handler.getEntry(taskId);
+ }
+
+ @Override
+ public Optional<TaskStatus> getStatus(final String taskId)
+ {
+ return handler.getStatus(taskId);
+ }
+
+ @Override
+ public List<Task> getActiveTasks()
+ {
+ return ImmutableList.copyOf(
+ Iterables.transform(
+ Iterables.filter(
+ handler.getActiveEntriesWithStatus(),
+ new Predicate<Pair<Task, TaskStatus>>()
+ {
+ @Override
+ public boolean apply(
+ @Nullable Pair<Task, TaskStatus> input
+ )
+ {
+ return input.rhs.isRunnable();
+ }
+ }
+ ),
+ new Function<Pair<Task, TaskStatus>, Task>()
+ {
+ @Nullable
+ @Override
+ public Task apply(@Nullable Pair<Task, TaskStatus> input)
+ {
+ return input.lhs;
+ }
+ }
+ )
+ );
+ }
+
+ @Override
+ public List<TaskStatus> getRecentlyFinishedTaskStatuses()
+ {
+ final DateTime start = new DateTime().minus(config.getRecentlyFinishedThreshold());
+
+ return ImmutableList.copyOf(
+ Iterables.filter(
+ handler.getInactiveStatusesSince(start),
+ new Predicate<TaskStatus>()
+ {
+ @Override
+ public boolean apply(TaskStatus status)
+ {
+ return status.isComplete();
+ }
+ }
+ )
+ );
+ }
+
+ @Override
+ public void addLock(final String taskid, final TaskLock taskLock)
+ {
+ Preconditions.checkNotNull(taskid, "taskid");
+ Preconditions.checkNotNull(taskLock, "taskLock");
+
+ log.info(
+ "Adding lock on interval[%s] version[%s] for task: %s",
+ taskLock.getInterval(),
+ taskLock.getVersion(),
+ taskid
+ );
+
+ handler.addLock(taskid, taskLock);
+ }
+
+ @Override
+ public void removeLock(String taskid, TaskLock taskLockToRemove)
+ {
+ Preconditions.checkNotNull(taskid, "taskid");
+ Preconditions.checkNotNull(taskLockToRemove, "taskLockToRemove");
+
+ final Map<Long, TaskLock> taskLocks = getLocksWithIds(taskid);
+
+ for (final Map.Entry<Long, TaskLock> taskLockWithId : taskLocks.entrySet()) {
+ final long id = taskLockWithId.getKey();
+ final TaskLock taskLock = taskLockWithId.getValue();
+
+ if (taskLock.equals(taskLockToRemove)) {
+ log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
+ handler.removeLock(id);
+ }
+ }
+ }
+
+ @Override
+ public List<TaskLock> getLocks(String taskid)
+ {
+ return ImmutableList.copyOf(
+ Iterables.transform(
+ getLocksWithIds(taskid).entrySet(), new Function<Map.Entry<Long, TaskLock>, TaskLock>()
+ {
+ @Override
+ public TaskLock apply(Map.Entry<Long, TaskLock> e)
+ {
+ return e.getValue();
+ }
+ }
+ )
+ );
+ }
+
+ @Override
+ public void addAuditLog(final Task task, final TaskAction taskAction)
+ {
+ Preconditions.checkNotNull(taskAction, "taskAction");
+
+ log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
+
+ handler.addLog(task.getId(), taskAction);
+ }
+
+ @Override
+ public List<TaskAction> getAuditLogs(final String taskId)
+ {
+ return handler.getLogs(taskId);
+ }
+
+ private Map<Long, TaskLock> getLocksWithIds(final String taskid)
+ {
+ return handler.getLocks(taskid);
+ }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java
index 66306636e7f..b48086c08d4 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunnerFactory.java
@@ -68,7 +68,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
curator,
new SimplePathChildrenCacheFactory
.Builder()
- .withCompressed(remoteTaskRunnerConfig.isCompressZnodes())
+ .withCompressed(true)
.build(),
httpClient,
strategy
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
index f1f8d604db7..c309005616b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java
@@ -43,6 +43,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.TaskQueueConfig;
+import io.druid.metadata.EntryExistsException;
import java.util.List;
import java.util.Map;
@@ -292,9 +293,9 @@ public class TaskQueue
* @param task task to add
*
* @return true
- * @throws TaskExistsException if the task already exists
+ * @throws io.druid.metadata.EntryExistsException if the task already exists
*/
- public boolean add(final Task task) throws TaskExistsException
+ public boolean add(final Task task) throws EntryExistsException
{
giant.lock();
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
index 45ee3864143..8d8ad6346e4 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
@@ -24,6 +24,7 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
+import io.druid.metadata.EntryExistsException;
import java.util.List;
@@ -34,9 +35,9 @@ public interface TaskStorage
*
* @param task task to add
* @param status task status
- * @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
+ * @throws io.druid.metadata.EntryExistsException if the task ID already exists
*/
- public void insert(Task task, TaskStatus status) throws TaskExistsException;
+ public void insert(Task task, TaskStatus status) throws EntryExistsException;
/**
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
index 8d89d834785..8807debcb3d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/ForkingTaskRunnerConfig.java
@@ -49,7 +49,7 @@ public class ForkingTaskRunnerConfig
@JsonProperty
@Min(1024)
@Max(65535)
- private int startPort = 8081;
+ private int startPort = 8100;
@JsonProperty
@NotNull
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java
index fb149391a06..2b286040654 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/config/RemoteTaskRunnerConfig.java
@@ -33,9 +33,6 @@ public class RemoteTaskRunnerConfig
@NotNull
private Period taskAssignmentTimeout = new Period("PT5M");
- @JsonProperty
- private boolean compressZnodes = false;
-
@JsonProperty
private String minWorkerVersion = "0";
@@ -48,11 +45,6 @@ public class RemoteTaskRunnerConfig
return taskAssignmentTimeout;
}
- public boolean isCompressZnodes()
- {
- return compressZnodes;
- }
-
public String getMinWorkerVersion()
{
return minWorkerVersion;
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index 248d6611b48..3b6ef11bf71 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -36,7 +36,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
-import io.druid.indexing.overlord.TaskExistsException;
+import io.druid.metadata.EntryExistsException;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
@@ -131,7 +131,7 @@ public class OverlordResource
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
- catch (TaskExistsException e) {
+ catch (EntryExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId())))
.build();
diff --git a/indexing-service/src/main/resources/indexer_static/console.html b/indexing-service/src/main/resources/indexer_static/console.html
index 9b9c254996b..827c8a285ca 100644
--- a/indexing-service/src/main/resources/indexer_static/console.html
+++ b/indexing-service/src/main/resources/indexer_static/console.html
@@ -25,14 +25,14 @@
-
-
+
+
-
-
-
-
-
+
+
+
+
+
@@ -64,4 +64,4 @@