Clean up Coordinator logs, add duty status API (#16959) (#17154)

Description
-----------
Coordinator logs are fairly noisy and don't give much useful information (see example below).
Even when the Coordinator misbehaves, these logs are not very useful.

Main changes
------------
- Add API `GET /druid/coordinator/v1/duties` that returns a status list of all duty groups currently running on the Coordinator
- Emit metrics `segment/poll/time`, `segment/pollWithSchema/time`, `segment/buildSnapshot/time`
- Remove redundant logs that indicate normal operation of well-tested aspects of the Coordinator

Refactors
---------
- Move some logic from `DutiesRunnable` to `CoordinatorDutyGroup`
- Move stats collection from `CollectSegmentAndServerStats` to `PrepareBalancerAndLoadQueues`
- Minor cleanup of class `DruidCoordinator`
- Clean up class `DruidCoordinatorRuntimeParams`
  - Remove field `coordinatorStartTime`. Maintain start time in `MarkOvershadowedSegmentsAsUnused` instead.
  - Remove field `MetadataRuleManager`. Pass supplier to constructor of applicable duties instead.
  - Make `usedSegmentsNewestFirst` and `datasourcesSnapshot` non-nullable, as they are always required.
This commit is contained in:
Kashif Faraz 2024-09-25 14:59:53 +05:30 committed by GitHub
parent 1096728fa4
commit 8059b86c7f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
51 changed files with 875 additions and 756 deletions

View File

@ -30,7 +30,6 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
@ -104,16 +103,16 @@ public class TaskActionTestKit extends ExternalResource
}
};
taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator);
segmentSchemaCache = new SegmentSchemaCache(new NoopServiceEmitter());
segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
segmentsMetadataManager = new SqlSegmentsMetadataManager(
objectMapper,
Suppliers.ofInstance(new SegmentsMetadataManagerConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig),
testDerbyConnector,
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
final ServiceEmitter noopEmitter = new NoopServiceEmitter();
final TaskLockConfig taskLockConfig = new TaskLockConfig()
{
@Override
@ -137,10 +136,10 @@ public class TaskActionTestKit extends ExternalResource
taskLockbox,
taskLockConfig,
metadataStorageCoordinator,
noopEmitter,
NoopServiceEmitter.instance(),
ScheduledExecutors::fixed
),
noopEmitter,
NoopServiceEmitter.instance(),
EasyMock.createMock(SupervisorManager.class),
objectMapper
);

View File

@ -155,14 +155,15 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
segmentSchemaManager,
CentralizedDatasourceSchemaConfig.create()
);
segmentSchemaCache = new SegmentSchemaCache(new NoopServiceEmitter());
segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
segmentsMetadataManager = new SqlSegmentsMetadataManager(
objectMapper,
SegmentsMetadataManagerConfig::new,
derbyConnectorRule.metadataTablesConfigSupplier(),
derbyConnectorRule.getConnector(),
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
lockbox = new TaskLockbox(taskStorage, storageCoordinator);
segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper());

View File

@ -21,7 +21,6 @@ package org.apache.druid.client;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
@ -55,23 +54,6 @@ public class DataSourcesSnapshot
return new DataSourcesSnapshot(CollectionUtils.mapValues(dataSources, DruidDataSource::toImmutableDruidDataSource));
}
/**
 * Builds a snapshot from per-datasource timelines of used segments.
 * <p>
 * For every entry of the given map, a {@link DruidDataSource} carrying the
 * given properties is created, every object of the timeline is added to it,
 * and its immutable form is stored under the datasource name. The given
 * timelines map is handed to the snapshot as-is.
 *
 * @param usedSegmentsTimelinesPerDataSource timeline of used segments, keyed by datasource name
 * @param dataSourceProperties               properties applied to every created datasource
 * @return snapshot built from the immutable datasources and the given timelines
 */
public static DataSourcesSnapshot fromUsedSegmentsTimelines(
    Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource,
    ImmutableMap<String, String> dataSourceProperties
)
{
  final Map<String, ImmutableDruidDataSource> immutableDataSources =
      Maps.newHashMapWithExpectedSize(usedSegmentsTimelinesPerDataSource.size());

  for (Map.Entry<String, SegmentTimeline> timelineEntry : usedSegmentsTimelinesPerDataSource.entrySet()) {
    final String name = timelineEntry.getKey();
    final DruidDataSource mutableDataSource = new DruidDataSource(name, dataSourceProperties);
    for (DataSegment segment : timelineEntry.getValue().iterateAllObjects()) {
      mutableDataSource.addSegment(segment);
    }
    immutableDataSources.put(name, mutableDataSource.toImmutableDruidDataSource());
  }

  return new DataSourcesSnapshot(immutableDataSources, usedSegmentsTimelinesPerDataSource);
}
private final Map<String, ImmutableDruidDataSource> dataSourcesWithAllUsedSegments;
private final Map<String, SegmentTimeline> usedSegmentsTimelinesPerDataSource;
private final ImmutableSet<DataSegment> overshadowedSegments;

View File

@ -50,6 +50,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SegmentMetadata;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
@ -164,6 +166,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final SQLMetadataConnector connector;
private final SegmentSchemaCache segmentSchemaCache;
private final ServiceEmitter serviceEmitter;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
/**
@ -251,7 +254,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
Supplier<MetadataStorageTablesConfig> dbTables,
SQLMetadataConnector connector,
SegmentSchemaCache segmentSchemaCache,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter
)
{
this.jsonMapper = jsonMapper;
@ -260,6 +264,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
this.connector = connector;
this.segmentSchemaCache = segmentSchemaCache;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
this.serviceEmitter = serviceEmitter;
}
/**
@ -639,7 +644,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
{
try {
int numUpdatedDatabaseEntries = connector.getDBI().withHandle(
(Handle handle) -> handle
handle -> handle
.createStatement(StringUtils.format("UPDATE %s SET used=true, used_status_last_updated = :used_status_last_updated WHERE id = :id", getSegmentsTable()))
.bind("id", segmentId)
.bind("used_status_last_updated", DateTimes.nowUtc().toString())
@ -650,7 +655,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
// segment into the respective data source, because we don't have it fetched from the database. It's probably not
// worth complicating the implementation and making two database queries just to add the segment because it will
// be anyway fetched during the next poll(). Segment putting that is done in the bulk markAsUsed methods is a nice
// to have thing, but doesn't formally affects the external guarantees of SegmentsMetadataManager class.
// to have thing, but doesn't formally affect the external guarantees of SegmentsMetadataManager class.
return numUpdatedDatabaseEntries > 0;
}
catch (RuntimeException e) {
@ -1031,7 +1036,6 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
private void doPollSegments()
{
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment table.");
// Some databases such as PostgreSQL require auto-commit turned off
// to stream results back, enabling transactions disables auto-commit
@ -1060,14 +1064,12 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
"Unexpected 'null' when polling segments from the db, aborting snapshot update."
);
if (segments.isEmpty()) {
log.info("No segments found in the database!");
} else {
log.info(
"Polled and found [%,d] segments in the database in [%,d] ms.",
segments.size(), stopwatch.millisElapsed()
);
}
stopwatch.stop();
emitMetric("segment/poll/time", stopwatch.millisElapsed());
log.info(
"Polled and found [%,d] segments in the database in [%,d]ms.",
segments.size(), stopwatch.millisElapsed()
);
createDatasourcesSnapshot(segments);
}
@ -1075,7 +1077,6 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
private void doPollSegmentAndSchema()
{
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment and schema table.");
ImmutableMap.Builder<SegmentId, SegmentMetadata> segmentMetadataBuilder = new ImmutableMap.Builder<>();
@ -1166,18 +1167,21 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
"Unexpected 'null' when polling segments from the db, aborting snapshot update."
);
if (segments.isEmpty() && schemaMap.isEmpty()) {
log.info("No segments found in the database!");
} else {
log.info(
"Polled and found [%,d] segments and [%,d] schemas in the database in [%,d] ms.",
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
stopwatch.stop();
emitMetric("segment/pollWithSchema/time", stopwatch.millisElapsed());
log.info(
"Polled and found [%,d] segments and [%,d] schemas in the database in [%,d]ms.",
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
createDatasourcesSnapshot(segments);
}
/**
 * Emits a single metric event through the {@code serviceEmitter} of this manager.
 * No dimensions are set on the event.
 *
 * @param metricName name of the metric, e.g. {@code segment/poll/time}
 * @param value      value to report for the metric
 */
private void emitMetric(String metricName, long value)
{
  final ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
  serviceEmitter.emit(eventBuilder.setMetric(metricName, value));
}
private void createDatasourcesSnapshot(List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
@ -1195,8 +1199,9 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
dataSourceProperties
);
log.info(
"Successfully created snapshot from polled segments in [%d] ms. Found [%d] overshadowed segments.",
emitMetric("segment/buildSnapshot/time", stopwatch.millisElapsed());
log.debug(
"Created snapshot from polled segments in [%d]ms. Found [%d] overshadowed segments.",
stopwatch.millisElapsed(), dataSourcesSnapshot.getOvershadowedSegments().size()
);
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaCache;
@ -33,6 +34,7 @@ public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManag
private final Supplier<MetadataStorageTablesConfig> storageConfig;
private final SQLMetadataConnector connector;
private final Lifecycle lifecycle;
private final ServiceEmitter serviceEmitter;
private final SegmentSchemaCache segmentSchemaCache;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
@ -44,7 +46,8 @@ public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManag
SQLMetadataConnector connector,
Lifecycle lifecycle,
SegmentSchemaCache segmentSchemaCache,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
ServiceEmitter serviceEmitter
)
{
this.jsonMapper = jsonMapper;
@ -52,6 +55,7 @@ public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManag
this.storageConfig = storageConfig;
this.connector = connector;
this.lifecycle = lifecycle;
this.serviceEmitter = serviceEmitter;
this.segmentSchemaCache = segmentSchemaCache;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}
@ -84,7 +88,8 @@ public class SqlSegmentsMetadataManagerProvider implements SegmentsMetadataManag
storageConfig,
connector,
segmentSchemaCache,
centralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig,
serviceEmitter
);
}
}

View File

@ -19,9 +19,7 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
@ -29,23 +27,18 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
@ -64,11 +57,12 @@ import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs;
import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CollectSegmentAndServerStats;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorDutyGroup;
import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
import org.apache.druid.server.coordinator.duty.KillAuditLog;
import org.apache.druid.server.coordinator.duty.KillCompactionConfig;
import org.apache.druid.server.coordinator.duty.KillDatasourceMetadata;
@ -79,6 +73,7 @@ import org.apache.druid.server.coordinator.duty.KillUnreferencedSegmentSchema;
import org.apache.druid.server.coordinator.duty.KillUnusedSegments;
import org.apache.druid.server.coordinator.duty.MarkEternityTombstonesAsUnused;
import org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
import org.apache.druid.server.coordinator.duty.MetadataAction;
import org.apache.druid.server.coordinator.duty.PrepareBalancerAndLoadQueues;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
@ -95,12 +90,10 @@ import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -108,7 +101,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@ -117,27 +109,6 @@ import java.util.stream.Collectors;
@ManageLifecycle
public class DruidCoordinator
{
/**
* Orders newest segments (i.e. segments with most recent intervals) first.
* Used by:
* <ul>
* <li>{@link RunRules} duty to prioritize assignment of more recent segments.
* The order of segments matters because the {@link CoordinatorDynamicConfig#replicationThrottleLimit}
* might cause only a few segments to be picked for replication in a coordinator run.
* </li>
* <li>{@link LoadQueuePeon}s to prioritize load of more recent segments.</li>
* </ul>
* It is presumed that more recent segments are queried more often and contain
* more important data for users. This ordering thus ensures that if the cluster
* has availability or loading problems, the most recent segments are made
* available as soon as possible.
*/
public static final Ordering<DataSegment> SEGMENT_COMPARATOR_RECENT_FIRST = Ordering
.from(Comparators.intervalsByEndThenStart())
.onResultOf(DataSegment::getInterval)
.compound(Ordering.<DataSegment>natural())
.reverse();
private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
private final Object lock = new Object();
@ -148,7 +119,7 @@ public class DruidCoordinator
private final ServiceEmitter emitter;
private final OverlordClient overlordClient;
private final ScheduledExecutorFactory executorFactory;
private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new HashMap<>();
private final List<DutiesRunnable> dutiesRunnables = new ArrayList<>();
private final LoadQueueTaskMaster taskMaster;
private final SegmentLoadQueueManager loadQueueManager;
private final ServiceAnnouncer serviceAnnouncer;
@ -184,7 +155,7 @@ public class DruidCoordinator
*/
private volatile Set<DataSegment> broadcastSegments = null;
public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";
@ -373,6 +344,11 @@ public class DruidCoordinator
return coordLeaderSelector.getCurrentLeader();
}
/**
 * Returns the current status of every duty group held by this Coordinator,
 * one {@link DutyGroupStatus} per group.
 * <p>
 * {@code dutiesRunnables} is a plain {@link ArrayList} that is cleared by
 * {@code stopAllDutyGroups()}, which is guarded by {@code lock}. Since this
 * method can be invoked from a different thread than the one handling
 * leadership changes, the list is copied while holding the same lock so that
 * it is never iterated while being modified. The statuses themselves are
 * computed outside the lock to keep the critical section short.
 *
 * @return statuses of all duty groups; empty if no duty group exists
 */
