reduce memory usage when pulling segments

Xavier Léauté 2014-10-29 12:43:21 -07:00
parent 68b9436199
commit 7be252599e
2 changed files with 26 additions and 12 deletions

SQLMetadataRuleManager.java  View File

@@ -229,9 +229,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
             return Pair.of(
                 r.getString("dataSource"),
                 jsonMapper.<List<Rule>>readValue(
-                    r.getBytes("payload"), new TypeReference<List<Rule>>()
-                    {
-                    }
+                    r.getBytes("payload"), new TypeReference<List<Rule>>(){}
                 )
             );
           }
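The change in this hunk is purely formatting: the anonymous TypeReference that tells Jackson to deserialize the payload as a List<Rule> now sits on a single line. For context, here is a minimal, self-contained sketch of that deserialization idiom; the nested Rule class and the sample JSON are stand-ins for illustration, not Druid's actual rule types.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RulePayloadExample
{
  // Stand-in for Druid's Rule interface, just enough for the sketch to compile.
  public static class Rule
  {
    public String type;
  }

  public static void main(String[] args) throws IOException
  {
    ObjectMapper jsonMapper = new ObjectMapper();
    byte[] payload = "[{\"type\":\"loadForever\"}]".getBytes(StandardCharsets.UTF_8);

    // The anonymous TypeReference preserves the generic List<Rule> type that
    // erasure would otherwise hide, so Jackson knows which element type to build.
    List<Rule> rules = jsonMapper.readValue(payload, new TypeReference<List<Rule>>(){});
    System.out.println(rules.size() + " rule(s), first type: " + rules.get(0).type);
  }
}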

SQLMetadataSegmentManager.java  View File

@@ -47,8 +47,11 @@ import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.IDBI;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.tweak.HandleCallback;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
 
+import java.io.IOException;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -412,31 +415,44 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<String, DruidDataSource>();
 
-    List<byte[]> segmentRows = dbi.withHandle(
-        new HandleCallback<List<byte[]>>()
+    List<DataSegment> segments = dbi.withHandle(
+        new HandleCallback<List<DataSegment>>()
         {
           @Override
-          public List<byte[]> withHandle(Handle handle) throws Exception
+          public List<DataSegment> withHandle(Handle handle) throws Exception
           {
             return handle.createQuery(
                 String.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable())
             )
-                         .map(ByteArrayMapper.FIRST)
+                         .map(
+                             new ResultSetMapper<DataSegment>()
+                             {
+                               @Override
+                               public DataSegment map(int index, ResultSet r, StatementContext ctx)
+                                   throws SQLException
+                               {
+                                 try {
+                                   return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                                 }
+                                 catch (IOException e) {
+                                   throw new SQLException(e);
+                                 }
+                               }
+                             }
+                         )
                          .list();
           }
         }
     );
 
-    if (segmentRows == null || segmentRows.isEmpty()) {
+    if (segments == null || segments.isEmpty()) {
       log.warn("No segments found in the database!");
       return;
     }
 
-    log.info("Polled and found %,d segments in the database", segmentRows.size());
+    log.info("Polled and found %,d segments in the database", segments.size());
 
-    for (final byte[] payload : segmentRows) {
-      DataSegment segment = jsonMapper.readValue(payload, DataSegment.class);
+    for (final DataSegment segment : segments) {
       String datasourceName = segment.getDataSource();
       DruidDataSource dataSource = newDataSources.get(datasourceName);
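The memory saving comes from how the segment payloads are materialized. Previously every row was collected into a List<byte[]> via ByteArrayMapper.FIRST and deserialized only afterwards, so all raw payload bytes and the resulting DataSegment objects were held at the same time; now a JDBI ResultSetMapper turns each row into a DataSegment while the result set is iterated, so each payload byte[] can be discarded as soon as it has been parsed. Below is a minimal, self-contained sketch of that row-mapping pattern under stated assumptions: the druid_segments table name, the Payload stand-in class, and the locally constructed ObjectMapper are illustrative placeholders, not the actual Druid wiring.

import com.fasterxml.jackson.databind.ObjectMapper;

import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public class StreamingPayloadReader
{
  // Stand-in for the JSON value stored in the payload column (DataSegment in the code above).
  public static class Payload
  {
    public String identifier;
  }

  private final ObjectMapper jsonMapper = new ObjectMapper();

  public List<Payload> readUsedPayloads(IDBI dbi)
  {
    return dbi.withHandle(
        new HandleCallback<List<Payload>>()
        {
          @Override
          public List<Payload> withHandle(Handle handle) throws Exception
          {
            return handle.createQuery("SELECT payload FROM druid_segments WHERE used = true")
                         .map(
                             new ResultSetMapper<Payload>()
                             {
                               @Override
                               public Payload map(int index, ResultSet r, StatementContext ctx) throws SQLException
                               {
                                 try {
                                   // Deserialize per row: the raw byte[] becomes garbage as soon as
                                   // this call returns, instead of piling up in a List<byte[]>.
                                   return jsonMapper.readValue(r.getBytes("payload"), Payload.class);
                                 }
                                 catch (IOException e) {
                                   throw new SQLException(e);
                                 }
                               }
                             }
                         )
                         .list();
          }
        }
    );
  }
}

The IOException-to-SQLException wrapping mirrors the commit: ResultSetMapper.map only declares SQLException, so Jackson's checked IOException has to be rethrown as one.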