Optimize overshadowed segments computation (#7595)

* Move the overshadowed segment computation to SQLMetadataSegmentManager's poll

* rename method in MetadataSegmentManager

* Fix tests

* PR comments

* PR comments

* PR comments

* fix indentation

* fix tests

*  fix test

*  add test for SegmentWithOvershadowedStatus serde format

* PR comments

* PR comments

* fix test

* remove snapshot updates outside poll

* PR comments

* PR comments

* PR comments

*  removed unused import
This commit is contained in:
Surekha 2019-06-07 10:15:54 -07:00 committed by Roman Leventov
parent 061b465d34
commit ea752ef562
20 changed files with 511 additions and 172 deletions

View File

@ -21,6 +21,7 @@ package org.apache.druid.timeline;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
/**
* DataSegment object plus the overshadowed status for the segment. An immutable object.
@ -31,6 +32,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
{
private final boolean overshadowed;
/**
* dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of
* enclosing class. If in future, if {@Code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
* there will be no change in the serialized format.
*/
@JsonUnwrapped
private final DataSegment dataSegment;
@JsonCreator

View File

@ -19,10 +19,14 @@
package org.apache.druid.utils;
import com.google.common.collect.Maps;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -68,6 +72,17 @@ public final class CollectionUtils
};
}
/**
* Returns a transformed map from the given input map where the value is modified based on the given valueMapper
* function.
*/
public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
{
final Map<K, V2> result = Maps.newHashMapWithExpectedSize(map.size());
map.forEach((k, v) -> result.put(k, valueMapper.apply(v)));
return result;
}
private CollectionUtils()
{
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.timeline;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.TestObjectMapper;
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class SegmentWithOvershadowedStatusTest
{
private static final ObjectMapper mapper = new TestObjectMapper();
private static final int TEST_VERSION = 0x9;
@Before
public void setUp()
{
InjectableValues.Std injectableValues = new InjectableValues.Std();
injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
mapper.setInjectableValues(injectableValues);
}
@Test
public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws Exception
{
final Interval interval = Intervals.of("2011-10-01/2011-10-02");
final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");
final DataSegment dataSegment = new DataSegment(
"something",
interval,
"1",
loadSpec,
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
NoneShardSpec.instance(),
TEST_VERSION,
1
);
final SegmentWithOvershadowedStatus segment = new SegmentWithOvershadowedStatus(dataSegment, false);
final Map<String, Object> objectMap = mapper.readValue(
mapper.writeValueAsString(segment),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
Assert.assertEquals(11, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(interval.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(false, objectMap.get("overshadowed"));
final String json = mapper.writeValueAsString(segment);
final TestSegmentWithOvershadowedStatus deserializedSegment = mapper.readValue(
json,
TestSegmentWithOvershadowedStatus.class
);
Assert.assertEquals(segment.getDataSegment().getDataSource(), deserializedSegment.getDataSource());
Assert.assertEquals(segment.getDataSegment().getInterval(), deserializedSegment.getInterval());
Assert.assertEquals(segment.getDataSegment().getVersion(), deserializedSegment.getVersion());
Assert.assertEquals(segment.getDataSegment().getLoadSpec(), deserializedSegment.getLoadSpec());
Assert.assertEquals(segment.getDataSegment().getDimensions(), deserializedSegment.getDimensions());
Assert.assertEquals(segment.getDataSegment().getMetrics(), deserializedSegment.getMetrics());
Assert.assertEquals(segment.getDataSegment().getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(segment.getDataSegment().getSize(), deserializedSegment.getSize());
Assert.assertEquals(segment.getDataSegment().getId(), deserializedSegment.getId());
}
}
/**
* Subclass of DataSegment with overshadowed status
*/
class TestSegmentWithOvershadowedStatus extends DataSegment
{
private final boolean overshadowed;
@JsonCreator
public TestSegmentWithOvershadowedStatus(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
@JsonProperty("dimensions")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> dimensions,
@JsonProperty("metrics")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
List<String> metrics,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JsonProperty("overshadowed") boolean overshadowed
)
{
super(
dataSource,
interval,
version,
loadSpec,
dimensions,
metrics,
shardSpec,
binaryVersion,
size
);
this.overshadowed = overshadowed;
}
@JsonProperty
public boolean isOvershadowed()
{
return overshadowed;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
return false;
}
if (!super.equals(o)) {
return false;
}
final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
if (overshadowed != (that.overshadowed)) {
return false;
}
return true;
}
}

View File

@ -384,7 +384,7 @@ public class MaterializedViewSupervisor implements Supervisor
// drop derivative segments which interval equals the interval in toDeleteBaseSegments
for (Interval interval : toDropInterval.keySet()) {
for (DataSegment segment : derivativeSegments.get(interval)) {
segmentManager.removeSegment(segment.getId());
segmentManager.removeSegment(segment.getId().toString());
}
}
// data of the latest interval will be built firstly.

View File

@ -73,7 +73,7 @@ public class SegmentListActionsTest
expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));
expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId()));
expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString()));
}
private DataSegment createSegment(Interval interval, String version)

