Merge branch 'master' into igalDruid

This commit is contained in:
Igal Levy 2014-03-27 16:19:58 -07:00
commit adfcc3d16f
47 changed files with 2886 additions and 209 deletions

View File

@ -23,7 +23,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
@ -31,16 +30,8 @@ import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector; import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig; import io.druid.db.DbTablesConfig;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -57,38 +48,25 @@ public class ConfigManager
private final Object lock = new Object(); private final Object lock = new Object();
private boolean started = false; private boolean started = false;
private final IDBI dbi; private final DbConnector dbConnector;
private final Supplier<ConfigManagerConfig> config; private final Supplier<ConfigManagerConfig> config;
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
private final ConcurrentMap<String, ConfigHolder> watchedConfigs; private final ConcurrentMap<String, ConfigHolder> watchedConfigs;
private final String selectStatement; private final String configTable;
private final String insertStatement;
private volatile ConfigManager.PollingCallable poller; private volatile ConfigManager.PollingCallable poller;
@Inject @Inject
public ConfigManager(IDBI dbi, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config) public ConfigManager(DbConnector dbConnector, Supplier<DbTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
{ {
this.dbi = dbi; this.dbConnector = dbConnector;
this.config = config; this.config = config;
this.exec = ScheduledExecutors.fixed(1, "config-manager-%s"); this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
this.watchedConfigs = Maps.newConcurrentMap(); this.watchedConfigs = Maps.newConcurrentMap();
final String configTable = dbTables.get().getConfigTable(); this.configTable = dbTables.get().getConfigTable();
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable);
this.insertStatement = String.format(
DbConnector.isPostgreSQL(dbi) ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
configTable
);
} }
@LifecycleStart @LifecycleStart
@ -127,7 +105,7 @@ public class ConfigManager
{ {
for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) { for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
try { try {
if (entry.getValue().swapIfNew(lookup(entry.getKey()))) { if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
log.info("New value for key[%s] seen.", entry.getKey()); log.info("New value for key[%s] seen.", entry.getKey());
} }
} }
@ -159,7 +137,7 @@ public class ConfigManager
// Multiple of these callables can be submitted at the same time, but the callables themselves // Multiple of these callables can be submitted at the same time, but the callables themselves
// are executed serially, so double check that it hasn't already been populated. // are executed serially, so double check that it hasn't already been populated.
if (!watchedConfigs.containsKey(key)) { if (!watchedConfigs.containsKey(key)) {
byte[] value = lookup(key); byte[] value = dbConnector.lookup(configTable, "name", "payload", key);
ConfigHolder<T> holder = new ConfigHolder<T>(value, serde); ConfigHolder<T> holder = new ConfigHolder<T>(value, serde);
watchedConfigs.put(key, holder); watchedConfigs.put(key, holder);
} }
@ -187,45 +165,10 @@ public class ConfigManager
return holder.getReference(); return holder.getReference();
} }
public byte[] lookup(final String key)
{
return dbi.withHandle(
new HandleCallback<byte[]>()
{
@Override
public byte[] withHandle(Handle handle) throws Exception
{
List<byte[]> matched = handle.createQuery(selectStatement)
.bind("name", key)
.map(
new ResultSetMapper<byte[]>()
{
@Override
public byte[] map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
return r.getBytes("payload");
}
}
).list();
if (matched.isEmpty()) {
return null;
}
if (matched.size() > 1) {
throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
}
return matched.get(0);
}
}
);
}
public <T> boolean set(final String key, final ConfigSerde<T> serde, final T obj) public <T> boolean set(final String key, final ConfigSerde<T> serde, final T obj)
{ {
if (obj == null) { if (obj == null || !started) {
return false; return false;
} }
@ -238,20 +181,7 @@ public class ConfigManager
@Override @Override
public Boolean call() throws Exception public Boolean call() throws Exception
{ {
dbi.withHandle( dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(insertStatement)
.bind("name", key)
.bind("payload", newBytes)
.execute();
return null;
}
}
);
final ConfigHolder configHolder = watchedConfigs.get(key); final ConfigHolder configHolder = watchedConfigs.get(key);
if (configHolder != null) { if (configHolder != null) {

View File

@ -21,14 +21,18 @@ package io.druid.db;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbcp.BasicDataSource;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -39,58 +43,61 @@ public class DbConnector
{ {
private static final Logger log = new Logger(DbConnector.class); private static final Logger log = new Logger(DbConnector.class);
public static void createSegmentTable(final IDBI dbi, final String segmentTableName) public static void createSegmentTable(final IDBI dbi, final String segmentTableName, boolean isPostgreSQL)
{ {
createTable( createTable(
dbi, dbi,
segmentTableName, segmentTableName,
String.format( String.format(
isPostgreSQL(dbi) ? isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" + "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TEXT NOT NULL, start TEXT NOT NULL, \"end\" TEXT NOT NULL, partitioned SMALLINT NOT NULL, version TEXT NOT NULL, used BOOLEAN NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));" +
"CREATE INDEX ON %1$s(dataSource);"+ "CREATE INDEX ON %1$s(dataSource);"+
"CREATE INDEX ON %1$s(used);": "CREATE INDEX ON %1$s(used);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))", "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, created_date TINYTEXT NOT NULL, start TINYTEXT NOT NULL, end TINYTEXT NOT NULL, partitioned BOOLEAN NOT NULL, version TINYTEXT NOT NULL, used BOOLEAN NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), INDEX(used), PRIMARY KEY (id))",
segmentTableName segmentTableName
) ),
isPostgreSQL
); );
} }
public static void createRuleTable(final IDBI dbi, final String ruleTableName) public static void createRuleTable(final IDBI dbi, final String ruleTableName, boolean isPostgreSQL)
{ {
createTable( createTable(
dbi, dbi,
ruleTableName, ruleTableName,
String.format( String.format(
isPostgreSQL(dbi) ? isPostgreSQL ?
"CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+ "CREATE TABLE %1$s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TEXT NOT NULL, payload bytea NOT NULL, PRIMARY KEY (id));"+
"CREATE INDEX ON %1$s(dataSource);": "CREATE INDEX ON %1$s(dataSource);":
"CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))", "CREATE table %s (id VARCHAR(255) NOT NULL, dataSource VARCHAR(255) NOT NULL, version TINYTEXT NOT NULL, payload LONGTEXT NOT NULL, INDEX(dataSource), PRIMARY KEY (id))",
ruleTableName ruleTableName
) ),
isPostgreSQL
); );
} }
public static void createConfigTable(final IDBI dbi, final String configTableName) public static void createConfigTable(final IDBI dbi, final String configTableName, boolean isPostgreSQL)
{ {
createTable( createTable(
dbi, dbi,
configTableName, configTableName,
String.format( String.format(
isPostgreSQL(dbi) ? isPostgreSQL ?
"CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))": "CREATE TABLE %s (name VARCHAR(255) NOT NULL, payload bytea NOT NULL, PRIMARY KEY(name))":
"CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))", "CREATE table %s (name VARCHAR(255) NOT NULL, payload BLOB NOT NULL, PRIMARY KEY(name))",
configTableName configTableName
) ),
isPostgreSQL
); );
} }
public static void createTaskTable(final IDBI dbi, final String taskTableName) public static void createTaskTable(final IDBI dbi, final String taskTableName, boolean isPostgreSQL)
{ {
createTable( createTable(
dbi, dbi,
taskTableName, taskTableName,
String.format( String.format(
isPostgreSQL(dbi) ? isPostgreSQL ?
"CREATE TABLE %1$s (\n" "CREATE TABLE %1$s (\n"
+ " id varchar(255) NOT NULL,\n" + " id varchar(255) NOT NULL,\n"
+ " created_date TEXT NOT NULL,\n" + " created_date TEXT NOT NULL,\n"
@ -112,17 +119,18 @@ public class DbConnector
+ " KEY (active, created_date(100))\n" + " KEY (active, created_date(100))\n"
+ ")", + ")",
taskTableName taskTableName
) ),
isPostgreSQL
); );
} }
public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName) public static void createTaskLogTable(final IDBI dbi, final String taskLogsTableName, boolean isPostgreSQL)
{ {
createTable( createTable(
dbi, dbi,
taskLogsTableName, taskLogsTableName,
String.format( String.format(
isPostgreSQL(dbi) ? isPostgreSQL ?
"CREATE TABLE %1$s (\n" "CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n" + " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n" + " task_id varchar(255) DEFAULT NULL,\n"
@ -138,17 +146,18 @@ public class DbConnector
+ " KEY `task_id` (`task_id`)\n" + " KEY `task_id` (`task_id`)\n"
+ ")", + ")",
taskLogsTableName taskLogsTableName
) ),
isPostgreSQL
); );
} }
public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName) public static void createTaskLockTable(final IDBI dbi, final String taskLocksTableName, boolean isPostgreSQL)
{ {
createTable( createTable(
dbi, dbi,
taskLocksTableName, taskLocksTableName,
String.format( String.format(
isPostgreSQL(dbi) ? isPostgreSQL ?
"CREATE TABLE %1$s (\n" "CREATE TABLE %1$s (\n"
+ " id bigserial NOT NULL,\n" + " id bigserial NOT NULL,\n"
+ " task_id varchar(255) DEFAULT NULL,\n" + " task_id varchar(255) DEFAULT NULL,\n"
@ -164,14 +173,16 @@ public class DbConnector
+ " KEY `task_id` (`task_id`)\n" + " KEY `task_id` (`task_id`)\n"
+ ")", + ")",
taskLocksTableName taskLocksTableName
) ),
isPostgreSQL
); );
} }
public static void createTable( public static void createTable(
final IDBI dbi, final IDBI dbi,
final String tableName, final String tableName,
final String sql final String sql,
final boolean isPostgreSQL
) )
{ {
try { try {
@ -182,7 +193,7 @@ public class DbConnector
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
List<Map<String, Object>> table; List<Map<String, Object>> table;
if ( isPostgreSQL(dbi) ) { if ( isPostgreSQL ) {
table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName)); table = handle.select(String.format("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename LIKE '%s'", tableName));
} else { } else {
table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
@ -205,6 +216,84 @@ public class DbConnector
} }
} }
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws SQLException
{
final String insertOrUpdateStatement = String.format(
isPostgreSQL ?
"BEGIN;\n" +
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
"COMMIT;" :
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName, keyColumn, valueColumn
);
return dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(insertOrUpdateStatement)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
}
);
}
public byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
)
{
final String selectStatement = String.format("SELECT %s FROM %s WHERE %s = :key", valueColumn, tableName, keyColumn);
return dbi.withHandle(
new HandleCallback<byte[]>()
{
@Override
public byte[] withHandle(Handle handle) throws Exception
{
List<byte[]> matched = handle.createQuery(selectStatement)
.bind("key", key)
.map(
new ResultSetMapper<byte[]>()
{
@Override
public byte[] map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
return r.getBytes(valueColumn);
}
}
).list();
if (matched.isEmpty()) {
return null;
}
if (matched.size() > 1) {
throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
}
return matched.get(0);
}
}
);
}
public static Boolean isPostgreSQL(final IDBI dbi) public static Boolean isPostgreSQL(final IDBI dbi)
{ {
return dbi.withHandle( return dbi.withHandle(
@ -219,7 +308,7 @@ public class DbConnector
); );
} }
public static Boolean isPostgreSQL(final Handle handle) throws SQLException protected static Boolean isPostgreSQL(final Handle handle) throws SQLException
{ {
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL"); return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
} }
@ -227,6 +316,7 @@ public class DbConnector
private final Supplier<DbConnectorConfig> config; private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables; private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi; private final DBI dbi;
private boolean isPostgreSQL = false;
@Inject @Inject
public DbConnector(Supplier<DbConnectorConfig> config, Supplier<DbTablesConfig> dbTables) public DbConnector(Supplier<DbConnectorConfig> config, Supplier<DbTablesConfig> dbTables)
@ -242,6 +332,11 @@ public class DbConnector
return dbi; return dbi;
} }
public boolean isPostgreSQL()
{
return isPostgreSQL;
}
private DataSource getDatasource() private DataSource getDatasource()
{ {
DbConnectorConfig connectorConfig = config.get(); DbConnectorConfig connectorConfig = config.get();
@ -249,7 +344,9 @@ public class DbConnector
BasicDataSource dataSource = new BasicDataSource(); BasicDataSource dataSource = new BasicDataSource();
dataSource.setUsername(connectorConfig.getUser()); dataSource.setUsername(connectorConfig.getUser());
dataSource.setPassword(connectorConfig.getPassword()); dataSource.setPassword(connectorConfig.getPassword());
dataSource.setUrl(connectorConfig.getConnectURI()); String uri = connectorConfig.getConnectURI();
isPostgreSQL = uri.startsWith("jdbc:postgresql");
dataSource.setUrl(uri);
if (connectorConfig.isUseValidationQuery()) { if (connectorConfig.isUseValidationQuery()) {
dataSource.setValidationQuery(connectorConfig.getValidationQuery()); dataSource.setValidationQuery(connectorConfig.getValidationQuery());
@ -262,21 +359,21 @@ public class DbConnector
public void createSegmentTable() public void createSegmentTable()
{ {
if (config.get().isCreateTables()) { if (config.get().isCreateTables()) {
createSegmentTable(dbi, dbTables.get().getSegmentsTable()); createSegmentTable(dbi, dbTables.get().getSegmentsTable(), isPostgreSQL);
} }
} }
public void createRulesTable() public void createRulesTable()
{ {
if (config.get().isCreateTables()) { if (config.get().isCreateTables()) {
createRuleTable(dbi, dbTables.get().getRulesTable()); createRuleTable(dbi, dbTables.get().getRulesTable(), isPostgreSQL);
} }
} }
public void createConfigTable() public void createConfigTable()
{ {
if (config.get().isCreateTables()) { if (config.get().isCreateTables()) {
createConfigTable(dbi, dbTables.get().getConfigTable()); createConfigTable(dbi, dbTables.get().getConfigTable(), isPostgreSQL);
} }
} }
@ -284,9 +381,9 @@ public class DbConnector
{ {
if (config.get().isCreateTables()) { if (config.get().isCreateTables()) {
final DbTablesConfig dbTablesConfig = dbTables.get(); final DbTablesConfig dbTablesConfig = dbTables.get();
createTaskTable(dbi, dbTablesConfig.getTasksTable()); createTaskTable(dbi, dbTablesConfig.getTasksTable(), isPostgreSQL);
createTaskLogTable(dbi, dbTablesConfig.getTaskLogTable()); createTaskLogTable(dbi, dbTablesConfig.getTaskLogTable(), isPostgreSQL);
createTaskLockTable(dbi, dbTablesConfig.getTaskLockTable()); createTaskLockTable(dbi, dbTablesConfig.getTaskLockTable(), isPostgreSQL);
} }
} }
} }

View File

@ -66,6 +66,6 @@ public class JacksonConfigManagerModule implements Module
} }
); );
return new ConfigManager(dbConnector.getDBI(), dbTables, config); return new ConfigManager(dbConnector, dbTables, config);
} }
} }

