Merge branch 'batch-zk' into worker-resource

This commit is contained in:
fjy 2013-07-08 11:04:05 -07:00
commit cc35841e23
6 changed files with 87 additions and 34 deletions

View File

@ -64,6 +64,8 @@ public class Announcer
private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap(); private final ConcurrentMap<String, PathChildrenCache> listeners = new MapMaker().makeMap();
private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap(); private final ConcurrentMap<String, ConcurrentMap<String, byte[]>> announcements = new MapMaker().makeMap();
private final Object lock = new Object();
private boolean started = false; private boolean started = false;
public Announcer( public Announcer(
@ -227,10 +229,9 @@ public class Announcer
boolean created = false; boolean created = false;
synchronized (toAnnounce) { synchronized (toAnnounce) {
if (started) { if (started) {
byte[] oldBytes = subPaths.get(pathAndNode.getNode()); byte[] oldBytes = subPaths.putIfAbsent(pathAndNode.getNode(), bytes);
if (oldBytes == null) { if (oldBytes == null) {
subPaths.put(pathAndNode.getNode(), bytes);
created = true; created = true;
} else if (!Arrays.equals(oldBytes, bytes)) { } else if (!Arrays.equals(oldBytes, bytes)) {
throw new IAE("Cannot reannounce different values under the same path"); throw new IAE("Cannot reannounce different values under the same path");
@ -250,6 +251,7 @@ public class Announcer
public void update(final String path, final byte[] bytes) public void update(final String path, final byte[] bytes)
{ {
synchronized (lock) {
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path); final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath(); final String parentPath = pathAndNode.getPath();
@ -274,6 +276,7 @@ public class Announcer
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
}
private String createAnnouncement(final String path, byte[] value) throws Exception private String createAnnouncement(final String path, byte[] value) throws Exception
{ {

View File

@ -140,6 +140,7 @@ public class DbConnector
@Override @Override
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
if ( !handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL") ) {
List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName)); List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
if (table.isEmpty()) { if (table.isEmpty()) {
@ -148,7 +149,7 @@ public class DbConnector
} else { } else {
log.info("Table[%s] existed: [%s]", tableName, table); log.info("Table[%s] existed: [%s]", tableName, table);
} }
}
return null; return null;
} }
} }

View File

@ -0,0 +1,49 @@
-- Table structure for table `config`
--
DROP TABLE IF EXISTS prod_config;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE prod_config (
name varchar(255) NOT NULL,
payload bytea NOT NULL,
PRIMARY KEY (name)
);
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `rules`
--
DROP TABLE IF EXISTS prod_rules;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE prod_rules (
id varchar(255) NOT NULL,
dataSource varchar(255) NOT NULL,
version text NOT NULL,
payload text NOT NULL,
PRIMARY KEY (id)
);
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Table structure for table `segments`
--
DROP TABLE IF EXISTS prod_segments;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE prod_segments (
id varchar(255) NOT NULL,
dataSource varchar(255) NOT NULL,
created_date text NOT NULL,
start text NOT NULL,
"end" text NOT NULL,
partitioned SMALLINT NOT NULL,
version text NOT NULL,
used boolean NOT NULL,
payload text NOT NULL,
PRIMARY KEY (id)
);

View File

@ -62,7 +62,7 @@ public class DbSegmentPublisher implements SegmentPublisher
{ {
handle.createStatement( handle.createStatement(
String.format( String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) " "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
config.getSegmentTable() config.getSegmentTable()
) )

View File

@ -192,7 +192,7 @@ public class DatabaseRuleManager
return handle.createQuery( return handle.createQuery(
// Return latest version rule by dataSource // Return latest version rule by dataSource
String.format( String.format(
"SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version", "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version",
config.getRuleTable() config.getRuleTable()
) )
).fold( ).fold(

View File

@ -203,7 +203,7 @@ public class DatabaseSegmentManager
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
batch.add( batch.add(
String.format( String.format(
"UPDATE %s SET used=1 WHERE id = '%s'", "UPDATE %s SET used=true WHERE id = '%s'",
config.getSegmentTable(), config.getSegmentTable(),
segment.getIdentifier() segment.getIdentifier()
) )
@ -234,7 +234,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement( handle.createStatement(
String.format("UPDATE %s SET used=1 WHERE id = :id", config.getSegmentTable()) String.format("UPDATE %s SET used=true WHERE id = :id", config.getSegmentTable())
) )
.bind("id", segmentId) .bind("id", segmentId)
.execute(); .execute();
@ -268,7 +268,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement( handle.createStatement(
String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", config.getSegmentTable()) String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", config.getSegmentTable())
) )
.bind("dataSource", ds) .bind("dataSource", ds)
.execute(); .execute();
@ -298,7 +298,7 @@ public class DatabaseSegmentManager
public Void withHandle(Handle handle) throws Exception public Void withHandle(Handle handle) throws Exception
{ {
handle.createStatement( handle.createStatement(
String.format("UPDATE %s SET used=0 WHERE id = :segmentID", config.getSegmentTable()) String.format("UPDATE %s SET used=false WHERE id = :segmentID", config.getSegmentTable())
).bind("segmentID", segmentID) ).bind("segmentID", segmentID)
.execute(); .execute();
@ -398,7 +398,7 @@ public class DatabaseSegmentManager
public List<Map<String, Object>> withHandle(Handle handle) throws Exception public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{ {
return handle.createQuery( return handle.createQuery(
String.format("SELECT payload FROM %s WHERE used=1", config.getSegmentTable()) String.format("SELECT payload FROM %s WHERE used=true", config.getSegmentTable())
).list(); ).list();
} }
} }