Merge pull request #2599 from himanshug/datasource_isolation

make coordinator db polling for list of segments more robust
This commit is contained in:
Fangjin Yang 2016-03-08 12:43:49 -08:00
commit 9c2420a1bc
3 changed files with 80 additions and 16 deletions

View File

@ -20,8 +20,10 @@
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -431,7 +433,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
log.debug("Starting polling of segment table");
List<DataSegment> segments = dbi.withHandle(
final List<DataSegment> segments = dbi.withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override
@ -451,7 +453,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
}
catch (IOException e) {
throw new SQLException(e);
log.makeAlert(e, "Failed to read segment from db.");
return null;
}
}
}
@ -466,9 +469,13 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return;
}
final Collection<DataSegment> segmentsFinal = Collections2.filter(
segments, Predicates.notNull()
);
log.info("Polled and found %,d segments in the database", segments.size());
for (final DataSegment segment : segments) {
for (final DataSegment segment : segmentsFinal) {
String datasourceName = segment.getDataSource();
DruidDataSource dataSource = newDataSources.get(datasourceName);

View File

@ -22,6 +22,7 @@
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
@ -63,6 +64,32 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
@Override
public void publishSegment(final DataSegment segment) throws IOException
{
publishSegment(
segment.getIdentifier(),
segment.getDataSource(),
new DateTime().toString(),
segment.getInterval().getStart().toString(),
segment.getInterval().getEnd().toString(),
(segment.getShardSpec() instanceof NoneShardSpec) ? false : true,
segment.getVersion(),
true,
jsonMapper.writeValueAsBytes(segment)
);
}
@VisibleForTesting
void publishSegment(
final String identifier,
final String dataSource,
final String createdDate,
final String start,
final String end,
final boolean partitioned,
final String version,
final boolean used,
final byte[] payload
)
{
try {
final DBI dbi = connector.getDBI();
@ -75,14 +102,14 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
return handle.createQuery(
String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
)
.bind("id", segment.getIdentifier())
.bind("id", identifier)
.list();
}
}
);
if (!exists.isEmpty()) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
log.info("Found [%s] in DB, not updating DB", identifier);
return;
}
@ -93,15 +120,15 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(statement)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("id", identifier)
.bind("dataSource", dataSource)
.bind("created_date", createdDate)
.bind("start", start)
.bind("end", end)
.bind("partitioned", partitioned)
.bind("version", version)
.bind("used", used)
.bind("payload", payload)
.execute();
return null;

View File

@ -24,7 +24,10 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.emitter.EmittingLogger;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
@ -40,6 +43,7 @@ public class MetadataSegmentManagerTest
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private SQLMetadataSegmentManager manager;
private SQLMetadataSegmentPublisher publisher;
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final DataSegment segment1 = new DataSegment(
@ -78,7 +82,6 @@ public class MetadataSegmentManagerTest
public void setUp() throws Exception
{
TestDerbyConnector connector = derbyConnectorRule.getConnector();
manager = new SQLMetadataSegmentManager(
jsonMapper,
Suppliers.ofInstance(new MetadataSegmentManagerConfig()),
@ -86,7 +89,7 @@ public class MetadataSegmentManagerTest
connector
);
SQLMetadataSegmentPublisher publisher = new SQLMetadataSegmentPublisher(
publisher = new SQLMetadataSegmentPublisher(
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
connector
@ -114,6 +117,33 @@ public class MetadataSegmentManagerTest
manager.stop();
}
@Test
public void testPollWithCurroptedSegment()
{
//create a corrupted segment entry in segments table, which tests
//that overall loading of segments from database continues to work
//even in one of the entries are corrupted.
publisher.publishSegment(
"corrupt-segment-id",
"corrupt-datasource",
"corrupt-create-date",
"corrupt-start-date",
"corrupt-end-date",
true,
"corrupt-version",
true,
"corrupt-payload".getBytes()
);
EmittingLogger.registerEmitter(new NoopServiceEmitter());
manager.start();
manager.poll();
Assert.assertEquals(
"wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
);
}
@Test
public void testGetUnusedSegmentsForInterval() throws Exception
{