Refactor `SegmentLoadDropHandler` code (#16685)

Motivation:
- Improve code hygeiene
- Make `SegmentLoadDropHandler` easily extensible

Changes:
- Add `SegmentBootstrapper`
- Move code for bootstrapping segments already cached on disk and fetched from coordinator to
`SegmentBootstrapper`.
- No functional change
- Use separate executor service in `SegmentBootstrapper`
- Bind `SegmentBootstrapper` to `ManageLifecycle` explicitly in `CliBroker`, `CliHistorical` etc.
This commit is contained in:
Abhishek Radhakrishnan 2024-07-07 20:59:55 -07:00 committed by GitHub
parent c6c2652c89
commit bf2be938a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1001 additions and 769 deletions

View File

@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import java.util.List;
/**

View File

@ -0,0 +1,439 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Responsible for bootstrapping segments already cached on disk and bootstrap segments fetched from the coordinator.
* Also responsible for announcing the node as a data server if applicable, once the bootstrapping operations
* are complete.
*/
@ManageLifecycle
public class SegmentBootstrapper
{
private final SegmentLoadDropHandler loadDropHandler;
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer segmentAnnouncer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentManager segmentManager;
private final ServerTypeConfig serverTypeConfig;
private final CoordinatorClient coordinatorClient;
private final ServiceEmitter emitter;
private volatile boolean isComplete = false;
// Synchronizes start/stop of this object.
private final Object startStopLock = new Object();
private static final EmittingLogger log = new EmittingLogger(SegmentBootstrapper.class);
@Inject
public SegmentBootstrapper(
SegmentLoadDropHandler loadDropHandler,
SegmentLoaderConfig config,
DataSegmentAnnouncer segmentAnnouncer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ServerTypeConfig serverTypeConfig,
CoordinatorClient coordinatorClient,
ServiceEmitter emitter
)
{
this.loadDropHandler = loadDropHandler;
this.config = config;
this.segmentAnnouncer = segmentAnnouncer;
this.serverAnnouncer = serverAnnouncer;
this.segmentManager = segmentManager;
this.serverTypeConfig = serverTypeConfig;
this.coordinatorClient = coordinatorClient;
this.emitter = emitter;
}
@LifecycleStart
public void start() throws IOException
{
synchronized (startStopLock) {
if (isComplete) {
return;
}
log.info("Starting...");
try {
if (segmentManager.canHandleSegments()) {
loadSegmentsOnStartup();
}
if (shouldAnnounce()) {
serverAnnouncer.announce();
}
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw new RuntimeException(e);
}
isComplete = true;
log.info("Started.");
}
}
@LifecycleStop
public void stop()
{
synchronized (startStopLock) {
if (!isComplete) {
return;
}
log.info("Stopping...");
try {
if (shouldAnnounce()) {
serverAnnouncer.unannounce();
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
isComplete = false;
}
log.info("Stopped.");
}
}
public boolean isBootstrappingComplete()
{
return isComplete;
}
/**
* Bulk loading of the following segments into the page cache at startup:
* <li> Previously cached segments </li>
* <li> Bootstrap segments from the coordinator </li>
*/
private void loadSegmentsOnStartup() throws IOException
{
final List<DataSegment> segmentsOnStartup = new ArrayList<>();
segmentsOnStartup.addAll(segmentManager.getCachedSegments());
segmentsOnStartup.addAll(getBootstrapSegments());
final Stopwatch stopwatch = Stopwatch.createStarted();
// Start a temporary thread pool to load segments into page cache during bootstrap
final ExecutorService bootstrapExecutor = Execs.multiThreaded(
config.getNumBootstrapThreads(), "Segment-Bootstrap-%s"
);
// Start a temporary scheduled executor for background segment announcing
final ScheduledExecutorService backgroundAnnouncerExecutor = Executors.newScheduledThreadPool(
config.getNumLoadingThreads(), Execs.makeThreadFactory("Background-Segment-Announcer-%s")
);
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(segmentAnnouncer, backgroundAnnouncerExecutor, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
final int numSegments = segmentsOnStartup.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segmentsOnStartup) {
bootstrapExecutor.submit(
() -> {
try {
log.info(
"Loading segment[%d/%d][%s]",
counter.incrementAndGet(), numSegments, segment.getId()
);
try {
segmentManager.loadSegmentOnBootstrap(
segment,
() -> loadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP, false)
);
}
catch (Exception e) {
loadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP, false);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getId());
}
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getId());
failedSegments.add(segment);
}
finally {
latch.countDown();
}
}
);
}
try {
latch.await();
if (failedSegments.size() > 0) {
log.makeAlert("[%,d] errors seen while loading segments on startup", failedSegments.size())
.addData("failedSegments", failedSegments)
.emit();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.makeAlert(e, "LoadingInterrupted").emit();
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments on startup -- likely problem with announcing.")
.addData("numSegments", segmentsOnStartup.size())
.emit();
}
finally {
bootstrapExecutor.shutdownNow();
backgroundAnnouncerExecutor.shutdownNow();
stopwatch.stop();
// At this stage, all tasks have been submitted, send a shutdown command to cleanup any resources alloted
// for the bootstrapping function.
segmentManager.shutdownBootstrap();
log.info("Loaded [%d] segments on startup in [%,d]ms.", segmentsOnStartup.size(), stopwatch.millisElapsed());
}
}
/**
* @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned.
*/
private List<DataSegment> getBootstrapSegments()
{
log.info("Fetching bootstrap segments from the coordinator.");
final Stopwatch stopwatch = Stopwatch.createStarted();
List<DataSegment> bootstrapSegments = new ArrayList<>();
try {
final BootstrapSegmentsResponse response =
FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true);
bootstrapSegments = ImmutableList.copyOf(response.getIterator());
}
catch (Exception e) {
log.warn("Error fetching bootstrap segments from the coordinator: [%s]. ", e.getMessage());
}
finally {
stopwatch.stop();
final long fetchRunMillis = stopwatch.millisElapsed();
emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time", fetchRunMillis));
emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", bootstrapSegments.size()));
log.info("Fetched [%d] bootstrap segments in [%d]ms.", bootstrapSegments.size(), fetchRunMillis);
}
return bootstrapSegments;
}
/**
* Returns whether or not we should announce ourselves as a data server using {@link DataSegmentServerAnnouncer}.
*
* Returns true if _either_:
*
* <li> Our {@link #serverTypeConfig} indicates we are a segment server. This is necessary for Brokers to be able
* to detect that we exist.</li>
* <li> The segment manager is able to handle segments. This is necessary for Coordinators to be able to
* assign segments to us.</li>
*/
private boolean shouldAnnounce()
{
return serverTypeConfig.getServerType().isSegmentServer() || segmentManager.canHandleSegments();
}
private static class BackgroundSegmentAnnouncer implements AutoCloseable
{
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
private final int announceIntervalMillis;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ScheduledExecutorService exec;
private final LinkedBlockingQueue<DataSegment> queue;
private final SettableFuture<Boolean> doneAnnouncing;
private final Object lock = new Object();
private volatile boolean finished = false;
@Nullable
private volatile ScheduledFuture startedAnnouncing = null;
@Nullable
private volatile ScheduledFuture nextAnnoucement = null;
BackgroundSegmentAnnouncer(
DataSegmentAnnouncer segmentAnnouncer,
ScheduledExecutorService exec,
int announceIntervalMillis
)
{
this.segmentAnnouncer = segmentAnnouncer;
this.exec = exec;
this.announceIntervalMillis = announceIntervalMillis;
this.queue = new LinkedBlockingQueue<>();
this.doneAnnouncing = SettableFuture.create();
}
public void announceSegment(final DataSegment segment) throws InterruptedException
{
if (finished) {
throw new ISE("Announce segment called after finishAnnouncing");
}
queue.put(segment);
}
public void startAnnouncing()
{
if (announceIntervalMillis <= 0) {
log.info("Skipping background segment announcing as announceIntervalMillis is [%d].", announceIntervalMillis);
return;
}
log.info("Starting background segment announcing task");
// schedule background announcing task
nextAnnoucement = startedAnnouncing = exec.schedule(
new Runnable()
{
@Override
public void run()
{
synchronized (lock) {
try {
if (!(finished && queue.isEmpty())) {
final List<DataSegment> segments = new ArrayList<>();
queue.drainTo(segments);
try {
segmentAnnouncer.announceSegments(segments);
nextAnnoucement = exec.schedule(this, announceIntervalMillis, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
doneAnnouncing.setException(
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
);
}
} else {
doneAnnouncing.set(true);
}
}
catch (Exception e) {
doneAnnouncing.setException(e);
}
}
}
},
announceIntervalMillis,
TimeUnit.MILLISECONDS
);
}
public void finishAnnouncing() throws SegmentLoadingException
{
synchronized (lock) {
finished = true;
// announce any remaining segments
try {
final List<DataSegment> segments = new ArrayList<>();
queue.drainTo(segments);
segmentAnnouncer.announceSegments(segments);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
// get any exception that may have been thrown in background announcing
try {
// check in case intervalMillis is <= 0
if (startedAnnouncing != null) {
startedAnnouncing.cancel(false);
}
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
if (doneAnnouncing.isDone()) {
doneAnnouncing.get();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
catch (ExecutionException e) {
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
}
}
log.info("Completed background segment announcing");
}
@Override
public void close()
{
// stop background scheduling
synchronized (lock) {
finished = true;
if (nextAnnoucement != null) {
nextAnnoucement.cancel(false);
}
}
}
}
}

View File

@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Objects;
/**

View File

@ -26,7 +26,6 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Objects;
/**

View File

@ -20,28 +20,16 @@
package org.apache.druid.server.coordination;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
@ -56,20 +44,13 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
*
* Responsible for loading and dropping of segments by a process that can serve segments.
*/
@ManageLifecycle
public class SegmentLoadDropHandler implements DataSegmentChangeHandler
@ -79,20 +60,12 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
// Synchronizes removals from segmentsToDelete
private final Object segmentDeleteLock = new Object();
// Synchronizes start/stop of this object.
private final Object startStopLock = new Object();
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ServerTypeConfig serverTypeConfig;
private final CoordinatorClient coordinatorClient;
private final ServiceEmitter emitter;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
private volatile boolean started = false;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
// Keep history of load/drop request status in a LRU cache to maintain idempotency if same request shows up
// again and to return status of a completed request. Maximum size of this cache must be significantly greater
@ -108,25 +81,17 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
public SegmentLoadDropHandler(
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ServerTypeConfig serverTypeConfig,
CoordinatorClient coordinatorClient,
ServiceEmitter emitter
SegmentManager segmentManager
)
{
this(
config,
announcer,
serverAnnouncer,
segmentManager,
Executors.newScheduledThreadPool(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
),
serverTypeConfig,
coordinatorClient,
emitter
)
);
}
@ -134,83 +99,19 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
SegmentLoadDropHandler(
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ScheduledExecutorService exec,
ServerTypeConfig serverTypeConfig,
CoordinatorClient coordinatorClient,
ServiceEmitter emitter
ScheduledExecutorService exec
)
{
this.config = config;
this.announcer = announcer;
this.serverAnnouncer = serverAnnouncer;
this.segmentManager = segmentManager;
this.exec = exec;
this.serverTypeConfig = serverTypeConfig;
this.coordinatorClient = coordinatorClient;
this.emitter = emitter;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}
@LifecycleStart
public void start() throws IOException
{
synchronized (startStopLock) {
if (started) {
return;
}
log.info("Starting...");
try {
if (segmentManager.canHandleSegments()) {
loadSegmentsOnStartup();
}
if (shouldAnnounce()) {
serverAnnouncer.announce();
}
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw new RuntimeException(e);
}
started = true;
log.info("Started.");
}
}
@LifecycleStop
public void stop()
{
synchronized (startStopLock) {
if (!started) {
return;
}
log.info("Stopping...");
try {
if (shouldAnnounce()) {
serverAnnouncer.unannounce();
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
finally {
started = false;
}
log.info("Stopped.");
}
}
public boolean isStarted()
{
return started;
}
public Map<String, Long> getAverageNumOfRowsPerSegmentForDatasource()
{
return segmentManager.getAverageRowCountForDatasource();
@ -221,132 +122,6 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
return segmentManager.getRowCountDistribution();
}
/**
* Bulk loading of the following segments into the page cache at startup:
* <li> Previously cached segments </li>
* <li> Bootstrap segments from the coordinator </li>
*/
private void loadSegmentsOnStartup() throws IOException
{
final List<DataSegment> segmentsOnStartup = new ArrayList<>();
segmentsOnStartup.addAll(segmentManager.getCachedSegments());
segmentsOnStartup.addAll(getBootstrapSegments());
final Stopwatch stopwatch = Stopwatch.createStarted();
// Start a temporary thread pool to load segments into page cache during bootstrap
final ExecutorService loadingExecutor = Execs.multiThreaded(
config.getNumBootstrapThreads(), "Segment-Load-Startup-%s"
);
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
final int numSegments = segmentsOnStartup.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segmentsOnStartup) {
loadingExecutor.submit(
() -> {
try {
log.info(
"Loading segment[%d/%d][%s]",
counter.incrementAndGet(), numSegments, segment.getId()
);
try {
segmentManager.loadSegmentOnBootstrap(
segment,
() -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false)
);
}
catch (Exception e) {
removeSegment(segment, DataSegmentChangeCallback.NOOP, false);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getId());
}
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getId());
failedSegments.add(segment);
}
finally {
latch.countDown();
}
}
);
}
try {
latch.await();
if (failedSegments.size() > 0) {
log.makeAlert("[%,d] errors seen while loading segments on startup", failedSegments.size())
.addData("failedSegments", failedSegments)
.emit();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.makeAlert(e, "LoadingInterrupted").emit();
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments on startup -- likely problem with announcing.")
.addData("numSegments", segmentsOnStartup.size())
.emit();
}
finally {
loadingExecutor.shutdownNow();
stopwatch.stop();
// At this stage, all tasks have been submitted, send a shutdown command to cleanup any resources alloted
// for the bootstrapping function.
segmentManager.shutdownBootstrap();
log.info("Loaded [%d] segments on startup in [%,d]ms.", segmentsOnStartup.size(), stopwatch.millisElapsed());
}
}
/**
* @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned.
*/
private List<DataSegment> getBootstrapSegments()
{
log.info("Fetching bootstrap segments from the coordinator.");
final Stopwatch stopwatch = Stopwatch.createStarted();
List<DataSegment> bootstrapSegments = new ArrayList<>();
try {
final BootstrapSegmentsResponse response =
FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true);
bootstrapSegments = ImmutableList.copyOf(response.getIterator());
}
catch (Exception e) {
// By default, we "fail open" when there is any error -- finding the coordinator, or if the API endpoint cannot
// be found during rolling upgrades, or even if it's irrecoverable.
log.warn("Error fetching bootstrap segments from the coordinator: [%s]. ", e.getMessage());
}
finally {
stopwatch.stop();
final long fetchRunMillis = stopwatch.millisElapsed();
emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time", fetchRunMillis));
emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", bootstrapSegments.size()));
log.info("Fetched [%d] bootstrap segments in [%d]ms.", bootstrapSegments.size(), fetchRunMillis);
}
return bootstrapSegments;
}
@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
@ -566,154 +341,6 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
}
/**
* Returns whether or not we should announce ourselves as a data server using {@link DataSegmentServerAnnouncer}.
*
* Returns true if _either_:
*
* <li> Our {@link #serverTypeConfig} indicates we are a segment server. This is necessary for Brokers to be able
* to detect that we exist.</li>
* <li> The segment manager is able to handle segments. This is necessary for Coordinators to be able to
* assign segments to us.</li>
*/
private boolean shouldAnnounce()
{
return serverTypeConfig.getServerType().isSegmentServer() || segmentManager.canHandleSegments();
}
private static class BackgroundSegmentAnnouncer implements AutoCloseable
{
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
private final int intervalMillis;
private final DataSegmentAnnouncer announcer;
private final ScheduledExecutorService exec;
private final LinkedBlockingQueue<DataSegment> queue;
private final SettableFuture<Boolean> doneAnnouncing;
private final Object lock = new Object();
private volatile boolean finished = false;
@Nullable
private volatile ScheduledFuture startedAnnouncing = null;
@Nullable
private volatile ScheduledFuture nextAnnoucement = null;
public BackgroundSegmentAnnouncer(
DataSegmentAnnouncer announcer,
ScheduledExecutorService exec,
int intervalMillis
)
{
this.announcer = announcer;
this.exec = exec;
this.intervalMillis = intervalMillis;
this.queue = new LinkedBlockingQueue<>();
this.doneAnnouncing = SettableFuture.create();
}
public void announceSegment(final DataSegment segment) throws InterruptedException
{
if (finished) {
throw new ISE("Announce segment called after finishAnnouncing");
}
queue.put(segment);
}
public void startAnnouncing()
{
if (intervalMillis <= 0) {
return;
}
log.info("Starting background segment announcing task");
// schedule background announcing task
nextAnnoucement = startedAnnouncing = exec.schedule(
new Runnable()
{
@Override
public void run()
{
synchronized (lock) {
try {
if (!(finished && queue.isEmpty())) {
final List<DataSegment> segments = new ArrayList<>();
queue.drainTo(segments);
try {
announcer.announceSegments(segments);
nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
doneAnnouncing.setException(
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
);
}
} else {
doneAnnouncing.set(true);
}
}
catch (Exception e) {
doneAnnouncing.setException(e);
}
}
}
},
intervalMillis,
TimeUnit.MILLISECONDS
);
}
public void finishAnnouncing() throws SegmentLoadingException
{
synchronized (lock) {
finished = true;
// announce any remaining segments
try {
final List<DataSegment> segments = new ArrayList<>();
queue.drainTo(segments);
announcer.announceSegments(segments);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
// get any exception that may have been thrown in background announcing
try {
// check in case intervalMillis is <= 0
if (startedAnnouncing != null) {
startedAnnouncing.cancel(false);
}
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
if (doneAnnouncing.isDone()) {
doneAnnouncing.get();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
catch (ExecutionException e) {
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
}
}
log.info("Completed background segment announcing");
}
@Override
public void close()
{
// stop background scheduling
synchronized (lock) {
finished = true;
if (nextAnnoucement != null) {
nextAnnoucement.cancel(false);
}
}
}
}
// Future with cancel() implementation to remove it from "waitingFutures" list
private class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeResponse>>
{
@ -759,6 +386,5 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
return true;
}
}
}

View File

@ -21,7 +21,7 @@ package org.apache.druid.server.http;
import com.google.common.collect.ImmutableMap;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.http.security.StateResourceFilter;
import javax.inject.Inject;
@ -34,14 +34,14 @@ import javax.ws.rs.core.Response;
@Path("/druid/historical/v1")
public class HistoricalResource
{
private final SegmentLoadDropHandler segmentLoadDropHandler;
private final SegmentBootstrapper segmentBootstrapper;
@Inject
public HistoricalResource(
SegmentLoadDropHandler segmentLoadDropHandler
SegmentBootstrapper segmentBootstrapper
)
{
this.segmentLoadDropHandler = segmentLoadDropHandler;
this.segmentBootstrapper = segmentBootstrapper;
}
@GET
@ -50,14 +50,14 @@ public class HistoricalResource
@Produces(MediaType.APPLICATION_JSON)
public Response getLoadStatus()
{
return Response.ok(ImmutableMap.of("cacheInitialized", segmentLoadDropHandler.isStarted())).build();
return Response.ok(ImmutableMap.of("cacheInitialized", segmentBootstrapper.isBootstrappingComplete())).build();
}
@GET
@Path("/readiness")
public Response getReadiness()
{
if (segmentLoadDropHandler.isStarted()) {
if (segmentBootstrapper.isBootstrappingComplete()) {
return Response.ok().build();
} else {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();

View File

@ -50,10 +50,10 @@ import java.util.Collections;
import java.util.List;
/**
* Similar to {@link SegmentLoadDropHandlerTest}. This class includes tests that cover the
* Similar to {@link SegmentBootstrapperTest}. This class includes tests that cover the
* storage location layer as well.
*/
public class SegmentLoadDropHandlerCacheTest
public class SegmentBootstrapperCacheTest
{
private static final long MAX_SIZE = 1000L;
private static final long SEGMENT_SIZE = 100L;
@ -101,8 +101,8 @@ public class SegmentLoadDropHandlerCacheTest
objectMapper
);
segmentManager = new SegmentManager(cacheManager);
segmentAnnouncer = new TestDataSegmentAnnouncer();
serverAnnouncer = new TestDataServerAnnouncer();
segmentAnnouncer = new TestDataSegmentAnnouncer();
coordinatorClient = new TestCoordinatorClient();
emitter = new StubServiceEmitter();
EmittingLogger.registerEmitter(emitter);
@ -112,10 +112,11 @@ public class SegmentLoadDropHandlerCacheTest
public void testLoadStartStopWithEmptyLocations() throws IOException
{
final List<StorageLocation> emptyLocations = ImmutableList.of();
final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig();
segmentManager = new SegmentManager(
new SegmentLocalCacheManager(
emptyLocations,
new SegmentLoaderConfig(),
loaderConfig,
new LeastBytesUsedStorageLocationSelectorStrategy(emptyLocations),
TestIndex.INDEX_IO,
objectMapper
@ -123,19 +124,26 @@ public class SegmentLoadDropHandlerCacheTest
);
final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
new SegmentLoaderConfig(),
loaderConfig,
segmentAnnouncer,
segmentManager
);
final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
loadDropHandler,
loaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.BROKER),
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
emitter
);
loadDropHandler.start();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
loadDropHandler.stop();
bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
@ -143,19 +151,26 @@ public class SegmentLoadDropHandlerCacheTest
public void testLoadStartStop() throws IOException
{
final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
loaderConfig,
segmentAnnouncer,
segmentManager
);
final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
loadDropHandler,
loaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.BROKER),
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
emitter
);
loadDropHandler.start();
bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
loadDropHandler.stop();
bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
@ -176,6 +191,13 @@ public class SegmentLoadDropHandlerCacheTest
}
final SegmentLoadDropHandler loadDropHandler = new SegmentLoadDropHandler(
loaderConfig,
segmentAnnouncer,
segmentManager
);
final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
loadDropHandler,
loaderConfig,
segmentAnnouncer,
serverAnnouncer,
@ -185,8 +207,7 @@ public class SegmentLoadDropHandlerCacheTest
emitter
);
// Start the load drop handler
loadDropHandler.start();
bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
// Verify the expected announcements
@ -202,7 +223,7 @@ public class SegmentLoadDropHandlerCacheTest
loadDropHandler.addSegment(newSegment, null);
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(newSegment));
loadDropHandler.stop();
bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
}

