diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java index 4661b3ab73b..e29d843c98b 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/NoneShardSpec.java @@ -29,6 +29,16 @@ import java.util.List; import java.util.Map; /** + * {@link ShardSpec} with no partitioning in a time chunk, i.e. a single segment + * per time chunk. This shard spec has been deprecated and is not generated by + * the Druid code anymore. The class has been retained only for backward + * compatibility reasons. + *
+ * <p>
+ * For more information, refer to + * PR #6883. + * + * @deprecated Since Druid 0.15.0. Segments generated by Druid 0.15.0 onwards + * do not use this shard spec. */ @Deprecated public class NoneShardSpec implements ShardSpec diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java index b69f15edb6b..48a92ecba4e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentPublisher.java @@ -20,7 +20,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; @@ -28,8 +27,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.IOException; import java.util.List; @@ -79,8 +76,7 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher ); } - @VisibleForTesting - void publishSegment( + private void publishSegment( final String segmentId, final String dataSource, final String createdDate, @@ -96,31 +92,18 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher try { final DBI dbi = connector.getDBI(); List> exists = dbi.withHandle( - new HandleCallback>>() - { - @Override - public List> withHandle(Handle handle) - { - return handle.createQuery( - StringUtils.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable()) - ) - .bind("id", segmentId) - .list(); - } - } + handle -> handle.createQuery( + StringUtils.format("SELECT id FROM %s WHERE 
id=:id", config.getSegmentsTable()) + ).bind("id", segmentId).list() ); if (!exists.isEmpty()) { - log.info("Found [%s] in DB, not updating DB", segmentId); + log.info("Skipping publish of segment[%s] as it already exists in the metadata store.", segmentId); return; } dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { + handle -> handle.createStatement(statement) .bind("id", segmentId) .bind("dataSource", dataSource) @@ -132,11 +115,7 @@ public class SQLMetadataSegmentPublisher implements MetadataSegmentPublisher .bind("used", used) .bind("payload", payload) .bind("used_status_last_updated", usedFlagLastUpdated) - .execute(); - - return null; - } - } + .execute() ); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 1bf8ec534a9..0ddf4a2d2cc 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -64,7 +64,6 @@ import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; -import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; import javax.annotation.Nullable; @@ -72,6 +71,7 @@ import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -335,7 +335,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager { ExecutorService executorService = Executors.newSingleThreadExecutor(); usedFlagLastUpdatedPopulationFuture = executorService.submit( - () -> populateUsedFlagLastUpdated() + this::populateUsedFlagLastUpdated ); } @@ 
-347,70 +347,68 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager @VisibleForTesting void populateUsedFlagLastUpdated() { - String segmentsTable = getSegmentsTable(); + final String segmentsTable = getSegmentsTable(); log.info( - "Populating used_status_last_updated with non-NULL values for unused segments in [%s]", + "Populating column 'used_status_last_updated' with non-NULL values for unused segments in table[%s].", segmentsTable ); - int limit = 100; + final int batchSize = 100; int totalUpdatedEntries = 0; + // Update the rows in batches of size 100 while (true) { - List segmentsToUpdate = new ArrayList<>(100); + final List segmentsToUpdate = new ArrayList<>(batchSize); + int numUpdatedRows; try { connector.retryWithHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - segmentsToUpdate.addAll(handle.createQuery( - StringUtils.format( - "SELECT id FROM %1$s WHERE used_status_last_updated IS NULL and used = :used %2$s", - segmentsTable, - connector.limitClause(limit) - ) - ).bind("used", false).mapTo(String.class).list()); - return null; - } + handle -> { + segmentsToUpdate.addAll(handle.createQuery( + StringUtils.format( + "SELECT id FROM %1$s WHERE used_status_last_updated IS NULL and used = :used %2$s", + segmentsTable, + connector.limitClause(batchSize) + ) + ).bind("used", false).mapTo(String.class).list()); + return null; } ); if (segmentsToUpdate.isEmpty()) { - // We have no segments to process break; } - connector.retryWithHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - Batch updateBatch = handle.createBatch(); - String sql = "UPDATE %1$s SET used_status_last_updated = '%2$s' WHERE id = '%3$s'"; - String now = DateTimes.nowUtc().toString(); - for (String id : segmentsToUpdate) { - updateBatch.add(StringUtils.format(sql, segmentsTable, now, id)); - } - updateBatch.execute(); - return null; + numUpdatedRows = connector.retryWithHandle( + handle 
-> { + final Batch updateBatch = handle.createBatch(); + final String sql = "UPDATE %1$s SET used_status_last_updated = '%2$s' WHERE id = '%3$s'"; + String now = DateTimes.nowUtc().toString(); + for (String id : segmentsToUpdate) { + updateBatch.add(StringUtils.format(sql, segmentsTable, now, id)); } + int[] results = updateBatch.execute(); + return Arrays.stream(results).sum(); } ); + totalUpdatedEntries += numUpdatedRows; } catch (Exception e) { - log.warn(e, "Population of used_status_last_updated in [%s] has failed. There may be unused segments with" - + " NULL values for used_status_last_updated that won't be killed!", segmentsTable); + log.warn(e, "Populating column 'used_status_last_updated' in table[%s] has failed. There may be unused segments with" + + " NULL values for 'used_status_last_updated' that won't be killed!", segmentsTable); return; } - totalUpdatedEntries += segmentsToUpdate.size(); - log.info("Updated a batch of %d rows in [%s] with a valid used_status_last_updated date", - segmentsToUpdate.size(), - segmentsTable + log.debug( + "Updated a batch of [%d] rows in table[%s] with a valid used_status_last_updated date", + segmentsToUpdate.size(), segmentsTable ); + + // Do not wait if there are no more segments to update + if (segmentsToUpdate.size() == numUpdatedRows && numUpdatedRows < batchSize) { + break; + } + + // Wait for some time before processing the next batch try { Thread.sleep(10000); } @@ -420,9 +418,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager } } log.info( - "Finished updating [%s] with a valid used_status_last_updated date. 
%d rows updated", - segmentsTable, - totalUpdatedEntries + "Populated column 'used_status_last_updated' in table[%s] in [%d] rows.", + segmentsTable, totalUpdatedEntries ); } diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java deleted file mode 100644 index 440aff3c084..00000000000 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerEmptyTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.metadata; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import org.apache.druid.client.ImmutableDruidDataSource; -import org.apache.druid.segment.TestHelper; -import org.joda.time.Period; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import java.util.stream.Collectors; - - -/** - * Like {@link SQLMetadataRuleManagerTest} except with no segments to make sure it behaves when it's empty - */ -public class SqlSegmentsMetadataManagerEmptyTest -{ - - @Rule - public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); - - private SqlSegmentsMetadataManager sqlSegmentsMetadataManager; - private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - - @Before - public void setUp() - { - TestDerbyConnector connector = derbyConnectorRule.getConnector(); - SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(); - config.setPollDuration(Period.seconds(1)); - sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( - jsonMapper, - Suppliers.ofInstance(config), - derbyConnectorRule.metadataTablesConfigSupplier(), - connector - ); - sqlSegmentsMetadataManager.start(); - - connector.createSegmentTable(); - } - - @After - public void teardown() - { - if (sqlSegmentsMetadataManager.isPollingDatabasePeriodically()) { - sqlSegmentsMetadataManager.stopPollingDatabasePeriodically(); - } - sqlSegmentsMetadataManager.stop(); - } - - @Test - public void testPollEmpty() - { - sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.poll(); - Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - Assert.assertEquals( - ImmutableSet.of(), - 
sqlSegmentsMetadataManager.retrieveAllDataSourceNames() - ); - Assert.assertEquals( - ImmutableList.of(), - sqlSegmentsMetadataManager - .getImmutableDataSourcesWithAllUsedSegments() - .stream() - .map(ImmutableDruidDataSource::getName) - .collect(Collectors.toList()) - ); - Assert.assertEquals( - null, - sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia") - ); - Assert.assertEquals( - ImmutableSet.of(), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) - ); - } - - @Test - public void testStopAndStart() - { - // Simulate successive losing and getting the coordinator leadership - sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.stopPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.stopPollingDatabasePeriodically(); - } -} diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index ab024d568db..85e5021f6c6 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.TestHelper; +import org.apache.druid.server.coordinator.CreateDataSegments; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -46,101 +47,79 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.skife.jdbi.v2.Handle; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.IOException; -import java.util.List; import 
java.util.Locale; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; public class SqlSegmentsMetadataManagerTest { + private static class DS + { + static final String WIKI = "wikipedia"; + static final String KOALA = "koala"; + } + private static DataSegment createSegment( String dataSource, String interval, - String version, - String bucketKey, - int binaryVersion + String version ) { return new DataSegment( dataSource, Intervals.of(interval), version, - ImmutableMap.of( - "type", "s3_zip", - "bucket", "test", - "key", dataSource + "/" + bucketKey - ), - ImmutableList.of("dim1", "dim2", "dim3"), - ImmutableList.of("count", "value"), + ImmutableMap.of(), + ImmutableList.of(), + ImmutableList.of(), NoneShardSpec.instance(), - binaryVersion, + 9, 1234L ); } @Rule - public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule + = new TestDerbyConnector.DerbyConnectorRule(); private SqlSegmentsMetadataManager sqlSegmentsMetadataManager; private SQLMetadataSegmentPublisher publisher; - private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); + private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper(); - private final DataSegment segment1 = createSegment( - "wikipedia", - "2012-03-15T00:00:00.000/2012-03-16T00:00:00.000", - "2012-03-16T00:36:30.848Z", - "index/y=2012/m=03/d=15/2012-03-16T00:36:30.848Z/0/index.zip", - 0 - ); + private final DataSegment wikiSegment1 = + CreateDataSegments.ofDatasource(DS.WIKI).startingAt("2012-03-15").eachOfSizeInMb(500).get(0); + private final DataSegment wikiSegment2 = + CreateDataSegments.ofDatasource(DS.WIKI).startingAt("2012-01-05").eachOfSizeInMb(500).get(0); - private final DataSegment segment2 = createSegment( - "wikipedia", - "2012-01-05T00:00:00.000/2012-01-06T00:00:00.000", - "2012-01-06T22:19:12.565Z", - 
"wikipedia/index/y=2012/m=01/d=05/2012-01-06T22:19:12.565Z/0/index.zip", - 0 - ); - - private void publish(DataSegment segment, boolean used) throws IOException + private void publishUnusedSegments(DataSegment... segments) throws IOException { - publish(segment, used, DateTimes.nowUtc()); + for (DataSegment segment : segments) { + publisher.publishSegment(segment); + sqlSegmentsMetadataManager.markSegmentAsUnused(segment.getId()); + } } - private void publish(DataSegment segment, boolean used, DateTime usedFlagLastUpdated) throws IOException + private void publishWikiSegments() { - boolean partitioned = !(segment.getShardSpec() instanceof NoneShardSpec); - - String usedFlagLastUpdatedStr = null; - if (null != usedFlagLastUpdated) { - usedFlagLastUpdatedStr = usedFlagLastUpdated.toString(); + try { + publisher.publishSegment(wikiSegment1); + publisher.publishSegment(wikiSegment2); + } + catch (Exception e) { + throw new RuntimeException(e); } - publisher.publishSegment( - segment.getId().toString(), - segment.getDataSource(), - DateTimes.nowUtc().toString(), - segment.getInterval().getStart().toString(), - segment.getInterval().getEnd().toString(), - partitioned, - segment.getVersion(), - used, - jsonMapper.writeValueAsBytes(segment), - usedFlagLastUpdatedStr - ); } @Before - public void setUp() throws Exception + public void setUp() { - TestDerbyConnector connector = derbyConnectorRule.getConnector(); + final TestDerbyConnector connector = derbyConnectorRule.getConnector(); SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(); - config.setPollDuration(Period.seconds(3)); + config.setPollDuration(Period.millis(1)); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( - jsonMapper, + JSON_MAPPER, Suppliers.ofInstance(config), derbyConnectorRule.metadataTablesConfigSupplier(), connector @@ -148,15 +127,12 @@ public class SqlSegmentsMetadataManagerTest sqlSegmentsMetadataManager.start(); publisher = new SQLMetadataSegmentPublisher( - 
jsonMapper, + JSON_MAPPER, derbyConnectorRule.metadataTablesConfigSupplier().get(), connector ); connector.createSegmentTable(); - - publisher.publishSegment(segment1); - publisher.publishSegment(segment2); } @After @@ -168,9 +144,32 @@ public class SqlSegmentsMetadataManagerTest sqlSegmentsMetadataManager.stop(); } + @Test + public void testPollEmpty() + { + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue( + sqlSegmentsMetadataManager.retrieveAllDataSourceNames().isEmpty() + ); + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager + .getImmutableDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName).count() + ); + Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.WIKI)); + Assert.assertTrue( + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()).isEmpty() + ); + } + @Test public void testPollPeriodically() { + publishWikiSegments(); DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertNull(dataSourcesSnapshot); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); @@ -180,22 +179,22 @@ public class SqlSegmentsMetadataManagerTest Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( - ImmutableSet.of("wikipedia"), + ImmutableSet.of(DS.WIKI), sqlSegmentsMetadataManager.retrieveAllDataSourceNames() ); Assert.assertEquals( - ImmutableList.of("wikipedia"), + ImmutableList.of(DS.WIKI), dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() .stream() .map(ImmutableDruidDataSource::getName) .collect(Collectors.toList()) ); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), - 
ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource(DS.WIKI).getSegments()) ); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) ); } @@ -203,6 +202,7 @@ public class SqlSegmentsMetadataManagerTest @Test public void testPollOnDemand() { + publishWikiSegments(); DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertNull(dataSourcesSnapshot); // This should return false and not wait/poll anything as we did not schedule periodic poll @@ -214,22 +214,22 @@ public class SqlSegmentsMetadataManagerTest Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll); dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( - ImmutableSet.of("wikipedia"), + ImmutableSet.of(DS.WIKI), sqlSegmentsMetadataManager.retrieveAllDataSourceNames() ); Assert.assertEquals( - ImmutableList.of("wikipedia"), + ImmutableList.of(DS.WIKI), dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() .stream() .map(ImmutableDruidDataSource::getName) .collect(Collectors.toList()) ); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), - ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource(DS.WIKI).getSegments()) ); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) ); } @@ -237,6 +237,7 @@ public class SqlSegmentsMetadataManagerTest @Test(timeout = 60_000) public void testPollPeriodicallyAndOnDemandInterleave() 
throws Exception { + publishWikiSegments(); DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertNull(dataSourcesSnapshot); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); @@ -246,15 +247,13 @@ public class SqlSegmentsMetadataManagerTest Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( - ImmutableList.of("wikipedia"), + ImmutableList.of(DS.WIKI), dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() .stream() .map(ImmutableDruidDataSource::getName) .collect(Collectors.toList()) ); - final String newDataSource2 = "wikipedia2"; - final DataSegment newSegment2 = createNewSegment1(newDataSource2); - publisher.publishSegment(newSegment2); + publisher.publishSegment(createNewSegment1(DS.KOALA)); // This call will force on demand poll sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll(); @@ -263,7 +262,7 @@ public class SqlSegmentsMetadataManagerTest // New datasource should now be in the snapshot since we just force on demand poll. dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( - ImmutableList.of("wikipedia2", "wikipedia"), + ImmutableList.of(DS.KOALA, DS.WIKI), dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() .stream() .map(ImmutableDruidDataSource::getName) @@ -271,8 +270,7 @@ public class SqlSegmentsMetadataManagerTest ); final String newDataSource3 = "wikipedia3"; - final DataSegment newSegment3 = createNewSegment1(newDataSource3); - publisher.publishSegment(newSegment3); + publisher.publishSegment(createNewSegment1(newDataSource3)); // This time wait for periodic poll (not doing on demand poll so we have to wait a bit...) 
while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) { @@ -282,7 +280,7 @@ public class SqlSegmentsMetadataManagerTest Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( - ImmutableSet.of("wikipedia2", "wikipedia3", "wikipedia"), + ImmutableSet.of(DS.KOALA, "wikipedia3", DS.WIKI), dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() .stream() .map(ImmutableDruidDataSource::getName) @@ -293,29 +291,32 @@ public class SqlSegmentsMetadataManagerTest @Test public void testPrepareImmutableDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws IOException { - DataSegment newSegment = pollThenStopThenStartIntro(); + publishWikiSegments(); + DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment(); Assert.assertEquals( - ImmutableSet.of(newSegment), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia2").getSegments()) + ImmutableSet.of(koalaSegment), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA).getSegments()) ); } @Test public void testGetDataSourceWithUsedSegmentsAwaitsPollOnRestart() throws IOException { - DataSegment newSegment = pollThenStopThenStartIntro(); + publishWikiSegments(); + DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment(); Assert.assertEquals( - ImmutableSet.of(newSegment), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia2").getSegments()) + ImmutableSet.of(koalaSegment), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA).getSegments()) ); } @Test public void testPrepareImmutableDataSourcesWithAllUsedSegmentsAwaitsPollOnRestart() throws IOException { - DataSegment newSegment = pollThenStopThenStartIntro(); + 
publishWikiSegments(); + DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2, newSegment), + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment), ImmutableSet.copyOf( sqlSegmentsMetadataManager .getImmutableDataSourcesWithAllUsedSegments() @@ -329,54 +330,49 @@ public class SqlSegmentsMetadataManagerTest @Test public void testIterateAllUsedSegmentsAwaitsPollOnRestart() throws IOException { - DataSegment newSegment = pollThenStopThenStartIntro(); + publishWikiSegments(); + DataSegment koalaSegment = pollThenStopThenPublishKoalaSegment(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2, newSegment), + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); } - private DataSegment pollThenStopThenStartIntro() throws IOException + private DataSegment pollThenStopThenPublishKoalaSegment() throws IOException { sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); sqlSegmentsMetadataManager.stopPollingDatabasePeriodically(); Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); Assert.assertEquals( - ImmutableSet.of("wikipedia"), + ImmutableSet.of(DS.WIKI), sqlSegmentsMetadataManager.retrieveAllDataSourceNames() ); - DataSegment newSegment = createNewSegment1("wikipedia2"); - publisher.publishSegment(newSegment); + final DataSegment koalaSegment = createNewSegment1(DS.KOALA); + publisher.publishSegment(koalaSegment); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - return newSegment; + return koalaSegment; } - + /** + * Create a corrupted segment entry in the segments table to test + * whether the overall loading of segments from the database continues to work + * even if one of the entries is corrupted. 
+ */ @Test - public void testPollWithCorruptedSegment() + public void testPollWithCorruptedSegment() throws IOException { - //create a corrupted segment entry in segments table, which tests - //that overall loading of segments from database continues to work - //even in one of the entries are corrupted. - publisher.publishSegment( - "corrupt-segment-id", - "corrupt-datasource", - "corrupt-create-date", - "corrupt-start-date", - "corrupt-end-date", - true, - "corrupt-version", - true, - StringUtils.toUtf8("corrupt-payload"), - "corrupt-last-used-date" - ); + publishWikiSegments(); + + final DataSegment corruptSegment = DataSegment.builder(wikiSegment1).dataSource("corrupt-datasource").build(); + publisher.publishSegment(corruptSegment); + updateSegmentPayload(corruptSegment, StringUtils.toUtf8("corrupt-payload")); EmittingLogger.registerEmitter(new NoopServiceEmitter()); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); Assert.assertEquals( - "wikipedia", + DS.WIKI, Iterables.getOnlyElement(sqlSegmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments()).getName() ); } @@ -384,66 +380,66 @@ public class SqlSegmentsMetadataManagerTest @Test public void testGetUnusedSegmentIntervals() throws IOException { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); - // We alter the segment table to allow nullable used_status_last_updated in order to test compatibility during druid upgrade from version without used_status_last_updated. 
- derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable(); + // Allow null values of used_status_last_updated to test upgrade from older Druid versions + allowUsedFlagLastUpdatedToBeNullable(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource("wikipedia"); + int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.WIKI); Assert.assertEquals(2, numChangedSegments); - String newDs = "newDataSource"; - final DataSegment newSegment = createSegment( - newDs, + // Publish an unused segment with used_status_last_updated 2 hours ago + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publish(newSegment, false, DateTimes.nowUtc().minus(Duration.parse("PT7200S").getMillis())); + publishUnusedSegments(koalaSegment1); + updateUsedStatusLastUpdated(koalaSegment1, DateTimes.nowUtc().minus(Duration.standardHours(2))); - final DataSegment newSegment2 = createSegment( - newDs, + // Publish an unused segment with used_status_last_updated 2 days ago + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, "2017-10-16T00:00:00.000/2017-10-17T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publish(newSegment2, false, DateTimes.nowUtc().minus(Duration.parse("PT172800S").getMillis())); + publishUnusedSegments(koalaSegment2); + updateUsedStatusLastUpdated(koalaSegment2, DateTimes.nowUtc().minus(Duration.standardDays(2))); - final DataSegment newSegment3 = createSegment( - newDs, + // Publish an unused segment and set used_status_last_updated to null + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, 
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publish(newSegment3, false, null); + publishUnusedSegments(koalaSegment3); + updateUsedStatusLastUpdatedToNull(koalaSegment3); Assert.assertEquals( - ImmutableList.of(segment2.getInterval()), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX) + ImmutableList.of(wikiSegment2.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, DateTimes.of("3000"), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX) ); // Test the DateTime maxEndTime argument of getUnusedSegmentIntervals Assert.assertEquals( - ImmutableList.of(segment2.getInterval()), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX) + ImmutableList.of(wikiSegment2.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX) ); Assert.assertEquals( - ImmutableList.of(segment1.getInterval()), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX) + ImmutableList.of(wikiSegment1.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 4, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX) ); Assert.assertEquals( ImmutableList.of(), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, DateTimes.COMPARE_DATE_AS_STRING_MAX) + sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, DateTimes.of(2012, 1, 7, 0, 0), DateTimes.of(2012, 1, 7, 0, 0), 1, 
DateTimes.COMPARE_DATE_AS_STRING_MAX) ); Assert.assertEquals( - ImmutableList.of(segment2.getInterval(), segment1.getInterval()), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX) + ImmutableList.of(wikiSegment2.getInterval(), wikiSegment1.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, null, DateTimes.of("3000"), 5, DateTimes.COMPARE_DATE_AS_STRING_MAX) ); // Test a buffer period that should exclude some segments @@ -451,15 +447,21 @@ public class SqlSegmentsMetadataManagerTest // The wikipedia datasource has segments generated with last used time equal to roughly the time of test run. None of these segments should be selected with a bufer period of 1 day Assert.assertEquals( ImmutableList.of(), - sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S"))) + sqlSegmentsMetadataManager.getUnusedSegmentIntervals(DS.WIKI, DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S"))) ); - // One of the 3 segments in newDs has a null used_status_last_updated which should mean getUnusedSegmentIntervals never returns it - // One of the 3 segments in newDs has a used_status_last_updated older than 1 day which means it should also be returned - // The last of the 3 segemns in newDs has a used_status_last_updated date less than one day and should not be returned + // koalaSegment3 has a null used_status_last_updated which should mean getUnusedSegmentIntervals never returns it + // koalaSegment2 has a used_status_last_updated older than 1 day which means it should be returned + // The last of the 3 segments in koala has a used_status_last_updated date less than one day and should not be returned Assert.assertEquals( - ImmutableList.of(newSegment2.getInterval()), - 
sqlSegmentsMetadataManager.getUnusedSegmentIntervals(newDs, DateTimes.COMPARE_DATE_AS_STRING_MIN, DateTimes.of("3000"), 5, DateTimes.nowUtc().minus(Duration.parse("PT86400S"))) + ImmutableList.of(koalaSegment2.getInterval()), + sqlSegmentsMetadataManager.getUnusedSegmentIntervals( + DS.KOALA, + DateTimes.COMPARE_DATE_AS_STRING_MIN, + DateTimes.of("3000"), + 5, + DateTimes.nowUtc().minus(Duration.parse("PT86400S")) + ) ); } @@ -470,37 +472,30 @@ public class SqlSegmentsMetadataManagerTest sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment = createNewSegment1(newDataSource); + publisher.publishSegment(createNewSegment1(DS.KOALA)); - publisher.publishSegment(newSegment); - - awaitDataSourceAppeared(newDataSource); - int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(newDataSource); + awaitDataSourceAppeared(DS.KOALA); + int numChangedSegments = sqlSegmentsMetadataManager.markAsUnusedAllSegmentsInDataSource(DS.KOALA); Assert.assertEquals(1, numChangedSegments); - awaitDataSourceDisappeared(newDataSource); - Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)); + awaitDataSourceDisappeared(DS.KOALA); + Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA)); } - private static DataSegment createNewSegment1(String newDataSource) + private static DataSegment createNewSegment1(String datasource) { return createSegment( - newDataSource, + datasource, "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); } - private static DataSegment createNewSegment2(String newDataSource) + private static DataSegment createNewSegment2(String datasource) { return createSegment( - newDataSource, + datasource, 
"2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); } @@ -511,188 +506,171 @@ public class SqlSegmentsMetadataManagerTest sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment = createSegment( - newDataSource, + final DataSegment koalaSegment = createSegment( + DS.KOALA, "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publisher.publishSegment(newSegment); - awaitDataSourceAppeared(newDataSource); - Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)); + publisher.publishSegment(koalaSegment); + awaitDataSourceAppeared(DS.KOALA); + Assert.assertNotNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA)); - Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(newSegment.getId())); - awaitDataSourceDisappeared(newDataSource); - Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource)); + Assert.assertTrue(sqlSegmentsMetadataManager.markSegmentAsUnused(koalaSegment.getId())); + awaitDataSourceDisappeared(DS.KOALA); + Assert.assertNull(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(DS.KOALA)); } - private void awaitDataSourceAppeared(String newDataSource) throws InterruptedException + private void awaitDataSourceAppeared(String datasource) throws InterruptedException { - while (sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(newDataSource) == null) { - Thread.sleep(1000); + while (sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(datasource) == null) { + Thread.sleep(5); } } private 
void awaitDataSourceDisappeared(String dataSource) throws InterruptedException { while (sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(dataSource) != null) { - Thread.sleep(1000); + Thread.sleep(5); } } @Test public void testMarkAsUsedNonOvershadowedSegments() throws Exception { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createSegment( - newDataSource, + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - final DataSegment newSegment2 = createSegment( - newDataSource, + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", - "2017-10-16T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 1 + "2017-10-16T20:19:12.565Z" ); - // Overshadowed by newSegment2 - final DataSegment newSegment3 = createSegment( - newDataSource, + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 1 + "2017-10-15T20:19:12.565Z" ); - publish(newSegment1, false); - publish(newSegment2, false); - publish(newSegment3, false); - final ImmutableSet segmentIds = ImmutableSet.of( - newSegment1.getId().toString(), - newSegment2.getId().toString(), - newSegment3.getId().toString() + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + final Set segmentIds = ImmutableSet.of( + koalaSegment1.getId().toString(), + koalaSegment2.getId().toString(), + 
koalaSegment3.getId().toString() ); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); - Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource, segmentIds)); + Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA, segmentIds)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2, newSegment1, newSegment2), + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); } - @Test(expected = UnknownSegmentIdsException.class) + @Test public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws Exception { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createNewSegment1(newDataSource); + final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA); + final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA); - final DataSegment newSegment2 = createNewSegment1(newDataSource); - - publish(newSegment1, false); - publish(newSegment2, false); + publishUnusedSegments(koalaSegment1, koalaSegment2); final ImmutableSet segmentIds = - ImmutableSet.of(newSegment1.getId().toString(), newSegment2.getId().toString()); + ImmutableSet.of(koalaSegment1.getId().toString(), koalaSegment2.getId().toString()); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); - // none of the segments are in data 
source - Assert.assertEquals(0, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", segmentIds)); + + Assert.assertThrows( + UnknownSegmentIdsException.class, + () -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments("wrongDataSource", segmentIds) + ); } - @Test(expected = UnknownSegmentIdsException.class) - public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds() throws UnknownSegmentIdsException + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithInvalidSegmentIds() { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createNewSegment1(newDataSource); - - final DataSegment newSegment2 = createNewSegment1(newDataSource); + final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA); + final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA); final ImmutableSet segmentIds = - ImmutableSet.of(newSegment1.getId().toString(), newSegment2.getId().toString()); + ImmutableSet.of(koalaSegment1.getId().toString(), koalaSegment2.getId().toString()); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); - // none of the segments are in data source - Assert.assertEquals(0, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(newDataSource, segmentIds)); + + Assert.assertThrows( + UnknownSegmentIdsException.class, + () -> sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegments(DS.KOALA, segmentIds) + ); } @Test public void testMarkAsUsedNonOvershadowedSegmentsInInterval() throws IOException { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); 
sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createNewSegment1(newDataSource); - - final DataSegment newSegment2 = createSegment( - newDataSource, + final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", - "2017-10-16T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 1 + "2017-10-16T20:19:12.565Z" ); - - final DataSegment newSegment3 = createSegment( - newDataSource, + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - // Overshadowed by newSegment2 - final DataSegment newSegment4 = createNewSegment2(newDataSource); + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment4 = createNewSegment2(DS.KOALA); - publish(newSegment1, false); - publish(newSegment2, false); - publish(newSegment3, false); - publish(newSegment4, false); + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3, koalaSegment4); final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000"); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); // 2 out of 3 segments match the interval - Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(newDataSource, theInterval)); + Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - 
ImmutableSet.of(segment1, segment2, newSegment1, newSegment2), + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); } @@ -700,56 +678,47 @@ public class SqlSegmentsMetadataManagerTest @Test public void testMarkAsUsedNonOvershadowedSegmentsInIntervalWithOverlappingInterval() throws IOException { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createSegment( - newDataSource, + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - final DataSegment newSegment2 = createSegment( - newDataSource, + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", - "2017-10-16T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 1 + "2017-10-16T20:19:12.565Z" ); - final DataSegment newSegment3 = createSegment( - newDataSource, + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, "2017-10-19T00:00:00.000/2017-10-22T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - // Overshadowed by newSegment2 - final DataSegment newSegment4 = createNewSegment2(newDataSource); + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment4 = createNewSegment2(DS.KOALA); - publish(newSegment1, false); - publish(newSegment2, false); - publish(newSegment3, false); - publish(newSegment4, false); + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3, koalaSegment4); final 
Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000"); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused - Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(newDataSource, theInterval)); + Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2, newSegment2), + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); } @@ -757,24 +726,24 @@ public class SqlSegmentsMetadataManagerTest @Test public void testMarkSegmentsAsUnused() throws IOException { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createNewSegment1(newDataSource); + final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA); + final DataSegment koalaSegment2 = createNewSegment1(DS.KOALA); - final DataSegment newSegment2 = createNewSegment1(newDataSource); + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); - publisher.publishSegment(newSegment1); - publisher.publishSegment(newSegment2); final ImmutableSet segmentIds = - ImmutableSet.of(newSegment1.getId(), newSegment1.getId()); + ImmutableSet.of(koalaSegment1.getId(), koalaSegment1.getId()); Assert.assertEquals(segmentIds.size(), sqlSegmentsMetadataManager.markSegmentsAsUnused(segmentIds)); 
sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2), + ImmutableSet.of(wikiSegment1, wikiSegment2), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); } @@ -782,34 +751,30 @@ public class SqlSegmentsMetadataManagerTest @Test public void testMarkAsUnusedSegmentsInInterval() throws IOException { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createNewSegment1(newDataSource); - - final DataSegment newSegment2 = createNewSegment2(newDataSource); - - final DataSegment newSegment3 = createSegment( - newDataSource, + final DataSegment koalaSegment1 = createNewSegment1(DS.KOALA); + final DataSegment koalaSegment2 = createNewSegment2(DS.KOALA); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publisher.publishSegment(newSegment1); - publisher.publishSegment(newSegment2); - publisher.publishSegment(newSegment3); + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000"); // 2 out of 3 segments match the interval - Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(newDataSource, theInterval)); + Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - ImmutableSet.of(segment1, segment2, newSegment3), + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment3), 
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); } @@ -817,40 +782,34 @@ public class SqlSegmentsMetadataManagerTest @Test public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException { + publishWikiSegments(); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); - final String newDataSource = "wikipedia2"; - final DataSegment newSegment1 = createSegment( - newDataSource, + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - - final DataSegment newSegment2 = createNewSegment2(newDataSource); - - final DataSegment newSegment3 = createSegment( - newDataSource, + final DataSegment koalaSegment2 = createNewSegment2(DS.KOALA); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, "2017-10-19T00:00:00.000/2017-10-22T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publisher.publishSegment(newSegment1); - publisher.publishSegment(newSegment2); - publisher.publishSegment(newSegment3); + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000"); // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused - Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(newDataSource, theInterval)); + Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( - 
ImmutableSet.of(segment1, segment2, newSegment1, newSegment3), + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment3), ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) ); } @@ -868,81 +827,133 @@ public class SqlSegmentsMetadataManagerTest @Test public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception { + publishWikiSegments(); final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000"); - Optional> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( - "wikipedia", theInterval, true + + // Re-create SqlSegmentsMetadataManager with a higher poll duration + final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(); + config.setPollDuration(Period.seconds(1)); + sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( + JSON_MAPPER, + Suppliers.ofInstance(config), + derbyConnectorRule.metadataTablesConfigSupplier(), + derbyConnectorRule.getConnector() ); + sqlSegmentsMetadataManager.start(); + + Optional> segments = sqlSegmentsMetadataManager + .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI, theInterval, true); Assert.assertTrue(segments.isPresent()); Set dataSegmentSet = ImmutableSet.copyOf(segments.get()); Assert.assertEquals(1, dataSegmentSet.size()); - Assert.assertTrue(dataSegmentSet.contains(segment1)); + Assert.assertTrue(dataSegmentSet.contains(wikiSegment1)); - final DataSegment newSegment2 = createSegment( - "wikipedia", + final DataSegment wikiSegment3 = createSegment( + DS.WIKI, "2012-03-16T00:00:00.000/2012-03-17T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publisher.publishSegment(newSegment2); + publisher.publishSegment(wikiSegment3); // New segment is not returned since we call without force poll - segments = 
sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( - "wikipedia", theInterval, false - ); + segments = sqlSegmentsMetadataManager + .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI, theInterval, false); Assert.assertTrue(segments.isPresent()); dataSegmentSet = ImmutableSet.copyOf(segments.get()); Assert.assertEquals(1, dataSegmentSet.size()); - Assert.assertTrue(dataSegmentSet.contains(segment1)); + Assert.assertTrue(dataSegmentSet.contains(wikiSegment1)); // New segment is returned since we call with force poll - segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( - "wikipedia", theInterval, true - ); + segments = sqlSegmentsMetadataManager + .iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DS.WIKI, theInterval, true); Assert.assertTrue(segments.isPresent()); dataSegmentSet = ImmutableSet.copyOf(segments.get()); Assert.assertEquals(2, dataSegmentSet.size()); - Assert.assertTrue(dataSegmentSet.contains(segment1)); - Assert.assertTrue(dataSegmentSet.contains(newSegment2)); + Assert.assertTrue(dataSegmentSet.contains(wikiSegment1)); + Assert.assertTrue(dataSegmentSet.contains(wikiSegment3)); } @Test public void testPopulateUsedFlagLastUpdated() throws IOException { - derbyConnectorRule.allowUsedFlagLastUpdatedToBeNullable(); - final DataSegment newSegment = createSegment( - "dummyDS", + allowUsedFlagLastUpdatedToBeNullable(); + final DataSegment koalaSegment = createSegment( + DS.KOALA, "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", - "2017-10-15T20:19:12.565Z", - "wikipedia2/index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", - 0 + "2017-10-15T20:19:12.565Z" ); - publish(newSegment, false, null); - Assert.assertTrue(getCountOfRowsWithLastUsedNull() > 0); + + publishUnusedSegments(koalaSegment); + updateUsedStatusLastUpdatedToNull(koalaSegment); + + Assert.assertEquals(1, getCountOfRowsWithLastUsedNull()); 
sqlSegmentsMetadataManager.populateUsedFlagLastUpdated(); - Assert.assertTrue(getCountOfRowsWithLastUsedNull() == 0); + Assert.assertEquals(0, getCountOfRowsWithLastUsedNull()); + } + + private void updateSegmentPayload(DataSegment segment, byte[] payload) + { + executeUpdate( + "UPDATE %1$s SET PAYLOAD = ? WHERE ID = ?", + payload, + segment.getId().toString() + ); } private int getCountOfRowsWithLastUsedNull() { return derbyConnectorRule.getConnector().retryWithHandle( - new HandleCallback() - { - @Override - public Integer withHandle(Handle handle) - { - List> lst = handle.select( - StringUtils.format( - "SELECT * FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS NULL", - derbyConnectorRule.metadataTablesConfigSupplier() - .get() - .getSegmentsTable() - .toUpperCase(Locale.ENGLISH) - ) - ); - return lst.size(); - } - } + handle -> handle.select( + StringUtils.format( + "SELECT ID FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS NULL", + getSegmentsTable() + ) + ).size() ); } + + private void updateUsedStatusLastUpdated(DataSegment segment, DateTime newValue) + { + executeUpdate( + "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?", + newValue.toString(), + segment.getId().toString() + ); + } + + private void updateUsedStatusLastUpdatedToNull(DataSegment segment) + { + executeUpdate( + "UPDATE %1$s SET USED_STATUS_LAST_UPDATED = NULL WHERE ID = ?", + segment.getId().toString() + ); + } + + private void executeUpdate(String sqlFormat, Object... args) + { + derbyConnectorRule.getConnector().retryWithHandle( + handle -> handle.update( + StringUtils.format(sqlFormat, getSegmentsTable()), + args + ) + ); + } + + /** + * Alters the column used_status_last_updated to be nullable. This is used to + * test backward compatibility with versions of Druid without this column + * present in the segments table. 
+ */ + private void allowUsedFlagLastUpdatedToBeNullable() + { + executeUpdate("ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED NULL"); + } + + private String getSegmentsTable() + { + return derbyConnectorRule.metadataTablesConfigSupplier() + .get() + .getSegmentsTable() + .toUpperCase(Locale.ENGLISH); + } } diff --git a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java index d0d8357837c..e5460ce402b 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java +++ b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java @@ -25,14 +25,10 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.storage.derby.DerbyConnector; import org.junit.Assert; import org.junit.rules.ExternalResource; -import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.DBI; -import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; -import org.skife.jdbi.v2.tweak.HandleCallback; import java.sql.SQLException; -import java.util.Locale; import java.util.UUID; public class TestDerbyConnector extends DerbyConnector @@ -139,27 +135,5 @@ public class TestDerbyConnector extends DerbyConnector { return dbTables; } - - public void allowUsedFlagLastUpdatedToBeNullable() - { - connector.retryWithHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) - { - final Batch batch = handle.createBatch(); - batch.add( - StringUtils.format( - "ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED NULL", - dbTables.get().getSegmentsTable().toUpperCase(Locale.ENGLISH) - ) - ); - batch.execute(); - return null; - } - } - ); - } } }