Coordinator: Allow dropping all segments. (#7447)

Removes the coordinator sanity check that prevents it from dropping all
segments. Removing it is useful because the old behavior is unintuitive
on dev/testing clusters, where users may regularly want to drop all their
data to get back to a clean slate.

But the sanity check was there for a reason: to prevent a race condition
where the coordinator might drop all segments if it ran before the
first metadata store poll finished. This patch addresses that concern
differently, by allowing methods in MetadataSegmentManager to return
null if a poll has not happened yet, and canceling coordinator runs
in that case.
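
As a minimal sketch of this pattern (illustrative names, not the actual Druid classes): the manager returns null until the first poll completes, and the coordinator treats null as "cancel this run" rather than "there are no segments".

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;

class PollingManagerSketch
{
  // null means "never polled"; an empty map means "polled, but no segments".
  @Nullable
  private volatile ConcurrentHashMap<String, String> dataSources = null;

  void poll()
  {
    // Build a fresh map and publish it atomically via the volatile write.
    dataSources = new ConcurrentHashMap<>();
  }

  @Nullable
  Collection<String> getDataSources()
  {
    final ConcurrentHashMap<String, String> snapshot = dataSources;
    return snapshot == null ? null : Collections.unmodifiableCollection(snapshot.values());
  }
}

class CoordinatorRunSketch
{
  void run(PollingManagerSketch manager)
  {
    final Collection<String> dataSources = manager.getDataSources();
    if (dataSources == null) {
      // Never polled: cancel the run instead of concluding "drop everything".
      return;
    }
    // ... proceed with load/drop decisions against a consistent snapshot ...
  }
}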

This patch also makes the "dataSources" reference in
SQLMetadataSegmentManager volatile. I'm not sure why it wasn't volatile
before, but it seems necessary to me: it's not final, and it's dereferenced
from multiple threads without synchronization.
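
For reference, the snapshot-read idiom the patch relies on, condensed from the SQLMetadataSegmentManager diff below: without volatile there is no happens-before edge between the poll thread's write and other threads' reads, so a reader could observe a stale or null reference indefinitely. Reading the field once into a local also guarantees a consistent view even if poll() or stop() reassigns it mid-call.

import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;

class VolatileSnapshotSketch
{
  // Volatile: reassigned by poll()/stop() and read from other threads.
  @Nullable
  private volatile ConcurrentHashMap<String, Object> dataSources = null;

  @Nullable
  Iterable<Object> valuesSnapshot()
  {
    // Read the volatile field exactly once; a second read could see a different map.
    final ConcurrentHashMap<String, Object> snapshot = dataSources;
    return snapshot == null ? null : snapshot.values();
  }
}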
Gian Merlino 2019-04-11 08:45:38 -07:00 committed by Fangjin Yang
parent 4ea37e2614
commit a517f8ce49
9 changed files with 186 additions and 58 deletions

View File

@@ -52,8 +52,7 @@ Segments can be automatically loaded and dropped from the cluster based on a set
### Cleaning Up Segments
Each run, the Druid Coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped.
Note that if all segments in the database are deleted (or marked unused), then the Coordinator will not drop anything from the Historicals. This is done to prevent a race condition in which the Coordinator would drop all segments if it started running cleanup before it finished polling the database for available segments for the first time and believed that there were no segments.
Each run, the Druid coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped.
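
A rough illustration of the comparison described above (a hypothetical helper method, not the actual coordinator code):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

class CleanupComparisonSketch
{
  // Flags segments that are served in the cluster but absent from the database.
  static List<DataSegment> findRemovals(List<DataSegment> databaseSegments, List<DataSegment> servedSegments)
  {
    final Set<SegmentId> inDatabase = new HashSet<>();
    for (DataSegment segment : databaseSegments) {
      inDatabase.add(segment.getId());
    }
    final List<DataSegment> removalList = new ArrayList<>();
    for (DataSegment served : servedSegments) {
      if (!inDatabase.contains(served.getId())) {
        removalList.add(served);
      }
    }
    return removalList;
  }
}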
### Segment Availability

View File

@@ -58,6 +58,13 @@ public interface MetadataSegmentManager
@Nullable
ImmutableDruidDataSource getDataSource(String dataSourceName);
/**
* Returns a collection of known datasources.
*
* Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
* not yet been polled).
*/
@Nullable
Collection<ImmutableDruidDataSource> getDataSources();
/**
@@ -65,7 +72,11 @@ public interface MetadataSegmentManager
* unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
* (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
* several times.
*
* Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
* not yet been polled).
*/
@Nullable
Iterable<DataSegment> iterateAllSegments();
Collection<String> getAllDataSourceNames();
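
A usage note for callers (hypothetical code, not part of the patch): since the returned iterable may be expensive to traverse and is now nullable, do a single null check and gather everything needed in one pass.

import javax.annotation.Nullable;
import org.apache.druid.metadata.MetadataSegmentManager;
import org.apache.druid.timeline.DataSegment;

class SegmentStatsSketch
{
  long count;
  long totalSize;

  // Returns null when the metadata store has not been polled yet, so the
  // caller can cancel its run; otherwise computes both stats in one pass.
  @Nullable
  static SegmentStatsSketch compute(MetadataSegmentManager manager)
  {
    final Iterable<DataSegment> segments = manager.iterateAllSegments();
    if (segments == null) {
      return null;
    }
    final SegmentStatsSketch stats = new SegmentStatsSketch();
    for (DataSegment segment : segments) {
      stats.count++;
      stats.totalSize += segment.getSize();
    }
    return stats;
  }
}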

View File

@@ -66,6 +66,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -73,6 +74,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
*
*/
@ManageLifecycle
public class SQLMetadataSegmentManager implements MetadataSegmentManager
@@ -102,9 +104,14 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final SQLMetadataConnector connector;
private ConcurrentHashMap<String, DruidDataSource> dataSources = new ConcurrentHashMap<>();
// Volatile since this reference is reassigned in "poll" and then read from in other threads.
// Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map)
@Nullable
private volatile ConcurrentHashMap<String, DruidDataSource> dataSources = null;
/** The number of times this SQLMetadataSegmentManager was started. */
/**
* The number of times this SQLMetadataSegmentManager was started.
*/
private long startCount = 0;
/**
* Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
@@ -200,7 +207,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return;
}
dataSources = new ConcurrentHashMap<>();
dataSources = null;
currentStartOrder = -1;
exec.shutdownNow();
exec = null;
@@ -325,7 +332,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
).bind("dataSource", dataSource).execute()
);
dataSources.remove(dataSource);
Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource));
if (removed == 0) {
return false;
@@ -348,18 +355,21 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
// Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a
// potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast.
List<SegmentId> possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId);
dataSources.computeIfPresent(
dataSourceName,
(dsName, dataSource) -> {
for (SegmentId possibleSegmentId : possibleSegmentIds) {
if (dataSource.removeSegment(possibleSegmentId) != null) {
break;
}
}
// Returning null from the lambda here makes the ConcurrentHashMap remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
Optional.ofNullable(dataSources).ifPresent(
m ->
m.computeIfPresent(
dataSourceName,
(dsName, dataSource) -> {
for (SegmentId possibleSegmentId : possibleSegmentIds) {
if (dataSource.removeSegment(possibleSegmentId) != null) {
break;
}
}
// Returning null from the lambda here makes the ConcurrentHashMap remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
)
);
return removed;
@@ -375,14 +385,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
{
try {
final boolean removed = removeSegmentFromTable(segmentId.toString());
dataSources.computeIfPresent(
segmentId.getDataSource(),
(dsName, dataSource) -> {
dataSource.removeSegment(segmentId);
// Returning null from the lambda here makes the ConcurrentHashMap remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
Optional.ofNullable(dataSources).ifPresent(
m ->
m.computeIfPresent(
segmentId.getDataSource(),
(dsName, dataSource) -> {
dataSource.removeSegment(segmentId);
// Returning null from the lambda here makes the ConcurrentHashMap remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
)
);
return removed;
}
@@ -422,23 +435,37 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
@Nullable
public ImmutableDruidDataSource getDataSource(String dataSourceName)
{
final DruidDataSource dataSource = dataSources.get(dataSourceName);
final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null);
return dataSource == null ? null : dataSource.toImmutableDruidDataSource();
}
@Override
@Nullable
public Collection<ImmutableDruidDataSource> getDataSources()
{
return dataSources.values()
.stream()
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toList());
return Optional.ofNullable(dataSources)
.map(m ->
m.values()
.stream()
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toList())
)
.orElse(null);
}
@Override
@Nullable
public Iterable<DataSegment> iterateAllSegments()
{
return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
final ConcurrentHashMap<String, DruidDataSource> dataSourcesSnapshot = dataSources;
if (dataSourcesSnapshot == null) {
return null;
}
return () -> dataSourcesSnapshot.values()
.stream()
.flatMap(dataSource -> dataSource.getSegments().stream())
.iterator();
}
@Override
@@ -543,6 +570,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
.addSegmentIfAbsent(segment);
});
// Replace "dataSources" atomically.
dataSources = newDataSources;
}
@@ -557,7 +585,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
*/
private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
{
DruidDataSource dataSource = dataSources.get(segment.getDataSource());
DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null);
if (dataSource == null) {
return segment;
}
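
One subtlety in the Optional.ofNullable idiom used throughout this class: Optional.ofNullable(dataSources) reads the volatile field exactly once, so the mutation acts on a consistent reference and is silently skipped before the first poll. A naive null check would read the field twice and could race with stop(), as this sketch (hypothetical class) shows:

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;

class SnapshotMutationSketch
{
  @Nullable
  private volatile ConcurrentHashMap<String, Object> dataSources = null;

  // Safe: the volatile field is read once; the lambda operates on that snapshot.
  void removeSafely(String dataSource)
  {
    Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource));
  }

  // Racy: reads the field twice; stop() could null it out between the check
  // and the call, throwing a NullPointerException.
  void removeRacily(String dataSource)
  {
    if (dataSources != null) {
      dataSources.remove(dataSource);
    }
  }
}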

View File

@@ -74,8 +74,10 @@ import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -88,6 +90,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
*
*/
@ManageLifecycle
public class DruidCoordinator
@@ -242,7 +245,9 @@ public class DruidCoordinator
return loadManagementPeons;
}
/** @return tier -> { dataSource -> underReplicationCount } map */
/**
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
@@ -251,9 +256,15 @@ public class DruidCoordinator
return underReplicationCountsPerDataSourcePerTier;
}
final Iterable<DataSegment> dataSegments = iterateAvailableDataSegments();
if (dataSegments == null) {
return underReplicationCountsPerDataSourcePerTier;
}
final DateTime now = DateTimes.nowUtc();
for (final DataSegment segment : iterateAvailableDataSegments()) {
for (final DataSegment segment : dataSegments) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
for (final Rule rule : rules) {
@@ -285,7 +296,13 @@ public class DruidCoordinator
return retVal;
}
for (DataSegment segment : iterateAvailableDataSegments()) {
final Iterable<DataSegment> dataSegments = iterateAvailableDataSegments();
if (dataSegments == null) {
return retVal;
}
for (DataSegment segment : dataSegments) {
if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) {
retVal.addTo(segment.getDataSource(), 1);
} else {
@@ -298,8 +315,14 @@ public class DruidCoordinator
public Map<String, Double> getLoadStatus()
{
Map<String, Double> loadStatus = new HashMap<>();
for (ImmutableDruidDataSource dataSource : metadataSegmentManager.getDataSources()) {
final Map<String, Double> loadStatus = new HashMap<>();
final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
if (dataSources == null) {
return loadStatus;
}
for (ImmutableDruidDataSource dataSource : dataSources) {
final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
final int availableSegmentSize = segments.size();
@@ -453,7 +476,11 @@ public class DruidCoordinator
* is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
* (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
* several times.
*
* Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
* not yet been polled).
*/
@Nullable
public Iterable<DataSegment> iterateAvailableDataSegments()
{
return metadataSegmentManager.iterateAllSegments();
@@ -643,10 +670,16 @@ public class DruidCoordinator
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
// Do coordinator stuff.
final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
if (dataSources == null) {
log.info("Metadata store not polled yet, skipping this run.");
return;
}
DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDataSources(metadataSegmentManager.getDataSources())
.withDataSources(dataSources)
.withDynamicConfigs(getDynamicConfigs())
.withCompactionConfig(getCompactionConfig())
.withEmitter(emitter)
@@ -656,6 +689,11 @@ public class DruidCoordinator
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
params = helper.run(params);
if (params == null) {
// This helper wanted to cancel the run. No log message, since the helper should have logged a reason.
return;
}
}
}
}
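
Condensed view of the helper loop above (simplified from the hunks in this file, not a verbatim excerpt): a null return from any helper ends the coordinator cycle early.

import java.util.List;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper;

class HelperLoopSketch
{
  void runHelpers(List<DruidCoordinatorHelper> helpers, DruidCoordinatorRuntimeParams initial)
  {
    DruidCoordinatorRuntimeParams params = initial;
    for (DruidCoordinatorHelper helper : helpers) {
      params = helper.run(params);
      if (params == null) {
        // The helper canceled the run; it is expected to have logged the reason.
        return;
      }
    }
  }
}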

View File

@@ -33,6 +33,7 @@ import java.util.Set;
import java.util.SortedSet;
/**
*
*/
public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
{
@@ -45,21 +46,12 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
Set<DataSegment> availableSegments = params.getAvailableSegments();
DruidCluster cluster = params.getDruidCluster();
if (availableSegments.isEmpty()) {
log.info(
"Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " +
"a race condition in which the coordinator would drop all segments if it started running cleanup before " +
"it finished polling the metadata storage for available segments for the first time."
);
return params.buildFromExisting().withCoordinatorStats(stats).build();
}
// Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might
// not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any
// segments at all, we should have all of them.)
// Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped.
// This is done to prevent a race condition in which the coordinator would drop all segments if it started running
// cleanup before it finished polling the metadata storage for available segments for the first time.
// Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It's
// also filled atomically, so if there are any segments at all, we should have all of them.)
//
// Note that if the metadata store has not been polled yet, "getAvailableSegments" would throw an error since
// "availableSegments" is null. But this won't happen, since the earlier helper "DruidCoordinatorSegmentInfoLoader"
// would have canceled the run.
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

View File

@@ -21,7 +21,10 @@ package org.apache.druid.server.coordinator.helper;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import javax.annotation.Nullable;
/**
*
*/
public interface DruidCoordinatorHelper
{
@@ -29,8 +32,13 @@ public interface DruidCoordinatorHelper
* Implementations of this method run various activities performed by the coordinator.
* Input params can be used and modified. They are typically in a list and returned
* DruidCoordinatorRuntimeParams is passed to the next helper.
*
* @param params
* @return same as input or a modified value to be used by next helper.
*
* @return same as input or a modified value to be used by next helper. Null return
* values will prevent future DruidCoordinatorHelpers from running until the next
* cycle.
*/
@Nullable
DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params);
}
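
A hypothetical helper implementing the new contract (the readiness check is a placeholder; in this patch, DruidCoordinatorSegmentInfoLoader is the helper that actually returns null when the metadata store has not been polled yet):

import javax.annotation.Nullable;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper;

class CancelIfNotReadyHelper implements DruidCoordinatorHelper
{
  @Nullable
  @Override
  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
  {
    if (!isReady()) {
      // Log the reason before returning null; the remaining helpers are skipped.
      return null;
    }
    return params;
  }

  private boolean isReady()
  {
    return true; // placeholder precondition, for illustration only
  }
}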

View File

@@ -43,6 +43,12 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
{
log.info("Starting coordination. Getting available segments.");
final Iterable<DataSegment> dataSegments = coordinator.iterateAvailableDataSegments();
if (dataSegments == null) {
log.info("Metadata store not polled yet, canceling this run.");
return null;
}
// The following transform() call doesn't actually transform the iterable. It only checks the sizes of the segments
// and emits alerts if segments with negative sizes are encountered. In other words, semantically it's similar to
// Stream.peek(). It works as long as DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() (which is called
@@ -54,7 +60,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
//
//noinspection StaticPseudoFunctionalStyleMethod: https://youtrack.jetbrains.com/issue/IDEA-153047
Iterable<DataSegment> availableSegmentsWithSizeChecking = Iterables.transform(
coordinator.iterateAvailableDataSegments(),
dataSegments,
segment -> {
if (segment.getSize() < 0) {
log.makeAlert("No size on a segment")

View File

@@ -55,6 +55,7 @@ import javax.ws.rs.core.StreamingOutput;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -96,7 +97,10 @@ public class MetadataResource
@Context final HttpServletRequest req
)
{
final Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
// If we haven't polled the metadata store yet, use an empty list of datasources.
final Collection<ImmutableDruidDataSource> druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources())
.orElse(Collections.emptyList());
final Set<String> dataSourceNamesPreAuth;
if (includeDisabled != null) {
dataSourceNamesPreAuth = new TreeSet<>(metadataSegmentManager.getAllDataSourceNames());
@@ -154,7 +158,10 @@ public class MetadataResource
@QueryParam("datasources") final Set<String> datasources
)
{
Collection<ImmutableDruidDataSource> druidDataSources = metadataSegmentManager.getDataSources();
// If we haven't polled the metadata store yet, use an empty list of datasources.
Collection<ImmutableDruidDataSource> druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources())
.orElse(Collections.emptyList());
if (datasources != null && !datasources.isEmpty()) {
druidDataSources = druidDataSources.stream()
.filter(src -> datasources.contains(src.getName()))

View File

@@ -39,6 +39,7 @@ import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.stream.Collectors;
public class SQLMetadataSegmentManagerTest
@@ -123,10 +124,48 @@ public class SQLMetadataSegmentManagerTest
ImmutableList.of("wikipedia"),
manager.getAllDataSourceNames()
);
Assert.assertEquals(
ImmutableList.of("wikipedia"),
manager.getDataSources().stream().map(d -> d.getName()).collect(Collectors.toList())
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.getDataSource("wikipedia").getSegments())
);
Assert.assertEquals(
ImmutableSet.of(segment1, segment2),
ImmutableSet.copyOf(manager.iterateAllSegments())
);
}
@Test
public void testNoPoll()
{
manager.start();
Assert.assertTrue(manager.isStarted());
Assert.assertEquals(
ImmutableList.of("wikipedia"),
manager.getAllDataSourceNames()
);
Assert.assertNull(manager.getDataSources());
Assert.assertNull(manager.getDataSource("wikipedia"));
Assert.assertNull(manager.iterateAllSegments());
}
@Test
public void testPollThenStop()
{
manager.start();
manager.poll();
manager.stop();
Assert.assertFalse(manager.isStarted());
Assert.assertEquals(
ImmutableList.of("wikipedia"),
manager.getAllDataSourceNames()
);
Assert.assertNull(manager.getDataSources());
Assert.assertNull(manager.getDataSource("wikipedia"));
Assert.assertNull(manager.iterateAllSegments());
}
@Test