public List<DutyGroupStatus> getStatusOfDuties()
{
  final List<DutiesRunnable> currentRunnables;
  synchronized (lock) {
    currentRunnables = new ArrayList<>(dutiesRunnables);
  }
  return currentRunnables.stream()
                         .map(runnable -> runnable.dutyGroup.getStatus())
                         .collect(Collectors.toList());
}
@LifecycleStart
public void start()
{
@ -410,11 +386,8 @@ public class DruidCoordinator
}
coordLeaderSelector.unregisterListener();
started = false;
stopAllDutyGroupExecutors();
balancerStrategyFactory.stopExecutor();
stopAllDutyGroups();
}
}
@ -463,7 +436,6 @@ public class DruidCoordinator
}
final int startingLeaderCounter = coordLeaderSelector.localTerm();
final List<DutiesRunnable> dutiesRunnables = new ArrayList<>();
dutiesRunnables.add(
new DutiesRunnable(
makeHistoricalManagementDuties(),
@ -494,27 +466,27 @@ public class DruidCoordinator
for (CoordinatorCustomDutyGroup customDutyGroup : customDutyGroups.getCoordinatorCustomDutyGroups()) {
dutiesRunnables.add(
new DutiesRunnable(
customDutyGroup.getCustomDutyList(),
new ArrayList<>(customDutyGroup.getCustomDutyList()),
startingLeaderCounter,
customDutyGroup.getName(),
customDutyGroup.getPeriod()
)
);
log.info(
"Done making custom coordinator duties [%s] for group [%s]",
customDutyGroup.getCustomDutyList().stream()
.map(duty -> duty.getClass().getName()).collect(Collectors.toList()),
customDutyGroup.getName()
);
}
log.warn(
"Created [%d] duty groups. DUTY RUNS WILL NOT BE LOGGED."
+ " Use API '/druid/coordinator/v1/duties' to get current status.",
dutiesRunnables.size()
);
for (final DutiesRunnable dutiesRunnable : dutiesRunnables) {
// Several coordinator duties can take a non-trivial amount of time to complete.
// Hence, we schedule each duty group on a dedicated executor
ScheduledExecutors.scheduleAtFixedRate(
getOrCreateDutyGroupExecutor(dutiesRunnable.dutyGroupName),
dutiesRunnable.executor,
config.getCoordinatorStartDelay(),
dutiesRunnable.getPeriod(),
dutiesRunnable.dutyGroup.getPeriod(),
() -> {
if (coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
@ -548,42 +520,25 @@ public class DruidCoordinator
serviceAnnouncer.unannounce(self);
lookupCoordinatorManager.stop();
metadataManager.onLeaderStop();
balancerStrategyFactory.stopExecutor();
}
}
/**
* Check if compaction supervisors are enabled on the Overlord.
*/
private boolean isCompactionSupervisorEnabled()
{
try {
return FutureUtils.getUnchecked(overlordClient.isCompactionSupervisorEnabled(), true);
}
catch (Exception e) {
// The Overlord is probably on an older version, assume that compaction supervisor is not enabled
return false;
stopAllDutyGroups();
}
}
@GuardedBy("lock")
private ScheduledExecutorService getOrCreateDutyGroupExecutor(String dutyGroup)
private void stopAllDutyGroups()
{
return dutyGroupExecutors.computeIfAbsent(
dutyGroup,
group -> executorFactory.create(1, "Coordinator-Exec-" + dutyGroup + "-%d")
);
}
@GuardedBy("lock")
private void stopAllDutyGroupExecutors()
{
dutyGroupExecutors.values().forEach(ScheduledExecutorService::shutdownNow);
dutyGroupExecutors.clear();
balancerStrategyFactory.stopExecutor();
dutiesRunnables.forEach(group -> group.executor.shutdownNow());
dutiesRunnables.clear();
}
private List<CoordinatorDuty> makeHistoricalManagementDuties()
{
final MetadataAction.DeleteSegments deleteSegments
= segments -> metadataManager.segments().markSegmentsAsUnused(segments);
final MetadataAction.GetDatasourceRules getRules
= dataSource -> metadataManager.rules().getRulesWithDefault(dataSource);
return ImmutableList.of(
new PrepareBalancerAndLoadQueues(
taskMaster,
@ -591,18 +546,18 @@ public class DruidCoordinator
balancerStrategyFactory,
serverInventoryView
),
new RunRules(segments -> metadataManager.segments().markSegmentsAsUnused(segments)),
new RunRules(deleteSegments, getRules),
new UpdateReplicationStatus(),
new UnloadUnusedSegments(loadQueueManager),
new MarkOvershadowedSegmentsAsUnused(segments -> metadataManager.segments().markSegmentsAsUnused(segments)),
new MarkEternityTombstonesAsUnused(segments -> metadataManager.segments().markSegmentsAsUnused(segments)),
new CollectSegmentStats(),
new UnloadUnusedSegments(loadQueueManager, getRules),
new MarkOvershadowedSegmentsAsUnused(deleteSegments),
new MarkEternityTombstonesAsUnused(deleteSegments),
new BalanceSegments(config.getCoordinatorPeriod()),
new CollectSegmentAndServerStats(taskMaster)
new CollectLoadQueueStats()
);
}
@VisibleForTesting
List<CoordinatorDuty> makeIndexingServiceDuties()
private List<CoordinatorDuty> makeIndexingServiceDuties()
{
final List<CoordinatorDuty> duties = new ArrayList<>();
final KillUnusedSegmentsConfig killUnusedConfig = config.getKillConfigs().unusedSegments(
@ -615,13 +570,10 @@ public class DruidCoordinator
duties.add(new KillStalePendingSegments(overlordClient));
}
// Do not add compactSegments if it is already included in the custom duty groups
// Do not add compactSegments if it is already added in any of the custom duty groups
if (getCompactSegmentsDutyFromCustomGroups().isEmpty()) {
duties.add(compactSegments);
}
if (log.isDebugEnabled()) {
log.debug("Initialized indexing service duties [%s].", duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()));
}
return ImmutableList.copyOf(duties);
}
@ -648,8 +600,7 @@ public class DruidCoordinator
return duties;
}
@VisibleForTesting
CompactSegments initializeCompactSegmentsDuty(CompactionStatusTracker statusTracker)
private CompactSegments initializeCompactSegmentsDuty(CompactionStatusTracker statusTracker)
{
List<CompactSegments> compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups();
if (compactSegmentsDutyFromCustomGroups.isEmpty()) {
@ -665,8 +616,7 @@ public class DruidCoordinator
}
}
@VisibleForTesting
List<CompactSegments> getCompactSegmentsDutyFromCustomGroups()
private List<CompactSegments> getCompactSegmentsDutyFromCustomGroups()
{
return customDutyGroups.getCoordinatorCustomDutyGroups()
.stream()
@ -677,180 +627,86 @@ public class DruidCoordinator
.collect(Collectors.toList());
}
private class DutiesRunnable implements Runnable
/**
* Used by {@link CoordinatorDutyGroup} to check leadership and emit stats.
* <p>
* Implemented in this class by {@code DutiesRunnable}, which passes itself to
* the {@link CoordinatorDutyGroup} that it creates.
*/
public interface DutyGroupHelper
{
/**
* Leadership check used by a duty group. The {@code DutiesRunnable}
* implementation returns true only while this Coordinator is the leader and
* the leadership term is still the one with which the runnable was created.
*
* @return true if this Coordinator is still the leader
*/
boolean isLeader();
/**
* Emits the given stat as a metric. The {@code DutiesRunnable} implementation
* reports the name of its duty group and all values of the row key as
* dimensions of the emitted event.
*
* @param stat   stat whose metric name is used for the emitted event
* @param rowKey dimension values to attach to the event
* @param value  value of the metric
*/
void emitStat(CoordinatorStat stat, RowKey rowKey, long value);
}
/**
* Container for a single {@link CoordinatorDutyGroup} that runs on a dedicated executor.
*/
private class DutiesRunnable implements Runnable, DutyGroupHelper
{
private final DateTime coordinatorStartTime = DateTimes.nowUtc();
private final List<? extends CoordinatorDuty> duties;
private final int startingLeaderCounter;
private final String dutyGroupName;
private final Duration period;
private final ScheduledExecutorService executor;
private final CoordinatorDutyGroup dutyGroup;
DutiesRunnable(
List<? extends CoordinatorDuty> duties,
List<CoordinatorDuty> duties,
final int startingLeaderCounter,
String alias,
Duration period
)
{
this.duties = duties;
this.startingLeaderCounter = startingLeaderCounter;
this.dutyGroupName = alias;
this.period = period;
this.dutyGroup = new CoordinatorDutyGroup(alias, duties, period, this);
this.executor = executorFactory.create(1, "Coordinator-Exec-" + alias + "-%d");
}
@Override
public void run()
{
try {
log.info("Starting coordinator run for group [%s]", dutyGroupName);
final Stopwatch groupRunTime = Stopwatch.createStarted();
synchronized (lock) {
if (!coordLeaderSelector.isLeader()) {
log.info("LEGGO MY EGGO. [%s] is leader.", coordLeaderSelector.getCurrentLeader());
stopBeingLeader();
return;
}
}
List<Boolean> allStarted = Arrays.asList(
metadataManager.isStarted(),
serverInventoryView.isStarted()
);
for (Boolean aBoolean : allStarted) {
if (!aBoolean) {
log.error("InventoryManagers not started[%s]", allStarted);
stopBeingLeader();
return;
}
if (metadataManager.isStarted() && serverInventoryView.isStarted()) {
final DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.builder()
.withDataSourcesSnapshot(metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments())
.withDynamicConfigs(metadataManager.configs().getCurrentDynamicConfig())
.withCompactionConfig(metadataManager.configs().getCurrentCompactionConfig())
.build();
dutyGroup.run(params);
} else {
log.error("Inventory view not initialized yet. Skipping run of duty group[%s].", dutyGroup.getName());
stopBeingLeader();
}
// Do coordinator stuff.
DataSourcesSnapshot dataSourcesSnapshot
= metadataManager.segments().getSnapshotOfDataSourcesWithAllUsedSegments();
final CoordinatorDynamicConfig dynamicConfig = metadataManager.configs().getCurrentDynamicConfig();
final DruidCompactionConfig compactionConfig = metadataManager.configs().getCurrentCompactionConfig();
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams
.newBuilder(coordinatorStartTime)
.withDatabaseRuleManager(metadataManager.rules())
.withDataSourcesSnapshot(dataSourcesSnapshot)
.withDynamicConfigs(dynamicConfig)
.withCompactionConfig(compactionConfig)
.build();
log.info(
"Initialized run params for group [%s] with [%,d] used segments in [%d] datasources.",
dutyGroupName, params.getUsedSegments().size(), dataSourcesSnapshot.getDataSourcesMap().size()
);
boolean coordinationPaused = dynamicConfig.getPauseCoordination();
if (coordinationPaused
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
log.info("Coordination has been paused. Duties will not run until coordination is resumed.");
}
final Stopwatch dutyRunTime = Stopwatch.createUnstarted();
for (CoordinatorDuty duty : duties) {
// Don't read state and run state in the same duty otherwise racy conditions may exist
if (!coordinationPaused
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
if (shouldSkipAutoCompactDuty(duty)) {
continue;
}
dutyRunTime.restart();
params = duty.run(params);
dutyRunTime.stop();
final String dutyName = duty.getClass().getName();
if (params == null) {
log.info("Stopping run for group [%s] on request of duty [%s].", dutyGroupName, dutyName);
return;
} else {
final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
final long dutyRunMillis = dutyRunTime.millisElapsed();
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis);
}
}
}
// Emit stats collected from all duties
final CoordinatorRunStats allStats = params.getCoordinatorStats();
if (allStats.rowCount() > 0) {
final AtomicInteger emittedCount = new AtomicInteger();
allStats.forEachStat(
(stat, dimensions, value) -> {
if (stat.shouldEmit()) {
emitStat(stat, dimensions.getValues(), value);
emittedCount.incrementAndGet();
}
}
);
log.info(
"Emitted [%d] stats for group [%s]. All collected stats:%s",
emittedCount.get(), dutyGroupName, allStats.buildStatsTable()
);
}
// Emit the runtime of the full DutiesRunnable
groupRunTime.stop();
final long runMillis = groupRunTime.millisElapsed();
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms.%n", dutyGroupName, runMillis);
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
}
}
/**
* @return true if this is an auto-compact CompactSegments duty and should
* not be run in case Compaction Scheduler is already running on Overlord.
* Manually triggered compaction should always be run.
*/
private boolean shouldSkipAutoCompactDuty(CoordinatorDuty duty)
{
final boolean shouldSkipDuty = duty instanceof CompactSegments
&& !COMPACT_SEGMENTS_DUTIES_DUTY_GROUP.equals(dutyGroupName)
&& isCompactionSupervisorEnabled();
if (shouldSkipDuty) {
log.warn(
"Skipping Compact Segments duty in group[%s] since compaction"
+ " supervisors are already running on Overlord.",
dutyGroupName
);
}
return shouldSkipDuty;
}
private void emitStat(CoordinatorStat stat, Map<Dimension, String> dimensionValues, long value)
@Override
public void emitStat(CoordinatorStat stat, RowKey rowKey, long value)
{
ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder()
.setDimension(Dimension.DUTY_GROUP.reportedName(), dutyGroupName);
dimensionValues.forEach(
.setDimension(Dimension.DUTY_GROUP.reportedName(), dutyGroup.getName());
rowKey.getValues().forEach(
(dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), dimValue)
);
emitter.emit(eventBuilder.setMetric(stat.getMetricName(), value));
}
Duration getPeriod()
{
return period;
}
@Override
public String toString()
public boolean isLeader()
{
return "DutiesRunnable{group='" + dutyGroupName + '\'' + '}';
return coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm();
}
}
// Duties that read/update the state in the DruidCoordinator class
/**
* Updates replication status of all used segments. This duty must run after
* {@link RunRules} so that the number of required replicas for all segments
@ -858,7 +714,6 @@ public class DruidCoordinator
*/
private class UpdateReplicationStatus implements CoordinatorDuty
{
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
@ -867,7 +722,18 @@ public class DruidCoordinator
if (coordinatorSegmentMetadataCache != null) {
coordinatorSegmentMetadataCache.updateSegmentReplicationStatus(segmentReplicationStatus);
}
return params;
}
}
/**
* Collects stats for unavailable and under-replicated segments.
*/
private class CollectSegmentStats implements CoordinatorDuty
{
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Collect stats for unavailable and under-replicated segments
final CoordinatorRunStats stats = params.getCoordinatorStats();
getDatasourceToUnavailableSegmentCount().forEach(
@ -894,4 +760,36 @@ public class DruidCoordinator
return params;
}
}
/**
 * Duty that copies the current state of every load queue peon known to the
 * {@code taskMaster} into the stats of the ongoing coordinator run.
 */
private class CollectLoadQueueStats implements CoordinatorDuty
{
  /**
   * For each server, records the bytes and number of segments queued for
   * load, the number of segments queued for drop and the load rate, and then
   * drains the stats accumulated by the peon itself, re-keying each of them
   * with the server name.
   *
   * @return the same params that were passed in
   */
  @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
  {
    final CoordinatorRunStats runStats = params.getCoordinatorStats();

    taskMaster.getAllPeons().forEach((server, peon) -> {
      final RowKey serverKey = RowKey.of(Dimension.SERVER, server);

      runStats.add(Stats.SegmentQueue.BYTES_TO_LOAD, serverKey, peon.getSizeOfSegmentsToLoad());
      runStats.add(Stats.SegmentQueue.NUM_TO_LOAD, serverKey, peon.getSegmentsToLoad().size());
      runStats.add(Stats.SegmentQueue.NUM_TO_DROP, serverKey, peon.getSegmentsToDrop().size());
      runStats.updateMax(Stats.SegmentQueue.LOAD_RATE_KBPS, serverKey, peon.getLoadRateKbps());

      // Stats tracked by the peon are reset as they are read
      peon.getAndResetStats().forEachStat(
          (peonStat, peonKey, peonValue) -> {
            final RowKey keyWithServer = createRowKeyForServer(server, peonKey.getValues());
            runStats.add(peonStat, keyWithServer, peonValue);
          }
      );
    });

    return params;
  }

  /**
   * Builds a row key made up of the server dimension followed by all the
   * given dimension values.
   */
  private RowKey createRowKeyForServer(String serverName, Map<Dimension, String> dimensionValues)
  {
    final RowKey.Builder keyBuilder = RowKey.with(Dimension.SERVER, serverName);
    for (Map.Entry<Dimension, String> dimension : dimensionValues.entrySet()) {
      keyBuilder.with(dimension.getKey(), dimension.getValue());
    }
    return keyBuilder.build();
  }
}
}

