make it easier to test

This commit is contained in:
Xavier Léauté 2014-10-31 14:49:07 -07:00
parent 0bd9ac1154
commit 1872b8f979
3 changed files with 14 additions and 9 deletions

View File

@ -64,6 +64,7 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.TaskQueueConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.SQLMetadataConnector;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
@ -86,6 +87,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import java.io.File;
import java.io.IOException;

View File

@ -58,7 +58,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
private final ObjectMapper jsonMapper;
private final MetadataStorageTablesConfig dbTables;
private final IDBI dbi;
private final SQLMetadataConnector connector;
@Inject
public IndexerSQLMetadataStorageCoordinator(
@ -69,13 +69,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
{
this.jsonMapper = jsonMapper;
this.dbTables = dbTables;
this.dbi = connector.getDBI();
this.connector = connector;
}
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException
{
final VersionedIntervalTimeline<String, DataSegment> timeline = dbi.withHandle(
final VersionedIntervalTimeline<String, DataSegment> timeline = connector.getDBI().withHandle(
new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>()
{
@Override
@ -140,7 +140,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
*/
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException
{
return dbi.inTransaction(
return connector.getDBI().inTransaction(
new TransactionCallback<Set<DataSegment>>()
{
@Override
@ -227,7 +227,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(
connector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
@ -245,7 +245,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
public void deleteSegments(final Set<DataSegment> segments) throws IOException
{
dbi.inTransaction(
connector.getDBI().inTransaction(
new TransactionCallback<Void>()
{
@Override
@ -288,7 +288,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = dbi.withHandle(
List<DataSegment> matchingSegments = connector.getDBI().withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override

View File

@ -28,6 +28,7 @@ import io.druid.segment.realtime.SegmentPublisher;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
@ -42,7 +43,7 @@ public class SQLMetadataSegmentPublisher implements SegmentPublisher
private final ObjectMapper jsonMapper;
private final MetadataStorageTablesConfig config;
private final IDBI dbi;
private final SQLMetadataConnector connector;
private final String statement;
@Inject
@ -54,7 +55,7 @@ public class SQLMetadataSegmentPublisher implements SegmentPublisher
{
this.jsonMapper = jsonMapper;
this.config = config;
this.dbi = connector.getDBI();
this.connector = connector;
this.statement = String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
@ -66,6 +67,7 @@ public class SQLMetadataSegmentPublisher implements SegmentPublisher
public void publishSegment(final DataSegment segment) throws IOException
{
try {
final DBI dbi = connector.getDBI();
List<Map<String, Object>> exists = dbi.withHandle(
new HandleCallback<List<Map<String, Object>>>()
{