Stream segments from database (#2859)

* Avoid having the JDBC driver fetch all segment records into the heap at once
* Set the connection to read-only to help the database optimize queries
* Update JDBC drivers (the MySQL driver has fixes for streaming results)
This commit is contained in:
Xavier Léauté 2016-04-20 14:40:07 -07:00 committed by Fangjin Yang
parent 7d3e55717d
commit 5938d9085b
7 changed files with 145 additions and 73 deletions
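The change boils down to standard JDBC result-set streaming. Below is a minimal sketch of the same pattern in plain JDBC, not the code from this commit (which goes through JDBI); the connection URL, credentials, and the druid_segments/payload names are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StreamingFetchSketch
{
  // hypothetical connection details; point these at your own metadata store
  private static final String JDBC_URL = "jdbc:postgresql://localhost:5432/druid";
  private static final String USER = "druid";
  private static final String PASSWORD = "diurd";

  public static void main(String[] args) throws SQLException
  {
    try (Connection connection = DriverManager.getConnection(JDBC_URL, USER, PASSWORD)) {
      // read-only lets databases such as MySQL switch to a read-only transaction mode
      connection.setReadOnly(true);
      // PostgreSQL only honors the fetch size when auto-commit is off, i.e. inside a transaction
      connection.setAutoCommit(false);

      try (PreparedStatement statement =
               connection.prepareStatement("SELECT payload FROM druid_segments WHERE used = true")) {
        // PostgreSQL: a positive value caps the rows buffered per round trip (100 here);
        // MySQL Connector/J instead expects Integer.MIN_VALUE to stream row by row
        statement.setFetchSize(100);

        try (ResultSet resultSet = statement.executeQuery()) {
          long count = 0;
          while (resultSet.next()) {
            byte[] payload = resultSet.getBytes("payload");
            // deserialize the payload here; only a small window of rows is in memory at a time
            if (payload != null) {
              count++;
            }
          }
          System.out.println("streamed " + count + " segment payloads");
        }
      }
      connection.rollback(); // nothing to commit for a read-only query
    }
  }
}

In the commit itself the driver-specific value lives in each SQLMetadataConnector subclass (getStreamingFetchSize), and SQLMetadataSegmentManager passes it to JDBI's setFetchSize inside a read-only transaction, as the diffs below show.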

View File

@@ -54,7 +54,7 @@
     <dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
-      <version>5.1.34</version>
+      <version>5.1.38</version>
     </dependency>
     <dependency>
       <groupId>org.jdbi</groupId>

View File

@@ -75,6 +75,14 @@ public class MySQLConnector extends SQLMetadataConnector
     return SERIAL_TYPE;
   }
 
+  @Override
+  protected int getStreamingFetchSize()
+  {
+    // this is MySQL's way of indicating you want results streamed back
+    // see http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+    return Integer.MIN_VALUE;
+  }
+
   @Override
   public boolean tableExists(Handle handle, String tableName)
   {

View File

@@ -54,7 +54,7 @@
     <dependency>
       <groupId>org.postgresql</groupId>
       <artifactId>postgresql</artifactId>
-      <version>9.3-1102-jdbc41</version>
+      <version>9.4.1208.jre7</version>
     </dependency>
     <dependency>
       <groupId>org.jdbi</groupId>

View File

@@ -39,6 +39,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
   private static final Logger log = new Logger(PostgreSQLConnector.class);
   private static final String PAYLOAD_TYPE = "BYTEA";
   private static final String SERIAL_TYPE = "BIGSERIAL";
+  public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
 
   private final DBI dbi;
@@ -71,6 +72,12 @@ public class PostgreSQLConnector extends SQLMetadataConnector
     return SERIAL_TYPE;
   }
 
+  @Override
+  protected int getStreamingFetchSize()
+  {
+    return DEFAULT_STREAMING_RESULT_SIZE;
+  }
+
   protected boolean canUpsert(Handle handle) throws SQLException
   {
     if (canUpsert == null) {

View File

@@ -98,6 +98,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
    */
   protected abstract String getSerialType();
 
+  /**
+   * Returns the value that should be passed to statement.setFetchSize to ensure results
+   * are streamed back from the database instead of fetching the entire result set in memory.
+   *
+   * @return optimal fetch size to stream results back
+   */
+  protected abstract int getStreamingFetchSize();
+
   public String getValidationQuery() { return "SELECT 1"; }
 
   public abstract boolean tableExists(Handle handle, final String tableName);

View File

@@ -52,11 +52,15 @@ import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.exceptions.TransactionFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -82,7 +86,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   private final Supplier<MetadataSegmentManagerConfig> config;
   private final Supplier<MetadataStorageTablesConfig> dbTables;
   private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
-  private final IDBI dbi;
+  private final SQLMetadataConnector connector;
 
   private volatile ListeningScheduledExecutorService exec = null;
   private volatile ListenableFuture<?> future = null;
@@ -103,7 +107,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     this.dataSources = new AtomicReference<>(
         new ConcurrentHashMap<String, DruidDataSource>()
     );
-    this.dbi = connector.getDBI();
+    this.connector = connector;
   }
 
   @LifecycleStart
@@ -157,53 +161,85 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     }
   }
 
+  private <T> T inReadOnlyTransaction(final TransactionCallback<T> callback)
+  {
+    return connector.getDBI().withHandle(
+        new HandleCallback<T>()
+        {
+          @Override
+          public T withHandle(Handle handle) throws Exception
+          {
+            final Connection connection = handle.getConnection();
+            final boolean readOnly = connection.isReadOnly();
+            connection.setReadOnly(true);
+            try {
+              return handle.inTransaction(callback);
+            } finally {
+              try {
+                connection.setReadOnly(readOnly);
+              } catch (SQLException e) {
+                // at least try to log it so we don't swallow exceptions
+                log.error(e, "Unable to reset connection read-only state");
+              }
+            }
+          }
+        }
+    );
+  }
+
   @Override
   public boolean enableDatasource(final String ds)
   {
     try {
-      VersionedIntervalTimeline<String, DataSegment> segmentTimeline = dbi.withHandle(
-          new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>()
-          {
-            @Override
-            public VersionedIntervalTimeline<String, DataSegment> withHandle(Handle handle) throws Exception
-            {
-              return handle.createQuery(
-                  String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", getSegmentsTable())
-              )
-                           .bind("dataSource", ds)
-                           .map(ByteArrayMapper.FIRST)
-                           .fold(
-                               new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural()),
-                               new Folder3<VersionedIntervalTimeline<String, DataSegment>, byte[]>()
-                               {
-                                 @Override
-                                 public VersionedIntervalTimeline<String, DataSegment> fold(
-                                     VersionedIntervalTimeline<String, DataSegment> timeline,
-                                     byte[] payload,
-                                     FoldController foldController,
-                                     StatementContext statementContext
-                                 ) throws SQLException
-                                 {
-                                   try {
-                                     DataSegment segment = jsonMapper.readValue(
-                                         payload,
-                                         DataSegment.class
-                                     );
-
-                                     timeline.add(
-                                         segment.getInterval(),
-                                         segment.getVersion(),
-                                         segment.getShardSpec().createChunk(segment)
-                                     );
-
-                                     return timeline;
-                                   }
-                                   catch (Exception e) {
-                                     throw new SQLException(e.toString());
-                                   }
-                                 }
-                               }
-                           );
+      final IDBI dbi = connector.getDBI();
+      VersionedIntervalTimeline<String, DataSegment> segmentTimeline = inReadOnlyTransaction(
+          new TransactionCallback<VersionedIntervalTimeline<String, DataSegment>>()
+          {
+            @Override
+            public VersionedIntervalTimeline<String, DataSegment> inTransaction(
+                Handle handle, TransactionStatus status
+            ) throws Exception
+            {
+              return handle
+                  .createQuery(String.format(
+                      "SELECT payload FROM %s WHERE dataSource = :dataSource",
+                      getSegmentsTable()
+                  ))
+                  .setFetchSize(connector.getStreamingFetchSize())
+                  .bind("dataSource", ds)
+                  .map(ByteArrayMapper.FIRST)
+                  .fold(
+                      new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural()),
+                      new Folder3<VersionedIntervalTimeline<String, DataSegment>, byte[]>()
+                      {
+                        @Override
+                        public VersionedIntervalTimeline<String, DataSegment> fold(
+                            VersionedIntervalTimeline<String, DataSegment> timeline,
+                            byte[] payload,
+                            FoldController foldController,
+                            StatementContext statementContext
+                        ) throws SQLException
+                        {
+                          try {
+                            DataSegment segment = jsonMapper.readValue(
+                                payload,
+                                DataSegment.class
+                            );
+
+                            timeline.add(
+                                segment.getInterval(),
+                                segment.getVersion(),
+                                segment.getShardSpec().createChunk(segment)
+                            );
+
+                            return timeline;
+                          }
+                          catch (Exception e) {
+                            throw new SQLException(e.toString());
+                          }
+                        }
+                      }
+                  );
             }
           }
       );
@@ -260,7 +296,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   public boolean enableSegment(final String segmentId)
   {
     try {
-      dbi.withHandle(
+      connector.getDBI().withHandle(
           new HandleCallback<Void>()
           {
             @Override
@@ -295,7 +331,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
         return false;
       }
 
-      dbi.withHandle(
+      connector.getDBI().withHandle(
          new HandleCallback<Void>()
          {
            @Override
@@ -326,7 +362,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   public boolean removeSegment(String ds, final String segmentID)
   {
     try {
-      dbi.withHandle(
+      connector.getDBI().withHandle(
          new HandleCallback<Void>()
          {
            @Override
@@ -386,7 +422,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   public Collection<String> getAllDatasourceNames()
   {
     synchronized (lock) {
-      return dbi.withHandle(
+      return connector.getDBI().withHandle(
          new HandleCallback<List<String>>()
          {
            @Override
@@ -433,33 +469,38 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     log.debug("Starting polling of segment table");
 
-    final List<DataSegment> segments = dbi.withHandle(
-        new HandleCallback<List<DataSegment>>()
+    // some databases such as PostgreSQL require auto-commit turned off
+    // to stream results back, enabling transactions disables auto-commit
+    //
+    // setting connection to read-only will allow some database such as MySQL
+    // to automatically use read-only transaction mode, further optimizing the query
+    final List<DataSegment> segments = inReadOnlyTransaction(
+        new TransactionCallback<List<DataSegment>>()
         {
           @Override
-          public List<DataSegment> withHandle(Handle handle) throws Exception
+          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) throws Exception
           {
-            return handle.createQuery(
-                String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable())
-            )
-                         .map(
+            return handle
+                .createQuery(String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
+                .setFetchSize(connector.getStreamingFetchSize())
+                .map(
                     new ResultSetMapper<DataSegment>()
                     {
                       @Override
                       public DataSegment map(int index, ResultSet r, StatementContext ctx)
                           throws SQLException
                       {
                         try {
                           return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
                         }
                         catch (IOException e) {
                           log.makeAlert(e, "Failed to read segment from db.");
                           return null;
                         }
                       }
                     }
                 )
                 .list();
           }
         }
     );
@@ -526,11 +567,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
       final int limit
   )
   {
-    return dbi.withHandle(
-        new HandleCallback<List<Interval>>()
+    return inReadOnlyTransaction(
+        new TransactionCallback<List<Interval>>()
         {
           @Override
-          public List<Interval> withHandle(Handle handle) throws IOException, SQLException
+          public List<Interval> inTransaction(Handle handle, TransactionStatus status) throws Exception
           {
             Iterator<Interval> iter = handle
                 .createQuery(
@@ -539,6 +580,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
                         getSegmentsTable()
                     )
                 )
+                .setFetchSize(connector.getStreamingFetchSize())
                 .bind("dataSource", dataSource)
                 .bind("start", interval.getStart().toString())
                 .bind("end", interval.getEnd().toString())

View File

@@ -77,6 +77,13 @@ public class DerbyConnector extends SQLMetadataConnector
   @Override
   public DBI getDBI() { return dbi; }
 
+  @Override
+  protected int getStreamingFetchSize()
+  {
+    // Derby only supports fetch size of 1
+    return 1;
+  }
+
   @Override
   public String getValidationQuery() { return "VALUES 1"; }
 }