View File

@ -22,8 +22,8 @@ package org.apache.druid.server.coordinator;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
import org.apache.druid.server.coordinator.loading.SegmentHolder;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
@ -32,7 +32,6 @@ import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
@ -46,26 +45,10 @@ import java.util.TreeSet;
*/
public class DruidCoordinatorRuntimeParams
{
/**
* Creates a Set to be assigned into {@link Builder#usedSegments} from the given {@link Iterable} of segments.
*
* Creates a TreeSet sorted in {@link DruidCoordinator#SEGMENT_COMPARATOR_RECENT_FIRST} order and populates it with
* the segments from the given iterable. The given iterable is iterated exactly once. No special action is taken if
* duplicate segments are encountered in the iterable.
*/
private static TreeSet<DataSegment> createUsedSegmentsSet(Iterable<DataSegment> usedSegments)
{
TreeSet<DataSegment> segmentsSet = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
usedSegments.forEach(segmentsSet::add);
return segmentsSet;
}
private final DateTime coordinatorStartTime;
private final DruidCluster druidCluster;
private final MetadataRuleManager databaseRuleManager;
private final StrategicSegmentAssigner segmentAssigner;
private final @Nullable TreeSet<DataSegment> usedSegments;
private final @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private final TreeSet<DataSegment> usedSegmentsNewestFirst;
private final DataSourcesSnapshot dataSourcesSnapshot;
private final CoordinatorDynamicConfig coordinatorDynamicConfig;
private final DruidCompactionConfig compactionConfig;
private final SegmentLoadingConfig segmentLoadingConfig;
@ -74,12 +57,10 @@ public class DruidCoordinatorRuntimeParams
private final Set<String> broadcastDatasources;
private DruidCoordinatorRuntimeParams(
DateTime coordinatorStartTime,
DruidCluster druidCluster,
MetadataRuleManager databaseRuleManager,
StrategicSegmentAssigner segmentAssigner,
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
TreeSet<DataSegment> usedSegmentsNewestFirst,
DataSourcesSnapshot dataSourcesSnapshot,
CoordinatorDynamicConfig coordinatorDynamicConfig,
DruidCompactionConfig compactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
@ -88,11 +69,9 @@ public class DruidCoordinatorRuntimeParams
Set<String> broadcastDatasources
)
{
this.coordinatorStartTime = coordinatorStartTime;
this.druidCluster = druidCluster;
this.databaseRuleManager = databaseRuleManager;
this.segmentAssigner = segmentAssigner;
this.usedSegments = usedSegments;
this.usedSegmentsNewestFirst = usedSegmentsNewestFirst;
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.compactionConfig = compactionConfig;
@ -102,21 +81,11 @@ public class DruidCoordinatorRuntimeParams
this.broadcastDatasources = broadcastDatasources;
}
public DateTime getCoordinatorStartTime()
{
return coordinatorStartTime;
}
public DruidCluster getDruidCluster()
{
return druidCluster;
}
public MetadataRuleManager getDatabaseRuleManager()
{
return databaseRuleManager;
}
@Nullable
public SegmentReplicationStatus getSegmentReplicationStatus()
{
@ -140,10 +109,29 @@ public class DruidCoordinatorRuntimeParams
return dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource();
}
public TreeSet<DataSegment> getUsedSegments()
/**
* Used segments ordered by {@link SegmentHolder#NEWEST_SEGMENT_FIRST}.
*/
public TreeSet<DataSegment> getUsedSegmentsNewestFirst()
{
Preconditions.checkState(usedSegments != null, "usedSegments or dataSourcesSnapshot must be set");
return usedSegments;
return usedSegmentsNewestFirst;
}
/**
* @return true if the given segment is marked as a "used" segment in the
* metadata store.
*/
public boolean isUsedSegment(DataSegment segment)
{
return usedSegmentsNewestFirst.contains(segment);
}
/**
* Number of used segments in metadata store.
*/
public int getUsedSegmentCount()
{
return usedSegmentsNewestFirst.size();
}
public CoordinatorDynamicConfig getCoordinatorDynamicConfig()
@ -182,19 +170,17 @@ public class DruidCoordinatorRuntimeParams
return dataSourcesSnapshot;
}
public static Builder newBuilder(DateTime coordinatorStartTime)
public static Builder builder()
{
return new Builder(coordinatorStartTime);
return new Builder();
}
public Builder buildFromExisting()
{
return new Builder(
coordinatorStartTime,
druidCluster,
databaseRuleManager,
segmentAssigner,
usedSegments,
usedSegmentsNewestFirst,
dataSourcesSnapshot,
coordinatorDynamicConfig,
compactionConfig,
@ -207,13 +193,11 @@ public class DruidCoordinatorRuntimeParams
public static class Builder
{
private final DateTime coordinatorStartTime;
private DruidCluster druidCluster;
private MetadataRuleManager databaseRuleManager;
private SegmentLoadQueueManager loadQueueManager;
private StrategicSegmentAssigner segmentAssigner;
private @Nullable TreeSet<DataSegment> usedSegments;
private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private TreeSet<DataSegment> usedSegmentsNewestFirst;
private DataSourcesSnapshot dataSourcesSnapshot;
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private DruidCompactionConfig compactionConfig;
private SegmentLoadingConfig segmentLoadingConfig;
@ -221,21 +205,18 @@ public class DruidCoordinatorRuntimeParams
private BalancerStrategy balancerStrategy;
private Set<String> broadcastDatasources;
private Builder(DateTime coordinatorStartTime)
private Builder()
{
this.coordinatorStartTime = coordinatorStartTime;
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
this.compactionConfig = DruidCompactionConfig.empty();
this.broadcastDatasources = Collections.emptySet();
}
private Builder(
DateTime coordinatorStartTime,
DruidCluster cluster,
MetadataRuleManager databaseRuleManager,
StrategicSegmentAssigner segmentAssigner,
@Nullable TreeSet<DataSegment> usedSegments,
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
TreeSet<DataSegment> usedSegmentsNewestFirst,
DataSourcesSnapshot dataSourcesSnapshot,
CoordinatorDynamicConfig coordinatorDynamicConfig,
DruidCompactionConfig compactionConfig,
SegmentLoadingConfig segmentLoadingConfig,
@ -244,11 +225,9 @@ public class DruidCoordinatorRuntimeParams
Set<String> broadcastDatasources
)
{
this.coordinatorStartTime = coordinatorStartTime;
this.druidCluster = cluster;
this.databaseRuleManager = databaseRuleManager;
this.segmentAssigner = segmentAssigner;
this.usedSegments = usedSegments;
this.usedSegmentsNewestFirst = usedSegmentsNewestFirst;
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.compactionConfig = compactionConfig;
@ -260,15 +239,16 @@ public class DruidCoordinatorRuntimeParams
public DruidCoordinatorRuntimeParams build()
{
Preconditions.checkNotNull(dataSourcesSnapshot);
Preconditions.checkNotNull(usedSegmentsNewestFirst);
initStatsIfRequired();
initSegmentAssignerIfRequired();
return new DruidCoordinatorRuntimeParams(
coordinatorStartTime,
druidCluster,
databaseRuleManager,
segmentAssigner,
usedSegments,
usedSegmentsNewestFirst,
dataSourcesSnapshot,
coordinatorDynamicConfig,
compactionConfig,
@ -298,11 +278,10 @@ public class DruidCoordinatorRuntimeParams
Preconditions.checkNotNull(druidCluster);
Preconditions.checkNotNull(balancerStrategy);
Preconditions.checkNotNull(usedSegments);
Preconditions.checkNotNull(stats);
if (segmentLoadingConfig == null) {
segmentLoadingConfig = SegmentLoadingConfig.create(coordinatorDynamicConfig, usedSegments.size());
segmentLoadingConfig = SegmentLoadingConfig.create(coordinatorDynamicConfig, usedSegmentsNewestFirst.size());
}
segmentAssigner = new StrategicSegmentAssigner(
@ -314,18 +293,19 @@ public class DruidCoordinatorRuntimeParams
);
}
/**
 * Copies the given segments into a new set sorted by
 * {@link SegmentHolder#NEWEST_SEGMENT_FIRST}. The iterable is traversed once.
 */
private static TreeSet<DataSegment> createUsedSegmentsSet(Iterable<DataSegment> usedSegments)
{
  final TreeSet<DataSegment> newestFirst = new TreeSet<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
  for (DataSegment segment : usedSegments) {
    newestFirst.add(segment);
  }
  return newestFirst;
}
public Builder withDruidCluster(DruidCluster cluster)
{
this.druidCluster = cluster;
return this;
}
public Builder withDatabaseRuleManager(MetadataRuleManager databaseRuleManager)
{
this.databaseRuleManager = databaseRuleManager;
return this;
}
/**
* Sets the {@link SegmentLoadQueueManager} which is used to construct the
* {@link StrategicSegmentAssigner} for this run.
@ -338,7 +318,7 @@ public class DruidCoordinatorRuntimeParams
public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot)
{
this.usedSegments = createUsedSegmentsSet(snapshot.iterateAllUsedSegmentsInSnapshot());
this.usedSegmentsNewestFirst = createUsedSegmentsSet(snapshot.iterateAllUsedSegmentsInSnapshot());
this.dataSourcesSnapshot = snapshot;
return this;
}
@ -350,7 +330,7 @@ public class DruidCoordinatorRuntimeParams
public Builder withUsedSegments(Collection<DataSegment> usedSegments)
{
this.usedSegments = createUsedSegmentsSet(usedSegments);
this.usedSegmentsNewestFirst = createUsedSegmentsSet(usedSegments);
this.dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(usedSegments, ImmutableMap.of());
return this;
}

View File

@ -68,7 +68,6 @@ public abstract class BalancerStrategyFactory
private ListeningExecutorService createNewBalancerExecutor(int numThreads)
{
log.info("Creating new balancer executor with [%d] threads.", numThreads);
cachedBalancerThreadNumber = numThreads;
return MoreExecutors.listeningDecorator(
Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")

View File

@ -20,7 +20,7 @@
package org.apache.druid.server.coordinator.balancer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
@ -49,7 +49,7 @@ import java.util.stream.Collectors;
*/
public class TierSegmentBalancer
{
private static final EmittingLogger log = new EmittingLogger(TierSegmentBalancer.class);
private static final Logger log = new Logger(TierSegmentBalancer.class);
private final String tier;
private final DruidCoordinatorRuntimeParams params;
@ -72,7 +72,7 @@ public class TierSegmentBalancer
this.tier = tier;
this.params = params;
this.segmentAssigner = params.getSegmentAssigner();
this.runStats = segmentAssigner.getStats();
this.runStats = params.getCoordinatorStats();
Map<Boolean, List<ServerHolder>> partitions =
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
@ -127,10 +127,10 @@ public class TierSegmentBalancer
);
movedCount += moveSegmentsTo(activeServers, pickedSegments, numLoadedSegmentsToMove);
} else {
log.info("There are already [%,d] segments moving in tier[%s].", movingSegmentCount, tier);
log.debug("There are already [%,d] segments moving in tier[%s].", movingSegmentCount, tier);
}
log.info(
log.debug(
"Moved [%,d of %,d] segments from [%d] [%s] servers in tier [%s].",
movedCount, numSegmentsToMove, sourceServers.size(), sourceServerType, tier
);
@ -167,7 +167,7 @@ public class TierSegmentBalancer
@Nullable
private DataSegment getLoadableSegment(DataSegment segmentToMove)
{
if (!params.getUsedSegments().contains(segmentToMove)) {
if (!params.isUsedSegment(segmentToMove)) {
markUnmoved("Segment is unused", segmentToMove);
return null;
}

View File

@ -20,25 +20,25 @@
package org.apache.druid.server.coordinator.duty;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.balancer.SegmentToMoveCalculator;
import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.Duration;
import java.util.Set;
/**
*
* Coordinator Duty to balance segments across Historicals.
*/
public class BalanceSegments implements CoordinatorDuty
{
private static final EmittingLogger log = new EmittingLogger(BalanceSegments.class);
private static final Logger log = new Logger(BalanceSegments.class);
private final Duration coordinatorPeriod;
@ -50,26 +50,17 @@ public class BalanceSegments implements CoordinatorDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
if (params.getUsedSegments().isEmpty()) {
log.info("Skipping balance as there are no used segments.");
if (params.getUsedSegmentCount() <= 0) {
return params;
}
final DruidCluster cluster = params.getDruidCluster();
final SegmentLoadingConfig loadingConfig = params.getSegmentLoadingConfig();
final int maxSegmentsToMove = getMaxSegmentsToMove(params);
params.getCoordinatorStats().add(Stats.Balancer.MAX_TO_MOVE, maxSegmentsToMove);
if (maxSegmentsToMove <= 0) {
log.info("Skipping balance as maxSegmentsToMove is [%d].", maxSegmentsToMove);
return params;
} else {
log.info(
"Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and maxLifetime[%d].",
cluster.getTierNames(), maxSegmentsToMove, loadingConfig.getMaxLifetimeInLoadQueue()
);
}
cluster.getHistoricals().forEach(
params.getDruidCluster().getHistoricals().forEach(
(tier, servers) -> new TierSegmentBalancer(tier, servers, maxSegmentsToMove, params).run()
);
@ -97,7 +88,7 @@ public class BalanceSegments implements CoordinatorDuty
final int numBalancerThreads = params.getSegmentLoadingConfig().getBalancerComputeThreads();
final int maxSegmentsToMove = SegmentToMoveCalculator
.computeMaxSegmentsToMovePerTier(totalSegmentsInCluster, numBalancerThreads, coordinatorPeriod);
log.info(
log.debug(
"Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d] historicals.",
maxSegmentsToMove, totalSegmentsInCluster, numHistoricalsAndSegments.lhs
);

View File

@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Collects stats pertaining to segment availability on different servers.
 * <p>
 * In every run, this duty logs a one-line summary for each historical tier,
 * logs the servers and their datasources at debug level, and adds the load
 * queue stats of every peon to the stats of the coordinator run.
 */
public class CollectSegmentAndServerStats implements CoordinatorDuty
{
  private static final Logger log = new Logger(CollectSegmentAndServerStats.class);

  private final LoadQueueTaskMaster taskMaster;

  public CollectSegmentAndServerStats(LoadQueueTaskMaster taskMaster)
  {
    this.taskMaster = taskMaster;
  }

  /**
   * @return the same params that were passed in
   */
  @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
  {
    params.getDruidCluster().getHistoricals()
          .forEach(this::logHistoricalTierStats);
    logServerDebuggingInfo(params.getDruidCluster());
    collectLoadQueueStats(params.getCoordinatorStats());

    return params;
  }

  /**
   * Adds the queue sizes and load rate of every peon, along with the stats
   * accumulated (and reset on read) by the peon itself, to the given stats.
   */
  private void collectLoadQueueStats(CoordinatorRunStats stats)
  {
    taskMaster.getAllPeons().forEach((serverName, queuePeon) -> {
      final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName);
      stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, queuePeon.getSizeOfSegmentsToLoad());
      stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, queuePeon.getSegmentsToLoad().size());
      stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey, queuePeon.getSegmentsToDrop().size());
      stats.updateMax(Stats.SegmentQueue.LOAD_RATE_KBPS, rowKey, queuePeon.getLoadRateKbps());

      queuePeon.getAndResetStats().forEachStat(
          (stat, key, statValue) ->
              stats.add(stat, createRowKeyForServer(serverName, key.getValues()), statValue)
      );
    });
  }

  /**
   * Builds a row key made up of the server dimension followed by all the
   * given dimension values.
   */
  private RowKey createRowKeyForServer(String serverName, Map<Dimension, String> dimensionValues)
  {
    final RowKey.Builder builder = RowKey.with(Dimension.SERVER, serverName);
    dimensionValues.forEach(builder::with);
    return builder.build();
  }

  /**
   * Logs the number of served, loading and dropping segments of a tier along
   * with the average disk usage of its historicals. Nothing is logged for a
   * tier that has no historicals.
   */
  private void logHistoricalTierStats(String tier, Set<ServerHolder> historicals)
  {
    final int numHistoricals = historicals.size();
    if (numHistoricals == 0) {
      // There is nothing to average over. Returning here also avoids the
      // integer division by zero that computing the averages would cause.
      return;
    }

    // Atomic holders are used only because they are updated from the lambda
    final AtomicInteger servedCount = new AtomicInteger();
    final AtomicInteger loadingCount = new AtomicInteger();
    final AtomicInteger droppingCount = new AtomicInteger();

    final AtomicDouble usageSum = new AtomicDouble();
    final AtomicLong currentBytesSum = new AtomicLong();

    historicals.forEach(serverHolder -> {
      final ImmutableDruidServer server = serverHolder.getServer();
      servedCount.addAndGet(server.getNumSegments());
      currentBytesSum.addAndGet(server.getCurrSize());
      usageSum.addAndGet(100.0f * server.getCurrSize() / server.getMaxSize());

      final LoadQueuePeon queuePeon = serverHolder.getPeon();
      loadingCount.addAndGet(queuePeon.getSegmentsToLoad().size());
      droppingCount.addAndGet(queuePeon.getSegmentsToDrop().size());
    });

    log.info(
        "Tier[%s] is serving [%,d], loading [%,d] and dropping [%,d] segments"
        + " across [%d] historicals with average usage[%d GBs], [%.1f%%].",
        tier, servedCount.get(), loadingCount.get(), droppingCount.get(), numHistoricals,
        // Shifting right by 30 bits converts bytes to GiB
        (currentBytesSum.get() >> 30) / numHistoricals, usageSum.get() / numHistoricals
    );
  }

  /**
   * Logs every server of the cluster and the datasources it holds, but only
   * if debug logging is enabled.
   */
  private void logServerDebuggingInfo(DruidCluster cluster)
  {
    if (log.isDebugEnabled()) {
      log.debug("Servers");
      for (ServerHolder serverHolder : cluster.getAllServers()) {
        ImmutableDruidServer druidServer = serverHolder.getServer();
        log.debug(" %s", druidServer);
        log.debug(" -- DataSources");
        for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
          log.debug(" %s", druidDataSource);
        }
      }
    }
  }
}

View File

@ -120,13 +120,19 @@ public class CompactSegments implements CoordinatorCustomDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
// Coordinator supports only native engine for compaction
run(
params.getCompactionConfig(),
params.getUsedSegmentsTimelinesPerDataSource(),
CompactionEngine.NATIVE,
params.getCoordinatorStats()
);
if (isCompactionSupervisorEnabled()) {
LOG.warn(
"Skipping CompactSegments duty since compaction supervisors"
+ " are already running on Overlord."
);
} else {
run(
params.getCompactionConfig(),
params.getUsedSegmentsTimelinesPerDataSource(),
CompactionEngine.NATIVE,
params.getCoordinatorStats()
);
}
return params;
}
@ -137,11 +143,8 @@ public class CompactSegments implements CoordinatorCustomDuty
CoordinatorRunStats stats
)
{
LOG.info("Running CompactSegments duty");
final int maxCompactionTaskSlots = dynamicConfig.getMaxCompactionTaskSlots();
if (maxCompactionTaskSlots <= 0) {
LOG.info("Skipping compaction as maxCompactionTaskSlots is [%d].", maxCompactionTaskSlots);
resetCompactionSnapshot();
return;
}
@ -149,7 +152,6 @@ public class CompactSegments implements CoordinatorCustomDuty
statusTracker.onCompactionConfigUpdated(dynamicConfig);
List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
if (compactionConfigList == null || compactionConfigList.isEmpty()) {
LOG.info("Skipping compaction as compaction config list is empty.");
resetCompactionSnapshot();
return;
}
@ -251,6 +253,21 @@ public class CompactSegments implements CoordinatorCustomDuty
autoCompactionSnapshotPerDataSource.set(Collections.emptyMap());
}
/**
 * Asks the Overlord whether compaction supervisors are enabled there, in
 * which case the CompactSegments duty should not be run on the Coordinator.
 *
 * @return the value reported by the Overlord, or false if the call fails
 */
private boolean isCompactionSupervisorEnabled()
{
  boolean supervisorEnabled;
  try {
    supervisorEnabled = FutureUtils.getUnchecked(overlordClient.isCompactionSupervisorEnabled(), true);
  }
  catch (Exception e) {
    // The Overlord is probably on an older version that does not support this
    // call, so treat compaction supervisors as NOT enabled
    supervisorEnabled = false;
  }
  return supervisorEnabled;
}
/**
* Queries the Overlord for the status of all tasks that were submitted
* recently but are not active anymore. The statuses are then updated in the
@ -415,7 +432,7 @@ public class CompactSegments implements CoordinatorCustomDuty
// compaction is enabled and estimatedIncompleteCompactionTasks is 0.
availableCompactionTaskSlots = Math.max(1, compactionTaskCapacity);
}
LOG.info(
LOG.debug(
"Found [%d] available task slots for compaction out of max compaction task capacity [%d]",
availableCompactionTaskSlots, compactionTaskCapacity
);
@ -567,7 +584,7 @@ public class CompactSegments implements CoordinatorCustomDuty
new ClientCompactionRunnerInfo(compactionEngine)
);
LOG.info(
LOG.debug(
"Submitted a compaction task[%s] for [%d] segments in datasource[%s], umbrella interval[%s].",
taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval()
);

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.EvictingQueue;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
 * A group of {@link CoordinatorDuty} that are run one after the other, in the
 * given order, every time {@link #run} is invoked.
 * <p>
 * The group also tracks the start and end time of its last run, and windowed
 * averages of its run time and of the gap between the starts of consecutive
 * runs. These are exposed through {@link #getStatus()}.
 */
public class CoordinatorDutyGroup
{
  private static final Logger log = new Logger(CoordinatorDutyGroup.class);

  /**
   * Maximum number of recent runs over which run times and gap times are averaged.
   */
  private static final int STATUS_WINDOW_SIZE = 20;

  private final String name;
  private final Duration period;
  private final List<CoordinatorDuty> duties;
  private final List<String> dutyNames;
  private final DruidCoordinator.DutyGroupHelper coordinator;

  // Both are null until the first run of this group has started / finished
  private final AtomicReference<DateTime> lastRunStartTime = new AtomicReference<>();
  private final AtomicReference<DateTime> lastRunEndTime = new AtomicReference<>();

  // Access to these windows is guarded by the monitor of this object
  private final EvictingQueue<Long> runTimes = EvictingQueue.create(STATUS_WINDOW_SIZE);
  private final EvictingQueue<Long> gapTimes = EvictingQueue.create(STATUS_WINDOW_SIZE);

  /**
   * @param name        name of the group, used in logs and in the status
   * @param duties      duties to run, in the order of execution
   * @param period      period of the group, only reported back by this class
   * @param coordinator helper used to check leadership and to emit stats
   */
  public CoordinatorDutyGroup(
      String name,
      List<CoordinatorDuty> duties,
      Duration period,
      DruidCoordinator.DutyGroupHelper coordinator
  )
  {
    this.name = name;
    this.duties = duties;
    this.period = period;
    this.dutyNames = duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList());
    this.coordinator = coordinator;

    log.info("Created dutyGroup[%s] with period[%s] and duties[%s].", name, period, dutyNames);
  }

  public String getName()
  {
    return name;
  }

  public Duration getPeriod()
  {
    return period;
  }

  /**
   * @return a snapshot of the current status of this group. The last run
   * times are null if the group has not started / finished a run yet, and the
   * averages are 0 if there is nothing to average over yet.
   */
  public synchronized DutyGroupStatus getStatus()
  {
    return new DutyGroupStatus(
        name,
        period,
        dutyNames,
        lastRunStartTime.get(),
        lastRunEndTime.get(),
        computeWindowAverage(runTimes),
        computeWindowAverage(gapTimes)
    );
  }

  /**
   * Runs this duty group if the coordinator is leader and emits stats collected
   * during the run.
   * <p>
   * The end of the run is always recorded, even if the run is cut short
   * because coordination is paused, a duty requests a stop or a duty throws an
   * exception. Otherwise the status would keep reporting the previous end time
   * and the run would appear to be in progress forever. The group run time
   * stat is emitted only if the run was not cut short.
   */
  public void run(DruidCoordinatorRuntimeParams params)
  {
    markRunStarted();

    boolean runCompleted = false;
    try {
      runCompleted = runDutiesAndEmitStats(params);
    }
    finally {
      final long runMillis = markRunCompleted();
      if (runCompleted) {
        // Emit the runtime of the entire duty group
        emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, RowKey.empty(), runMillis);
      }
    }
  }

  /**
   * Runs all the duties for as long as the coordinator is leader and then
   * emits and logs the stats collected by them.
   *
   * @return false if the run was cut short, either because coordination is
   * paused or because a duty returned null params, true otherwise
   */
  private boolean runDutiesAndEmitStats(DruidCoordinatorRuntimeParams params)
  {
    final boolean coordinationPaused = params.getCoordinatorDynamicConfig().getPauseCoordination();
    if (coordinationPaused && coordinator.isLeader()) {
      log.info("Coordination has been paused. Duties will not run until coordination is resumed.");
      return false;
    }

    final CoordinatorRunStats stats = params.getCoordinatorStats();
    for (CoordinatorDuty duty : duties) {
      // Leadership is re-checked before every duty as it may be lost mid-run
      if (coordinator.isLeader()) {
        final Stopwatch dutyRunTime = Stopwatch.createStarted();
        params = duty.run(params);
        dutyRunTime.stop();

        final String dutyName = duty.getClass().getName();
        if (params == null) {
          // A duty returns null params to stop the rest of the group
          log.warn("Stopping run for group[%s] on request of duty[%s].", name, dutyName);
          return false;
        } else {
          stats.add(
              Stats.CoordinatorRun.DUTY_RUN_TIME,
              RowKey.of(Dimension.DUTY, dutyName),
              dutyRunTime.millisElapsed()
          );
        }
      }
    }

    // Emit stats collected from all duties
    if (stats.rowCount() > 0) {
      stats.forEachStat(this::emitStat);

      final String statsTable = stats.buildStatsTable();
      if (!statsTable.isEmpty()) {
        log.info("Collected stats for duty group[%s]: %s", name, statsTable);
      }
    }

    return true;
  }

  /**
   * Emits the given stat through the coordinator, if the stat is meant to be emitted.
   */
  private void emitStat(CoordinatorStat stat, RowKey rowKey, long value)
  {
    if (stat.shouldEmit()) {
      coordinator.emitStat(stat, rowKey, value);
    }
  }

  /**
   * @return the average of the values in the window, or 0 if it is empty
   */
  private synchronized long computeWindowAverage(EvictingQueue<Long> window)
  {
    final int numEntries = window.size();
    if (numEntries > 0) {
      long totalTimeMillis = window.stream().mapToLong(Long::longValue).sum();
      return totalTimeMillis / numEntries;
    } else {
      return 0;
    }
  }

  private synchronized void addToWindow(EvictingQueue<Long> window, long value)
  {
    window.add(value);
  }

  /**
   * Records the start of a run and, for all runs but the first, the time
   * elapsed since the start of the previous run.
   */
  private void markRunStarted()
  {
    final DateTime now = DateTimes.nowUtc();

    final DateTime lastStart = lastRunStartTime.getAndSet(now);
    if (lastStart != null) {
      addToWindow(gapTimes, now.getMillis() - lastStart.getMillis());
    }
  }

  /**
   * Records the end of a run. Must be called only after {@link #markRunStarted()}.
   *
   * @return time elapsed in milliseconds since the start of the run
   */
  private long markRunCompleted()
  {
    final DateTime now = DateTimes.nowUtc();
    lastRunEndTime.set(now);

    final long runtimeMillis = now.getMillis() - lastRunStartTime.get().getMillis();
    addToWindow(runTimes, runtimeMillis);

    return runtimeMillis;
  }
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.List;
/**
* Immutable holder of the status of a single duty group. Every getter is
* annotated with {@link JsonProperty} so that the object can be serialized
* to JSON.
*/
public class DutyGroupStatus
{
private final String name;
private final Duration period;
private final List<String> dutyNames;
// NOTE(review): CoordinatorDutyGroup passes values read from initially-empty
// AtomicReferences here, so these two are presumably null until the group
// has started / finished its first run - confirm before relying on non-null
private final DateTime lastRunStart;
private final DateTime lastRunEnd;
private final long avgRuntimeMillis;
private final long avgRunGapMillis;
/**
* @param name             name of the duty group
* @param period           period of the duty group
* @param dutyNames        names of the duties in the group
* @param lastRunStart     start time of the last run
* @param lastRunEnd       end time of the last run
* @param avgRuntimeMillis average run time in milliseconds, as computed by the caller
* @param avgRunGapMillis  average gap between runs in milliseconds, as computed by the caller
*/
public DutyGroupStatus(
String name,
Duration period,
List<String> dutyNames,
DateTime lastRunStart,
DateTime lastRunEnd,
long avgRuntimeMillis,
long avgRunGapMillis
)
{
this.name = name;
this.period = period;
this.dutyNames = dutyNames;
this.lastRunStart = lastRunStart;
this.lastRunEnd = lastRunEnd;
this.avgRuntimeMillis = avgRuntimeMillis;
this.avgRunGapMillis = avgRunGapMillis;
}
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public Duration getPeriod()
{
return period;
}
// Returns the list passed to the constructor as is, without copying it
@JsonProperty
public List<String> getDutyNames()
{
return dutyNames;
}
@JsonProperty
public DateTime getLastRunStart()
{
return lastRunStart;
}
@JsonProperty
public DateTime getLastRunEnd()
{
return lastRunEnd;
}
@JsonProperty
public long getAvgRuntimeMillis()
{
return avgRuntimeMillis;
}
@JsonProperty
public long getAvgRunGapMillis()
{
return avgRunGapMillis;
}
}

View File

@ -88,7 +88,6 @@ public class KillCompactionConfig extends MetadataCleanupDuty
{
// If current compaction config is empty then there is nothing to do
if (DruidCompactionConfig.empty().equals(current)) {
log.info("Nothing to do as compaction config is already empty.");
return current;
}
@ -134,7 +133,6 @@ public class KillCompactionConfig extends MetadataCleanupDuty
if (result.isOk()) {
return compactionConfigRemoved.get();
} else if (result.isRetryable()) {
log.debug("Retrying KillCompactionConfig duty");
throw new RetryableException(result.getException());
} else {
log.error(result.getException(), "Failed to kill compaction configurations");

View File

@ -74,7 +74,7 @@ public class KillUnreferencedSegmentSchema extends MetadataCleanupDuty
// This case would arise when segment is associated with a schema which was marked unused in the previous step
// or in the previous run.
List<String> schemaFingerprintsToUpdate = segmentSchemaManager.findReferencedSchemaMarkedAsUnused();
if (schemaFingerprintsToUpdate.size() > 0) {
if (!schemaFingerprintsToUpdate.isEmpty()) {
segmentSchemaManager.markSchemaAsUsed(schemaFingerprintsToUpdate);
log.info("Marked [%s] unused schemas referenced by used segments as used.", schemaFingerprintsToUpdate.size());
}

View File

@ -72,9 +72,9 @@ public class MarkEternityTombstonesAsUnused implements CoordinatorDuty
{
private static final Logger log = new Logger(MarkEternityTombstonesAsUnused.class);
private final SegmentDeleteHandler deleteHandler;
private final MetadataAction.DeleteSegments deleteHandler;
public MarkEternityTombstonesAsUnused(final SegmentDeleteHandler deleteHandler)
public MarkEternityTombstonesAsUnused(final MetadataAction.DeleteSegments deleteHandler)
{
this.deleteHandler = deleteHandler;
}

View File

@ -21,7 +21,7 @@ package org.apache.druid.server.coordinator.duty;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@ -33,9 +33,8 @@ import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -56,9 +55,10 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
{
private static final Logger log = new Logger(MarkOvershadowedSegmentsAsUnused.class);
private final SegmentDeleteHandler deleteHandler;
private final MetadataAction.DeleteSegments deleteHandler;
private final Stopwatch sinceCoordinatorStarted = Stopwatch.createStarted();
public MarkOvershadowedSegmentsAsUnused(SegmentDeleteHandler deleteHandler)
public MarkOvershadowedSegmentsAsUnused(MetadataAction.DeleteSegments deleteHandler)
{
this.deleteHandler = deleteHandler;
}
@ -68,19 +68,15 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
{
// Mark overshadowed segments as unused only if the coordinator has been running
// long enough to have refreshed its metadata view
final DateTime coordinatorStartTime = params.getCoordinatorStartTime();
final long delayMillis = params.getCoordinatorDynamicConfig().getMarkSegmentAsUnusedDelayMillis();
if (DateTimes.nowUtc().isBefore(coordinatorStartTime.plus(delayMillis))) {
log.info(
"Skipping MarkAsUnused until [%s] have elapsed after coordinator start time[%s].",
Duration.ofMillis(delayMillis), coordinatorStartTime
);
final Duration requiredDelay = Duration.millis(
params.getCoordinatorDynamicConfig().getMarkSegmentAsUnusedDelayMillis()
);
if (sinceCoordinatorStarted.hasNotElapsed(requiredDelay)) {
return params;
}
final Set<DataSegment> allOvershadowedSegments = params.getDataSourcesSnapshot().getOvershadowedSegments();
if (allOvershadowedSegments.isEmpty()) {
log.info("Skipping MarkAsUnused as there are no overshadowed segments.");
return params;
}
@ -122,8 +118,12 @@ public class MarkOvershadowedSegmentsAsUnused implements CoordinatorDuty
RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource);
stats.add(Stats.Segments.OVERSHADOWED, datasourceKey, unusedSegments.size());
final Stopwatch updateTime = Stopwatch.createStarted();
int updatedCount = deleteHandler.markSegmentsAsUnused(unusedSegments);
log.info("Successfully marked [%d] segments of datasource[%s] as unused.", updatedCount, datasource);
log.info(
"Marked [%d] segments of datasource[%s] as unused in [%,d]ms.",
updatedCount, datasource, updateTime.millisElapsed()
);
}
);

View File

@ -19,13 +19,27 @@
package org.apache.druid.server.coordinator.duty;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.SegmentId;
import java.util.List;
import java.util.Set;
public interface SegmentDeleteHandler
/**
* Contains functional interfaces that are used by a {@link CoordinatorDuty} to
* perform a single read or write operation on the metadata store.
*/
public final class MetadataAction
{
/**
* Action that marks a set of segments as unused.
*/
@FunctionalInterface
public interface DeleteSegments
{
/**
* Marks the segments with the given IDs as unused.
*
* @param segmentIds IDs of the segments to mark as unused
* @return a count which duties in this package log as the number of
* segments that were marked as unused
*/
int markSegmentsAsUnused(Set<SegmentId> segmentIds);
}
int markSegmentsAsUnused(Set<SegmentId> segmentIds);
/**
* Action that looks up the rules applicable to a datasource.
*/
@FunctionalInterface
public interface GetDatasourceRules
{
/**
* Gets the rules for the given datasource.
*
* @param dataSource name of the datasource
* @return list of rules for the datasource; NOTE(review): the method name
* suggests that default rules are included in the list - confirm against
* the implementation
*/
List<Rule> getRulesWithDefault(String dataSource);
}
}

View File

@ -82,7 +82,9 @@ public abstract class MetadataCleanupDuty implements CoordinatorDuty
try {
DateTime minCreatedTime = now.minus(cleanupConfig.getDurationToRetain());
int deletedEntries = cleanupEntriesCreatedBefore(minCreatedTime);
log.info("Removed [%,d] [%s] created before [%s].", deletedEntries, entryType, minCreatedTime);
if (deletedEntries > 0) {
log.info("Removed [%,d] [%s] created before [%s].", deletedEntries, entryType, minCreatedTime);
}
params.getCoordinatorStats().add(cleanupCountStat, deletedEntries);
}

View File

@ -85,7 +85,7 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
final CoordinatorDynamicConfig dynamicConfig = params.getCoordinatorDynamicConfig();
final SegmentLoadingConfig segmentLoadingConfig
= SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegments().size());
= SegmentLoadingConfig.create(dynamicConfig, params.getUsedSegmentCount());
final DruidCluster cluster = prepareCluster(dynamicConfig, segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);
@ -93,11 +93,12 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
final CoordinatorRunStats stats = params.getCoordinatorStats();
collectHistoricalStats(cluster, stats);
collectUsedSegmentStats(params, stats);
collectDebugStats(segmentLoadingConfig, stats);
final int numBalancerThreads = segmentLoadingConfig.getBalancerComputeThreads();
final BalancerStrategy balancerStrategy = balancerStrategyFactory.createBalancerStrategy(numBalancerThreads);
log.info(
"Using balancer strategy [%s] with [%d] threads.",
log.debug(
"Using balancer strategy[%s] with [%d] threads.",
balancerStrategy.getClass().getSimpleName(), numBalancerThreads
);
@ -132,13 +133,6 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
}
);
}
if (cancelledCount.get() > 0) {
log.info(
"Cancelled [%d] load/move operations on [%d] decommissioning servers.",
cancelledCount.get(), decommissioningServers.size()
);
}
}
private List<ImmutableDruidServer> prepareCurrentServers()
@ -195,4 +189,10 @@ public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
});
}
/**
 * Adds values read from the given segment loading config to the given stats:
 * the number of balancer compute threads and the replication throttle limit.
 *
 * @param config config from which the values are read
 * @param stats  stats to which the values are added
 */
