API to verify a datasource has the latest ingested data (#9965)

* API to verify a datasource has the latest ingested data

* fix checkstyle

* fix spelling

* address comments

* update docs

* fix tests

* fix doc

* fix typo

* fix typo in docs
Maytas Monsereenusorn 2020-06-16 20:48:30 -10:00 committed by GitHub
parent 68aa384190
commit 1a2620606d
9 changed files with 773 additions and 73 deletions

View File

@ -66,6 +66,20 @@ Other common reasons that hand-off fails are as follows:
Make sure to include the `druid-hdfs-storage` extension and all the Hadoop configuration and dependencies (which can be obtained by running `hadoop classpath` on a machine where Hadoop is set up) in the classpath. Also provide the necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md).
## How do I know when I can query Druid after submitting a batch ingestion task?
You can verify whether segments created by a recent ingestion task are loaded onto Historicals and available for querying using the following workflow.
1. Submit your ingestion task.
2. Repeatedly poll the [Overlord's tasks API](../operations/api-reference.md#tasks) (`/druid/indexer/v1/task/{taskId}/status`) until your task is shown to have completed successfully.
3. Poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with
`forceMetadataRefresh=true` and `interval=<INTERVAL_OF_INGESTED_DATA>` once.
(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache for all datasources. This can be a heavy operation in terms of load on the metadata store, but it is necessary to verify the load status of all the latest segments.)
If any segments are not yet loaded, continue to step 4; otherwise, you can query the data now.
4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with
`forceMetadataRefresh=false` and `interval=<INTERVAL_OF_INGESTED_DATA>`.
Continue polling until all segments are loaded. Once all segments are loaded, you can query the data.
Note that this workflow only guarantees that the segments are available at the time of the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) call. Segments can still become unavailable afterward due to Historical process failures or other reasons. A sketch of this polling workflow in client code follows.
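For example, the workflow above can be scripted. Below is a minimal sketch using `java.net.http` (JDK 11+); the Overlord/Coordinator addresses, task ID, datasource, and interval are hypothetical placeholders, and the substring checks are deliberately crude (a real client would parse the JSON responses):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class IngestionLoadCheck
{
  private static final HttpClient CLIENT = HttpClient.newHttpClient();

  private static String get(String url) throws Exception
  {
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    return CLIENT.send(request, HttpResponse.BodyHandlers.ofString()).body();
  }

  public static void main(String[] args) throws Exception
  {
    String overlord = "http://overlord:8090";        // placeholder Overlord address
    String coordinator = "http://coordinator:8081";  // placeholder Coordinator address
    String taskId = "index_wikipedia_abc123";        // placeholder task ID
    String interval = "2020-06-15_2020-06-16";       // ingested interval, underscore form

    // Step 2: poll the Overlord until the task succeeds. A robust client would parse
    // the JSON status and fail fast on a FAILED status instead of substring-matching.
    while (!get(overlord + "/druid/indexer/v1/task/" + taskId + "/status").contains("SUCCESS")) {
      Thread.sleep(10_000);
    }

    String loadstatus = coordinator + "/druid/coordinator/v1/datasources/wikipedia/loadstatus"
                        + "?simple&interval=" + interval;

    // Step 3: one poll with forceMetadataRefresh=true so the Coordinator picks up
    // the newly published segments (heavy on the metadata store, so do it only once).
    String remaining = get(loadstatus + "&forceMetadataRefresh=true");

    // Step 4: cheap polls with forceMetadataRefresh=false until zero segments remain.
    // The ?simple response looks like {"wikipedia":0}; the check below is crude.
    while (!remaining.contains(":0}")) {
      Thread.sleep(10_000);
      remaining = get(loadstatus + "&forceMetadataRefresh=false");
    }
    // All segments for the interval are loaded; the data is now queryable.
  }
}
```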
## I don't see my Druid segments on my Historical processes
You can check the Coordinator console located at `<COORDINATOR_IP>:<PORT>`. Make sure that your segments have actually loaded on [Historical processes](../design/historical.md). If your segments are not present, check the Coordinator logs for messages about capacity or replication errors. One common reason segments are not downloaded is that Historical processes have a `maxSize` that is too small, leaving them unable to download more data. You can change that with (for example):

View File

@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment
* `/druid/coordinator/v1/loadstatus?simple`
Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication.
Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include segment replication counts.
* `/druid/coordinator/v1/loadstatus?full`
Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes replication.
Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts.
* `/druid/coordinator/v1/loadqueue`
@ -114,6 +114,45 @@ Returns the number of segments to load and drop, as well as the total segment lo
Returns the serialized JSON of segments to load and drop for each Historical process.
#### Segment Loading by Datasource
Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28).
Also note that these APIs only guarantee that the segments are available at the time of the call.
Segments can still become unavailable afterward due to Historical process failures or other reasons.
##### GET
* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given
datasource over the given interval (or the last 2 weeks if no interval is given). `forceMetadataRefresh` is required to be set.
Setting `forceMetadataRefresh` to true will force the Coordinator to poll the latest segment metadata from the metadata store
(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache for all datasources. This can be a heavy operation in terms
of load on the metadata store but can be necessary to make sure that we verify the load status of all the latest segments.)
Setting `forceMetadataRefresh` to false will use the metadata cached on the Coordinator from the last forced/periodic refresh.
If no used segments are found for the given inputs, this API returns `204 No Content`.
* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}`
Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource
over the given interval (or the last 2 weeks if no interval is given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set.
Setting `forceMetadataRefresh` to true will force the Coordinator to poll the latest segment metadata from the metadata store
(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache for all datasources. This can be a heavy operation in terms
of load on the metadata store but can be necessary to make sure that we verify the load status of all the latest segments.)
Setting `forceMetadataRefresh` to false will use the metadata cached on the Coordinator from the last forced/periodic refresh.
If no used segments are found for the given inputs, this API returns `204 No Content`.
* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}`
Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource
over the given interval (or the last 2 weeks if no interval is given). This includes segment replication counts. `forceMetadataRefresh` is required to be set.
Setting `forceMetadataRefresh` to true will force the Coordinator to poll the latest segment metadata from the metadata store
(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache for all datasources. This can be a heavy operation in terms
of load on the metadata store but can be necessary to make sure that we verify the load status of all the latest segments.)
Setting `forceMetadataRefresh` to false will use the metadata cached on the Coordinator from the last forced/periodic refresh.
If no used segments are found for the given inputs, this API returns `204 No Content`.
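For illustration, the three modes return differently shaped JSON maps (the shapes follow from this commit's `DataSourcesResource`; the example datasource and tier names below are hypothetical). A minimal Jackson sketch of deserializing each:

```java
// Hedged sketch: deserializing the three loadstatus response shapes with Jackson.
// The example JSON bodies are illustrative, not captured from a real cluster.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class LoadStatusShapes
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // Default mode: datasource -> percentage of segments loaded, e.g. {"wikipedia": 100.0}
    Map<String, Double> percentLoaded = mapper.readValue(
        "{\"wikipedia\": 100.0}", new TypeReference<Map<String, Double>>() {});

    // ?simple: datasource -> number of segments left to load (no replication counts)
    Map<String, Integer> segmentsLeft = mapper.readValue(
        "{\"wikipedia\": 0}", new TypeReference<Map<String, Integer>>() {});

    // ?full: tier -> datasource -> segments left to load (including replication counts)
    Map<String, Map<String, Long>> perTier = mapper.readValue(
        "{\"_default_tier\": {\"wikipedia\": 0}}",
        new TypeReference<Map<String, Map<String, Long>>>() {});

    System.out.println(percentLoaded + " " + segmentsLeft + " " + perTier);
  }
}
```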
#### Metadata store information
##### GET

View File

@ -200,6 +200,10 @@ public class CoordinatorServerView implements InventoryView
}
}
public Map<SegmentId, SegmentLoadInfo> getSegmentLoadInfos()
{
return segmentLoadInfos;
}
@Override
public DruidServer getInventoryValue(String serverKey)

View File

@ -20,6 +20,7 @@
package org.apache.druid.metadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.timeline.DataSegment;
@ -113,6 +114,22 @@ public interface SegmentsMetadataManager
*/
Iterable<DataSegment> iterateAllUsedSegments();
/**
* Returns an iterable to go over all used and non-overshadowed segments of the given data source over the given interval.
* The order in which segments are iterated is unspecified. Note: the iteration may not be as trivially cheap as,
* for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it
* iterates the returned iterable only once rather than several times.
* If {@code requiresLatest} is true, a forced metadata store poll will be triggered. This can cause a longer
* response time but ensures that the latest segment information (at the time this method is called) is returned.
* If {@code requiresLatest} is false, segment information from a stale snapshot, up to one periodic poll
* period {@link SqlSegmentsMetadataManager#periodicPollDelay} old, will be used.
*/
Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
String datasource,
Interval interval,
boolean requiresLatest
);
/**
* Retrieves all data source names for which there are segment in the database, regardless of whether those segments
* are used or not. If there are no segments in the database, returns an empty set.
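A minimal sketch of how a caller might consume this new interface method, mirroring the usage added to `DataSourcesResource` later in this commit; the manager instance, datasource name, and interval below are placeholders:

```java
import com.google.common.base.Optional;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.timeline.DataSegment;

class UsedSegmentsExample
{
  // Counts the used, non-overshadowed segments for a datasource over an interval.
  // Returns -1 when the datasource is unknown (the method returns an absent Optional).
  static int countUsedSegments(SegmentsMetadataManager manager)
  {
    Optional<Iterable<DataSegment>> segments =
        manager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
            "wikipedia",                           // hypothetical datasource
            Intervals.of("2020-06-01/2020-06-16"), // hypothetical interval
            true                                   // requiresLatest: force a fresh metadata store poll
        );
    if (!segments.isPresent()) {
      return -1;
    }
    int count = 0;
    for (DataSegment segment : segments.get()) { // iterate the returned iterable only once
      count++;
    }
    return count;
  }
}
```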

View File

@ -20,6 +20,8 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
@ -44,6 +46,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@ -95,7 +98,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
{}
/** Represents periodic {@link #poll}s happening from {@link #exec}. */
private static class PeriodicDatabasePoll implements DatabasePoll
@VisibleForTesting
static class PeriodicDatabasePoll implements DatabasePoll
{
/**
* This future allows to wait until {@link #dataSourcesSnapshot} is initialized in the first {@link #poll()}
@ -104,13 +108,15 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
* leadership changes.
*/
final CompletableFuture<Void> firstPollCompletionFuture = new CompletableFuture<>();
long lastPollStartTimestampInMs = -1;
}
/**
* Represents on-demand {@link #poll} initiated at periods of time when SqlSegmentsMetadataManager doesn't poll the database
* periodically.
*/
private static class OnDemandDatabasePoll implements DatabasePoll
@VisibleForTesting
static class OnDemandDatabasePoll implements DatabasePoll
{
final long initiationTimeNanos = System.nanoTime();
final CompletableFuture<Void> pollCompletionFuture = new CompletableFuture<>();
@ -127,7 +133,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
* called at the same time if two different threads are calling them. This might be possible if Coordinator gets and
* drops leadership repeatedly in quick succession.
*
* This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} for times when SqlSegmentsMetadataManager
* This lock is also used to synchronize {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} for times when SqlSegmentsMetadataManager
* is not polling the database periodically (in other words, when the Coordinator is not the leader).
*/
private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock();
@ -155,7 +161,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
* easy to forget to do.
*
* This field may be updated from {@link #exec}, or from whatever thread calling {@link #doOnDemandPoll} via {@link
* #awaitOrPerformDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager.
* #useLatestIfWithinDelayOrPerformNewDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager.
*/
private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null;
@ -170,7 +176,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
* Note that if there is a happens-before relationship between a call to {@link #startPollingDatabasePeriodically()}
* (on Coordinators' leadership change) and one of the methods accessing the {@link #dataSourcesSnapshot}'s state in
* this class the latter is guaranteed to await for the initiated periodic poll. This is because when the latter
* method calls to {@link #awaitLatestDatabasePoll()} via {@link #awaitOrPerformDatabasePoll}, they will
* method calls to {@link #useLatestSnapshotIfWithinDelay()} via {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll}, they will
* see the latest {@link PeriodicDatabasePoll} value (stored in this field, latestDatabasePoll, in {@link
* #startPollingDatabasePeriodically()}) and to await on its {@link PeriodicDatabasePoll#firstPollCompletionFuture}.
*
@ -185,7 +191,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
* SegmentsMetadataManager} and guarantee that it always returns consistent and relatively up-to-date data from methods
* like {@link #getImmutableDataSourceWithUsedSegments}, while avoiding excessive repetitive polls. The last part
* is achieved via "hooking on" other polls by awaiting on {@link PeriodicDatabasePoll#firstPollCompletionFuture} or
* {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #awaitOrPerformDatabasePoll} method
* {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} method
* implementation for details.
*
* Note: the overall implementation of periodic/on-demand polls is not completely optimal: for example, when the
@ -194,7 +200,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
* during Coordinator leadership switches is not a priority.
*
* This field is {@code volatile} because it's checked and updated in a double-checked locking manner in {@link
* #awaitOrPerformDatabasePoll()}.
* #useLatestIfWithinDelayOrPerformNewDatabasePoll()}.
*/
private volatile @Nullable DatabasePoll latestDatabasePoll = null;
@ -311,6 +317,22 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicDatabasePoll)
{
return () -> {
// If the latest poll was an OnDemandDatabasePoll that started less than periodicPollDelay ago,
// wait for the remaining (periodicPollDelay - nanosElapsedFromInitiation) and then check again.
try {
long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
while (latestDatabasePoll != null
&& latestDatabasePoll instanceof OnDemandDatabasePoll
&& ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation() < periodicPollDelayNanos) {
long sleepNano = periodicPollDelayNanos
- ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation();
TimeUnit.NANOSECONDS.sleep(sleepNano);
}
}
catch (Exception e) {
log.debug(e, "Exception found while waiting for next periodic poll");
}
// poll() is synchronized together with startPollingDatabasePeriodically(), stopPollingDatabasePeriodically() and
// isPollingDatabasePeriodically() to ensure that when stopPollingDatabasePeriodically() exits, poll() won't
// actually run anymore after that (it could only enter the synchronized section and exit immediately because the
@ -320,8 +342,10 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
lock.lock();
try {
if (startOrder == currentStartPollingOrder) {
periodicDatabasePoll.lastPollStartTimestampInMs = System.currentTimeMillis();
poll();
periodicDatabasePoll.firstPollCompletionFuture.complete(null);
latestDatabasePoll = periodicDatabasePoll;
} else {
log.debug("startOrder = currentStartPollingOrder = %d, skipping poll()", startOrder);
}
@ -381,16 +405,16 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
}
}
private void awaitOrPerformDatabasePoll()
private void useLatestIfWithinDelayOrPerformNewDatabasePoll()
{
// Double-checked locking with awaitLatestDatabasePoll() call playing the role of the "check".
if (awaitLatestDatabasePoll()) {
// Double-checked locking with useLatestSnapshotIfWithinDelay() call playing the role of the "check".
if (useLatestSnapshotIfWithinDelay()) {
return;
}
ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
lock.lock();
try {
if (awaitLatestDatabasePoll()) {
if (useLatestSnapshotIfWithinDelay()) {
return;
}
OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
@ -403,11 +427,17 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
}
/**
* If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
* made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
* meaning that a new on-demand database poll should be initiated.
* This method returns true without waiting for a database poll if the latest {@link DatabasePoll} is a
* {@link PeriodicDatabasePoll} that has completed its first poll, or an {@link OnDemandDatabasePoll} that was
* initiated no longer than {@link #periodicPollDelay} ago.
* This method does wait until completion if the latest {@link DatabasePoll} is a
* {@link PeriodicDatabasePoll} that has not completed its first poll, or an {@link OnDemandDatabasePoll} that is
* already in the process of polling the database.
* This means that any method using this check can read from a snapshot that is
* up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old.
*/
private boolean awaitLatestDatabasePoll()
@VisibleForTesting
boolean useLatestSnapshotIfWithinDelay()
{
DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
@ -430,6 +460,49 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
return false;
}
/**
* This method always forces a new database poll if there is no ongoing poll, and then
* waits for the new or the ongoing poll to complete before returning.
* This means that any method using this check can be sure that the latest poll for the snapshot completed after
* this method was called.
*/
@VisibleForTesting
void forceOrWaitOngoingDatabasePoll()
{
long checkStartTime = System.currentTimeMillis();
ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
lock.lock();
try {
DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
try {
// Check whether a periodic poll completed while we were waiting for the lock
if (latestDatabasePoll instanceof PeriodicDatabasePoll
&& ((PeriodicDatabasePoll) latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) {
return;
}
// Check whether an on-demand poll completed while we were waiting for the lock
if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
long checkStartTimeNanos = TimeUnit.MILLISECONDS.toNanos(checkStartTime);
OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll;
if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) {
return;
}
}
}
catch (Exception e) {
// Latest poll was unsuccessful, try to do a new poll
log.debug(e, "Latest poll was unsuccessful. Starting a new poll...");
}
// Force a database poll
OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
this.latestDatabasePoll = onDemandDatabasePoll;
doOnDemandPoll(onDemandDatabasePoll);
}
finally {
lock.unlock();
}
}
private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
{
try {
@ -857,19 +930,44 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
@Override
public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
{
awaitOrPerformDatabasePoll();
useLatestIfWithinDelayOrPerformNewDatabasePoll();
return dataSourcesSnapshot;
}
@VisibleForTesting
DataSourcesSnapshot getDataSourcesSnapshot()
{
return dataSourcesSnapshot;
}
@VisibleForTesting
DatabasePoll getLatestDatabasePoll()
{
return latestDatabasePoll;
}
@Override
public Iterable<DataSegment> iterateAllUsedSegments()
{
awaitOrPerformDatabasePoll();
return () -> dataSourcesSnapshot
.getDataSourcesWithAllUsedSegments()
.stream()
.flatMap(dataSource -> dataSource.getSegments().stream())
.iterator();
useLatestIfWithinDelayOrPerformNewDatabasePoll();
return dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot();
}
@Override
public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource,
Interval interval,
boolean requiresLatest)
{
if (requiresLatest) {
forceOrWaitOngoingDatabasePoll();
} else {
useLatestIfWithinDelayOrPerformNewDatabasePoll();
}
VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline
= dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource);
return Optional.fromNullable(usedSegmentsTimeline)
.transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
}
@Override

View File

@ -256,6 +256,17 @@ public class DruidCoordinator
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
{
final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments);
}
/**
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(
Iterable<DataSegment> dataSegments
)
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
@ -263,8 +274,6 @@ public class DruidCoordinator
return underReplicationCountsPerDataSourcePerTier;
}
final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
final DateTime now = DateTimes.nowUtc();
for (final DataSegment segment : dataSegments) {
@ -320,7 +329,7 @@ public class DruidCoordinator
for (ImmutableDruidDataSource dataSource : dataSources) {
final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
final int numUsedSegments = segments.size();
final int numPublishedSegments = segments.size();
// remove loaded segments
for (DruidServer druidServer : serverInventoryView.getInventory()) {
@ -333,10 +342,10 @@ public class DruidCoordinator
}
}
}
final int numUnloadedSegments = segments.size();
final int numUnavailableSegments = segments.size();
loadStatus.put(
dataSource.getName(),
100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) numUsedSegments)
100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments)
);
}
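As a quick sanity check of the renamed arithmetic above: a hypothetical datasource with 4 published segments, 1 of which is unavailable, reports 75% loaded.

```java
// Worked example of the loadstatus percentage; the counts are hypothetical.
public class LoadPercentExample
{
  public static void main(String[] args)
  {
    int numPublishedSegments = 4;   // hypothetical
    int numUnavailableSegments = 1; // hypothetical
    double percent = 100 * ((double) (numPublishedSegments - numUnavailableSegments)
                            / (double) numPublishedSegments);
    System.out.println(percent); // prints 75.0
  }
}
```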

View File

@ -22,11 +22,13 @@ package org.apache.druid.server.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DruidDataSource;
@ -49,6 +51,7 @@ import org.apache.druid.metadata.UnknownSegmentIdsException;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
@ -96,12 +99,14 @@ import java.util.stream.Collectors;
public class DataSourcesResource
{
private static final Logger log = new Logger(DataSourcesResource.class);
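// Two weeks in milliseconds: the default loadstatus lookback when no interval is given.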
private static final long DEFAULT_LOADSTATUS_INTERVAL_OFFSET = 14 * 24 * 60 * 60 * 1000;
private final CoordinatorServerView serverInventoryView;
private final SegmentsMetadataManager segmentsMetadataManager;
private final MetadataRuleManager metadataRuleManager;
private final IndexingServiceClient indexingServiceClient;
private final AuthorizerMapper authorizerMapper;
private final DruidCoordinator coordinator;
@Inject
public DataSourcesResource(
@ -109,7 +114,8 @@ public class DataSourcesResource
SegmentsMetadataManager segmentsMetadataManager,
MetadataRuleManager metadataRuleManager,
@Nullable IndexingServiceClient indexingServiceClient,
AuthorizerMapper authorizerMapper
AuthorizerMapper authorizerMapper,
DruidCoordinator coordinator
)
{
this.serverInventoryView = serverInventoryView;
@ -117,6 +123,7 @@ public class DataSourcesResource
this.metadataRuleManager = metadataRuleManager;
this.indexingServiceClient = indexingServiceClient;
this.authorizerMapper = authorizerMapper;
this.coordinator = coordinator;
}
@GET
@ -391,6 +398,130 @@ public class DataSourcesResource
return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
}
@GET
@Path("/{dataSourceName}/loadstatus")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response getDatasourceLoadstatus(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh,
@QueryParam("interval") @Nullable final String interval,
@QueryParam("simple") @Nullable final String simple,
@QueryParam("full") @Nullable final String full
)
{
if (forceMetadataRefresh == null) {
return Response
.status(Response.Status.BAD_REQUEST)
.entity("Invalid request. forceMetadataRefresh must be specified")
.build();
}
final Interval theInterval;
if (interval == null) {
long currentTimeInMs = System.currentTimeMillis();
theInterval = Intervals.utc(currentTimeInMs - DEFAULT_LOADSTATUS_INTERVAL_OFFSET, currentTimeInMs);
} else {
theInterval = Intervals.of(interval.replace('_', '/'));
}
Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
dataSourceName,
theInterval,
forceMetadataRefresh
);
if (!segments.isPresent()) {
return logAndCreateDataSourceNotFoundResponse(dataSourceName);
}
if (Iterables.size(segments.get()) == 0) {
return Response
.status(Response.Status.NO_CONTENT)
.entity("No used segment found for the given datasource and interval")
.build();
}
if (simple != null) {
// Calculate response for simple mode
SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get());
return Response.ok(
ImmutableMap.of(
dataSourceName,
segmentsLoadStatistics.getNumUnavailableSegments()
)
).build();
} else if (full != null) {
// Calculate response for full mode
Map<String, Object2LongMap<String>> segmentLoadMap
= coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get());
if (segmentLoadMap.isEmpty()) {
return Response.serverError()
.entity("Coordinator segment replicant lookup is not initialized yet. Try again later.")
.build();
}
return Response.ok(segmentLoadMap).build();
} else {
// Calculate response for default mode
SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get());
return Response.ok(
ImmutableMap.of(
dataSourceName,
100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments())
)
).build();
}
}
private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable<DataSegment> segments)
{
Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
int numPublishedSegments = 0;
int numUnavailableSegments = 0;
int numLoadedSegments = 0;
for (DataSegment segment : segments) {
numPublishedSegments++;
if (!segmentLoadInfos.containsKey(segment.getId())) {
numUnavailableSegments++;
} else {
numLoadedSegments++;
}
}
return new SegmentsLoadStatistics(numPublishedSegments, numUnavailableSegments, numLoadedSegments);
}
private static class SegmentsLoadStatistics
{
private int numPublishedSegments;
private int numUnavailableSegments;
private int numLoadedSegments;
SegmentsLoadStatistics(
int numPublishedSegments,
int numUnavailableSegments,
int numLoadedSegments
)
{
this.numPublishedSegments = numPublishedSegments;
this.numUnavailableSegments = numUnavailableSegments;
this.numLoadedSegments = numLoadedSegments;
}
public int getNumPublishedSegments()
{
return numPublishedSegments;
}
public int getNumUnavailableSegments()
{
return numUnavailableSegments;
}
public int getNumLoadedSegments()
{
return numLoadedSegments;
}
}
/**
* The property names belong to the public HTTP JSON API.
*/

View File

@ -20,11 +20,13 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@ -43,6 +45,7 @@ import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
@ -117,7 +120,7 @@ public class SqlSegmentsMetadataManagerTest
{
TestDerbyConnector connector = derbyConnectorRule.getConnector();
SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
config.setPollDuration(Period.seconds(1));
config.setPollDuration(Period.seconds(3));
sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
jsonMapper,
Suppliers.ofInstance(config),
@ -148,30 +151,124 @@ public class SqlSegmentsMetadataManagerTest
}
@Test
public void testPoll()
public void testPollPeriodically()
{
DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertNull(dataSourcesSnapshot);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.poll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
// This call makes sure that the first poll is completed
sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay();
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
ImmutableSet.of("wikipedia"),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
Assert.assertEquals(
ImmutableList.of("wikipedia"),
sqlSegmentsMetadataManager
.getImmutableDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia").getSegments())
ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
);
}
@Test
public void testPollOnDemand()
{
DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertNull(dataSourcesSnapshot);
// This should return false and not wait or poll anything, as no periodic poll is scheduled
Assert.assertFalse(sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay());
Assert.assertNull(dataSourcesSnapshot);
// This call will force an on-demand poll
sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
ImmutableSet.of("wikipedia"),
sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
);
Assert.assertEquals(
ImmutableList.of("wikipedia"),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
);
}
@Test(timeout = 60_000)
public void testPollPeriodicallyAndOnDemandInterleave() throws Exception
{
DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertNull(dataSourcesSnapshot);
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
// This call makes sure that the first poll is completed
sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay();
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
ImmutableList.of("wikipedia"),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
final String newDataSource2 = "wikipedia2";
final DataSegment newSegment2 = createNewSegment1(newDataSource2);
publisher.publishSegment(newSegment2);
// This call will force an on-demand poll
sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
// The new datasource should now be in the snapshot since we just forced an on-demand poll.
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
ImmutableList.of("wikipedia2", "wikipedia"),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
final String newDataSource3 = "wikipedia3";
final DataSegment newSegment3 = createNewSegment1(newDataSource3);
publisher.publishSegment(newSegment3);
// This time wait for the periodic poll (no on-demand poll here, so we have to wait a bit...)
while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) {
Thread.sleep(1000);
}
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
Assert.assertEquals(
ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"),
dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
.stream()
.map(ImmutableDruidDataSource::getName)
.collect(Collectors.toList())
);
}
@ -749,4 +846,46 @@ public class SqlSegmentsMetadataManagerTest
sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
}
@Test
public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception
{
final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000");
Optional<Iterable<DataSegment>> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
"wikipedia", theInterval, true
);
Assert.assertTrue(segments.isPresent());
Set<DataSegment> dataSegmentSet = ImmutableSet.copyOf(segments.get());
Assert.assertEquals(1, dataSegmentSet.size());
Assert.assertTrue(dataSegmentSet.contains(segment1));
final DataSegment newSegment2 = createSegment(
"wikipedia",
"2012-03-16T00:00:00.000/2012-03-17T00:00:00.000",
"2017-10-15T20:19:12.565Z",
"index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
0
);
publisher.publishSegment(newSegment2);
// The new segment is not returned since we call without a forced poll
segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
"wikipedia", theInterval, false
);
Assert.assertTrue(segments.isPresent());
dataSegmentSet = ImmutableSet.copyOf(segments.get());
Assert.assertEquals(1, dataSegmentSet.size());
Assert.assertTrue(dataSegmentSet.contains(segment1));
// The new segment is returned since we call with a forced poll
segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
"wikipedia", theInterval, true
);
Assert.assertTrue(segments.isPresent());
dataSegmentSet = ImmutableSet.copyOf(segments.get());
Assert.assertEquals(2, dataSegmentSet.size());
Assert.assertTrue(dataSegmentSet.contains(segment1));
Assert.assertTrue(dataSegmentSet.contains(newSegment2));
}
}

View File

@ -19,11 +19,14 @@
package org.apache.druid.server.http;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
@ -39,6 +42,7 @@ import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
@ -176,7 +180,7 @@ public class DataSourcesResourceTest
EasyMock.replay(inventoryView, server, request);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null);
Response response = dataSourcesResource.getQueryableDataSources("full", null, request);
Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
Assert.assertEquals(200, response.getStatus());
@ -250,7 +254,7 @@ public class DataSourcesResourceTest
}
};
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper, null);
Response response = dataSourcesResource.getQueryableDataSources("full", null, request);
Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
@ -289,7 +293,7 @@ public class DataSourcesResourceTest
EasyMock.replay(inventoryView, server, request);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null);
Response response = dataSourcesResource.getQueryableDataSources(null, "simple", request);
Assert.assertEquals(200, response.getStatus());
List<Map<String, Object>> results = (List<Map<String, Object>>) response.getEntity();
@ -313,7 +317,7 @@ public class DataSourcesResourceTest
EasyMock.replay(inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, null, null);
new DataSourcesResource(inventoryView, null, null, null, null, null);
Response response = dataSourcesResource.getDataSource("datasource1", "full");
ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
Assert.assertEquals(200, response.getStatus());
@ -329,7 +333,7 @@ public class DataSourcesResourceTest
EasyMock.replay(inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, null, null);
new DataSourcesResource(inventoryView, null, null, null, null, null);
Assert.assertEquals(204, dataSourcesResource.getDataSource("none", null).getStatus());
EasyMock.verify(inventoryView, server);
}
@ -347,7 +351,7 @@ public class DataSourcesResourceTest
EasyMock.replay(inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, null, null);
new DataSourcesResource(inventoryView, null, null, null, null, null);
Response response = dataSourcesResource.getDataSource("datasource1", null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@ -380,7 +384,7 @@ public class DataSourcesResourceTest
EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server, server2, server3)).atLeastOnce();
EasyMock.replay(inventoryView, server, server2, server3);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null);
Response response = dataSourcesResource.getDataSource("datasource1", null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@ -418,7 +422,7 @@ public class DataSourcesResourceTest
EasyMock.replay(inventoryView);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null);
Response response = dataSourcesResource.getDataSource("datasource1", null);
Assert.assertEquals(200, response.getStatus());
Map<String, Map<String, Object>> result1 = (Map<String, Map<String, Object>>) response.getEntity();
@ -463,7 +467,7 @@ public class DataSourcesResourceTest
expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, null, null);
new DataSourcesResource(inventoryView, null, null, null, null, null);
Response response = dataSourcesResource.getIntervalsWithServedSegmentsOrAllServedSegmentsPerIntervals(
"invalidDataSource",
@ -523,7 +527,7 @@ public class DataSourcesResourceTest
EasyMock.replay(inventoryView);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, null, null);
new DataSourcesResource(inventoryView, null, null, null, null, null);
Response response = dataSourcesResource.getServedSegmentsInInterval(
"invalidDataSource",
"2010-01-01/P1D",
@ -593,7 +597,7 @@ public class DataSourcesResourceTest
EasyMock.replay(indexingServiceClient, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null);
new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null);
Response response = dataSourcesResource.killUnusedSegmentsInInterval("datasource1", interval);
Assert.assertEquals(200, response.getStatus());
@ -607,7 +611,7 @@ public class DataSourcesResourceTest
IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class);
EasyMock.replay(indexingServiceClient, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null);
new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null);
try {
Response response =
dataSourcesResource.markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval("datasource", "true", "???");
@ -630,7 +634,7 @@ public class DataSourcesResourceTest
Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null);
Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null);
new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null, null);
// test dropped
EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
@ -699,7 +703,7 @@ public class DataSourcesResourceTest
EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(true).once();
EasyMock.replay(segmentsMetadataManager);
DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null);
DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString());
Assert.assertEquals(200, response.getStatus());
@ -713,7 +717,7 @@ public class DataSourcesResourceTest
EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(false).once();
EasyMock.replay(segmentsMetadataManager);
DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null);
DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString());
Assert.assertEquals(200, response.getStatus());
@ -734,7 +738,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -757,7 +761,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -780,7 +784,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -803,7 +807,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -821,7 +825,7 @@ public class DataSourcesResourceTest
EasyMock.replay(segmentsMetadataManager, inventoryView, server);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -835,7 +839,7 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments()
{
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -848,7 +852,7 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments()
{
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -861,7 +865,7 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray()
{
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
"datasource1",
@ -874,7 +878,7 @@ public class DataSourcesResourceTest
public void testMarkAsUsedNonOvershadowedSegmentsNoPayload()
{
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null);
Assert.assertEquals(400, response.getStatus());
@ -1026,7 +1030,7 @@ public class DataSourcesResourceTest
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity());
@ -1049,7 +1053,7 @@ public class DataSourcesResourceTest
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@ -1074,7 +1078,7 @@ public class DataSourcesResourceTest
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
Assert.assertEquals(500, response.getStatus());
Assert.assertNotNull(response.getEntity());
@ -1096,7 +1100,7 @@ public class DataSourcesResourceTest
new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity());
@ -1119,7 +1123,7 @@ public class DataSourcesResourceTest
new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@ -1143,7 +1147,7 @@ public class DataSourcesResourceTest
new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
Assert.assertEquals(500, response.getStatus());
Assert.assertNotNull(response.getEntity());
@ -1154,7 +1158,7 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnusedNullPayload()
{
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", null);
Assert.assertEquals(400, response.getStatus());
@ -1169,7 +1173,7 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnusedInvalidPayload()
{
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null);
@ -1183,7 +1187,7 @@ public class DataSourcesResourceTest
public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments()
{
DataSourcesResource dataSourcesResource =
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of());
@ -1193,6 +1197,251 @@ public class DataSourcesResourceTest
Assert.assertNotNull(response.getEntity());
}
@Test
public void testGetDatasourceLoadstatusForceMetadataRefreshNull()
{
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null);
Assert.assertEquals(400, response.getStatus());
}
@Test
public void testGetDatasourceLoadstatusNoSegmentForInterval()
{
List<DataSegment> segments = ImmutableList.of();
// Test when there are no used segments for the given interval
EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
"datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
.andReturn(Optional.of(segments)).once();
EasyMock.replay(segmentsMetadataManager);
DataSourcesResource dataSourcesResource = new DataSourcesResource(
inventoryView,
segmentsMetadataManager,
null,
null,
null,
null
);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
Assert.assertEquals(204, response.getStatus());
}
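
// Default mode reports the percentage of used segments loaded: 100.0 when both segments
// are served, 50.0 when only one of the two is.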
@Test
public void testGetDatasourceLoadstatusDefault()
{
DataSegment datasource1Segment1 = new DataSegment(
"datasource1",
Intervals.of("2010-01-01/P1D"),
"",
null,
null,
null,
null,
0x9,
10
);
DataSegment datasource1Segment2 = new DataSegment(
"datasource1",
Intervals.of("2010-01-22/P1D"),
"",
null,
null,
null,
null,
0x9,
20
);
DataSegment datasource2Segment1 = new DataSegment(
"datasource2",
Intervals.of("2010-01-01/P1D"),
"",
null,
null,
null,
null,
0x9,
30
);
List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
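// The inventory also contains a datasource2 segment, to verify it does not affect datasource1's status.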
Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
);
Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
);
// Test when datasource fully loaded
EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
"datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
.andReturn(Optional.of(segments)).once();
EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
EasyMock.replay(segmentsMetadataManager, inventoryView);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
Assert.assertEquals(100.0, ((Map) response.getEntity()).get("datasource1"));
EasyMock.verify(segmentsMetadataManager, inventoryView);
EasyMock.reset(segmentsMetadataManager, inventoryView);
// Test when datasource half loaded
EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
"datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
.andReturn(Optional.of(segments)).once();
EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
EasyMock.replay(segmentsMetadataManager, inventoryView);
dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
Assert.assertEquals(50.0, ((Map) response.getEntity()).get("datasource1"));
EasyMock.verify(segmentsMetadataManager, inventoryView);
}
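
// "simple" mode reports the number of segments not yet loaded: 0 when fully loaded,
// 1 when one of the two segments is still missing.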
@Test
public void testGetDatasourceLoadstatusSimple()
{
DataSegment datasource1Segment1 = new DataSegment(
"datasource1",
Intervals.of("2010-01-01/P1D"),
"",
null,
null,
null,
null,
0x9,
10
);
DataSegment datasource1Segment2 = new DataSegment(
"datasource1",
Intervals.of("2010-01-22/P1D"),
"",
null,
null,
null,
null,
0x9,
20
);
DataSegment datasource2Segment1 = new DataSegment(
"datasource2",
Intervals.of("2010-01-01/P1D"),
"",
null,
null,
null,
null,
0x9,
30
);
List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
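// As above, a datasource2 segment is present in the inventory but should be ignored for datasource1.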
Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
);
Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
);
// Test when datasource fully loaded
EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
"datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
.andReturn(Optional.of(segments)).once();
EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
EasyMock.replay(segmentsMetadataManager, inventoryView);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
Assert.assertEquals(0, ((Map) response.getEntity()).get("datasource1"));
EasyMock.verify(segmentsMetadataManager, inventoryView);
EasyMock.reset(segmentsMetadataManager, inventoryView);
// Test when datasource half loaded
EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
"datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
.andReturn(Optional.of(segments)).once();
EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
EasyMock.replay(segmentsMetadataManager, inventoryView);
dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(1, ((Map) response.getEntity()).size());
Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
Assert.assertEquals(1, ((Map) response.getEntity()).get("datasource1"));
EasyMock.verify(segmentsMetadataManager, inventoryView);
}
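
// "full" mode delegates to the DruidCoordinator and reports under-replication counts
// per tier for the datasource's segments.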
@Test
public void testGetDatasourceLoadstatusFull()
{
DataSegment datasource1Segment1 = new DataSegment(
"datasource1",
Intervals.of("2010-01-01/P1D"),
"",
null,
null,
null,
null,
0x9,
10
);
DataSegment datasource1Segment2 = new DataSegment(
"datasource1",
Intervals.of("2010-01-22/P1D"),
"",
null,
null,
null,
null,
0x9,
20
);
List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
Object2LongMap<String> tier1 = new Object2LongOpenHashMap<>();
tier1.put("datasource1", 0L);
Object2LongMap<String> tier2 = new Object2LongOpenHashMap<>();
tier2.put("datasource1", 3L);
underReplicationCountsPerDataSourcePerTier.put("tier1", tier1);
underReplicationCountsPerDataSourcePerTier.put("tier2", tier2);
// Test the "full" mode response: under-replication counts per datasource per tier
EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
"datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
.andReturn(Optional.of(segments)).once();
DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class);
EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments))
.andReturn(underReplicationCountsPerDataSourcePerTier).once();
EasyMock.replay(segmentsMetadataManager, druidCoordinator);
DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator);
Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full");
Assert.assertEquals(200, response.getStatus());
Assert.assertNotNull(response.getEntity());
Assert.assertEquals(2, ((Map) response.getEntity()).size());
Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size());
Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size());
Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1"));
Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1"));
EasyMock.verify(segmentsMetadataManager);
}
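
// Helpers for building server metadata fixtures with a specific ServerType.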
private DruidServerMetadata createRealtimeServerMetadata(String name)
{
return createServerMetadata(name, ServerType.REALTIME);