Merge branch 'master' of github.com:metamx/druid into batch-zk

fjy 2013-07-08 10:40:30 -07:00
commit 454f23d7c0
15 changed files with 84 additions and 35 deletions

View File

@@ -20,6 +20,7 @@
 package com.metamx.druid.query;

 import com.google.common.base.Function;
+import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -76,7 +77,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
   {
     this.exec = exec;
     this.ordering = ordering;
-    this.queryables = Iterables.unmodifiableIterable(queryables);
+    this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
   }

   @Override
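
The new constructor drops null runners from `queryables` before wrapping the iterable. A minimal standalone sketch of that Guava idiom (example class and values are illustrative, not Druid code):

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;

import java.util.Arrays;
import java.util.List;

public class NotNullFilterExample
{
  public static void main(String[] args)
  {
    // Filtering is lazy: nulls are skipped each time the iterable is traversed,
    // and unmodifiableIterable makes the result a read-only view.
    List<String> queryables = Arrays.asList("a", null, "b");
    Iterable<String> safe =
        Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
    System.out.println(Iterables.toString(safe)); // prints [a, b]
  }
}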

View File

@@ -41,7 +41,7 @@ public class CountAggregatorFactory implements AggregatorFactory
       @JsonProperty("name") String name
   )
   {
-    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     this.name = name;
   }
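
This hunk and the six aggregator-factory hunks below are the same fix: a corrected error message on a Guava precondition (the old side's "non null" / "nonl null" wording is left verbatim above, since it is what the commit changes). For reference, the guard behaves like this (illustrative snippet, not Druid code):

import com.google.common.base.Preconditions;

public class GuardExample
{
  public static void main(String[] args)
  {
    String name = null;
    // Throws NullPointerException whose message is the second argument.
    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
  }
}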

View File

@@ -45,8 +45,8 @@ public class DoubleSumAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
-    Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name");
-    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;

View File

@@ -51,8 +51,8 @@ public class HistogramAggregatorFactory implements AggregatorFactory
       @JsonProperty("breaks") final List<Float> breaksList
   )
   {
-    Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name");
-    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;

View File

@@ -63,11 +63,11 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
       @JsonProperty("fnCombine") final String fnCombine
   )
   {
-    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
-    Preconditions.checkNotNull(fieldNames, "Must have a valid, non null fieldNames");
-    Preconditions.checkNotNull(fnAggregate, "Must have a valid, non null fnAggregate");
-    Preconditions.checkNotNull(fnReset, "Must have a valid, non null fnReset");
-    Preconditions.checkNotNull(fnCombine, "Must have a valid, non null fnCombine");
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldNames, "Must have a valid, non-null fieldNames");
+    Preconditions.checkNotNull(fnAggregate, "Must have a valid, non-null fnAggregate");
+    Preconditions.checkNotNull(fnReset, "Must have a valid, non-null fnReset");
+    Preconditions.checkNotNull(fnCombine, "Must have a valid, non-null fnCombine");
     this.name = name;
     this.fieldNames = fieldNames;

View File

@@ -45,8 +45,8 @@ public class LongSumAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
-    Preconditions.checkNotNull(name, "Must have a valid, nonl null aggregator name");
-    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;

View File

@@ -45,8 +45,8 @@ public class MaxAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
-    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
-    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;

View File

@@ -45,8 +45,8 @@ public class MinAggregatorFactory implements AggregatorFactory
       @JsonProperty("fieldName") final String fieldName
   )
   {
-    Preconditions.checkNotNull(name, "Must have a valid, non null aggregator name");
-    Preconditions.checkNotNull(fieldName, "Must have a valid, non null fieldName");
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;

View File

@@ -140,6 +140,7 @@ public class DbConnector
           @Override
           public Void withHandle(Handle handle) throws Exception
           {
+            if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) {
               List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));

               if (table.isEmpty()) {
@@ -148,7 +149,7 @@ public class DbConnector
               } else {
                 log.info("Table[%s] existed: [%s]", tableName, table);
               }
+            }

             return null;
           }
         }
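
The new guard keys off the JDBC product name because `SHOW tables` is MySQL-only syntax that PostgreSQL rejects; on PostgreSQL the auto-create step is skipped entirely. A hedged sketch of the detection (connection URL and class are illustrative, not the full DbConnector):

import java.sql.Connection;
import java.sql.DriverManager;

public class DialectCheckExample
{
  public static void main(String[] args) throws Exception
  {
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/druid")) {
      // getDatabaseProductName() returns e.g. "PostgreSQL" or "MySQL".
      String product = conn.getMetaData().getDatabaseProductName();
      System.out.println(product.contains("PostgreSQL")
          ? "Skipping SHOW TABLES; tables must be created out of band."
          : "Safe to run SHOW tables LIKE '...'");
    }
  }
}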

View File

@@ -0,0 +1,49 @@
+--
+-- Table structure for table `config`
+--
+
+DROP TABLE IF EXISTS prod_config;
+/*!40101 SET @saved_cs_client = @@character_set_client */;
+/*!40101 SET character_set_client = utf8 */;
+CREATE TABLE prod_config (
+  name varchar(255) NOT NULL,
+  payload bytea NOT NULL,
+  PRIMARY KEY (name)
+);
+/*!40101 SET character_set_client = @saved_cs_client */;
+
+--
+-- Table structure for table `rules`
+--
+
+DROP TABLE IF EXISTS prod_rules;
+/*!40101 SET @saved_cs_client = @@character_set_client */;
+/*!40101 SET character_set_client = utf8 */;
+CREATE TABLE prod_rules (
+  id varchar(255) NOT NULL,
+  dataSource varchar(255) NOT NULL,
+  version text NOT NULL,
+  payload text NOT NULL,
+  PRIMARY KEY (id)
+);
+/*!40101 SET character_set_client = @saved_cs_client */;
+
+--
+-- Table structure for table `segments`
+--
+
+DROP TABLE IF EXISTS prod_segments;
+/*!40101 SET @saved_cs_client = @@character_set_client */;
+/*!40101 SET character_set_client = utf8 */;
+CREATE TABLE prod_segments (
+  id varchar(255) NOT NULL,
+  dataSource varchar(255) NOT NULL,
+  created_date text NOT NULL,
+  start text NOT NULL,
+  "end" text NOT NULL,
+  partitioned SMALLINT NOT NULL,
+  version text NOT NULL,
+  used boolean NOT NULL,
+  payload text NOT NULL,
+  PRIMARY KEY (id)
+);
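
This new DDL is the PostgreSQL flavor of Druid's metadata tables: payload is bytea rather than a MySQL blob, used is a real boolean (which the UPDATE/SELECT changes below depend on), and "end" is double-quoted because END is a reserved word in PostgreSQL. The /*!40101 ... */ lines appear to be carried over from a mysqldump; outside MySQL they read as ordinary block comments and are inert.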

View File

@@ -62,7 +62,7 @@ public class DbSegmentPublisher implements SegmentPublisher
     {
       handle.createStatement(
           String.format(
-              "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+              "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
              + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
              config.getSegmentTable()
          )
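
Only the column list needs the quoted \"end\"; the named parameter :end is unaffected by reserved words. A hedged sketch of the JDBI v2 pattern with an illustrative table name and values:

import org.skife.jdbi.v2.Handle;

public class SegmentInsertExample
{
  static void insertSegment(Handle handle)
  {
    handle.createStatement(
        "INSERT INTO prod_segments (id, dataSource, created_date, start, \"end\", "
        + "partitioned, version, used, payload) "
        + "VALUES (:id, :dataSource, :created_date, :start, :end, "
        + ":partitioned, :version, :used, :payload)"
    )
          .bind("id", "ds_2013-07-01_2013-07-02_v1")
          .bind("dataSource", "ds")
          .bind("created_date", "2013-07-08T00:00:00.000Z")
          .bind("start", "2013-07-01T00:00:00.000Z")
          .bind("end", "2013-07-02T00:00:00.000Z")
          .bind("partitioned", 0)
          .bind("version", "v1")
          .bind("used", true)
          .bind("payload", "{}")
          .execute();
  }
}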

View File

@@ -192,7 +192,7 @@ public class DatabaseRuleManager
       return handle.createQuery(
           // Return latest version rule by dataSource
           String.format(
-              "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version",
+              "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version",
              config.getRuleTable()
          )
      ).fold(
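
Note on the SQL change: the inner SELECT previously listed payload without grouping or aggregating it, which MySQL tolerates but PostgreSQL rejects as a non-grouped column. The subquery only needs dataSource and max(version); the outer join then recovers payload from the full table.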

View File

@@ -203,7 +203,7 @@ public class DatabaseSegmentManager
             for (DataSegment segment : segments) {
               batch.add(
                   String.format(
-                      "UPDATE %s SET used=1 WHERE id = '%s'",
+                      "UPDATE %s SET used=true WHERE id = '%s'",
                      config.getSegmentTable(),
                      segment.getIdentifier()
                  )
@@ -234,7 +234,7 @@ public class DatabaseSegmentManager
           public Void withHandle(Handle handle) throws Exception
           {
             handle.createStatement(
-                String.format("UPDATE %s SET used=1 WHERE id = :id", config.getSegmentTable())
+                String.format("UPDATE %s SET used=true WHERE id = :id", config.getSegmentTable())
             )
                   .bind("id", segmentId)
                   .execute();
@@ -268,7 +268,7 @@ public class DatabaseSegmentManager
           public Void withHandle(Handle handle) throws Exception
           {
             handle.createStatement(
-                String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", config.getSegmentTable())
+                String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", config.getSegmentTable())
             )
                   .bind("dataSource", ds)
                   .execute();
@@ -298,7 +298,7 @@ public class DatabaseSegmentManager
           public Void withHandle(Handle handle) throws Exception
           {
             handle.createStatement(
-                String.format("UPDATE %s SET used=0 WHERE id = :segmentID", config.getSegmentTable())
+                String.format("UPDATE %s SET used=false WHERE id = :segmentID", config.getSegmentTable())
             ).bind("segmentID", segmentID)
                   .execute();
@@ -398,7 +398,7 @@ public class DatabaseSegmentManager
           public List<Map<String, Object>> withHandle(Handle handle) throws Exception
           {
             return handle.createQuery(
-                String.format("SELECT payload FROM %s WHERE used=1", config.getSegmentTable())
+                String.format("SELECT payload FROM %s WHERE used=true", config.getSegmentTable())
             ).list();
           }
         }
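
These hunks switch used from 0/1 to boolean literals to match the boolean column in the schema above; MySQL still accepts them, since TRUE/FALSE are aliases for 1/0 there. A hedged sketch of the JDBI batch pattern from the first hunk (table name illustrative):

import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.Handle;

public class EnableSegmentsExample
{
  static void enable(Handle handle, Iterable<String> segmentIds)
  {
    // Accumulate one UPDATE per segment, then execute them as a single batch.
    Batch batch = handle.createBatch();
    for (String id : segmentIds) {
      batch.add(String.format("UPDATE prod_segments SET used=true WHERE id = '%s'", id));
    }
    batch.execute();
  }
}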

View File

@@ -21,12 +21,10 @@ package com.metamx.druid.loading;
 import org.skife.config.Config;

-import java.io.File;
-
 /**
  */
 public abstract class HdfsDataSegmentPusherConfig
 {
   @Config("druid.pusher.hdfs.storageDirectory")
-  public abstract File getStorageDirectory();
+  public abstract String getStorageDirectory();
 }
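
Returning a String rather than java.io.File presumably avoids coercing an HDFS location into a local-filesystem path. A sketch of how such a skife-config class is typically bound (property value and usage are illustrative):

import java.util.Properties;

import org.skife.config.ConfigurationObjectFactory;

public class PusherConfigExample
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // An HDFS URI would not survive a round-trip through java.io.File.
    props.setProperty("druid.pusher.hdfs.storageDirectory", "hdfs://namenode:9000/druid/segments");
    HdfsDataSegmentPusherConfig config =
        new ConfigurationObjectFactory(props).build(HdfsDataSegmentPusherConfig.class);
    System.out.println(config.getStorageDirectory());
  }
}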

View File

@@ -668,7 +668,7 @@ public class DruidMaster
     synchronized (lock) {
       final LeaderLatch latch = leaderLatch.get();
       if (latch == null || !latch.hasLeadership()) {
-        log.info("[%s] is master, not me. Phooey.", latch == null ? null : latch.getLeader().getId());
+        log.info("LEGGO MY EGGO. [%s] is master.", latch == null ? null : latch.getLeader().getId());
        stopBeingMaster();
        return;
      }
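
For context, a minimal sketch of the Curator LeaderLatch check pattern this hunk logs around (latch path and id are illustrative; this uses modern Apache Curator coordinates, whereas Druid at the time used the Netflix packages):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;

public class LeaderCheckExample
{
  static void checkLeadership(CuratorFramework curator) throws Exception
  {
    LeaderLatch latch = new LeaderLatch(curator, "/druid/master", "me");
    latch.start();
    if (!latch.hasLeadership()) {
      // getLeader() names whichever participant currently holds the latch.
      System.out.printf("LEGGO MY EGGO. [%s] is master.%n", latch.getLeader().getId());
    }
    latch.close();
  }
}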