View File

@ -13,6 +13,6 @@
"is_available": 1,
"is_realtime": 0,
"is_overshadowed": 0,
"payload": "{\"dataSegment\":{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"},\"overshadowed\":false}"
"payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}"
}
]

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and
* overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot
* to return dataSources and overshadowedSegments.
*/
public class DataSourcesSnapshot
{
private final Map<String, ImmutableDruidDataSource> dataSources;
private final ImmutableSet<SegmentId> overshadowedSegments;
public DataSourcesSnapshot(
Map<String, ImmutableDruidDataSource> dataSources
)
{
this.dataSources = dataSources;
this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
}
public Collection<ImmutableDruidDataSource> getDataSources()
{
return dataSources.values();
}
public Map<String, ImmutableDruidDataSource> getDataSourcesMap()
{
return dataSources;
}
@Nullable
public ImmutableDruidDataSource getDataSource(String dataSourceName)
{
return dataSources.get(dataSourceName);
}
public ImmutableSet<SegmentId> getOvershadowedSegments()
{
return overshadowedSegments;
}
@Nullable
public Iterable<DataSegment> iterateAllSegmentsInSnapshot()
{
if (dataSources == null) {
return null;
}
return () -> dataSources.values().stream()
.flatMap(dataSource -> dataSource.getSegments().stream())
.iterator();
}
/**
* This method builds timelines from all dataSources and finds the overshadowed segments list
*
* @return overshadowed segment Ids list
*/
private List<SegmentId> determineOvershadowedSegments()
{
final List<DataSegment> segments = dataSources.values().stream()
.flatMap(ds -> ds.getSegments().stream())
.collect(Collectors.toList());
final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
segments.forEach(segment -> timelines
.computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
// It's fine to add all overshadowed segments to a single collection because only
// a small fraction of the segments in the cluster are expected to be overshadowed,
// so building this collection shouldn't generate a lot of garbage.
final List<SegmentId> overshadowedSegments = new ArrayList<>();
for (DataSegment dataSegment : segments) {
final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
overshadowedSegments.add(dataSegment.getId());
}
}
return overshadowedSegments;
}
}

View File

