In coordinator DB polling for available segments, ignore corrupted entries in the segments table so that the coordinator continues to load new segments even if there are a few corrupted segment entries.

This commit is contained in:
Himanshu Gupta 2016-03-04 16:49:01 -06:00
parent 4fa08a1329
commit 1288784bde
3 changed files with 80 additions and 16 deletions

View File

@ -20,8 +20,10 @@
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@ -431,7 +433,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
log.debug("Starting polling of segment table");
List<DataSegment> segments = dbi.withHandle(
final List<DataSegment> segments = dbi.withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override
@ -451,7 +453,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
}
catch (IOException e) {
throw new SQLException(e);
log.makeAlert(e, "Failed to read segment from db.");
return null;
}
}
}
@ -466,9 +469,13 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return;
}
final Collection<DataSegment> segmentsFinal = Collections2.filter(
segments, Predicates.notNull()
);
log.info("Polled and found %,d segments in the database", segments.size());
for (final DataSegment segment : segments) {
for (final DataSegment segment : segmentsFinal) {
String datasourceName = segment.getDataSource();
DruidDataSource dataSource = newDataSources.get(datasourceName);

View File

@ -22,6 +22,7 @@
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.timeline.DataSegment;
@ -63,6 +64,32 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
@Override
public void publishSegment(final DataSegment segment) throws IOException
{
// Delegate to the field-level overload (marked @VisibleForTesting) so that
// tests can insert arbitrary — including deliberately corrupted — rows
// without having to construct a valid DataSegment.
publishSegment(
segment.getIdentifier(),
segment.getDataSource(),
new DateTime().toString(),
segment.getInterval().getStart().toString(),
segment.getInterval().getEnd().toString(),
// a NoneShardSpec means the segment is unpartitioned; anything else is partitioned
!(segment.getShardSpec() instanceof NoneShardSpec),
segment.getVersion(),
true, // newly published segments are always marked "used"
jsonMapper.writeValueAsBytes(segment)
);
}
@VisibleForTesting
void publishSegment(
final String identifier,
final String dataSource,
final String createdDate,
final String start,
final String end,
final boolean partitioned,
final String version,
final boolean used,
final byte[] payload
)
{
try {
final DBI dbi = connector.getDBI();
@ -75,14 +102,14 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
return handle.createQuery(
String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
)
.bind("id", segment.getIdentifier())
.bind("id", identifier)
.list();
}
}
);
if (!exists.isEmpty()) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
log.info("Found [%s] in DB, not updating DB", identifier);
return;
}
@ -93,15 +120,15 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(statement)
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", new DateTime().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("id", identifier)
.bind("dataSource", dataSource)
.bind("created_date", createdDate)
.bind("start", start)
.bind("end", end)
.bind("partitioned", partitioned)
.bind("version", version)
.bind("used", used)
.bind("payload", payload)
.execute();
return null;

View File

@ -24,7 +24,10 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.emitter.EmittingLogger;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
@ -40,6 +43,7 @@ public class MetadataSegmentManagerTest
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private SQLMetadataSegmentManager manager;
private SQLMetadataSegmentPublisher publisher;
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final DataSegment segment1 = new DataSegment(
@ -78,7 +82,6 @@ public class MetadataSegmentManagerTest
public void setUp() throws Exception
{
TestDerbyConnector connector = derbyConnectorRule.getConnector();
manager = new SQLMetadataSegmentManager(
jsonMapper,
Suppliers.ofInstance(new MetadataSegmentManagerConfig()),
@ -86,7 +89,7 @@ public class MetadataSegmentManagerTest
connector
);
SQLMetadataSegmentPublisher publisher = new SQLMetadataSegmentPublisher(
publisher = new SQLMetadataSegmentPublisher(
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
connector
@ -114,6 +117,33 @@ public class MetadataSegmentManagerTest
manager.stop();
}
@Test
public void testPollWithCurroptedSegment()
{
// Insert a corrupted segment entry (unparseable payload) into the segments
// table. This verifies that polling segments from the database continues to
// work even if one of the entries is corrupted: the bad row is skipped
// instead of aborting the whole poll.
publisher.publishSegment(
"corrupt-segment-id",
"corrupt-datasource",
"corrupt-create-date",
"corrupt-start-date",
"corrupt-end-date",
true,
"corrupt-version",
true,
"corrupt-payload".getBytes()
);
// The poll path alerts on the corrupt row; register a no-op emitter so that
// alert emission has a sink during the test.
EmittingLogger.registerEmitter(new NoopServiceEmitter());
manager.start();
manager.poll();
// Only the valid datasource should survive the poll; the corrupt row is dropped.
Assert.assertEquals(
"wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
);
}
@Test
public void testGetUnusedSegmentsForInterval() throws Exception
{