Initial attempt at abstraction; the Druid cluster works with Derby as the default

This commit is contained in:
jisookim0513 2014-09-19 17:39:59 -07:00
parent f0d67fe873
commit 273205f217
250 changed files with 4266 additions and 3234 deletions

View File

@ -34,13 +34,13 @@ import java.util.PriorityQueue;
/**
* An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
* that the input Iterators are provided in order. That is, it places an extra restriction on the input iterators.
*
* <p/>
* Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
* in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
* be provided in the order of the first element of each iterator.
*
* <p/>
* If this doesn't make sense, check out OrderedMergeIteratorTest.testScrewsUpOnOutOfOrderBeginningOfList()
*
* <p/>
* It places this extra restriction on the input data in order to implement an optimization that allows it to
* remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
*/

View File

@ -22,6 +22,7 @@ package io.druid.collections;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
@ -37,13 +38,13 @@ import java.util.PriorityQueue;
/**
* An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
* that the input Iterators are provided in order. That is, it places an extra restriction on the input iterators.
*
* <p/>
* Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
* in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
* be provided in the order of the first element of each iterator.
*
* <p/>
* If this doesn't make sense, check out OrderedMergeSequenceTest.testScrewsUpOnOutOfOrderBeginningOfList()
*
* <p/>
* It places this extra restriction on the input data in order to implement an optimization that allows it to
* remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
*/
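
The restriction described above is easiest to see next to an ordinary k-way merge. The sketch below is a generic merge over pre-sorted iterators, keyed on each iterator's current head with a priority queue; it accepts its inputs in any order, which is exactly the freedom OrderedMergeIterator gives up. The class and method names are illustrative only and are not part of this change.

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative k-way merge: each input iterator must itself be sorted, but the
// iterators may be supplied in any order, unlike OrderedMergeIterator's inputs.
public class SimpleMergeIterator<T> implements Iterator<T>
{
  private static class Peeked<T>
  {
    private T head;
    private final Iterator<T> rest;

    private Peeked(T head, Iterator<T> rest)
    {
      this.head = head;
      this.rest = rest;
    }
  }

  private final PriorityQueue<Peeked<T>> queue;

  public SimpleMergeIterator(final Comparator<? super T> comparator, List<Iterator<T>> inputs)
  {
    this.queue = new PriorityQueue<Peeked<T>>(
        Math.max(1, inputs.size()),
        new Comparator<Peeked<T>>()
        {
          @Override
          public int compare(Peeked<T> left, Peeked<T> right)
          {
            return comparator.compare(left.head, right.head);
          }
        }
    );
    for (Iterator<T> input : inputs) {
      if (input.hasNext()) {
        // Eagerly pulls one element from every input before the first next() call.
        queue.add(new Peeked<T>(input.next(), input));
      }
    }
  }

  @Override
  public boolean hasNext()
  {
    return !queue.isEmpty();
  }

  @Override
  public T next()
  {
    final Peeked<T> top = queue.poll();
    final T value = top.head;
    if (top.rest.hasNext()) {
      top.head = top.rest.next();
      queue.add(top);
    }
    return value;
  }

  @Override
  public void remove()
  {
    throw new UnsupportedOperationException();
  }
}

Because this generic version pulls one element from every input up front, it cannot stay lazy when the inputs are simply appended one after another; the extra ordering restriction is what lets OrderedMergeIterator defer opening the later iterators.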

View File

@ -27,8 +27,8 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataTablesConfig;
import org.joda.time.Duration;
import java.util.Arrays;
@ -48,7 +48,7 @@ public class ConfigManager
private final Object lock = new Object();
private boolean started = false;
private final DbConnector dbConnector;
private final MetadataDbConnector metadataDbConnector;
private final Supplier<ConfigManagerConfig> config;
private final ScheduledExecutorService exec;
@ -58,9 +58,9 @@ public class ConfigManager
private volatile ConfigManager.PollingCallable poller;
@Inject
public ConfigManager(DbConnector dbConnector, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
public ConfigManager(MetadataDbConnector metadataDbConnector, Supplier<MetadataTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{
this.dbConnector = dbConnector;
this.metadataDbConnector = metadataDbConnector;
this.config = config;
this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
@ -105,7 +105,7 @@ public class ConfigManager
{
for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
try {
if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
if (entry.getValue().swapIfNew(metadataDbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
log.info("New value for key[%s] seen.", entry.getKey());
}
}
@ -137,7 +137,7 @@ public class ConfigManager
// Multiple of these callables can be submitted at the same time, but the callables themselves
// are executed serially, so double check that it hasn't already been populated.
if (!watchedConfigs.containsKey(key)) {
byte[] value = dbConnector.lookup(configTable, "name", "payload", key);
byte[] value = metadataDbConnector.lookup(configTable, "name", "payload", key);
ConfigHolder<T> holder = new ConfigHolder<T>(value, serde);
watchedConfigs.put(key, holder);
}
@ -181,7 +181,7 @@ public class ConfigManager
@Override
public Boolean call() throws Exception
{
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
metadataDbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
final ConfigHolder configHolder = watchedConfigs.get(key);
if (configHolder != null) {

View File

@ -20,6 +20,7 @@
package io.druid.common.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.OutputSupplier;
import io.druid.collections.IntList;
@ -262,4 +263,4 @@ public class SerializerUtils
return retVal;
}
}
}

View File

@ -1,389 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
/**
*/
public class DbConnector
{
private static final Logger log = new Logger(DbConnector.class);
public static void createSegmentTable(final IDBI dbi, final String segmentTableName, boolean isPostgreSQL)
{
createTable(
dbi,
segmentTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
segmentTableName
),
isPostgreSQL
);
}
public static void createRuleTable(final IDBI dbi, final String ruleTableName, boolean isPostgreSQL)
{
createTable(
dbi,
ruleTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
ruleTableName
),
isPostgreSQL
);
}
public static void createConfigTable(final IDBI dbi, final String configTableName, boolean isPostgreSQL)
{
createTable(
dbi,
configTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
configTableName
),
isPostgreSQL
);
}
public static void createTaskTable(final IDBI dbi, final String taskTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
+ " datasource varchar(255) NOT NULL,\n"
+ " payload bytea NOT NULL,\n"
+ " status_payload bytea NOT NULL,\n"
+ " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (id)\n"
+ ");\n" +
"CREATE INDEX ON %1$s(active, created_date);":
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
taskTableName
),
isPostgreSQL
);
}
public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskLogsTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " log_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName
),
isPostgreSQL
);
}
public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskLocksTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " lock_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName
),
isPostgreSQL
);
}
public static void createTable(
final IDBI dbi,
final String tableName,
final String sql,
final boolean isPostgreSQL
)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table;
if ( isPostgreSQL ) {
table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
} else {
table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
}
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws SQLException
{
final String insertOrUpdateStatement = String.format(
isPostgreSQL ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName, keyColumn, valueColumn
);
return dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(insertOrUpdateStatement)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
public byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
)
{
final String selectStatement = String.format("SELECT %s FROM %s WHERE %s = :key", valueColumn, tableName, keyColumn);
return dbi.withHandle(
new HandleCallback<byte[]>()
{
@Override
public byte[] withHandle(Handle handle) throws Exception
{
List<byte[]> matched = handle.createQuery(selectStatement)
.bind("key", key)
.map(
new ResultSetMapper<byte[]>()
{
@Override
public byte[] map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
return r.getBytes(valueColumn);
}
}
).list();
if (matched.isEmpty()) {
return null;
}
if (matched.size() > 1) {
throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
}
return matched.get(0);
}
}
);
}
public static Boolean isPostgreSQL(final IDBI dbi)
{
return dbi.withHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return isPostgreSQL(handle);
}
}
);
}
protected static Boolean isPostgreSQL(final Handle handle) throws SQLException
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
}
private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;
private boolean isPostgreSQL = false;
@Inject
public DbConnector(Supplier<DbConnectorConfig> config, Supplier<DbTablesConfig> dbTables)
{
this.config = config;
this.dbTables = dbTables;
this.dbi = new DBI(getDatasource());
}
public DBI getDBI()
{
return dbi;
}
public boolean isPostgreSQL()
{
return isPostgreSQL;
}
private DataSource getDatasource()
{
DbConnectorConfig connectorConfig = config.get();
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser());
dataSource.setPassword(connectorConfig.getPassword());
String uri = connectorConfig.getConnectURI();
isPostgreSQL = uri.startsWith("jdbc:postgresql");
dataSource.setUrl(uri);
if (connectorConfig.isUseValidationQuery()) {
dataSource.setValidationQuery(connectorConfig.getValidationQuery());
dataSource.setTestOnBorrow(true);
}
return dataSource;
}
public void createSegmentTable()
{
if (config.get().isCreateTables()) {
createSegmentTable(dbi, dbTables.get().getSegmentsTable(), isPostgreSQL);
}
}
public void createRulesTable()
{
if (config.get().isCreateTables()) {
createRuleTable(dbi, dbTables.get().getRulesTable(), isPostgreSQL);
}
}
public void createConfigTable()
{
if (config.get().isCreateTables()) {
createConfigTable(dbi, dbTables.get().getConfigTable(), isPostgreSQL);
}
}
public void createTaskTables()
{
if (config.get().isCreateTables()) {
final DbTablesConfig dbTablesConfig = dbTables.get();
createTaskTable(dbi, dbTablesConfig.getTasksTable(), isPostgreSQL);
createTaskLogTable(dbi, dbTablesConfig.getTaskLogTable(), isPostgreSQL);
createTaskLockTable(dbi, dbTablesConfig.getTaskLockTable(), isPostgreSQL);
}
}
}
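
The createTable helper removed above is one of the places that tied the old DbConnector to MySQL and PostgreSQL: it probes for table existence with vendor-specific SQL (pg_catalog.pg_tables or SHOW TABLES). The sketch below shows a vendor-neutral existence check through JDBC's DatabaseMetaData, which would also work against an embedded store such as Derby; it is an illustration of the idea, not code from this commit.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class GenericTableCreator
{
  // Creates the table only if JDBC metadata does not already report it.
  // Derby (and some other databases) folds unquoted identifiers to upper case,
  // so both spellings of the table name are checked.
  public static void createTableIfAbsent(Connection connection, String tableName, String createSql) throws SQLException
  {
    final DatabaseMetaData metaData = connection.getMetaData();
    boolean exists;
    try (ResultSet tables = metaData.getTables(null, null, tableName.toUpperCase(), new String[]{"TABLE"})) {
      exists = tables.next();
    }
    if (!exists) {
      try (ResultSet tables = metaData.getTables(null, null, tableName, new String[]{"TABLE"})) {
        exists = tables.next();
      }
    }
    if (!exists) {
      try (Statement statement = connection.createStatement()) {
        statement.execute(createSql);
      }
    }
  }
}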

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
/**
*/
public interface MetadataDbConnector
{
public Void insertOrUpdate(
final String storageName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception;
public byte[] lookup(
final String storageName,
final String keyColumn,
final String valueColumn,
final String key
);
public void createSegmentTable();
public void createRulesTable();
public void createConfigTable();
public void createTaskTables();
}
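
The commit message says the cluster now works with Derby as the default, but the Derby-backed implementation of this interface is not part of the excerpt shown here. The following is a minimal sketch, over plain JDBC, of how insertOrUpdate and lookup could be written portably (an update-then-insert rather than MySQL's ON DUPLICATE KEY or the PostgreSQL CTE upsert removed above); the class name, constructor, lack of retries, and omission of the table-creation methods are all simplifications.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.sql.DataSource;

// Illustrative only: covers the two generic key/value operations of MetadataDbConnector.
public class SimpleJdbcMetadataConnector
{
  private final DataSource dataSource;

  public SimpleJdbcMetadataConnector(DataSource dataSource)
  {
    this.dataSource = dataSource;
  }

  public Void insertOrUpdate(String tableName, String keyColumn, String valueColumn, String key, byte[] value) throws Exception
  {
    try (Connection connection = dataSource.getConnection()) {
      connection.setAutoCommit(false);
      final String updateSql = String.format("UPDATE %s SET %s = ? WHERE %s = ?", tableName, valueColumn, keyColumn);
      int updated;
      try (PreparedStatement update = connection.prepareStatement(updateSql)) {
        update.setBytes(1, value);
        update.setString(2, key);
        updated = update.executeUpdate();
      }
      if (updated == 0) {
        // No existing row: fall back to a plain insert.
        final String insertSql = String.format("INSERT INTO %s (%s, %s) VALUES (?, ?)", tableName, keyColumn, valueColumn);
        try (PreparedStatement insert = connection.prepareStatement(insertSql)) {
          insert.setString(1, key);
          insert.setBytes(2, value);
          insert.executeUpdate();
        }
      }
      connection.commit();
    }
    return null;
  }

  public byte[] lookup(String tableName, String keyColumn, String valueColumn, String key)
  {
    final String selectSql = String.format("SELECT %s FROM %s WHERE %s = ?", valueColumn, tableName, keyColumn);
    try (Connection connection = dataSource.getConnection();
         PreparedStatement select = connection.prepareStatement(selectSql)) {
      select.setString(1, key);
      try (ResultSet resultSet = select.executeQuery()) {
        return resultSet.next() ? resultSet.getBytes(1) : null;
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}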

View File

@ -25,21 +25,18 @@ import javax.validation.constraints.NotNull;
/**
*/
public class DbConnectorConfig
public class MetadataDbConnectorConfig
{
@JsonProperty
private boolean createTables = true;
@JsonProperty
@NotNull
private String connectURI = null;
@JsonProperty
@NotNull
private String user = null;
@JsonProperty
@NotNull
private String password = null;
@JsonProperty

View File

@ -24,11 +24,11 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class DbTablesConfig
public class MetadataTablesConfig
{
public static DbTablesConfig fromBase(String base)
public static MetadataTablesConfig fromBase(String base)
{
return new DbTablesConfig(base, null, null, null, null, null, null);
return new MetadataTablesConfig(base, null, null, null, null, null, null);
}
private static String defaultBase = "druid";
@ -55,7 +55,7 @@ public class DbTablesConfig
private final String taskLockTable;
@JsonCreator
public DbTablesConfig(
public MetadataTablesConfig(
@JsonProperty("base") String base,
@JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable,
@ -120,4 +120,4 @@ public class DbTablesConfig
{
return taskLockTable;
}
}
}

View File

@ -27,8 +27,8 @@ import com.metamx.common.lifecycle.Lifecycle;
import io.druid.common.config.ConfigManager;
import io.druid.common.config.ConfigManagerConfig;
import io.druid.common.config.JacksonConfigManager;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataTablesConfig;
/**
*/
@ -43,8 +43,8 @@ public class JacksonConfigManagerModule implements Module
@Provides @ManageLifecycle
public ConfigManager getConfigManager(
final DbConnector dbConnector,
final Supplier<DbTablesConfig> dbTables,
final MetadataDbConnector metadataDbConnector,
final Supplier<MetadataTablesConfig> dbTables,
final Supplier<ConfigManagerConfig> config,
final Lifecycle lifecycle
)
@ -55,7 +55,7 @@ public class JacksonConfigManagerModule implements Module
@Override
public void start() throws Exception
{
dbConnector.createConfigTable();
metadataDbConnector.createConfigTable();
}
@Override
@ -66,6 +66,6 @@ public class JacksonConfigManagerModule implements Module
}
);
return new ConfigManager(dbConnector, dbTables, config);
return new ConfigManager(metadataDbConnector, dbTables, config);
}
}

View File

@ -0,0 +1,11 @@
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.timeline.DataSegment;
import java.util.List;
public interface MetadataUpdaterJobHandler
{
public void publishSegments(String tableName, List<DataSegment> segments, ObjectMapper mapper);
}

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
*/
public interface IndexerMetadataCoordinator
{
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException;
/**
* Attempts to insert a set of segments into the database. Returns the set of segments actually added (segments
* with identifiers already in the database will not be added).
*
* @param segments set of segments to add
* @return set of segments actually added
*/
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException;
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException;
public void deleteSegments(final Set<DataSegment> segments) throws IOException;
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval);
}
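
A short usage sketch of the idempotent-insert contract described in the javadoc above: callers can diff the returned set against the input to see which segments were already recorded. Everything except the interface methods and the DataSegment accessors is illustrative.

import com.google.common.collect.Sets;
import io.druid.indexing.overlord.IndexerMetadataCoordinator;
import io.druid.timeline.DataSegment;

import java.io.IOException;
import java.util.Set;

public class SegmentAnnounceExample
{
  // Publishes segments and reports which ones were skipped because the
  // metadata store already knew about them.
  public static void publish(IndexerMetadataCoordinator coordinator, Set<DataSegment> segments) throws IOException
  {
    final Set<DataSegment> added = coordinator.announceHistoricalSegments(segments);
    final Set<DataSegment> alreadyPresent = Sets.difference(segments, added);
    for (DataSegment segment : alreadyPresent) {
      System.out.println("Segment already published, skipping: " + segment.getIdentifier());
    }
  }
}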

View File

@ -0,0 +1,80 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
import com.google.common.base.Supplier;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.db.MetadataTablesConfig;
import java.util.List;
import java.util.Map;
public interface MetadataActionHandler
{
//<T> T retryCall(Action<T> action);
/* Insert a new task entry into the given table */
public void insert(
String tableName,
String Id,
String createdDate,
String dataSource,
byte[] payload,
int active,
byte[] statusPayload
) throws Exception;
/* Determines whether an exception thrown by the insert statement was caused by an already existing task */
public boolean isStatementException(Exception e);
/* Set the status of the task with the given ID. Returns 1 if the status was updated successfully */
public int setStatus(String tableName, String Id, int active, byte[] statusPayload);
/* Retrieve a task with the given ID */
public List<Map<String, Object>> getTask(String tableName, String Id);
/* Retrieve a task status with the given ID */
public List<Map<String, Object>> getTaskStatus(String tableName, String Id);
/* Retrieve active tasks */
public List<Map<String, Object>> getActiveTasks(String tableName);
/* Retrieve statuses of tasks created after the given time */
public List<Map<String, Object>> getRecentlyFinishedTaskStatuses(String tableName, String recent);
/* Add lock to the task with given ID */
public int addLock(String tableName, String Id, byte[] lock);
/* Remove taskLock with given ID */
public int removeLock(String tableName, long lockId);
public int addAuditLog(String tableName, String Id, byte[] taskAction);
/* Get logs for task with given ID */
public List<Map<String, Object>> getTaskLogs(String tableName, String Id);
/* Get locks for task with given ID */
public List<Map<String, Object>> getTaskLocks(String tableName, String Id);
/* Initialize and return a new MetadataDbConnector */
// public MetadataDbConnector getConnector(Supplier<MetadataDbConnectorConfig> config, Supplier<MetadataTablesConfig> dbTables);
}
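
DbTaskStorage, further down in this commit, previously tested this condition inline against JDBI exception types before throwing TaskExistsException. The sketch below shows how a JDBI-backed implementation of isStatementException might preserve that behaviour behind the new interface; only this single method is shown and the class name is hypothetical.

import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.StatementException;

public class JdbiStatementExceptions
{
  // Mirrors the check DbTaskStorage performed inline before this refactoring:
  // a StatementException, possibly wrapped in a CallbackFailedException, is how
  // the duplicate-task INSERT failure surfaced through JDBI in the old code.
  public static boolean isStatementException(Exception e)
  {
    return e instanceof StatementException
           || (e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
  }
}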

View File

@ -42,17 +42,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
*
* <p/>
* It associates a jodatime Interval and a generically-typed version with the object that is being stored.
*
* <p/>
* In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated
* with a timeline entry remains unchanged when chunking occurs.
*
* <p/>
* After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most
* recent objects (according to the version) that match the given interval. The intent is that objects represent
* a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look
* at in order to get a correct answer about that time period.
*
* <p/>
* The findOvershadowed() method returns a list of objects that will never be returned by a call to lookup() because
* they are overshadowed by some other object. This can be used in conjunction with the add() and remove() methods
* to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if

View File

@ -28,6 +28,8 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.SequenceTestHelper;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.TestSequence;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.YieldingAccumulator;
import junit.framework.Assert;
import org.junit.Test;

View File

@ -17,7 +17,7 @@
},
"pathSpec" : {
"type" : "static",
"paths" : "examples/indexing/wikipedia_data.json"
"paths" : "examples/bin/examples/indexing/wikipedia_data.json"
},
"targetPartitionSize" : 5000000,
"rollupSpec" : {

View File

@ -24,7 +24,7 @@
}],
"firehose" : {
"type" : "local",
"baseDir" : "examples/indexing/",
"baseDir" : "examples/bin/examples/indexing",
"filter" : "wikipedia_data.json",
"parser" : {
"timestampSpec" : {

View File

@ -38,60 +38,67 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- override jets3t from hadoop-core -->
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<!-- override httpclient / httpcore version from jets3t -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- override jets3t from hadoop-core -->
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</dependency>
<!-- override httpclient / httpcore version from jets3t -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<!-- Tests -->
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -19,16 +19,9 @@
package io.druid.indexer;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.List;
@ -36,68 +29,26 @@ import java.util.List;
*/
public class DbUpdaterJob implements Jobby
{
private static final Logger log = new Logger(DbUpdaterJob.class);
private final HadoopDruidIndexerConfig config;
private final IDBI dbi;
private final DbConnector dbConnector;
@Inject
private MetadataUpdaterJobHandler handler;
public DbUpdaterJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
this.dbConnector = new DbConnector(config.getSchema().getIOConfig().getMetadataUpdateSpec(), null);
this.dbi = this.dbConnector.getDBI();
}
@Override
public boolean run()
{
final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
final PreparedBatch batch = handle.prepareBatch(
String.format(
dbConnector.isPostgreSQL() ?
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)" :
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable()
)
);
for (final DataSegment segment : segments) {
batch.add(
new ImmutableMap.Builder<String, Object>()
.put("id", segment.getIdentifier())
.put("dataSource", segment.getDataSource())
.put("created_date", new DateTime().toString())
.put("start", segment.getInterval().getStart().toString())
.put("end", segment.getInterval().getEnd().toString())
.put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
.put("version", segment.getVersion())
.put("used", true)
.put("payload", HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segment))
.build()
);
log.info("Published %s", segment.getIdentifier());
}
batch.execute();
return null;
}
}
);
final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable();
final ObjectMapper mapper = HadoopDruidIndexerConfig.jsonMapper;
handler.publishSegments(segmentTable, segments, mapper);
return true;
}
}
}
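
The batch insert that used to live inline in DbUpdaterJob is now hidden behind MetadataUpdaterJobHandler.publishSegments. The sketch below condenses the removed code into a possible JDBI-backed implementation of that handler; the SQL is the non-PostgreSQL variant from above, and dialect handling, retries, and injection wiring are omitted.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.MetadataUpdaterJobHandler;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.tweak.HandleCallback;

import java.util.List;

public class JdbiMetadataUpdaterJobHandler implements MetadataUpdaterJobHandler
{
  private final IDBI dbi;

  public JdbiMetadataUpdaterJobHandler(IDBI dbi)
  {
    this.dbi = dbi;
  }

  @Override
  public void publishSegments(final String tableName, final List<DataSegment> segments, final ObjectMapper mapper)
  {
    dbi.withHandle(
        new HandleCallback<Void>()
        {
          @Override
          public Void withHandle(Handle handle) throws Exception
          {
            // Same row layout as the batch the old DbUpdaterJob prepared.
            final PreparedBatch batch = handle.prepareBatch(
                String.format(
                    "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
                    + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
                    tableName
                )
            );
            for (final DataSegment segment : segments) {
              batch.add(
                  new ImmutableMap.Builder<String, Object>()
                      .put("id", segment.getIdentifier())
                      .put("dataSource", segment.getDataSource())
                      .put("created_date", new DateTime().toString())
                      .put("start", segment.getInterval().getStart().toString())
                      .put("end", segment.getInterval().getEnd().toString())
                      .put("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? 0 : 1)
                      .put("version", segment.getVersion())
                      .put("used", true)
                      .put("payload", mapper.writeValueAsString(segment))
                      .build()
              );
            }
            batch.execute();
            return null;
          }
        }
    );
  }
}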

View File

@ -81,13 +81,13 @@ import java.util.Map;
/**
* Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
* choosing the best dimension that satisfies the criteria:
*
* <p/>
* <ul>
* <li>Must have exactly one value per row.</li>
* <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
* put all those rows in the same partition, and that partition may be much larger than the target size.</li>
* </ul>
*
* <p/>
* "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of
* segment size relative to the target.
*/
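
A condensed sketch of the selection rule described in that javadoc, written over hypothetical per-dimension statistics; the real job gathers these numbers in a MapReduce pass and its scoring is more involved than this.

import java.util.Map;

public class PartitionDimensionChooser
{
  // Hypothetical per-dimension statistics, assumed to be gathered elsewhere.
  public static class DimensionStats
  {
    final long cardinality;            // number of distinct values
    final long maxRowsForSingleValue;  // size of the largest group sharing one value
    final boolean singleValuedPerRow;  // exactly one value per row?

    public DimensionStats(long cardinality, long maxRowsForSingleValue, boolean singleValuedPerRow)
    {
      this.cardinality = cardinality;
      this.maxRowsForSingleValue = maxRowsForSingleValue;
      this.singleValuedPerRow = singleValuedPerRow;
    }
  }

  public static String choose(Map<String, DimensionStats> stats, long targetPartitionSize)
  {
    String best = null;
    long bestCardinality = -1;
    for (Map.Entry<String, DimensionStats> entry : stats.entrySet()) {
      final DimensionStats s = entry.getValue();
      // Criteria from the javadoc: single-valued per row, and no single value
      // may force a partition far beyond the target size.
      if (!s.singleValuedPerRow || s.maxRowsForSingleValue > targetPartitionSize) {
        continue;
      }
      // "Best" is approximated here as the highest-cardinality survivor.
      if (s.cardinality > bestCardinality) {
        bestCardinality = s.cardinality;
        best = entry.getKey();
      }
    }
    return best; // null means no dimension qualifies and the job must fall back
  }
}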

View File

@ -45,9 +45,9 @@ public class HadoopDruidIndexerJob implements Jobby
this.config = config;
if (config.isUpdaterJobSpecSet()) {
dbUpdaterJob = new DbUpdaterJob(config);
this.dbUpdaterJob = new DbUpdaterJob(config);
} else {
dbUpdaterJob = null;
this.dbUpdaterJob = null;
}
}

View File

@ -21,11 +21,11 @@ package io.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import io.druid.db.DbConnectorConfig;
import io.druid.db.MetadataDbConnectorConfig;
/**
*/
public class DbUpdaterJobSpec implements Supplier<DbConnectorConfig>
public class DbUpdaterJobSpec implements Supplier<MetadataDbConnectorConfig>
{
@JsonProperty("connectURI")
public String connectURI;
@ -45,9 +45,9 @@ public class DbUpdaterJobSpec implements Supplier<DbConnectorConfig>
}
@Override
public DbConnectorConfig get()
public MetadataDbConnectorConfig get()
{
return new DbConnectorConfig()
return new MetadataDbConnectorConfig()
{
@Override
public String getConnectURI()

View File

@ -22,13 +22,13 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@ -355,7 +355,7 @@ public class HadoopIngestionSpecTest
);
final DbUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec();
final DbConnectorConfig connectorConfig = spec.get();
final MetadataDbConnectorConfig connectorConfig = spec.get();
Assert.assertEquals("segments", spec.getSegmentTable());
Assert.assertEquals("jdbc:mysql://localhost/druid", connectorConfig.getConnectURI());

View File

@ -20,6 +20,7 @@
package io.druid.indexer.partitions;
import com.google.common.base.Throwables;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigTest;
import org.junit.Assert;
import org.junit.Test;

View File

@ -20,6 +20,7 @@
package io.druid.indexer.partitions;
import com.google.common.base.Throwables;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerConfigTest;
import org.junit.Assert;
import org.junit.Test;

View File

@ -28,7 +28,7 @@ import com.google.common.base.Preconditions;
/**
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
* ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
*
* <p/>
* TaskStatus objects are immutable.
*/
public class TaskStatus
@ -101,8 +101,6 @@ public class TaskStatus
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isSuccess, or isFailure will be true at any one time.
*
* @return whether the task is runnable.
*/
@JsonIgnore
public boolean isRunnable()
@ -112,8 +110,6 @@ public class TaskStatus
/**
* Inverse of {@link #isRunnable}.
*
* @return whether the task is complete.
*/
@JsonIgnore
public boolean isComplete()
@ -124,8 +120,6 @@ public class TaskStatus
/**
* Returned by tasks when they spawn subtasks. Exactly one of isRunnable, isSuccess, or isFailure will
* be true at any one time.
*
* @return whether the task succeeded.
*/
@JsonIgnore
public boolean isSuccess()
@ -136,8 +130,6 @@ public class TaskStatus
/**
* Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isSuccess, or
* isFailure will be true at any one time.
*
* @return whether the task failed
*/
@JsonIgnore
public boolean isFailure()

View File

@ -29,6 +29,7 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;

View File

@ -81,7 +81,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> retVal = toolbox.getIndexerMetadataCoordinator().announceHistoricalSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -68,7 +68,7 @@ public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerMetadataCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
}
@Override

View File

@ -68,7 +68,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerMetadataCoordinator().getUsedSegmentsForInterval(dataSource, interval);
}
@Override

View File

@ -42,7 +42,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
toolbox.getIndexerMetadataCoordinator().updateSegmentMetadata(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -59,7 +59,7 @@ public class SegmentNukeAction implements TaskAction<Void>
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
toolbox.getIndexerMetadataCoordinator().deleteSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -27,7 +27,7 @@ import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerDBCoordinator;
import io.druid.indexing.overlord.IndexerMetadataCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.timeline.DataSegment;
@ -37,18 +37,18 @@ import java.util.Set;
public class TaskActionToolbox
{
private final TaskLockbox taskLockbox;
private final IndexerDBCoordinator indexerDBCoordinator;
private final IndexerMetadataCoordinator indexerMetadataCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
IndexerDBCoordinator indexerDBCoordinator,
IndexerMetadataCoordinator indexerMetadataCoordinator,
ServiceEmitter emitter
)
{
this.taskLockbox = taskLockbox;
this.indexerDBCoordinator = indexerDBCoordinator;
this.indexerMetadataCoordinator = indexerMetadataCoordinator;
this.emitter = emitter;
}
@ -57,9 +57,9 @@ public class TaskActionToolbox
return taskLockbox;
}
public IndexerDBCoordinator getIndexerDBCoordinator()
public IndexerMetadataCoordinator getIndexerMetadataCoordinator()
{
return indexerDBCoordinator;
return indexerMetadataCoordinator;
}
public ServiceEmitter getEmitter()

View File

@ -107,12 +107,6 @@ public abstract class AbstractTask implements Task
return null;
}
@Override
public String getClasspathPrefix()
{
return null;
}
@Override
public String toString()
{
@ -124,10 +118,7 @@ public abstract class AbstractTask implements Task
}
/**
* Start helper methods
*
* @param objects objects to join
* @return string of joined objects
* Start helper methods *
*/
public static String joinId(Object... objects)
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
@ -30,6 +31,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexableAdapter;

View File

@ -31,8 +31,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.guice.ExtensionsConfig;
import io.druid.guice.GuiceInjectors;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
@ -44,7 +42,9 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.guice.ExtensionsConfig;
import io.druid.timeline.DataSegment;
import io.tesla.aether.internal.DefaultTeslaAether;
import org.joda.time.DateTime;
@ -79,13 +79,11 @@ public class HadoopIndexTask extends AbstractTask
private final HadoopIngestionSpec spec;
@JsonIgnore
private final List<String> hadoopDependencyCoordinates;
@JsonIgnore
private final String classpathPrefix;
/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
*
* <p/>
* Here, we will ensure that the DbConnectorConfig field of the spec is set to null, such that the
* job does not push a list of published segments the database. Instead, we will use the method
* IndexGeneratorJob.getPublishedSegments() to simply return a list of the published
@ -98,8 +96,7 @@ public class HadoopIndexTask extends AbstractTask
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
@JsonProperty("classpathPrefix") String classpathPrefix
@JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
)
{
super(
@ -126,8 +123,6 @@ public class HadoopIndexTask extends AbstractTask
// Will be defaulted to something at runtime, based on taskConfig.
this.hadoopDependencyCoordinates = null;
}
this.classpathPrefix = classpathPrefix;
}
@Override
@ -164,13 +159,6 @@ public class HadoopIndexTask extends AbstractTask
return hadoopDependencyCoordinates;
}
@JsonProperty
@Override
public String getClasspathPrefix()
{
return classpathPrefix;
}
@SuppressWarnings("unchecked")
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
@ -328,4 +316,4 @@ public class HadoopIndexTask extends AbstractTask
return null;
}
}
}
}

View File

@ -244,6 +244,8 @@ public class IndexTask extends AbstractFixedIntervalTask
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
// Load data
System.out.println("PARSER IS");
System.out.println(ingestionSchema.getDataSchema().getParser());
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();

View File

@ -41,6 +41,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.IndexIO;

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.TaskLock;

View File

@ -19,6 +19,7 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -43,6 +44,7 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
@ -113,8 +115,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("maxPendingPersists") int maxPendingPersists,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("rejectionPolicyFactory") RejectionPolicyFactory rejectionPolicyFactory
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory
)
{
super(
@ -141,7 +142,7 @@ public class RealtimeIndexTask extends AbstractTask
windowPeriod,
null,
null,
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
rejectionPolicyFactory,
maxPendingPersists,
spec.getShardSpec()
),
@ -314,7 +315,6 @@ public class RealtimeIndexTask extends AbstractTask
null,
null,
null,
null,
0
);

View File

@ -59,27 +59,23 @@ public interface Task
{
/**
* Returns ID of this task. Must be unique across all tasks ever created.
* @return task ID
*/
public String getId();
/**
* Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
* a common convention is to set group ID equal to task ID.
* @return task group ID
*/
public String getGroupId();
/**
* Returns a {@link io.druid.indexing.common.task.TaskResource} for this task. Task resources define specific
* worker requirements a task may require.
* @return {@link io.druid.indexing.common.task.TaskResource} for this task
*/
public TaskResource getTaskResource();
/**
* Returns a descriptive label for this task type. Used for metrics emission and logging.
* @return task type label
*/
public String getType();
@ -99,17 +95,9 @@ public interface Task
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.
* @param <T> query result type
* @return query runners for this task
*/
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
* Returns an extra classpath that should be prepended to the default classpath when running this task. If no
* extra classpath should be prepended, this should return null or the empty string.
*/
public String getClasspathPrefix();
/**
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
@ -134,7 +122,7 @@ public interface Task
*
* @return Some kind of finished status (isRunnable must be false).
*
* @throws Exception if this task failed
* @throws Exception
*/
public TaskStatus run(TaskToolbox toolbox) throws Exception;
}

View File

@ -43,8 +43,6 @@ public class TaskResource
* Returns availability group ID of this task. Tasks in the same availability group cannot be assigned to the same
* worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the
* task ID.
*
* @return task availability group
*/
@JsonProperty
public String getAvailabilityGroup()
@ -54,7 +52,7 @@ public class TaskResource
/**
* @return the number of worker slots this task will take
* Returns the number of worker slots this task will take.
*/
@JsonProperty
public int getRequiredCapacity()

View File

@ -23,71 +23,57 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class DbTaskStorage implements TaskStorage
{
private final ObjectMapper jsonMapper;
private final DbConnector dbConnector;
private final DbTablesConfig dbTables;
private final IDBI dbi;
private final MetadataDbConnector metadataDbConnector;
private final MetadataTablesConfig dbTables;
private final TaskStorageConfig config;
private final MetadataActionHandler handler;
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject
public DbTaskStorage(
final ObjectMapper jsonMapper,
final DbConnector dbConnector,
final DbTablesConfig dbTables,
final IDBI dbi,
final TaskStorageConfig config
final MetadataDbConnector metadataDbConnector,
final MetadataTablesConfig dbTables,
final TaskStorageConfig config,
final MetadataActionHandler handler
)
{
this.jsonMapper = jsonMapper;
this.dbConnector = dbConnector;
this.metadataDbConnector = metadataDbConnector;
this.dbTables = dbTables;
this.dbi = dbi;
this.config = config;
this.handler = handler;
}
@LifecycleStart
public void start()
{
dbConnector.createTaskTables();
metadataDbConnector.createTaskTables();
}
@LifecycleStop
@ -111,39 +97,22 @@ public class DbTaskStorage implements TaskStorage
log.info("Inserting task %s with status: %s", task.getId(), status);
try {
retryingHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
dbTables.getTasksTable()
)
)
.bind("id", task.getId())
.bind("created_date", new DateTime().toString())
.bind("datasource", task.getDataSource())
.bind("payload", jsonMapper.writeValueAsBytes(task))
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
}
}
handler.insert(
dbTables.getTasksTable(),
task.getId(),
new DateTime().toString(),
task.getDataSource(),
jsonMapper.writeValueAsBytes(task),
status.isRunnable() ? 1 : 0,
jsonMapper.writeValueAsBytes(status)
);
}
catch (Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
final boolean isStatementException = handler.isStatementException(e);
if (isStatementException && getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e);
} else {
throw e;
throw Throwables.propagate(e);
}
}
}
@ -155,169 +124,103 @@ public class DbTaskStorage implements TaskStorage
log.info("Updating task %s to status: %s", status.getId(), status);
int updated = retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
dbTables.getTasksTable()
)
)
.bind("id", status.getId())
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
}
}
);
if (updated != 1) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
try {
int updated = handler.setStatus(
dbTables.getTasksTable(),
status.getId(),
status.isRunnable() ? 1 : 0,
jsonMapper.writeValueAsBytes(status)
);
if (updated != 1) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public Optional<Task> getTask(final String taskid)
{
return retryingHandle(
new HandleCallback<Optional<Task>>()
{
@Override
public Optional<Task> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE id = :id",
dbTables.getTasksTable()
)
)
.bind("id", taskid)
.list();
if (dbTasks.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("payload"), Task.class));
}
}
}
);
try {
final List<Map<String, Object>> dbTasks = handler.getTask(dbTables.getTasksTable(), taskid);
if (dbTasks.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("payload"), Task.class));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public Optional<TaskStatus> getStatus(final String taskid)
{
return retryingHandle(
new HandleCallback<Optional<TaskStatus>>()
{
@Override
public Optional<TaskStatus> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbStatuses =
handle.createQuery(
String.format(
"SELECT status_payload FROM %s WHERE id = :id",
dbTables.getTasksTable()
)
)
.bind("id", taskid)
.list();
if (dbStatuses.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("status_payload"), TaskStatus.class));
}
}
}
);
try {
final List<Map<String, Object>> dbStatuses = handler.getTaskStatus(dbTables.getTasksTable(), taskid);
if (dbStatuses.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("status_payload"), TaskStatus.class));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public List<Task> getActiveTasks()
{
return retryingHandle(
new HandleCallback<List<Task>>()
{
@Override
public List<Task> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = 1 ORDER BY created_date",
dbTables.getTasksTable()
)
)
.list();
final List<Map<String, Object>> dbTasks = handler.getActiveTasks(dbTables.getTasksTable());
final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final Task task = jsonMapper.readValue((byte[]) row.get("payload"), Task.class);
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
try {
final Task task = jsonMapper.readValue((byte[]) row.get("payload"), Task.class);
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isRunnable()) {
tasks.add(task);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
}
}
if (status.isRunnable()) {
tasks.add(task);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
}
}
return tasks.build();
}
}
);
return tasks.build();
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
return retryingHandle(
new HandleCallback<List<TaskStatus>>()
{
@Override
public List<TaskStatus> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC",
dbTables.getTasksTable()
)
).bind("recent", recent.toString()).list();
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
final List<Map<String, Object>> dbTasks = handler.getRecentlyFinishedTaskStatuses(dbTables.getTasksTable(), recent.toString());
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
}
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
}
@Override
@ -333,24 +236,14 @@ public class DbTaskStorage implements TaskStorage
taskid
);
retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)",
dbTables.getTaskLockTable()
)
)
.bind("task_id", taskid)
.bind("lock_payload", jsonMapper.writeValueAsBytes(taskLock))
.execute();
}
}
);
try {
handler.addLock(dbTables.getTaskLockTable(), taskid, jsonMapper.writeValueAsBytes(taskLock));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
@ -367,24 +260,7 @@ public class DbTaskStorage implements TaskStorage
if (taskLock.equals(taskLockToRemove)) {
log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"DELETE FROM %s WHERE id = :id",
dbTables.getTaskLockTable()
)
)
.bind("id", id)
.execute();
}
}
);
handler.removeLock(dbTables.getTaskLockTable(), id);
}
}
}
@ -395,13 +271,13 @@ public class DbTaskStorage implements TaskStorage
return ImmutableList.copyOf(
Iterables.transform(
getLocksWithIds(taskid).entrySet(), new Function<Map.Entry<Long, TaskLock>, TaskLock>()
{
@Override
public TaskLock apply(Map.Entry<Long, TaskLock> e)
{
return e.getValue();
}
}
{
@Override
public TaskLock apply(Map.Entry<Long, TaskLock> e)
{
return e.getValue();
}
}
)
);
}
@ -413,140 +289,52 @@ public class DbTaskStorage implements TaskStorage
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)",
dbTables.getTaskLogTable()
)
)
.bind("task_id", task.getId())
.bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
.execute();
}
}
);
}
@Override
public List<TaskAction> getAuditLogs(final String taskid)
{
return retryingHandle(
new HandleCallback<List<TaskAction>>()
{
@Override
public List<TaskAction> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTaskLogs =
handle.createQuery(
String.format(
"SELECT log_payload FROM %s WHERE task_id = :task_id",
dbTables.getTaskLogTable()
)
)
.bind("task_id", taskid)
.list();
final List<TaskAction> retList = Lists.newArrayList();
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
.emit();
}
}
return retList;
}
}
);
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
return retryingHandle(
new HandleCallback<Map<Long, TaskLock>>()
{
@Override
public Map<Long, TaskLock> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTaskLocks =
handle.createQuery(
String.format(
"SELECT id, lock_payload FROM %s WHERE task_id = :task_id",
dbTables.getTaskLockTable()
)
)
.bind("task_id", taskid)
.list();
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for (final Map<String, Object> row : dbTaskLocks) {
try {
retMap.put(
(Long) row.get("id"),
jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", taskid)
.addData("lockPayload", row)
.emit();
}
}
return retMap;
}
}
);
}
/**
* Retry SQL operations
*/
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return dbi.withHandle(callback);
}
};
final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return shouldRetryException(e);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
handler.addAuditLog(dbTables.getTaskLogTable(), task.getId(), jsonMapper.writeValueAsBytes(taskAction));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private static boolean shouldRetryException(final Throwable e)
@Override
public List<TaskAction> getAuditLogs(final String taskid)
{
return e != null && (e instanceof SQLTransientException
|| e instanceof MySQLTransientException
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317)
|| (e instanceof SQLException && shouldRetryException(e.getCause()))
|| (e instanceof DBIException && shouldRetryException(e.getCause())));
final List<Map<String, Object>> dbTaskLogs = handler.getTaskLogs(dbTables.getTaskLogTable(), taskid);
final List<TaskAction> retList = Lists.newArrayList();
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
.emit();
}
}
return retList;
}
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
final List<Map<String, Object>> dbTaskLocks = handler.getTaskLocks(dbTables.getTaskLockTable(), taskid);
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for (final Map<String, Object> row : dbTaskLocks) {
try {
retMap.put(
(Long) row.get("id"),
jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", taskid)
.addData("lockPayload", row)
.emit();
}
}
return retMap;
}
}
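The rewritten DbTaskStorage above routes every database operation through a `handler` (a MetadataActionHandler, imported by the storage modules later in this commit as io.druid.indexing.overlord.MetadataActionHandler). The handler's declaration is not part of this hunk; the sketch below is only inferred from the call sites above, with parameter and return types assumed from how the arguments are built (byte[] payloads from jsonMapper.writeValueAsBytes, table names from dbTables) and how the results are consumed. The insert path, whose leading arguments are cut off at the top of this hunk, is omitted, and no checked exceptions are declared since every call site that needs one already wraps Jackson calls in a try/catch.

import java.util.List;
import java.util.Map;

// Sketch only: method names come from the DbTaskStorage call sites in this diff;
// signatures are inferred, not copied from the real interface.
public interface MetadataActionHandler
{
  // True when the exception indicates a failed statement (e.g. a duplicate task id).
  boolean isStatementException(Exception e);

  // Returns the number of rows updated; DbTaskStorage expects exactly 1 for an active task.
  int setStatus(String tableName, String taskId, int active, byte[] statusPayload);

  // Rows expose a byte[] "payload" column.
  List<Map<String, Object>> getTask(String tableName, String taskId);

  // Rows expose a byte[] "status_payload" column.
  List<Map<String, Object>> getTaskStatus(String tableName, String taskId);

  // Rows expose "id", "payload" and "status_payload" columns.
  List<Map<String, Object>> getActiveTasks(String tableName);

  // Rows expose "id" and "status_payload"; 'recent' is a timestamp lower bound rendered as a String.
  List<Map<String, Object>> getRecentlyFinishedTaskStatuses(String tableName, String recent);

  void addLock(String tableName, String taskId, byte[] lockPayload);

  void removeLock(String tableName, long lockId);

  void addAuditLog(String tableName, String taskId, byte[] logPayload);

  // Rows expose a byte[] "log_payload" column.
  List<Map<String, Object>> getTaskLogs(String tableName, String taskId);

  // Rows expose "id" and a byte[] "lock_payload" column.
  List<Map<String, Object>> getTaskLocks(String tableName, String taskId);
}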

View File

@ -161,19 +161,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
final List<String> command = Lists.newArrayList();
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
final String taskClasspath;
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
taskClasspath = Joiner.on(File.pathSeparator).join(
task.getClasspathPrefix(),
config.getClasspath()
);
} else {
taskClasspath = config.getClasspath();
}
command.add(config.getJavaCommand());
command.add("-cp");
command.add(taskClasspath);
command.add(config.getClasspath());
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
@ -459,4 +450,4 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
closer.register(process.getOutputStream());
}
}
}
}

View File

@ -31,7 +31,7 @@ import io.druid.tasklogs.TaskLogPusher;
import java.util.Properties;
/**
*/
*/
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
{
private final ForkingTaskRunnerConfig config;
@ -66,4 +66,4 @@ public class ForkingTaskRunnerFactory implements TaskRunnerFactory
{
return new ForkingTaskRunner(config, taskConfig, workerConfig, props, persistentTaskLogs, jsonMapper, node);
}
}
}

View File

@ -88,13 +88,13 @@ import java.util.concurrent.TimeUnit;
* creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
* Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
* The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
*
* <p/>
* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
* fail. The RemoteTaskRunner depends on another component to create additional worker resources.
* For example, {@link io.druid.indexing.overlord.scaling.ResourceManagementScheduler} can take care of these duties.
*
* <p/>
* If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
*
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
@ -848,4 +848,4 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
// Notify interested parties
taskRunnerWorkItem.setResult(taskStatus);
}
}
}

View File

@ -31,7 +31,7 @@ import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
/**
*/
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
{
private final CuratorFramework curator;
@ -74,4 +74,4 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
httpClient
);
}
}
}

View File

@ -161,12 +161,6 @@ public class TaskLockbox
/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired. Throws an exception if the lock
* cannot be acquired.
*
* @param task task to acquire lock for
* @param interval interval to lock
* @return acquired TaskLock
*
* @throws java.lang.InterruptedException if the lock cannot be acquired
*/
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{
@ -302,7 +296,6 @@ public class TaskLockbox
* Return the currently-active locks for some task.
*
* @param task task for which to locate locks
* @return currently-active locks for the given task
*/
public List<TaskLock> findLocksForTask(final Task task)
{

View File

@ -287,4 +287,4 @@ public class TaskMaster
return Optional.absent();
}
}
}
}

View File

@ -56,11 +56,11 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* Interface between task producers and the task runner.
*
* <p/>
* This object accepts tasks from producers using {@link #add} and manages delivery of these tasks to a
* {@link TaskRunner}. Tasks will run in a mostly-FIFO order, with deviations when the natural next task is not ready
* in time (based on its {@link Task#isReady} method).
*
* <p/>
* For persistence, we save all new tasks and task status changes using a {@link TaskStorage} object.
*/
public class TaskQueue
@ -292,7 +292,6 @@ public class TaskQueue
* @param task task to add
*
* @return true
* @throws TaskExistsException if the task already exists
*/
public boolean add(final Task task) throws TaskExistsException
{
@ -514,4 +513,4 @@ public class TaskQueue
giant.unlock();
}
}
}
}

View File

@ -42,8 +42,6 @@ public interface TaskRunner
/**
* Inform the task runner it can clean up any resources associated with a task. This implies shutdown of any
* currently-running tasks.
*
* @param taskid task ID to clean up resources for
*/
public void shutdown(String taskid);
@ -54,4 +52,4 @@ public interface TaskRunner
public Collection<? extends TaskRunnerWorkItem> getKnownTasks();
public Collection<ZkWorker> getWorkers();
}
}

View File

@ -32,33 +32,24 @@ public interface TaskStorage
/**
* Adds a task to the storage facility with a particular status.
*
* @param task task to add
* @param status task status
* @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
*/
public void insert(Task task, TaskStatus status) throws TaskExistsException;
/**
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle
* is not respected (absent -&gt; RUNNING -&gt; SUCCESS/FAILURE).
*
* @param status task status
* is not respected (absent -> RUNNING -> SUCCESS/FAILURE).
*/
public void setStatus(TaskStatus status);
/**
* Persists lock state in the storage facility.
* @param taskid task ID
* @param taskLock lock state
*/
public void addLock(String taskid, TaskLock taskLock);
/**
* Removes lock state from the storage facility. It is harmless to keep old locks in the storage facility, but
* this method can help reclaim wasted space.
*
* @param taskid task ID
* @param taskLock lock state
*/
public void removeLock(String taskid, TaskLock taskLock);
@ -67,44 +58,28 @@ public interface TaskStorage
* absentee Optional.
*
* NOTE: This method really feels like it should be combined with {@link #getStatus}. Expect that in the future.
*
* @param taskid task ID
* @return optional task
*/
public Optional<Task> getTask(String taskid);
/**
* Returns task status as stored in the storage facility. If the task ID does not exist, this will return
* an absentee Optional.
*
* @param taskid task ID
* @return task status
*/
public Optional<TaskStatus> getStatus(String taskid);
/**
* Add an action taken by a task to the audit log.
*
* @param task task to record action for
* @param taskAction task action to record
*
* @param <T> task action return type
*/
public <T> void addAuditLog(Task task, TaskAction<T> taskAction);
/**
* Returns all actions taken by a task.
*
* @param taskid task ID
* @return list of task actions
*/
public List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently running or pending tasks as stored in the storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in ascending order of creation.
*
* @return list of active tasks
*/
public List<Task> getActiveTasks();
@ -112,16 +87,11 @@ public interface TaskStorage
* Returns a list of recently finished task statuses as stored in the storage facility. No particular order
* is guaranteed, but implementations are encouraged to return tasks in descending order of creation. No particular
* standard of "recent" is guaranteed, and in fact, this method is permitted to simply return nothing.
*
* @return list of recently finished tasks
*/
public List<TaskStatus> getRecentlyFinishedTaskStatuses();
/**
* Returns a list of locks for a particular task.
*
* @param taskid task ID
* @return list of TaskLocks for the given task
*/
public List<TaskLock> getLocks(String taskid);
}
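For orientation, the status lifecycle described above (absent -> RUNNING -> SUCCESS/FAILURE) is driven by callers such as TaskQueue roughly as follows. This is a hedged sketch rather than code from this commit; the surrounding objects are taken as parameters, and TaskStatus.success() is assumed to exist alongside the TaskStatus.running() factory used elsewhere in this diff.

// Sketch of a caller driving TaskStorage through the status lifecycle; not part of this commit.
// Assumes the usual imports (TaskStorage, Task, TaskStatus, TaskLock, Optional, List).
void runThroughLifecycle(TaskStorage storage, Task task) throws TaskExistsException
{
  storage.insert(task, TaskStatus.running(task.getId()));              // absent -> RUNNING, or TaskExistsException
  // ... the task executes elsewhere ...
  storage.setStatus(TaskStatus.success(task.getId()));                 // RUNNING -> SUCCESS
  final Optional<TaskStatus> stored = storage.getStatus(task.getId()); // read back the terminal status
  final List<TaskLock> locks = storage.getLocks(task.getId());         // locks recorded earlier via addLock()
}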

View File

@ -19,7 +19,9 @@
package io.druid.indexing.overlord;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.indexing.common.TaskStatus;
@ -28,6 +30,7 @@ import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;
@ -70,9 +73,6 @@ public class TaskStorageQueryAdapter
* This method is useful when you want to figure out all of the things a single task spawned. It does pose issues
* with the result set perhaps growing boundlessly and we do not do anything to protect against that. Use at your
* own risk and know that at some point, we might adjust this to actually enforce some sort of limits.
*
* @param taskid task ID
* @return set of segments created by the specified task
*/
public Set<DataSegment> getInsertedSegments(final String taskid)
{

View File

@ -86,19 +86,19 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(TaskStatus result)
{
runningItems.remove(taskRunnerWorkItem);
}
{
@Override
public void onSuccess(TaskStatus result)
{
runningItems.remove(taskRunnerWorkItem);
}
@Override
public void onFailure(Throwable t)
{
runningItems.remove(taskRunnerWorkItem);
}
}
@Override
public void onFailure(Throwable t)
{
runningItems.remove(taskRunnerWorkItem);
}
}
);
return statusFuture;
@ -252,4 +252,4 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
}
}
}
}
}

View File

@ -523,4 +523,4 @@ public class OverlordResource
return data;
}
}
}
}

View File

@ -25,7 +25,6 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.granularity.PeriodGranularity;
import io.druid.indexing.overlord.RemoteTaskRunner;
import io.druid.indexing.overlord.TaskRunner;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
@ -34,7 +33,7 @@ import java.util.concurrent.ScheduledExecutorService;
/**
* The ResourceManagementScheduler schedules a check for when worker nodes should potentially be created or destroyed.
* It uses a {@link TaskRunner} to return all pending tasks in the system and the status of the worker nodes in
* It uses a {@link io.druid.indexing.overlord.TaskRunner} to return all pending tasks in the system and the status of the worker nodes in
* the system.
* The ResourceManagementScheduler does not contain the logic to decide whether provision or termination should actually
* occur. That decision is made in the {@link ResourceManagementStrategy}.

View File

@ -35,9 +35,6 @@ public interface EC2UserData<T extends EC2UserData>
/**
* Return a copy of this instance with a different worker version. If no changes are needed (possibly because the
* user data does not depend on the worker version) then it is OK to return "this".
*
* @param version worker version
* @return instance with the specified version
*/
public EC2UserData<T> withVersion(String version);

View File

@ -41,7 +41,7 @@ import java.util.concurrent.ExecutorService;
/**
* The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be
* created that waits for new tasks. Tasks are executed as soon as they are seen.
*
* <p/>
* The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for
* realtime index tasks.
*/
@ -197,4 +197,4 @@ public class WorkerTaskMonitor
.emit();
}
}
}
}

View File

@ -170,4 +170,4 @@ public class ExecutorLifecycle
{
parentMonitorExec.shutdown();
}
}
}

View File

@ -54,7 +54,6 @@ public class TestRealtimeTask extends RealtimeIndexTask
null,
1,
null,
null,
null
);
this.status = status;

View File

@ -19,6 +19,7 @@
package io.druid.indexing.common;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
/**

View File

@ -19,6 +19,7 @@
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -206,7 +207,6 @@ public class TaskSerdeTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);
@ -426,8 +426,7 @@ public class TaskSerdeTest
null
),
null,
null,
"blah"
null
);
final String json = jsonMapper.writeValueAsString(task);
@ -442,7 +441,5 @@ public class TaskSerdeTest
task.getSpec().getTuningConfig().getJobProperties(),
task2.getSpec().getTuningConfig().getJobProperties()
);
Assert.assertEquals("blah", task.getClasspathPrefix());
Assert.assertEquals("blah", task2.getClasspathPrefix());
}
}

View File

@ -42,13 +42,15 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.db.IndexerSQLMetadataCoordinator;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TestUtils;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentInsertAction;
@ -65,7 +67,6 @@ import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@ -100,7 +101,7 @@ public class TaskLifecycleTest
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockIndexerDBCoordinator mdc = null;
private MockIndexerMetadataCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
TaskStorageQueryAdapter tsqa = null;
@ -509,12 +510,12 @@ public class TaskLifecycleTest
return retVal;
}
private static class MockIndexerDBCoordinator extends IndexerDBCoordinator
private static class MockIndexerMetadataCoordinator extends IndexerSQLMetadataCoordinator
{
final private Set<DataSegment> published = Sets.newHashSet();
final private Set<DataSegment> nuked = Sets.newHashSet();
private MockIndexerDBCoordinator()
private MockIndexerMetadataCoordinator()
{
super(null, null, null);
}
@ -561,9 +562,9 @@ public class TaskLifecycleTest
}
}
private static MockIndexerDBCoordinator newMockMDC()
private static MockIndexerMetadataCoordinator newMockMDC()
{
return new MockIndexerDBCoordinator();
return new MockIndexerMetadataCoordinator();
}
private static ServiceEmitter newMockEmitter()

View File

@ -19,6 +19,7 @@
package io.druid.indexing.overlord.scaling;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

View File

@ -49,7 +49,6 @@ public class TaskAnnouncementTest
new Period("PT10M"),
1,
Granularity.HOUR,
null,
null
);
final TaskStatus status = TaskStatus.running(task.getId());

93
jdbc-storage/pom.xml Normal file
View File

@ -0,0 +1,93 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-jdbc-storage</artifactId>
<name>druid-jdbc-storage</name>
<description>druid-jdbc-storage</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.152-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.7.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,179 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.jdbc.mysql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.SQLMetadataConnector;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.List;
import java.util.Map;
public class MySQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(MySQLConnector.class);
private final DBI dbi;
@Inject
public MySQLConnector(Supplier<MetadataDbConnectorConfig> config, Supplier<MetadataTablesConfig> dbTables)
{
super(config, dbTables);
this.dbi = new DBI(getDatasource());
}
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, "
+ "start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, "
+ "used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
tableName
)
);
}
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
tableName
)
);
}
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
tableName
)
);
}
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
tableName
)
);
}
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
tableName
)
);
}
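// Upsert helper: relies on MySQL's INSERT ... ON DUPLICATE KEY UPDATE.
// The :key and :value named parameters are bound by the caller at execution time.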
public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn)
{
return String.format(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName, keyColumn, valueColumn
);
}
public DBI getDBI() { return dbi; }
}
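The statement returned by insertOrUpdateStatement() uses named :key and :value parameters, so the caller is presumably expected to bind them through JDBI before executing. A minimal, hypothetical usage is sketched below; the table and column names ("druid_config", "name", "payload") are placeholders matching the config table created above, and the imports are the same Handle/HandleCallback ones already used in this file.

// Hypothetical usage of insertOrUpdateStatement() through JDBI; not part of this commit.
void upsertConfig(final MySQLConnector connector, final String key, final byte[] payload)
{
  connector.getDBI().withHandle(
      new HandleCallback<Void>()
      {
        @Override
        public Void withHandle(Handle handle) throws Exception
        {
          handle.createStatement(connector.insertOrUpdateStatement("druid_config", "name", "payload"))
                .bind("key", key)      // matches the :key parameter in the generated SQL
                .bind("value", payload) // matches the :value parameter
                .execute();
          return null;
        }
      }
  );
}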

View File

@ -0,0 +1,139 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.jdbc.mysql;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import io.druid.db.IndexerSQLMetadataCoordinator;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.MetadataSegmentPublisherProvider;
import io.druid.db.DerbyConnector;
import io.druid.db.SQLMetadataActionHandler;
import io.druid.db.SQLMetadataConnector;
import io.druid.db.SQLMetadataRuleManager;
import io.druid.db.SQLMetadataRuleManagerProvider;
import io.druid.db.SQLMetadataSegmentManager;
import io.druid.db.SQLMetadataSegmentManagerProvider;
import io.druid.db.SQLMetadataSegmentPublisher;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.indexer.MetadataUpdaterJobHandler;
import io.druid.indexer.SQLMetadataUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataCoordinator;
import io.druid.indexing.overlord.MetadataActionHandler;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.db.SQLMetadataSegmentPublisherProvider;
import org.skife.jdbi.v2.IDBI;
import java.util.List;
public class MySQLStorageDruidModule implements DruidModule
{
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
bindDataBaseMySQL(binder);
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataDbConnectorConfig.class);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataDbConnector.class), Key.get(DerbyConnector.class)
);
}
private static void bindDataBaseMySQL(Binder binder)
{
PolyBind.optionBinder(binder, Key.get(MetadataDbConnector.class))
.addBinding("mysql")
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("mysql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataActionHandler.class))
.addBinding("mysql")
.to(SQLMetadataActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataCoordinator.class))
.addBinding("mysql")
.to(IndexerSQLMetadataCoordinator.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataUpdaterJobHandler.class))
.addBinding("mysql")
.to(SQLMetadataUpdaterJobHandler.class)
.in(LazySingleton.class);
}
@Provides
@LazySingleton
public IDBI getDbi(final MySQLConnector dbConnector)
{
return dbConnector.getDBI();
}
}

View File

@ -0,0 +1,186 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.jdbc.postgresql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.MetadataDbConnectorConfig;
import io.druid.db.MetadataTablesConfig;
import io.druid.db.SQLMetadataConnector;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.List;
import java.util.Map;
public class PostgreSQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(PostgreSQLConnector.class);
private final DBI dbi;
@Inject
public PostgreSQLConnector(Supplier<MetadataDbConnectorConfig> config, Supplier<MetadataTablesConfig> dbTables)
{
super(config, dbTables);
this.dbi = new DBI(getDatasource());
}
public void createTable(final IDBI dbi, final String tableName, final String sql)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public void createSegmentTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, "
+ "start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, "
+ "used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);",
tableName
)
);
}
public void createRulesTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);",
tableName
)
);
}
public void createConfigTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))",
tableName
)
);
}
public void createTaskTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
+ " datasource varchar(255) NOT NULL,\n"
+ " payload bytea NOT NULL,\n"
+ " status_payload bytea NOT NULL,\n"
+ " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (id)\n"
+ ");\n" +
"CREATE INDEX ON %1$s(active, created_date);",
tableName
)
);
}
public void createTaskLogTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " log_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);",
tableName
)
);
}
public void createTaskLockTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
String.format(
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " lock_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);",
tableName
)
);
}
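// Upsert helper for PostgreSQL, which at this version has no ON DUPLICATE KEY / ON CONFLICT support:
// lock the table in SHARE ROW EXCLUSIVE mode, attempt an UPDATE through a writable CTE, and
// INSERT only when the UPDATE matched no row. :key and :value are bound by the caller.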
public String insertOrUpdateStatement(final String tableName, final String keyColumn, final String valueColumn)
{
return String.format(
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;",
tableName, keyColumn, valueColumn
);
}
public DBI getDBI() { return dbi; }
}

View File

@ -0,0 +1,122 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.jdbc.postgresql;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import io.druid.db.IndexerSQLMetadataCoordinator;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.db.MetadataDbConnector;
import io.druid.db.MetadataSegmentPublisherProvider;
import io.druid.db.DerbyConnector;
import io.druid.db.SQLMetadataRuleManager;
import io.druid.db.SQLMetadataRuleManagerProvider;
import io.druid.db.SQLMetadataSegmentManager;
import io.druid.db.SQLMetadataSegmentManagerProvider;
import io.druid.db.SQLMetadataSegmentPublisher;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.indexing.overlord.IndexerMetadataCoordinator;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.DbSegmentPublisher;
import io.druid.db.SQLMetadataSegmentPublisherProvider;
import org.skife.jdbi.v2.IDBI;
import java.util.List;
public class PostgreSQLStorageDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
bindDataBasePostgreSQL(binder);
// TODO: Change default to Derby
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataDbConnector.class), Key.get(DerbyConnector.class)
);
}
private static void bindDataBasePostgreSQL(Binder binder)
{
PolyBind.optionBinder(binder, Key.get(MetadataDbConnector.class))
.addBinding("postgresql")
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(DbSegmentPublisher.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
// TODO: Bind DbActionHandler
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
// TODO: Bind DbUpdaterJobHandler
PolyBind.optionBinder(binder, Key.get(IndexerMetadataCoordinator.class))
.addBinding("postgresql")
.to(IndexerSQLMetadataCoordinator.class)
.in(LazySingleton.class);
}
@Provides
@LazySingleton
public IDBI getDbi(final PostgreSQLConnector dbConnector)
{
return dbConnector.getDBI();
}
}

View File

@ -0,0 +1,2 @@
io.druid.storage.jdbc.mysql.MySQLStorageDruidModule
io.druid.storage.jdbc.postgresql.PostgreSQLStorageDruidModule

View File

@ -41,7 +41,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.6</metamx.java-util.version>
<apache.curator.version>2.6.0</apache.curator.version>
<druid.api.version>0.2.8</druid.api.version>
<druid.api.version>0.2.7</druid.api.version>
</properties>
<modules>
@ -56,6 +56,7 @@
<module>cassandra-storage</module>
<module>hdfs-storage</module>
<module>s3-extensions</module>
<module>jdbc-storage</module>
<module>kafka-seven</module>
<module>kafka-eight</module>
<module>rabbitmq</module>
@ -568,9 +569,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.5</version>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
</configuration>
</plugin>
</plugins>
</pluginManagement>

View File

@ -70,14 +70,10 @@ public class DurationGranularity extends BaseQueryGranularity
}
@Override
public long truncate(final long t)
public long truncate(long t)
{
final long duration = getDurationMillis();
long offset = t % duration - origin % duration;
if(offset < 0) {
offset += duration;
}
return t - offset;
return t - t % duration + origin;
}
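// Note: the two return statements above are not equivalent once origin is non-zero. For example,
// with duration = 60000 and origin = 10000, truncating t = 125000 yields 70000 via the offset-based
// form (the latest bucket boundary <= t that is congruent to origin), whereas t - t % duration + origin
// yields 130000, which lies beyond t.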
@Override

View File

@ -301,17 +301,14 @@ public class PeriodGranularity extends BaseQueryGranularity
return current;
}
private long truncateMillisPeriod(final long t)
private long truncateMillisPeriod(long t)
{
// toStandardDuration assumes days are always 24h, and hours are always 60 minutes,
// which may not always be the case, e.g. if there are daylight saving changes.
if (chronology.days().isPrecise() && chronology.hours().isPrecise()) {
if(chronology.days().isPrecise() && chronology.hours().isPrecise()) {
final long millis = period.toStandardDuration().getMillis();
long offset = t % millis - origin % millis;
if(offset < 0) {
offset += millis;
}
return t - offset;
t -= t % millis + origin % millis;
return t;
}
else
{

View File

@ -23,6 +23,8 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.metamx.common.config.Config;
import io.druid.guice.JsonConfigurator;
import io.druid.guice.LazySingleton;
import org.skife.config.ConfigurationObjectFactory;
import javax.validation.Validation;

View File

@ -24,6 +24,13 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import io.druid.guice.ConfigModule;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.DruidSecondaryModule;
import io.druid.guice.ExtensionsConfig;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.PropertiesModule;
import io.druid.jackson.JacksonModule;
import java.util.List;

View File

@ -102,7 +102,6 @@ public abstract class BaseQuery<T> implements Query<T>
return duration;
}
@Override
@JsonProperty
public Map<String, Object> getContext()
{

View File

@ -20,8 +20,12 @@
package io.druid.query;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.Yielder;
import com.metamx.common.guava.Yielders;
import com.metamx.common.guava.YieldingAccumulator;
import org.joda.time.DateTime;
import java.util.Arrays;

View File

@ -47,14 +47,14 @@ import java.util.concurrent.TimeoutException;
/**
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
*
* <p/>
* When using this, it is important to make sure that the list of QueryRunners provided is fully flattened.
* If, for example, you were to pass a list containing a chained QueryRunner (A) and a non-chained QueryRunner (B),
* where A has two QueryRunners chained together (Aa and Ab), the fact that the Queryables are run in parallel on an
* executor would mean that the Queryables are actually processed in the order
*
* <pre>A -&gt; B -&gt; Aa -&gt; Ab</pre>
*
* <p/>
* A -> B -> Aa -> Ab
* <p/>
* That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B
* must be fully cached in memory before the results for Aa and Ab are computed.
*/

View File

@ -42,6 +42,6 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
@Config(value = "${base_path}.columnCache.sizeBytes")
public int columnCacheSizeBytes()
{
return 0;
return 1024 * 1024;
}
}

View File

@ -59,9 +59,9 @@ public class Druids
/**
* A Builder for AndDimFilter.
*
* <p/>
* Required: fields() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* AndDimFilter andDimFilter = Druids.newAndDimFilterBuilder()
@ -105,9 +105,9 @@ public class Druids
/**
* A Builder for OrDimFilter.
*
* <p/>
* Required: fields() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* OrDimFilter orDimFilter = Druids.newOrDimFilterBuilder()
@ -160,9 +160,9 @@ public class Druids
/**
* A Builder for NotDimFilter.
*
* <p/>
* Required: field() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* NotDimFilter notDimFilter = Druids.newNotDimFilterBuilder()
@ -206,9 +206,9 @@ public class Druids
/**
* A Builder for SelectorDimFilter.
*
* <p/>
* Required: dimension() and value() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* Selector selDimFilter = Druids.newSelectorDimFilterBuilder()
@ -285,10 +285,10 @@ public class Druids
/**
* A Builder for TimeseriesQuery.
*
* <p/>
* Required: dataSource(), intervals(), and aggregators() must be called before build()
* Optional: filters(), granularity(), postAggregators(), and context() can be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
@ -483,11 +483,11 @@ public class Druids
/**
* A Builder for SearchQuery.
*
* <p/>
* Required: dataSource(), intervals(), dimensions() and query() must be called before build()
*
* <p/>
* Optional: filters(), granularity(), and context() can be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* SearchQuery query = Druids.newSearchQueryBuilder()
@ -678,9 +678,9 @@ public class Druids
/**
* A Builder for TimeBoundaryQuery.
*
* <p/>
* Required: dataSource() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* TimeBoundaryQuery query = new MaxTimeQueryBuilder()
@ -774,12 +774,12 @@ public class Druids
/**
* A Builder for Result.
*
* <p/>
* Required: timestamp() and value() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* Result&lt;T&gt; result = Druids.newResultBuilder()
* Result<T> result = Druids.newResultBuilder()
* .timestamp(egDateTime)
* .value(egValue)
* .build();
@ -840,9 +840,9 @@ public class Druids
/**
* A Builder for SegmentMetadataQuery.
*
* <p/>
* Required: dataSource(), intervals() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* SegmentMetadataQuery query = new SegmentMetadataQueryBuilder()
@ -948,9 +948,9 @@ public class Druids
/**
* A Builder for SelectQuery.
*
* <p/>
* Required: dataSource(), intervals() must be called before build()
*
* <p/>
* Usage example:
* <pre><code>
* SelectQuery query = new SelectQueryBuilder()

View File

@ -22,12 +22,13 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.MetricManipulatorFns;
import javax.annotation.Nullable;
/**
*/
public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@ -72,14 +73,9 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
@Override
@SuppressWarnings("unchecked")
public T apply(T input)
public T apply(@Nullable T input)
{
Result<BySegmentResultValueClass<T>> result = (Result<BySegmentResultValueClass<T>>) input;
if (input == null) {
throw new ISE("Cannot have a null result!");
}
BySegmentResultValueClass<T> resultsClass = result.getValue();
return (T) new Result<BySegmentResultValueClass>(

View File

@ -21,6 +21,7 @@
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("table")

View File

@ -33,13 +33,10 @@ import java.io.IOException;
*/
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private static final String DEFAULT_METRIC_NAME = "query/time";
private final ServiceEmitter emitter;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryRunner<T> queryRunner;
private final long creationTime;
private final String metricName;
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
@ -47,38 +44,25 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
QueryRunner<T> queryRunner
)
{
this(emitter, builderFn, queryRunner, DEFAULT_METRIC_NAME);
this(emitter, builderFn, queryRunner, -1);
}
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
long creationTime,
String metricName
long creationTime
)
{
this.emitter = emitter;
this.builderFn = builderFn;
this.queryRunner = queryRunner;
this.creationTime = creationTime;
this.metricName = metricName;
}
public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryRunner<T> queryRunner,
String metricName
)
{
this(emitter, builderFn, queryRunner, -1, metricName);
}
public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
{
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis(), metricName);
return new MetricsEmittingQueryRunner<T>(emitter, builderFn, queryRunner, System.currentTimeMillis());
}
@Override
@ -113,9 +97,9 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
finally {
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build(metricName, timeTaken));
emitter.emit(builder.build("query/time", timeTaken));
if (creationTime > 0) {
if(creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
}
@ -189,13 +173,12 @@ public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
}
long timeTaken = System.currentTimeMillis() - startTime;
emitter.emit(builder.build(metricName, timeTaken));
emitter.emit(builder.build("query/time", timeTaken));
if (creationTime > 0) {
emitter.emit(builder.build("query/wait", startTime - creationTime));
}
}
finally {
} finally {
yielder.close();
}
}

View File

@ -70,8 +70,6 @@ public interface Query<T>
public Duration getDuration();
public Map<String, Object> getContext();
public <ContextType> ContextType getContextValue(String key);
public <ContextType> ContextType getContextValue(String key, ContextType defaultValue);

View File

@ -22,6 +22,7 @@ package io.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;

View File

@@ -29,8 +29,6 @@ public interface QuerySegmentWalker
* Gets the Queryable for a given interval, the Queryable returned can be any version(s) or partitionNumber(s)
* such that it represents the interval.
*
* @param <T> query result type
* @param query the query to find a Queryable for
* @param intervals the intervals to find a Queryable for
* @return a Queryable object that represents the interval
*/
@@ -38,10 +36,8 @@ public interface QuerySegmentWalker
/**
* Gets the Queryable for a given list of SegmentSpecs.
* exist.
*
* @param <T> the query result type
* @param query the query to return a Queryable for
* @param specs the list of SegmentSpecs to find a Queryable for
* @return the Queryable object with the given SegmentSpecs
*/
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs);
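To make the two lookup paths concrete, here is a hypothetical no-op implementation; the getQueryRunnerForIntervals signature and the Sequences.empty() placeholder are assumptions based on the surrounding interfaces, not something shown in this diff.

import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;

// Hypothetical sketch: both lookup paths resolve to a runner that returns no rows.
public class NoopQuerySegmentWalker implements QuerySegmentWalker
{
  @Override
  public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
  {
    return emptyRunner();
  }

  @Override
  public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
  {
    return emptyRunner();
  }

  private static <T> QueryRunner<T> emptyRunner()
  {
    return new QueryRunner<T>()
    {
      @Override
      public Sequence<T> run(Query<T> query)
      {
        return Sequences.empty();
      }
    };
  }
}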

View File

@@ -40,15 +40,12 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* This method doesn't belong here, but it's here for now just to make it work.
*
* @param seqOfSequences sequence of sequences to be merged
* @param seqOfSequences
*
* @return the sequence of merged results
* @return
*/
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
public abstract Sequence<ResultType> mergeSequencesUnordered(Sequence<Sequence<ResultType>> seqOfSequences);
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(

View File

@@ -23,6 +23,7 @@ package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;

View File

@@ -1,158 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedRow;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.Arrays;
/**
* TimewarpOperator is an example post-processing operator that maps current time
* to the latest period ending within the specified data interval and truncates
* the query interval to discard data that would be mapped to the future.
*
*/
public class TimewarpOperator<T> implements PostProcessingOperator<T>
{
private final Interval dataInterval;
private final long periodMillis;
private final long originMillis;
/**
*
* @param dataInterval interval containing the actual data
* @param period time will be offset by a multiple of the given period
* until there is at least a full period ending within the data interval
* @param origin origin to be used to align time periods
* (e.g. to determine on what day of the week a weekly period starts)
*/
@JsonCreator
public TimewarpOperator(
@JsonProperty("dataInterval") Interval dataInterval,
@JsonProperty("period") Period period,
@JsonProperty("origin") DateTime origin
)
{
this.originMillis = origin.getMillis();
this.dataInterval = dataInterval;
// this will fail for periods that do not map to millis (e.g. P1M)
this.periodMillis = period.toStandardDuration().getMillis();
}
@Override
public QueryRunner<T> postProcess(QueryRunner<T> baseQueryRunner)
{
return postProcess(baseQueryRunner, DateTime.now().getMillis());
}
public QueryRunner<T> postProcess(final QueryRunner<T> baseRunner, final long now)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query)
{
final long offset = computeOffset(now);
final Interval interval = query.getIntervals().get(0);
final Interval modifiedInterval = new Interval(
interval.getStartMillis() + offset,
Math.min(interval.getEndMillis() + offset, now + offset)
);
return Sequences.map(
baseRunner.run(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Arrays.asList(modifiedInterval)))
),
new Function<T, T>()
{
@Override
public T apply(T input)
{
if (input instanceof Result) {
Result res = (Result) input;
Object value = res.getValue();
if (value instanceof TimeBoundaryResultValue) {
TimeBoundaryResultValue boundary = (TimeBoundaryResultValue) value;
DateTime minTime = null;
try{
minTime = boundary.getMinTime();
} catch(IllegalArgumentException e) {}
final DateTime maxTime = boundary.getMaxTime();
return (T) ((TimeBoundaryQuery) query).buildResult(
new DateTime(Math.min(res.getTimestamp().getMillis() - offset, now)),
minTime != null ? minTime.minus(offset) : null,
maxTime != null ? new DateTime(Math.min(maxTime.getMillis() - offset, now)) : null
).iterator().next();
}
return (T) new Result(res.getTimestamp().minus(offset), value);
} else if (input instanceof MapBasedRow) {
MapBasedRow row = (MapBasedRow) input;
return (T) new MapBasedRow(row.getTimestamp().minus(offset), row.getEvent());
}
// default to noop for unknown result types
return input;
}
}
);
}
};
}
/**
* Map time t into the last `period` ending within `dataInterval`
*
* @param t the current time to be mapped into `dataInterval`
* @return the offset between the mapped time and time t
*/
protected long computeOffset(final long t)
{
// start is the beginning of the last period ending within dataInterval
long start = dataInterval.getEndMillis() - periodMillis;
long startOffset = start % periodMillis - originMillis % periodMillis;
if(startOffset < 0) {
startOffset += periodMillis;
};
start -= startOffset;
// tOffset is the offset time t within the last period
long tOffset = t % periodMillis - originMillis % periodMillis;
if(tOffset < 0) {
tOffset += periodMillis;
}
tOffset += start;
return tOffset - t;
}
}
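Since the file above is deleted in this commit, a small self-contained sketch of what computeOffset did may help future readers of this diff; the interval, timestamps, and class name below are made up for illustration, assuming a P1D period and an origin at the epoch.

import java.time.Duration;
import java.time.Instant;

public class ComputeOffsetSketch
{
  // mirrors TimewarpOperator.computeOffset() from the removed file above
  static long computeOffset(long t, long dataIntervalEndMillis, long periodMillis, long originMillis)
  {
    // start is the beginning of the last period ending within the data interval
    long start = dataIntervalEndMillis - periodMillis;
    long startOffset = start % periodMillis - originMillis % periodMillis;
    if (startOffset < 0) {
      startOffset += periodMillis;
    }
    start -= startOffset;

    // tOffset is the offset of time t within its period, shifted into that last period
    long tOffset = t % periodMillis - originMillis % periodMillis;
    if (tOffset < 0) {
      tOffset += periodMillis;
    }
    tOffset += start;
    return tOffset - t;
  }

  public static void main(String[] args)
  {
    long periodMillis = Duration.ofDays(1).toMillis();                    // P1D
    long dataEnd = Instant.parse("2014-01-08T00:00:00Z").toEpochMilli();  // hypothetical data interval end
    long now = Instant.parse("2014-09-19T17:39:59Z").toEpochMilli();      // hypothetical "current" time

    long offset = computeOffset(now, dataEnd, periodMillis, 0L);
    // "now" is warped to the same time of day within the last full day of the interval
    System.out.println(Instant.ofEpochMilli(now + offset));               // prints 2014-01-07T17:39:59Z
  }
}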

View File

@@ -26,6 +26,8 @@ import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import java.util.ArrayList;
public class UnionQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
@@ -45,7 +47,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
DataSource dataSource = query.getDataSource();
if (dataSource instanceof UnionDataSource) {
return toolChest.mergeSequencesUnordered(
return toolChest.mergeSequences(
Sequences.simple(
Lists.transform(
((UnionDataSource) dataSource).getDataSources(),

View File

@@ -26,9 +26,9 @@ import java.util.List;
/**
* Processing related interface
*
* <p/>
* An AggregatorFactory is an object that knows how to generate an Aggregator using a ColumnSelectorFactory.
*
* <p/>
* This is useful as an abstraction to allow Aggregator classes to be written in terms of MetricSelector objects
* without making any assumptions about how they are pulling values out of the base data. That is, the data is
* provided to the Aggregator through the MetricSelector object, so whatever creates that object gets to choose how

View File

@@ -20,6 +20,7 @@
package io.druid.query.aggregation;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import java.util.HashSet;
@@ -36,8 +37,6 @@ public class AggregatorUtil
* such that all the dependencies of any given aggregator should occur before that aggregator.
* See AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example.
* @param postAggName name of the postAgg on which dependency is to be calculated
*
* @return the list of dependent postAggregators
*/
public static List<PostAggregator> pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String postAggName)
{

View File

@@ -136,8 +136,6 @@ public class Histogram
* Returns a visual representation of a histogram object.
* Initially returns an array of just the min. and max. values
* but can also support the addition of quantiles.
*
* @return a visual representation of this histogram
*/
public HistogramVisual asVisual() {
float[] visualCounts = new float[bins.length - 2];

View File

@@ -32,6 +32,8 @@ import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextAction;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.NativeArray;
import org.mozilla.javascript.Scriptable;
import org.mozilla.javascript.ScriptableObject;
import javax.annotation.Nullable;
@@ -39,6 +41,7 @@ import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

View File

@@ -39,6 +39,7 @@ import org.apache.commons.codec.binary.Base64;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -232,40 +233,6 @@ public class CardinalityAggregatorFactory implements AggregatorFactory
return HyperLogLogCollector.makeLatestCollector();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;
if (byRow != that.byRow) {
return false;
}
if (fieldNames != null ? !fieldNames.equals(that.fieldNames) : that.fieldNames != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (fieldNames != null ? fieldNames.hashCode() : 0);
result = 31 * result + (byRow ? 1 : 0);
return result;
}
@Override
public String toString()
{

View File

@@ -28,20 +28,18 @@ import java.nio.ByteBuffer;
/**
* Implements the HyperLogLog cardinality estimator described in:
*
* <p/>
* http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
*
* <p/>
* Run this code to see a simple indication of expected errors based on different m values:
*
* <code>
* for (int i = 1; i &lt; 20; ++i) {
* System.out.printf("i[%,d], val[%,d] =&gt; error[%f%%]%n", i, 2 &lt;&lt; i, 104 / Math.sqrt(2 &lt;&lt; i));
* }
* </code>
*
* <p/>
* for (int i = 1; i < 20; ++i) {
* System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
* }
* <p/>
* This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that
* only one thread is ever calling methods on it.
*
* <p/>
* If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior
*/
public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
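The loop in the javadoc above can be run as-is to see the expected relative error per register count (the usual HyperLogLog approximation of roughly 1.04/sqrt(m), printed here as a percentage); a standalone wrapper with a hypothetical class name:

public class HllErrorTable
{
  public static void main(String[] args)
  {
    for (int i = 1; i < 20; ++i) {
      // 2 << i buckets gives a relative error of about 104 / sqrt(2 << i) percent
      System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i));
    }
  }
}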
@@ -197,13 +195,6 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return applyCorrection(e, zeroCount);
}
/**
* Checks if the payload for the given ByteBuffer is sparse or not.
* The given buffer must be positioned at getPayloadBytePosition() prior to calling isSparse
*
* @param buffer
* @return
*/
private static boolean isSparse(ByteBuffer buffer)
{
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
@@ -504,32 +495,13 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return false;
}
ByteBuffer otherBuffer = ((HyperLogLogCollector) o).storageBuffer;
HyperLogLogCollector collector = (HyperLogLogCollector) o;
if (storageBuffer != null ? false : otherBuffer != null) {
if (storageBuffer != null ? !storageBuffer.equals(collector.storageBuffer) : collector.storageBuffer != null) {
return false;
}
if(storageBuffer == null && otherBuffer == null) {
return true;
}
final ByteBuffer denseStorageBuffer;
if(storageBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer);
denseCollector.convertToDenseStorage();
denseStorageBuffer = denseCollector.storageBuffer;
} else {
denseStorageBuffer = storageBuffer;
}
if(otherBuffer.remaining() != getNumBytesForDenseStorage()) {
HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer);
otherCollector.convertToDenseStorage();
otherBuffer = otherCollector.storageBuffer;
}
return denseStorageBuffer.equals(otherBuffer);
return true;
}
@Override

View File

@@ -127,7 +127,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery;
try {
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery().withOverriddenContext(query.getContext());
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
}
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
@@ -193,18 +193,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
}
@Override
public Sequence<Row> mergeSequencesUnordered(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
}
private Ordering<Row> getOrdering()
{
return Ordering.<Row>natural().nullsFirst();
return new OrderedMergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
}
@Override
@@ -268,8 +257,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
return new CacheStrategy<Row, Object, GroupByQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
@Override
public byte[] computeCacheKey(GroupByQuery query)
{
@@ -355,26 +342,14 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Map<String, Object> event = jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
);
while (aggsIter.hasNext()) {
final AggregatorFactory factory = aggsIter.next();
Object agg = event.get(factory.getName());
if (agg != null) {
event.put(factory.getName(), factory.deserialize(agg));
}
}
return new MapBasedRow(
timestamp,
event
(Map<String, Object>) jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
)
);
}
};
@@ -383,7 +358,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{
return new MergeSequence<>(getOrdering(), seqOfSequences);
return new MergeSequence<>(Ordering.<Row>natural().nullsFirst(), seqOfSequences);
}
};
}

View File

@@ -22,6 +22,7 @@ package io.druid.query.groupby;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -29,6 +30,7 @@ import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ExecutorExecutingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
@@ -47,6 +49,7 @@ import io.druid.segment.StorageAdapter;
import io.druid.segment.incremental.IncrementalIndex;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

View File

@@ -23,12 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Bytes;
import io.druid.data.input.Row;
import io.druid.query.Result;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* The "&gt;" operator in a "having" clause. This is similar to SQL's "having aggregation &gt; value",
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
*/
public class GreaterThanHavingSpec implements HavingSpec

Some files were not shown because too many files have changed in this diff.