diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java index 3d195d1d112..6ee3d497774 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java @@ -20,8 +20,10 @@ package io.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; @@ -431,7 +433,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager log.debug("Starting polling of segment table"); - List segments = dbi.withHandle( + final List segments = dbi.withHandle( new HandleCallback>() { @Override @@ -451,7 +453,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); } catch (IOException e) { - throw new SQLException(e); + log.makeAlert(e, "Failed to read segment from db."); + return null; } } } @@ -466,9 +469,13 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager return; } + final Collection segmentsFinal = Collections2.filter( + segments, Predicates.notNull() + ); + log.info("Polled and found %,d segments in the database", segments.size()); - for (final DataSegment segment : segments) { + for (final DataSegment segment : segmentsFinal) { String datasourceName = segment.getDataSource(); DruidDataSource dataSource = newDataSources.get(datasourceName); diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentPublisher.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentPublisher.java index 6d5ba2b471e..212c9e30da9 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentPublisher.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentPublisher.java @@ -22,6 +22,7 @@ package io.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; import com.metamx.common.logger.Logger; import io.druid.timeline.DataSegment; @@ -63,6 +64,32 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher @Override public void publishSegment(final DataSegment segment) throws IOException + { + publishSegment( + segment.getIdentifier(), + segment.getDataSource(), + new DateTime().toString(), + segment.getInterval().getStart().toString(), + segment.getInterval().getEnd().toString(), + (segment.getShardSpec() instanceof NoneShardSpec) ? false : true, + segment.getVersion(), + true, + jsonMapper.writeValueAsBytes(segment) + ); + } + + @VisibleForTesting + void publishSegment( + final String identifier, + final String dataSource, + final String createdDate, + final String start, + final String end, + final boolean partitioned, + final String version, + final boolean used, + final byte[] payload + ) { try { final DBI dbi = connector.getDBI(); @@ -75,14 +102,14 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher return handle.createQuery( String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable()) ) - .bind("id", segment.getIdentifier()) + .bind("id", identifier) .list(); } } ); if (!exists.isEmpty()) { - log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); + log.info("Found [%s] in DB, not updating DB", identifier); return; } @@ -93,15 +120,15 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher public Void withHandle(Handle handle) throws Exception { handle.createStatement(statement) - .bind("id", segment.getIdentifier()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", new DateTime().toString()) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) - .bind("version", segment.getVersion()) - .bind("used", true) - .bind("payload", jsonMapper.writeValueAsBytes(segment)) + .bind("id", identifier) + .bind("dataSource", dataSource) + .bind("created_date", createdDate) + .bind("start", start) + .bind("end", end) + .bind("partitioned", partitioned) + .bind("version", version) + .bind("used", used) + .bind("payload", payload) .execute(); return null; diff --git a/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java b/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java index 95217b50d3a..cb87b1fb030 100644 --- a/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java +++ b/server/src/test/java/io/druid/metadata/MetadataSegmentManagerTest.java @@ -24,7 +24,10 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.metamx.emitter.EmittingLogger; import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.joda.time.Interval; @@ -40,6 +43,7 @@ public class MetadataSegmentManagerTest public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); private SQLMetadataSegmentManager manager; + private SQLMetadataSegmentPublisher publisher; private final ObjectMapper jsonMapper = new DefaultObjectMapper(); private final DataSegment segment1 = new DataSegment( @@ -78,7 +82,6 @@ public class MetadataSegmentManagerTest public void setUp() throws Exception { TestDerbyConnector connector = derbyConnectorRule.getConnector(); - manager = new SQLMetadataSegmentManager( jsonMapper, Suppliers.ofInstance(new MetadataSegmentManagerConfig()), @@ -86,7 +89,7 @@ public class MetadataSegmentManagerTest connector ); - SQLMetadataSegmentPublisher publisher = new SQLMetadataSegmentPublisher( + publisher = new SQLMetadataSegmentPublisher( jsonMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), connector @@ -114,6 +117,33 @@ public class MetadataSegmentManagerTest manager.stop(); } + @Test + public void testPollWithCurroptedSegment() + { + //create a corrupted segment entry in segments table, which tests + //that overall loading of segments from database continues to work + //even in one of the entries are corrupted. + publisher.publishSegment( + "corrupt-segment-id", + "corrupt-datasource", + "corrupt-create-date", + "corrupt-start-date", + "corrupt-end-date", + true, + "corrupt-version", + true, + "corrupt-payload".getBytes() + ); + + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + manager.start(); + manager.poll(); + + Assert.assertEquals( + "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName() + ); + } + @Test public void testGetUnusedSegmentsForInterval() throws Exception {