From 7be252599eff7696150d9fc7d5a09d7fe55bd302 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 29 Oct 2014 12:43:21 -0700 Subject: [PATCH] reduce memory usage when pulling segments --- .../io/druid/db/SQLMetadataRuleManager.java | 4 +-- .../druid/db/SQLMetadataSegmentManager.java | 34 ++++++++++++++----- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java index 68df33225c1..3a322f700ce 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/db/SQLMetadataRuleManager.java @@ -229,9 +229,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager return Pair.of( r.getString("dataSource"), jsonMapper.>readValue( - r.getBytes("payload"), new TypeReference>() - { - } + r.getBytes("payload"), new TypeReference>(){} ) ); } diff --git a/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java index f89b4d8e186..0c823c5608c 100644 --- a/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/db/SQLMetadataSegmentManager.java @@ -47,8 +47,11 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.IDBI; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.tweak.HandleCallback; +import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.util.ByteArrayMapper; +import java.io.IOException; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; @@ -412,31 +415,44 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager ConcurrentHashMap newDataSources = new ConcurrentHashMap(); - List segmentRows = dbi.withHandle( - new HandleCallback>() + List segments = dbi.withHandle( + new HandleCallback>() { @Override - public List withHandle(Handle handle) throws Exception + public List withHandle(Handle handle) throws Exception { return handle.createQuery( String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()) ) - .map(ByteArrayMapper.FIRST) + .map( + new ResultSetMapper() + { + @Override + public DataSegment map(int index, ResultSet r, StatementContext ctx) + throws SQLException + { + try { + return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); + } + catch (IOException e) { + throw new SQLException(e); + } + } + } + ) .list(); } } ); - if (segmentRows == null || segmentRows.isEmpty()) { + if (segments == null || segments.isEmpty()) { log.warn("No segments found in the database!"); return; } - log.info("Polled and found %,d segments in the database", segmentRows.size()); - - for (final byte[] payload : segmentRows) { - DataSegment segment = jsonMapper.readValue(payload, DataSegment.class); + log.info("Polled and found %,d segments in the database", segments.size()); + for (final DataSegment segment : segments) { String datasourceName = segment.getDataSource(); DruidDataSource dataSource = newDataSources.get(datasourceName);