private void collectDebugStats(SegmentLoadingConfig config, CoordinatorRunStats stats)
{
  final int balancerThreads = config.getBalancerComputeThreads();
  stats.add(Stats.Balancer.COMPUTE_THREADS, balancerThreads);

  final int throttleLimit = config.getReplicationThrottleLimit();
  stats.add(Stats.Segments.REPLICATION_THROTTLE_LIMIT, throttleLimit);
}
}

View File

@ -24,7 +24,6 @@ import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
@ -56,11 +55,16 @@ public class RunRules implements CoordinatorDuty
{
private static final EmittingLogger log = new EmittingLogger(RunRules.class);
private final SegmentDeleteHandler deleteHandler;
private final MetadataAction.DeleteSegments deleteHandler;
private final MetadataAction.GetDatasourceRules ruleHandler;
public RunRules(SegmentDeleteHandler deleteHandler)
public RunRules(
MetadataAction.DeleteSegments deleteHandler,
MetadataAction.GetDatasourceRules ruleHandler
)
{
this.deleteHandler = deleteHandler;
this.ruleHandler = ruleHandler;
}
@Override
@ -72,15 +76,13 @@ public class RunRules implements CoordinatorDuty
return params;
}
// Get all used segments sorted by interval. Segments must be sorted to ensure that:
// a) round-robin assignment distributes newer segments uniformly across servers
// b) replication throttling has a smaller impact on newer segments
final Set<DataSegment> usedSegments = params.getUsedSegmentsNewestFirst();
final Set<DataSegment> overshadowed = params.getDataSourcesSnapshot().getOvershadowedSegments();
final Set<DataSegment> usedSegments = params.getUsedSegments();
log.info(
"Applying retention rules on [%,d] used segments, skipping [%,d] overshadowed segments.",
usedSegments.size(), overshadowed.size()
);
final StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
final MetadataRuleManager databaseRuleManager = params.getDatabaseRuleManager();
final DateTime now = DateTimes.nowUtc();
final Object2IntOpenHashMap<String> datasourceToSegmentsWithNoRule = new Object2IntOpenHashMap<>();
@ -92,7 +94,7 @@ public class RunRules implements CoordinatorDuty
}
// Find and apply matching rule
List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
List<Rule> rules = ruleHandler.getRulesWithDefault(segment.getDataSource());
boolean foundMatchingRule = false;
for (Rule rule : rules) {
if (rule.appliesTo(segment, now)) {
@ -158,17 +160,10 @@ public class RunRules implements CoordinatorDuty
private Set<String> getBroadcastDatasources(DruidCoordinatorRuntimeParams params)
{
final Set<String> broadcastDatasources =
params.getDataSourcesSnapshot().getDataSourcesMap().values().stream()
.map(ImmutableDruidDataSource::getName)
.filter(datasource -> isBroadcastDatasource(datasource, params))
.collect(Collectors.toSet());
if (!broadcastDatasources.isEmpty()) {
log.info("Found broadcast datasources [%s] which will not participate in balancing.", broadcastDatasources);
}
return broadcastDatasources;
return params.getDataSourcesSnapshot().getDataSourcesMap().values().stream()
.map(ImmutableDruidDataSource::getName)
.filter(this::isBroadcastDatasource)
.collect(Collectors.toSet());
}
/**
@ -179,9 +174,10 @@ public class RunRules implements CoordinatorDuty
* <li>Are unloaded if unused, even from realtime servers</li>
* </ul>
*/
private boolean isBroadcastDatasource(String datasource, DruidCoordinatorRuntimeParams params)
private boolean isBroadcastDatasource(String datasource)
{
return params.getDatabaseRuleManager().getRulesWithDefault(datasource).stream()
.anyMatch(rule -> rule instanceof BroadcastDistributionRule);
return ruleHandler.getRulesWithDefault(datasource)
.stream()
.anyMatch(rule -> rule instanceof BroadcastDistributionRule);
}
}

View File

@ -33,7 +33,6 @@ import org.apache.druid.timeline.DataSegment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -44,9 +43,14 @@ public class UnloadUnusedSegments implements CoordinatorDuty
private static final Logger log = new Logger(UnloadUnusedSegments.class);
private final SegmentLoadQueueManager loadQueueManager;
private final MetadataAction.GetDatasourceRules ruleHandler;
public UnloadUnusedSegments(SegmentLoadQueueManager loadQueueManager)
public UnloadUnusedSegments(
SegmentLoadQueueManager loadQueueManager,
MetadataAction.GetDatasourceRules ruleHandler
)
{
this.ruleHandler = ruleHandler;
this.loadQueueManager = loadQueueManager;
}
@ -82,18 +86,16 @@ public class UnloadUnusedSegments implements CoordinatorDuty
Map<String, Boolean> broadcastStatusByDatasource
)
{
final Set<DataSegment> usedSegments = params.getUsedSegments();
final AtomicInteger numQueuedDrops = new AtomicInteger(0);
final ImmutableDruidServer server = serverHolder.getServer();
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
if (shouldSkipUnload(serverHolder, dataSource.getName(), broadcastStatusByDatasource, params)) {
if (shouldSkipUnload(serverHolder, dataSource.getName(), broadcastStatusByDatasource)) {
continue;
}
int totalUnneededCount = 0;
for (DataSegment segment : dataSource.getSegments()) {
if (!usedSegments.contains(segment)
if (!params.isUsedSegment(segment)
&& loadQueueManager.dropSegment(segment, serverHolder)) {
totalUnneededCount++;
log.debug(
@ -118,13 +120,11 @@ public class UnloadUnusedSegments implements CoordinatorDuty
DruidCoordinatorRuntimeParams params
)
{
final Set<DataSegment> usedSegments = params.getUsedSegments();
final AtomicInteger cancelledOperations = new AtomicInteger(0);
server.getQueuedSegments().forEach((segment, action) -> {
if (shouldSkipUnload(server, segment.getDataSource(), broadcastStatusByDatasource, params)) {
if (shouldSkipUnload(server, segment.getDataSource(), broadcastStatusByDatasource)) {
// do nothing
} else if (usedSegments.contains(segment)) {
} else if (params.isUsedSegment(segment)) {
// do nothing
} else if (action.isLoad() && server.cancelOperation(action, segment)) {
cancelledOperations.incrementAndGet();
@ -147,21 +147,21 @@ public class UnloadUnusedSegments implements CoordinatorDuty
private boolean shouldSkipUnload(
ServerHolder server,
String dataSource,
Map<String, Boolean> broadcastStatusByDatasource,
DruidCoordinatorRuntimeParams params
Map<String, Boolean> broadcastStatusByDatasource
)
{
boolean isBroadcastDatasource = broadcastStatusByDatasource
.computeIfAbsent(dataSource, ds -> isBroadcastDatasource(ds, params));
.computeIfAbsent(dataSource, this::isBroadcastDatasource);
return server.isRealtimeServer() && !isBroadcastDatasource;
}
/**
* A datasource is considered a broadcast datasource if it has even one broadcast rule.
*/
private boolean isBroadcastDatasource(String datasource, DruidCoordinatorRuntimeParams params)
private boolean isBroadcastDatasource(String datasource)
{
return params.getDatabaseRuleManager().getRulesWithDefault(datasource).stream()
.anyMatch(rule -> rule instanceof BroadcastDistributionRule);
return ruleHandler.getRulesWithDefault(datasource)
.stream()
.anyMatch(rule -> rule instanceof BroadcastDistributionRule);
}
}

View File

@ -27,7 +27,6 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.SegmentChangeRequestNoop;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@ -85,7 +84,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
* {@link #stop()}.
*/
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad
= new ConcurrentSkipListMap<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
= new ConcurrentSkipListMap<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
/**
* Needs to be thread safe since it can be concurrently accessed via
@ -93,7 +92,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
* {@link #getSegmentsToDrop()} and {@link #stop()}
*/
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop
= new ConcurrentSkipListMap<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
= new ConcurrentSkipListMap<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
/**
* Needs to be thread safe since it can be concurrently accessed via
@ -101,7 +100,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
* and {@link #getSegmentsToDrop()}
*/
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop
= new ConcurrentSkipListSet<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
= new ConcurrentSkipListSet<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
/**
* Needs to be thread safe since it can be concurrently accessed via
@ -109,7 +108,7 @@ public class CuratorLoadQueuePeon implements LoadQueuePeon
* {@link #getTimedOutSegments()} and {@link #stop()}
*/
private final ConcurrentSkipListSet<DataSegment> timedOutSegments =
new ConcurrentSkipListSet<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
new ConcurrentSkipListSet<>(SegmentHolder.NEWEST_SEGMENT_FIRST);
public CuratorLoadQueuePeon(
CuratorFramework curator,

View File

@ -22,10 +22,10 @@ package org.apache.druid.server.coordinator.loading;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
@ -43,6 +43,24 @@ import java.util.Objects;
*/
public class SegmentHolder implements Comparable<SegmentHolder>
{
/**
* Orders newest segments first (i.e. segments with most recent intervals).
* <p>
* The order is needed to ensure that:
* <ul>
* <li>Round-robin assignment distributes segments belonging to same or adjacent
* intervals uniformly across all servers.</li>
* <li>Load queue prioritizes load of most recent segments, as
* they are presumed to contain more important data which is queried more often.</li>
* <li>Replication throttler has a smaller impact on replicas of newer segments.</li>
* </ul>
*/
public static final Ordering<DataSegment> NEWEST_SEGMENT_FIRST = Ordering
.from(Comparators.intervalsByEndThenStart())
.onResultOf(DataSegment::getInterval)
.compound(Ordering.<DataSegment>natural())
.reverse();
/**
* Orders segment requests:
* <ul>
@ -53,7 +71,7 @@ public class SegmentHolder implements Comparable<SegmentHolder>
private static final Comparator<SegmentHolder> COMPARE_ACTION_THEN_INTERVAL =
Ordering.explicit(SegmentAction.DROP, SegmentAction.LOAD, SegmentAction.REPLICATE, SegmentAction.MOVE_TO)
.onResultOf(SegmentHolder::getAction)
.compound(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST.onResultOf(SegmentHolder::getSegment));
.compound(NEWEST_SEGMENT_FIRST.onResultOf(SegmentHolder::getSegment));
private final DataSegment segment;
private final DataSegmentChangeRequest changeRequest;

View File

@ -50,7 +50,7 @@ public class SegmentLoadingConfig
final int throttlePercentage = 5;
final int replicationThrottleLimit = Math.max(100, numUsedSegments * throttlePercentage / 100);
final int numBalancerThreads = CoordinatorDynamicConfig.getDefaultBalancerComputeThreads();
log.info(
log.debug(
"Smart segment loading is enabled. Calculated replicationThrottleLimit[%,d]"
+ " (%d%% of used segments[%,d]) and numBalancerThreads[%d].",
replicationThrottleLimit, throttlePercentage, numUsedSegments, numBalancerThreads

View File

@ -93,11 +93,6 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
);
}
public CoordinatorRunStats getStats()
{
return stats;
}
public SegmentReplicationStatus getReplicationStatus()
{
return replicaCountMap.toReplicationStatus();

View File

@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Contains statistics typically tracked during a single coordinator run or the
@ -112,8 +111,6 @@ public class CoordinatorRunStats
public String buildStatsTable()
{
final StringBuilder statsTable = new StringBuilder();
final AtomicInteger hiddenStats = new AtomicInteger(0);
final AtomicInteger totalStats = new AtomicInteger();
allStats.forEach(
(rowKey, statMap) -> {
@ -129,7 +126,6 @@ public class CoordinatorRunStats
// Add all the errors
final Map<CoordinatorStat, Long> errorStats = levelToStats
.getOrDefault(CoordinatorStat.Level.ERROR, Collections.emptyMap());
totalStats.addAndGet(errorStats.size());
if (!errorStats.isEmpty()) {
statsTable.append(
StringUtils.format("\nError: %s ==> %s", rowKey, errorStats)
@ -139,7 +135,6 @@ public class CoordinatorRunStats
// Add all the info level stats
final Map<CoordinatorStat, Long> infoStats = levelToStats
.getOrDefault(CoordinatorStat.Level.INFO, Collections.emptyMap());
totalStats.addAndGet(infoStats.size());
if (!infoStats.isEmpty()) {
statsTable.append(
StringUtils.format("\nInfo : %s ==> %s", rowKey, infoStats)
@ -149,28 +144,14 @@ public class CoordinatorRunStats
// Add all the debug level stats if the row key has a debug dimension
final Map<CoordinatorStat, Long> debugStats = levelToStats
.getOrDefault(CoordinatorStat.Level.DEBUG, Collections.emptyMap());
totalStats.addAndGet(debugStats.size());
if (!debugStats.isEmpty() && hasDebugDimension(rowKey)) {
statsTable.append(
StringUtils.format("\nDebug: %s ==> %s", rowKey, debugStats)
);
} else {
hiddenStats.addAndGet(debugStats.size());
}
}
);
if (hiddenStats.get() > 0) {
statsTable.append(
StringUtils.format("\nDebug: %d hidden stats. Set 'debugDimensions' to see these.", hiddenStats.get())
);
}
if (totalStats.get() > 0) {
statsTable.append(
StringUtils.format("\nTOTAL: %d stats for %d dimension keys", totalStats.get(), rowCount())
);
}
return statsTable.toString();
}

View File

@ -61,6 +61,10 @@ public class Stats
= CoordinatorStat.toDebugAndEmit("overshadowed", "segment/overshadowed/count");
public static final CoordinatorStat UNNEEDED_ETERNITY_TOMBSTONE
= CoordinatorStat.toDebugAndEmit("unneededEternityTombstone", "segment/unneededEternityTombstone/count");
// Values computed in a run
public static final CoordinatorStat REPLICATION_THROTTLE_LIMIT
= CoordinatorStat.toDebugOnly("replicationThrottleLimit");
}
public static class SegmentQueue
@ -170,5 +174,8 @@ public class Stats
);
public static final CoordinatorStat COMPUTATION_TIME = CoordinatorStat.toDebugOnly("costComputeTime");
public static final CoordinatorStat COMPUTATION_COUNT = CoordinatorStat.toDebugOnly("costComputeCount");
public static final CoordinatorStat COMPUTE_THREADS = CoordinatorStat.toDebugOnly("balancerComputeThreads");
public static final CoordinatorStat MAX_TO_MOVE = CoordinatorStat.toDebugOnly("maxToMove");
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.http;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
import java.util.List;
/**
* Response payload that wraps a list of {@link DutyGroupStatus} objects
* under the JSON property {@code dutyGroups}.
* <p>
* The list is stored as passed to the constructor, without making a copy.
*/
public class CoordinatorDutyStatus
{
private final List<DutyGroupStatus> dutyGroups;
/**
* @param dutyGroups statuses of duty groups to include in the payload
*/
public CoordinatorDutyStatus(List<DutyGroupStatus> dutyGroups)
{
this.dutyGroups = dutyGroups;
}
/**
* @return the same list instance that was passed to the constructor
*/
@JsonProperty
public List<DutyGroupStatus> getDutyGroups()
{
return dutyGroups;
}
}

View File

@ -149,4 +149,13 @@ public class CoordinatorResource
)
).build();
}
/**
 * Handles {@code GET} requests on the {@code /duties} path of this resource.
 * <p>
 * Responds with an OK response whose JSON entity is a
 * {@link CoordinatorDutyStatus} wrapping the duty statuses currently
 * reported by the coordinator.
 */
@GET
@Path("/duties")
@ResourceFilters(StateResourceFilter.class)
@Produces(MediaType.APPLICATION_JSON)
public Response getStatusOfDuties()
{
  final CoordinatorDutyStatus dutyStatus = new CoordinatorDutyStatus(coordinator.getStatusOfDuties());
  return Response.ok(dutyStatus).build();
}
}

View File

@ -54,7 +54,8 @@ public class SqlSegmentsMetadataManagerProviderTest
connector,
lifecycle,
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
SegmentsMetadataManager manager = provider.get();
Assert.assertTrue(manager instanceof SqlSegmentsMetadataManager);

View File

@ -70,7 +70,8 @@ public class SqlSegmentsMetadataManagerSchemaPollTest extends SqlSegmentsMetadat
derbyConnectorRule.metadataTablesConfigSupplier(),
connector,
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
sqlSegmentsMetadataManager.start();
storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get();
@ -137,7 +138,8 @@ public class SqlSegmentsMetadataManagerSchemaPollTest extends SqlSegmentsMetadat
derbyConnectorRule.metadataTablesConfigSupplier(),
connector,
segmentSchemaCache,
centralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig,
NoopServiceEmitter.instance()
);
sqlSegmentsMetadataManager.start();
@ -225,7 +227,8 @@ public class SqlSegmentsMetadataManagerSchemaPollTest extends SqlSegmentsMetadat
derbyConnectorRule.metadataTablesConfigSupplier(),
connector,
segmentSchemaCache,
centralizedDatasourceSchemaConfig
centralizedDatasourceSchemaConfig,
NoopServiceEmitter.instance()
);
sqlSegmentsMetadataManager.start();

View File

@ -114,7 +114,7 @@ public class SqlSegmentsMetadataManagerTest extends SqlSegmentsMetadataManagerTe
config.setPollDuration(Period.seconds(3));
storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get();
segmentSchemaCache = new SegmentSchemaCache(new NoopServiceEmitter());
segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance());
segmentSchemaManager = new SegmentSchemaManager(
derbyConnectorRule.metadataTablesConfigSupplier().get(),
jsonMapper,
@ -127,7 +127,8 @@ public class SqlSegmentsMetadataManagerTest extends SqlSegmentsMetadataManagerTe
derbyConnectorRule.metadataTablesConfigSupplier(),
connector,
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
sqlSegmentsMetadataManager.start();
@ -1338,7 +1339,8 @@ public class SqlSegmentsMetadataManagerTest extends SqlSegmentsMetadataManagerTe
derbyConnectorRule.metadataTablesConfigSupplier(),
derbyConnectorRule.getConnector(),
segmentSchemaCache,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
sqlSegmentsMetadataManager.start();

View File

@ -128,7 +128,7 @@ public class BalanceSegmentsProfiler
.addTier("normal", serverHolderList.toArray(new ServerHolder[0]))
.build();
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDruidCluster(druidCluster)
.withUsedSegments(segments)
.withDynamicConfigs(
@ -140,11 +140,10 @@ public class BalanceSegmentsProfiler
.build()
)
.withSegmentAssignerUsing(loadQueueManager)
.withDatabaseRuleManager(manager)
.build();
BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1));
RunRules runner = new RunRules(Set::size);
RunRules runner = new RunRules(Set::size, manager::getRulesWithDefault);
watch.start();
DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
DruidCoordinatorRuntimeParams assignParams = runner.run(params);
@ -174,7 +173,7 @@ public class BalanceSegmentsProfiler
EasyMock.replay(druidServer2);
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDruidCluster(
DruidCluster
.builder()

View File

@ -144,9 +144,7 @@ public class CoordinatorRunStatsTest
final String expectedTable
= "\nError: {duty=duty1} ==> {error1=10}"
+ "\nInfo : {duty=duty1} ==> {info1=20}"
+ "\nDebug: 1 hidden stats. Set 'debugDimensions' to see these."
+ "\nTOTAL: 3 stats for 1 dimension keys";
+ "\nInfo : {duty=duty1} ==> {info1=20}";
Assert.assertEquals(expectedTable, stats.buildStatsTable());
}
@ -162,8 +160,7 @@ public class CoordinatorRunStatsTest
final String expectedTable
= "\nError: {duty=duty1} ==> {error1=10}"
+ "\nInfo : {duty=duty1} ==> {info1=20}"
+ "\nDebug: {duty=duty1} ==> {debug1=30}"
+ "\nTOTAL: 3 stats for 1 dimension keys";
+ "\nDebug: {duty=duty1} ==> {debug1=30}";
Assert.assertEquals(expectedTable, debugStats.buildStatsTable());
}

View File

@ -48,6 +48,7 @@ import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.compaction.CompactionSimulateResult;
@ -62,7 +63,7 @@ import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
import org.apache.druid.server.coordinator.duty.KillSupervisorsCustomDuty;
import org.apache.druid.server.coordinator.loading.CuratorLoadQueuePeon;
import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
@ -115,6 +116,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
private DruidNode druidNode;
private OverlordClient overlordClient;
private CompactionStatusTracker statusTracker;
private final LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
@ -127,6 +129,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
loadQueueTaskMaster = EasyMock.createMock(LoadQueueTaskMaster.class);
overlordClient = EasyMock.createMock(OverlordClient.class);
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
@ -183,7 +186,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
overlordClient,
loadQueueTaskMaster,
new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
@ -607,6 +610,10 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
{
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically())
.andReturn(true).anyTimes();
EasyMock.replay(segmentsMetadataManager);
CoordinatorCustomDutyGroups emptyCustomDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of());
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
@ -614,7 +621,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
overlordClient,
loadQueueTaskMaster,
null,
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
@ -626,22 +633,32 @@ public class DruidCoordinatorTest extends CuratorTestBase
CentralizedDatasourceSchemaConfig.create(),
new CompactionStatusTracker(OBJECT_MAPPER)
);
coordinator.start();
// Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
final List<DutyGroupStatus> duties = coordinator.getStatusOfDuties();
Assert.assertEquals(3, duties.size());
// CompactSegments should not exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
Assert.assertEquals("HistoricalManagementDuties", duties.get(0).getName());
Assert.assertEquals("IndexingServiceDuties", duties.get(1).getName());
Assert.assertEquals("MetadataStoreManagementDuties", duties.get(2).getName());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
CompactSegments duty = coordinator.initializeCompactSegmentsDuty(statusTracker);
Assert.assertNotNull(duty);
final String compactDutyName = CompactSegments.class.getName();
Assert.assertTrue(duties.get(1).getDutyNames().contains(compactDutyName));
// CompactSegments should not exist in other duty groups
Assert.assertFalse(duties.get(0).getDutyNames().contains(compactDutyName));
Assert.assertFalse(duties.get(2).getDutyNames().contains(compactDutyName));
coordinator.stop();
}
@Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsCompactSegments()
{
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically())
.andReturn(true).anyTimes();
EasyMock.replay(segmentsMetadataManager);
CoordinatorCustomDutyGroup group = new CoordinatorCustomDutyGroup(
"group1",
Duration.standardSeconds(1),
@ -654,7 +671,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
overlordClient,
loadQueueTaskMaster,
null,
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
@ -666,22 +683,32 @@ public class DruidCoordinatorTest extends CuratorTestBase
CentralizedDatasourceSchemaConfig.create(),
new CompactionStatusTracker(OBJECT_MAPPER)
);
coordinator.start();
// Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().anyMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
final List<DutyGroupStatus> duties = coordinator.getStatusOfDuties();
Assert.assertEquals(4, duties.size());
// CompactSegments should not exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertTrue(compactSegmentsDutyFromCustomGroups.isEmpty());
Assert.assertEquals("HistoricalManagementDuties", duties.get(0).getName());
Assert.assertEquals("IndexingServiceDuties", duties.get(1).getName());
Assert.assertEquals("MetadataStoreManagementDuties", duties.get(2).getName());
Assert.assertEquals("group1", duties.get(3).getName());
// CompactSegments returned by this method should be created using the DruidCoordinatorConfig in the DruidCoordinator
CompactSegments duty = coordinator.initializeCompactSegmentsDuty(statusTracker);
Assert.assertNotNull(duty);
final String compactDutyName = CompactSegments.class.getName();
Assert.assertTrue(duties.get(1).getDutyNames().contains(compactDutyName));
// CompactSegments should not exist in other duty groups
Assert.assertFalse(duties.get(0).getDutyNames().contains(compactDutyName));
Assert.assertFalse(duties.get(2).getDutyNames().contains(compactDutyName));
coordinator.stop();
}
@Test
public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactSegments()
{
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically())
.andReturn(true).anyTimes();
EasyMock.replay(segmentsMetadataManager);
CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup(
"group1",
Duration.standardSeconds(1),
@ -694,7 +721,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
overlordClient,
loadQueueTaskMaster,
null,
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
@ -706,19 +733,27 @@ public class DruidCoordinatorTest extends CuratorTestBase
CentralizedDatasourceSchemaConfig.create(),
new CompactionStatusTracker(OBJECT_MAPPER)
);
coordinator.start();
// Since CompactSegments is enabled in Custom Duty Group, then CompactSegments must not be created in IndexingServiceDuties
List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
Assert.assertTrue(indexingDuties.stream().noneMatch(coordinatorDuty -> coordinatorDuty instanceof CompactSegments));
final List<DutyGroupStatus> duties = coordinator.getStatusOfDuties();
Assert.assertEquals(4, duties.size());
Assert.assertEquals("HistoricalManagementDuties", duties.get(0).getName());
Assert.assertEquals("IndexingServiceDuties", duties.get(1).getName());
Assert.assertEquals("MetadataStoreManagementDuties", duties.get(2).getName());
Assert.assertEquals("group1", duties.get(3).getName());
// CompactSegments should exist in Custom Duty Group
List<CompactSegments> compactSegmentsDutyFromCustomGroups = coordinator.getCompactSegmentsDutyFromCustomGroups();
Assert.assertFalse(compactSegmentsDutyFromCustomGroups.isEmpty());
Assert.assertEquals(1, compactSegmentsDutyFromCustomGroups.size());
Assert.assertNotNull(compactSegmentsDutyFromCustomGroups.get(0));
final String compactDutyName = CompactSegments.class.getName();
Assert.assertTrue(duties.get(3).getDutyNames().contains(compactDutyName));
// CompactSegments returned by this method should be from the Custom Duty Group
CompactSegments duty = coordinator.initializeCompactSegmentsDuty(statusTracker);
Assert.assertNotNull(duty);
// CompactSegments should not exist in other duty groups
Assert.assertFalse(duties.get(0).getDutyNames().contains(compactDutyName));
Assert.assertFalse(duties.get(1).getDutyNames().contains(compactDutyName));
Assert.assertFalse(duties.get(2).getDutyNames().contains(compactDutyName));
coordinator.stop();
}
@Test(timeout = 3000)
@ -799,7 +834,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
serverInventoryView,
serviceEmitter,
scheduledExecutorFactory,
null,
overlordClient,
loadQueueTaskMaster,
new SegmentLoadQueueManager(serverInventoryView, loadQueueTaskMaster),
new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),