@ -25,17 +25,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Ordering;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* An immutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source.
@ -114,41 +109,6 @@ public class ImmutableDruidDataSource
return totalSizeOfSegments;
}
/**
* This method finds the overshadowed segments from the given segments
*
* @return set of overshadowed segments
*/
public static Set<DataSegment> determineOvershadowedSegments(Iterable<DataSegment> segments)
{
final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = buildTimelines(segments);
final Set<DataSegment> overshadowedSegments = new HashSet<>();
for (DataSegment dataSegment : segments) {
final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
overshadowedSegments.add(dataSegment);
}
}
return overshadowedSegments;
}
/**
* Builds a timeline from given segments
*
* @return map of datasource to VersionedIntervalTimeline of segments
*/
private static Map<String, VersionedIntervalTimeline<String, DataSegment>> buildTimelines(
Iterable<DataSegment> segments
)
{
final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
segments.forEach(segment -> timelines
.computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
return timelines;
}
@Override
public String toString()
{

View File

@ -20,6 +20,7 @@
package org.apache.druid.metadata;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@ -28,6 +29,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.Set;
/**
*/
@ -57,14 +59,9 @@ public interface MetadataSegmentManager
boolean removeDataSource(String dataSource);
/**
* Prefer {@link #removeSegment(SegmentId)} to this method when possible.
*
* This method is not removed because {@link org.apache.druid.server.http.DataSourcesResource#deleteDatasourceSegment}
* uses it and if it migrates to {@link #removeSegment(SegmentId)} the performance will be worse.
* Removes the given segmentId from metadata store. Returns true if one or more rows were affected.
*/
boolean removeSegment(String dataSource, String segmentId);
boolean removeSegment(SegmentId segmentId);
boolean removeSegment(String segmentId);
long disableSegments(String dataSource, Collection<String> segmentIds);
@ -98,6 +95,22 @@ public interface MetadataSegmentManager
Collection<String> getAllDataSourceNames();
/**
* Returns a set of overshadowed segment Ids
*
* Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
* not yet been polled.)
*/
@Nullable
Set<SegmentId> getOvershadowedSegments();
/**
* Returns a snapshot of DruidDataSources and overshadowed segments
*
*/
@Nullable
DataSourcesSnapshot getDataSourcesSnapshot();
/**
* Returns top N unused segment intervals in given interval when ordered by segment start time, end time.
*/

View File

@ -24,6 +24,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.guice.ManageLifecycle;
@ -40,6 +41,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.skife.jdbi.v2.BaseResultSetMapper;
@ -65,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -103,11 +106,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final SQLMetadataConnector connector;
// Volatile since this reference is reassigned in "poll" and then read from in other threads.
// Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map).
// Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and
// empty overshadowedSegments set).
// Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between
// null and nonnull multiple times as stop() and start() are called.
@Nullable
private volatile ConcurrentHashMap<String, DruidDataSource> dataSources = null;
private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
/**
* The number of times this SQLMetadataSegmentManager was started.
@ -206,8 +210,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
if (!isStarted()) {
return;
}
dataSources = null;
dataSourcesSnapshot = null;
currentStartOrder = -1;
exec.shutdownNow();
exec = null;
@ -449,8 +452,6 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
).bind("dataSource", dataSource).execute()
);
Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource));
if (removed == 0) {
return false;
}
@ -463,58 +464,15 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return true;
}
/**
* This method does not update {@code dataSourcesSnapshot}, see the comments in {@code doPoll()} about
* snapshot update. The segment removal will be reflected after next poll cyccle runs.
*/
@Override
public boolean removeSegment(String dataSourceName, final String segmentId)
public boolean removeSegment(String segmentId)
{
try {
final boolean removed = removeSegmentFromTable(segmentId);
// Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a
// potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast.
List<SegmentId> possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId);
Optional.ofNullable(dataSources).ifPresent(
m ->
m.computeIfPresent(
dataSourceName,
(dsName, dataSource) -> {
for (SegmentId possibleSegmentId : possibleSegmentIds) {
if (dataSource.removeSegment(possibleSegmentId) != null) {
break;
}
}
// Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
)
);
return removed;
}
catch (Exception e) {
log.error(e, e.toString());
return false;
}
}
@Override
public boolean removeSegment(SegmentId segmentId)
{
try {
final boolean removed = removeSegmentFromTable(segmentId.toString());
Optional.ofNullable(dataSources).ifPresent(
m ->
m.computeIfPresent(
segmentId.getDataSource(),
(dsName, dataSource) -> {
dataSource.removeSegment(segmentId);
// Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
)
);
return removed;
return removeSegmentFromTable(segmentId);
}
catch (Exception e) {
log.error(e, e.toString());
@ -607,37 +565,47 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
@Nullable
public ImmutableDruidDataSource getDataSource(String dataSourceName)
{
final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null);
return dataSource == null ? null : dataSource.toImmutableDruidDataSource();
final ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
.map(m -> m.getDataSourcesMap().get(dataSourceName))
.orElse(null);
return dataSource == null ? null : dataSource;
}
@Override
@Nullable
public Collection<ImmutableDruidDataSource> getDataSources()
{
return Optional.ofNullable(dataSources)
.map(m ->
m.values()
.stream()
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toList())
)
.orElse(null);
return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null);
}
@Override
@Nullable
public Iterable<DataSegment> iterateAllSegments()
{
final ConcurrentHashMap<String, DruidDataSource> dataSourcesSnapshot = dataSources;
if (dataSourcesSnapshot == null) {
final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
.map(m -> m.getDataSources())
.orElse(null);
if (dataSources == null) {
return null;
}
return () -> dataSourcesSnapshot.values()
.stream()
.flatMap(dataSource -> dataSource.getSegments().stream())
.iterator();
return () -> dataSources.stream()
.flatMap(dataSource -> dataSource.getSegments().stream())
.iterator();
}
@Override
@Nullable
public Set<SegmentId> getOvershadowedSegments()
{
return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getOvershadowedSegments()).orElse(null);
}
@Nullable
@Override
public DataSourcesSnapshot getDataSourcesSnapshot()
{
return dataSourcesSnapshot;
}
@Override
@ -742,14 +710,26 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
.addSegmentIfAbsent(segment);
});
// Replace "dataSources" atomically.
dataSources = newDataSources;
// dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled
// outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle.
// DataSourcesSnapshot computes the overshadowed segments, which makes it an expensive operation if the
// snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove
// calls in rapid succession. So the snapshot update is not done outside of poll at this time.
// Updates outside of poll(), were primarily for the user experience, so users would immediately see the effect of
// a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be
// added back in removeDataSource and removeSegment methods after the on-demand polling changes from
// https://github.com/apache/incubator-druid/pull/7653 are in.
final Map<String, ImmutableDruidDataSource> updatedDataSources = CollectionUtils.mapValues(
newDataSources,
v -> v.toImmutableDruidDataSource()
);
dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources);
}
/**
* For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough
* (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all
* existing segments each time, and then replace them in {@link #dataSources}. This method allows to use already
* existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use already
* existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link
* com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after
* they are deserialized and to die in young generation. It allows to avoid fragmentation of the old generation and
@ -757,7 +737,9 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
*/
private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
{
DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null);
ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
.map(m -> m.getDataSourcesMap().get(segment.getDataSource()))
.orElse(null);
if (dataSource == null) {
return segment;
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
@ -29,6 +30,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
@ -82,6 +84,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@ -145,6 +148,10 @@ public class DruidCoordinator
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
/**
* set in {@link CoordinatorRunnable#run()} at start of every coordinator run
*/
private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
@Inject
public DruidCoordinator(
@ -316,7 +323,9 @@ public class DruidCoordinator
public Map<String, Double> getLoadStatus()
{
final Map<String, Double> loadStatus = new HashMap<>();
final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
.map(m -> m.getDataSources())
.orElse(null);
if (dataSources == null) {
return loadStatus;
@ -365,7 +374,7 @@ public class DruidCoordinator
public void removeSegment(DataSegment segment)
{
log.info("Removing Segment[%s]", segment.getId());
metadataSegmentManager.removeSegment(segment.getId());
metadataSegmentManager.removeSegment(segment.getId().toString());
}
public String getCurrentLeader()
@ -373,6 +382,12 @@ public class DruidCoordinator
return coordLeaderSelector.getCurrentLeader();
}
@VisibleForTesting
void setDataSourcesSnapshotForTest(DataSourcesSnapshot snapshot)
{
dataSourcesSnapshot = snapshot;
}
public void moveSegment(
ImmutableDruidServer fromServer,
ImmutableDruidServer toServer,
@ -393,7 +408,9 @@ public class DruidCoordinator
throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentId, fromServer.getName());
}
ImmutableDruidDataSource dataSource = metadataSegmentManager.getDataSource(segment.getDataSource());
ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
.map(m -> m.getDataSource(segment.getDataSource()))
.orElse(null);
if (dataSource == null) {
throw new IAE("Unable to find dataSource for segment [%s] in metadata", segmentId);
}
@ -483,7 +500,10 @@ public class DruidCoordinator
@Nullable
public Iterable<DataSegment> iterateAvailableDataSegments()
{
return metadataSegmentManager.iterateAllSegments();
final Iterable<DataSegment> dataSources = Optional.ofNullable(dataSourcesSnapshot)
.map(m -> m.iterateAllSegmentsInSnapshot())
.orElse(null);
return dataSources == null ? null : dataSources;
}
@LifecycleStart
@ -670,7 +690,11 @@ public class DruidCoordinator
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
// Do coordinator stuff.
final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
dataSourcesSnapshot = metadataSegmentManager.getDataSourcesSnapshot();
final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
.map(m -> m.getDataSources())
.orElse(null);
if (dataSources == null) {
log.info("Metadata store not polled yet, skipping this run.");
return;
@ -684,6 +708,7 @@ public class DruidCoordinator
.withCompactionConfig(getCompactionConfig())
.withEmitter(emitter)
.withBalancerStrategy(balancerStrategy)
.withDataSourcesSnapshot(dataSourcesSnapshot)
.build();
for (DruidCoordinatorHelper helper : helpers) {
// Don't read state and run state in the same helper otherwise racy conditions may exist

View File

@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -66,6 +67,7 @@ public class DruidCoordinatorRuntimeParams
private final CoordinatorStats stats;
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategy balancerStrategy;
private final DataSourcesSnapshot dataSourcesSnapshot;
private DruidCoordinatorRuntimeParams(
long startTime,
@ -81,7 +83,8 @@ public class DruidCoordinatorRuntimeParams
CoordinatorCompactionConfig coordinatorCompactionConfig,
CoordinatorStats stats,
DateTime balancerReferenceTimestamp,
BalancerStrategy balancerStrategy
BalancerStrategy balancerStrategy,
DataSourcesSnapshot dataSourcesSnapshot
)
{
this.startTime = startTime;
@ -98,6 +101,7 @@ public class DruidCoordinatorRuntimeParams
this.stats = stats;
this.balancerReferenceTimestamp = balancerReferenceTimestamp;
this.balancerStrategy = balancerStrategy;
this.dataSourcesSnapshot = dataSourcesSnapshot;
}
public long getStartTime()
@ -171,6 +175,11 @@ public class DruidCoordinatorRuntimeParams
return balancerStrategy;
}
public DataSourcesSnapshot getDataSourcesSnapshot()
{
return dataSourcesSnapshot;
}
public boolean hasDeletionWaitTimeElapsed()
{
return (System.currentTimeMillis() - getStartTime() > coordinatorDynamicConfig.getMillisToWaitBeforeDeleting());
@ -237,6 +246,7 @@ public class DruidCoordinatorRuntimeParams
private CoordinatorStats stats;
private DateTime balancerReferenceTimestamp;
private BalancerStrategy balancerStrategy;
private DataSourcesSnapshot dataSourcesSnapshot;
Builder()
{
@ -253,6 +263,7 @@ public class DruidCoordinatorRuntimeParams
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
this.balancerReferenceTimestamp = DateTimes.nowUtc();
this.dataSourcesSnapshot = null;
}
Builder(
@ -304,7 +315,8 @@ public class DruidCoordinatorRuntimeParams
coordinatorCompactionConfig,
stats,
balancerReferenceTimestamp,
balancerStrategy
balancerStrategy,
dataSourcesSnapshot
);
}
@ -434,5 +446,11 @@ public class DruidCoordinatorRuntimeParams
this.balancerStrategy = balancerStrategy;
return this;
}
public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot)
{
this.dataSourcesSnapshot = snapshot;
return this;
}
}
}

