Merge pull request #847 from metamx/druid-0.7.x

master branch is now Druid 0.7
This commit is contained in:
xvrl 2014-11-12 14:31:05 -08:00
commit f884e80933
443 changed files with 34513 additions and 4536 deletions

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
@ -46,9 +46,8 @@
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<dependency>
<groupId>commons-pool</groupId>

View File

@ -27,8 +27,8 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.Duration;
import java.util.Arrays;
@ -48,7 +48,7 @@ public class ConfigManager
private final Object lock = new Object();
private boolean started = false;
private final DbConnector dbConnector;
private final MetadataStorageConnector dbConnector;
private final Supplier<ConfigManagerConfig> config;
private final ScheduledExecutorService exec;
@ -58,7 +58,7 @@ public class ConfigManager
private volatile ConfigManager.PollingCallable poller;
@Inject
public ConfigManager(DbConnector dbConnector, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
public ConfigManager(MetadataStorageConnector dbConnector, Supplier<MetadataStorageTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{
this.dbConnector = dbConnector;
this.config = config;

View File

@ -21,6 +21,7 @@ package io.druid.common.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import io.druid.collections.IntList;
import java.io.IOException;
@ -262,4 +263,9 @@ public class SerializerUtils
return retVal;
}
public int getSerializedStringByteSize(String str)
{
return Ints.BYTES + str.getBytes(UTF8).length;
}
}

View File

@ -1,389 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
/**
*/
public class DbConnector
{
private static final Logger log = new Logger(DbConnector.class);
public static void createSegmentTable(final IDBI dbi, final String segmentTableName, boolean isPostgreSQL)
{
createTable(
dbi,
segmentTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
segmentTableName
),
isPostgreSQL
);
}
public static void createRuleTable(final IDBI dbi, final String ruleTableName, boolean isPostgreSQL)
{
createTable(
dbi,
ruleTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
ruleTableName
),
isPostgreSQL
);
}
public static void createConfigTable(final IDBI dbi, final String configTableName, boolean isPostgreSQL)
{
createTable(
dbi,
configTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
configTableName
),
isPostgreSQL
);
}
public static void createTaskTable(final IDBI dbi, final String taskTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n"
+ " datasource varchar(255) NOT NULL,\n"
+ " payload bytea NOT NULL,\n"
+ " status_payload bytea NOT NULL,\n"
+ " active SMALLINT NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (id)\n"
+ ");\n" +
"CREATE INDEX ON %1$s(active, created_date);":
"CREATE TABLE `%s` (\n"
+ " `id` varchar(255) NOT NULL,\n"
+ " `created_date` tinytext NOT NULL,\n"
+ " `datasource` varchar(255) NOT NULL,\n"
+ " `payload` longblob NOT NULL,\n"
+ " `status_payload` longblob NOT NULL,\n"
+ " `active` tinyint(1) NOT NULL DEFAULT '0',\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY (active, created_date(100))\n"
+ ")",
taskTableName
),
isPostgreSQL
);
}
public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskLogsTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " log_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `log_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLogsTableName
),
isPostgreSQL
);
}
public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName, boolean isPostgreSQL)
{
createTable(
dbi,
taskLocksTableName,
String.format(
isPostgreSQL ?
"CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n"
+ " lock_payload bytea,\n"
+ " PRIMARY KEY (id)\n"
+ ");\n"+
"CREATE INDEX ON %1$s(task_id);":
"CREATE TABLE `%s` (\n"
+ " `id` bigint(20) NOT NULL AUTO_INCREMENT,\n"
+ " `task_id` varchar(255) DEFAULT NULL,\n"
+ " `lock_payload` longblob,\n"
+ " PRIMARY KEY (`id`),\n"
+ " KEY `task_id` (`task_id`)\n"
+ ")",
taskLocksTableName
),
isPostgreSQL
);
}
public static void createTable(
final IDBI dbi,
final String tableName,
final String sql,
final boolean isPostgreSQL
)
{
try {
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
List<Map<String, Object>> table;
if ( isPostgreSQL ) {
table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
} else {
table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
}
if (table.isEmpty()) {
log.info("Creating table[%s]", tableName);
handle.createStatement(sql).execute();
} else {
log.info("Table[%s] existed: [%s]", tableName, table);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception creating table");
}
}
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws SQLException
{
final String insertOrUpdateStatement = String.format(
isPostgreSQL ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName, keyColumn, valueColumn
);
return dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(insertOrUpdateStatement)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
public byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
)
{
final String selectStatement = String.format("SELECT %s FROM %s WHERE %s = :key", valueColumn, tableName, keyColumn);
return dbi.withHandle(
new HandleCallback<byte[]>()
{
@Override
public byte[] withHandle(Handle handle) throws Exception
{
List<byte[]> matched = handle.createQuery(selectStatement)
.bind("key", key)
.map(
new ResultSetMapper<byte[]>()
{
@Override
public byte[] map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
return r.getBytes(valueColumn);
}
}
).list();
if (matched.isEmpty()) {
return null;
}
if (matched.size() > 1) {
throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
}
return matched.get(0);
}
}
);
}
public static Boolean isPostgreSQL(final IDBI dbi)
{
return dbi.withHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return isPostgreSQL(handle);
}
}
);
}
protected static Boolean isPostgreSQL(final Handle handle) throws SQLException
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
}
private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;
private boolean isPostgreSQL = false;
@Inject
public DbConnector(Supplier<DbConnectorConfig> config, Supplier<DbTablesConfig> dbTables)
{
this.config = config;
this.dbTables = dbTables;
this.dbi = new DBI(getDatasource());
}
public DBI getDBI()
{
return dbi;
}
public boolean isPostgreSQL()
{
return isPostgreSQL;
}
private DataSource getDatasource()
{
DbConnectorConfig connectorConfig = config.get();
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser());
dataSource.setPassword(connectorConfig.getPassword());
String uri = connectorConfig.getConnectURI();
isPostgreSQL = uri.startsWith("jdbc:postgresql");
dataSource.setUrl(uri);
if (connectorConfig.isUseValidationQuery()) {
dataSource.setValidationQuery(connectorConfig.getValidationQuery());
dataSource.setTestOnBorrow(true);
}
return dataSource;
}
public void createSegmentTable()
{
if (config.get().isCreateTables()) {
createSegmentTable(dbi, dbTables.get().getSegmentsTable(), isPostgreSQL);
}
}
public void createRulesTable()
{
if (config.get().isCreateTables()) {
createRuleTable(dbi, dbTables.get().getRulesTable(), isPostgreSQL);
}
}
public void createConfigTable()
{
if (config.get().isCreateTables()) {
createConfigTable(dbi, dbTables.get().getConfigTable(), isPostgreSQL);
}
}
public void createTaskTables()
{
if (config.get().isCreateTables()) {
final DbTablesConfig dbTablesConfig = dbTables.get();
createTaskTable(dbi, dbTablesConfig.getTasksTable(), isPostgreSQL);
createTaskLogTable(dbi, dbTablesConfig.getTaskLogTable(), isPostgreSQL);
createTaskLockTable(dbi, dbTablesConfig.getTaskLockTable(), isPostgreSQL);
}
}
}

View File

@ -71,7 +71,8 @@ public class DruidSecondaryModule implements Module
binder.install(new DruidGuiceExtensions());
binder.bind(Properties.class).toInstance(properties);
binder.bind(ConfigurationObjectFactory.class).toInstance(factory);
binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class));
// make objectMapper eager to ensure jackson gets setup with guice injection for JsonConfigurator
binder.bind(ObjectMapper.class).to(Key.get(ObjectMapper.class, Json.class)).asEagerSingleton();
binder.bind(Validator.class).toInstance(validator);
binder.bind(JsonConfigurator.class).toInstance(jsonConfigurator);
}

View File

@ -27,8 +27,8 @@ import com.metamx.common.lifecycle.Lifecycle;
import io.druid.common.config.ConfigManager;
import io.druid.common.config.ConfigManagerConfig;
import io.druid.common.config.JacksonConfigManager;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageTablesConfig;
/**
*/
@ -43,8 +43,8 @@ public class JacksonConfigManagerModule implements Module
@Provides @ManageLifecycle
public ConfigManager getConfigManager(
final DbConnector dbConnector,
final Supplier<DbTablesConfig> dbTables,
final MetadataStorageConnector dbConnector,
final Supplier<MetadataStorageTablesConfig> dbTables,
final Supplier<ConfigManagerConfig> config,
final Lifecycle lifecycle
)

View File

@ -0,0 +1,30 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.timeline.DataSegment;
import java.util.List;
public interface MetadataStorageUpdaterJobHandler
{
public void publishSegments(String tableName, List<DataSegment> segments, ObjectMapper mapper);
}

View File

@ -17,25 +17,25 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
package io.druid.metadata;
public class TaskExistsException extends Exception
public class EntryExistsException extends Exception
{
private final String taskId;
private final String entryId;
public TaskExistsException(String taskId, Throwable t)
public EntryExistsException(String entryId, Throwable t)
{
super(String.format("Task exists: %s", taskId), t);
this.taskId = taskId;
super(String.format("Entry already exists: %s", entryId), t);
this.entryId = entryId;
}
public TaskExistsException(String taskId)
public EntryExistsException(String entryId)
{
this(taskId, null);
this(entryId, null);
}
public String getTaskId()
public String getEntryId()
{
return taskId;
return entryId;
}
}

View File

