mirror of
https://github.com/apache/druid.git
synced 2025-02-17 07:25:02 +00:00
Improve concurrency of SegmentManager (#4298)
* Improve concurrency of SegmentManager * Fix SegmentManager and add more tests * Add more tests * Add null check to TimelineEntry * Remove empty data source and check null in getTimeline() * Add a comment for returning null in compute() * Make SegmentManager LazySingleton
This commit is contained in:
parent
f97c49ba0e
commit
b578adacae
@ -19,13 +19,14 @@
|
||||
|
||||
package io.druid.timeline;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import io.druid.java.util.common.guava.Comparators;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.java.util.common.guava.Comparators;
|
||||
import io.druid.timeline.partition.ImmutablePartitionHolder;
|
||||
import io.druid.timeline.partition.PartitionChunk;
|
||||
import io.druid.timeline.partition.PartitionHolder;
|
||||
@ -37,6 +38,7 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
@ -89,6 +91,12 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
|
||||
return timeline;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries()
|
||||
{
|
||||
return allTimelineEntries;
|
||||
}
|
||||
|
||||
public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
|
||||
{
|
||||
try {
|
||||
@ -552,9 +560,9 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
|
||||
|
||||
public TimelineEntry(Interval trueInterval, VersionType version, PartitionHolder<ObjectType> partitionHolder)
|
||||
{
|
||||
this.trueInterval = trueInterval;
|
||||
this.version = version;
|
||||
this.partitionHolder = partitionHolder;
|
||||
this.trueInterval = Preconditions.checkNotNull(trueInterval);
|
||||
this.version = Preconditions.checkNotNull(version);
|
||||
this.partitionHolder = Preconditions.checkNotNull(partitionHolder);
|
||||
}
|
||||
|
||||
public Interval getTrueInterval()
|
||||
@ -571,5 +579,39 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
|
||||
{
|
||||
return partitionHolder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final TimelineEntry that = (TimelineEntry) o;
|
||||
|
||||
if (!this.trueInterval.equals(that.trueInterval)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this.version.equals(that.version)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this.partitionHolder.equals(that.partitionHolder)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(trueInterval, version, partitionHolder);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,10 +19,11 @@
|
||||
|
||||
package io.druid.server;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
import io.druid.collections.CountingMap;
|
||||
import io.druid.common.guava.SettableSupplier;
|
||||
import io.druid.segment.ReferenceCountingSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.loading.SegmentLoader;
|
||||
@ -32,19 +33,66 @@ import io.druid.timeline.VersionedIntervalTimeline;
|
||||
import io.druid.timeline.partition.PartitionChunk;
|
||||
import io.druid.timeline.partition.PartitionHolder;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This class is responsible for managing data sources and their states like timeline, total segment size, and number of
|
||||
* segments. All public methods of this class must be thread-safe.
|
||||
*/
|
||||
public class SegmentManager
|
||||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(SegmentManager.class);
|
||||
|
||||
private final Object lock = new Object();
|
||||
private final SegmentLoader segmentLoader;
|
||||
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources = new HashMap<>();
|
||||
private final CountingMap<String> dataSourceSizes = new CountingMap<>();
|
||||
private final CountingMap<String> dataSourceCounts = new CountingMap<>();
|
||||
private final ConcurrentHashMap<String, DataSourceState> dataSources = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Represent the state of a data source including the timeline, total segment size, and number of segments.
|
||||
*/
|
||||
public static class DataSourceState
|
||||
{
|
||||
private final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline =
|
||||
new VersionedIntervalTimeline<>(Ordering.natural());
|
||||
private long totalSegmentSize;
|
||||
private long numSegments;
|
||||
|
||||
private void addSegment(DataSegment segment)
|
||||
{
|
||||
totalSegmentSize += segment.getSize();
|
||||
numSegments++;
|
||||
}
|
||||
|
||||
private void removeSegment(DataSegment segment)
|
||||
{
|
||||
totalSegmentSize -= segment.getSize();
|
||||
numSegments--;
|
||||
}
|
||||
|
||||
public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline()
|
||||
{
|
||||
return timeline;
|
||||
}
|
||||
|
||||
public long getTotalSegmentSize()
|
||||
{
|
||||
return totalSegmentSize;
|
||||
}
|
||||
|
||||
public long getNumSegments()
|
||||
{
|
||||
return numSegments;
|
||||
}
|
||||
|
||||
public boolean isEmpty()
|
||||
{
|
||||
return numSegments == 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
public SegmentManager(
|
||||
@ -54,18 +102,34 @@ public class SegmentManager
|
||||
this.segmentLoader = segmentLoader;
|
||||
}
|
||||
|
||||
public Map<String, Long> getDataSourceSizes()
|
||||
@VisibleForTesting
|
||||
Map<String, DataSourceState> getDataSources()
|
||||
{
|
||||
synchronized (dataSourceSizes) {
|
||||
return dataSourceSizes.snapshot();
|
||||
}
|
||||
return dataSources;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of dataSource to the total byte size of segments managed by this segmentManager. This method should
|
||||
* be used carefully because the returned map might be different from the actual data source states.
|
||||
*
|
||||
* @return a map of dataSources and their total byte sizes
|
||||
*/
|
||||
public Map<String, Long> getDataSourceSizes()
|
||||
{
|
||||
return dataSources.entrySet().stream()
|
||||
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getTotalSegmentSize()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a map of dataSource to the number of segments managed by this segmentManager. This method should be
|
||||
* carefully because the returned map might be different from the actual data source states.
|
||||
*
|
||||
* @return a map of dataSources and number of segments
|
||||
*/
|
||||
public Map<String, Long> getDataSourceCounts()
|
||||
{
|
||||
synchronized (dataSourceCounts) {
|
||||
return dataSourceCounts.snapshot();
|
||||
}
|
||||
return dataSources.entrySet().stream()
|
||||
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getNumSegments()));
|
||||
}
|
||||
|
||||
public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException
|
||||
@ -73,11 +137,11 @@ public class SegmentManager
|
||||
return segmentLoader.isSegmentLoaded(segment);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline(String dataSource)
|
||||
{
|
||||
synchronized (lock) {
|
||||
return dataSources.get(dataSource);
|
||||
}
|
||||
final DataSourceState dataSourceState = dataSources.get(dataSource);
|
||||
return dataSourceState == null ? null : dataSourceState.getTimeline();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -93,35 +157,37 @@ public class SegmentManager
|
||||
{
|
||||
final Segment adapter = getAdapter(segment);
|
||||
|
||||
synchronized (lock) {
|
||||
final String dataSource = segment.getDataSource();
|
||||
final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.computeIfAbsent(
|
||||
dataSource,
|
||||
k -> new VersionedIntervalTimeline<>(Ordering.natural())
|
||||
);
|
||||
final SettableSupplier<Boolean> resultSupplier = new SettableSupplier<>();
|
||||
|
||||
final PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
|
||||
segment.getInterval(),
|
||||
segment.getVersion()
|
||||
);
|
||||
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
|
||||
log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
|
||||
return false;
|
||||
}
|
||||
// compute() is used to ensure that the operation for a data source is executed atomically
|
||||
dataSources.compute(
|
||||
segment.getDataSource(),
|
||||
(k, v) -> {
|
||||
final DataSourceState dataSourceState = v == null ? new DataSourceState() : v;
|
||||
final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals =
|
||||
dataSourceState.getTimeline();
|
||||
final PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
|
||||
segment.getInterval(),
|
||||
segment.getVersion()
|
||||
);
|
||||
|
||||
loadedIntervals.add(
|
||||
segment.getInterval(),
|
||||
segment.getVersion(),
|
||||
segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
|
||||
);
|
||||
synchronized (dataSourceSizes) {
|
||||
dataSourceSizes.add(dataSource, segment.getSize());
|
||||
}
|
||||
synchronized (dataSourceCounts) {
|
||||
dataSourceCounts.add(dataSource, 1L);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
|
||||
log.warn("Told to load a adapter for a segment[%s] that already exists", segment.getIdentifier());
|
||||
resultSupplier.set(false);
|
||||
} else {
|
||||
loadedIntervals.add(
|
||||
segment.getInterval(),
|
||||
segment.getVersion(),
|
||||
segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
|
||||
);
|
||||
dataSourceState.addSegment(segment);
|
||||
resultSupplier.set(true);
|
||||
}
|
||||
return dataSourceState;
|
||||
}
|
||||
);
|
||||
|
||||
return resultSupplier.get();
|
||||
}
|
||||
|
||||
private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException
|
||||
@ -148,49 +214,53 @@ public class SegmentManager
|
||||
|
||||
public void dropSegment(final DataSegment segment) throws SegmentLoadingException
|
||||
{
|
||||
String dataSource = segment.getDataSource();
|
||||
synchronized (lock) {
|
||||
VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.get(dataSource);
|
||||
final String dataSource = segment.getDataSource();
|
||||
|
||||
if (loadedIntervals == null) {
|
||||
log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource);
|
||||
return;
|
||||
}
|
||||
// compute() is used to ensure that the operation for a data source is executed atomically
|
||||
dataSources.compute(
|
||||
dataSource,
|
||||
(dataSourceName, dataSourceState) -> {
|
||||
if (dataSourceState == null) {
|
||||
log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSourceName);
|
||||
} else {
|
||||
final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals =
|
||||
dataSourceState.getTimeline();
|
||||
|
||||
PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
|
||||
segment.getInterval(),
|
||||
segment.getVersion(),
|
||||
segment.getShardSpec().createChunk(null)
|
||||
);
|
||||
ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
|
||||
final PartitionChunk<ReferenceCountingSegment> removed = loadedIntervals.remove(
|
||||
segment.getInterval(),
|
||||
segment.getVersion(),
|
||||
segment.getShardSpec().createChunk(null)
|
||||
);
|
||||
final ReferenceCountingSegment oldQueryable = (removed == null) ? null : removed.getObject();
|
||||
|
||||
if (oldQueryable != null) {
|
||||
synchronized (dataSourceSizes) {
|
||||
dataSourceSizes.add(dataSource, -segment.getSize());
|
||||
}
|
||||
synchronized (dataSourceCounts) {
|
||||
dataSourceCounts.add(dataSource, -1L);
|
||||
}
|
||||
if (oldQueryable != null) {
|
||||
dataSourceState.removeSegment(segment);
|
||||
|
||||
try {
|
||||
log.info("Attempting to close segment %s", segment.getIdentifier());
|
||||
oldQueryable.close();
|
||||
try {
|
||||
log.info("Attempting to close segment %s", segment.getIdentifier());
|
||||
oldQueryable.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.makeAlert(e, "Exception closing segment")
|
||||
.addData("dataSource", dataSourceName)
|
||||
.addData("segmentId", segment.getIdentifier())
|
||||
.emit();
|
||||
}
|
||||
} else {
|
||||
log.info(
|
||||
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
|
||||
dataSourceName,
|
||||
segment.getInterval(),
|
||||
segment.getVersion()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Returning null removes the entry of dataSource from the map
|
||||
return dataSourceState == null || dataSourceState.isEmpty() ? null : dataSourceState;
|
||||
}
|
||||
catch (IOException e) {
|
||||
log.makeAlert(e, "Exception closing segment")
|
||||
.addData("dataSource", dataSource)
|
||||
.addData("segmentId", segment.getIdentifier())
|
||||
.emit();
|
||||
}
|
||||
} else {
|
||||
log.info(
|
||||
"Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
|
||||
dataSource,
|
||||
segment.getInterval(),
|
||||
segment.getVersion()
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
segmentLoader.cleanup(segment);
|
||||
}
|
||||
}
|
||||
|
439
server/src/test/java/io/druid/server/SegmentManagerTest.java
Normal file
439
server/src/test/java/io/druid/server/SegmentManagerTest.java
Normal file
@ -0,0 +1,439 @@
|
||||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.server;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import io.druid.java.util.common.MapUtils;
|
||||
import io.druid.segment.AbstractSegment;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.segment.ReferenceCountingSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.loading.SegmentLoader;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
import io.druid.server.SegmentManager.DataSourceState;
|
||||
import io.druid.timeline.DataSegment;
|
||||
import io.druid.timeline.VersionedIntervalTimeline;
|
||||
import io.druid.timeline.partition.NoneShardSpec;
|
||||
import org.joda.time.Interval;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SegmentManagerTest
|
||||
{
|
||||
private static final SegmentLoader segmentLoader = new SegmentLoader()
|
||||
{
|
||||
@Override
|
||||
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Segment getSegment(final DataSegment segment) throws SegmentLoadingException
|
||||
{
|
||||
return new SegmentForTesting(
|
||||
MapUtils.getString(segment.getLoadSpec(), "version"),
|
||||
(Interval) segment.getLoadSpec().get("interval")
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup(DataSegment segment) throws SegmentLoadingException
|
||||
{
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
private static class SegmentForTesting extends AbstractSegment
|
||||
{
|
||||
private final String version;
|
||||
private final Interval interval;
|
||||
private volatile boolean closed = false;
|
||||
|
||||
SegmentForTesting(
|
||||
String version,
|
||||
Interval interval
|
||||
)
|
||||
{
|
||||
this.version = version;
|
||||
this.interval = interval;
|
||||
}
|
||||
|
||||
public String getVersion()
|
||||
{
|
||||
return version;
|
||||
}
|
||||
|
||||
public Interval getInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getIdentifier()
|
||||
{
|
||||
return version;
|
||||
}
|
||||
|
||||
public boolean isClosed()
|
||||
{
|
||||
return closed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Interval getDataInterval()
|
||||
{
|
||||
return interval;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryableIndex asQueryableIndex()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageAdapter asStorageAdapter()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
private static final List<DataSegment> segments = ImmutableList.of(
|
||||
new DataSegment(
|
||||
"small_source",
|
||||
new Interval("0/1000"),
|
||||
"0",
|
||||
ImmutableMap.of("interval", new Interval("0/1000"), "version", 0),
|
||||
Lists.newArrayList(),
|
||||
Lists.newArrayList(),
|
||||
NoneShardSpec.instance(),
|
||||
0,
|
||||
10
|
||||
),
|
||||
new DataSegment(
|
||||
"small_source",
|
||||
new Interval("1000/2000"),
|
||||
"0",
|
||||
ImmutableMap.of("interval", new Interval("1000/2000"), "version", 0),
|
||||
Lists.newArrayList(),
|
||||
Lists.newArrayList(),
|
||||
NoneShardSpec.instance(),
|
||||
0,
|
||||
10
|
||||
),
|
||||
new DataSegment(
|
||||
"large_source",
|
||||
new Interval("0/1000"),
|
||||
"0",
|
||||
ImmutableMap.of("interval", new Interval("0/1000"), "version", 0),
|
||||
Lists.newArrayList(),
|
||||
Lists.newArrayList(),
|
||||
NoneShardSpec.instance(),
|
||||
0,
|
||||
100
|
||||
),
|
||||
new DataSegment(
|
||||
"large_source",
|
||||
new Interval("1000/2000"),
|
||||
"0",
|
||||
ImmutableMap.of("interval", new Interval("1000/2000"), "version", 0),
|
||||
Lists.newArrayList(),
|
||||
Lists.newArrayList(),
|
||||
NoneShardSpec.instance(),
|
||||
0,
|
||||
100
|
||||
),
|
||||
// overshadowing the ahead segment
|
||||
new DataSegment(
|
||||
"large_source",
|
||||
new Interval("1000/2000"),
|
||||
"1",
|
||||
ImmutableMap.of("interval", new Interval("1000/2000"), "version", 1),
|
||||
Lists.newArrayList(),
|
||||
Lists.newArrayList(),
|
||||
NoneShardSpec.instance(),
|
||||
1,
|
||||
100
|
||||
)
|
||||
);
|
||||
|
||||
private ExecutorService executor;
|
||||
private SegmentManager segmentManager;
|
||||
|
||||
@Before
|
||||
public void setup()
|
||||
{
|
||||
segmentManager = new SegmentManager(segmentLoader);
|
||||
executor = Executors.newFixedThreadPool(segments.size());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown()
|
||||
{
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadSegment() throws ExecutionException, InterruptedException, SegmentLoadingException
|
||||
{
|
||||
final List<Future<Boolean>> futures = segments.stream()
|
||||
.map(
|
||||
segment -> executor.submit(
|
||||
() -> segmentManager.loadSegment(segment)
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (Future<Boolean> eachFuture : futures) {
|
||||
Assert.assertTrue(eachFuture.get());
|
||||
}
|
||||
|
||||
assertResult(segments);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException
|
||||
{
|
||||
for (DataSegment eachSegment : segments) {
|
||||
Assert.assertTrue(segmentManager.loadSegment(eachSegment));
|
||||
}
|
||||
|
||||
final List<Future<Void>> futures = ImmutableList.of(segments.get(0), segments.get(2)).stream()
|
||||
.map(
|
||||
segment -> executor.submit(
|
||||
() -> {
|
||||
segmentManager.dropSegment(segment);
|
||||
return (Void) null;
|
||||
}
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (Future<Void> eachFuture : futures) {
|
||||
eachFuture.get();
|
||||
}
|
||||
|
||||
assertResult(
|
||||
ImmutableList.of(segments.get(1), segments.get(3), segments.get(4))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException
|
||||
{
|
||||
Assert.assertTrue(segmentManager.loadSegment(segments.get(0)));
|
||||
Assert.assertTrue(segmentManager.loadSegment(segments.get(2)));
|
||||
|
||||
final List<Future<Boolean>> loadFutures = ImmutableList.of(segments.get(1), segments.get(3), segments.get(4))
|
||||
.stream()
|
||||
.map(
|
||||
segment -> executor.submit(
|
||||
() -> segmentManager.loadSegment(segment)
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
final List<Future<Void>> dropFutures = ImmutableList.of(segments.get(0), segments.get(2)).stream()
|
||||
.map(
|
||||
segment -> executor.submit(
|
||||
() -> {
|
||||
segmentManager.dropSegment(segment);
|
||||
return (Void) null;
|
||||
}
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (Future<Boolean> eachFuture : loadFutures) {
|
||||
Assert.assertTrue(eachFuture.get());
|
||||
}
|
||||
for (Future<Void> eachFuture : dropFutures) {
|
||||
eachFuture.get();
|
||||
}
|
||||
|
||||
assertResult(
|
||||
ImmutableList.of(segments.get(1), segments.get(3), segments.get(4))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadDuplicatedSegmentsSequentially() throws SegmentLoadingException
|
||||
{
|
||||
for (DataSegment segment : segments) {
|
||||
Assert.assertTrue(segmentManager.loadSegment(segment));
|
||||
}
|
||||
// try to load an existing segment
|
||||
Assert.assertFalse(segmentManager.loadSegment(segments.get(0)));
|
||||
|
||||
assertResult(segments);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadDuplicatedSegmentsInParallel()
|
||||
throws ExecutionException, InterruptedException, SegmentLoadingException
|
||||
{
|
||||
final List<Future<Boolean>> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0)).stream()
|
||||
.map(
|
||||
segment -> executor.submit(
|
||||
() -> segmentManager.loadSegment(segment)
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
int numSucceededFutures = 0;
|
||||
int numFailedFutures = 0;
|
||||
for (Future<Boolean> future : futures) {
|
||||
numSucceededFutures += future.get() ? 1 : 0;
|
||||
numFailedFutures += future.get() ? 0 : 1;
|
||||
}
|
||||
|
||||
Assert.assertEquals(1, numSucceededFutures);
|
||||
Assert.assertEquals(2, numFailedFutures);
|
||||
|
||||
assertResult(ImmutableList.of(segments.get(0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonExistingSegmentsSequentially() throws SegmentLoadingException
|
||||
{
|
||||
Assert.assertTrue(segmentManager.loadSegment(segments.get(0)));
|
||||
|
||||
// try to drop a non-existing segment of different data source
|
||||
segmentManager.dropSegment(segments.get(2));
|
||||
assertResult(
|
||||
ImmutableList.of(segments.get(0))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonExistingSegmentsInParallel()
|
||||
throws SegmentLoadingException, ExecutionException, InterruptedException
|
||||
{
|
||||
segmentManager.loadSegment(segments.get(0));
|
||||
final List<Future<Void>> futures = ImmutableList.of(segments.get(1), segments.get(2)).stream()
|
||||
.map(
|
||||
segment -> executor.submit(
|
||||
() -> {
|
||||
segmentManager.dropSegment(segment);
|
||||
return (Void) null;
|
||||
}
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
for (Future<Void> future : futures) {
|
||||
future.get();
|
||||
}
|
||||
|
||||
assertResult(ImmutableList.of(segments.get(0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveEmptyTimeline() throws SegmentLoadingException
|
||||
{
|
||||
segmentManager.loadSegment(segments.get(0));
|
||||
assertResult(ImmutableList.of(segments.get(0)));
|
||||
Assert.assertEquals(1, segmentManager.getDataSources().size());
|
||||
segmentManager.dropSegment(segments.get(0));
|
||||
Assert.assertEquals(0, segmentManager.getDataSources().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetNonExistingTimeline()
|
||||
{
|
||||
Assert.assertNull(segmentManager.getTimeline("nonExisting"));
|
||||
}
|
||||
|
||||
private void assertResult(List<DataSegment> expectedExistingSegments) throws SegmentLoadingException
|
||||
{
|
||||
final Map<String, Long> expectedDataSourceSizes = expectedExistingSegments.stream()
|
||||
.collect(Collectors.toMap(
|
||||
DataSegment::getDataSource,
|
||||
DataSegment::getSize,
|
||||
Long::sum
|
||||
));
|
||||
final Map<String, Long> expectedDataSourceCounts = expectedExistingSegments.stream()
|
||||
.collect(Collectors.toMap(
|
||||
DataSegment::getDataSource,
|
||||
segment -> 1L,
|
||||
Long::sum
|
||||
));
|
||||
final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> expectedDataSources
|
||||
= new HashMap<>();
|
||||
for (DataSegment segment : expectedExistingSegments) {
|
||||
final VersionedIntervalTimeline<String, ReferenceCountingSegment> expectedTimeline =
|
||||
expectedDataSources.computeIfAbsent(
|
||||
segment.getDataSource(),
|
||||
k -> new VersionedIntervalTimeline<>(Ordering.natural())
|
||||
);
|
||||
expectedTimeline.add(
|
||||
segment.getInterval(),
|
||||
segment.getVersion(),
|
||||
segment.getShardSpec().createChunk(new ReferenceCountingSegment(segmentLoader.getSegment(segment)))
|
||||
);
|
||||
}
|
||||
|
||||
Assert.assertEquals(expectedDataSourceCounts, segmentManager.getDataSourceCounts());
|
||||
Assert.assertEquals(expectedDataSourceSizes, segmentManager.getDataSourceSizes());
|
||||
|
||||
final Map<String, DataSourceState> dataSources = segmentManager.getDataSources();
|
||||
Assert.assertEquals(expectedDataSources.size(), dataSources.size());
|
||||
|
||||
dataSources.forEach(
|
||||
(sourceName, dataSourceState) -> {
|
||||
Assert.assertEquals(expectedDataSourceCounts.get(sourceName).longValue(), dataSourceState.getNumSegments());
|
||||
Assert.assertEquals(expectedDataSourceSizes.get(sourceName).longValue(), dataSourceState.getTotalSegmentSize());
|
||||
Assert.assertEquals(
|
||||
expectedDataSources.get(sourceName).getAllTimelineEntries(),
|
||||
dataSourceState.getTimeline().getAllTimelineEntries()
|
||||
);
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Module;
|
||||
import com.google.inject.name.Names;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.druid.client.cache.CacheConfig;
|
||||
import io.druid.client.cache.CacheMonitor;
|
||||
@ -37,15 +36,16 @@ import io.druid.guice.NodeTypeConfig;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.query.QuerySegmentWalker;
|
||||
import io.druid.query.lookup.LookupModule;
|
||||
import io.druid.server.coordination.ServerType;
|
||||
import io.druid.server.http.SegmentListerResource;
|
||||
import io.druid.server.metrics.QueryCountStatsProvider;
|
||||
import io.druid.server.QueryResource;
|
||||
import io.druid.server.SegmentManager;
|
||||
import io.druid.server.coordination.ServerManager;
|
||||
import io.druid.server.coordination.ServerType;
|
||||
import io.druid.server.coordination.ZkCoordinator;
|
||||
import io.druid.server.http.HistoricalResource;
|
||||
import io.druid.server.http.SegmentListerResource;
|
||||
import io.druid.server.initialization.jetty.JettyServerInitializer;
|
||||
import io.druid.server.metrics.MetricsModule;
|
||||
import io.druid.server.metrics.QueryCountStatsProvider;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
|
||||
import java.util.List;
|
||||
@ -80,6 +80,7 @@ public class CliHistorical extends ServerRunnable
|
||||
// register Server before binding ZkCoordinator to ensure HTTP endpoints are available immediately
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
binder.bind(ServerManager.class).in(LazySingleton.class);
|
||||
binder.bind(SegmentManager.class).in(LazySingleton.class);
|
||||
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
|
||||
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
|
||||
|
||||
|
@ -40,6 +40,7 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierC
|
||||
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
|
||||
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
|
||||
import io.druid.server.QueryResource;
|
||||
import io.druid.server.SegmentManager;
|
||||
import io.druid.server.coordination.ServerType;
|
||||
import io.druid.server.coordination.ZkCoordinator;
|
||||
import io.druid.server.http.SegmentListerResource;
|
||||
@ -112,6 +113,7 @@ public class RealtimeModule implements Module
|
||||
LifecycleModule.register(binder, QueryResource.class);
|
||||
LifecycleModule.register(binder, Server.class);
|
||||
|
||||
binder.bind(SegmentManager.class).in(LazySingleton.class);
|
||||
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
|
||||
LifecycleModule.register(binder, ZkCoordinator.class);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user