View File

@ -0,0 +1,306 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination;
import com.google.common.collect.ImmutableList;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.druid.server.TestSegmentUtils.makeSegment;
public class SegmentBootstrapperTest
{
private static final int COUNT = 50;
private TestDataSegmentAnnouncer segmentAnnouncer;
private TestDataServerAnnouncer serverAnnouncer;
private SegmentLoaderConfig segmentLoaderConfig;
private TestCoordinatorClient coordinatorClient;
private StubServiceEmitter serviceEmitter;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp() throws IOException
{
final File segmentCacheDir = temporaryFolder.newFolder();
segmentAnnouncer = new TestDataSegmentAnnouncer();
serverAnnouncer = new TestDataServerAnnouncer();
segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
return segmentCacheDir;
}
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public List<StorageLocationConfig> getLocations()
{
return Collections.singletonList(
new StorageLocationConfig(segmentCacheDir, null, null)
);
}
};
coordinatorClient = new TestCoordinatorClient();
serviceEmitter = new StubServiceEmitter();
EmittingLogger.registerEmitter(serviceEmitter);
}
@Test
public void testStartStop() throws Exception
{
final Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(segments);
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
segmentLoaderConfig,
segmentAnnouncer,
segmentManager
);
final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
handler,
segmentLoaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter
);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments());
final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.copyOf(segments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments());
Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache());
bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
Assert.assertEquals(1, cacheManager.getObservedShutdownBootstrapCount().get());
}
@Test
public void testLoadCachedSegments() throws Exception
{
final Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-03")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-04")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T01")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T03")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T06")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(segments);
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(segmentLoaderConfig, segmentAnnouncer, segmentManager);
final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
handler,
segmentLoaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter
);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments());
final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.copyOf(segments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments());
Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedSegmentsLoadedIntoPageCache());
bootstrapper.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
Assert.assertEquals(1, cacheManager.getObservedShutdownBootstrapCount().get());
}
@Test
public void testLoadBootstrapSegments() throws Exception
{
final Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments);
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
segmentLoaderConfig,
segmentAnnouncer,
segmentManager
);
final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
handler,
segmentLoaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter
);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.copyOf(segments);
Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments());
Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegments());
Assert.assertEquals(expectedBootstrapSegments, cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size());
serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
bootstrapper.stop();
}
@Test
public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
segmentLoaderConfig,
segmentAnnouncer,
segmentManager
);
final SegmentBootstrapper bootstrapper = new SegmentBootstrapper(
handler,
segmentLoaderConfig,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter
);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
bootstrapper.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
serviceEmitter.verifyValue("segment/bootstrap/count", 0);
serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
bootstrapper.stop();
}
}