@ -0,0 +1,137 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata;
import com.google.common.base.Optional;
import com.metamx.common.Pair;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
{
/**
* Creates a new entry.
*
* @param id entry id
* @param timestamp timestamp this entry was created
* @param dataSource datasource associated with this entry
* @param entry object representing this entry
* @param active active or inactive flag
* @param status status object associated wit this object, can be null
* @throws EntryExistsException
*/
public void insert(
@NotNull String id,
@NotNull DateTime timestamp,
@NotNull String dataSource,
@NotNull EntryType entry,
boolean active,
@Nullable StatusType status
) throws EntryExistsException;
/**
* Sets or updates the status for any active entry with the given id.
* Once an entry has been set inactive, its status cannot be updated anymore.
*
* @param entryId entry id
* @param active active
* @param status status
* @return true if the status was updated, false if the entry did not exist of if the entry was inactive
*/
public boolean setStatus(String entryId, boolean active, StatusType status);
/**
* Retrieves the entry with the given id.
*
* @param entryId entry id
* @return optional entry, absent if the given id does not exist
*/
public Optional<EntryType> getEntry(String entryId);
/**
* Retrieve the status for the entry with the given id.
*
* @param entryId entry id
* @return optional status, absent if entry does not exist or status is not set
*/
public Optional<StatusType> getStatus(String entryId);
/**
* Return all active entries with their respective status
*
* @return list of (entry, status) pairs
*/
public List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus();
/**
* Return all statuses for inactive entries created on or later than the given timestamp
*
* @param timestamp timestamp
* @return list of statuses
*/
public List<StatusType> getInactiveStatusesSince(DateTime timestamp);
/**
* Add a lock to the given entry
*
* @param entryId entry id
* @param lock lock to add
* @return true if the lock was added
*/
public boolean addLock(String entryId, LockType lock);
/**
* Remove the lock with the given lock id.
*
* @param lockId lock id
* @return true if the lock was removed, false if the given lock id did not exist
*/
public boolean removeLock(long lockId);
/**
* Add a log to the entry with the given id.
*
* @param entryId entry id
* @param log log to add
* @return true if the log was added
*/
public boolean addLog(String entryId, LogType log);
/**
* Returns the logs for the entry with the given id.
*
* @param entryId entry id
* @return list of logs
*/
public List<LogType> getLogs(String entryId);
/**
* Returns the locks for the given entry
*
* @param entryId entry id
* @return map of lockId to lock
*/
public Map<Long, LockType> getLocks(String entryId);
}

View File

@ -0,0 +1,28 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata;
public interface MetadataStorageActionHandlerFactory
{
public <EntryType, StatusType, LogType, LockType> MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType> create(
final String entryType,
MetadataStorageActionHandlerTypes<EntryType, StatusType, LogType, LockType> payloadTypes
);
}

View File

@ -0,0 +1,30 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata;
import com.fasterxml.jackson.core.type.TypeReference;
public interface MetadataStorageActionHandlerTypes<EntryType, StatusType, LogType, LockType>
{
public TypeReference<EntryType> getEntryType();
public TypeReference<StatusType> getStatusType();
public TypeReference<LogType> getLogType();
public TypeReference<LockType> getLockType();
}

View File

@ -0,0 +1,49 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata;
/**
*/
public interface MetadataStorageConnector
{
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception;
public byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
);
public void createSegmentTable();
public void createRulesTable();
public void createConfigTable();
public void createTaskTables();
}

View File

@ -17,44 +17,52 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
/**
*/
public class DbConnectorConfig
public class MetadataStorageConnectorConfig
{
@JsonProperty
private boolean createTables = true;
@JsonProperty
@NotNull
private String connectURI = null;
private String host = "localhost";
@JsonProperty
private int port = 1527;
@JsonProperty
private String connectURI;
@JsonProperty
@NotNull
private String user = null;
@JsonProperty
@NotNull
private String password = null;
@JsonProperty
private boolean useValidationQuery = false;
@JsonProperty
private String validationQuery = "SELECT 1";
public boolean isCreateTables()
{
return createTables;
}
public String getHost()
{
return host;
}
public int getPort()
{
return port;
}
public String getConnectURI()
{
if (connectURI == null) {
return String.format("jdbc:derby://%s:%s/druid;create=true", host, port);
}
return connectURI;
}
@ -68,25 +76,14 @@ public class DbConnectorConfig
return password;
}
public boolean isUseValidationQuery()
{
return useValidationQuery;
}
public String getValidationQuery() {
return validationQuery;
}
@Override
public String toString()
{
return "DbConnectorConfig{" +
"createTables=" + createTables +
", connectURI='" + connectURI + '\'' +
", connectURI='" + getConnectURI() + '\'' +
", user='" + user + '\'' +
", password=****" +
", useValidationQuery=" + useValidationQuery +
", validationQuery='" + validationQuery + '\'' +
'}';
}
}

View File

@ -17,21 +17,30 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import java.util.Map;
/**
*/
public class DbTablesConfig
public class MetadataStorageTablesConfig
{
public static DbTablesConfig fromBase(String base)
public static MetadataStorageTablesConfig fromBase(String base)
{
return new DbTablesConfig(base, null, null, null, null, null, null);
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null);
}
private static String defaultBase = "druid";
public static final String TASK_ENTRY_TYPE = "task";
private static final String DEFAULT_BASE = "druid";
private final Map<String, String> entryTables = Maps.newHashMap();
private final Map<String, String> logTables = Maps.newHashMap();
private final Map<String, String> lockTables = Maps.newHashMap();
@JsonProperty("base")
private final String base;
@ -55,7 +64,7 @@ public class DbTablesConfig
private final String taskLockTable;
@JsonCreator
public DbTablesConfig(
public MetadataStorageTablesConfig(
@JsonProperty("base") String base,
@JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable,
@ -65,13 +74,18 @@ public class DbTablesConfig
@JsonProperty("taskLock") String taskLockTable
)
{
this.base = (base == null) ? defaultBase : base;
this.base = (base == null) ? DEFAULT_BASE : base;
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.rulesTable = makeTableName(rulesTable, "rules");
this.configTable = makeTableName(configTable, "config");
this.tasksTable = makeTableName(tasksTable, "tasks");
this.taskLogTable = makeTableName(taskLogTable, "tasklogs");
this.taskLockTable = makeTableName(taskLockTable, "tasklocks");
entryTables.put(TASK_ENTRY_TYPE, this.tasksTable);
logTables.put(TASK_ENTRY_TYPE, this.taskLogTable);
lockTables.put(TASK_ENTRY_TYPE, this.taskLockTable);
}
private String makeTableName(String explicitTableName, String defaultSuffix)
@ -106,18 +120,23 @@ public class DbTablesConfig
return configTable;
}
public String getTasksTable()
public String getEntryTable(final String entryType)
{
return tasksTable;
return entryTables.get(entryType);
}
public String getTaskLogTable()
public String getLogTable(final String entryType)
{
return taskLogTable;
return logTables.get(entryType);
}
public String getTaskLockTable()
public String getLockTable(final String entryType)
{
return taskLockTable;
return lockTables.get(entryType);
}
public String getTaskEntryType()
{
return TASK_ENTRY_TYPE;
}
}

View File

@ -245,18 +245,18 @@ These properties specify the jdbc connection and other configuration around the
|Property|Description|Default|
|--------|-----------|-------|
|`druid.db.connector.user`|The username to connect with.|none|
|`druid.db.connector.password`|The password to connect with.|none|
|`druid.db.connector.createTables`|If Druid requires a table and it doesn't exist, create it?|true|
|`druid.db.connector.useValidationQuery`|Validate a table with a query.|false|
|`druid.db.connector.validationQuery`|The query to validate with.|SELECT 1|
|`druid.db.tables.base`|The base name for tables.|druid|
|`druid.db.tables.segmentTable`|The table to use to look for segments.|druid_segments|
|`druid.db.tables.ruleTable`|The table to use to look for segment load/drop rules.|druid_rules|
|`druid.db.tables.configTable`|The table to use to look for configs.|druid_config|
|`druid.db.tables.tasks`|Used by the indexing service to store tasks.|druid_tasks|
|`druid.db.tables.taskLog`|Used by the indexing service to store task logs.|druid_taskLog|
|`druid.db.tables.taskLock`|Used by the indexing service to store task locks.|druid_taskLock|
|`druid.metadata.storage.connector.user`|The username to connect with.|none|
|`druid.metadata.storage.connector.password`|The password to connect with.|none|
|`druid.metadata.storage.connector.createTables`|If Druid requires a table and it doesn't exist, create it?|true|
|`druid.metadata.storage.connector.useValidationQuery`|Validate a table with a query.|false|
|`druid.metadata.storage.connector.validationQuery`|The query to validate with.|SELECT 1|
|`druid.metadata.storage.tables.base`|The base name for tables.|druid|
|`druid.metadata.storage.tables.segmentTable`|The table to use to look for segments.|druid_segments|
|`druid.metadata.storage.tables.ruleTable`|The table to use to look for segment load/drop rules.|druid_rules|
|`druid.metadata.storage.tables.configTable`|The table to use to look for configs.|druid_config|
|`druid.metadata.storage.tables.tasks`|Used by the indexing service to store tasks.|druid_tasks|
|`druid.metadata.storage.tables.taskLog`|Used by the indexing service to store task logs.|druid_taskLog|
|`druid.metadata.storage.tables.taskLock`|Used by the indexing service to store task locks.|druid_taskLock|
### Jackson Config Manager Module

View File