View File

@ -326,7 +326,7 @@ public class BalanceSegmentsTest
)
{
return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDruidCluster(DruidCluster.builder().addTier("normal", servers).build())
.withUsedSegments(allSegments)
.withBroadcastDatasources(broadcastDatasources)

View File

@ -1,68 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordinator.duty;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
 * Unit test for {@link CollectSegmentAndServerStats}.
 * <p>
 * Runs the duty once against runtime params built with an empty cluster and
 * no used segments, using a mocked {@link LoadQueueTaskMaster} that reports a
 * single peon, and checks that the segment queue stats are present afterwards.
 */
@RunWith(MockitoJUnitRunner.class)
public class CollectSegmentAndServerStatsTest
{
  @Mock
  private LoadQueueTaskMaster mockTaskMaster;

  @Test
  public void testCollectedSegmentStats()
  {
    // The task master reports exactly one load queue peon, for "server1"
    Mockito.when(mockTaskMaster.getAllPeons())
           .thenReturn(ImmutableMap.of("server1", new TestLoadQueuePeon()));

    // Params with an empty cluster and no used segments
    final DruidCoordinatorRuntimeParams inputParams =
        DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc())
                                     .withDruidCluster(DruidCluster.EMPTY)
                                     .withUsedSegments()
                                     .withBalancerStrategy(new RandomBalancerStrategy())
                                     .withSegmentAssignerUsing(new SegmentLoadQueueManager(null, null))
                                     .build();

    final CoordinatorDuty statsDuty = new CollectSegmentAndServerStats(mockTaskMaster);
    final CoordinatorRunStats collectedStats = statsDuty.run(inputParams).getCoordinatorStats();

    // All three segment queue stats must have been collected by the duty
    Assert.assertTrue(collectedStats.hasStat(Stats.SegmentQueue.NUM_TO_LOAD));
    Assert.assertTrue(collectedStats.hasStat(Stats.SegmentQueue.NUM_TO_DROP));
    Assert.assertTrue(collectedStats.hasStat(Stats.SegmentQueue.LOAD_RATE_KBPS));
  }
}

