IndexerDBCoordinator: Work around SELECT -> INSERT races when adding segments

This commit is contained in:
Gian Merlino 2013-12-19 13:04:59 -08:00
parent fba6caf7fd
commit 6fbe67eeea
1 changed files with 43 additions and 28 deletions

View File

@ -42,6 +42,7 @@ import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException; import java.io.IOException;
@ -169,39 +170,39 @@ public class IndexerDBCoordinator
private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException
{ {
try { try {
final List<Map<String, Object>> exists = handle.createQuery( if (segmentExists(handle, segment)) {
String.format(
"SELECT id FROM %s WHERE id = :identifier",
dbTables.getSegmentsTable()
)
).bind(
"identifier",
segment.getIdentifier()
).list();
if (!exists.isEmpty()) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
return false; return false;
} }
handle.createStatement( // Try/catch to work around races due to SELECT -> INSERT. Avoid ON DUPLICATE KEY since it's not portable.
String.format( try {
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", handle.createStatement(
dbTables.getSegmentsTable() String.format(
) "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
) + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
.bind("id", segment.getIdentifier()) dbTables.getSegmentsTable()
.bind("dataSource", segment.getDataSource()) )
.bind("created_date", new DateTime().toString()) )
.bind("start", segment.getInterval().getStart().toString()) .bind("id", segment.getIdentifier())
.bind("end", segment.getInterval().getEnd().toString()) .bind("dataSource", segment.getDataSource())
.bind("partitioned", segment.getShardSpec().getPartitionNum()) .bind("created_date", new DateTime().toString())
.bind("version", segment.getVersion()) .bind("start", segment.getInterval().getStart().toString())
.bind("used", true) .bind("end", segment.getInterval().getEnd().toString())
.bind("payload", jsonMapper.writeValueAsString(segment)) .bind("partitioned", segment.getShardSpec().getPartitionNum())
.execute(); .bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsString(segment))
.execute();
log.info("Published segment [%s] to DB", segment.getIdentifier()); log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch (Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else {
throw e;
}
}
} }
catch (IOException e) { catch (IOException e) {
log.error(e, "Exception inserting into DB"); log.error(e, "Exception inserting into DB");
@ -211,6 +212,20 @@ public class IndexerDBCoordinator
return true; return true;
} }
private boolean segmentExists(final Handle handle, final DataSegment segment) {
final List<Map<String, Object>> exists = handle.createQuery(
String.format(
"SELECT id FROM %s WHERE id = :identifier",
dbTables.getSegmentsTable()
)
).bind(
"identifier",
segment.getIdentifier()
).list();
return !exists.isEmpty();
}
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{ {
dbi.inTransaction( dbi.inTransaction(