@ -10,7 +10,7 @@ In addition to the configuration of some of the default modules in [Configuratio
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment.|local|
|`druid.indexer.storage.type`|Choices are "local" or "db". Indicates whether incoming tasks should be stored locally (in heap) or in a database. Storing incoming tasks in a database allows for tasks to be resumed if the overlord should fail.|local|
|`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. Storing incoming tasks in metadata storage allows for tasks to be resumed if the overlord should fail.|local|
|`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H|
|`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
|`druid.indexer.queue.startDelay`|Sleep this long before starting overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|

View File

@ -12,14 +12,14 @@ Depending on what `druid.storage.type` is set to, Druid will upload segments to
## My realtime node is not handing segments off
Make sure that the `druid.publish.type` on your real-time nodes is set to "db". Also make sure that `druid.storage.type` is set to a deep storage that makes sense. Some example configs:
Make sure that the `druid.publish.type` on your real-time nodes is set to `metadata`. Also make sure that `druid.storage.type` is set to a deep storage that makes sense. Some example configs:
```
druid.publish.type=db
druid.publish.type=metadata
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.storage.type=s3
druid.storage.bucket=druid

View File

@ -29,9 +29,9 @@ With the following JVM configuration:
-Ddruid.zk.service.host=localhost
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid
-Ddruid.db.connector.password=diurd
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.selectors.indexing.serviceName=overlord
-Ddruid.indexer.runner.startPort=8092
-Ddruid.indexer.fork.property.druid.computation.buffer.size=268435456

View File

@ -7,7 +7,7 @@ MySQL is an external dependency of Druid. We use it to store various metadata ab
Segments Table
--------------
This is dictated by the `druid.db.tables.segments` property.
This is dictated by the `druid.metadata.storage.tables.segments` property.
This table stores metadata about the segments that are available in the system. The table is polled by the [Coordinator](Coordinator.html) to determine the set of segments that should be available for querying in the system. The table has two main functional columns, the other columns are for indexing purposes.

View File

@ -67,11 +67,11 @@ druid.discovery.curator.path=/prod/discovery
druid.s3.accessKey=#{ACCESS_KEY}
druid.s3.secretKey=#{SECRET_KEY}
druid.db.connector.connectURI=jdbc:mysql://#{MYSQL_URL}:3306/druid
druid.db.connector.user=#{MYSQL_USER}
druid.db.connector.password=#{MYSQL_PW}
druid.db.connector.useValidationQuery=true
druid.db.tables.base=prod
druid.metadata.storage.connector.connectURI=jdbc:mysql://#{MYSQL_URL}:3306/druid
druid.metadata.storage.connector.user=#{MYSQL_USER}
druid.metadata.storage.connector.password=#{MYSQL_PW}
druid.metadata.storage.connector.useValidationQuery=true
druid.metadata.storage.tables.base=prod
# Only required if you are autoscaling middle managers
druid.indexer.autoscale.doAutoscale=true
@ -91,7 +91,7 @@ druid.indexer.runner.compressZnodes=true
druid.indexer.runner.minWorkerVersion=#{WORKER_VERSION}
# Store all task state in MySQL
druid.indexer.storage.type=db
druid.indexer.storage.type=metadata
druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
@ -229,11 +229,11 @@ druid.zk.paths.base=/druid/prod
druid.discovery.curator.path=/prod/discovery
druid.db.connector.connectURI=jdbc:mysql://#{MYSQL_URL}:3306/druid
druid.db.connector.user=#{MYSQL_USER}
druid.db.connector.password=#{MYSQL_PW}
druid.db.connector.useValidationQuery=true
druid.db.tables.base=prod
druid.metadata.storage.connector.connectURI=jdbc:mysql://#{MYSQL_URL}:3306/druid
druid.metadata.storage.connector.user=#{MYSQL_USER}
druid.metadata.storage.connector.password=#{MYSQL_PW}
druid.metadata.storage.connector.useValidationQuery=true
druid.metadata.storage.tables.base=prod
druid.selectors.indexing.serviceName=druid:prod:overlord
@ -387,4 +387,4 @@ druid.emitter.http.recipientBaseUrl=#{EMITTER_URL}
# If you choose to compress ZK announcements, you must do so for every node type
druid.announcer.type=batch
druid.curator.compress=true
```
```

View File

@ -35,12 +35,12 @@ druid.zk.service.host=localhost
# The realtime config file.
druid.realtime.specFile=/path/to/specFile
# Choices: db (hand off segments), noop (do not hand off segments).
druid.publish.type=db
# Choices: metadata (hand off segments), noop (do not hand off segments).
druid.publish.type=metadata
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000
```
@ -84,13 +84,13 @@ druid.zk.paths.base=/druid/prod
druid.s3.accessKey=#{ACCESS_KEY}
druid.s3.secretKey=#{SECRET_KEY}
druid.db.connector.connectURI=jdbc:mysql://#{MYSQL_URL}:3306/druid
druid.db.connector.user=#{MYSQL_USER}
druid.db.connector.password=#{MYSQL_PW}
druid.db.connector.useValidationQuery=true
druid.db.tables.base=prod
druid.metadata.storage.connector.connectURI=jdbc:mysql://#{MYSQL_URL}:3306/druid
druid.metadata.storage.connector.user=#{MYSQL_USER}
druid.metadata.storage.connector.password=#{MYSQL_PW}
druid.metadata.storage.connector.useValidationQuery=true
druid.metadata.storage.tables.base=prod
druid.publish.type=db
druid.publish.type=metadata
druid.processing.numThreads=3

View File

@ -30,9 +30,9 @@ Configuration:
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid
-Ddruid.db.connector.password=diurd
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.selectors.indexing.serviceName=overlord
-Ddruid.indexer.queue.startDelay=PT0M
@ -66,9 +66,9 @@ druid.port=8082
druid.zk.service.host=localhost
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.coordinator.startDelay=PT70s
```

View File

@ -93,9 +93,9 @@ druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M

View File

@ -120,9 +120,9 @@ druid.port=8082
druid.zk.service.host=localhost
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
druid.coordinator.startDelay=PT70s
```
@ -250,9 +250,9 @@ druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.160","io.d
druid.publish.type=noop
# These configs are only required for real hand off
# druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
# druid.db.connector.user=druid
# druid.db.connector.password=diurd
# druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
# druid.metadata.storage.connector.user=druid
# druid.metadata.storage.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

View File

@ -58,6 +58,7 @@ DRUID_CP=${EXAMPLE_LOC}
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/../config/realtime
#For the kit
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/lib/*
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/_global
DRUID_CP=${DRUID_CP}:${SCRIPT_DIR}/config/realtime
echo "Running command:"

View File

@ -0,0 +1,24 @@
# Extensions
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.druid.extensions:druid-kafka-seven:0.7.0","io.druid.extensions:druid-rabbitmq:0.7.0", "io.druid.extensions:druid-s3-extensions:0.7.0"]
# Zookeeper
druid.zk.service.host=localhost
# Metadata Storage
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
# Deep storage
druid.storage.type=local
druid.storage.storage.storageDirectory=/tmp/druid/localStorage
# Indexing service discovery
druid.selectors.indexing.serviceName=overlord
# Monitoring (disabled for examples)
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor"]
# Metrics logging (disabled for examples)
druid.emitter=noop

View File

@ -2,8 +2,6 @@ druid.host=localhost
druid.service=broker
druid.port=8080
druid.zk.service.host=localhost
# Change these to make Druid faster
# Bump these up only for faster nested groupBy
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1

View File

@ -2,10 +2,4 @@ druid.host=localhost
druid.service=coordinator
druid.port=8082
druid.zk.service.host=localhost
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.coordinator.startDelay=PT70s

View File

@ -2,9 +2,7 @@ druid.host=localhost
druid.service=historical
druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.147"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.7.0"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@ -16,4 +14,4 @@ druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]

View File

@ -1,18 +1,8 @@
druid.host=localhost
druid.port=8087
druid.port=8080
druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.147"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.selectors.indexing.serviceName=overlord
druid.indexer.queue.startDelay=PT0M
druid.indexer.runner.javaOpts="-server -Xmx256m"
druid.indexer.runner.startPort=8088
druid.indexer.fork.property.druid.processing.numThreads=1
druid.indexer.fork.property.druid.computation.buffer.size=100000000
druid.indexer.fork.property.druid.computation.buffer.size=100000000

View File

@ -2,19 +2,11 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.147","io.druid.extensions:druid-kafka-seven:0.6.147","io.druid.extensions:druid-rabbitmq:0.6.147"]
# Change this config to db to hand off to the rest of the Druid cluster
# Change this config to metadata to hand off to the rest of the Druid cluster
druid.publish.type=noop
# These configs are only required for real hand off
# druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
# druid.db.connector.user=druid
# druid.db.connector.password=diurd
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=1
druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor"]
# Enable Real monitoring
# druid.monitoring.monitors=["com.metamx.metrics.SysMonitor","com.metamx.metrics.JvmMonitor","io.druid.segment.realtime.RealtimeMetricsMonitor"]

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -87,6 +87,13 @@ public class ApproximateHistogramBufferAggregator implements BufferAggregator
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("ApproximateHistogramBufferAggregator does not support getLong()");
}
@Override
public void close()
{

View File

@ -91,6 +91,12 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getFloat()");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("ApproximateHistogramFoldingBufferAggregator does not support getLong()");
}
@Override
public void close()
{

View File

@ -86,7 +86,8 @@ public class ApproximateHistogramGroupByQueryTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, mapper, engine)
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool),
pool
);
GroupByQueryConfig singleThreadedConfig = new GroupByQueryConfig()
@ -106,7 +107,8 @@ public class ApproximateHistogramGroupByQueryTest
singleThreadEngine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
singleThreadedConfigSupplier,
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine)
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool),
pool
);
@ -157,8 +159,8 @@ public class ApproximateHistogramGroupByQueryTest
.setDimensions(
Arrays.<DimensionSpec>asList(
new DefaultDimensionSpec(
QueryRunnerTestHelper.providerDimension,
"proViderAlias"
QueryRunnerTestHelper.marketDimension,
"marKetAlias"
)
)
)
@ -167,7 +169,7 @@ public class ApproximateHistogramGroupByQueryTest
new DefaultLimitSpec(
Lists.newArrayList(
new OrderByColumnSpec(
"proViderAlias",
"marKetAlias",
OrderByColumnSpec.Direction.DESCENDING
)
), 1
@ -189,7 +191,7 @@ public class ApproximateHistogramGroupByQueryTest
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"1970-01-01T00:00:00.000Z",
"provideralias", "upfront",
"marketalias", "upfront",
"rows", 186L,
"quantile", 880.9881f,
"apphisto",

View File

@ -48,6 +48,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -113,7 +114,7 @@ public class ApproximateHistogramTopNQueryTest
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.providerDimension)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.dependentPostAggMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
@ -144,7 +145,7 @@ public class ApproximateHistogramTopNQueryTest
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.providerDimension, "total_market")
.put(QueryRunnerTestHelper.marketDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
@ -175,7 +176,7 @@ public class ApproximateHistogramTopNQueryTest
)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.providerDimension, "upfront")
.put(QueryRunnerTestHelper.marketDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
@ -206,7 +207,7 @@ public class ApproximateHistogramTopNQueryTest
)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.providerDimension, "spot")
.put(QueryRunnerTestHelper.marketDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
@ -240,7 +241,8 @@ public class ApproximateHistogramTopNQueryTest
)
)
);
HashMap<String,Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
TestHelper.assertExpectedResults(expectedResults, runner.run(query, context));
}
}

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -48,7 +48,6 @@ import io.druid.guice.annotations.Self;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.path.PathSpec;
import io.druid.initialization.Initialization;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
@ -394,6 +393,11 @@ public class HadoopDruidIndexerConfig
}
}
public boolean isPersistInHeap()
{
return schema.getTuningConfig().isPersistInHeap();
}
/******************************************
Path helper logic
******************************************/

View File

@ -32,22 +32,26 @@ public class HadoopDruidIndexerJob implements Jobby
{
private static final Logger log = new Logger(HadoopDruidIndexerJob.class);
private final HadoopDruidIndexerConfig config;
private final DbUpdaterJob dbUpdaterJob;
private final MetadataStorageUpdaterJob metadataStorageUpdaterJob;
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
@Inject
public HadoopDruidIndexerJob(
HadoopDruidIndexerConfig config
HadoopDruidIndexerConfig config,
MetadataStorageUpdaterJobHandler handler
)
{
config.verify();
this.config = config;
if (config.isUpdaterJobSpecSet()) {
dbUpdaterJob = new DbUpdaterJob(config);
metadataStorageUpdaterJob = new MetadataStorageUpdaterJob(
config,
handler
);
} else {
dbUpdaterJob = null;
metadataStorageUpdaterJob = null;
}
}
@ -57,24 +61,30 @@ public class HadoopDruidIndexerJob implements Jobby
List<Jobby> jobs = Lists.newArrayList();
JobHelper.ensurePaths(config);
indexJob = new IndexGeneratorJob(config);
if (config.isPersistInHeap()) {
indexJob = new IndexGeneratorJob(config);
} else {
indexJob = new LegacyIndexGeneratorJob(config);
}
jobs.add(indexJob);
if (dbUpdaterJob != null) {
jobs.add(dbUpdaterJob);
if (metadataStorageUpdaterJob != null) {
jobs.add(metadataStorageUpdaterJob);
} else {
log.info("No updaterJobSpec set, not uploading to database");
}
jobs.add(new Jobby()
{
@Override
public boolean run()
{
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
return true;
}
});
jobs.add(
new Jobby()
{
@Override
public boolean run()
{
publishedSegments = IndexGeneratorJob.getPublishedSegments(config);
return true;
}
}
);
JobHelper.runJobs(jobs, config);

View File

@ -21,7 +21,7 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.segment.indexing.IOConfig;
import java.util.Map;
@ -32,12 +32,12 @@ import java.util.Map;
public class HadoopIOConfig implements IOConfig
{
private final Map<String, Object> pathSpec;
private final DbUpdaterJobSpec metadataUpdateSpec;
private final MetadataStorageUpdaterJobSpec metadataUpdateSpec;
private final String segmentOutputPath;
public HadoopIOConfig(
final @JsonProperty("inputSpec") Map<String, Object> pathSpec,
final @JsonProperty("metadataUpdateSpec") DbUpdaterJobSpec metadataUpdateSpec,
final @JsonProperty("metadataUpdateSpec") MetadataStorageUpdaterJobSpec metadataUpdateSpec,
final @JsonProperty("segmentOutputPath") String segmentOutputPath
)
{
@ -53,7 +53,7 @@ public class HadoopIOConfig implements IOConfig
}
@JsonProperty("metadataUpdateSpec")
public DbUpdaterJobSpec getMetadataUpdateSpec()
public MetadataStorageUpdaterJobSpec getMetadataUpdateSpec()
{
return metadataUpdateSpec;
}

View File

@ -30,7 +30,7 @@ import io.druid.data.input.impl.TimestampSpec;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
@ -70,7 +70,7 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("rollupSpec") DataRollupSpec rollupSpec,
final @JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec,
final @JsonProperty("updaterJobSpec") MetadataStorageUpdaterJobSpec updaterJobSpec,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText,
@ -159,13 +159,15 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
version,
thePartitionSpec,
shardSpecs,
rollupSpec == null ? 50000 : rollupSpec.rowFlushBoundary,
null,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
ignoreInvalidRows,
jobProperties,
combineText
combineText,
false,
false
);
}
}
@ -289,4 +291,4 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
null
);
}
}
}

View File

@ -53,6 +53,8 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
null,
false,
false,
false
);
}
@ -68,6 +70,8 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean ignoreInvalidRows;
private final Map<String, String> jobProperties;
private final boolean combineText;
private final boolean persistInHeap;
private final boolean ingestOffheap;
@JsonCreator
public HadoopTuningConfig(
@ -81,7 +85,9 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
final @JsonProperty("jobProperties") Map<String, String> jobProperties,
final @JsonProperty("combineText") boolean combineText
final @JsonProperty("combineText") boolean combineText,
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap
)
{
this.workingPath = workingPath == null ? null : workingPath;
@ -97,6 +103,8 @@ public class HadoopTuningConfig implements TuningConfig
? ImmutableMap.<String, String>of()
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
this.persistInHeap = persistInHeap;
this.ingestOffheap = ingestOffheap;
}
@JsonProperty
@ -165,6 +173,17 @@ public class HadoopTuningConfig implements TuningConfig
return combineText;
}
@JsonProperty
public boolean isPersistInHeap()
{
return persistInHeap;
}
@JsonProperty
public boolean isIngestOffheap(){
return ingestOffheap;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -178,7 +197,9 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
ignoreInvalidRows,
jobProperties,
combineText
combineText,
persistInHeap,
ingestOffheap
);
}
@ -195,7 +216,9 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
ignoreInvalidRows,
jobProperties,
combineText
combineText,
persistInHeap,
ingestOffheap
);
}
@ -212,7 +235,9 @@ public class HadoopTuningConfig implements TuningConfig
overwriteFiles,
ignoreInvalidRows,
jobProperties,
combineText
combineText,
persistInHeap,
ingestOffheap
);
}
}

View File

@ -35,13 +35,17 @@ import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.LoggingProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.timeline.DataSegment;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
@ -86,20 +90,9 @@ import java.util.zip.ZipOutputStream;
public class IndexGeneratorJob implements Jobby
{
private static final Logger log = new Logger(IndexGeneratorJob.class);
private final HadoopDruidIndexerConfig config;
private IndexGeneratorStats jobStats;
public IndexGeneratorJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
this.jobStats = new IndexGeneratorStats();
}
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
@ -130,6 +123,22 @@ public class IndexGeneratorJob implements Jobby
return publishedSegments;
}
private final HadoopDruidIndexerConfig config;
private IndexGeneratorStats jobStats;
public IndexGeneratorJob(
HadoopDruidIndexerConfig config
)
{
this.config = config;
this.jobStats = new IndexGeneratorStats();
}
protected void setReducerClass(final Job job)
{
job.setReducerClass(IndexGeneratorReducer.class);
}
public IndexGeneratorStats getJobStats()
{
return jobStats;
@ -161,7 +170,7 @@ public class IndexGeneratorJob implements Jobby
job.setNumReduceTasks(Iterables.size(config.getAllBuckets().get()));
job.setPartitionerClass(IndexGeneratorPartitioner.class);
job.setReducerClass(IndexGeneratorReducer.class);
setReducerClass(job);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
@ -190,7 +199,6 @@ public class IndexGeneratorJob implements Jobby
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
{
@Override
protected void innerMap(
@ -256,6 +264,42 @@ public class IndexGeneratorJob implements Jobby
private List<String> metricNames = Lists.newArrayList();
private StringInputRowParser parser;
protected ProgressIndicator makeProgressIndicator(final Context context)
{
return new LoggingProgressIndicator("IndexGeneratorJob")
{
@Override
public void progress()
{
context.progress();
}
};
}
protected File persist(
final IncrementalIndex index,
final Interval interval,
final File file,
final ProgressIndicator progressIndicator
) throws IOException
{
return IndexMaker.persist(
index, interval, file, progressIndicator
);
}
protected File mergeQueryableIndex(
final List<QueryableIndex> indexes,
final AggregatorFactory[] aggs,
final File file,
ProgressIndicator progressIndicator
) throws IOException
{
return IndexMaker.mergeQueryableIndex(
indexes, aggs, file, progressIndicator
);
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException
@ -282,113 +326,84 @@ public class IndexGeneratorJob implements Jobby
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
try {
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
baseFlushFile.mkdirs();
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
baseFlushFile.mkdirs();
Set<File> toMerge = Sets.newTreeSet();
int indexCount = 0;
int lineCount = 0;
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();
Set<File> toMerge = Sets.newTreeSet();
int indexCount = 0;
int lineCount = 0;
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();
Set<String> allDimensionNames = Sets.newHashSet();
for (final Text value : values) {
context.progress();
final InputRow inputRow = index.getSpatialDimensionRowFormatter().formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
numRows,
System.currentTimeMillis() - startTime
);
runningTotalLineCount = lineCount;
final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
toMerge.add(file);
Set<String> allDimensionNames = Sets.newHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
for (final Text value : values) {
context.progress();
IndexMerger.persist(
index, interval, file, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
index = makeIncrementalIndex(bucket, aggs);
final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
startTime = System.currentTimeMillis();
++indexCount;
}
}
int numRows = index.add(inputRow);
++lineCount;
log.info("%,d lines completed.", lineCount);
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
numRows,
System.currentTimeMillis() - startTime
);
runningTotalLineCount = lineCount;
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
toMerge.add(file);
if (toMerge.size() == 0) {
if (index.isEmpty()) {
throw new IAE("If you try to persist empty indexes you are going to have a bad time");
}
mergedBase = new File(baseFlushFile, "merged");
IndexMerger.persist(
index, interval, mergedBase, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
persist(index, interval, file, progressIndicator);
// close this index and make a new one
index.close();
index = makeIncrementalIndex(bucket, aggs);
startTime = System.currentTimeMillis();
++indexCount;
}
}
);
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
IndexMerger.persist(
index, interval, finalFile, new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
toMerge.add(finalFile);
}
log.info("%,d lines completed.", lineCount);
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
if (toMerge.size() == 0) {
if (index.isEmpty()) {
throw new IAE("If you try to persist empty indexes you are going to have a bad time");
}
mergedBase = new File(baseFlushFile, "merged");
persist(index, interval, mergedBase, progressIndicator);
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
persist(index, interval, finalFile, progressIndicator);
toMerge.add(finalFile);
}
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
);
}
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
FileUtils.deleteDirectory(file);
}
mergedBase = IndexMerger.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new IndexMerger.ProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
}
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
for (File file : toMerge) {
FileUtils.deleteDirectory(file);
finally {
index.close();
}
}
@ -616,14 +631,29 @@ public class IndexGeneratorJob implements Jobby
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
{
return new IncrementalIndex(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withSpatialDimensions(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build()
);
int aggsSize = 0;
for (AggregatorFactory agg : aggs) {
aggsSize += agg.getMaxIntermediateSize();
}
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
int bufferSize = aggsSize * tuningConfig.getRowFlushBoundary();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
.build();
if (tuningConfig.isIngestOffheap()) {
return new OffheapIncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
);
} else {
return new IncrementalIndex(
indexSchema,
new OffheapBufferPool(bufferSize)
);
}
}
private void createNewZipEntry(ZipOutputStream out, String name) throws IOException

View File

@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.IndexMerger;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.incremental.IncrementalIndex;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.Interval;
import java.io.File;
import java.io.IOException;
import java.util.List;
/**
*/
public class LegacyIndexGeneratorJob extends IndexGeneratorJob
{
public LegacyIndexGeneratorJob(
HadoopDruidIndexerConfig config
)
{
super(config);
}
@Override
protected void setReducerClass(Job job)
{
job.setReducerClass(LegacyIndexGeneratorReducer.class);
}
public static class LegacyIndexGeneratorReducer extends IndexGeneratorJob.IndexGeneratorReducer
{
@Override
protected ProgressIndicator makeProgressIndicator(final Context context)
{
return new BaseProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
};
}
@Override
protected File persist(
IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator
) throws IOException
{
return IndexMerger.persist(index, interval, file, progressIndicator);
}
@Override
protected File mergeQueryableIndex(
List<QueryableIndex> indexes,
AggregatorFactory[] aggs,
File file,
ProgressIndicator progressIndicator
) throws IOException
{
return IndexMerger.mergeQueryableIndex(indexes, aggs, file, progressIndicator);
}
}
}

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexer;
import io.druid.timeline.DataSegment;
import java.util.List;
/**
*/
public class MetadataStorageUpdaterJob implements Jobby
{
private final HadoopDruidIndexerConfig config;
private final MetadataStorageUpdaterJobHandler handler;
public MetadataStorageUpdaterJob(
HadoopDruidIndexerConfig config,
MetadataStorageUpdaterJobHandler handler
)
{
this.config = config;
this.handler = handler;
}
@Override
public boolean run()
{
final List<DataSegment> segments = IndexGeneratorJob.getPublishedSegments(config);
final String segmentTable = config.getSchema().getIOConfig().getMetadataUpdateSpec().getSegmentTable();
handler.publishSegments(segmentTable, segments, HadoopDruidIndexerConfig.jsonMapper);
return true;
}
}

View File

@ -20,6 +20,7 @@
package io.druid.indexer.rollup;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.util.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
@ -30,32 +31,17 @@ import java.util.List;
*
* Adjust to JsonCreator and final fields when resolved.
*/
@Deprecated
public class DataRollupSpec
{
@JsonProperty
public List<AggregatorFactory> aggs;
public List<AggregatorFactory> aggs = Lists.newArrayList();
@JsonProperty
public QueryGranularity rollupGranularity = QueryGranularity.NONE;
@JsonProperty
public int rowFlushBoundary = 500000;
public DataRollupSpec() {}
public DataRollupSpec(List<AggregatorFactory> aggs, QueryGranularity rollupGranularity)
{
this.aggs = aggs;
this.rollupGranularity = rollupGranularity;
}
public List<AggregatorFactory> getAggs()
{
return aggs;
}
public QueryGranularity getRollupGranularity()
{
return rollupGranularity;
}
}

View File

@ -21,12 +21,18 @@ package io.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import io.druid.db.DbConnectorConfig;
import io.druid.metadata.MetadataStorageConnectorConfig;
import javax.validation.constraints.NotNull;
/**
*/
public class DbUpdaterJobSpec implements Supplier<DbConnectorConfig>
public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageConnectorConfig>
{
@JsonProperty("type")
@NotNull
public String type;
@JsonProperty("connectURI")
public String connectURI;
@ -44,10 +50,15 @@ public class DbUpdaterJobSpec implements Supplier<DbConnectorConfig>
return segmentTable;
}
@Override
public DbConnectorConfig get()
public String getType()
{
return new DbConnectorConfig()
return type;
}
@Override
public MetadataStorageConnectorConfig get()
{
return new MetadataStorageConnectorConfig()
{
@Override
public String getConnectURI()

View File

@ -152,6 +152,10 @@ public class HadoopDruidIndexerConfigTest
for (int i = 0; i < partitionCount; i++) {
specs.add(new HadoopyShardSpec(new HashBasedNumberedShardSpec(i, partitionCount, new DefaultObjectMapper()), i));
}
// Backwards compatibility
DataRollupSpec rollupSpec = new DataRollupSpec();
rollupSpec.rollupGranularity = QueryGranularity.MINUTE;
HadoopIngestionSpec spec = new HadoopIngestionSpec(
null, null, null,
"foo",
@ -172,7 +176,7 @@ public class HadoopDruidIndexerConfigTest
true,
ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
false,
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.MINUTE),
rollupSpec,
null,
false,
ImmutableMap.of("foo", "bar"),

View File

@ -22,11 +22,11 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.DbConnectorConfig;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;
@ -354,14 +354,13 @@ public class HadoopIngestionSpecTest
HadoopIngestionSpec.class
);
final DbUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec();
final DbConnectorConfig connectorConfig = spec.get();
final MetadataStorageUpdaterJobSpec spec = schema.getIOConfig().getMetadataUpdateSpec();
final MetadataStorageConnectorConfig connectorConfig = spec.get();
Assert.assertEquals("segments", spec.getSegmentTable());
Assert.assertEquals("jdbc:mysql://localhost/druid", connectorConfig.getConnectURI());
Assert.assertEquals("rofl", connectorConfig.getUser());
Assert.assertEquals("p4ssw0rd", connectorConfig.getPassword());
Assert.assertEquals(false, connectorConfig.isUseValidationQuery());
}
@Test

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
@ -47,11 +47,6 @@
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.18</version>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -81,7 +81,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -68,7 +68,7 @@ public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerMetadataStorageCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
}
@Override

View File

@ -68,7 +68,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerMetadataStorageCoordinator().getUsedSegmentsForInterval(dataSource, interval);
}
@Override

View File

@ -42,7 +42,7 @@ public class SegmentMetadataUpdateAction implements TaskAction<Void>
) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
toolbox.getIndexerMetadataStorageCoordinator().updateSegmentMetadata(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -59,7 +59,7 @@ public class SegmentNukeAction implements TaskAction<Void>
public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
{
toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -27,7 +27,7 @@ import com.metamx.common.ISE;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerDBCoordinator;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.timeline.DataSegment;
@ -37,18 +37,18 @@ import java.util.Set;
public class TaskActionToolbox
{
private final TaskLockbox taskLockbox;
private final IndexerDBCoordinator indexerDBCoordinator;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
TaskLockbox taskLockbox,
IndexerDBCoordinator indexerDBCoordinator,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter
)
{
this.taskLockbox = taskLockbox;
this.indexerDBCoordinator = indexerDBCoordinator;
this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
this.emitter = emitter;
}
@ -57,9 +57,9 @@ public class TaskActionToolbox
return taskLockbox;
}
public IndexerDBCoordinator getIndexerDBCoordinator()
public IndexerMetadataStorageCoordinator getIndexerMetadataStorageCoordinator()
{
return indexerDBCoordinator;
return indexerMetadataStorageCoordinator;
}
public ServiceEmitter getEmitter()

View File

@ -35,7 +35,7 @@ import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMaker;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.indexing.DataSchema;
@ -166,7 +166,7 @@ public class YeOldePlumberSchool implements PlumberSchool
}
fileToUpload = new File(tmpSegmentDir, "merged");
IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload);
}
// Map merged segment so we can extract dimensions
@ -211,8 +211,7 @@ public class YeOldePlumberSchool implements PlumberSchool
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
try {
IndexMerger.persist(
IndexMaker.persist(
indexToPersist.getIndex(),
dirToPersist
);

View File

@ -37,7 +37,6 @@ import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Map;

View File

@ -1,109 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;
public class DeleteTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(DeleteTask.class);
@JsonCreator
public DeleteTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
id != null ? id : String.format(
"delete_%s_%s_%s_%s",
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
),
dataSource,
Preconditions.checkNotNull(interval, "interval")
);
}
@Override
public String getType()
{
return "delete";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(getInterval(), empty);
// Create DataSegment
final DataSegment segment =
DataSegment.builder()
.dataSource(this.getDataSource())
.interval(getInterval())
.version(myLock.getVersion())
.shardSpec(new NoneShardSpec())
.build();
final File outDir = new File(toolbox.getTaskWorkDir(), segment.getIdentifier());
final File fileToUpload = IndexMerger.merge(Lists.newArrayList(emptyAdapter), new AggregatorFactory[0], outDir);
// Upload the segment
final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, segment);
log.info(
"Uploaded tombstone segment for[%s] interval[%s] with version[%s]",
segment.getDataSource(),
segment.getInterval(),
segment.getVersion()
);
toolbox.pushSegments(ImmutableList.of(uploadedSegment));
return TaskStatus.success(getId());
}
}

View File

@ -29,6 +29,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.guice.ExtensionsConfig;
@ -38,6 +39,7 @@ import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopDruidIndexerJob;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.Jobby;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
@ -63,8 +65,9 @@ public class HadoopIndexTask extends AbstractTask
private static final Logger log = new Logger(HadoopIndexTask.class);
private static final ExtensionsConfig extensionsConfig;
final static Injector injector = GuiceInjectors.makeStartupInjector();
static {
extensionsConfig = GuiceInjectors.makeStartupInjector().getInstance(ExtensionsConfig.class);
extensionsConfig = injector.getInstance(ExtensionsConfig.class);
}
private static String getTheDataSource(HadoopIngestionSpec spec, HadoopIngestionSpec config)
@ -288,7 +291,10 @@ public class HadoopIndexTask extends AbstractTask
.withTuningConfig(theSchema.getTuningConfig().withVersion(version))
);
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(config);
HadoopDruidIndexerJob job = new HadoopDruidIndexerJob(
config,
injector.getInstance(MetadataStorageUpdaterJobHandler.class)
);
log.info("Starting a hadoop index generator job...");
if (job.run()) {

View File

@ -147,7 +147,7 @@ public class IndexTask extends AbstractFixedIntervalTask
granularitySpec.withQueryGranularity(indexGranularity == null ? QueryGranularity.NONE : indexGranularity)
),
new IndexIOConfig(firehoseFactory),
new IndexTuningConfig(targetPartitionSize, rowFlushBoundary, null)
new IndexTuningConfig(targetPartitionSize, 0, null)
);
}
this.jsonMapper = jsonMapper;
@ -401,7 +401,11 @@ public class IndexTask extends AbstractFixedIntervalTask
version,
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec), metrics);
).findPlumber(
schema,
new RealtimeTuningConfig(null, null, null, null, null, null, null, shardSpec, null, null),
metrics
);
// rowFlushBoundary for this job
final int myRowFlushBoundary = rowFlushBoundary > 0
@ -557,7 +561,7 @@ public class IndexTask extends AbstractFixedIntervalTask
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary,
@JsonProperty("numShards") @Nullable Integer numShards
)
)
{
this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary;

View File

@ -143,7 +143,9 @@ public class RealtimeIndexTask extends AbstractTask
null,
rejectionPolicy == null ? rejectionPolicyFactory : rejectionPolicy,
maxPendingPersists,
spec.getShardSpec()
spec.getShardSpec(),
false,
false
),
null, null, null, null
);

View File

@ -43,7 +43,6 @@ import io.druid.query.QueryRunner;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "append", value = AppendTask.class),
@JsonSubTypes.Type(name = "merge", value = MergeTask.class),
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "move", value = MoveTask.class),
@JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),

View File

@ -51,10 +51,11 @@ import io.druid.query.select.EventHolder;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexIO;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.segment.loading.SegmentLoadingException;
@ -250,7 +251,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
@Override
public Sequence<InputRow> apply(final Cursor cursor)
{
final TimestampColumnSelector timestampColumnSelector = cursor.makeTimestampColumnSelector();
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (String dim : dims) {
@ -287,7 +288,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getTimestamp();
final long timestamp = timestampColumnSelector.get();
theEvent.put(EventHolder.timestampKey, new DateTime(timestamp));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {

View File

@ -1,552 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.RetryUtils;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class DbTaskStorage implements TaskStorage
{
private final ObjectMapper jsonMapper;
private final DbConnector dbConnector;
private final DbTablesConfig dbTables;
private final IDBI dbi;
private final TaskStorageConfig config;
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
@Inject
public DbTaskStorage(
final ObjectMapper jsonMapper,
final DbConnector dbConnector,
final DbTablesConfig dbTables,
final IDBI dbi,
final TaskStorageConfig config
)
{
this.jsonMapper = jsonMapper;
this.dbConnector = dbConnector;
this.dbTables = dbTables;
this.dbi = dbi;
this.config = config;
}
@LifecycleStart
public void start()
{
dbConnector.createTaskTables();
}
@LifecycleStop
public void stop()
{
// do nothing
}
@Override
public void insert(final Task task, final TaskStatus status) throws TaskExistsException
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
Preconditions.checkArgument(
task.getId().equals(status.getId()),
"Task/Status ID mismatch[%s/%s]",
task.getId(),
status.getId()
);
log.info("Inserting task %s with status: %s", task.getId(), status);
try {
retryingHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
dbTables.getTasksTable()
)
)
.bind("id", task.getId())
.bind("created_date", new DateTime().toString())
.bind("datasource", task.getDataSource())
.bind("payload", jsonMapper.writeValueAsBytes(task))
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
}
}
);
}
catch (Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e);
} else {
throw e;
}
}
}
@Override
public void setStatus(final TaskStatus status)
{
Preconditions.checkNotNull(status, "status");
log.info("Updating task %s to status: %s", status.getId(), status);
int updated = retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
dbTables.getTasksTable()
)
)
.bind("id", status.getId())
.bind("active", status.isRunnable() ? 1 : 0)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
}
}
);
if (updated != 1) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
@Override
public Optional<Task> getTask(final String taskid)
{
return retryingHandle(
new HandleCallback<Optional<Task>>()
{
@Override
public Optional<Task> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE id = :id",
dbTables.getTasksTable()
)
)
.bind("id", taskid)
.list();
if (dbTasks.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("payload"), Task.class));
}
}
}
);
}
@Override
public Optional<TaskStatus> getStatus(final String taskid)
{
return retryingHandle(
new HandleCallback<Optional<TaskStatus>>()
{
@Override
public Optional<TaskStatus> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbStatuses =
handle.createQuery(
String.format(
"SELECT status_payload FROM %s WHERE id = :id",
dbTables.getTasksTable()
)
)
.bind("id", taskid)
.list();
if (dbStatuses.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("status_payload"), TaskStatus.class));
}
}
}
);
}
@Override
public List<Task> getActiveTasks()
{
return retryingHandle(
new HandleCallback<List<Task>>()
{
@Override
public List<Task> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = 1 ORDER BY created_date",
dbTables.getTasksTable()
)
)
.list();
final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final Task task = jsonMapper.readValue((byte[]) row.get("payload"), Task.class);
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isRunnable()) {
tasks.add(task);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
}
}
return tasks.build();
}
}
);
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
return retryingHandle(
new HandleCallback<List<TaskStatus>>()
{
@Override
public List<TaskStatus> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTasks =
handle.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC",
dbTables.getTasksTable()
)
).bind("recent", recent.toString()).list();
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
}
}
);
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(taskLock, "taskLock");
log.info(
"Adding lock on interval[%s] version[%s] for task: %s",
taskLock.getInterval(),
taskLock.getVersion(),
taskid
);
retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)",
dbTables.getTaskLockTable()
)
)
.bind("task_id", taskid)
.bind("lock_payload", jsonMapper.writeValueAsBytes(taskLock))
.execute();
}
}
);
}
@Override
public void removeLock(String taskid, TaskLock taskLockToRemove)
{
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(taskLockToRemove, "taskLockToRemove");
final Map<Long, TaskLock> taskLocks = getLocksWithIds(taskid);
for (final Map.Entry<Long, TaskLock> taskLockWithId : taskLocks.entrySet()) {
final long id = taskLockWithId.getKey();
final TaskLock taskLock = taskLockWithId.getValue();
if (taskLock.equals(taskLockToRemove)) {
log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"DELETE FROM %s WHERE id = :id",
dbTables.getTaskLockTable()
)
)
.bind("id", id)
.execute();
}
}
);
}
}
}
@Override
public List<TaskLock> getLocks(String taskid)
{
return ImmutableList.copyOf(
Iterables.transform(
getLocksWithIds(taskid).entrySet(), new Function<Map.Entry<Long, TaskLock>, TaskLock>()
{
@Override
public TaskLock apply(Map.Entry<Long, TaskLock> e)
{
return e.getValue();
}
}
)
);
}
@Override
public <T> void addAuditLog(final Task task, final TaskAction<T> taskAction)
{
Preconditions.checkNotNull(taskAction, "taskAction");
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)",
dbTables.getTaskLogTable()
)
)
.bind("task_id", task.getId())
.bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
.execute();
}
}
);
}
@Override
public List<TaskAction> getAuditLogs(final String taskid)
{
return retryingHandle(
new HandleCallback<List<TaskAction>>()
{
@Override
public List<TaskAction> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTaskLogs =
handle.createQuery(
String.format(
"SELECT log_payload FROM %s WHERE task_id = :task_id",
dbTables.getTaskLogTable()
)
)
.bind("task_id", taskid)
.list();
final List<TaskAction> retList = Lists.newArrayList();
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
.emit();
}
}
return retList;
}
}
);
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
return retryingHandle(
new HandleCallback<Map<Long, TaskLock>>()
{
@Override
public Map<Long, TaskLock> withHandle(Handle handle) throws Exception
{
final List<Map<String, Object>> dbTaskLocks =
handle.createQuery(
String.format(
"SELECT id, lock_payload FROM %s WHERE task_id = :task_id",
dbTables.getTaskLockTable()
)
)
.bind("task_id", taskid)
.list();
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for (final Map<String, Object> row : dbTaskLocks) {
try {
retMap.put(
(Long) row.get("id"),
jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", taskid)
.addData("lockPayload", row)
.emit();
}
}
return retMap;
}
}
);
}
/**
* Retry SQL operations
*/
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return dbi.withHandle(callback);
}
};
final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return shouldRetryException(e);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private static boolean shouldRetryException(final Throwable e)
{
return e != null && (e instanceof SQLTransientException
|| e instanceof MySQLTransientException
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317)
|| (e instanceof SQLException && shouldRetryException(e.getCause()))
|| (e instanceof DBIException && shouldRetryException(e.getCause())));
}
}

View File

@ -56,8 +56,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.Map;

View File

@ -35,6 +35,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import io.druid.metadata.EntryExistsException;
import org.joda.time.DateTime;
import java.util.List;
@ -63,7 +64,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public void insert(Task task, TaskStatus status) throws TaskExistsException
public void insert(Task task, TaskStatus status) throws EntryExistsException
{
giant.lock();
@ -78,7 +79,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
);
if(tasks.containsKey(task.getId())) {
throw new TaskExistsException(task.getId());
throw new EntryExistsException(task.getId());
}
log.info("Inserting task %s with status: %s", task.getId(), status);

View File

@ -0,0 +1,304 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.common.TaskStatus;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.MetadataStorageActionHandler;
import io.druid.metadata.MetadataStorageActionHandlerFactory;
import io.druid.metadata.MetadataStorageActionHandlerTypes;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import io.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
public class MetadataTaskStorage implements TaskStorage
{
private static final MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock> TASK_TYPES = new MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock>()
{
@Override
public TypeReference<Task> getEntryType()
{
return new TypeReference<Task>()
{
};
}
@Override
public TypeReference<TaskStatus> getStatusType()
{
return new TypeReference<TaskStatus>()
{
};
}
@Override
public TypeReference<TaskAction> getLogType()
{
return new TypeReference<TaskAction>()
{
};
}
@Override
public TypeReference<TaskLock> getLockType()
{
return new TypeReference<TaskLock>()
{
};
}
};
private final MetadataStorageConnector metadataStorageConnector;
private final TaskStorageConfig config;
private final MetadataStorageActionHandler<Task, TaskStatus, TaskAction, TaskLock> handler;
private static final EmittingLogger log = new EmittingLogger(MetadataTaskStorage.class);
@Inject
public MetadataTaskStorage(
final MetadataStorageConnector metadataStorageConnector,
final TaskStorageConfig config,
final MetadataStorageActionHandlerFactory factory
)
{
this.metadataStorageConnector = metadataStorageConnector;
this.config = config;
this.handler = factory.create(MetadataStorageTablesConfig.TASK_ENTRY_TYPE, TASK_TYPES);
}
@LifecycleStart
public void start()
{
metadataStorageConnector.createTaskTables();
}
@LifecycleStop
public void stop()
{
// do nothing
}
@Override
public void insert(final Task task, final TaskStatus status) throws EntryExistsException
{
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(status, "status");
Preconditions.checkArgument(
task.getId().equals(status.getId()),
"Task/Status ID mismatch[%s/%s]",
task.getId(),
status.getId()
);
log.info("Inserting task %s with status: %s", task.getId(), status);
try {
handler.insert(
task.getId(),
new DateTime(),
task.getDataSource(),
task,
status.isRunnable(),
status
);
}
catch (Exception e) {
if(e instanceof EntryExistsException) {
throw (EntryExistsException) e;
} else {
Throwables.propagate(e);
}
}
}
@Override
public void setStatus(final TaskStatus status)
{
Preconditions.checkNotNull(status, "status");
log.info("Updating task %s to status: %s", status.getId(), status);
final boolean set = handler.setStatus(
status.getId(),
status.isRunnable(),
status
);
if (!set) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
@Override
public Optional<Task> getTask(final String taskId)
{
return handler.getEntry(taskId);
}
@Override
public Optional<TaskStatus> getStatus(final String taskId)
{
return handler.getStatus(taskId);
}
@Override
public List<Task> getActiveTasks()
{
return ImmutableList.copyOf(
Iterables.transform(
Iterables.filter(
handler.getActiveEntriesWithStatus(),
new Predicate<Pair<Task, TaskStatus>>()
{
@Override
public boolean apply(
@Nullable Pair<Task, TaskStatus> input
)
{
return input.rhs.isRunnable();
}
}
),
new Function<Pair<Task, TaskStatus>, Task>()
{
@Nullable
@Override
public Task apply(@Nullable Pair<Task, TaskStatus> input)
{
return input.lhs;
}
}
)
);
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime start = new DateTime().minus(config.getRecentlyFinishedThreshold());
return ImmutableList.copyOf(
Iterables.filter(
handler.getInactiveStatusesSince(start),
new Predicate<TaskStatus>()
{
@Override
public boolean apply(TaskStatus status)
{
return status.isComplete();
}
}
)
);
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(taskLock, "taskLock");
log.info(
"Adding lock on interval[%s] version[%s] for task: %s",
taskLock.getInterval(),
taskLock.getVersion(),
taskid
);
handler.addLock(taskid, taskLock);
}
@Override
public void removeLock(String taskid, TaskLock taskLockToRemove)
{
Preconditions.checkNotNull(taskid, "taskid");
Preconditions.checkNotNull(taskLockToRemove, "taskLockToRemove");
final Map<Long, TaskLock> taskLocks = getLocksWithIds(taskid);
for (final Map.Entry<Long, TaskLock> taskLockWithId : taskLocks.entrySet()) {
final long id = taskLockWithId.getKey();
final TaskLock taskLock = taskLockWithId.getValue();
if (taskLock.equals(taskLockToRemove)) {
log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
handler.removeLock(id);
}
}
}
@Override
public List<TaskLock> getLocks(String taskid)
{
return ImmutableList.copyOf(
Iterables.transform(
getLocksWithIds(taskid).entrySet(), new Function<Map.Entry<Long, TaskLock>, TaskLock>()
{
@Override
public TaskLock apply(Map.Entry<Long, TaskLock> e)
{
return e.getValue();
}
}
)
);
}
@Override
public <T> void addAuditLog(final Task task, final TaskAction<T> taskAction)
{
Preconditions.checkNotNull(taskAction, "taskAction");
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
handler.addLog(task.getId(), taskAction);
}
@Override
public List<TaskAction> getAuditLogs(final String taskId)
{
return handler.getLogs(taskId);
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
return handler.getLocks(taskid);
}
}

View File

@ -68,7 +68,7 @@ public class RemoteTaskRunnerFactory implements TaskRunnerFactory
curator,
new SimplePathChildrenCacheFactory
.Builder()
.withCompressed(remoteTaskRunnerConfig.isCompressZnodes())
.withCompressed(true)
.build(),
httpClient,
strategy

View File

@ -43,6 +43,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.metadata.EntryExistsException;
import java.util.List;
import java.util.Map;
@ -292,9 +293,9 @@ public class TaskQueue
* @param task task to add
*
* @return true
* @throws TaskExistsException if the task already exists
* @throws io.druid.metadata.EntryExistsException if the task already exists
*/
public boolean add(final Task task) throws TaskExistsException
public boolean add(final Task task) throws EntryExistsException
{
giant.lock();

View File

@ -24,6 +24,7 @@ import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.metadata.EntryExistsException;
import java.util.List;
@ -34,9 +35,9 @@ public interface TaskStorage
*
* @param task task to add
* @param status task status
* @throws io.druid.indexing.overlord.TaskExistsException if the task ID already exists
* @throws io.druid.metadata.EntryExistsException if the task ID already exists
*/
public void insert(Task task, TaskStatus status) throws TaskExistsException;
public void insert(Task task, TaskStatus status) throws EntryExistsException;
/**
* Persists task status in the storage facility. This method should throw an exception if the task status lifecycle

View File

@ -49,7 +49,7 @@ public class ForkingTaskRunnerConfig
@JsonProperty
@Min(1024)
@Max(65535)
private int startPort = 8081;
private int startPort = 8100;
@JsonProperty
@NotNull

View File

@ -33,9 +33,6 @@ public class RemoteTaskRunnerConfig
@NotNull
private Period taskAssignmentTimeout = new Period("PT5M");
@JsonProperty
private boolean compressZnodes = false;
@JsonProperty
private String minWorkerVersion = "0";
@ -48,11 +45,6 @@ public class RemoteTaskRunnerConfig
return taskAssignmentTimeout;
}
public boolean isCompressZnodes()
{
return compressZnodes;
}
public String getMinWorkerVersion()
{
return minWorkerVersion;

View File

@ -36,7 +36,7 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.TaskExistsException;
import io.druid.metadata.EntryExistsException;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
import io.druid.indexing.overlord.TaskRunner;
@ -131,7 +131,7 @@ public class OverlordResource
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (TaskExistsException e) {
catch (EntryExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", String.format("Task[%s] already exists!", task.getId())))
.build();

View File

@ -25,14 +25,14 @@
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Indexer Coordinator Console"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/demo_table.css";</style>
<style type="text/css">@import "old-console/css/style.css";</style>
<style type="text/css">@import "old-console/css/demo_table.css";</style>
<script type="text/javascript" src="js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="js/jquery-1.11.0.min.js"></script>
<script type="text/javascript" src="js/jquery.dataTables-1.8.2.js"></script>
<script type="text/javascript" src="js/druidTable-0.0.1.js"></script>
<script type="text/javascript" src="js/tablehelper-0.0.2.js"></script>
<script type="text/javascript" src="old-console/js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="old-console/js/jquery-1.11.0.min.js"></script>
<script type="text/javascript" src="old-console/js/jquery.dataTables-1.8.2.js"></script>
<script type="text/javascript" src="old-console/js/druidTable-0.0.1.js"></script>
<script type="text/javascript" src="old-console/js/tablehelper-0.0.2.js"></script>
<script type="text/javascript" src="js/console-0.0.1.js"></script>
</head>
@ -64,4 +64,4 @@
<table id="eventTable"></table>
</div>
</body>
</html>
</html>

View File

@ -243,51 +243,6 @@ public class TaskSerdeTest
);
}
@Test
public void testDeleteTaskSerde() throws Exception
{
final DeleteTask task = new DeleteTask(
null,
"foo",
new Interval("2010-01-01/P1D")
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testDeleteTaskFromJson() throws Exception
{
final DeleteTask task = (DeleteTask) jsonMapper.readValue(
"{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}",
Task.class
);
final String json = jsonMapper.writeValueAsString(task);
Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
final DeleteTask task2 = (DeleteTask) jsonMapper.readValue(json, Task.class);
Assert.assertNotNull(task.getId());
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
}
@Test
public void testAppendTaskSerde() throws Exception
{
@ -413,7 +368,7 @@ public class TaskSerdeTest
true,
null,
false,
new DataRollupSpec(ImmutableList.<AggregatorFactory>of(), QueryGranularity.NONE),
null,
null,
false,
ImmutableMap.of("foo", "bar"),

View File

@ -29,7 +29,6 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
@ -41,7 +40,6 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
@ -59,7 +57,6 @@ import org.junit.Test;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteTaskRunnerTest
{

View File

@ -42,6 +42,7 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;
@ -63,6 +64,7 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.SQLMetadataConnector;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
@ -85,6 +87,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import java.io.File;
import java.io.IOException;
@ -100,7 +104,7 @@ public class TaskLifecycleTest
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockIndexerDBCoordinator mdc = null;
private MockIndexerMetadataStorageCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
TaskStorageQueryAdapter tsqa = null;
@ -509,12 +513,12 @@ public class TaskLifecycleTest
return retVal;
}
private static class MockIndexerDBCoordinator extends IndexerDBCoordinator
private static class MockIndexerMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator
{
final private Set<DataSegment> published = Sets.newHashSet();
final private Set<DataSegment> nuked = Sets.newHashSet();
private MockIndexerDBCoordinator()
private MockIndexerMetadataStorageCoordinator()
{
super(null, null, null);
}
@ -561,9 +565,9 @@ public class TaskLifecycleTest
}
}
private static MockIndexerDBCoordinator newMockMDC()
private static MockIndexerMetadataStorageCoordinator newMockMDC()
{
return new MockIndexerDBCoordinator();
return new MockIndexerMetadataStorageCoordinator();
}
private static ServiceEmitter newMockEmitter()

View File

@ -26,12 +26,6 @@ import org.joda.time.Period;
*/
public class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public boolean isCompressZnodes()
{
return false;
}
@Override
public Period getTaskAssignmentTimeout()
{

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>

View File

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>mysql-metadata-storage</artifactId>
<name>mysql-metadata-storage</name>
<description>mysql-metadata-storage</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.33</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,125 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata.storage.mysql;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
public class MySQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(MySQLConnector.class);
private static final String PAYLOAD_TYPE = "LONGBLOB";
private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
private final DBI dbi;
@Inject
public MySQLConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
{
super(config, dbTables);
final BasicDataSource datasource = getDatasource();
// MySQL driver is classloader isolated as part of the extension
// so we need to help JDBC find the driver
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("com.mysql.jdbc.Driver");
// use double-quotes for quoting columns, so we can write SQL that works with most databases
datasource.setConnectionInitSqls(ImmutableList.of("SET sql_mode='ANSI_QUOTES'"));
this.dbi = new DBI(datasource);
}
@Override
protected String getPayloadType()
{
return PAYLOAD_TYPE;
}
@Override
protected String getSerialType()
{
return SERIAL_TYPE;
}
@Override
public boolean tableExists(Handle handle, String tableName)
{
return !handle.createQuery("SHOW tables LIKE :tableName")
.bind("tableName", tableName)
.list()
.isEmpty();
}
@Override
protected boolean isTransientException(Throwable e)
{
return e instanceof MySQLTransientException
|| (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317)
;
}
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
return getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName,
keyColumn,
valueColumn
)
)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
@Override
public DBI getDBI() { return dbi; }
}

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata.storage.mysql;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.guice.SQLMetadataStorageDruidModule;
import io.druid.initialization.DruidModule;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.SQLMetadataConnector;
import java.util.List;
public class MySQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
{
public static final String TYPE = "mysql";
public MySQLMetadataStorageModule()
{
super(TYPE);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
super.configure(binder);
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding(TYPE)
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding(TYPE)
.to(MySQLConnector.class)
.in(LazySingleton.class);
}
}

View File

@ -0,0 +1 @@
io.druid.metadata.storage.mysql.MySQLMetadataStorageModule

42
pom.xml
View File

@ -18,19 +18,20 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>
<connection>scm:git:ssh://git@github.com/metamx/druid.git</connection>
<developerConnection>scm:git:ssh://git@github.com/metamx/druid.git</developerConnection>
<url>http://www.github.com/metamx/druid</url>
<tag>druid-0.6.131-SNAPSHOT</tag>
<tag>druid-0.7.0-SNAPSHOT</tag>
</scm>
<prerequisites>
@ -41,7 +42,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.26.9</metamx.java-util.version>
<apache.curator.version>2.6.0</apache.curator.version>
<druid.api.version>0.2.14.1</druid.api.version>
<druid.api.version>0.2.17</druid.api.version>
</properties>
<modules>
@ -60,6 +61,8 @@
<module>kafka-eight</module>
<module>rabbitmq</module>
<module>histogram</module>
<module>mysql-metadata-storage</module>
<module>postgresql-metadata-storage</module>
</modules>
<dependencyManagement>
@ -74,7 +77,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.2.11</version>
<version>0.2.12</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -89,7 +92,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
<version>0.0.4</version>
<version>0.1.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
@ -191,11 +194,6 @@
<artifactId>curator-x-discovery</artifactId>
<version>${apache.curator.version}</version>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
<version>1.3.7</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@ -386,10 +384,15 @@
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.1.2</version>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
@ -434,6 +437,21 @@
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbynet</artifactId>
<version>10.11.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derbyclient</artifactId>
<version>10.11.1.1</version>
</dependency>
<!-- Test Scope -->
<dependency>

View File

@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>postgresql-metadata-storage</artifactId>
<name>postgresql-metadata-storage</name>
<description>postgresql-metadata-storage</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.3-1102-jdbc41</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,117 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata.storage.postgresql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.StringMapper;
public class PostgreSQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(PostgreSQLConnector.class);
private static final String PAYLOAD_TYPE = "BYTEA";
private static final String SERIAL_TYPE = "BIGSERIAL";
private final DBI dbi;
@Inject
public PostgreSQLConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
{
super(config, dbTables);
final BasicDataSource datasource = getDatasource();
// PostgreSQL driver is classloader isolated as part of the extension
// so we need to help JDBC find the driver
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName("org.postgresql.Driver");
this.dbi = new DBI(datasource);
}
@Override
protected String getPayloadType() {
return PAYLOAD_TYPE;
}
@Override
protected String getSerialType()
{
return SERIAL_TYPE;
}
@Override
public boolean tableExists(final Handle handle, final String tableName)
{
return !handle.createQuery(
"SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE :tableName"
)
.bind("tableName", tableName)
.map(StringMapper.FIRST)
.list()
.isEmpty();
}
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception
{
return getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;",
tableName,
keyColumn,
valueColumn
)
)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
@Override
public DBI getDBI() { return dbi; }
}

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata.storage.postgresql;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.guice.SQLMetadataStorageDruidModule;
import io.druid.initialization.DruidModule;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.SQLMetadataConnector;
import java.util.List;
public class PostgreSQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
{
public static final String TYPE = "postgresql";
public PostgreSQLMetadataStorageModule()
{
super(TYPE);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
super.configure(binder);
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding(TYPE)
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding(TYPE)
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
}
}

View File

@ -0,0 +1 @@
io.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule

View File

@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
@ -28,7 +29,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.161-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</parent>
<dependencies>
@ -53,10 +54,6 @@
<groupId>org.skife.config</groupId>
<artifactId>config-magic</artifactId>
</dependency>
<dependency>
<groupId>it.uniroma3.mat</groupId>
<artifactId>extendedset</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@ -82,6 +79,14 @@
<groupId>com.davekoelle</groupId>
<artifactId>alphanum</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
</dependency>
<!-- Tests -->
@ -95,10 +100,10 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
</dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -26,6 +26,7 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import io.druid.jackson.JacksonModule;
import java.util.Arrays;
import java.util.List;
/**
@ -37,7 +38,7 @@ public class GuiceInjectors
return Guice.createInjector(
new DruidGuiceExtensions(),
new JacksonModule(),
new PropertiesModule("runtime.properties"),
new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")),
new ConfigModule(),
new Module()
{
@ -56,7 +57,7 @@ public class GuiceInjectors
List<Module> theModules = Lists.newArrayList();
theModules.add(new DruidGuiceExtensions());
theModules.add(new JacksonModule());
theModules.add(new PropertiesModule("runtime.properties"));
theModules.add(new PropertiesModule(Arrays.asList("global.runtime.properties", "runtime.properties")));
theModules.add(new ConfigModule());
theModules.add(
new Module()

View File

@ -17,10 +17,11 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
package io.druid.guice;;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.metamx.common.guava.CloseQuietly;
@ -33,7 +34,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.List;
import java.util.Properties;
/**
@ -42,11 +43,11 @@ public class PropertiesModule implements Module
{
private static final Logger log = new Logger(PropertiesModule.class);
private final String propertiesFile;
private final List<String> propertiesFiles;
public PropertiesModule(String propertiesFile)
public PropertiesModule(List<String> propertiesFiles)
{
this.propertiesFile = propertiesFile;
this.propertiesFiles = propertiesFiles;
}
@Override
@ -58,30 +59,32 @@ public class PropertiesModule implements Module
Properties props = new Properties(fileProps);
props.putAll(systemProps);
InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
try {
if (stream == null) {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
if (workingDirectoryFile.exists()) {
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
for (String propertiesFile : propertiesFiles) {
InputStream stream = ClassLoader.getSystemResourceAsStream(propertiesFile);
try {
if (stream == null) {
File workingDirectoryFile = new File(systemProps.getProperty("druid.properties.file", propertiesFile));
if (workingDirectoryFile.exists()) {
stream = new BufferedInputStream(new FileInputStream(workingDirectoryFile));
}
}
}
if (stream != null) {
log.info("Loading properties from %s", propertiesFile);
try(Reader reader = new InputStreamReader(stream, Charsets.UTF_8)) {
fileProps.load(reader);
}
catch (IOException e) {
throw Throwables.propagate(e);
if (stream != null) {
log.info("Loading properties from %s", propertiesFile);
try {
fileProps.load(new InputStreamReader(stream, Charsets.UTF_8));
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
}
catch (FileNotFoundException e) {
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
}
finally {
CloseQuietly.close(stream);
catch (FileNotFoundException e) {
log.wtf(e, "This can only happen if the .exists() call lied. That's f'd up.");
}
finally {
CloseQuietly.close(stream);
}
}
binder.bind(Properties.class).toInstance(props);

View File

@ -70,14 +70,14 @@ public abstract class BaseQuery<T> implements Query<T>
}
@Override
public Sequence<T> run(QuerySegmentWalker walker)
public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
{
return run(querySegmentSpec.lookup(this, walker));
return run(querySegmentSpec.lookup(this, walker), context);
}
public Sequence<T> run(QueryRunner<T> runner)
public Sequence<T> run(QueryRunner<T> runner, Map<String, Object> context)
{
return runner.run(this);
return runner.run(this, context);
}
@Override

View File

@ -26,6 +26,7 @@ import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
*/
@ -48,11 +49,11 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
@Override
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query)
public Sequence<T> run(final Query<T> query, Map<String, Object> context)
{
if (query.getContextBySegment(false)) {
final Sequence<T> baseSequence = base.run(query);
final Sequence<T> baseSequence = base.run(query, context);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
Arrays.asList(
@ -67,7 +68,6 @@ public class BySegmentQueryRunner<T> implements QueryRunner<T>
)
);
}
return base.run(query);
return base.run(query, context);
}
}

View File

@ -21,6 +21,8 @@ package io.druid.query;
import com.metamx.common.guava.Sequence;
import java.util.Map;
/**
*/
public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
@ -35,14 +37,14 @@ public abstract class BySegmentSkippingQueryRunner<T> implements QueryRunner<T>
}
@Override
public Sequence<T> run(Query<T> query)
public Sequence<T> run(Query<T> query, Map<String, Object> context)
{
if (query.getContextBySegment(false)) {
return baseRunner.run(query);
return baseRunner.run(query, context);
}
return doRun(baseRunner, query);
return doRun(baseRunner, query, context);
}
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query);
protected abstract Sequence<T> doRun(QueryRunner<T> baseRunner, Query<T> query, Map<String, Object> context);
}

Some files were not shown because too many files have changed in this diff Show More