View File

@ -36,8 +36,6 @@ druid.processing.numThreads=1
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}] druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]
``` ```
Note: This will spin up a Historical node with the local filesystem as deep storage.
Production Configs Production Configs
------------------ ------------------
These production configs are using S3 as a deep store. These production configs are using S3 as a deep store.

View File

@ -66,12 +66,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
@Override @Override
public String getPathForHadoop(String dataSource) public String getPathForHadoop(String dataSource)
{ {
try { return new Path(config.getStorageDirectory(), dataSource).toUri().toString();
return new Path(config.getStorageDirectory(), dataSource).makeQualified(FileSystem.get(hadoopConfig)).toString();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} }
@Override @Override

View File

@ -60,24 +60,24 @@ public class IndexerDBCoordinator
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final DbTablesConfig dbTables; private final DbTablesConfig dbTables;
private final IDBI dbi; private final DbConnector dbConnector;
@Inject @Inject
public IndexerDBCoordinator( public IndexerDBCoordinator(
ObjectMapper jsonMapper, ObjectMapper jsonMapper,
DbTablesConfig dbTables, DbTablesConfig dbTables,
IDBI dbi DbConnector dbConnector
) )
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.dbTables = dbTables; this.dbTables = dbTables;
this.dbi = dbi; this.dbConnector = dbConnector;
} }
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval) public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException throws IOException
{ {
final VersionedIntervalTimeline<String, DataSegment> timeline = dbi.withHandle( final VersionedIntervalTimeline<String, DataSegment> timeline = dbConnector.getDBI().withHandle(
new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>() new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>()
{ {
@Override @Override
@ -142,7 +142,7 @@ public class IndexerDBCoordinator
*/ */
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException
{ {
return dbi.inTransaction( return dbConnector.getDBI().inTransaction(
new TransactionCallback<Set<DataSegment>>() new TransactionCallback<Set<DataSegment>>()
{ {
@Override @Override
@ -180,7 +180,7 @@ public class IndexerDBCoordinator
try { try {
handle.createStatement( handle.createStatement(
String.format( String.format(
DbConnector.isPostgreSQL(handle) ? dbConnector.isPostgreSQL() ?
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)": + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)":
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
@ -200,8 +200,6 @@ public class IndexerDBCoordinator
.execute(); .execute();
log.info("Published segment [%s] to DB", segment.getIdentifier()); log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch(SQLException e) {
throw new IOException(e);
} catch(Exception e) { } catch(Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) { if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
@ -234,7 +232,7 @@ public class IndexerDBCoordinator
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{ {
dbi.inTransaction( dbConnector.getDBI().inTransaction(
new TransactionCallback<Void>() new TransactionCallback<Void>()
{ {
@Override @Override
@ -252,7 +250,7 @@ public class IndexerDBCoordinator
public void deleteSegments(final Set<DataSegment> segments) throws IOException public void deleteSegments(final Set<DataSegment> segments) throws IOException
{ {
dbi.inTransaction( dbConnector.getDBI().inTransaction(
new TransactionCallback<Void>() new TransactionCallback<Void>()
{ {
@Override @Override
@ -295,7 +293,7 @@ public class IndexerDBCoordinator
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval) public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{ {
List<DataSegment> matchingSegments = dbi.withHandle( List<DataSegment> matchingSegments = dbConnector.getDBI().withHandle(
new HandleCallback<List<DataSegment>>() new HandleCallback<List<DataSegment>>()
{ {
@Override @Override
@ -303,7 +301,7 @@ public class IndexerDBCoordinator
{ {
return handle.createQuery( return handle.createQuery(
String.format( String.format(
DbConnector.isPostgreSQL(handle)? dbConnector.isPostgreSQL() ?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false": "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = false":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = false", "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = false",
dbTables.getSegmentsTable() dbTables.getSegmentsTable()

View File

@ -91,6 +91,10 @@
<artifactId>easymock</artifactId> <artifactId>easymock</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -37,12 +37,12 @@ public class HLLCV0 extends HyperLogLogCollector
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer(); private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{0, 0, 0}).asReadOnlyBuffer();
public HLLCV0() protected HLLCV0()
{ {
super(defaultStorageBuffer); super(defaultStorageBuffer);
} }
public HLLCV0(ByteBuffer buffer) protected HLLCV0(ByteBuffer buffer)
{ {
super(buffer); super(buffer);
} }

View File

@ -44,12 +44,12 @@ public class HLLCV1 extends HyperLogLogCollector
private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0}) private static final ByteBuffer defaultStorageBuffer = ByteBuffer.wrap(new byte[]{VERSION, 0, 0, 0, 0, 0, 0})
.asReadOnlyBuffer(); .asReadOnlyBuffer();
public HLLCV1() protected HLLCV1()
{ {
super(defaultStorageBuffer); super(defaultStorageBuffer);
} }
public HLLCV1(ByteBuffer buffer) protected HLLCV1(ByteBuffer buffer)
{ {
super(buffer); super(buffer);
} }

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.primitives.UnsignedBytes; import com.google.common.primitives.UnsignedBytes;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -57,7 +56,6 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d; public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d;
public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS; public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS;
private static final Logger log = new Logger(HyperLogLogCollector.class);
private static final int bucketMask = 0x7ff; private static final int bucketMask = 0x7ff;
private static final int minBytesRequired = 10; private static final int minBytesRequired = 10;
private static final int bitsPerBucket = 4; private static final int bitsPerBucket = 4;
@ -202,9 +200,9 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
return buffer.remaining() != NUM_BYTES_FOR_BUCKETS; return buffer.remaining() != NUM_BYTES_FOR_BUCKETS;
} }
private volatile ByteBuffer storageBuffer; private ByteBuffer storageBuffer;
private volatile int initPosition; private int initPosition;
private volatile Double estimatedCardinality; private Double estimatedCardinality;
public HyperLogLogCollector(ByteBuffer byteBuffer) public HyperLogLogCollector(ByteBuffer byteBuffer)
{ {
@ -332,7 +330,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
if (getRegisterOffset() < other.getRegisterOffset()) { if (getRegisterOffset() < other.getRegisterOffset()) {
// "Swap" the buffers so that we are folding into the one with the higher offset // "Swap" the buffers so that we are folding into the one with the higher offset
ByteBuffer newStorage = ByteBuffer.allocate(other.storageBuffer.remaining()); ByteBuffer newStorage = ByteBuffer.allocateDirect(other.storageBuffer.remaining());
newStorage.put(other.storageBuffer.asReadOnlyBuffer()); newStorage.put(other.storageBuffer.asReadOnlyBuffer());
newStorage.clear(); newStorage.clear();
@ -342,8 +340,8 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
initPosition = 0; initPosition = 0;
} }
ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer(); final ByteBuffer otherBuffer = other.storageBuffer.asReadOnlyBuffer();
byte otherOffset = other.getRegisterOffset(); final byte otherOffset = other.getRegisterOffset();
if (storageBuffer.remaining() != getNumBytesForDenseStorage()) { if (storageBuffer.remaining() != getNumBytesForDenseStorage()) {
convertToDenseStorage(); convertToDenseStorage();
@ -352,56 +350,50 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
byte myOffset = getRegisterOffset(); byte myOffset = getRegisterOffset();
short numNonZero = getNumNonZeroRegisters(); short numNonZero = getNumNonZeroRegisters();
int offsetDiff = myOffset - otherOffset; final int offsetDiff = myOffset - otherOffset;
if (offsetDiff < 0) { if (offsetDiff < 0) {
throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff); throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff);
} }
byte otherOverflowValue = other.getMaxOverflowValue(); add(other.getMaxOverflowRegister(), other.getMaxOverflowValue());
short otherOverflowRegister = other.getMaxOverflowRegister();
add(otherOverflowRegister, otherOverflowValue);
int myPayloadStart = getPayloadBytePosition(); final int myPayloadStart = getPayloadBytePosition();
otherBuffer.position(other.getPayloadBytePosition()); otherBuffer.position(other.getPayloadBytePosition());
if (isSparse(otherBuffer)) { if (isSparse(otherBuffer)) {
while (otherBuffer.hasRemaining()) { while(otherBuffer.hasRemaining()) {
short position = otherBuffer.getShort(); final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes();
int payloadStartPosition = position - other.getNumHeaderBytes();
numNonZero += mergeAndStoreByteRegister( numNonZero += mergeAndStoreByteRegister(
storageBuffer,
myPayloadStart + payloadStartPosition, myPayloadStart + payloadStartPosition,
offsetDiff, offsetDiff,
otherBuffer.get() otherBuffer.get()
); );
if (numNonZero == NUM_BUCKETS) { }
myOffset += 1; if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets(); numNonZero = decrementBuckets();
setRegisterOffset(myOffset); setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero); setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
} }
} else { // dense } else { // dense
int position = getPayloadBytePosition(); int position = getPayloadBytePosition();
while (otherBuffer.hasRemaining()) { while (otherBuffer.hasRemaining()) {
numNonZero += mergeAndStoreByteRegister( numNonZero += mergeAndStoreByteRegister(
storageBuffer,
position, position,
offsetDiff, offsetDiff,
otherBuffer.get() otherBuffer.get()
); );
if (numNonZero == NUM_BUCKETS) {
myOffset += 1;
numNonZero = decrementBuckets();
setRegisterOffset(myOffset);
setNumNonZeroRegisters(numNonZero);
offsetDiff = myOffset - otherOffset;
}
position++; position++;
} }
if (numNonZero == NUM_BUCKETS) {
numNonZero = decrementBuckets();
setRegisterOffset(++myOffset);
setNumNonZeroRegisters(numNonZero);
}
} }
setRegisterOffset(myOffset); // no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented
setNumNonZeroRegisters(numNonZero); setNumNonZeroRegisters(numNonZero);
return this; return this;
@ -531,15 +523,15 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private short decrementBuckets() private short decrementBuckets()
{ {
final int startPosition = getPayloadBytePosition();
short count = 0; short count = 0;
int startPosition = getPayloadBytePosition();
for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) { for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) {
byte val = (byte) (storageBuffer.get(i) - 0x11); final byte val = (byte) (storageBuffer.get(i) - 0x11);
if ((val & 0xf0) != 0) { if ((val & 0xf0) != 0) {
count++; ++count;
} }
if ((val & 0x0f) != 0) { if ((val & 0x0f) != 0) {
count++; ++count;
} }
storageBuffer.put(i, val); storageBuffer.put(i, val);
} }
@ -548,7 +540,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private void convertToMutableByteBuffer() private void convertToMutableByteBuffer()
{ {
ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining()); ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(storageBuffer.remaining());
tmpBuffer.put(storageBuffer.asReadOnlyBuffer()); tmpBuffer.put(storageBuffer.asReadOnlyBuffer());
tmpBuffer.position(0); tmpBuffer.position(0);
storageBuffer = tmpBuffer; storageBuffer = tmpBuffer;
@ -557,7 +549,7 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
private void convertToDenseStorage() private void convertToDenseStorage()
{ {
ByteBuffer tmpBuffer = ByteBuffer.wrap(new byte[getNumBytesForDenseStorage()]); ByteBuffer tmpBuffer = ByteBuffer.allocateDirect(getNumBytesForDenseStorage());
// put header // put header
setVersion(tmpBuffer); setVersion(tmpBuffer);
setRegisterOffset(tmpBuffer, getRegisterOffset()); setRegisterOffset(tmpBuffer, getRegisterOffset());
@ -614,39 +606,39 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
* *
* @return * @return
*/ */
private int mergeAndStoreByteRegister( private static int mergeAndStoreByteRegister(
int position, final ByteBuffer storageBuffer,
int offsetDiff, final int position,
byte byteToAdd final int offsetDiff,
final byte byteToAdd
) )
{ {
if (byteToAdd == 0) { if (byteToAdd == 0) {
return 0; return 0;
} }
byte currVal = storageBuffer.get(position); final byte currVal = storageBuffer.get(position);
int upperNibble = currVal & 0xf0; final int upperNibble = currVal & 0xf0;
int lowerNibble = currVal & 0x0f; final int lowerNibble = currVal & 0x0f;
// subtract the differences so that the nibbles align // subtract the differences so that the nibbles align
int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket); final int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << bitsPerBucket);
int otherLower = (byteToAdd & 0x0f) - offsetDiff; final int otherLower = (byteToAdd & 0x0f) - offsetDiff;
final int newUpper = Math.max(upperNibble, otherUpper); final int newUpper = Math.max(upperNibble, otherUpper);
final int newLower = Math.max(lowerNibble, otherLower); final int newLower = Math.max(lowerNibble, otherLower);
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
int numNoLongerZero = 0; int numNoLongerZero = 0;
if (upperNibble == 0 && newUpper > 0) { if (upperNibble == 0 && newUpper > 0) {
++numNoLongerZero; ++numNoLongerZero;
} }
if (lowerNibble == 0 && newLower > 0) { if (lowerNibble == 0 && newLower > 0) {
++numNoLongerZero; ++numNoLongerZero;
} }
storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff));
return numNoLongerZero; return numNoLongerZero;
} }

View File

@ -66,10 +66,13 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
@Override @Override
public Object get(ByteBuffer buf, int position) public Object get(ByteBuffer buf, int position)
{ {
ByteBuffer dataCopyBuffer = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage()); final int size = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
ByteBuffer dataCopyBuffer = ByteBuffer.allocateDirect(size);
ByteBuffer mutationBuffer = buf.duplicate(); ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position); mutationBuffer.position(position);
mutationBuffer.get(dataCopyBuffer.array()); mutationBuffer.limit(position + size);
dataCopyBuffer.put(mutationBuffer);
dataCopyBuffer.rewind();
return HyperLogLogCollector.makeCollector(dataCopyBuffer); return HyperLogLogCollector.makeCollector(dataCopyBuffer);
} }

View File

@ -121,9 +121,7 @@ public class HyperUniquesSerde extends ComplexMetricSerde
public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes) public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
{ {
buffer.limit(buffer.position() + numBytes); buffer.limit(buffer.position() + numBytes);
return HyperLogLogCollector.makeCollector(buffer);
int remaining = buffer.remaining();
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
} }
@Override @Override

View File

@ -0,0 +1,203 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation.hyperloglog;
import com.google.caliper.Param;
import com.google.caliper.Runner;
import com.google.caliper.SimpleBenchmark;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
public class HyperLogLogCollectorBenchmark extends SimpleBenchmark
{
private final HashFunction fn = Hashing.murmur3_128();
private final List<HyperLogLogCollector> collectors = Lists.newLinkedList();
@Param({"true"}) boolean targetIsDirect;
@Param({"default", "random", "0"}) String alignment;
boolean alignSource;
boolean alignTarget;
int CACHE_LINE = 64;
ByteBuffer chunk;
final int count = 100_000;
int[] positions = new int[count];
int[] sizes = new int[count];
@Override
protected void setUp() throws Exception
{
boolean random = false;
Random rand = new Random(0);
int defaultOffset = 0;
switch(alignment) {
case "default":
alignSource = false;
alignTarget = false;
break;
case "random":
random = true;
break;
default:
defaultOffset = Integer.parseInt(alignment);
}
int val = 0;
chunk = ByteBuffers.allocateAlignedByteBuffer(
(HyperLogLogCollector.getLatestNumBytesForDenseStorage() + CACHE_LINE
+ CACHE_LINE) * count, CACHE_LINE
);
int pos = 0;
for(int i = 0; i < count; ++i) {
HyperLogLogCollector c = HyperLogLogCollector.makeLatestCollector();
for(int k = 0; k < 40; ++k) c.add(fn.hashInt(++val).asBytes());
final ByteBuffer sparseHeapCopy = c.toByteBuffer();
int size = sparseHeapCopy.remaining();
final ByteBuffer buf;
final int offset = random ? (int)(rand.nextDouble() * 64) : defaultOffset;
if(alignSource && (pos % CACHE_LINE) != offset) {
pos += (pos % CACHE_LINE) < offset ? offset - (pos % CACHE_LINE) : (CACHE_LINE + offset - pos % CACHE_LINE);
}
positions[i] = pos;
sizes[i] = size;
chunk.limit(pos + size);
chunk.position(pos);
buf = chunk.duplicate();
buf.mark();
pos += size;
buf.put(sparseHeapCopy);
buf.reset();
collectors.add(HyperLogLogCollector.makeCollector(buf));
}
}
private ByteBuffer allocateEmptyHLLBuffer(boolean direct, boolean aligned, int offset)
{
final int size = HyperLogLogCollector.getLatestNumBytesForDenseStorage();
final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray();
final ByteBuffer buf;
if(direct) {
if(aligned) {
buf = ByteBuffers.allocateAlignedByteBuffer(size + offset, CACHE_LINE);
buf.position(offset);
buf.mark();
buf.limit(size + offset);
} else {
buf = ByteBuffer.allocateDirect(size);
buf.mark();
buf.limit(size);
}
buf.put(EMPTY_BYTES);
buf.reset();
}
else {
buf = ByteBuffer.allocate(size);
buf.limit(size);
buf.put(EMPTY_BYTES);
buf.rewind();
}
return buf;
}
public double timeFold(int reps) throws Exception
{
final ByteBuffer buf = allocateEmptyHLLBuffer(targetIsDirect, alignTarget, 0);
for (int k = 0; k < reps; ++k) {
for(int i = 0; i < count; ++i) {
final int pos = positions[i];
final int size = sizes[i];
HyperLogLogCollector.makeCollector(
(ByteBuffer) buf.duplicate().position(0).limit(
HyperLogLogCollector.getLatestNumBytesForDenseStorage()
)
).fold(
HyperLogLogCollector.makeCollector(
(ByteBuffer) chunk.duplicate().limit(pos + size).position(pos)
)
);
}
}
return HyperLogLogCollector.makeCollector(buf.duplicate()).estimateCardinality();
}
public static void main(String[] args) throws Exception {
Runner.main(HyperLogLogCollectorBenchmark.class, args);
}
}
class ByteBuffers {
private static final Unsafe UNSAFE;
private static final long ADDRESS_OFFSET;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
ADDRESS_OFFSET = UNSAFE.objectFieldOffset(Buffer.class.getDeclaredField("address"));
} catch (Exception e) {
throw new RuntimeException("Cannot access Unsafe methods", e);
}
}
public static long getAddress(ByteBuffer buf) {
return UNSAFE.getLong(buf, ADDRESS_OFFSET);
}
public static ByteBuffer allocateAlignedByteBuffer(int capacity, int align) {
Preconditions.checkArgument(Long.bitCount(align) == 1, "Alignment must be a power of 2");
final ByteBuffer buf = ByteBuffer.allocateDirect(capacity + align);
long address = getAddress(buf);
if ((address & (align - 1)) == 0) {
buf.limit(capacity);
} else {
int offset = (int) (align - (address & (align - 1)));
buf.position(offset);
buf.limit(offset + capacity);
}
return buf.slice();
}
}

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.hyperloglog;
import com.google.common.hash.HashFunction; import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -70,17 +71,29 @@ public class HyperLogLogCollectorTest
} }
} }
// @Test
/**
* This is a very long running test, disabled by default.
* It is meant to catch issues when combining a large numer of HLL objects.
*
* It compares adding all the values to one HLL vs.
* splitting up values into HLLs of 100 values each, and folding those HLLs into a single main HLL.
*
* When reaching very large cardinalities (>> 50,000,000), offsets are mismatched between the main HLL and the ones
* with 100 values, requiring a floating max as described in
* http://druid.io/blog/2014/02/18/hyperloglog-optimizations-for-real-world-systems.html
*/
@Ignore @Test
public void testHighCardinalityRollingFold() throws Exception public void testHighCardinalityRollingFold() throws Exception
{ {
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector(); final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
final HyperLogLogCollector simple = HyperLogLogCollector.makeLatestCollector(); final HyperLogLogCollector simple = HyperLogLogCollector.makeLatestCollector();
int count;
MessageDigest md = MessageDigest.getInstance("SHA-1"); MessageDigest md = MessageDigest.getInstance("SHA-1");
HyperLogLogCollector tmp = HyperLogLogCollector.makeLatestCollector(); HyperLogLogCollector tmp = HyperLogLogCollector.makeLatestCollector();
for (count = 0; count < 5000000; ++count) { int count;
for (count = 0; count < 100_000_000; ++count) {
md.update(Integer.toString(count).getBytes()); md.update(Integer.toString(count).getBytes());
byte[] hashed = fn.hashBytes(md.digest()).asBytes(); byte[] hashed = fn.hashBytes(md.digest()).asBytes();
@ -110,14 +123,14 @@ public class HyperLogLogCollectorTest
Assert.assertEquals(n, rolling.estimateCardinality(), n * 0.05); Assert.assertEquals(n, rolling.estimateCardinality(), n * 0.05);
} }
//@Test @Ignore @Test
public void testHighCardinalityRollingFold2() throws Exception public void testHighCardinalityRollingFold2() throws Exception
{ {
final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector(); final HyperLogLogCollector rolling = HyperLogLogCollector.makeLatestCollector();
int count; int count;
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
for (count = 0; count < 5000000; ++count) { for (count = 0; count < 50_000_000; ++count) {
HyperLogLogCollector theCollector = HyperLogLogCollector.makeLatestCollector(); HyperLogLogCollector theCollector = HyperLogLogCollector.makeLatestCollector();
theCollector.add(fn.hashLong(count).asBytes()); theCollector.add(fn.hashLong(count).asBytes());
rolling.fold(theCollector); rolling.fold(theCollector);

View File

@ -0,0 +1,12 @@
all : druid_demo.pdf
clean :
@rm -f *.aux *.bbl *.blg *.log
%.tex : %.bib
%.pdf : %.tex %.bib
lualatex $(*F)
bibtex $(*F)
lualatex $(*F)
lualatex $(*F)

View File

@ -0,0 +1,54 @@
\relax
\providecommand\HyperFirstAtBeginDocument{\AtBeginDocument}
\HyperFirstAtBeginDocument{\ifx\hyper@anchor\@undefined
\global\let\oldcontentsline\contentsline
\gdef\contentsline#1#2#3#4{\oldcontentsline{#1}{#2}{#3}}
\global\let\oldnewlabel\newlabel
\gdef\newlabel#1#2{\newlabelxx{#1}#2}
\gdef\newlabelxx#1#2#3#4#5#6{\oldnewlabel{#1}{{#2}{#3}}}
\AtEndDocument{\ifx\hyper@anchor\@undefined
\let\contentsline\oldcontentsline
\let\newlabel\oldnewlabel
\fi}
\fi}
\global\let\hyper@last\relax
\gdef\HyperFirstAtBeginDocument#1{#1}
\providecommand\HyField@AuxAddToFields[1]{}
\citation{hunt2010zookeeper}
\@writefile{toc}{\contentsline {section}{\numberline {1}Introduction}{1}{section.1}}
\@writefile{toc}{\contentsline {subsection}{\numberline {1.1}The Need for Druid}{1}{subsection.1.1}}
\@writefile{toc}{\contentsline {section}{\numberline {2}Architecture}{1}{section.2}}
\citation{abadi2008column}
\@writefile{lof}{\contentsline {figure}{\numberline {1}{\ignorespaces An overview of a Druid cluster and the flow of data through the cluster.}}{2}{figure.1}}
\newlabel{fig:cluster}{{1}{2}{An overview of a Druid cluster and the flow of data through the cluster}{figure.1}{}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.1}Real-time Nodes}{2}{subsection.2.1}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.2}Historical Nodes}{2}{subsection.2.2}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.3}Broker Nodes}{2}{subsection.2.3}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.4}Coordinator Nodes}{2}{subsection.2.4}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.5}Query Processing}{2}{subsection.2.5}}
\citation{tomasic1993performance}
\citation{colantonio2010concise}
\@writefile{lot}{\contentsline {table}{\numberline {1}{\ignorespaces Sample sales data set.}}{3}{table.1}}
\newlabel{tab:sample_data}{{1}{3}{Sample sales data set}{table.1}{}}
\@writefile{toc}{\contentsline {subsection}{\numberline {2.6}Query Capabilities}{3}{subsection.2.6}}
\@writefile{lof}{\contentsline {figure}{\numberline {2}{\ignorespaces Query latencies of production data sources.}}{3}{figure.2}}
\newlabel{fig:query_latency}{{2}{3}{Query latencies of production data sources}{figure.2}{}}
\@writefile{lof}{\contentsline {figure}{\numberline {3}{\ignorespaces Druid \& MySQL benchmarks -- 100GB TPC-H data.}}{3}{figure.3}}
\newlabel{fig:tpch_100gb}{{3}{3}{Druid \& MySQL benchmarks -- 100GB TPC-H data}{figure.3}{}}
\@writefile{toc}{\contentsline {section}{\numberline {3}Performance}{3}{section.3}}
\@writefile{toc}{\contentsline {subsection}{\numberline {3.1}Query Performance}{3}{subsection.3.1}}
\bibstyle{abbrv}
\bibdata{druid_demo}
\bibcite{abadi2008column}{1}
\bibcite{colantonio2010concise}{2}
\bibcite{hunt2010zookeeper}{3}
\bibcite{tomasic1993performance}{4}
\@writefile{lof}{\contentsline {figure}{\numberline {4}{\ignorespaces Combined cluster ingestion rates.}}{4}{figure.4}}
\newlabel{fig:ingestion_rate}{{4}{4}{Combined cluster ingestion rates}{figure.4}{}}
\@writefile{toc}{\contentsline {subsection}{\numberline {3.2}Data Ingestion Performance}{4}{subsection.3.2}}
\@writefile{toc}{\contentsline {section}{\numberline {4}Demonstration Details}{4}{section.4}}
\@writefile{toc}{\contentsline {subsection}{\numberline {4.1}Setup}{4}{subsection.4.1}}
\@writefile{toc}{\contentsline {subsection}{\numberline {4.2}Goals}{4}{subsection.4.2}}
\@writefile{toc}{\contentsline {section}{\numberline {5}Acknowledgments}{4}{section.5}}
\@writefile{toc}{\contentsline {section}{\numberline {6}Additional Authors}{4}{section.6}}
\@writefile{toc}{\contentsline {section}{\numberline {7}References}{4}{section.7}}

View File

@ -0,0 +1,27 @@
\begin{thebibliography}{1}
\bibitem{abadi2008column}
D.~J. Abadi, S.~R. Madden, and N.~Hachem.
\newblock Column-stores vs. row-stores: How different are they really?
\newblock In {\em Proceedings of the 2008 ACM SIGMOD international conference
on Management of data}, pages 967--980. ACM, 2008.
\bibitem{colantonio2010concise}
A.~Colantonio and R.~Di~Pietro.
\newblock Concise: Compressed ncomposable integer set.
\newblock {\em Information Processing Letters}, 110(16):644--650, 2010.
\bibitem{hunt2010zookeeper}
P.~Hunt, M.~Konar, F.~P. Junqueira, and B.~Reed.
\newblock Zookeeper: Wait-free coordination for internet-scale systems.
\newblock In {\em USENIX ATC}, volume~10, 2010.
\bibitem{tomasic1993performance}
A.~Tomasic and H.~Garcia-Molina.
\newblock Performance of inverted indices in shared-nothing distributed text
document information retrieval systems.
\newblock In {\em Parallel and Distributed Information Systems, 1993.,
Proceedings of the Second International Conference on}, pages 8--17. IEEE,
1993.
\end{thebibliography}

View File

@ -0,0 +1,420 @@
@article{cattell2011scalable,
title={Scalable SQL and NoSQL data stores},
author={Cattell, Rick},
journal={ACM SIGMOD Record},
volume={39},
number={4},
pages={12--27},
year={2011},
publisher={ACM}
}
@article{chang2008bigtable,
title={Bigtable: A distributed storage system for structured data},
author={Chang, Fay and Dean, Jeffrey and Ghemawat, Sanjay and Hsieh, Wilson C and Wallach, Deborah A and Burrows, Mike and Chandra, Tushar and Fikes, Andrew and Gruber, Robert E},
journal={ACM Transactions on Computer Systems (TOCS)},
volume={26},
number={2},
pages={4},
year={2008},
publisher={ACM}
}
@inproceedings{decandia2007dynamo,
title={Dynamo: amazon's highly available key-value store},
author={DeCandia, Giuseppe and Hastorun, Deniz and Jampani, Madan and Kakulapati, Gunavardhan and Lakshman, Avinash and Pilchin, Alex and Sivasubramanian, Swaminathan and Vosshall, Peter and Vogels, Werner},
booktitle={ACM SIGOPS Operating Systems Review},
volume={41},
number={6},
pages={205--220},
year={2007},
organization={ACM}
}
@inproceedings{abadi2008column,
title={Column-Stores vs. Row-Stores: How different are they really?},
author={Abadi, Daniel J and Madden, Samuel R and Hachem, Nabil},
booktitle={Proceedings of the 2008 ACM SIGMOD international conference on Management of data},
pages={967--980},
year={2008},
organization={ACM}
}
@inproceedings{bear2012vertica,
title={The vertica database: SQL RDBMS for managing big data},
author={Bear, Chuck and Lamb, Andrew and Tran, Nga},
booktitle={Proceedings of the 2012 workshop on Management of big data systems},
pages={37--38},
year={2012},
organization={ACM}
}
@article{lakshman2010cassandra,
title={Cassandra—A decentralized structured storage system},
author={Lakshman, Avinash and Malik, Prashant},
journal={Operating systems review},
volume={44},
number={2},
pages={35},
year={2010}
}
@article{melnik2010dremel,
title={Dremel: interactive analysis of web-scale datasets},
author={Melnik, Sergey and Gubarev, Andrey and Long, Jing Jing and Romer, Geoffrey and Shivakumar, Shiva and Tolton, Matt and Vassilakis, Theo},
journal={Proceedings of the VLDB Endowment},
volume={3},
number={1-2},
pages={330--339},
year={2010},
publisher={VLDB Endowment}
}
@article{hall2012processing,
title={Processing a trillion cells per mouse click},
author={Hall, Alexander and Bachmann, Olaf and B{\"u}ssow, Robert and G{\u{a}}nceanu, Silviu and Nunkesser, Marc},
journal={Proceedings of the VLDB Endowment},
volume={5},
number={11},
pages={1436--1446},
year={2012},
publisher={VLDB Endowment}
}
@inproceedings{shvachko2010hadoop,
title={The hadoop distributed file system},
author={Shvachko, Konstantin and Kuang, Hairong and Radia, Sanjay and Chansler, Robert},
booktitle={Mass Storage Systems and Technologies (MSST), 2010 IEEE 26th Symposium on},
pages={1--10},
year={2010},
organization={IEEE}
}
@article{colantonio2010concise,
title={Concise: Compressed nComposable Integer Set},
author={Colantonio, Alessandro and Di Pietro, Roberto},
journal={Information Processing Letters},
volume={110},
number={16},
pages={644--650},
year={2010},
publisher={Elsevier}
}
@inproceedings{stonebraker2005c,
title={C-store: a column-oriented DBMS},
author={Stonebraker, Mike and Abadi, Daniel J and Batkin, Adam and Chen, Xuedong and Cherniack, Mitch and Ferreira, Miguel and Lau, Edmond and Lin, Amerson and Madden, Sam and O'Neil, Elizabeth and others},
booktitle={Proceedings of the 31st international conference on Very large data bases},
pages={553--564},
year={2005},
organization={VLDB Endowment}
}
@inproceedings{engle2012shark,
title={Shark: fast data analysis using coarse-grained distributed memory},
author={Engle, Cliff and Lupher, Antonio and Xin, Reynold and Zaharia, Matei and Franklin, Michael J and Shenker, Scott and Stoica, Ion},
booktitle={Proceedings of the 2012 international conference on Management of Data},
pages={689--692},
year={2012},
organization={ACM}
}
@inproceedings{zaharia2012discretized,
title={Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters},
author={Zaharia, Matei and Das, Tathagata and Li, Haoyuan and Shenker, Scott and Stoica, Ion},
booktitle={Proceedings of the 4th USENIX conference on Hot Topics in Cloud Computing},
pages={10--10},
year={2012},
organization={USENIX Association}
}
@misc{marz2013storm,
author = {Marz, Nathan},
title = {Storm: Distributed and Fault-Tolerant Realtime Computation},
month = {February},
year = {2013},
howpublished = "\url{http://storm-project.net/}"
}
@misc{tschetter2011druid,
author = {Eric Tschetter},
title = {Introducing Druid: Real-Time Analytics at a Billion Rows Per Second},
month = {April},
year = {2011},
howpublished = "\url{http://druid.io/blog/2011/04/30/introducing-druid.html}"
}
@article{farber2012sap,
title={SAP HANA database: data management for modern business applications},
author={F{\"a}rber, Franz and Cha, Sang Kyun and Primsch, J{\"u}rgen and Bornh{\"o}vd, Christof and Sigg, Stefan and Lehner, Wolfgang},
journal={ACM Sigmod Record},
volume={40},
number={4},
pages={45--51},
year={2012},
publisher={ACM}
}
@misc{voltdb2010voltdb,
title={VoltDB Technical Overview},
author={VoltDB, LLC},
year={2010},
howpublished = "\url{https://voltdb.com/}"
}
@inproceedings{macnicol2004sybase,
title={Sybase IQ multiplex-designed for analytics},
author={MacNicol, Roger and French, Blaine},
booktitle={Proceedings of the Thirtieth international conference on Very large data bases-Volume 30},
pages={1227--1230},
year={2004},
organization={VLDB Endowment}
}
@inproceedings{singh2011introduction,
title={Introduction to the IBM Netezza warehouse appliance},
author={Singh, Malcolm and Leonhardi, Ben},
booktitle={Proceedings of the 2011 Conference of the Center for Advanced Studies on Collaborative Research},
pages={385--386},
year={2011},
organization={IBM Corp.}
}
@inproceedings{miner2012unified,
title={Unified analytics platform for big data},
author={Miner, Donald},
booktitle={Proceedings of the WICSA/ECSA 2012 Companion Volume},
pages={176--176},
year={2012},
organization={ACM}
}
@inproceedings{fink2012distributed,
title={Distributed computation on dynamo-style distributed storage: riak pipe},
author={Fink, Bryan},
booktitle={Proceedings of the eleventh ACM SIGPLAN workshop on Erlang workshop},
pages={43--50},
year={2012},
organization={ACM}
}
@misc{paraccel2013,
key = {ParAccel Analytic Database},
title = {ParAccel Analytic Database},
month = {March},
year = {2013},
howpublished = "\url{http://www.paraccel.com/resources/Datasheets/ParAccel-Core-Analytic-Database.pdf}"
}
@misc{cloudera2013,
key = {Cloudera Impala},
title = {Cloudera Impala},
month = {March},
year = {2013},
url = {},
howpublished = "\url{http://blog.cloudera.com/blog}"
}
@inproceedings{hunt2010zookeeper,
title={ZooKeeper: Wait-free coordination for Internet-scale systems},
author={Hunt, Patrick and Konar, Mahadev and Junqueira, Flavio P and Reed, Benjamin},
booktitle={USENIX ATC},
volume={10},
year={2010}
}
@inproceedings{kreps2011kafka,
title={Kafka: A distributed messaging system for log processing},
author={Kreps, Jay and Narkhede, Neha and Rao, Jun},
booktitle={Proceedings of 6th International Workshop on Networking Meets Databases (NetDB), Athens, Greece},
year={2011}
}
@misc{liblzf2013,
title = {LibLZF},
key = {LibLZF},
month = {March},
year = {2013},
howpublished = "\url{http://freecode.com/projects/liblzf}"
}
@inproceedings{tomasic1993performance,
title={Performance of inverted indices in shared-nothing distributed text document information retrieval systems},
author={Tomasic, Anthony and Garcia-Molina, Hector},
booktitle={Parallel and Distributed Information Systems, 1993., Proceedings of the Second International Conference on},
pages={8--17},
year={1993},
organization={IEEE}
}
@inproceedings{antoshenkov1995byte,
title={Byte-aligned bitmap compression},
author={Antoshenkov, Gennady},
booktitle={Data Compression Conference, 1995. DCC'95. Proceedings},
pages={476},
year={1995},
organization={IEEE}
}
@inproceedings{van2011memory,
title={A memory efficient reachability data structure through bit vector compression},
author={van Schaik, Sebastiaan J and de Moor, Oege},
booktitle={Proceedings of the 2011 international conference on Management of data},
pages={913--924},
year={2011},
organization={ACM}
}
@inproceedings{o1993lru,
title={The LRU-K page replacement algorithm for database disk buffering},
author={O'neil, Elizabeth J and O'neil, Patrick E and Weikum, Gerhard},
booktitle={ACM SIGMOD Record},
volume={22},
number={2},
pages={297--306},
year={1993},
organization={ACM}
}
@article{kim2001lrfu,
title={LRFU: A spectrum of policies that subsumes the least recently used and least frequently used policies},
author={Kim, Chong Sang},
journal={IEEE Transactions on Computers},
volume={50},
number={12},
year={2001}
}
@article{wu2006optimizing,
title={Optimizing bitmap indices with efficient compression},
author={Wu, Kesheng and Otoo, Ekow J and Shoshani, Arie},
journal={ACM Transactions on Database Systems (TODS)},
volume={31},
number={1},
pages={1--38},
year={2006},
publisher={ACM}
}
@misc{twitter2013,
key = {Twitter Public Streams},
title = {Twitter Public Streams},
month = {March},
year = {2013},
howpublished = "\url{https://dev.twitter.com/docs/streaming-apis/streams/public}"
}
@article{fitzpatrick2004distributed,
title={Distributed caching with memcached},
author={Fitzpatrick, Brad},
journal={Linux journal},
number={124},
pages={72--74},
year={2004}
}
@inproceedings{amdahl1967validity,
title={Validity of the single processor approach to achieving large scale computing capabilities},
author={Amdahl, Gene M},
booktitle={Proceedings of the April 18-20, 1967, spring joint computer conference},
pages={483--485},
year={1967},
organization={ACM}
}
@book{sarawagi1998discovery,
title={Discovery-driven exploration of OLAP data cubes},
author={Sarawagi, Sunita and Agrawal, Rakesh and Megiddo, Nimrod},
year={1998},
publisher={Springer}
}
@article{hu2011stream,
title={Stream Database Survey},
author={Hu, Bo},
year={2011}
}
@article{dean2008mapreduce,
title={MapReduce: simplified data processing on large clusters},
author={Dean, Jeffrey and Ghemawat, Sanjay},
journal={Communications of the ACM},
volume={51},
number={1},
pages={107--113},
year={2008},
publisher={ACM}
}
@misc{linkedin2013senseidb,
author = {LinkedIn},
title = {SenseiDB},
month = {July},
year = {2013},
howpublished = "\url{http://www.senseidb.com/}"
}
@misc{apache2013solr,
author = {Apache},
title = {Apache Solr},
month = {February},
year = {2013},
howpublished = "\url{http://lucene.apache.org/solr/}"
}
@misc{banon2013elasticsearch,
author = {Banon, Shay},
title = {ElasticSearch},
month = {July},
year = {2013},
howpublished = "\url{http://www.elasticseach.com/}"
}
@book{oehler2012ibm,
title={IBM Cognos TM1: The Official Guide},
author={Oehler, Karsten and Gruenes, Jochen and Ilacqua, Christopher and Perez, Manuel},
year={2012},
publisher={McGraw-Hill}
}
@book{schrader2009oracle,
title={Oracle Essbase \& Oracle OLAP},
author={Schrader, Michael and Vlamis, Dan and Nader, Mike and Claterbos, Chris and Collins, Dave and Campbell, Mitch and Conrad, Floyd},
year={2009},
publisher={McGraw-Hill, Inc.}
}
@book{lachev2005applied,
title={Applied Microsoft Analysis Services 2005: And Microsoft Business Intelligence Platform},
author={Lachev, Teo},
year={2005},
publisher={Prologika Press}
}
@article{o1996log,
title={The log-structured merge-tree (LSM-tree)},
author={ONeil, Patrick and Cheng, Edward and Gawlick, Dieter and ONeil, Elizabeth},
journal={Acta Informatica},
volume={33},
number={4},
pages={351--385},
year={1996},
publisher={Springer}
}
@inproceedings{o1997improved,
title={Improved query performance with variant indexes},
author={O'Neil, Patrick and Quass, Dallan},
booktitle={ACM Sigmod Record},
volume={26},
number={2},
pages={38--49},
year={1997},
organization={ACM}
}
@inproceedings{cipar2012lazybase,
title={LazyBase: trading freshness for performance in a scalable database},
author={Cipar, James and Ganger, Greg and Keeton, Kimberly and Morrey III, Charles B and Soules, Craig AN and Veitch, Alistair},
booktitle={Proceedings of the 7th ACM european conference on Computer Systems},
pages={169--182},
year={2012},
organization={ACM}
}

View File

@ -0,0 +1,46 @@
This is BibTeX, Version 0.99d (TeX Live 2012)
Capacity: max_strings=35307, hash_size=35307, hash_prime=30011
The top-level auxiliary file: druid_demo.aux
The style file: abbrv.bst
Database file #1: druid_demo.bib
You've used 4 entries,
2118 wiz_defined-function locations,
524 strings with 4556 characters,
and the built_in function-call counts, 1592 in all, are:
= -- 160
> -- 67
< -- 3
+ -- 26
- -- 22
* -- 105
:= -- 251
add.period$ -- 14
call.type$ -- 4
change.case$ -- 23
chr.to.int$ -- 0
cite$ -- 4
duplicate$ -- 67
empty$ -- 133
format.name$ -- 22
if$ -- 349
int.to.chr$ -- 0
int.to.str$ -- 4
missing$ -- 4
newline$ -- 23
num.names$ -- 8
pop$ -- 30
preamble$ -- 1
purify$ -- 19
quote$ -- 0
skip$ -- 47
stack$ -- 0
substring$ -- 96
swap$ -- 22
text.length$ -- 3
text.prefix$ -- 0
top$ -- 0
type$ -- 16
warning$ -- 0
while$ -- 16
width$ -- 5
write$ -- 48

View File

@ -0,0 +1,18 @@
\BOOKMARK [1][-]{section.1}{Introduction}{}% 1
\BOOKMARK [2][-]{subsection.1.1}{The Need for Druid}{section.1}% 2
\BOOKMARK [1][-]{section.2}{Architecture}{}% 3
\BOOKMARK [2][-]{subsection.2.1}{Real-time Nodes}{section.2}% 4
\BOOKMARK [2][-]{subsection.2.2}{Historical Nodes}{section.2}% 5
\BOOKMARK [2][-]{subsection.2.3}{Broker Nodes}{section.2}% 6
\BOOKMARK [2][-]{subsection.2.4}{Coordinator Nodes}{section.2}% 7
\BOOKMARK [2][-]{subsection.2.5}{Query Processing}{section.2}% 8
\BOOKMARK [2][-]{subsection.2.6}{Query Capabilities}{section.2}% 9
\BOOKMARK [1][-]{section.3}{Performance}{}% 10
\BOOKMARK [2][-]{subsection.3.1}{Query Performance}{section.3}% 11
\BOOKMARK [2][-]{subsection.3.2}{Data Ingestion Performance}{section.3}% 12
\BOOKMARK [1][-]{section.4}{Demonstration Details}{}% 13
\BOOKMARK [2][-]{subsection.4.1}{Setup}{section.4}% 14
\BOOKMARK [2][-]{subsection.4.2}{Goals}{section.4}% 15
\BOOKMARK [1][-]{section.5}{Acknowledgments}{}% 16
\BOOKMARK [1][-]{section.6}{Additional Authors}{}% 17
\BOOKMARK [1][-]{section.7}{References}{}% 18

Binary file not shown.

View File

@ -0,0 +1,464 @@
% THIS IS AN EXAMPLE DOCUMENT FOR VLDB 2012
% based on ACM SIGPROC-SP.TEX VERSION 2.7
% Modified by Gerald Weber <gerald@cs.auckland.ac.nz>
% Removed the requirement to include *bbl file in here. (AhmetSacan, Sep2012)
% Fixed the equation on page 3 to prevent line overflow. (AhmetSacan, Sep2012)
\documentclass{vldb}
\usepackage{graphicx}
\usepackage{balance} % for \balance command ON LAST PAGE (only there!)
\usepackage{fontspec}
\usepackage{hyperref}
\graphicspath{{figures/}}
\usepackage{enumitem}
\begin{document}
% ****************** TITLE ****************************************
\title{Druid: Open Source Real-time Analytics at Scale}
% possible, but not really needed or used for PVLDB:
%\subtitle{[Extended Abstract]
%\titlenote{A full version of this paper is available as\textit{Author's Guide to Preparing ACM SIG Proceedings Using \LaTeX$2_\epsilon$\ and BibTeX} at \texttt{www.acm.org/eaddress.htm}}}
% ****************** AUTHORS **************************************
% You need the command \numberofauthors to handle the 'placement
% and alignment' of the authors beneath the title.
%
% For aesthetic reasons, we recommend 'three authors at a time'
% i.e. three 'name/affiliation blocks' be placed beneath the title.
%
% NOTE: You are NOT restricted in how many 'rows' of
% "name/affiliations" may appear. We just ask that you restrict
% the number of 'columns' to three.
%
% Because of the available 'opening page real-estate'
% we ask you to refrain from putting more than six authors
% (two rows with three columns) beneath the article title.
% More than six makes the first-page appear very cluttered indeed.
%
% Use the \alignauthor commands to handle the names
% and affiliations for an 'aesthetic maximum' of six authors.
% Add names, affiliations, addresses for
% the seventh etc. author(s) as the argument for the
% \additionalauthors command.
% These 'additional authors' will be output/set for you
% without further effort on your part as the last section in
% the body of your article BEFORE References or any Appendices.
\numberofauthors{6} % in this sample file, there are a *total*
% of EIGHT authors. SIX appear on the 'first-page' (for formatting
% reasons) and the remaining two appear in the \additionalauthors section.
\author{
% You can go ahead and credit any number of authors here,
% e.g. one 'row of three' or two rows (consisting of one row of three
% and a second row of one, two or three).
%
% The command \alignauthor (no curly braces needed) should
% precede each author name, affiliation/snail-mail address and
% e-mail address. Additionally, tag each line of
% affiliation/address with \affaddr, and tag the
% e-mail address with \email.
%
% 1st. author
\alignauthor
Fangjin Yang\\
\affaddr{Metamarkets Group, Inc.}\\
\email{fangjin@metamarkets.com}
% 2nd. author
\alignauthor
Eric Tschetter\\
\email{echeddar@gmail.com}
% 3rd. author
\alignauthor
Xavier Léauté\\
\affaddr{Metamarkets Group, Inc.}\\
\email{xavier@metamarkets.com}
\and % use '\and' if you need 'another row' of author names
% 4th. author
\alignauthor
Nishant Bangarwa\\
\affaddr{Metamarkets Group, Inc.}\\
\email{nishant@metamarkets.com}
% 5th. author
\alignauthor
Nelson Ray\\
\email{ncray86@gmail.com}
% 6th. author
\alignauthor
Gian Merlino\\
\affaddr{Metamarkets Group, Inc.}\\
\email{gian@metamarkets.com}
}
% There's nothing stopping you putting the seventh, eighth, etc.
% author on the opening page (as the 'third row') but we ask,
% for aesthetic reasons that you place these 'additional authors'
% in the \additional authors block, viz.
\additionalauthors{Additional authors: Deep Ganguli (Metamarkets Group, Inc., {\texttt{deep@metamarkets.com}}), Himadri Singh (Metamarkets Group, Inc., {\texttt{himadri@metamarkets.com}}), Igal Levy (Metamarkets Group, Inc., {\texttt{igal@metamarkets.com}})}
\date{14 March 2014}
% Just remember to make sure that the TOTAL number of authors
% is the number that will appear on the first page PLUS the
% number that will appear in the \additionalauthors section.
\maketitle
\begin{abstract}
Druid is an open
source\footnote{\href{https://github.com/metamx/druid}{https://github.com/metamx/druid}}
data store built for exploratory analytics on large data sets. Druid supports
fast data aggregation, low latency data ingestion, and arbitrary data
exploration. The system combines a column-oriented storage layout, a
distributed, shared-nothing architecture, and an advanced indexing structure to
return queries on billions of rows in milliseconds. Druid is petabyte scale and
is deployed in production at several technology companies.
\end{abstract}
\section{Introduction}
The recent proliferation of internet technology has created a surge
in machine-generated events. Individually, these events contain minimal useful
information and are of low value. Given the time and resources required to
extract meaning from large collections of events, many companies were willing
to discard this data instead.
A few years ago, Google introduced MapReduce as their mechanism of leveraging
commodity hardware to index the internet and analyze logs. The Hadoop project
soon followed and was largely patterned after the insights that came out of the
original MapReduce paper. Hadoop has contributed much to helping companies
convert their low-value event streams into high-value aggregates for a variety
of applications such as business intelligence and A-B testing.
As with a lot of great systems, Hadoop has opened our eyes to a new space of
problems. Specifically, Hadoop excels at storing and providing access to large
amounts of data, however, it does not make any performance guarantees around
how quickly that data can be accessed. Furthermore, although Hadoop is a
highly available system, performance degrades under heavy concurrent load.
Lastly, while Hadoop works well for storing data, it is not optimized for
ingesting data and making that data immediately readable.
\subsection{The Need for Druid}
Druid was originally designed to solve problems around ingesting and exploring
large quantities of transactional events (log data). This form of timeseries
data (OLAP data) is commonly found in the business intelligence
space and the nature of the data tends to be very append heavy. Events typically
have three distinct components: a timestamp column indicating when the event
occurred, a set of dimension columns indicating various attributes about the
event, and a set of metric columns containing values (usually numeric) that can
be aggregated. Queries are typically issued for the sum of some set of metrics,
filtered by some set of dimensions, over some span of time.
The Druid project first began out of necessity at Metamarkets to power a
business intelligence dashboard that allowed users to arbitrarily explore and
visualize event streams. Existing open source Relational Database Management
Systems, cluster computing frameworks, and NoSQL key/value stores were unable
to provide a low latency data ingestion and query platform for an interactive
dashboard. Queries needed to return fast enough to allow the data
visualizations in the dashboard to update interactively.
In addition to the query latency needs, the system had to be multi-tenant and
highly available, as the dashboard is used in a highly concurrent environment.
Downtime is costly and many businesses cannot afford to wait if a system is
unavailable in the face of software upgrades or network failure. Finally,
Metamarkets also wanted to allow users and alerting systems to be able to make
business decisions in ``real-time". The time from when an event is created to
when that event is queryable determines how fast users and systems are able to
react to potentially catastrophic occurrences in their systems.
The problems of data exploration, ingestion, and availability span multiple
industries. Since Druid was open sourced in October 2012, it has been deployed as a
video, network monitoring, operations monitoring, and online advertising
analytics platform at multiple companies\footnote{\href{http://druid.io/druid.html}{http://druid.io/druid.html}}.
\begin{figure*}
\centering
\includegraphics[width = 4.5in]{cluster}
\caption{An overview of a Druid cluster and the flow of data through the cluster.}
\label{fig:cluster}
\end{figure*}
\section{Architecture}
A Druid cluster consists of different types of nodes and each node type is
designed to perform a specific set of things. We believe this design separates
concerns and simplifies the complexity of the system. The different node types
operate fairly independently of each other and there is minimal interaction among
them. Hence, intra-cluster communication failures have minimal impact on data
availability. To solve complex data analysis problems, the different node
types come together to form a fully working system. The composition of and flow
of data in a Druid cluster are shown in Figure~\ref{fig:cluster}. All Druid
nodes announce their availability and the data they are serving over
Zookeeper\cite{hunt2010zookeeper}.
\subsection{Real-time Nodes}
Real-time nodes encapsulate the functionality to ingest and query event
streams. Events indexed via these nodes are immediately available for querying.
These nodes are only concerned with events for some small time range. They
periodically hand off batches of immutable events to other nodes in the Druid
cluster that are specialized in dealing with batches of immutable events.
Real-time nodes maintain an in-memory index buffer for all incoming events.
These indexes are incrementally populated as new events are ingested and the
indexes are also directly queryable. To avoid heap overflow problems, real-time
nodes persist their in-memory indexes to disk either periodically or after some
maximum row limit is reached. This persist process converts data stored in the
in-memory buffer to a column oriented storage format. Each persisted index is
immutable and real-time nodes load persisted indexes into off-heap memory such
that they can still be queried. On a periodic basis, each real-time node will
schedule a background task that searches for all locally persisted indexes. The
task merges these indexes together and builds an immutable block of data that
contains all the events that have ingested by a real-time node for some span of
time. We refer to this block of data as a ``segment". During the handoff stage,
a real-time node uploads this segment to permanent backup storage, typically
a distributed file system that Druid calls ``deep storage".
\subsection{Historical Nodes}
Historical nodes encapsulate the functionality to load and serve the immutable
blocks of data (segments) created by real-time nodes. In many real-world
workflows, most of the data loaded in a Druid cluster is immutable and hence
historical nodes are typically the main workers of a Druid cluster. Historical
nodes follow a shared-nothing architecture and there is no single point of
contention among the nodes. The nodes have no knowledge of one another and are
operationally simple; they only know how to load, drop, and serve immutable
segments.
\subsection{Broker Nodes}
Broker nodes act as query routers to historical and real-time nodes. Broker
nodes understand what segments are queryable and where those segments are
located. Broker nodes route incoming queries such that the queries hit the
right historical or real-time nodes. Broker nodes also merge partial results
from historical and real-time nodes before returning a final consolidated
result to the caller.
\subsection{Coordinator Nodes}
Druid coordinator nodes are primarily in charge of data management and
distribution on historical nodes. The coordinator nodes tell historical nodes
to load new data, drop outdated data, replicate data, and move data to load
balance. Coordinator nodes undergo a
leader-election process that determines a single node that runs the coordinator
functionality. The remaining coordinator nodes act as redundant backups.
A coordinator node runs periodically to determine the current state of the
cluster. It makes decisions by comparing the expected state of the cluster with
the actual state of the cluster at the time of the run. Coordinator nodes also
maintain a connection to a MySQL database that contains additional operational
parameters and configurations. One of the key pieces of information located in
the MySQL database is a table that contains a list of all segments that should
be served by historical nodes. This table can be updated by any service that
creates segments, such as real-time nodes.
\subsection{Query Processing}
Data tables in Druid (called \emph{data sources}) are collections of
timestamped events partitioned into a set of segments, where each segment
is typically 5--10 million rows. Formally, we define a segment as a collection
of rows of data that span some period in time. Segments represent the
fundamental storage unit in Druid and replication and distribution are done at
a segment level.
Druid segments are stored in a column orientation. Given that Druid is best
used for aggregating event streams (all data going into Druid must have a
timestamp), the advantages storing aggregate information as columns rather than
rows are well documented \cite{abadi2008column}. Column storage allows for more
efficient CPU usage as only what is needed is actually loaded and scanned.
Druid has multiple column types to represent various data formats. Depending on
the column type, different compression methods are used to reduce the cost of
storing a column in memory and on disk. For example, if an entire column only
contains string values, storing the raw strings is unnecessarily costly.
String columns can be dictionary encoded instead. Dictionary encoding is a
common method to compress data in column stores.
In many real world OLAP workflows, queries are issued for the aggregated
results of some set of metrics where some set of dimension specifications are
met. Consider Table~\ref{tab:sample_data}. An example query for this table may
ask: ``How much revenue was generated in the first hour of 2014-01-01 in the
city of San Francisco?". This query is filtering a sales data set based on a
Boolean expression of dimension values. In many real world data sets, dimension
columns contain strings and metric columns contain numbers. Druid creates
additional lookup indices for string columns such that only those rows that
pertain to a particular query filter are ever scanned.
\begin{table}
\centering
\begin{tabular}{| l | l | l |}
\hline
\textbf{Timestamp} & \textbf{City} & \textbf{Revenue} \\ \hline
2014-01-01T01:00:00Z & San Francisco & 25 \\ \hline
2014-01-01T01:00:00Z & San Francisco & 42 \\ \hline
2014-01-01T02:00:00Z & New York & 17 \\ \hline
2014-01-01T02:00:00Z & New York & 170 \\ \hline
\end{tabular}
\caption{Sample sales data set.}
\label{tab:sample_data}
\end{table}
For each unique city in
Table~\ref{tab:sample_data}, we can form some representation
indicating in which table rows a particular city is seen. We can
store this information in a binary array where the array indices
represent our rows. If a particular page is seen in a certain
row, that array index is marked as \texttt{1}. For example:
{\small\begin{verbatim}
San Francisco -> rows [0, 1] -> [1][1][0][0]
New York -> rows [2, 3] -> [0][0][1][1]
\end{verbatim}}
\texttt{San Francisco} is seen in rows \texttt{0} and \texttt{1}. This mapping of column values
to row indices forms an inverted index \cite{tomasic1993performance}. To know which
rows contain {\ttfamily San Francisco} or {\ttfamily New York}, we can \texttt{OR} together
the two arrays.
{\small\begin{verbatim}
[0][1][0][1] OR [1][0][1][0] = [1][1][1][1]
\end{verbatim}}
This approach of performing Boolean operations on large bitmap sets is commonly
used in search engines. Druid compresses each bitmap index using the Concise
algorithm \cite{colantonio2010concise}. All Boolean operations on top of these
Concise sets are done without decompressing the set.
\subsection{Query Capabilities}
Druid supports many types of aggregations including double sums, long sums,
minimums, maximums, and complex aggregations such as cardinality estimation and
approximate quantile estimation. The results of aggregations can be combined
in mathematical expressions to form other aggregations. Druid supports
different query types ranging from simple aggregates for an interval time,
groupBys, and approximate top-K queries.
\section{Performance}
Druid runs in production at several organizations, and to briefly demonstrate its
performance, we have chosen to share some real world numbers for the main production
cluster running at Metamarkets in early 2014. For comparison with other databases
we also include results from synthetic workloads on TPC-H data.
\subsection{Query Performance}
Query latencies are shown in Figure~\ref{fig:query_latency} for a cluster
hosting approximately 10.5TB of data using 1302 processing threads and 672
total cores (hyperthreaded). There are approximately 50 billion rows of data in
this cluster.
\begin{figure}
\centering
\includegraphics[width = 2.3in]{avg_query_latency}
\caption{Query latencies of production data sources.}
\label{fig:query_latency}
\end{figure}
\begin{figure}
\centering
\includegraphics[width = 2.3in]{tpch_100gb}
\caption{Druid \& MySQL benchmarks -- 100GB TPC-H data.}
\label{fig:tpch_100gb}
\end{figure}
The average queries per minute during this time was approximately
1000. The number of dimensions the various data sources vary from 25 to 78
dimensions, and 8 to 35 metrics. Across all the various data sources, average
query latency is approximately 550 milliseconds, with 90\% of queries returning
in less than 1 second, 95\% in under 2 seconds, and 99\% of queries returning
in less than 10 seconds.
Approximately 30\% of the queries are standard
aggregates involving different types of metrics and filters, 60\% of queries
are ordered group bys over one or more dimensions with aggregates, and 10\% of
queries are search queries and metadata retrieval queries. The number of
columns scanned in aggregate queries roughly follows an exponential
distribution. Queries involving a single column are very frequent, and queries
involving all columns are very rare.
We also present Druid benchmarks on TPC-H data in Figure~\ref{fig:tpch_100g}.
Most TPC-H queries do not directly apply to Druid, so we selected queries more
typical of Druid's workload to demonstrate query performance. As a comparison,
we also provide the results of the same queries using MySQL using the MyISAM
engine (InnoDB was slower in our experiments).
We benchmarked Druid's scan rate at 53,539,211 rows/second/core for
\texttt{select count(*)} equivalent query over a given time interval and
36,246,530 rows/second/core for a \texttt{select sum(float)} type query.
\subsection{Data Ingestion Performance}
To showcase Druid's data ingestion latency, we selected several production
datasources of varying dimensions, metrics, and event volumes. Druid's data
ingestion latency is heavily dependent on the complexity of the data set being
ingested. The data complexity is determined by the number of dimensions in each
event, the number of metrics in each event, and the types of aggregations we
want to perform on those metrics.
\begin{figure}
\centering
\includegraphics[width = 2.3in]{ingestion_rate}
\caption{Combined cluster ingestion rates.}
\label{fig:ingestion_rate}
\end{figure}
For the given datasources, the number of dimensions vary from 5 to 35, and the
number of metrics vary from 2 to 24. The peak ingestion latency we measured in
production was 22914.43 events/second/core on a datasource with 30 dimensions
and 19 metrics.
The latency measurements we presented are sufficient to address the our stated
problems of interactivity. We would prefer the variability in the latencies to
be less, which can be achieved by adding additional
hardware, but we have not chosen to do so because of cost concerns.
\section{Demonstration Details}
We would like to do an end-to-end demonstratation of Druid, from setting up a
cluster, ingesting data, structuring a query, and obtaining results. We would
also like to showcase how to solve real-world data analysis problems with Druid
and demonstrate tools that can be built on top of it, including interactive
data visualizations, approximate algorithms, and machine-learning components.
We already use similar tools in production.
\subsection{Setup}
Users will be able to set up a local Druid cluster to better understand the
components and architecture of the system. Druid is designed to run on
commodity hardware and Druid nodes are simply java processes that need to be
started up. The local setup will allow users to ingest data from Twitter's
public API and query it. We will also provide users access to an AWS hosted
Druid cluster that contains several terabytes of Twitter data that we have been
collecting for over 2 years. There are over 3 billion tweets in this data set,
and new events are constantly being ingested. We will walk through a variety of
different queries to demonstrate Druid's arbitrary data-exploration
capabilities.
Finally, we will teach users how to build a simple interactive dashboard on top
of Druid. The dashboard will use some of Druid's more powerful features such as
approximate algorithms for quickly determining the cardinality of sets, and
machine learning algorithms for scientific computing problems such as anomaly
detection. These use cases represent some of the more interesting problems we
use Druid for in production.
\subsection{Goals}
We will not only walk users through solving real-world problems with Druid and
different tools that have been built on top of Druid, but also answer
conference-specific questions such as what are the trending tweets and topics
at VLDB, what netizens are conversing about in the general area, and even
perform a sentiment analysis of VLDB. Our goal is to clearly explain why the
architecture of Druid makes it highly optimal for certain types of queries, and
the potential of the system as a real-time analytics platform.
%\end{document} % This is where a 'short' article might terminate
% ensure same length columns on last page (might need two sub-sequent latex runs)
\balance
%ACKNOWLEDGMENTS are optional
\section{Acknowledgments}
Druid could not have been built without the help of many great people in the
community. We want to thank everyone that has contributed to the Druid
codebase for their invaluable support.
% The following two commands are all you need in the
% initial runs of your .tex file to
% produce the bibliography for the citations in your paper.
\bibliographystyle{abbrv}
\bibliography{druid_demo} % vldb_sample.bib is the name of the Bibliography in this case
% You must have a proper ".bib" file
% and remember to run:
% latex bibtex latex latex
% to resolve all references
\end{document}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 85 KiB

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

1400
publications/demo/vldb.cls Normal file

File diff suppressed because it is too large Load Diff

View File

@ -34,6 +34,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.RE;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences; import com.metamx.common.guava.Sequences;
@ -203,7 +204,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
@Override @Override
public JsonParserIterator<T> make() public JsonParserIterator<T> make()
{ {
return new JsonParserIterator<T>(typeRef, future); return new JsonParserIterator<T>(typeRef, future, url);
} }
@Override @Override
@ -239,11 +240,13 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private ObjectCodec objectCodec; private ObjectCodec objectCodec;
private final JavaType typeRef; private final JavaType typeRef;
private final Future<InputStream> future; private final Future<InputStream> future;
private final String url;
public JsonParserIterator(JavaType typeRef, Future<InputStream> future) public JsonParserIterator(JavaType typeRef, Future<InputStream> future, String url)
{ {
this.typeRef = typeRef; this.typeRef = typeRef;
this.future = future; this.future = future;
this.url = url;
jp = null; jp = null;
} }
@ -289,20 +292,20 @@ public class DirectDruidClient<T> implements QueryRunner<T>
try { try {
jp = objectMapper.getFactory().createParser(future.get()); jp = objectMapper.getFactory().createParser(future.get());
if (jp.nextToken() != JsonToken.START_ARRAY) { if (jp.nextToken() != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s]", jp.getCurrentToken()); throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else { } else {
jp.nextToken(); jp.nextToken();
objectCodec = jp.getCodec(); objectCodec = jp.getCodec();
} }
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw new RE(e, "Failure getting results from[%s]", url);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw Throwables.propagate(e); throw new RE(e, "Failure getting results from[%s]", url);
} }
catch (ExecutionException e) { catch (ExecutionException e) {
throw Throwables.propagate(e); throw new RE(e, "Failure getting results from[%s]", url);
} }
} }
} }

View File

@ -63,7 +63,7 @@ public class DatabaseRuleManager
public static void createDefaultRule( public static void createDefaultRule(
final IDBI dbi, final IDBI dbi,
final String ruleTable, final String ruleTable,
final String defaultTier, final String defaultDatasourceName,
final ObjectMapper jsonMapper final ObjectMapper jsonMapper
) )
{ {
@ -74,13 +74,15 @@ public class DatabaseRuleManager
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
List<Map<String, Object>> existing = handle.select( List<Map<String, Object>> existing = handle
String.format( .createQuery(
"SELECT id from %s where datasource='%s';", String.format(
ruleTable, "SELECT id from %s where datasource=:dataSource;",
defaultTier ruleTable
)
) )
); .bind("dataSource", defaultDatasourceName)
.list();
if (!existing.isEmpty()) { if (!existing.isEmpty()) {
return null; return null;
@ -101,8 +103,8 @@ public class DatabaseRuleManager
ruleTable ruleTable
) )
) )
.bind("id", String.format("%s_%s", defaultTier, version)) .bind("id", String.format("%s_%s", defaultDatasourceName, version))
.bind("dataSource", defaultTier) .bind("dataSource", defaultDatasourceName)
.bind("version", version) .bind("version", version)
.bind("payload", jsonMapper.writeValueAsString(defaultRules)) .bind("payload", jsonMapper.writeValueAsString(defaultRules))
.execute(); .execute();