View File

@ -1880,7 +1880,7 @@ public class CompactSegmentsTest
)
{
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDataSourcesSnapshot(dataSources)
.withCompactionConfig(
new DruidCompactionConfig(

View File

@ -175,7 +175,7 @@ public class KillStalePendingSegmentsTest
private DruidCoordinatorRuntimeParams.Builder createParamsWithDatasources(String... datasources)
{
DruidCoordinatorRuntimeParams.Builder builder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
DruidCoordinatorRuntimeParams.Builder builder = DruidCoordinatorRuntimeParams.builder();
// Create a dummy for each of the datasources so that they get added to the timeline
Set<DataSegment> usedSegments = new HashSet<>();

View File

@ -51,6 +51,7 @@ import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
@ -115,7 +116,8 @@ public class KillUnusedSegmentsTest
derbyConnectorRule.metadataTablesConfigSupplier(),
connector,
null,
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
NoopServiceEmitter.instance()
);
sqlSegmentsMetadataManager.start();
@ -130,7 +132,8 @@ public class KillUnusedSegmentsTest
.withBufferPeriod(Duration.standardSeconds(1));
dynamicConfigBuilder = CoordinatorDynamicConfig.builder()
.withKillTaskSlotRatio(1.0);
paramsBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
paramsBuilder = DruidCoordinatorRuntimeParams.builder()
.withUsedSegments(Collections.emptySet());
}
@Test

View File

@ -463,7 +463,7 @@ public class MarkEternityTombstonesAsUnusedTest
.build();
final DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDataSourcesSnapshot(
segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
)

View File

@ -90,7 +90,7 @@ public class MarkOvershadowedSegmentsAsUnusedTest
.build();
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDataSourcesSnapshot(
segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
)

View File

@ -99,7 +99,7 @@ public class RunRulesTest
emitter = new StubServiceEmitter("coordinator", "host");
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
ruleRunner = new RunRules(Set::size);
ruleRunner = new RunRules(Set::size, databaseRuleManager::getRulesWithDefault);
loadQueueManager = new SegmentLoadQueueManager(null, null);
balancerExecutor = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "RunRulesTest-%d"));
}
@ -334,10 +334,9 @@ public class RunRulesTest
)
{
return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc().minusDays(1))
.builder()
.withDruidCluster(druidCluster)
.withUsedSegments(dataSegments)
.withDatabaseRuleManager(databaseRuleManager);
.withUsedSegments(dataSegments);
}
/**
@ -1014,7 +1013,7 @@ public class RunRulesTest
Assert.assertFalse(stats.hasStat(Stats.Segments.DROPPED));
Assert.assertEquals(2, usedSegments.size());
Assert.assertEquals(usedSegments, params.getUsedSegments());
Assert.assertEquals(usedSegments, params.getUsedSegmentsNewestFirst());
EasyMock.verify(mockPeon);
}

View File

@ -243,7 +243,7 @@ public class UnloadUnusedSegmentsTest
Set<DataSegment> usedSegments = ImmutableSet.of(segment2);
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDruidCluster(
DruidCluster
.builder()
@ -261,10 +261,9 @@ public class UnloadUnusedSegmentsTest
)
.withUsedSegments(usedSegments)
.withBroadcastDatasources(Collections.singleton(broadcastDatasource))
.withDatabaseRuleManager(databaseRuleManager)
.build();
params = new UnloadUnusedSegments(loadQueueManager).run(params);
params = new UnloadUnusedSegments(loadQueueManager, databaseRuleManager::getRulesWithDefault).run(params);
CoordinatorRunStats stats = params.getCoordinatorStats();
// We drop segment1 and broadcast1 from all servers, realtimeSegment is not dropped by the indexer

View File

@ -20,7 +20,6 @@
package org.apache.druid.server.coordinator.rules;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.segment.TestDataSource;
import org.apache.druid.server.coordination.ServerType;
@ -225,7 +224,7 @@ public class BroadcastDistributionRuleTest
{
StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
rule.run(segment, segmentAssigner);
return segmentAssigner.getStats();
return params.getCoordinatorStats();
}
private DruidCoordinatorRuntimeParams makeParamsWithUsedSegments(
@ -234,7 +233,7 @@ public class BroadcastDistributionRuleTest
)
{
return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDruidCluster(druidCluster)
.withUsedSegments(usedSegments)
.withBalancerStrategy(new RandomBalancerStrategy())

View File

@ -142,7 +142,7 @@ public class LoadRuleTest
)
{
return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDruidCluster(druidCluster)
.withBalancerStrategy(balancerStrategy)
.withUsedSegments(usedSegments)
@ -333,7 +333,7 @@ public class LoadRuleTest
DataSegment dataSegment3 = createDataSegment("ds3");
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.builder()
.withDruidCluster(druidCluster)
.withBalancerStrategy(balancerStrategy)
.withUsedSegments(dataSegment1, dataSegment2, dataSegment3)

View File

@ -74,7 +74,7 @@ public class SegmentLoadingTest extends CoordinatorSimulationBaseTest
startSimulation(sim);
runCoordinatorCycle();
// Verify that that replicationThrottleLimit is honored
// Verify that replicationThrottleLimit is honored
verifyValue(Metric.ASSIGNED_COUNT, 2L);
loadQueuedSegments();

View File

@ -20,15 +20,21 @@
package org.apache.druid.server.http;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.List;
public class CoordinatorResourceTest
{
@ -99,4 +105,45 @@ public class CoordinatorResourceTest
);
Assert.assertEquals(200, response.getStatus());
}
/**
 * Checks that {@code CoordinatorResource.getStatusOfDuties()} responds with
 * status 200 and a {@link CoordinatorDutyStatus} entity whose single duty
 * group carries the values returned by the mocked coordinator.
 */