View File

@ -20,33 +20,21 @@
package org.apache.druid.server.coordination;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.TestSegmentUtils;
import org.apache.druid.server.coordination.SegmentChangeStatus.State;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@ -56,29 +44,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.druid.server.TestSegmentUtils.makeSegment;
public class SegmentLoadDropHandlerTest
{
private static final int COUNT = 50;
private TestDataSegmentAnnouncer segmentAnnouncer;
private TestDataServerAnnouncer serverAnnouncer;
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
private ScheduledExecutorFactory scheduledExecutorFactory;
private TestCoordinatorClient coordinatorClient;
private StubServiceEmitter serviceEmitter;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -90,7 +69,6 @@ public class SegmentLoadDropHandlerTest
scheduledRunnable = new ArrayList<>();
segmentAnnouncer = new TestDataSegmentAnnouncer();
serverAnnouncer = new TestDataServerAnnouncer();
segmentLoaderConfig = new SegmentLoaderConfig()
{
@Override
@ -140,9 +118,7 @@ public class SegmentLoadDropHandlerTest
};
};
coordinatorClient = new TestCoordinatorClient();
serviceEmitter = new StubServiceEmitter();
EmittingLogger.registerEmitter(serviceEmitter);
EmittingLogger.registerEmitter(new StubServiceEmitter());
}
/**
@ -154,16 +130,12 @@ public class SegmentLoadDropHandlerTest
* </ul>
*/
@Test
public void testSegmentLoading1() throws Exception
public void testSegmentLoading1()
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager);
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
handler.removeSegment(segment, DataSegmentChangeCallback.NOOP);
@ -178,19 +150,16 @@ public class SegmentLoadDropHandlerTest
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
Assert.assertEquals(ImmutableList.of(segment), cacheManager.observedSegments);
Assert.assertEquals(ImmutableList.of(segment), cacheManager.observedSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(segment), cacheManager.getObservedSegments());
Assert.assertEquals(ImmutableList.of(segment), cacheManager.getObservedSegmentsLoadedIntoPageCache());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
Assert.assertEquals(ImmutableList.of(segment), segmentAnnouncer.getObservedSegments());
Assert.assertFalse(
"segment files shouldn't be deleted",
cacheManager.observedSegmentsRemovedFromCache.contains(segment)
cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
);
handler.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
/**
@ -203,15 +172,15 @@ public class SegmentLoadDropHandlerTest
* </ul>
*/
@Test
public void testSegmentLoading2() throws Exception
public void testSegmentLoading2()
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager);
handler.start();
// handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
// Assert.assertEquals(1, serverAnnouncer.getObservedCount());
final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
@ -234,176 +203,16 @@ public class SegmentLoadDropHandlerTest
// The same segment reference will be fetched more than once in the above sequence, but the segment should
// be loaded only once onto the page cache.
Assert.assertEquals(ImmutableList.of(segment, segment), cacheManager.observedSegments);
Assert.assertEquals(ImmutableList.of(segment), cacheManager.observedSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(segment, segment), cacheManager.getObservedSegments());
Assert.assertEquals(ImmutableList.of(segment), cacheManager.getObservedSegmentsLoadedIntoPageCache());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment));
Assert.assertFalse(
"segment files shouldn't be deleted",
cacheManager.observedSegmentsRemovedFromCache.contains(segment)
cacheManager.getObservedSegmentsRemovedFromCache().contains(segment)
);
handler.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
@Test
public void testLoadCache() throws Exception
{
Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-03")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-04")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T01")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T03")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T06")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(segments);
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments());
final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.copyOf(segments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegmentsLoadedIntoPageCache);
handler.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
}
@Test
public void testLoadBootstrapSegments() throws Exception
{
final Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments);
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager, coordinatorClient);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.copyOf(segments);
Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments());
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size());
serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
handler.stop();
}
@Test
public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
{
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager, new NoopCoordinatorClient());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
serviceEmitter.verifyValue("segment/bootstrap/count", 0);
serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
handler.stop();
}
@Test
public void testStartStop() throws Exception
{
final Set<DataSegment> segments = new HashSet<>();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager(segments);
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(ImmutableList.copyOf(segments), segmentAnnouncer.getObservedSegments());
final ImmutableList<DataSegment> expectedBootstrapSegments = ImmutableList.copyOf(segments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments);
Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedSegmentsLoadedIntoPageCache);
handler.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
}
@Test(timeout = 60_000L)
@ -413,10 +222,6 @@ public class SegmentLoadDropHandlerTest
final SegmentManager segmentManager = new SegmentManager(cacheManager);
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager);
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
DataSegment segment2 = makeSegment("batchtest2", "1", Intervals.of("P1d/2011-04-01"));
@ -445,13 +250,10 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments());
final ImmutableList<DataSegment> expectedSegments = ImmutableList.of(segment1);
Assert.assertEquals(expectedSegments, cacheManager.observedSegments);
Assert.assertEquals(expectedSegments, cacheManager.observedSegmentsLoadedIntoPageCache);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments);
Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
handler.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
Assert.assertEquals(expectedSegments, cacheManager.getObservedSegments());
Assert.assertEquals(expectedSegments, cacheManager.getObservedSegmentsLoadedIntoPageCache());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegments());
Assert.assertEquals(ImmutableList.of(), cacheManager.getObservedBootstrapSegmentsLoadedIntoPageCache());
}
@Test(timeout = 60_000L)
@ -465,9 +267,6 @@ public class SegmentLoadDropHandlerTest
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager);
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
@ -489,8 +288,6 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState());
Assert.assertEquals(ImmutableList.of(segment1, segment1), segmentAnnouncer.getObservedSegments());
handler.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
@Test(timeout = 60_000L)
@ -538,13 +335,9 @@ public class SegmentLoadDropHandlerTest
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(
noAnnouncerSegmentLoaderConfig,
segmentManager,
coordinatorClient
segmentManager
);
handler.start();
Assert.assertEquals(1, serverAnnouncer.getObservedCount());
final DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
@ -611,149 +404,23 @@ public class SegmentLoadDropHandlerTest
Mockito.verify(segmentManager, Mockito.times(1))
.dropSegment(ArgumentMatchers.any());
handler.stop();
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager, CoordinatorClient coordinatorClient)
{
return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, coordinatorClient);
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager)
{
return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, coordinatorClient);
return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager);
}
private SegmentLoadDropHandler initSegmentLoadDropHandler(
SegmentLoaderConfig config,
SegmentManager segmentManager,
CoordinatorClient coordinatorClient
SegmentManager segmentManager
)
{
return new SegmentLoadDropHandler(
config,
segmentAnnouncer,
serverAnnouncer,
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
new ServerTypeConfig(ServerType.HISTORICAL),
coordinatorClient,
serviceEmitter
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]")
);
}
private DataSegment makeSegment(String dataSource, String version, Interval interval)
{
return TestSegmentUtils.makeSegment(dataSource, version, interval);
}
/**
* A local cache manager to test the bootstrapping and segment add/remove operations. It stubs only the necessary
* methods to support these operations; any other method invoked will throw an exception from the base class,
* {@link NoopSegmentCacheManager}.
*/
private static class TestSegmentCacheManager extends NoopSegmentCacheManager
{
private final List<DataSegment> cachedSegments;
private final List<DataSegment> observedBootstrapSegments;
private final List<DataSegment> observedBootstrapSegmentsLoadedIntoPageCache;
private final List<DataSegment> observedSegments;
private final List<DataSegment> observedSegmentsLoadedIntoPageCache;
private final List<DataSegment> observedSegmentsRemovedFromCache;
private final AtomicInteger observedShutdownBootstrapCount;
TestSegmentCacheManager()
{
this(ImmutableSet.of());
}
TestSegmentCacheManager(final Set<DataSegment> segmentsToCache)
{
this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
this.observedBootstrapSegments = new ArrayList<>();
this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>();
this.observedSegments = new ArrayList<>();
this.observedSegmentsLoadedIntoPageCache = new ArrayList<>();
this.observedSegmentsRemovedFromCache = new ArrayList<>();
this.observedShutdownBootstrapCount = new AtomicInteger(0);
}
@Override
public boolean canHandleSegments()
{
return true;
}
@Override
public List<DataSegment> getCachedSegments()
{
return cachedSegments;
}
@Override
public ReferenceCountingSegment getBootstrapSegment(DataSegment segment, SegmentLazyLoadFailCallback loadFailed)
{
observedBootstrapSegments.add(segment);
return getSegmentInternal(segment);
}
@Override
public ReferenceCountingSegment getSegment(final DataSegment segment)
{
observedSegments.add(segment);
return getSegmentInternal(segment);
}
private ReferenceCountingSegment getSegmentInternal(final DataSegment segment)
{
if (segment.isTombstone()) {
return ReferenceCountingSegment
.wrapSegment(TombstoneSegmentizerFactory.segmentForTombstone(segment), segment.getShardSpec());
} else {
return ReferenceCountingSegment.wrapSegment(
new TestSegmentUtils.SegmentForTesting(
segment.getDataSource(),
(Interval) segment.getLoadSpec().get("interval"),
MapUtils.getString(segment.getLoadSpec(), "version")
), segment.getShardSpec()
);
}
}
@Override
public void loadSegmentIntoPageCache(DataSegment segment)
{
observedSegmentsLoadedIntoPageCache.add(segment);
}
@Override
public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
{
observedBootstrapSegmentsLoadedIntoPageCache.add(segment);
}
@Override
public void shutdownBootstrap()
{
observedShutdownBootstrapCount.incrementAndGet();
}
@Override
public void storeInfoFile(DataSegment segment)
{
}
@Override
public void removeInfoFile(DataSegment segment)
{
}
@Override
public void cleanup(DataSegment segment)
{
observedSegmentsRemovedFromCache.add(segment);
}
}
}

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.coordination;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.TombstoneSegmentizerFactory;
import org.apache.druid.server.TestSegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A local cache manager to test the bootstrapping and segment add/remove operations. It stubs only the necessary
* methods to support these operations; any other method invoked will throw an exception from the base class,
* {@link NoopSegmentCacheManager}.
*/
class TestSegmentCacheManager extends NoopSegmentCacheManager
{
private final List<DataSegment> cachedSegments;
private final List<DataSegment> observedBootstrapSegments;
private final List<DataSegment> observedBootstrapSegmentsLoadedIntoPageCache;
private final List<DataSegment> observedSegments;
private final List<DataSegment> observedSegmentsLoadedIntoPageCache;
private final List<DataSegment> observedSegmentsRemovedFromCache;
private final AtomicInteger observedShutdownBootstrapCount;
TestSegmentCacheManager()
{
this(ImmutableSet.of());
}
TestSegmentCacheManager(final Set<DataSegment> segmentsToCache)
{
this.cachedSegments = ImmutableList.copyOf(segmentsToCache);
this.observedBootstrapSegments = new ArrayList<>();
this.observedBootstrapSegmentsLoadedIntoPageCache = new ArrayList<>();
this.observedSegments = new ArrayList<>();
this.observedSegmentsLoadedIntoPageCache = new ArrayList<>();
this.observedSegmentsRemovedFromCache = new ArrayList<>();
this.observedShutdownBootstrapCount = new AtomicInteger(0);
}
@Override
public boolean canHandleSegments()
{
return true;
}
@Override
public List<DataSegment> getCachedSegments()
{
return cachedSegments;
}
@Override
public ReferenceCountingSegment getBootstrapSegment(DataSegment segment, SegmentLazyLoadFailCallback loadFailed)
{
observedBootstrapSegments.add(segment);
return getSegmentInternal(segment);
}
@Override
public ReferenceCountingSegment getSegment(final DataSegment segment)
{
observedSegments.add(segment);
return getSegmentInternal(segment);
}
private ReferenceCountingSegment getSegmentInternal(final DataSegment segment)
{
if (segment.isTombstone()) {
return ReferenceCountingSegment
.wrapSegment(TombstoneSegmentizerFactory.segmentForTombstone(segment), segment.getShardSpec());
} else {
return ReferenceCountingSegment.wrapSegment(
new TestSegmentUtils.SegmentForTesting(
segment.getDataSource(),
(Interval) segment.getLoadSpec().get("interval"),
MapUtils.getString(segment.getLoadSpec(), "version")
), segment.getShardSpec()
);
}
}
@Override
public void loadSegmentIntoPageCache(DataSegment segment)
{
observedSegmentsLoadedIntoPageCache.add(segment);
}
@Override
public void loadSegmentIntoPageCacheOnBootstrap(DataSegment segment)
{
observedBootstrapSegmentsLoadedIntoPageCache.add(segment);
}
@Override
public void shutdownBootstrap()
{
observedShutdownBootstrapCount.incrementAndGet();
}
@Override
public void storeInfoFile(DataSegment segment)
{
}
@Override
public void removeInfoFile(DataSegment segment)
{
}
@Override
public void cleanup(DataSegment segment)
{
observedSegmentsRemovedFromCache.add(segment);
}
public List<DataSegment> getObservedBootstrapSegments()
{
return observedBootstrapSegments;
}
public List<DataSegment> getObservedBootstrapSegmentsLoadedIntoPageCache()
{
return observedBootstrapSegmentsLoadedIntoPageCache;
}
public List<DataSegment> getObservedSegments()
{
return observedSegments;
}
public List<DataSegment> getObservedSegmentsLoadedIntoPageCache()
{
return observedSegmentsLoadedIntoPageCache;
}
public List<DataSegment> getObservedSegmentsRemovedFromCache()
{
return observedSegmentsRemovedFromCache;
}
public AtomicInteger getObservedShutdownBootstrapCount()
{
return observedShutdownBootstrapCount;
}
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
@ -42,7 +41,6 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
/**
*/
@ -103,12 +101,7 @@ public class ZkCoordinatorTest extends CuratorTestBase
SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
new SegmentLoaderConfig(),
EasyMock.createNiceMock(DataSegmentAnnouncer.class),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
EasyMock.createNiceMock(SegmentManager.class),
EasyMock.createNiceMock(ScheduledExecutorService.class),
new ServerTypeConfig(ServerType.HISTORICAL),
new TestCoordinatorClient(),
new NoopServiceEmitter()
EasyMock.createNiceMock(SegmentManager.class)
)
{
@Override

View File

@ -63,6 +63,7 @@ import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.SubqueryGuardrailHelper;
import org.apache.druid.server.SubqueryGuardrailHelperProvider;
import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.BrokerResource;
@ -172,6 +173,7 @@ public class CliBroker extends ServerRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
LifecycleModule.register(binder, SegmentBootstrapper.class);
bindAnnouncer(
binder,

View File

@ -49,6 +49,7 @@ import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerManager;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
@ -125,6 +126,7 @@ public class CliHistorical extends ServerRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
LifecycleModule.register(binder, SegmentBootstrapper.class);
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
binder.install(new CacheModule());

View File

@ -74,6 +74,7 @@ import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderator
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
@ -187,6 +188,7 @@ public class CliIndexer extends ServerRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
LifecycleModule.register(binder, SegmentBootstrapper.class);
bindAnnouncer(
binder,

View File

@ -125,6 +125,7 @@ import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerPr
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.SegmentBootstrapper;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
@ -553,6 +554,7 @@ public class CliPeon extends GuiceRunnable
if (isZkEnabled) {
LifecycleModule.register(binder, ZkCoordinator.class);
}
LifecycleModule.register(binder, SegmentBootstrapper.class);
}
@Provides