diff --git a/extensions-core/mysql-metadata-storage/pom.xml b/extensions-core/mysql-metadata-storage/pom.xml
index 8b0395512ee..d3c5f269519 100644
--- a/extensions-core/mysql-metadata-storage/pom.xml
+++ b/extensions-core/mysql-metadata-storage/pom.xml
@@ -54,7 +54,7 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
- <version>5.1.34</version>
+ <version>5.1.38</version>
<groupId>org.jdbi</groupId>
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/io/druid/metadata/storage/mysql/MySQLConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/io/druid/metadata/storage/mysql/MySQLConnector.java
index 0f56f91eed4..67264ea4d13 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/io/druid/metadata/storage/mysql/MySQLConnector.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/io/druid/metadata/storage/mysql/MySQLConnector.java
@@ -75,6 +75,14 @@ public class MySQLConnector extends SQLMetadataConnector
return SERIAL_TYPE;
}
+ @Override
+ protected int getStreamingFetchSize()
+ {
+ // this is MySQL's way of indicating you want results streamed back
+ // see http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
+ return Integer.MIN_VALUE;
+ }
+
@Override
public boolean tableExists(Handle handle, String tableName)
{
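
For context on the Integer.MIN_VALUE sentinel (a sketch, not part of the patch; the URL, credentials, and table name are placeholders): Connector/J streams rows one at a time only when the statement is forward-only and read-only and the fetch size is exactly Integer.MIN_VALUE; any other value makes the driver buffer the entire result set in memory.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MySqlStreamingSketch
{
  public static void main(String[] args) throws Exception
  {
    // placeholder URL and credentials, not taken from the patch
    try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost/druid", "user", "pass");
         PreparedStatement stmt = conn.prepareStatement(
             "SELECT payload FROM druid_segments",
             ResultSet.TYPE_FORWARD_ONLY,
             ResultSet.CONCUR_READ_ONLY
         )) {
      stmt.setFetchSize(Integer.MIN_VALUE); // the streaming sentinel; any other value buffers all rows
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          byte[] payload = rs.getBytes("payload"); // rows arrive one at a time
        }
      }
    }
  }
}
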
diff --git a/extensions-core/postgresql-metadata-storage/pom.xml b/extensions-core/postgresql-metadata-storage/pom.xml
index ddb49d67140..d68fdbfa89a 100644
--- a/extensions-core/postgresql-metadata-storage/pom.xml
+++ b/extensions-core/postgresql-metadata-storage/pom.xml
@@ -54,7 +54,7 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
- <version>9.3-1102-jdbc41</version>
+ <version>9.4.1208.jre7</version>
<groupId>org.jdbi</groupId>
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java
index 07e7f62aa68..36d8db94f25 100644
--- a/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java
+++ b/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java
@@ -39,6 +39,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
private static final Logger log = new Logger(PostgreSQLConnector.class);
private static final String PAYLOAD_TYPE = "BYTEA";
private static final String SERIAL_TYPE = "BIGSERIAL";
+ public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;
private final DBI dbi;
@@ -71,6 +72,12 @@ public class PostgreSQLConnector extends SQLMetadataConnector
return SERIAL_TYPE;
}
+ @Override
+ protected int getStreamingFetchSize()
+ {
+ return DEFAULT_STREAMING_RESULT_SIZE;
+ }
+
protected boolean canUpsert(Handle handle) throws SQLException
{
if (canUpsert == null) {
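
For comparison (again a sketch with placeholder connection details, not patch code): the PostgreSQL driver only switches to cursor-based fetching when auto-commit is off and the fetch size is positive, which is why a modest batch size of 100 is used here rather than a sentinel value.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PostgresStreamingSketch
{
  public static void main(String[] args) throws Exception
  {
    // placeholder URL and credentials, not taken from the patch
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/druid", "user", "pass")) {
      conn.setAutoCommit(false); // pgJDBC only uses a cursor when auto-commit is off
      try (PreparedStatement stmt = conn.prepareStatement("SELECT payload FROM druid_segments")) {
        stmt.setFetchSize(100); // rows per round trip; 0 would fetch the whole result set
        try (ResultSet rs = stmt.executeQuery()) {
          while (rs.next()) {
            byte[] payload = rs.getBytes("payload");
          }
        }
        conn.commit();
      }
    }
  }
}
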
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java
index 5a0ade98ae9..a926f163483 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataConnector.java
@@ -98,6 +98,14 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
*/
protected abstract String getSerialType();
+ /**
+ * Returns the value that should be passed to statement.setFetchSize to ensure results
+ * are streamed back from the database instead of fetching the entire result set in memory.
+ *
+ * @return optimal fetch size to stream results back
+ */
+ protected abstract int getStreamingFetchSize();
+
public String getValidationQuery() { return "SELECT 1"; }
public abstract boolean tableExists(Handle handle, final String tableName);
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
index 6ee3d497774..0dbeadbd175 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
@@ -52,11 +52,15 @@ import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.exceptions.TransactionFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -82,7 +86,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final Supplier<MetadataSegmentManagerConfig> config;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
- private final IDBI dbi;
+ private final SQLMetadataConnector connector;
private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;
@@ -103,7 +107,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
this.dataSources = new AtomicReference<>(
new ConcurrentHashMap<String, DruidDataSource>()
);
- this.dbi = connector.getDBI();
+ this.connector = connector;
}
@LifecycleStart
@@ -157,53 +161,85 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
}
}
+ private <T> T inReadOnlyTransaction(final TransactionCallback<T> callback)
+ {
+ return connector.getDBI().withHandle(
+ new HandleCallback<T>()
+ {
+ @Override
+ public T withHandle(Handle handle) throws Exception
+ {
+ final Connection connection = handle.getConnection();
+ final boolean readOnly = connection.isReadOnly();
+ connection.setReadOnly(true);
+ try {
+ return handle.inTransaction(callback);
+ } finally {
+ try {
+ connection.setReadOnly(readOnly);
+ } catch (SQLException e) {
+ // at least try to log it so we don't swallow exceptions
+ log.error(e, "Unable to reset connection read-only state");
+ }
+ }
+ }
+ }
+ );
+ }
+
@Override
public boolean enableDatasource(final String ds)
{
try {
- VersionedIntervalTimeline<String, DataSegment> segmentTimeline = dbi.withHandle(
- new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>()
+ final IDBI dbi = connector.getDBI();
+ VersionedIntervalTimeline<String, DataSegment> segmentTimeline = inReadOnlyTransaction(
+ new TransactionCallback<VersionedIntervalTimeline<String, DataSegment>>()
{
@Override
- public VersionedIntervalTimeline<String, DataSegment> withHandle(Handle handle) throws Exception
+ public VersionedIntervalTimeline<String, DataSegment> inTransaction(
+ Handle handle, TransactionStatus status
+ ) throws Exception
{
- return handle.createQuery(
- String.format("SELECT payload FROM %s WHERE dataSource = :dataSource", getSegmentsTable())
- )
- .bind("dataSource", ds)
- .map(ByteArrayMapper.FIRST)
- .fold(
- new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural()),
- new Folder3<VersionedIntervalTimeline<String, DataSegment>, byte[]>()
- {
- @Override
- public VersionedIntervalTimeline<String, DataSegment> fold(
- VersionedIntervalTimeline<String, DataSegment> timeline,
- byte[] payload,
- FoldController foldController,
- StatementContext statementContext
- ) throws SQLException
- {
- try {
- DataSegment segment = jsonMapper.readValue(
- payload,
- DataSegment.class
- );
+ return handle
+ .createQuery(String.format(
+ "SELECT payload FROM %s WHERE dataSource = :dataSource",
+ getSegmentsTable()
+ ))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .bind("dataSource", ds)
+ .map(ByteArrayMapper.FIRST)
+ .fold(
+ new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural()),
+ new Folder3<VersionedIntervalTimeline<String, DataSegment>, byte[]>()
+ {
+ @Override
+ public VersionedIntervalTimeline<String, DataSegment> fold(
+ VersionedIntervalTimeline<String, DataSegment> timeline,
+ byte[] payload,
+ FoldController foldController,
+ StatementContext statementContext
+ ) throws SQLException
+ {
+ try {
+ DataSegment segment = jsonMapper.readValue(
+ payload,
+ DataSegment.class
+ );
- timeline.add(
- segment.getInterval(),
- segment.getVersion(),
- segment.getShardSpec().createChunk(segment)
- );
+ timeline.add(
+ segment.getInterval(),
+ segment.getVersion(),
+ segment.getShardSpec().createChunk(segment)
+ );
- return timeline;
- }
- catch (Exception e) {
- throw new SQLException(e.toString());
- }
- }
- }
- );
+ return timeline;
+ }
+ catch (Exception e) {
+ throw new SQLException(e.toString());
+ }
+ }
+ }
+ );
}
}
);
@@ -260,7 +296,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
public boolean enableSegment(final String segmentId)
{
try {
- dbi.withHandle(
+ connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
@@ -295,7 +331,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return false;
}
- dbi.withHandle(
+ connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
@@ -326,7 +362,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
public boolean removeSegment(String ds, final String segmentID)
{
try {
- dbi.withHandle(
+ connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
@@ -386,7 +422,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
public Collection getAllDatasourceNames()
{
synchronized (lock) {
- return dbi.withHandle(
+ return connector.getDBI().withHandle(
new HandleCallback<List<String>>()
{
@Override
@@ -433,33 +469,38 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
log.debug("Starting polling of segment table");
- final List<DataSegment> segments = dbi.withHandle(
- new HandleCallback<List<DataSegment>>()
+ // some databases such as PostgreSQL require auto-commit to be turned off
+ // to stream results back; running the query inside a transaction turns auto-commit off
+ //
+ // setting the connection to read-only allows some databases such as MySQL
+ // to automatically use read-only transaction mode, further optimizing the query
+ final List<DataSegment> segments = inReadOnlyTransaction(
+ new TransactionCallback<List<DataSegment>>()
{
@Override
- public List<DataSegment> withHandle(Handle handle) throws Exception
+ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) throws Exception
{
- return handle.createQuery(
- String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable())
- )
- .map(
- new ResultSetMapper<DataSegment>()
- {
- @Override
- public DataSegment map(int index, ResultSet r, StatementContext ctx)
- throws SQLException
- {
- try {
- return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
- }
- catch (IOException e) {
- log.makeAlert(e, "Failed to read segment from db.");
- return null;
- }
- }
- }
- )
- .list();
+ return handle
+ .createQuery(String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
+ .setFetchSize(connector.getStreamingFetchSize())
+ .map(
+ new ResultSetMapper<DataSegment>()
+ {
+ @Override
+ public DataSegment map(int index, ResultSet r, StatementContext ctx)
+ throws SQLException
+ {
+ try {
+ return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+ }
+ catch (IOException e) {
+ log.makeAlert(e, "Failed to read segment from db.");
+ return null;
+ }
+ }
+ }
+ )
+ .list();
}
}
);
@@ -526,11 +567,11 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
final int limit
)
{
- return dbi.withHandle(
- new HandleCallback<List<DataSegment>>()
+ return inReadOnlyTransaction(
+ new TransactionCallback<List<DataSegment>>()
{
@Override
- public List<DataSegment> withHandle(Handle handle) throws IOException, SQLException
+ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) throws Exception
{
Iterator<DataSegment> iter = handle
.createQuery(
@@ -539,6 +580,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
getSegmentsTable()
)
)
+ .setFetchSize(connector.getStreamingFetchSize())
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
diff --git a/server/src/main/java/io/druid/metadata/storage/derby/DerbyConnector.java b/server/src/main/java/io/druid/metadata/storage/derby/DerbyConnector.java
index 3fe1ebb2189..94e96b69ecf 100644
--- a/server/src/main/java/io/druid/metadata/storage/derby/DerbyConnector.java
+++ b/server/src/main/java/io/druid/metadata/storage/derby/DerbyConnector.java
@@ -77,6 +77,13 @@ public class DerbyConnector extends SQLMetadataConnector
@Override
public DBI getDBI() { return dbi; }
+ @Override
+ protected int getStreamingFetchSize()
+ {
+ // Derby only supports a fetch size of 1
+ return 1;
+ }
+
@Override
public String getValidationQuery() { return "VALUES 1"; }
}