Migrate IndexerSQLMetadataStorageCoordinator.getUnusedSegmentsForInterval to streaming (#3043)

* Migrate IndexerSQLMetadataStorageCoordinator.getUnusedSegmentsForInterval to streaming
* Missed query from #2859

* Make inReadOnlyTransaction part of SQLMetadataConnector
This commit is contained in:
Charles Allen 2016-07-06 16:55:27 -07:00 committed by Xavier Léauté
parent 3f1681c16c
commit 5d9fd0a713
3 changed files with 39 additions and 34 deletions

View File

@ -833,13 +833,14 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
} }
} }
@Override
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval) public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{ {
List<DataSegment> matchingSegments = connector.getDBI().withHandle( List<DataSegment> matchingSegments = connector.inReadOnlyTransaction(
new HandleCallback<List<DataSegment>>() new TransactionCallback<List<DataSegment>>()
{ {
@Override @Override
public List<DataSegment> withHandle(Handle handle) throws IOException, SQLException public List<DataSegment> inTransaction(final Handle handle, final TransactionStatus status) throws Exception
{ {
return handle return handle
.createQuery( .createQuery(
@ -848,6 +849,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
dbTables.getSegmentsTable() dbTables.getSegmentsTable()
) )
) )
.setFetchSize(connector.getStreamingFetchSize())
.bind("dataSource", dataSource) .bind("dataSource", dataSource)
.bind("start", interval.getStart().toString()) .bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString()) .bind("end", interval.getEnd().toString())

View File

@ -39,6 +39,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper; import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.IntegerMapper; import org.skife.jdbi.v2.util.IntegerMapper;
import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLRecoverableException; import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException; import java.sql.SQLTransientException;
@ -563,6 +564,36 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
return dataSource; return dataSource;
} }
protected final <T> T inReadOnlyTransaction(
final TransactionCallback<T> callback
)
{
return getDBI().withHandle(
new HandleCallback<T>()
{
@Override
public T withHandle(Handle handle) throws Exception
{
final Connection connection = handle.getConnection();
final boolean readOnly = connection.isReadOnly();
connection.setReadOnly(true);
try {
return handle.inTransaction(callback);
}
finally {
try {
connection.setReadOnly(readOnly);
}
catch (SQLException e) {
// at least try to log it so we don't swallow exceptions
log.error(e, "Unable to reset connection read-only state");
}
}
}
}
);
}
private void createAuditTable(final String tableName) private void createAuditTable(final String tableName)
{ {
createTable( createTable(
@ -595,5 +626,4 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
createAuditTable(tablesConfigSupplier.get().getAuditTable()); createAuditTable(tablesConfigSupplier.get().getAuditTable());
} }
} }
} }

View File

@ -54,7 +54,6 @@ import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.TransactionFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper; import org.skife.jdbi.v2.util.ByteArrayMapper;
@ -161,38 +160,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
} }
} }
private <T> T inReadOnlyTransaction(final TransactionCallback<T> callback)
{
return connector.getDBI().withHandle(
new HandleCallback<T>()
{
@Override
public T withHandle(Handle handle) throws Exception
{
final Connection connection = handle.getConnection();
final boolean readOnly = connection.isReadOnly();
connection.setReadOnly(true);
try {
return handle.inTransaction(callback);
} finally {
try {
connection.setReadOnly(readOnly);
} catch (SQLException e) {
// at least try to log it so we don't swallow exceptions
log.error(e, "Unable to reset connection read-only state");
}
}
}
}
);
}
@Override @Override
public boolean enableDatasource(final String ds) public boolean enableDatasource(final String ds)
{ {
try { try {
final IDBI dbi = connector.getDBI(); final IDBI dbi = connector.getDBI();
VersionedIntervalTimeline<String, DataSegment> segmentTimeline = inReadOnlyTransaction( VersionedIntervalTimeline<String, DataSegment> segmentTimeline = connector.inReadOnlyTransaction(
new TransactionCallback<VersionedIntervalTimeline<String, DataSegment>>() new TransactionCallback<VersionedIntervalTimeline<String, DataSegment>>()
{ {
@Override @Override
@ -474,7 +447,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
// //
// setting connection to read-only will allow some database such as MySQL // setting connection to read-only will allow some database such as MySQL
// to automatically use read-only transaction mode, further optimizing the query // to automatically use read-only transaction mode, further optimizing the query
final List<DataSegment> segments = inReadOnlyTransaction( final List<DataSegment> segments = connector.inReadOnlyTransaction(
new TransactionCallback<List<DataSegment>>() new TransactionCallback<List<DataSegment>>()
{ {
@Override @Override
@ -567,7 +540,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
final int limit final int limit
) )
{ {
return inReadOnlyTransaction( return connector.inReadOnlyTransaction(
new TransactionCallback<List<Interval>>() new TransactionCallback<List<Interval>>()
{ {
@Override @Override