From 1a2620606d8187da7725088e9c52ce41b8a692b0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn <52679095+maytasm@users.noreply.github.com> Date: Tue, 16 Jun 2020 20:48:30 -1000 Subject: [PATCH] API to verify a datasource has the latest ingested data (#9965)
* API to verify a datasource has the latest ingested data * API to verify a datasource has the latest ingested data * API to verify a datasource has the latest ingested data * API to verify a datasource has the latest ingested data * API to verify a datasource has the latest ingested data * fix checksyle * API to verify a datasource has the latest ingested data * API to verify a datasource has the latest ingested data * API to verify a datasource has the latest ingested data * API to verify a datasource has the latest ingested data * fix spelling * address comments * fix checkstyle * update docs * fix tests * fix doc * address comments * fix typo * fix spelling * address comments * address comments * fix typo in docs
--- docs/ingestion/faq.md | 14 + docs/operations/api-reference.md | 43 ++- .../druid/client/CoordinatorServerView.java | 4 + .../metadata/SegmentsMetadataManager.java | 17 + .../metadata/SqlSegmentsMetadataManager.java | 142 ++++++-- .../server/coordinator/DruidCoordinator.java | 19 +- .../server/http/DataSourcesResource.java | 133 +++++++- .../SqlSegmentsMetadataManagerTest.java | 159 ++++++++- .../server/http/DataSourcesResourceTest.java | 315 ++++++++++++++++-- 9 files changed, 773 insertions(+), 73 deletions(-)
diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 1e6ffe10254..308407fa342 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -66,6 +66,20 @@ Other common reasons that hand-off fails are as follows: Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md).
+## How do I know when I can query Druid after submitting a batch ingestion task?
+
+You can verify whether segments created by a recent ingestion task are loaded onto Historicals and available for querying using the following workflow.
+1. Submit your ingestion task.
+2. Repeatedly poll the [Overlord's tasks API](../operations/api-reference.md#tasks) (`/druid/indexer/v1/task/{taskId}/status`) until your task is reported as successfully completed.
+3. Poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with
+`forceMetadataRefresh=true` and `interval={myInterval}` once.
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of load on the metadata store, but it is necessary to verify the load status of all the latest segments.)
+If any segments are not yet loaded, continue to step 4; otherwise, you can query the data now.
+4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with
+`forceMetadataRefresh=false` and `interval={myInterval}`.
+Continue polling until all segments are loaded. Once all segments are loaded, you can query the data. (A sketch of this polling workflow in code follows below.)
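For concreteness, here is a minimal sketch of this workflow as a standalone Java 11 program, using only `java.net.http`. It assumes a Router or Coordinator reachable at `localhost:8888`; the task id, datasource, and interval are hypothetical placeholders, and the JSON handling is deliberately crude string matching rather than real parsing:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class IngestionWaiter
{
  private static final HttpClient CLIENT = HttpClient.newHttpClient();
  private static final String BASE = "http://localhost:8888"; // hypothetical Router/Coordinator address

  public static void main(String[] args) throws Exception
  {
    String taskId = "index_parallel_wikipedia_2020-06-16"; // placeholder task id
    String dataSource = "wikipedia";                        // placeholder datasource
    String interval = "2020-06-01_2020-06-16";              // '_' instead of '/' for URL friendliness

    // Step 2: wait for the ingestion task to report success.
    while (!get("/druid/indexer/v1/task/" + taskId + "/status").contains("\"SUCCESS\"")) {
      Thread.sleep(5_000);
    }

    // Step 3: one heavy call with forceMetadataRefresh=true to pick up the newly published segments.
    String path = "/druid/coordinator/v1/datasources/" + dataSource
                  + "/loadstatus?simple&interval=" + interval;
    String status = get(path + "&forceMetadataRefresh=true");

    // Step 4: cheap polling with forceMetadataRefresh=false until no segments are left to load.
    // The ?simple response looks like {"wikipedia":N}; the string check below is intentionally crude.
    while (!status.replaceAll("\\s", "").contains(":0}")) {
      Thread.sleep(5_000);
      status = get(path + "&forceMetadataRefresh=false");
    }
    System.out.println("All segments loaded; " + dataSource + " is queryable for " + interval);
  }

  private static String get(String path) throws Exception
  {
    HttpRequest req = HttpRequest.newBuilder(URI.create(BASE + path)).GET().build();
    return CLIENT.send(req, HttpResponse.BodyHandlers.ofString()).body();
  }
}
```

In practice you would parse the JSON responses properly, handle the `204 No Content` case, and bound the polling with a timeout.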
+Note that this workflow only guarantees that the segments are available at the time of the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) call. Segments can still become unavailable afterward because of Historical process failures or other reasons.
+
## I don't see my Druid segments on my Historical processes You can check the Coordinator console located at `<COORDINATOR_IP>:<PORT>`. Make sure that your segments have actually loaded on [Historical processes](../design/historical.md). If your segments are not present, check the Coordinator logs for messages about capacity or replication errors. One reason that segments are not downloaded is because Historical processes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example):
diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index a66dd649f41..a3610a8bc52 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment * `/druid/coordinator/v1/loadstatus?simple`
-Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication.
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include segment replication counts.
* `/druid/coordinator/v1/loadstatus?full`
-Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes replication.
+Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts.
* `/druid/coordinator/v1/loadqueue` @@ -114,6 +114,45 @@ Returns the number of segments to load and drop, as well as the total segment lo Returns the serialized JSON of segments to load and drop for each Historical process.
+
+#### Segment Loading by Datasource
+
+Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28).
+Also note that these APIs only guarantee that the segments are available at the time of the call.
+Segments can still become unavailable afterward because of Historical process failures or other reasons.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given
+datasource over the given interval (or the last 2 weeks if no interval is given). `forceMetadataRefresh` is required to be set.
+Setting `forceMetadataRefresh` to true will force the Coordinator to poll the latest segment metadata from the metadata store.
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
+of load on the metadata store, but it can be necessary to verify the load status of all the latest segments.)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the Coordinator from the last forced/periodic refresh.
+If no used segments are found for the given inputs, this API returns `204 No Content`.
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource
+over the given interval (or the last 2 weeks if no interval is given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set.
+Setting `forceMetadataRefresh` to true will force the Coordinator to poll the latest segment metadata from the metadata store.
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
+of load on the metadata store, but it can be necessary to verify the load status of all the latest segments.)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the Coordinator from the last forced/periodic refresh.
+If no used segments are found for the given inputs, this API returns `204 No Content`.
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource
+over the given interval (or the last 2 weeks if no interval is given). This includes segment replication counts. `forceMetadataRefresh` is required to be set.
+Setting `forceMetadataRefresh` to true will force the Coordinator to poll the latest segment metadata from the metadata store.
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
+of load on the metadata store, but it can be necessary to verify the load status of all the latest segments.)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the Coordinator from the last forced/periodic refresh.
+If no used segments are found for the given inputs, this API returns `204 No Content`.
+ #### Metadata store information ##### GET
diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java index 2517a8f0e9b..538cc2f526f 100644 --- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java @@ -200,6 +200,10 @@ public class CoordinatorServerView implements InventoryView } } + public Map<SegmentId, SegmentLoadInfo> getSegmentLoadInfos() + { + return segmentLoadInfos; + } @Override public DruidServer getInventoryValue(String serverKey)
diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 4f97b158021..889141a89c1 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.timeline.DataSegment; @@ -113,6 +114,22 @@ public interface SegmentsMetadataManager */ Iterable<DataSegment> iterateAllUsedSegments();
+ /**
+ * Returns an iterable to go over all used and non-overshadowed segments of the given datasource over the given interval.
+ * The order in which segments are iterated is unspecified. Note: the iteration may not be as trivially cheap as,
+ * for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it
+ * iterates the returned iterable only once rather than several times.
+ * If {@param requiresLatest} is true then a forced metadata store poll will be triggered. This can cause a longer
+ * response time but ensures that the latest segment information (at the time this method is called) is returned.
+ * If {@param requiresLatest} is false then segment information from a snapshot that may be stale by up to the
+ * periodic poll period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used.
+ */
+ Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+ String datasource,
+ Interval interval,
+ boolean requiresLatest
+ );
+ /** * Retrieves all data source names for which there are segments in the database, regardless of whether those segments * are used or not. If there are no segments in the database, returns an empty set.
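The new interface method returns Guava's `Optional` (absent when the datasource is unknown) wrapping a lazily-iterated segment collection. Below is a hypothetical caller-side sketch illustrating the `requiresLatest` trade-off described in the javadoc; only `SegmentsMetadataManager` and the Druid types are from this patch, the surrounding class is illustrative:

```java
import com.google.common.base.Optional;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;

class SegmentAvailabilityChecker
{
  private final SegmentsMetadataManager segmentsMetadataManager;

  SegmentAvailabilityChecker(SegmentsMetadataManager segmentsMetadataManager)
  {
    this.segmentsMetadataManager = segmentsMetadataManager;
  }

  /** Returns the number of used, non-overshadowed segments, or -1 if the datasource is unknown. */
  long countUsedSegments(String dataSource, Interval interval, boolean requiresLatest)
  {
    // requiresLatest=true forces a metadata store poll (slower but current);
    // requiresLatest=false may serve a snapshot up to periodicPollDelay old.
    Optional<Iterable<DataSegment>> segments =
        segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
            dataSource, interval, requiresLatest
        );
    if (!segments.isPresent()) {
      return -1; // absent means no used segments known for this datasource
    }
    long count = 0;
    // Iterate the returned iterable only once, as the javadoc advises.
    for (DataSegment ignored : segments.get()) {
      count++;
    }
    return count;
  }
}
```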
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 92a87486352..60f7f4b86ee 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -20,6 +20,8 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -44,6 +46,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -95,7 +98,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager {} /** Represents periodic {@link #poll}s happening from {@link #exec}. */ - private static class PeriodicDatabasePoll implements DatabasePoll + @VisibleForTesting + static class PeriodicDatabasePoll implements DatabasePoll { /** * This future allows waiting until {@link #dataSourcesSnapshot} is initialized in the first {@link #poll()} * * leadership changes. */ final CompletableFuture<Void> firstPollCompletionFuture = new CompletableFuture<>(); + long lastPollStartTimestampInMs = -1; } /** * Represents on-demand {@link #poll} initiated at periods of time when SqlSegmentsMetadataManager doesn't poll the database * periodically. */ - private static class OnDemandDatabasePoll implements DatabasePoll + @VisibleForTesting + static class OnDemandDatabasePoll implements DatabasePoll { final long initiationTimeNanos = System.nanoTime(); final CompletableFuture<Void> pollCompletionFuture = new CompletableFuture<>(); @@ -127,7 +133,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager * called at the same time if two different threads are calling them. This might be possible if Coordinator gets and * drops leadership repeatedly in quick succession. * - * This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} for times when SqlSegmentsMetadataManager + * This lock is also used to synchronize {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} for times when SqlSegmentsMetadataManager * is not polling the database periodically (in other words, when the Coordinator is not the leader). */ private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock(); @@ -155,7 +161,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager * easy to forget to do. * * This field may be updated from {@link #exec}, or from whatever thread calling {@link #doOnDemandPoll} via {@link - * #awaitOrPerformDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager. + * #useLatestIfWithinDelayOrPerformNewDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager.
 */ private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null; @@ -170,7 +176,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager * Note that if there is a happens-before relationship between a call to {@link #startPollingDatabasePeriodically()} * (on Coordinators' leadership change) and one of the methods accessing the {@link #dataSourcesSnapshot}'s state in * this class the latter is guaranteed to await for the initiated periodic poll. This is because when the latter - * method calls to {@link #awaitLatestDatabasePoll()} via {@link #awaitOrPerformDatabasePoll}, they will + * method calls to {@link #useLatestSnapshotIfWithinDelay()} via {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll}, they will * see the latest {@link PeriodicDatabasePoll} value (stored in this field, latestDatabasePoll, in {@link * #startPollingDatabasePeriodically()}) and to await on its {@link PeriodicDatabasePoll#firstPollCompletionFuture}. * * @@ -185,7 +191,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager * SegmentsMetadataManager} and guarantee that it always returns consistent and relatively up-to-date data from methods * like {@link #getImmutableDataSourceWithUsedSegments}, while avoiding excessive repetitive polls. The last part * is achieved via "hooking on" other polls by awaiting on {@link PeriodicDatabasePoll#firstPollCompletionFuture} or - * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #awaitOrPerformDatabasePoll} method + * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} method * implementation for details. * * Note: the overall implementation of periodic/on-demand polls is not completely optimal: for example, when the * during Coordinator leadership switches is not a priority. * * This field is {@code volatile} because it's checked and updated in a double-checked locking manner in {@link - * #awaitOrPerformDatabasePoll()}. + * #useLatestIfWithinDelayOrPerformNewDatabasePoll()}. */ private volatile @Nullable DatabasePoll latestDatabasePoll = null; @@ -311,6 +317,22 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicDatabasePoll) { return () -> { + // If the latest poll was an OnDemandDatabasePoll that started less than periodicPollDelay ago,
+ // wait for (periodicPollDelay - (currentTime - latestOnDemandDatabasePollStartTime)) and then check again.
+ try { + long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis()); + while (latestDatabasePoll != null + && latestDatabasePoll instanceof OnDemandDatabasePoll + && ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation() < periodicPollDelayNanos) { + long sleepNano = periodicPollDelayNanos + - ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation(); + TimeUnit.NANOSECONDS.sleep(sleepNano); + } + } + catch (Exception e) { + log.debug(e, "Exception found while waiting for next periodic poll"); + } + // poll() is synchronized together with startPollingDatabasePeriodically(), stopPollingDatabasePeriodically() and // isPollingDatabasePeriodically() to ensure that when stopPollingDatabasePeriodically() exits, poll() won't // actually run anymore after that (it could only enter the synchronized section and exit immediately because the @@ -320,8 +342,10 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager lock.lock(); try { if (startOrder == currentStartPollingOrder) { + periodicDatabasePoll.lastPollStartTimestampInMs = System.currentTimeMillis(); poll(); periodicDatabasePoll.firstPollCompletionFuture.complete(null); + latestDatabasePoll = periodicDatabasePoll; } else { log.debug("startOrder = currentStartPollingOrder = %d, skipping poll()", startOrder); } @@ -381,16 +405,16 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager } } - private void awaitOrPerformDatabasePoll() + private void useLatestIfWithinDelayOrPerformNewDatabasePoll() { - // Double-checked locking with awaitLatestDatabasePoll() call playing the role of the "check". - if (awaitLatestDatabasePoll()) { + // Double-checked locking with useLatestSnapshotIfWithinDelay() call playing the role of the "check". + if (useLatestSnapshotIfWithinDelay()) { return; } ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); lock.lock(); try { - if (awaitLatestDatabasePoll()) { + if (useLatestSnapshotIfWithinDelay()) { return; } OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll(); @@ -403,11 +427,17 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager } /** - * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is - * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise, - * meaning that a new on-demand database poll should be initiated. + * This method returns true without waiting for a database poll if the latest {@link DatabasePoll} is a
+ * {@link PeriodicDatabasePoll} that has completed its first poll, or an {@link OnDemandDatabasePoll} that was
+ * initiated no longer than {@link #periodicPollDelay} before the current time.
+ * This method does wait until completion if the latest {@link DatabasePoll} is a
+ * {@link PeriodicDatabasePoll} that has not completed its first poll, or an {@link OnDemandDatabasePoll} that is
+ * already in the process of polling the database.
+ * This means that any method using this check can read from a snapshot that is
+ * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old.
 */ - private boolean awaitLatestDatabasePoll() + @VisibleForTesting + boolean useLatestSnapshotIfWithinDelay() { DatabasePoll latestDatabasePoll = this.latestDatabasePoll; if (latestDatabasePoll instanceof PeriodicDatabasePoll) { @@ -430,6 +460,49 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager return false; }
+ /**
+ * This method will always force a database poll if there is no ongoing database poll. This method will then
+ * wait for the new poll or the ongoing poll to complete before returning.
+ * This means that any method using this check can be sure that the latest poll for the snapshot was completed after
+ * this method was called.
+ */
+ @VisibleForTesting
+ void forceOrWaitOngoingDatabasePoll()
+ { + long checkStartTime = System.currentTimeMillis(); + ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); + lock.lock(); + try { + DatabasePoll latestDatabasePoll = this.latestDatabasePoll; + try { + // Verify whether a periodic poll completed while we were waiting for the lock + if (latestDatabasePoll instanceof PeriodicDatabasePoll + && ((PeriodicDatabasePoll) latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) { + return; + } + // Verify whether an on-demand poll completed while we were waiting for the lock + if (latestDatabasePoll instanceof OnDemandDatabasePoll) { + long checkStartTimeNanos = TimeUnit.MILLISECONDS.toNanos(checkStartTime); + OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll; + if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) { + return; + } + } + } + catch (Exception e) { + // Latest poll was unsuccessful, try to do a new poll + log.debug(e, "Latest poll was unsuccessful. Starting a new poll..."); + } + // Force a database poll + OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll(); + this.latestDatabasePoll = onDemandDatabasePoll; + doOnDemandPoll(onDemandDatabasePoll); + } + finally { + lock.unlock(); + } + }
+ private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll) { try { @@ -857,19 +930,44 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager @Override public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments() { - awaitOrPerformDatabasePoll(); + useLatestIfWithinDelayOrPerformNewDatabasePoll(); return dataSourcesSnapshot; }
+ @VisibleForTesting + DataSourcesSnapshot getDataSourcesSnapshot() + { + return dataSourcesSnapshot; + }
+ @VisibleForTesting + DatabasePoll getLatestDatabasePoll() + { + return latestDatabasePoll; + }
+ @Override public Iterable<DataSegment> iterateAllUsedSegments() { - awaitOrPerformDatabasePoll(); - return () -> dataSourcesSnapshot - .getDataSourcesWithAllUsedSegments() - .stream() - .flatMap(dataSource -> dataSource.getSegments().stream()) - .iterator(); + useLatestIfWithinDelayOrPerformNewDatabasePoll(); + return dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot(); + }
+ @Override
+ public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource,
+ Interval interval,
+ boolean requiresLatest)
+ { + if (requiresLatest) { + forceOrWaitOngoingDatabasePoll(); + } else { + useLatestIfWithinDelayOrPerformNewDatabasePoll(); + } + VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline + = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource); + return Optional.fromNullable(usedSegmentsTimeline) + .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE)); } @Override
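Stripped of the Coordinator-specific details, the poll-coordination logic above boils down to a reusable pattern: serve a cached snapshot if it is fresh enough, otherwise recompute it under a lock, and let `forceOrWaitOngoingDatabasePoll`-style callers demand a result computed no earlier than their call time. A distilled, self-contained sketch of that pattern (all names illustrative, not part of the patch):

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

class CachedPoll<T>
{
  private static class Result<R>
  {
    final long createdAtNanos = System.nanoTime();
    final R value;
    Result(R value) { this.value = value; }
  }

  private final ReentrantLock lock = new ReentrantLock();
  private final Supplier<T> poller;
  private final long maxStalenessNanos;
  private volatile Result<T> latest; // volatile enables the lock-free first "check"

  CachedPoll(Supplier<T> poller, long maxStalenessNanos)
  {
    this.poller = poller;
    this.maxStalenessNanos = maxStalenessNanos;
  }

  /** Analogue of useLatestIfWithinDelayOrPerformNewDatabasePoll(): bounded staleness allowed. */
  T getWithinDelay()
  {
    Result<T> r = latest;
    if (r != null && System.nanoTime() - r.createdAtNanos < maxStalenessNanos) {
      return r.value; // first check of the double-checked locking
    }
    lock.lock();
    try {
      r = latest;
      if (r != null && System.nanoTime() - r.createdAtNanos < maxStalenessNanos) {
        return r.value; // another thread refreshed while we waited for the lock
      }
      latest = new Result<>(poller.get());
      return latest.value;
    }
    finally {
      lock.unlock();
    }
  }

  /** Analogue of forceOrWaitOngoingDatabasePoll(): result is at least as fresh as the call time. */
  T getLatest()
  {
    long callTimeNanos = System.nanoTime();
    lock.lock();
    try {
      Result<T> r = latest;
      if (r != null && r.createdAtNanos - callTimeNanos > 0) {
        return r.value; // a refresh completed after we asked; good enough
      }
      latest = new Result<>(poller.get());
      return latest.value;
    }
    finally {
      lock.unlock();
    }
  }
}
```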
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 36a414e780d..c4de3644c64 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -256,6 +256,17 @@ public class DruidCoordinator * @return tier -> { dataSource -> underReplicationCount } map */ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
+ { + final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); + return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments); + }
+ /**
+ * @return tier -> { dataSource -> underReplicationCount } map
+ */
+ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(
+ Iterable<DataSegment> dataSegments
+ ) { final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); @@ -263,8 +274,6 @@ public class DruidCoordinator return underReplicationCountsPerDataSourcePerTier; } - final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); - final DateTime now = DateTimes.nowUtc(); for (final DataSegment segment : dataSegments) { @@ -320,7 +329,7 @@ public class DruidCoordinator for (ImmutableDruidDataSource dataSource : dataSources) { final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments()); - final int numUsedSegments = segments.size(); + final int numPublishedSegments = segments.size(); // remove loaded segments for (DruidServer druidServer : serverInventoryView.getInventory()) { @@ -333,10 +342,10 @@ public class DruidCoordinator } } } - final int numUnloadedSegments = segments.size(); + final int numUnavailableSegments = segments.size(); loadStatus.put( dataSource.getName(), - 100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) numUsedSegments) + 100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments) ); }
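The `DruidCoordinator` change above is a pure refactor: the tier-by-datasource under-replication computation now accepts an arbitrary segment iterable, so the new load-status endpoint can run it on a single datasource's segments. A hypothetical caller showing the shape of the result (method name from the patch, surrounding code illustrative):

```java
import java.util.Map;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.timeline.DataSegment;

class UnderReplicationReporter
{
  private final DruidCoordinator coordinator;

  UnderReplicationReporter(DruidCoordinator coordinator)
  {
    this.coordinator = coordinator;
  }

  void report(Iterable<DataSegment> segmentsOfInterest)
  {
    // tier -> (dataSource -> number of replicas still missing)
    Map<String, Object2LongMap<String>> counts =
        coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segmentsOfInterest);
    if (counts.isEmpty()) {
      // Mirrors the endpoint's behavior: the replicant lookup is not initialized yet.
      System.out.println("Replicant lookup not initialized yet; try again later.");
      return;
    }
    counts.forEach(
        (tier, perDataSource) -> perDataSource.forEach(
            (dataSource, missing) ->
                System.out.printf("%s/%s: %d under-replicated%n", tier, dataSource, missing)
        )
    );
  }
}
```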
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index b6d310f1ba7..f88d1c70854 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -22,11 +22,13 @@ package org.apache.druid.server.http; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; +import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; @@ -49,6 +51,7 @@ import org.apache.druid.metadata.UnknownSegmentIdsException; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.http.security.DatasourceResourceFilter;
@@ -96,12 +99,14 @@ import java.util.stream.Collectors; public class DataSourcesResource { private static final Logger log = new Logger(DataSourcesResource.class); + private static final long DEFAULT_LOADSTATUS_INTERVAL_OFFSET = 14 * 24 * 60 * 60 * 1000; private final CoordinatorServerView serverInventoryView; private final SegmentsMetadataManager segmentsMetadataManager; private final MetadataRuleManager metadataRuleManager; private final IndexingServiceClient indexingServiceClient; private final AuthorizerMapper authorizerMapper; + private final DruidCoordinator coordinator; @Inject public DataSourcesResource( @@ -109,7 +114,8 @@ public class DataSourcesResource SegmentsMetadataManager segmentsMetadataManager, MetadataRuleManager metadataRuleManager, @Nullable IndexingServiceClient indexingServiceClient, - AuthorizerMapper authorizerMapper + AuthorizerMapper authorizerMapper, + DruidCoordinator coordinator ) { this.serverInventoryView = serverInventoryView; @@ -117,6 +123,7 @@ public class DataSourcesResource this.metadataRuleManager = metadataRuleManager; this.indexingServiceClient = indexingServiceClient; this.authorizerMapper = authorizerMapper; + this.coordinator = coordinator; } @GET @@ -391,6 +398,130 @@ public class DataSourcesResource return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains); }
+ @GET
+ @Path("/{dataSourceName}/loadstatus")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response getDatasourceLoadstatus( + @PathParam("dataSourceName") String dataSourceName, + @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh, + @QueryParam("interval") @Nullable final String interval, + @QueryParam("simple") @Nullable final String simple, + @QueryParam("full") @Nullable final String full + )
+ { + if (forceMetadataRefresh == null) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("Invalid request. forceMetadataRefresh must be specified") + .build(); + } + final Interval theInterval; + if (interval == null) { + long currentTimeInMs = System.currentTimeMillis(); + theInterval = Intervals.utc(currentTimeInMs - DEFAULT_LOADSTATUS_INTERVAL_OFFSET, currentTimeInMs); + } else { + theInterval = Intervals.of(interval.replace('_', '/')); + }
+ Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + dataSourceName, + theInterval, + forceMetadataRefresh + );
+ if (!segments.isPresent()) { + return logAndCreateDataSourceNotFoundResponse(dataSourceName); + }
+ if (Iterables.size(segments.get()) == 0) { + return Response + .status(Response.Status.NO_CONTENT) + .entity("No used segment found for the given datasource and interval") + .build(); + }
+ if (simple != null) { + // Calculate response for simple mode + SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get()); + return Response.ok( + ImmutableMap.of( + dataSourceName, + segmentsLoadStatistics.getNumUnavailableSegments() + ) + ).build(); + } else if (full != null) { + // Calculate response for full mode + Map<String, Object2LongMap<String>> segmentLoadMap + = coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get()); + if (segmentLoadMap.isEmpty()) { + return Response.serverError() + .entity("Coordinator segment replicant lookup is not initialized yet.
Try again later.") + .build(); + } + return Response.ok(segmentLoadMap).build(); + } else { + // Calculate response for default mode + SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get()); + return Response.ok( + ImmutableMap.of( + dataSourceName, + 100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments()) + ) + ).build(); + } + }
+ private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable<DataSegment> segments)
+ { + Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); + int numPublishedSegments = 0; + int numUnavailableSegments = 0; + int numLoadedSegments = 0; + for (DataSegment segment : segments) { + numPublishedSegments++; + if (!segmentLoadInfos.containsKey(segment.getId())) { + numUnavailableSegments++; + } else { + numLoadedSegments++; + } + } + return new SegmentsLoadStatistics(numPublishedSegments, numUnavailableSegments, numLoadedSegments); + }
+ private static class SegmentsLoadStatistics + { + private int numPublishedSegments; + private int numUnavailableSegments; + private int numLoadedSegments; + + SegmentsLoadStatistics( + int numPublishedSegments, + int numUnavailableSegments, + int numLoadedSegments + ) + { + this.numPublishedSegments = numPublishedSegments; + this.numUnavailableSegments = numUnavailableSegments; + this.numLoadedSegments = numLoadedSegments; + } + + public int getNumPublishedSegments() + { + return numPublishedSegments; + } + + public int getNumUnavailableSegments() + { + return numUnavailableSegments; + } + + public int getNumLoadedSegments() + { + return numLoadedSegments; + } + }
/** * The property names belong to the public HTTP JSON API. */
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index be6354a63c7..57df4407346 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -20,11 +20,13 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -43,6 +45,7 @@ import org.junit.Rule; import org.junit.Test; import java.io.IOException; +import java.util.Set; import java.util.stream.Collectors; @@ -117,7 +120,7 @@ public class SqlSegmentsMetadataManagerTest { TestDerbyConnector connector = derbyConnectorRule.getConnector(); SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(); - config.setPollDuration(Period.seconds(1)); + config.setPollDuration(Period.seconds(3)); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), @@ -148,30 +151,124 @@ public class SqlSegmentsMetadataManagerTest } @Test - public void testPoll() + public void testPollPeriodically() { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + // This call makes sure that the first poll is completed + sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay(); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( ImmutableSet.of("wikipedia"), sqlSegmentsMetadataManager.retrieveAllDataSourceNames() ); Assert.assertEquals( ImmutableList.of("wikipedia"), - sqlSegmentsMetadataManager - .getImmutableDataSourcesWithAllUsedSegments() - .stream() - .map(ImmutableDruidDataSource::getName) - .collect(Collectors.toList()) + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia").getSegments()) + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) ); }
+ @Test
+ public void testPollOnDemand()
+ { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); + // This should return false and not wait/poll anything as we did not schedule a periodic poll + Assert.assertFalse(sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay()); + Assert.assertNull(dataSourcesSnapshot); + // This call will force an on-demand poll + sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll(); + Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableSet.of("wikipedia"), + sqlSegmentsMetadataManager.retrieveAllDataSourceNames() + ); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + Assert.assertEquals( + ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) + ); + Assert.assertEquals( + ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) + ); + }
+ @Test(timeout = 60_000)
+ public void testPollPeriodicallyAndOnDemandInterleave() throws Exception
+ { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + // This call makes sure that the first poll is completed + sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay(); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot =
sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + final String newDataSource2 = "wikipedia2"; + final DataSegment newSegment2 = createNewSegment1(newDataSource2); + publisher.publishSegment(newSegment2);
+ // This call will force an on-demand poll + sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll); + // New datasource should now be in the snapshot since we just forced an on-demand poll. + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia2", "wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + );
+ final String newDataSource3 = "wikipedia3"; + final DataSegment newSegment3 = createNewSegment1(newDataSource3); + publisher.publishSegment(newSegment3);
+ // This time wait for a periodic poll (we are not doing an on-demand poll, so we have to wait a bit...) + while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) { + Thread.sleep(1000); + } + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) ); } @@ -749,4 +846,46 @@ public class SqlSegmentsMetadataManagerTest sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.stopPollingDatabasePeriodically(); }
+ @Test
+ public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception
+ { + final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000"); + Optional<Iterable<DataSegment>> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, true + ); + Assert.assertTrue(segments.isPresent()); + Set<DataSegment> dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(1, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1));
+ final DataSegment newSegment2 = createSegment( + "wikipedia", + "2012-03-16T00:00:00.000/2012-03-17T00:00:00.000", + "2017-10-15T20:19:12.565Z", + "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", + 0 + ); + publisher.publishSegment(newSegment2);
+ // New segment is not returned since we call without a forced poll + segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, false + ); + Assert.assertTrue(segments.isPresent()); + dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(1, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1));
+ // New segment is returned since we call with a forced poll + segments =
sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, true + ); + Assert.assertTrue(segments.isPresent()); + dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(2, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1)); + Assert.assertTrue(dataSegmentSet.contains(newSegment2)); + } }
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 10a7f97300c..39e02ae86de 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -19,11 +19,14 @@ package org.apache.druid.server.http; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; @@ -39,6 +42,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.IntervalDropRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; import org.apache.druid.server.coordinator.rules.Rule; @@ -176,7 +180,7 @@ public class DataSourcesResourceTest EasyMock.replay(inventoryView, server, request); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER); + new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null); Response response = dataSourcesResource.getQueryableDataSources("full", null, request); Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity(); Assert.assertEquals(200, response.getStatus()); @@ -250,7 +254,7 @@ public class DataSourcesResourceTest } }; - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper, null); Response response = dataSourcesResource.getQueryableDataSources("full", null, request); Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity(); @@ -289,7 +293,7 @@ public class DataSourcesResourceTest EasyMock.replay(inventoryView, server, request); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER); + new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null); Response response = dataSourcesResource.getQueryableDataSources(null, "simple", request); Assert.assertEquals(200, response.getStatus()); List<Map<String, Object>> results = (List<Map<String, Object>>) response.getEntity(); @@ -313,7 +317,7 @@ public class DataSourcesResourceTest EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new
DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", "full"); ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity(); Assert.assertEquals(200, response.getStatus()); @@ -329,7 +333,7 @@ public class DataSourcesResourceTest EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Assert.assertEquals(204, dataSourcesResource.getDataSource("none", null).getStatus()); EasyMock.verify(inventoryView, server); } @@ -347,7 +351,7 @@ public class DataSourcesResourceTest EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity(); @@ -380,7 +384,7 @@ public class DataSourcesResourceTest EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server, server2, server3)).atLeastOnce(); EasyMock.replay(inventoryView, server, server2, server3); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity(); @@ -418,7 +422,7 @@ public class DataSourcesResourceTest EasyMock.replay(inventoryView); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map<String, Map<String, Object>> result1 = (Map<String, Map<String, Object>>) response.getEntity(); @@ -463,7 +467,7 @@ public class DataSourcesResourceTest expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z")); expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z")); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getIntervalsWithServedSegmentsOrAllServedSegmentsPerIntervals( "invalidDataSource", @@ -523,7 +527,7 @@ public class DataSourcesResourceTest EasyMock.replay(inventoryView); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getServedSegmentsInInterval( "invalidDataSource", "2010-01-01/P1D", @@ -593,7 +597,7 @@ public class DataSourcesResourceTest EasyMock.replay(indexingServiceClient, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null); + new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null); Response response = dataSourcesResource.killUnusedSegmentsInInterval("datasource1", interval);
Assert.assertEquals(200, response.getStatus()); @@ -607,7 +611,7 @@ public class DataSourcesResourceTest IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class); EasyMock.replay(indexingServiceClient, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null); + new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null); try { Response response = dataSourcesResource.markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval("datasource", "true", "???"); @@ -630,7 +634,7 @@ public class DataSourcesResourceTest Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null); Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z")); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null); + new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null, null); // test dropped EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1")) @@ -699,7 +703,7 @@ public class DataSourcesResourceTest EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(true).once(); EasyMock.replay(segmentsMetadataManager); - DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString()); Assert.assertEquals(200, response.getStatus()); @@ -713,7 +717,7 @@ public class DataSourcesResourceTest EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(false).once(); EasyMock.replay(segmentsMetadataManager); - DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString()); Assert.assertEquals(200, response.getStatus()); @@ -734,7 +738,7 @@ public class DataSourcesResourceTest EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -757,7 +761,7 @@ public class DataSourcesResourceTest EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -780,7 +784,7 @@ public class DataSourcesResourceTest EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, 
null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -803,7 +807,7 @@ public class DataSourcesResourceTest EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -821,7 +825,7 @@ public class DataSourcesResourceTest EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -835,7 +839,7 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -848,7 +852,7 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -861,7 +865,7 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -874,7 +878,7 @@ public class DataSourcesResourceTest public void testMarkAsUsedNonOvershadowedSegmentsNoPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null); Assert.assertEquals(400, response.getStatus()); @@ -1026,7 +1030,7 @@ public class DataSourcesResourceTest new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity()); @@ -1049,7 +1053,7 @@ public class DataSourcesResourceTest new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new 
DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -1074,7 +1078,7 @@ public class DataSourcesResourceTest new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1096,7 +1100,7 @@ public class DataSourcesResourceTest new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity()); @@ -1119,7 +1123,7 @@ public class DataSourcesResourceTest new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -1143,7 +1147,7 @@ public class DataSourcesResourceTest new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1154,7 +1158,7 @@ public class DataSourcesResourceTest public void testMarkSegmentsAsUnusedNullPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", null); Assert.assertEquals(400, response.getStatus()); @@ -1169,7 +1173,7 @@ public class DataSourcesResourceTest public void testMarkSegmentsAsUnusedInvalidPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null); @@ -1183,7 +1187,7 @@ public class 
@@ -1183,7 +1187,7 @@ public class DataSourcesResourceTest
   public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
         new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of());
@@ -1193,6 +1197,251 @@ public class DataSourcesResourceTest
     Assert.assertNotNull(response.getEntity());
   }
 
+  @Test
+  public void testGetDatasourceLoadstatusForceMetadataRefreshNull()
+  {
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null);
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusNoSegmentForInterval()
+  {
+    List<DataSegment> segments = ImmutableList.of();
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
+        "datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.replay(segmentsMetadataManager);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentsMetadataManager,
+        null,
+        null,
+        null,
+        null
+    );
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(204, response.getStatus());
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusDefault()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    DataSegment datasource2Segment1 = new DataSegment(
+        "datasource2",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        30
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+    Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
+        datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
+        datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
+    );
+    Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
+    );
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(100.0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+    EasyMock.reset(segmentsMetadataManager, inventoryView);
+
+    // Test when datasource half loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(50.0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+  }
+
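+  // The "simple" view exercised below reports the number of segments left to load per
+  // datasource (0 when fully loaded, 1 when one of the two used segments is unavailable),
+  // rather than the loaded percentage returned by the default view above.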
+  @Test
+  public void testGetDatasourceLoadstatusSimple()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    DataSegment datasource2Segment1 = new DataSegment(
+        "datasource2",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        30
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+    Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
+        datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
+        datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
+    );
+    Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
+    );
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+    EasyMock.reset(segmentsMetadataManager, inventoryView);
+
+    // Test when datasource half loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(1, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+  }
+
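+  // The "full" view exercised below reports under-replication counts per tier per datasource,
+  // delegating the computation to the (mocked) DruidCoordinator.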
+  @Test
+  public void testGetDatasourceLoadstatusFull()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+
+    final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+    Object2LongMap<String> tier1 = new Object2LongOpenHashMap<>();
+    tier1.put("datasource1", 0L);
+    Object2LongMap<String> tier2 = new Object2LongOpenHashMap<>();
+    tier2.put("datasource1", 3L);
+    underReplicationCountsPerDataSourcePerTier.put("tier1", tier1);
+    underReplicationCountsPerDataSourcePerTier.put("tier2", tier2);
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class);
+    EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments))
+            .andReturn(underReplicationCountsPerDataSourcePerTier).once();
+
+    EasyMock.replay(segmentsMetadataManager, druidCoordinator);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full");
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(2, ((Map) response.getEntity()).size());
+    Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size());
+    Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size());
+    Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1"));
+    Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager);
+  }
+
   private DruidServerMetadata createRealtimeServerMetadata(String name)
   {
     return createServerMetadata(name, ServerType.REALTIME);