View File

@ -19,8 +19,9 @@
package org.apache.druid.server.coordinator.helper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataRuleManager;
@ -35,6 +36,7 @@ import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Optional;
import java.util.Set;
/**
@ -84,8 +86,14 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
// find available segments which are not overshadowed by other segments in DB
// only those would need to be loaded/dropped
// anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
final Set<DataSegment> overshadowed = ImmutableDruidDataSource
.determineOvershadowedSegments(params.getAvailableSegments());
// If metadata store hasn't been polled yet, use empty overshadowed list
final DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot();
Set<SegmentId> overshadowed = ImmutableSet.of();
if (dataSourcesSnapshot != null) {
overshadowed = Optional
.ofNullable(dataSourcesSnapshot.getOvershadowedSegments())
.orElse(ImmutableSet.of());
}
for (String tier : cluster.getTierNames()) {
replicatorThrottler.updateReplicationState(tier);
@ -103,7 +111,7 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;
for (DataSegment segment : params.getAvailableSegments()) {
if (overshadowed.contains(segment)) {
if (overshadowed.contains(segment.getId())) {
// Skipping overshadowed segments
continue;
}

View File

@ -434,7 +434,7 @@ public class DataSourcesResource
@PathParam("segmentId") String segmentId
)
{
if (databaseSegmentManager.removeSegment(dataSourceName, segmentId)) {
if (databaseSegmentManager.removeSegment(segmentId)) {
return Response.ok().build();
}
return Response.noContent().build();

View File

@ -52,7 +52,6 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@ -163,11 +162,11 @@ public class MetadataResource
final Stream<DataSegment> metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream());
if (includeOvershadowedStatus != null) {
final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus(
req,
druidDataSources,
metadataSegments
);
final Iterable<SegmentWithOvershadowedStatus> authorizedSegments =
findAuthorizedSegmentWithOvershadowedStatus(
req,
metadataSegments
);
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
return builder.entity(authorizedSegments).build();
} else {
@ -189,22 +188,18 @@ public class MetadataResource
private Iterable<SegmentWithOvershadowedStatus> findAuthorizedSegmentWithOvershadowedStatus(
HttpServletRequest req,
Collection<ImmutableDruidDataSource> druidDataSources,
Stream<DataSegment> metadataSegments
)
{
// It's fine to add all overshadowed segments to a single collection because only
// a small fraction of the segments in the cluster are expected to be overshadowed,
// so building this collection shouldn't generate a lot of garbage.
final Set<DataSegment> overshadowedSegments = new HashSet<>();
for (ImmutableDruidDataSource dataSource : druidDataSources) {
overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments()));
}
// If metadata store hasn't been polled yet, use empty overshadowed list
final Set<SegmentId> overshadowedSegments = Optional
.ofNullable(metadataSegmentManager.getOvershadowedSegments())
.orElse(Collections.emptySet());
final Stream<SegmentWithOvershadowedStatus> segmentsWithOvershadowedStatus = metadataSegments
.map(segment -> new SegmentWithOvershadowedStatus(
segment,
overshadowedSegments.contains(segment)
overshadowedSegments.contains(segment.getId())
));
final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections

View File

@ -294,7 +294,7 @@ public class SQLMetadataSegmentManagerTest
publisher.publishSegment(newSegment);
Assert.assertNull(manager.getDataSource(newDataSource));
Assert.assertTrue(manager.removeSegment(newSegment.getId()));
Assert.assertTrue(manager.removeSegment(newSegment.getId().toString()));
}
@Test

View File

@ -32,6 +32,7 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.BatchServerInventoryView;
import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
import org.apache.druid.client.CoordinatorServerView;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.common.config.JacksonConfigManager;
@ -96,6 +97,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper;
private JacksonConfigManager configManager;
private DruidNode druidNode;
private DataSourcesSnapshot dataSourcesSnapshot;
private static final String SEGPATH = "/druid/segments";
private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
@ -127,6 +129,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@ -365,6 +368,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource);
EasyMock.replay(databaseSegmentManager);
coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot);
EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
EasyMock.replay(dataSourcesSnapshot);
coordinator.moveSegment(
source.toImmutableDruidServer(),
dest.toImmutableDruidServer(),

View File

@ -20,9 +20,11 @@
package org.apache.druid.server.coordinator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@ -30,6 +32,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.metadata.MetadataRuleManager;
import org.apache.druid.metadata.MetadataSegmentManager;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
@ -67,6 +70,8 @@ public class DruidCoordinatorRuleRunnerTest
private DruidCoordinatorRuleRunner ruleRunner;
private ServiceEmitter emitter;
private MetadataRuleManager databaseRuleManager;
private MetadataSegmentManager databaseSegmentManager;
private DataSourcesSnapshot dataSourcesSnapshot;
@Before
public void setUp()
@ -76,6 +81,8 @@ public class DruidCoordinatorRuleRunnerTest
emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
DateTime start = DateTimes.of("2012-01-01");
availableSegments = new ArrayList<>();
@ -989,7 +996,9 @@ public class DruidCoordinatorRuleRunnerTest
@Test
public void testReplicantThrottle()
{
mockCoordinator();
EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes();
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
EasyMock.replay(coordinator, databaseSegmentManager, dataSourcesSnapshot);
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();
@ -1114,6 +1123,8 @@ public class DruidCoordinatorRuleRunnerTest
.build()
)
.atLeastOnce();
EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes();
EasyMock.replay(dataSourcesSnapshot);
coordinator.removeSegment(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(coordinator);
@ -1330,8 +1341,9 @@ public class DruidCoordinatorRuleRunnerTest
);
availableSegments.add(v1);
availableSegments.add(v2);
mockCoordinator();
EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1.getId())).anyTimes();
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
EasyMock.replay(coordinator, dataSourcesSnapshot);
mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject());
EasyMock.expectLastCall().once();
mockEmptyPeon();
@ -1375,6 +1387,7 @@ public class DruidCoordinatorRuleRunnerTest
.withBalancerStrategy(balancerStrategy)
.withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
.withDataSourcesSnapshot(dataSourcesSnapshot)
.build();
DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);

View File

@ -29,6 +29,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidDataSource;
@ -99,6 +100,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper;
private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
private DataSourcesSnapshot dataSourcesSnapshot;
@Before
public void setUp() throws Exception
@ -106,8 +108,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
druidServer = EasyMock.createMock(DruidServer.class);
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes();
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock.replay(databaseSegmentManager);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@ -245,8 +251,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(segment);
EasyMock.replay(druidDataSource);
EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource);
EasyMock.replay(databaseSegmentManager);
coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot);
EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
EasyMock.replay(dataSourcesSnapshot);
scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(scheduledExecutorFactory);
EasyMock.replay(metadataRuleManager);
@ -530,20 +537,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
private void setupMetadataSegmentManagerMock(DruidDataSource dataSource)
{
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock
.expect(databaseSegmentManager.iterateAllSegments())
.expect(dataSourcesSnapshot.iterateAllSegmentsInSnapshot())
.andReturn(dataSource.getSegments())
.anyTimes();
EasyMock
.expect(databaseSegmentManager.getDataSources())
.expect(dataSourcesSnapshot.getDataSources())
.andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
.anyTimes();
EasyMock
.expect(databaseSegmentManager.getAllDataSourceNames())
.andReturn(Collections.singleton(dataSource.getName()))
.anyTimes();
EasyMock.replay(databaseSegmentManager);
EasyMock.replay(dataSourcesSnapshot);
}
@Nullable

View File

@ -364,12 +364,13 @@ public class SystemSchema extends AbstractSchema
final AuthenticationResult authenticationResult =
(AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
authenticationResult,
() -> it,
SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR,
authorizerMapper
);
final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils
.filterAuthorizedResources(
authenticationResult,
() -> it,
SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR,
authorizerMapper
);
return authorizedSegments.iterator();
}