Added support for PostgreSQL on overlord nodes

This commit is contained in:
Tugdual Saunier 2014-02-26 00:47:53 +01:00
parent a835db2a3c
commit e40725d5f3
2 changed files with 20 additions and 6 deletions

View File

@ -29,6 +29,7 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@ -212,12 +213,17 @@ public class DbConnector
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
return isPostgreSQL(handle);
}
}
);
}
public static Boolean isPostgreSQL(final Handle handle) throws SQLException
{
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
}
private final Supplier<DbConnectorConfig> config;
private final Supplier<DbTablesConfig> dbTables;
private final DBI dbi;

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.DbConnector;
import io.druid.db.DbTablesConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
@ -179,8 +180,11 @@ public class IndexerDBCoordinator
try {
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
DbConnector.isPostgreSQL(handle) ?
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)":
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
dbTables.getSegmentsTable()
)
)
@ -196,7 +200,9 @@ public class IndexerDBCoordinator
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch (Exception e) {
} catch(SQLException e) {
throw new IOException(e);
} catch(Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else {
@ -293,11 +299,13 @@ public class IndexerDBCoordinator
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws IOException
public List<DataSegment> withHandle(Handle handle) throws IOException, SQLException
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
DbConnector.isPostgreSQL(handle)?
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and \"end\" <= :end and used = 0":
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
dbTables.getSegmentsTable()
)
)