@Test
public void testGetStatusOfDuties()
{
  // Values used to build the status returned by the mock and to verify the response
  final String groupName = "HistoricalManagementDuties";
  final Duration period = Duration.standardMinutes(1);
  final List<String> dutyNames = Collections.singletonList("org.apache.druid.duty.RunRules");
  final DateTime lastRunEnd = DateTimes.nowUtc();
  final DateTime lastRunStart = lastRunEnd.minusMinutes(5);

  final DutyGroupStatus expectedStatus = new DutyGroupStatus(
      groupName,
      period,
      dutyNames,
      lastRunStart,
      lastRunEnd,
      100L,
      500L
  );

  EasyMock.expect(mock.getStatusOfDuties())
          .andReturn(Collections.singletonList(expectedStatus))
          .once();
  EasyMock.replay(mock);

  final Response response = new CoordinatorResource(mock).getStatusOfDuties();
  Assert.assertEquals(200, response.getStatus());

  final Object entity = response.getEntity();
  Assert.assertTrue(entity instanceof CoordinatorDutyStatus);

  final List<DutyGroupStatus> returnedGroups = ((CoordinatorDutyStatus) entity).getDutyGroups();
  Assert.assertEquals(1, returnedGroups.size());

  // Every field of the returned status must match what the mock supplied
  final DutyGroupStatus actualStatus = returnedGroups.get(0);
  Assert.assertEquals(groupName, actualStatus.getName());
  Assert.assertEquals(period, actualStatus.getPeriod());
  Assert.assertEquals(dutyNames, actualStatus.getDutyNames());
  Assert.assertEquals(lastRunStart, actualStatus.getLastRunStart());
  Assert.assertEquals(lastRunEnd, actualStatus.getLastRunEnd());
  Assert.assertEquals(100L, actualStatus.getAvgRuntimeMillis());
  Assert.assertEquals(500L, actualStatus.getAvgRunGapMillis());
}
}

View File

@ -24,6 +24,13 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
public class NoopServiceEmitter extends ServiceEmitter
{
private static final NoopServiceEmitter INSTANCE = new NoopServiceEmitter();
/**
 * Returns the shared {@link NoopServiceEmitter} held in the static
 * {@code INSTANCE} field, so callers do not need to construct their own.
 *
 * @return the same emitter object on every call
 */
public static NoopServiceEmitter instance()
{
return INSTANCE;
}
public NoopServiceEmitter()
{
super("", "", null);