Optional segment load/drop management without zookeeper using http (#4966)

* introducing CuratorLoadQueuePeon

* HttpLoadQueuePeon based off of current code

* Revert "Remove SegmentLoaderConfig.numLoadingThreads config (#4829)"

This reverts commit d8b3bfa63c9a8ff680a7eb967bc12c845cbadcd8.

* SegmentLoadDropHandler copy/pasted from ZkCoordinator

* Revert "1-based counts in ZkCoordinator (#4917)"

This reverts commit e725ff4146e84a3f3c61b8bea10ed23d7a9e9f9b.

* remove non-zk part from ZkCoordinator

* remove zk part from SegmentLoadDropHandler

* additional changes for segment load/drop management with http

* address review comments

* add some more logs

* Execs class is moved
Himanshu 2017-10-19 14:41:24 -05:00 committed by cheddar
parent ce7bf3f325
commit ef4a8cb724
27 changed files with 3030 additions and 1407 deletions

View File

@ -36,7 +36,8 @@ The historical node uses several of the global configs in [Configuration](../con
|`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping a segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|1|
|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage.|10|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads|
### Query Configs

View File

@ -34,9 +34,13 @@ public class Pair<T1, T2>
}
public final T1 lhs;
public final T2 rhs;
public Pair(T1 lhs, T2 rhs)
public Pair(
T1 lhs,
T2 rhs
)
{
this.lhs = lhs;
this.rhs = rhs;

View File

@ -20,6 +20,7 @@
package io.druid.client;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.StringUtils;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
@ -108,6 +109,15 @@ public class ImmutableDruidServer
return segments;
}
public String getURL()
{
if (metadata.getHostAndTlsPort() != null) {
return StringUtils.safeFormat("https://%s", metadata.getHostAndTlsPort());
} else {
return StringUtils.safeFormat("http://%s", metadata.getHostAndPort());
}
}
@Override
public String toString()
{

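The new getURL() is what lets the coordinator reach a data node directly over HTTP(S) instead of going through ZooKeeper: it prefers the announced TLS host-and-port and falls back to the plain one. A minimal sketch of turning a server into a request URL; the endpoint path below is a hypothetical placeholder, not taken from this patch.

import io.druid.client.ImmutableDruidServer;
import java.net.MalformedURLException;
import java.net.URL;

// Sketch only: a coordinator-side caller can derive its target URL from server metadata.
static URL changeRequestsUrl(ImmutableDruidServer server) throws MalformedURLException
{
  // Hypothetical path for illustration; the real resource path is defined elsewhere in this patch.
  return new URL(server.getURL() + "/druid-internal/v1/segments/changeRequests");
}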
View File

@ -31,8 +31,6 @@ import java.util.List;
*/
public class SegmentLoaderConfig
{
private static final int DEFAULT_NUM_BOOTSTRAP_THREADS = 1;
@JsonProperty
@NotEmpty
private List<StorageLocationConfig> locations = null;
@ -46,12 +44,18 @@ public class SegmentLoaderConfig
@JsonProperty("announceIntervalMillis")
private int announceIntervalMillis = 0; // do not background announce
@JsonProperty("numLoadingThreads")
private int numLoadingThreads = 10;
@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;
@JsonProperty
private File infoDir = null;
@JsonProperty
private int statusQueueMaxSize = 100;
public List<StorageLocationConfig> getLocations()
{
return locations;
@ -72,9 +76,14 @@ public class SegmentLoaderConfig
return announceIntervalMillis;
}
public int getNumLoadingThreads()
{
return numLoadingThreads;
}
public int getNumBootstrapThreads()
{
return numBootstrapThreads == null ? DEFAULT_NUM_BOOTSTRAP_THREADS : numBootstrapThreads;
return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
}
public File getInfoDir()
@ -90,6 +99,11 @@ public class SegmentLoaderConfig
return infoDir;
}
public int getStatusQueueMaxSize()
{
return statusQueueMaxSize;
}
public SegmentLoaderConfig withLocations(List<StorageLocationConfig> locations)
{
SegmentLoaderConfig retVal = new SegmentLoaderConfig();

View File

@ -24,4 +24,6 @@ package io.druid.server.coordination;
public interface DataSegmentChangeCallback
{
void execute();
DataSegmentChangeCallback NOOP = () -> {};
}

View File

@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonUnwrapped;
import io.druid.java.util.common.StringUtils;
import io.druid.timeline.DataSegment;
import java.util.Objects;
/**
*/
public class SegmentChangeRequestDrop implements DataSegmentChangeRequest
@ -58,6 +60,25 @@ public class SegmentChangeRequestDrop implements DataSegmentChangeRequest
return StringUtils.format("DROP: %s", segment.getIdentifier());
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentChangeRequestDrop that = (SegmentChangeRequestDrop) o;
return Objects.equals(segment, that.segment);
}
@Override
public int hashCode()
{
return Objects.hash(segment);
}
@Override
public String toString()
{

View File

@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonUnwrapped;
import io.druid.java.util.common.StringUtils;
import io.druid.timeline.DataSegment;
import java.util.Objects;
/**
*/
public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
@ -58,6 +60,25 @@ public class SegmentChangeRequestLoad implements DataSegmentChangeRequest
return StringUtils.format("LOAD: %s", segment.getIdentifier());
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentChangeRequestLoad that = (SegmentChangeRequestLoad) o;
return Objects.equals(segment, that.segment);
}
@Override
public int hashCode()
{
return Objects.hash(segment);
}
@Override
public String toString()
{

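equals() and hashCode() are added to both SegmentChangeRequestLoad and SegmentChangeRequestDrop so that change requests can serve as map and cache keys; the request-status cache introduced in SegmentLoadDropHandler below relies on two requests for the same segment being interchangeable. A tiny illustration, assuming a DataSegment obtained elsewhere.

import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.timeline.DataSegment;
import java.util.HashMap;
import java.util.Map;

// Sketch only: two independently constructed requests for the same segment now compare equal.
static void illustrate(DataSegment segment)
{
  Map<DataSegmentChangeRequest, String> statuses = new HashMap<>();
  statuses.put(new SegmentChangeRequestLoad(segment), "PENDING");
  // Finds the entry written above, because equality is based solely on the wrapped segment.
  String status = statuses.get(new SegmentChangeRequestLoad(segment));  // "PENDING"
}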
View File

@ -0,0 +1,833 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.guice.ManageLifecycle;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.SegmentManager;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ManageLifecycle
public class SegmentLoadDropHandler implements DataSegmentChangeHandler
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper;
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
private volatile boolean started = false;
// Keep history of load/drop request status in an LRU cache to maintain idempotency if the same request shows up
// again and to return the status of a completed request. Maximum size of this cache must be significantly greater
// than the number of pending load/drop requests so that history is not lost too quickly.
private final Cache<DataSegmentChangeRequest, AtomicReference<Status>> requestStatuses;
private final Object requestStatusesLock = new Object();
// This is the list of unresolved futures returned to callers of processBatch(List<DataSegmentChangeRequest>)
// Threads loading/dropping segments resolve these futures as and when some segment load/drop finishes.
private final LinkedHashSet<CustomSettableFuture> waitingFutures = new LinkedHashSet<>();
@Inject
public SegmentLoadDropHandler(
ObjectMapper jsonMapper,
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager
)
{
this(jsonMapper, config, announcer, serverAnnouncer, segmentManager,
Executors.newScheduledThreadPool(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
)
);
}
@VisibleForTesting
SegmentLoadDropHandler(
ObjectMapper jsonMapper,
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ScheduledExecutorService exec
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.announcer = announcer;
this.serverAnnouncer = serverAnnouncer;
this.segmentManager = segmentManager;
this.exec = exec;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
}
@LifecycleStart
public void start() throws IOException
{
synchronized (lock) {
if (started) {
return;
}
log.info("Starting...");
try {
loadLocalCache();
serverAnnouncer.announce();
}
catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw Throwables.propagate(e);
}
started = true;
log.info("Started.");
}
}
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping...");
try {
serverAnnouncer.unannounce();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
started = false;
}
log.info("Stopped.");
}
}
public boolean isStarted()
{
return started;
}
private void loadLocalCache()
{
final long start = System.currentTimeMillis();
File baseDir = config.getInfoDir();
if (!baseDir.isDirectory()) {
if (baseDir.exists()) {
throw new ISE("[%s] exists but not a directory.", baseDir);
} else if (!baseDir.mkdirs()) {
throw new ISE("Failed to create directory[%s].", baseDir);
}
}
List<DataSegment> cachedSegments = Lists.newArrayList();
File[] segmentsToLoad = baseDir.listFiles();
int ignored = 0;
for (int i = 0; i < segmentsToLoad.length; i++) {
File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file);
try {
final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (!segment.getIdentifier().equals(file.getName())) {
log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getIdentifier());
ignored++;
} else if (segmentManager.isSegmentCached(segment)) {
cachedSegments.add(segment);
} else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
File segmentInfoCacheFile = new File(baseDir, segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to load segment from segmentInfo file")
.addData("file", file)
.emit();
}
}
if (ignored > 0) {
log.makeAlert("Ignored misnamed segment cache files on startup.")
.addData("numIgnored", ignored)
.emit();
}
addSegments(
cachedSegments,
new DataSegmentChangeCallback()
{
@Override
public void execute()
{
log.info("Cache load took %,d ms", System.currentTimeMillis() - start);
}
}
);
}
/**
* Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
* throw a SegmentLoadingException
*
* @throws SegmentLoadingException
*/
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
{
final boolean loaded;
try {
loaded = segmentManager.loadSegment(segment);
}
catch (Exception e) {
removeSegment(segment, callback, false);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
}
if (loaded) {
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment, callback, false);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
}
}
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
Status result = null;
try {
log.info("Loading segment %s", segment.getIdentifier());
/*
The lock below is used to prevent a race condition where the scheduled runnable in removeSegment() starts,
its if (segmentsToDelete.remove(segment)) check returns true, and the historical begins deleting segment
files. At that point, it is possible that right after the "if" check, addSegment() is called and actually
loads the segment, which makes dropping the segment and downloading it happen at the same time.
*/
if (segmentsToDelete.contains(segment)) {
/*
Both contains(segment) and remove(segment) could be moved inside the synchronized block. However, in that case,
every call to addSegment() would have to wait for the lock in order to make progress, which would slow things
down. Given that segmentsToDelete.contains(segment) returns false in most cases, doing the "contains" check
outside the synchronized block avoids most of the lock-acquisition cost.
*/
synchronized (lock) {
segmentsToDelete.remove(segment);
}
}
loadSegment(segment, DataSegmentChangeCallback.NOOP);
try {
announcer.announceSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
result = Status.SUCCESS;
}
catch (Exception e) {
log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment)
.emit();
result = Status.failed(e.getMessage());
}
finally {
updateRequestStatus(new SegmentChangeRequestLoad(segment), result);
callback.execute();
}
}
private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
{
ExecutorService loadingExecutor = null;
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "Segment-Load-Startup-%s");
final int numSegments = segments.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) {
loadingExecutor.submit(
new Runnable()
{
@Override
public void run()
{
try {
log.info(
"Loading segment[%d/%d][%s]",
counter.incrementAndGet(),
numSegments,
segment.getIdentifier()
);
loadSegment(segment, callback);
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier());
failedSegments.add(segment);
}
finally {
latch.countDown();
}
}
}
);
}
try {
latch.await();
if (failedSegments.size() > 0) {
log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
.addData("failedSegments", failedSegments);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.makeAlert(e, "LoadingInterrupted");
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
.addData("numSegments", segments.size())
.emit();
}
finally {
callback.execute();
if (loadingExecutor != null) {
loadingExecutor.shutdownNow();
}
}
}
@Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
removeSegment(segment, callback, true);
}
private void removeSegment(
final DataSegment segment,
final DataSegmentChangeCallback callback,
final boolean scheduleDrop
)
{
Status result = null;
try {
announcer.unannounceSegment(segment);
segmentsToDelete.add(segment);
Runnable runnable = new Runnable()
{
@Override
public void run()
{
try {
synchronized (lock) {
if (segmentsToDelete.remove(segment)) {
segmentManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
.addData("segment", segment)
.emit();
}
}
};
if (scheduleDrop) {
log.info(
"Completely removing [%s] in [%,d] millis",
segment.getIdentifier(),
config.getDropSegmentDelayMillis()
);
exec.schedule(
runnable,
config.getDropSegmentDelayMillis(),
TimeUnit.MILLISECONDS
);
} else {
runnable.run();
}
result = Status.SUCCESS;
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment")
.addData("segment", segment)
.emit();
result = Status.failed(e.getMessage());
}
finally {
updateRequestStatus(new SegmentChangeRequestDrop(segment), result);
callback.execute();
}
}
public Collection<DataSegment> getPendingDeleteSnapshot()
{
return ImmutableList.copyOf(segmentsToDelete);
}
public ListenableFuture<List<DataSegmentChangeRequestAndStatus>> processBatch(List<DataSegmentChangeRequest> changeRequests)
{
boolean isAnyRequestDone = false;
Map<DataSegmentChangeRequest, AtomicReference<Status>> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
AtomicReference<Status> status = processRequest(cr);
if (status.get().getState() != Status.STATE.PENDING) {
isAnyRequestDone = true;
}
statuses.put(cr, status);
}
CustomSettableFuture future = new CustomSettableFuture(waitingFutures, statuses);
if (isAnyRequestDone) {
future.resolve();
} else {
synchronized (waitingFutures) {
waitingFutures.add(future);
}
}
return future;
}
private AtomicReference<Status> processRequest(DataSegmentChangeRequest changeRequest)
{
synchronized (requestStatusesLock) {
if (requestStatuses.getIfPresent(changeRequest) == null) {
changeRequest.go(
new DataSegmentChangeHandler()
{
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING));
exec.submit(
() -> SegmentLoadDropHandler.this.addSegment(
((SegmentChangeRequestLoad) changeRequest).getSegment(),
() -> resolveWaitingFutures()
)
);
}
@Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
requestStatuses.put(changeRequest, new AtomicReference<>(Status.PENDING));
SegmentLoadDropHandler.this.removeSegment(
((SegmentChangeRequestDrop) changeRequest).getSegment(),
() -> resolveWaitingFutures(),
true
);
}
},
() -> resolveWaitingFutures()
);
}
return requestStatuses.getIfPresent(changeRequest);
}
}
private void updateRequestStatus(DataSegmentChangeRequest changeRequest, Status result)
{
if (result == null) {
result = Status.failed("Unknown reason. Check server logs.");
}
synchronized (requestStatusesLock) {
AtomicReference<Status> statusRef = requestStatuses.getIfPresent(changeRequest);
if (statusRef != null) {
statusRef.set(result);
}
}
}
private void resolveWaitingFutures()
{
LinkedHashSet<CustomSettableFuture> waitingFuturesCopy = new LinkedHashSet<>();
synchronized (waitingFutures) {
waitingFuturesCopy.addAll(waitingFutures);
waitingFutures.clear();
}
for (CustomSettableFuture future : waitingFuturesCopy) {
future.resolve();
}
}
private static class BackgroundSegmentAnnouncer implements AutoCloseable
{
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
private final int intervalMillis;
private final DataSegmentAnnouncer announcer;
private final ScheduledExecutorService exec;
private final LinkedBlockingQueue<DataSegment> queue;
private final SettableFuture<Boolean> doneAnnouncing;
private final Object lock = new Object();
private volatile boolean finished = false;
private volatile ScheduledFuture startedAnnouncing = null;
private volatile ScheduledFuture nextAnnoucement = null;
public BackgroundSegmentAnnouncer(
DataSegmentAnnouncer announcer,
ScheduledExecutorService exec,
int intervalMillis
)
{
this.announcer = announcer;
this.exec = exec;
this.intervalMillis = intervalMillis;
this.queue = Queues.newLinkedBlockingQueue();
this.doneAnnouncing = SettableFuture.create();
}
public void announceSegment(final DataSegment segment) throws InterruptedException
{
if (finished) {
throw new ISE("Announce segment called after finishAnnouncing");
}
queue.put(segment);
}
public void startAnnouncing()
{
if (intervalMillis <= 0) {
return;
}
log.info("Starting background segment announcing task");
// schedule background announcing task
nextAnnoucement = startedAnnouncing = exec.schedule(
new Runnable()
{
@Override
public void run()
{
synchronized (lock) {
try {
if (!(finished && queue.isEmpty())) {
final List<DataSegment> segments = Lists.newLinkedList();
queue.drainTo(segments);
try {
announcer.announceSegments(segments);
nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
doneAnnouncing.setException(
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
);
}
} else {
doneAnnouncing.set(true);
}
}
catch (Exception e) {
doneAnnouncing.setException(e);
}
}
}
},
intervalMillis,
TimeUnit.MILLISECONDS
);
}
public void finishAnnouncing() throws SegmentLoadingException
{
synchronized (lock) {
finished = true;
// announce any remaining segments
try {
final List<DataSegment> segments = Lists.newLinkedList();
queue.drainTo(segments);
announcer.announceSegments(segments);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
// get any exception that may have been thrown in background announcing
try {
// check in case intervalMillis is <= 0
if (startedAnnouncing != null) {
startedAnnouncing.cancel(false);
}
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
if (doneAnnouncing.isDone()) {
doneAnnouncing.get();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
catch (ExecutionException e) {
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
}
}
log.info("Completed background segment announcing");
}
@Override
public void close()
{
// stop background scheduling
synchronized (lock) {
finished = true;
if (nextAnnoucement != null) {
nextAnnoucement.cancel(false);
}
}
}
}
// Future with cancel() implementation to remove it from "waitingFutures" list
private static class CustomSettableFuture extends AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
{
private final LinkedHashSet<CustomSettableFuture> waitingFutures;
private final Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs;
private CustomSettableFuture(
LinkedHashSet<CustomSettableFuture> waitingFutures,
Map<DataSegmentChangeRequest, AtomicReference<Status>> statusRefs
)
{
this.waitingFutures = waitingFutures;
this.statusRefs = statusRefs;
}
public void resolve()
{
synchronized (statusRefs) {
if (isDone()) {
return;
}
List<DataSegmentChangeRequestAndStatus> result = new ArrayList<>(statusRefs.size());
statusRefs.forEach(
(request, statusRef) -> result.add(new DataSegmentChangeRequestAndStatus(request, statusRef.get()))
);
super.set(result);
}
}
@Override
public boolean setException(Throwable throwable)
{
return super.setException(throwable);
}
@Override
public boolean cancel(boolean interruptIfRunning)
{
synchronized (waitingFutures) {
waitingFutures.remove(this);
}
return true;
}
}
public static class Status
{
public enum STATE
{
SUCCESS, FAILED, PENDING
}
private final STATE state;
private final String failureCause;
public final static Status SUCCESS = new Status(STATE.SUCCESS, null);
public final static Status PENDING = new Status(STATE.PENDING, null);
@JsonCreator
Status(
@JsonProperty("state") STATE state,
@JsonProperty("failureCause") String failureCause
)
{
Preconditions.checkNotNull(state, "state must be non-null");
this.state = state;
this.failureCause = failureCause;
}
public static Status failed(String cause)
{
return new Status(STATE.FAILED, cause);
}
@JsonProperty
public STATE getState()
{
return state;
}
@JsonProperty
public String getFailureCause()
{
return failureCause;
}
@Override
public String toString()
{
return "Status{" +
"state=" + state +
", failureCause='" + failureCause + '\'' +
'}';
}
}
public static class DataSegmentChangeRequestAndStatus
{
private final DataSegmentChangeRequest request;
private final Status status;
@JsonCreator
public DataSegmentChangeRequestAndStatus(
@JsonProperty("request") DataSegmentChangeRequest request,
@JsonProperty("status") Status status
)
{
this.request = request;
this.status = status;
}
@JsonProperty
public DataSegmentChangeRequest getRequest()
{
return request;
}
@JsonProperty
public Status getStatus()
{
return status;
}
@Override
public String toString()
{
return "DataSegmentChangeRequestAndStatus{" +
"request=" + request +
", status=" + status +
'}';
}
}
}
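processBatch() is the server-side core of the new HTTP protocol: callers hand over a batch of change requests and get back a future that resolves as soon as any request leaves the PENDING state (or immediately, if one already has), which makes long-polling from the coordinator cheap. A hedged caller-side sketch, assuming a handler instance and segments obtained elsewhere; the HTTP resource that actually exposes this endpoint is not part of this excerpt.

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.SegmentLoadDropHandler;
import io.druid.timeline.DataSegment;
import java.util.List;

// Sketch only: submit a load and a drop in one batch and wait for the first progress.
static List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> submitAndWait(
    SegmentLoadDropHandler handler,
    DataSegment toLoad,
    DataSegment toDrop
) throws Exception
{
  List<DataSegmentChangeRequest> batch = ImmutableList.of(
      new SegmentChangeRequestLoad(toLoad),
      new SegmentChangeRequestDrop(toDrop)
  );
  ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future =
      handler.processBatch(batch);
  // Each returned entry pairs the original request with its current SUCCESS/FAILED/PENDING status.
  return future.get();
}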

View File

@ -21,22 +21,12 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.SegmentManager;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@ -44,68 +34,39 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class ZkCoordinator implements DataSegmentChangeHandler
public class ZkCoordinator
{
private static final EmittingLogger log = new EmittingLogger(ZkCoordinator.class);
private final Object lock = new Object();
private final DataSegmentChangeHandler dataSegmentChangeHandler;
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPaths;
private final SegmentLoaderConfig config;
private final DruidServerMetadata me;
private final CuratorFramework curator;
private final DataSegmentAnnouncer announcer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
@Inject
public ZkCoordinator(
SegmentLoadDropHandler loadDropHandler,
ObjectMapper jsonMapper,
SegmentLoaderConfig config,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
CuratorFramework curator,
SegmentManager segmentManager,
ScheduledExecutorFactory factory
CuratorFramework curator
)
{
this.dataSegmentChangeHandler = loadDropHandler;
this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.config = config;
this.me = me;
this.curator = curator;
this.announcer = announcer;
this.serverAnnouncer = serverAnnouncer;
this.segmentManager = segmentManager;
this.exec = factory.create(1, "ZkCoordinator-Exec--%d");
this.segmentsToDelete = new ConcurrentSkipListSet<>();
}
@LifecycleStart
@ -127,7 +88,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
loadQueueLocation,
true,
true,
Execs.singleThreaded("ZkCoordinator-%s")
Execs.singleThreaded("ZkCoordinator")
);
try {
@ -135,9 +96,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
loadLocalCache();
serverAnnouncer.announce();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@ -156,7 +114,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try {
request.go(
getDataSegmentChangeHandler(),
dataSegmentChangeHandler,
new DataSegmentChangeCallback()
{
boolean hasRun = false;
@ -231,7 +189,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try {
loadQueueCache.close();
serverAnnouncer.unannounce();
}
catch (Exception e) {
throw Throwables.propagate(e);
@ -247,400 +204,4 @@ public class ZkCoordinator implements DataSegmentChangeHandler
{
return started;
}
public void loadLocalCache()
{
final long start = System.currentTimeMillis();
File baseDir = config.getInfoDir();
if (!baseDir.exists() && !config.getInfoDir().mkdirs()) {
return;
}
List<DataSegment> cachedSegments = Lists.newArrayList();
File[] segmentsToLoad = baseDir.listFiles();
int ignored = 0;
for (int i = 0; i < segmentsToLoad.length; i++) {
File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file);
try {
final DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (!segment.getIdentifier().equals(file.getName())) {
log.warn("Ignoring cache file[%s] for segment[%s].", file.getPath(), segment.getIdentifier());
ignored++;
} else if (segmentManager.isSegmentCached(segment)) {
cachedSegments.add(segment);
} else {
log.warn("Unable to find cache file for %s. Deleting lookup entry", segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to load segment from segmentInfo file")
.addData("file", file)
.emit();
}
}
if (ignored > 0) {
log.makeAlert("Ignored misnamed segment cache files on startup.")
.addData("numIgnored", ignored)
.emit();
}
addSegments(
cachedSegments,
new DataSegmentChangeCallback()
{
@Override
public void execute()
{
log.info("Cache load took %,d ms", System.currentTimeMillis() - start);
}
}
);
}
public DataSegmentChangeHandler getDataSegmentChangeHandler()
{
return ZkCoordinator.this;
}
/**
* Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
* throw a SegmentLoadingException
*
* @throws SegmentLoadingException
*/
private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException
{
final boolean loaded;
try {
loaded = segmentManager.loadSegment(segment);
}
catch (Exception e) {
removeSegment(segment, callback);
throw new SegmentLoadingException(e, "Exception loading segment[%s]", segment.getIdentifier());
}
if (loaded) {
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
removeSegment(segment, callback);
throw new SegmentLoadingException(
e, "Failed to write to disk segment info cache file[%s]", segmentInfoCacheFile
);
}
}
}
}
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
try {
log.info("Loading segment %s", segment.getIdentifier());
/*
The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts,
and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment
files. At that point, it's possible that right after the "if" check, addSegment() is called and actually loads
the segment, which makes dropping segment and downloading segment happen at the same time.
*/
if (segmentsToDelete.contains(segment)) {
/*
Both contains(segment) and remove(segment) can be moved inside the synchronized block. However, in that case,
each time when addSegment() is called, it has to wait for the lock in order to make progress, which will make
things slow. Given that in most cases segmentsToDelete.contains(segment) returns false, it will save a lot of
cost of acquiring lock by doing the "contains" check outside the synchronized block.
*/
synchronized (lock) {
segmentsToDelete.remove(segment);
}
}
loadSegment(segment, callback);
try {
announcer.announceSegment(segment);
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier());
}
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segment for dataSource")
.addData("segment", segment)
.emit();
}
finally {
callback.execute();
}
}
private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
{
ExecutorService loadingExecutor = null;
try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s");
final int numSegments = segments.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) {
loadingExecutor.submit(
new Runnable()
{
@Override
public void run()
{
try {
log.info(
"Loading segment[%d/%d][%s]",
counter.incrementAndGet(),
numSegments,
segment.getIdentifier()
);
loadSegment(segment, callback);
try {
backgroundSegmentAnnouncer.announceSegment(segment);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
catch (SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier());
failedSegments.add(segment);
}
finally {
latch.countDown();
}
}
}
);
}
try {
latch.await();
if (failedSegments.size() > 0) {
log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
.addData("failedSegments", failedSegments);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.makeAlert(e, "LoadingInterrupted");
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
.addData("numSegments", segments.size())
.emit();
}
finally {
callback.execute();
if (loadingExecutor != null) {
loadingExecutor.shutdownNow();
}
}
}
@Override
public void removeSegment(final DataSegment segment, final DataSegmentChangeCallback callback)
{
try {
announcer.unannounceSegment(segment);
segmentsToDelete.add(segment);
log.info("Completely removing [%s] in [%,d] millis", segment.getIdentifier(), config.getDropSegmentDelayMillis());
exec.schedule(
new Runnable()
{
@Override
public void run()
{
try {
synchronized (lock) {
if (segmentsToDelete.remove(segment)) {
segmentManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}
}
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment! Possible resource leak!")
.addData("segment", segment)
.emit();
}
}
},
config.getDropSegmentDelayMillis(),
TimeUnit.MILLISECONDS
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment")
.addData("segment", segment)
.emit();
}
finally {
callback.execute();
}
}
public Collection<DataSegment> getPendingDeleteSnapshot()
{
return ImmutableList.copyOf(segmentsToDelete);
}
private static class BackgroundSegmentAnnouncer implements AutoCloseable
{
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
private final int intervalMillis;
private final DataSegmentAnnouncer announcer;
private final ScheduledExecutorService exec;
private final LinkedBlockingQueue<DataSegment> queue;
private final SettableFuture<Boolean> doneAnnouncing;
private final Object lock = new Object();
private volatile boolean finished = false;
private volatile ScheduledFuture startedAnnouncing = null;
private volatile ScheduledFuture nextAnnoucement = null;
public BackgroundSegmentAnnouncer(
DataSegmentAnnouncer announcer,
ScheduledExecutorService exec,
int intervalMillis
)
{
this.announcer = announcer;
this.exec = exec;
this.intervalMillis = intervalMillis;
this.queue = Queues.newLinkedBlockingQueue();
this.doneAnnouncing = SettableFuture.create();
}
public void announceSegment(final DataSegment segment) throws InterruptedException
{
if (finished) {
throw new ISE("Announce segment called after finishAnnouncing");
}
queue.put(segment);
}
public void startAnnouncing()
{
if (intervalMillis <= 0) {
return;
}
log.info("Starting background segment announcing task");
// schedule background announcing task
nextAnnoucement = startedAnnouncing = exec.schedule(
new Runnable()
{
@Override
public void run()
{
synchronized (lock) {
try {
if (!(finished && queue.isEmpty())) {
final List<DataSegment> segments = Lists.newLinkedList();
queue.drainTo(segments);
try {
announcer.announceSegments(segments);
nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
doneAnnouncing.setException(
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
);
}
} else {
doneAnnouncing.set(true);
}
}
catch (Exception e) {
doneAnnouncing.setException(e);
}
}
}
},
intervalMillis,
TimeUnit.MILLISECONDS
);
}
public void finishAnnouncing() throws SegmentLoadingException
{
synchronized (lock) {
finished = true;
// announce any remaining segments
try {
final List<DataSegment> segments = Lists.newLinkedList();
queue.drainTo(segments);
announcer.announceSegments(segments);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
// get any exception that may have been thrown in background announcing
try {
// check in case intervalMillis is <= 0
if (startedAnnouncing != null) {
startedAnnouncing.cancel(false);
}
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
if (doneAnnouncing.isDone()) {
doneAnnouncing.get();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
}
catch (ExecutionException e) {
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
}
}
log.info("Completed background segment announcing");
}
@Override
public void close()
{
// stop background scheduling
synchronized (lock) {
finished = true;
if (nextAnnoucement != null) {
nextAnnoucement.cancel(false);
}
}
}
}
}

View File

@ -0,0 +1,509 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.SegmentChangeRequestNoop;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class CuratorLoadQueuePeon extends LoadQueuePeon
{
private static final EmittingLogger log = new EmittingLogger(CuratorLoadQueuePeon.class);
private static final int DROP = 0;
private static final int LOAD = 1;
private static void executeCallbacks(List<LoadPeonCallback> callbacks)
{
for (LoadPeonCallback callback : callbacks) {
if (callback != null) {
callback.execute();
}
}
}
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService processingExecutor;
private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config;
private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final Object lock = new Object();
private volatile SegmentHolder currentlyProcessing = null;
private boolean stopped = false;
CuratorLoadQueuePeon(
CuratorFramework curator,
String basePath,
ObjectMapper jsonMapper,
ScheduledExecutorService processingExecutor,
ExecutorService callbackExecutor,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.basePath = basePath;
this.jsonMapper = jsonMapper;
this.callBackExecutor = callbackExecutor;
this.processingExecutor = processingExecutor;
this.config = config;
}
@JsonProperty
@Override
public Set<DataSegment> getSegmentsToLoad()
{
return segmentsToLoad.keySet();
}
@JsonProperty
@Override
public Set<DataSegment> getSegmentsToDrop()
{
return segmentsToDrop.keySet();
}
@JsonProperty
@Override
public Set<DataSegment> getSegmentsMarkedToDrop()
{
return segmentsMarkedToDrop;
}
@Override
public long getLoadQueueSize()
{
return queuedSize.get();
}
@Override
public int getAndResetFailedAssignCount()
{
return failedAssignCount.getAndSet(0);
}
@Override
public int getNumberOfSegmentsInQueue()
{
return segmentsToLoad.size();
}
@Override
public void loadSegment(
final DataSegment segment,
final LoadPeonCallback callback
)
{
synchronized (lock) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
if (callback != null) {
currentlyProcessing.addCallback(callback);
}
return;
}
}
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToLoad.get(segment);
if (existingHolder != null) {
if ((callback != null)) {
existingHolder.addCallback(callback);
}
return;
}
}
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize());
segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback)));
}
@Override
public void dropSegment(
final DataSegment segment,
final LoadPeonCallback callback
)
{
synchronized (lock) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
if (callback != null) {
currentlyProcessing.addCallback(callback);
}
return;
}
}
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToDrop.get(segment);
if (existingHolder != null) {
if (callback != null) {
existingHolder.addCallback(callback);
}
return;
}
}
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
}
@Override
public void markSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.add(dataSegment);
}
@Override
public void unmarkSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.remove(dataSegment);
}
private void processSegmentChangeRequest()
{
if (currentlyProcessing != null) {
log.debug(
"Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
basePath,
currentlyProcessing.getSegmentIdentifier()
);
return;
}
if (!segmentsToDrop.isEmpty()) {
currentlyProcessing = segmentsToDrop.firstEntry().getValue();
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else if (!segmentsToLoad.isEmpty()) {
currentlyProcessing = segmentsToLoad.firstEntry().getValue();
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else {
return;
}
try {
if (currentlyProcessing == null) {
if (!stopped) {
log.makeAlert("Crazy race condition! server[%s]", basePath)
.emit();
}
actionCompleted();
return;
}
log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
processingExecutor.schedule(
new Runnable()
{
@Override
public void run()
{
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
}
}
catch (Exception e) {
failAssign(e);
}
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);
final Stat stat = curator.checkExists().usingWatcher(
new CuratorWatcher()
{
@Override
public void process(WatchedEvent watchedEvent) throws Exception
{
switch (watchedEvent.getType()) {
case NodeDeleted:
entryRemoved(watchedEvent.getPath());
break;
default:
// do nothing
}
}
}
).forPath(path);
if (stat == null) {
final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
// Create a node and then delete it to remove the registered watcher. This is a work-around for
// a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
// that happens for that node. If no events happen, the watcher stays registered forever.
// Couple that with the fact that you cannot set a watcher when you create a node, but what we
// want is to create a node and then watch for it to get deleted. The solution is that you *can*
// set a watcher when you check to see if it exists, so we first create the node and then set a
// watcher on its existence. However, if the node no longer exists by the time the existence check
// returns, then the watcher that was set will never fire (nobody will ever create the node
// again), leading to a slow, but real, memory leak. So, we create another node to cause
// that watcher to fire and delete it right away.
//
// We do not create the existence watcher first, because then it will fire when we create the
// node and we'll have the same race when trying to refresh that watcher.
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
entryRemoved(path);
}
}
catch (Exception e) {
failAssign(e);
}
}
private void actionCompleted()
{
if (currentlyProcessing != null) {
switch (currentlyProcessing.getType()) {
case LOAD:
segmentsToLoad.remove(currentlyProcessing.getSegment());
queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
break;
case DROP:
segmentsToDrop.remove(currentlyProcessing.getSegment());
break;
default:
throw new UnsupportedOperationException();
}
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
executeCallbacks(callbacks);
}
}
);
}
}
@Override
public void start()
{
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
config.getLoadQueuePeonRepeatDelay(),
config.getLoadQueuePeonRepeatDelay(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
processSegmentChangeRequest();
if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
@Override
public void stop()
{
synchronized (lock) {
if (currentlyProcessing != null) {
executeCallbacks(currentlyProcessing.getCallbacks());
currentlyProcessing = null;
}
if (!segmentsToDrop.isEmpty()) {
for (SegmentHolder holder : segmentsToDrop.values()) {
executeCallbacks(holder.getCallbacks());
}
}
segmentsToDrop.clear();
if (!segmentsToLoad.isEmpty()) {
for (SegmentHolder holder : segmentsToLoad.values()) {
executeCallbacks(holder.getCallbacks());
}
}
segmentsToLoad.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
stopped = true;
}
}
private void entryRemoved(String path)
{
synchronized (lock) {
if (currentlyProcessing == null) {
log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path);
return;
}
if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentIdentifier())) {
log.warn(
"Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
basePath, path, currentlyProcessing
);
return;
}
actionCompleted();
log.info("Server[%s] done processing [%s]", basePath, path);
}
}
private void failAssign(Exception e)
{
synchronized (lock) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing);
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted();
}
}
private static class SegmentHolder
{
private final DataSegment segment;
private final DataSegmentChangeRequest changeRequest;
private final int type;
private final List<LoadPeonCallback> callbacks = Lists.newArrayList();
private SegmentHolder(
DataSegment segment,
int type,
Collection<LoadPeonCallback> callbacks
)
{
this.segment = segment;
this.type = type;
this.changeRequest = (type == LOAD)
? new SegmentChangeRequestLoad(segment)
: new SegmentChangeRequestDrop(segment);
this.callbacks.addAll(callbacks);
}
public DataSegment getSegment()
{
return segment;
}
public int getType()
{
return type;
}
public String getSegmentIdentifier()
{
return segment.getIdentifier();
}
public long getSegmentSize()
{
return segment.getSize();
}
public void addCallbacks(Collection<LoadPeonCallback> newCallbacks)
{
synchronized (callbacks) {
callbacks.addAll(newCallbacks);
}
}
public void addCallback(LoadPeonCallback newCallback)
{
synchronized (callbacks) {
callbacks.add(newCallback);
}
}
public List<LoadPeonCallback> getCallbacks()
{
synchronized (callbacks) {
return callbacks;
}
}
public DataSegmentChangeRequest getChangeRequest()
{
return changeRequest;
}
@Override
public String toString()
{
return changeRequest.toString();
}
}
}
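For contrast with the HTTP-based peon that follows, the protocol this Curator peon implements reduces to: serialize the change request, write it as an ephemeral node under the server's load-queue path, and treat deletion of that node by the historical as completion, with a timeout check as the failure path. A condensed sketch of that handshake under those assumptions; imports are the same as in the class above.

// Sketch only: the per-request ZooKeeper handshake, condensed from CuratorLoadQueuePeon above.
static void requestLoadViaZk(
    CuratorFramework curator,
    ObjectMapper jsonMapper,
    String basePath,            // e.g. <druid.zk.paths.loadQueuePath>/<server name>
    DataSegment segment
) throws Exception
{
  String path = ZKPaths.makePath(basePath, segment.getIdentifier());
  byte[] payload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestLoad(segment));
  // The historical picks this node up, loads the segment, and deletes the node; the peon's
  // watcher (set via checkExists().usingWatcher(...)) treats the deletion as completion, and a
  // node still present after loadTimeoutDelay is failed via failAssign().
  curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}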

View File

@ -735,10 +735,9 @@ public class DruidCoordinator
final DruidCluster cluster = new DruidCluster();
for (ImmutableDruidServer server : servers) {
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(basePath);
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
loadQueuePeon.start();
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);
log.info("Created LoadQueuePeon for server[%s].", server.getName());
loadManagementPeons.put(server.getName(), loadQueuePeon);
}

View File

@ -86,4 +86,28 @@ public abstract class DruidCoordinatorConfig
{
return Duration.millis(50);
}
@Config("druid.coordinator.loadqueuepeon.type")
public String getLoadQueuePeonType()
{
return "curator";
}
@Config("druid.coordinator.loadqueuepeon.http.repeatDelay")
public Duration getHttpLoadQueuePeonRepeatDelay()
{
return Duration.millis(60000);
}
@Config("druid.coordinator.loadqueuepeon.http.hostTimeout")
public Duration getHttpLoadQueuePeonHostTimeout()
{
return Duration.millis(300000);
}
@Config("druid.coordinator.loadqueuepeon.http.batchSize")
public int getHttpLoadQueuePeonBatchSize()
{
return 1;
}
}
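These properties are what switch the coordinator between the two peon implementations: `druid.coordinator.loadqueuepeon.type` selects the peon type (defaulting to the Curator/ZooKeeper one), and the `loadqueuepeon.http.*` settings tune the HTTP peon's polling delay, per-host timeout, and request batch size. The selection code itself lives in the load queue task master and is not shown in this excerpt; a plausible sketch of the idea follows, assuming the value "http" selects the HTTP peon, that the listed fields are injected, and with the HTTP-peon construction left as a hypothetical helper.

// Sketch only, not the code from this patch: choosing a peon per server based on the new config.
// Assumed injected fields: config, zkPaths, curator, jsonMapper,
// processingExec (ScheduledExecutorService), callbackExec (ExecutorService).
LoadQueuePeon giveMePeon(ImmutableDruidServer server)
{
  if ("http".equalsIgnoreCase(config.getLoadQueuePeonType())) {
    // The HttpLoadQueuePeon constructor is not shown in this excerpt; assume a helper that
    // wires it up with the server URL (ImmutableDruidServer.getURL()), an HttpClient, and config.
    return createHttpLoadQueuePeon(server);  // hypothetical helper
  }
  String basePath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName());
  return new CuratorLoadQueuePeon(curator, basePath, jsonMapper, processingExec, callbackExec, config);
}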

View File

@ -0,0 +1,598 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.RE;
import com.metamx.common.StringUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.io.AppendableByteArrayInputStream;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.InputStreamResponseHandler;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.SegmentLoadDropHandler;
import io.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class HttpLoadQueuePeon extends LoadQueuePeon
{
public static final TypeReference REQUEST_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeRequest>>()
{
};
public static final TypeReference RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
{
};
private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class);
private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ScheduledExecutorService processingExecutor;
private volatile boolean stopped = false;
private final Object lock = new Object();
private final DruidCoordinatorConfig config;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
private final URL changeRequestURL;
private final String serverId;
private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);
private final ExecutorService callBackExecutor;
private final ObjectWriter requestBodyWriter;
public HttpLoadQueuePeon(
String baseUrl,
ObjectMapper jsonMapper,
HttpClient httpClient,
DruidCoordinatorConfig config,
ScheduledExecutorService processingExecutor,
ExecutorService callBackExecutor
)
{
this.jsonMapper = jsonMapper;
this.requestBodyWriter = jsonMapper.writerWithType(REQUEST_ENTITY_TYPE_REF);
this.httpClient = httpClient;
this.config = config;
this.processingExecutor = processingExecutor;
this.callBackExecutor = callBackExecutor;
this.serverId = baseUrl;
try {
this.changeRequestURL = new URL(
new URL(baseUrl),
StringUtils.safeFormat(
"druid-internal/v1/segments/changeRequests?timeout=%d",
config.getHttpLoadQueuePeonHostTimeout().getMillis()
)
);
}
catch (MalformedURLException ex) {
throw Throwables.propagate(ex);
}
}
private void doSegmentManagement()
{
if (stopped || !mainLoopInProgress.compareAndSet(false, true)) {
log.debug("[%s]Ignoring tick. Either in-progress already or stopped.", serverId);
return;
}
int batchSize = config.getHttpLoadQueuePeonBatchSize();
List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize);
synchronized (lock) {
Iterator<Map.Entry<DataSegment, SegmentHolder>> iter = Iterators.concat(
segmentsToDrop.entrySet().iterator(),
segmentsToLoad.entrySet().iterator()
);
while (batchSize > 0 && iter.hasNext()) {
batchSize--;
Map.Entry<DataSegment, SegmentHolder> entry = iter.next();
if (entry.getValue().hasTimedOut()) {
entry.getValue().requestFailed("timed out");
iter.remove();
} else {
newRequests.add(entry.getValue().getChangeRequest());
}
}
}
if (newRequests.size() == 0) {
log.debug(
"[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].",
serverId,
segmentsToLoad.size(),
segmentsToDrop.size(),
config.getHttpLoadQueuePeonBatchSize()
);
mainLoopInProgress.set(false);
return;
}
try {
log.debug("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId);
BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
ListenableFuture<InputStream> future = httpClient.go(
new Request(HttpMethod.POST, changeRequestURL)
.addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON)
.addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.setContent(requestBodyWriter.writeValueAsBytes(newRequests)),
responseHandler,
new Duration(config.getHttpLoadQueuePeonHostTimeout().getMillis() + 5000)
);
Futures.addCallback(
future,
new FutureCallback<InputStream>()
{
@Override
public void onSuccess(InputStream result)
{
boolean scheduleNextRunImmediately = true;
try {
if (responseHandler.status == HttpServletResponse.SC_NO_CONTENT) {
log.debug("Received NO CONTENT reseponse from [%s]", serverId);
} else if (HttpServletResponse.SC_OK == responseHandler.status) {
try {
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = jsonMapper.readValue(
result, RESPONSE_ENTITY_TYPE_REF
);
log.debug("Server[%s] returned status response [%s].", serverId, statuses);
synchronized (lock) {
if (stopped) {
log.debug("Ignoring response from Server[%s]. We are already stopped.", serverId);
scheduleNextRunImmediately = false;
return;
}
for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) {
switch (e.getStatus().getState()) {
case SUCCESS:
case FAILED:
handleResponseStatus(e.getRequest(), e.getStatus());
break;
case PENDING:
log.info("Request[%s] is still pending on server[%s].", e.getRequest(), serverId);
break;
default:
scheduleNextRunImmediately = false;
log.error("WTF! Server[%s] returned unknown state in status[%s].", serverId, e.getStatus());
}
}
}
}
catch (Exception ex) {
scheduleNextRunImmediately = false;
logRequestFailure(ex);
}
} else {
scheduleNextRunImmediately = false;
logRequestFailure(new RE("Unexpected Response Status."));
}
}
finally {
mainLoopInProgress.set(false);
if (scheduleNextRunImmediately) {
processingExecutor.execute(HttpLoadQueuePeon.this::doSegmentManagement);
}
}
}
@Override
public void onFailure(Throwable t)
{
try {
logRequestFailure(t);
}
finally {
mainLoopInProgress.set(false);
}
}
private void logRequestFailure(Throwable t)
{
log.error(
t,
"Request[%s] Failed with code[%s] and status[%s]. Reason[%s].",
changeRequestURL,
responseHandler.status,
responseHandler.description
);
}
},
processingExecutor
);
}
catch (Throwable th) {
log.error(th, "Error sending load/drop request to [%s].", serverId);
mainLoopInProgress.set(false);
}
}
private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentLoadDropHandler.Status status)
{
changeRequest.go(
new DataSegmentChangeHandler()
{
@Override
public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
updateSuccessOrFailureInHolder(segmentsToLoad.remove(segment), status);
}
@Override
public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
{
updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status);
}
private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDropHandler.Status status)
{
if (holder == null) {
return;
}
if (status.getState()
== SegmentLoadDropHandler.Status.STATE.FAILED) {
holder.requestFailed(status.getFailureCause());
} else {
holder.requestSucceeded();
}
}
}, null
);
}
@Override
public void start()
{
synchronized (lock) {
if (stopped) {
throw new ISE("Can't start.");
}
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
new Duration(config.getHttpLoadQueuePeonRepeatDelay()),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
if (!stopped) {
doSegmentManagement();
}
if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
}
@Override
public void stop()
{
synchronized (lock) {
if (stopped) {
return;
}
stopped = true;
for (SegmentHolder holder : segmentsToDrop.values()) {
holder.requestSucceeded();
}
for (SegmentHolder holder : segmentsToLoad.values()) {
holder.requestSucceeded();
}
segmentsToDrop.clear();
segmentsToLoad.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
}
}
@Override
public void loadSegment(DataSegment segment, LoadPeonCallback callback)
{
synchronized (lock) {
SegmentHolder holder = segmentsToLoad.get(segment);
if (holder == null) {
log.info("Server[%s] to load segment[%s] queued.", serverId, segment.getIdentifier());
segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback));
processingExecutor.execute(this::doSegmentManagement);
} else {
holder.addCallback(callback);
}
}
}
@Override
public void dropSegment(DataSegment segment, LoadPeonCallback callback)
{
synchronized (lock) {
SegmentHolder holder = segmentsToDrop.get(segment);
if (holder == null) {
log.info("Server[%s] to drop segment[%s] queued.", serverId, segment.getIdentifier());
segmentsToDrop.put(segment, new DropSegmentHolder(segment, callback));
processingExecutor.execute(this::doSegmentManagement);
} else {
holder.addCallback(callback);
}
}
}
@Override
public Set<DataSegment> getSegmentsToLoad()
{
return Collections.unmodifiableSet(segmentsToLoad.keySet());
}
@Override
public Set<DataSegment> getSegmentsToDrop()
{
return Collections.unmodifiableSet(segmentsToDrop.keySet());
}
@Override
public long getLoadQueueSize()
{
return queuedSize.get();
}
@Override
public int getAndResetFailedAssignCount()
{
return failedAssignCount.getAndSet(0);
}
@Override
public void markSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.add(dataSegment);
}
@Override
public void unmarkSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.remove(dataSegment);
}
@Override
public int getNumberOfSegmentsInQueue()
{
return segmentsToLoad.size();
}
@Override
public Set<DataSegment> getSegmentsMarkedToDrop()
{
return Collections.unmodifiableSet(segmentsMarkedToDrop);
}
private abstract class SegmentHolder
{
private final DataSegment segment;
private final DataSegmentChangeRequest changeRequest;
private final List<LoadPeonCallback> callbacks = Lists.newArrayList();
// Time when this request was sent to target server the first time.
private volatile long scheduleTime = -1;
private SegmentHolder(
DataSegment segment,
DataSegmentChangeRequest changeRequest,
LoadPeonCallback callback
)
{
this.segment = segment;
this.changeRequest = changeRequest;
if (callback != null) {
this.callbacks.add(callback);
}
}
public void addCallback(LoadPeonCallback newCallback)
{
synchronized (callbacks) {
if (newCallback != null) {
callbacks.add(newCallback);
}
}
}
public DataSegment getSegment()
{
return segment;
}
public DataSegmentChangeRequest getChangeRequest()
{
return changeRequest;
}
public boolean hasTimedOut()
{
if (scheduleTime < 0) {
scheduleTime = System.currentTimeMillis();
return false;
} else if (System.currentTimeMillis() - scheduleTime > config.getLoadTimeoutDelay().getMillis()) {
return true;
} else {
return false;
}
}
public void requestSucceeded()
{
log.info(
"Server[%s] Successfully processed segment[%s] request[%s].",
serverId,
segment.getIdentifier(),
changeRequest.getClass().getSimpleName()
);
callBackExecutor.execute(() -> {
for (LoadPeonCallback callback : callbacks) {
if (callback != null) {
callback.execute();
}
}
});
}
public void requestFailed(String failureCause)
{
log.error(
"Server[%s] Failed segment[%s] request[%s] with cause [%s].",
serverId,
segment.getIdentifier(),
changeRequest.getClass().getSimpleName(),
failureCause
);
failedAssignCount.getAndIncrement();
callBackExecutor.execute(() -> {
for (LoadPeonCallback callback : callbacks) {
if (callback != null) {
callback.execute();
}
}
});
}
@Override
public String toString()
{
return changeRequest.toString();
}
}
private class LoadSegmentHolder extends SegmentHolder
{
public LoadSegmentHolder(DataSegment segment, LoadPeonCallback callback)
{
super(segment, new SegmentChangeRequestLoad(segment), callback);
queuedSize.addAndGet(segment.getSize());
}
@Override
public void requestSucceeded()
{
queuedSize.addAndGet(-getSegment().getSize());
super.requestSucceeded();
}
}
private class DropSegmentHolder extends SegmentHolder
{
public DropSegmentHolder(DataSegment segment, LoadPeonCallback callback)
{
super(segment, new SegmentChangeRequestDrop(segment), callback);
}
}
private static class BytesAccumulatingResponseHandler extends InputStreamResponseHandler
{
private int status;
private String description;
@Override
public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
{
status = response.getStatus().getCode();
description = response.getStatus().getReasonPhrase();
return ClientResponse.unfinished(super.handleResponse(response).getObj());
}
}
}

View File

@ -19,479 +19,37 @@
package io.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.metamx.emitter.EmittingLogger;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.SegmentChangeRequestNoop;
import io.druid.timeline.DataSegment;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* This interface exists only to support configurable load queue management via curator or http. Once HttpLoadQueuePeon
* has been verified sufficiently in production, CuratorLoadQueuePeon and this interface will be removed.
*/
public class LoadQueuePeon
@Deprecated
public abstract class LoadQueuePeon
{
private static final EmittingLogger log = new EmittingLogger(LoadQueuePeon.class);
private static final int DROP = 0;
private static final int LOAD = 1;
public abstract void start();
public abstract void stop();
private static void executeCallbacks(List<LoadPeonCallback> callbacks)
{
for (LoadPeonCallback callback : callbacks) {
if (callback != null) {
callback.execute();
}
}
}
public abstract Set<DataSegment> getSegmentsToLoad();
private final CuratorFramework curator;
private final String basePath;
private final ObjectMapper jsonMapper;
private final ScheduledExecutorService processingExecutor;
private final ExecutorService callBackExecutor;
private final DruidCoordinatorConfig config;
public abstract Set<DataSegment> getSegmentsToDrop();
private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
public abstract void unmarkSegmentToDrop(DataSegment segmentToLoad);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR
);
private final Object lock = new Object();
public abstract void markSegmentToDrop(DataSegment segmentToLoad);
private volatile SegmentHolder currentlyProcessing = null;
private boolean stopped = false;
public abstract void loadSegment(DataSegment segment, LoadPeonCallback callback);
public abstract void dropSegment(DataSegment segment, LoadPeonCallback callback);
LoadQueuePeon(
CuratorFramework curator,
String basePath,
ObjectMapper jsonMapper,
ScheduledExecutorService processingExecutor,
ExecutorService callbackExecutor,
DruidCoordinatorConfig config
)
{
this.curator = curator;
this.basePath = basePath;
this.jsonMapper = jsonMapper;
this.callBackExecutor = callbackExecutor;
this.processingExecutor = processingExecutor;
this.config = config;
}
public abstract long getLoadQueueSize();
@JsonProperty
public Set<DataSegment> getSegmentsToLoad()
{
return segmentsToLoad.keySet();
}
public abstract int getAndResetFailedAssignCount();
@JsonProperty
public Set<DataSegment> getSegmentsToDrop()
{
return segmentsToDrop.keySet();
}
public abstract int getNumberOfSegmentsInQueue();
public abstract Set<DataSegment> getSegmentsMarkedToDrop();
@JsonProperty
public Set<DataSegment> getSegmentsMarkedToDrop()
{
return segmentsMarkedToDrop;
}
public long getLoadQueueSize()
{
return queuedSize.get();
}
public int getAndResetFailedAssignCount()
{
return failedAssignCount.getAndSet(0);
}
public int getNumberOfSegmentsInQueue()
{
return segmentsToLoad.size();
}
public void loadSegment(
final DataSegment segment,
final LoadPeonCallback callback
)
{
synchronized (lock) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
if (callback != null) {
currentlyProcessing.addCallback(callback);
}
return;
}
}
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToLoad.get(segment);
if (existingHolder != null) {
if ((callback != null)) {
existingHolder.addCallback(callback);
}
return;
}
}
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize());
segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback)));
}
public void dropSegment(
final DataSegment segment,
final LoadPeonCallback callback
)
{
synchronized (lock) {
if ((currentlyProcessing != null) &&
currentlyProcessing.getSegmentIdentifier().equals(segment.getIdentifier())) {
if (callback != null) {
currentlyProcessing.addCallback(callback);
}
return;
}
}
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToDrop.get(segment);
if (existingHolder != null) {
if (callback != null) {
existingHolder.addCallback(callback);
}
return;
}
}
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
}
public void markSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.add(dataSegment);
}
public void unmarkSegmentToDrop(DataSegment dataSegment)
{
segmentsMarkedToDrop.remove(dataSegment);
}
private void processSegmentChangeRequest()
{
if (currentlyProcessing != null) {
log.debug(
"Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
basePath,
currentlyProcessing.getSegmentIdentifier()
);
return;
}
if (!segmentsToDrop.isEmpty()) {
currentlyProcessing = segmentsToDrop.firstEntry().getValue();
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else if (!segmentsToLoad.isEmpty()) {
currentlyProcessing = segmentsToLoad.firstEntry().getValue();
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else {
return;
}
try {
if (currentlyProcessing == null) {
if (!stopped) {
log.makeAlert("Crazy race condition! server[%s]", basePath)
.emit();
}
actionCompleted();
return;
}
log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier());
final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier());
final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
processingExecutor.schedule(
new Runnable()
{
@Override
public void run()
{
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(new ISE("%s was never removed! Failing this operation!", path));
}
}
catch (Exception e) {
failAssign(e);
}
}
},
config.getLoadTimeoutDelay().getMillis(),
TimeUnit.MILLISECONDS
);
final Stat stat = curator.checkExists().usingWatcher(
new CuratorWatcher()
{
@Override
public void process(WatchedEvent watchedEvent) throws Exception
{
switch (watchedEvent.getType()) {
case NodeDeleted:
entryRemoved(watchedEvent.getPath());
break;
default:
// do nothing
}
}
}
).forPath(path);
if (stat == null) {
final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
// Create a node and then delete it to remove the registered watcher. This is a work-around for
// a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event
// that happens for that node. If no events happen, the watcher stays registered forever.
// Couple that with the fact that you cannot set a watcher when you create a node, but what we
// want is to create a node and then watch for it to get deleted. The solution is that you *can*
// set a watcher when you check to see if it exists, so we first create the node and then set a
// watcher on its existence. However, if the node no longer exists by the time the existence check
// returns, then the watcher that was set will never fire (nobody will ever create the node
// again) and thus lead to a slow, but real, memory leak. So, we create another node to cause
// that watcher to fire and delete it right away.
//
// We do not create the existence watcher first, because then it will fire when we create the
// node and we'll have the same race when trying to refresh that watcher.
curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
entryRemoved(path);
}
}
catch (Exception e) {
failAssign(e);
}
}
private void actionCompleted()
{
if (currentlyProcessing != null) {
switch (currentlyProcessing.getType()) {
case LOAD:
segmentsToLoad.remove(currentlyProcessing.getSegment());
queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
break;
case DROP:
segmentsToDrop.remove(currentlyProcessing.getSegment());
break;
default:
throw new UnsupportedOperationException();
}
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
executeCallbacks(callbacks);
}
}
);
}
}
public void start()
{
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
config.getLoadQueuePeonRepeatDelay(),
config.getLoadQueuePeonRepeatDelay(),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
processSegmentChangeRequest();
if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
}
);
}
public void stop()
{
synchronized (lock) {
if (currentlyProcessing != null) {
executeCallbacks(currentlyProcessing.getCallbacks());
currentlyProcessing = null;
}
if (!segmentsToDrop.isEmpty()) {
for (SegmentHolder holder : segmentsToDrop.values()) {
executeCallbacks(holder.getCallbacks());
}
}
segmentsToDrop.clear();
if (!segmentsToLoad.isEmpty()) {
for (SegmentHolder holder : segmentsToLoad.values()) {
executeCallbacks(holder.getCallbacks());
}
}
segmentsToLoad.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
stopped = true;
}
}
private void entryRemoved(String path)
{
synchronized (lock) {
if (currentlyProcessing == null) {
log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path);
return;
}
if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentIdentifier())) {
log.warn(
"Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
basePath, path, currentlyProcessing
);
return;
}
actionCompleted();
log.info("Server[%s] done processing [%s]", basePath, path);
}
}
private void failAssign(Exception e)
{
synchronized (lock) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing);
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted();
}
}
private static class SegmentHolder
{
private final DataSegment segment;
private final DataSegmentChangeRequest changeRequest;
private final int type;
private final List<LoadPeonCallback> callbacks = Lists.newArrayList();
private SegmentHolder(
DataSegment segment,
int type,
Collection<LoadPeonCallback> callbacks
)
{
this.segment = segment;
this.type = type;
this.changeRequest = (type == LOAD)
? new SegmentChangeRequestLoad(segment)
: new SegmentChangeRequestDrop(segment);
this.callbacks.addAll(callbacks);
}
public DataSegment getSegment()
{
return segment;
}
public int getType()
{
return type;
}
public String getSegmentIdentifier()
{
return segment.getIdentifier();
}
public long getSegmentSize()
{
return segment.getSize();
}
public void addCallbacks(Collection<LoadPeonCallback> newCallbacks)
{
synchronized (callbacks) {
callbacks.addAll(newCallbacks);
}
}
public void addCallback(LoadPeonCallback newCallback)
{
synchronized (callbacks) {
callbacks.add(newCallback);
}
}
public List<LoadPeonCallback> getCallbacks()
{
synchronized (callbacks) {
return callbacks;
}
}
public DataSegmentChangeRequest getChangeRequest()
{
return changeRequest;
}
@Override
public String toString()
{
return changeRequest.toString();
}
}
}

View File

@ -21,7 +21,13 @@ package io.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.http.client.HttpClient;
import io.druid.client.ImmutableDruidServer;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Json;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@ -36,14 +42,18 @@ public class LoadQueueTaskMaster
private final ScheduledExecutorService peonExec;
private final ExecutorService callbackExec;
private final DruidCoordinatorConfig config;
private final HttpClient httpClient;
private final ZkPathsConfig zkPaths;
@Inject
public LoadQueueTaskMaster(
CuratorFramework curator,
ObjectMapper jsonMapper,
@Json ObjectMapper jsonMapper,
ScheduledExecutorService peonExec,
ExecutorService callbackExec,
DruidCoordinatorConfig config
DruidCoordinatorConfig config,
@Global HttpClient httpClient,
ZkPathsConfig zkPaths
)
{
this.curator = curator;
@ -51,10 +61,23 @@ public class LoadQueueTaskMaster
this.peonExec = peonExec;
this.callbackExec = callbackExec;
this.config = config;
this.httpClient = httpClient;
this.zkPaths = zkPaths;
}
public LoadQueuePeon giveMePeon(String basePath)
public LoadQueuePeon giveMePeon(ImmutableDruidServer server)
{
return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, callbackExec, config);
if ("http".equalsIgnoreCase(config.getLoadQueuePeonType())) {
return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec);
} else {
return new CuratorLoadQueuePeon(
curator,
ZKPaths.makePath(zkPaths.getLoadQueuePath(), server.getName()),
jsonMapper,
peonExec,
callbackExec,
config
);
}
}
}

View File

@ -30,10 +30,12 @@ import com.sun.jersey.spi.container.ResourceFilters;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentChangeRequestHistory;
import io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import io.druid.server.coordination.SegmentLoadDropHandler;
import io.druid.server.coordinator.HttpLoadQueuePeon;
import io.druid.server.http.security.StateResourceFilter;
import io.druid.server.security.AuthConfig;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
@ -43,14 +45,17 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.List;
/**
* Endpoints exposed here are to be used only for druid internal management of segments by Coordinators, Brokers etc.
*/
@Path("/druid-internal/v1/segments/")
@ResourceFilters(StateResourceFilter.class)
@ -60,21 +65,21 @@ public class SegmentListerResource
protected final ObjectMapper jsonMapper;
protected final ObjectMapper smileMapper;
protected final AuthConfig authConfig;
private final BatchDataSegmentAnnouncer announcer;
private final SegmentLoadDropHandler loadDropRequestHandler;
@Inject
public SegmentListerResource(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
AuthConfig authConfig,
@Nullable BatchDataSegmentAnnouncer announcer
@Nullable BatchDataSegmentAnnouncer announcer,
@Nullable SegmentLoadDropHandler loadDropRequestHandler
)
{
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.authConfig = authConfig;
this.announcer = announcer;
this.loadDropRequestHandler = loadDropRequestHandler;
}
/**
@ -203,6 +208,117 @@ public class SegmentListerResource
asyncContext.setTimeout(timeout);
}
/**
* This endpoint is used by HttpLoadQueuePeon to assign a batch of segment load/drop requests. It makes the
* client wait until one of the following events occurs. Note that this is implemented using async IO, so no jetty
* threads are held while waiting.
*
* (1) Given timeout elapses.
* (2) Some load/drop request completed.
*
* It returns a map of "load/drop request -> SUCCESS/FAILED/PENDING status" for each request in the batch.
*/
@POST
@Path("/changeRequests")
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
@Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
public void applyDataSegmentChangeRequests(
@QueryParam("timeout") long timeout,
List<DataSegmentChangeRequest> changeRequestList,
@Context final HttpServletRequest req
) throws IOException
{
if (loadDropRequestHandler == null) {
sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "load/drop handler is not available.");
return;
}
if (timeout <= 0) {
sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "timeout must be positive.");
return;
}
if (changeRequestList == null || changeRequestList.isEmpty()) {
sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "No change requests provided.");
return;
}
final ResponseContext context = createContext(req.getHeader("Accept"));
final ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future = loadDropRequestHandler
.processBatch(changeRequestList);
final AsyncContext asyncContext = req.startAsync();
asyncContext.addListener(
new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event) throws IOException
{
}
@Override
public void onTimeout(AsyncEvent event) throws IOException
{
// HTTP 204 NO_CONTENT is sent to the client.
future.cancel(true);
event.getAsyncContext().complete();
}
@Override
public void onError(AsyncEvent event) throws IOException
{
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
}
}
);
Futures.addCallback(
future,
new FutureCallback<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>()
{
@Override
public void onSuccess(List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result)
{
try {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_OK);
context.inputMapper.writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)
.writeValue(asyncContext.getResponse().getOutputStream(), result);
asyncContext.complete();
}
catch (Exception ex) {
log.debug(ex, "Request timed out or closed already.");
}
}
@Override
public void onFailure(Throwable th)
{
try {
HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
if (th instanceof IllegalArgumentException) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, th.getMessage());
} else {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, th.getMessage());
}
asyncContext.complete();
}
catch (Exception ex) {
log.debug(ex, "Request timed out or closed already.");
}
}
}
);
asyncContext.setTimeout(timeout);
}
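// Illustrative request/response exchange for the endpoint above. This is a sketch only: the JSON
// shapes are an assumption based on the "action" type discriminator of DataSegmentChangeRequest and
// the getRequest()/getStatus() accessors of DataSegmentChangeRequestAndStatus, not a wire-format reference.
//
//   POST /druid-internal/v1/segments/changeRequests?timeout=300000
//   [{"action": "load", "segment": {...}}, {"action": "drop", "segment": {...}}]
//
//   200 OK: one status per request, e.g.
//   [{"request": {"action": "load", ...}, "status": {"state": "PENDING"}},
//    {"request": {"action": "drop", ...}, "status": {"state": "SUCCESS"}}]
//
//   204 NO CONTENT: no request in the batch completed before the timeout elapsed.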
private void sendErrorResponse(HttpServletRequest req, int code, String error) throws IOException
{
AsyncContext asyncContext = req.startAsync();

View File

@ -308,6 +308,7 @@ public class JettyServerModule extends JerseyServletModule
@Override
public void start() throws Exception
{
log.info("Starting Jetty Server...");
server.start();
if (node.isEnableTlsPort()) {
// Perform validation
@ -336,6 +337,7 @@ public class JettyServerModule extends JerseyServletModule
public void stop()
{
try {
log.info("Stopping Jetty Server...");
server.stop();
}
catch (Exception e) {

View File

@ -26,7 +26,7 @@ import com.metamx.metrics.AbstractMonitor;
import io.druid.client.DruidServerConfig;
import io.druid.query.DruidMetrics;
import io.druid.server.SegmentManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.server.coordination.SegmentLoadDropHandler;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
@ -37,18 +37,18 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
{
private final DruidServerConfig serverConfig;
private final SegmentManager segmentManager;
private final ZkCoordinator zkCoordinator;
private final SegmentLoadDropHandler segmentLoadDropMgr;
@Inject
public HistoricalMetricsMonitor(
DruidServerConfig serverConfig,
SegmentManager segmentManager,
ZkCoordinator zkCoordinator
SegmentLoadDropHandler segmentLoadDropMgr
)
{
this.serverConfig = serverConfig;
this.segmentManager = segmentManager;
this.zkCoordinator = zkCoordinator;
this.segmentLoadDropMgr = segmentLoadDropMgr;
}
@Override
@ -58,7 +58,7 @@ public class HistoricalMetricsMonitor extends AbstractMonitor
final Object2LongOpenHashMap<String> pendingDeleteSizes = new Object2LongOpenHashMap<>();
for (DataSegment segment : zkCoordinator.getPendingDeleteSnapshot()) {
for (DataSegment segment : segmentLoadDropMgr.getPendingDeleteSnapshot()) {
pendingDeleteSizes.addTo(segment.getDataSource(), segment.getSize());
}

View File

@ -0,0 +1,487 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.IndexIO;
import io.druid.segment.loading.CacheTestSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.SegmentManager;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class SegmentLoadDropHandlerTest
{
public static final int COUNT = 50;
private static final Logger log = new Logger(ZkCoordinatorTest.class);
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
"dummyHost",
null,
0,
ServerType.HISTORICAL,
"normal",
0
);
private SegmentLoadDropHandler segmentLoadDropHandler;
private DataSegmentAnnouncer announcer;
private File infoDir;
private AtomicInteger announceCount;
private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
private CacheTestSegmentLoader segmentLoader;
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
@Before
public void setUp() throws Exception
{
try {
infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
infoDir.mkdirs();
for (File file : infoDir.listFiles()) {
file.delete();
}
log.info("Creating tmp test files in [%s]", infoDir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
scheduledRunnable = Lists.newArrayList();
segmentLoader = new CacheTestSegmentLoader();
segmentManager = new SegmentManager(segmentLoader);
final ZkPathsConfig zkPaths = new ZkPathsConfig()
{
@Override
public String getBase()
{
return "/druid";
}
};
segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
announceCount = new AtomicInteger(0);
announcer = new DataSegmentAnnouncer()
{
@Override
public void announceSegment(DataSegment segment) throws IOException
{
segmentsAnnouncedByMe.add(segment);
announceCount.incrementAndGet();
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
segmentsAnnouncedByMe.remove(segment);
announceCount.decrementAndGet();
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
segmentsAnnouncedByMe.add(segment);
}
announceCount.addAndGet(Iterables.size(segments));
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
segmentsAnnouncedByMe.remove(segment);
}
announceCount.addAndGet(-Iterables.size(segments));
}
};
segmentLoadDropHandler = new SegmentLoadDropHandler(
jsonMapper,
new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
return infoDir;
}
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
},
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
segmentManager,
new ScheduledExecutorFactory()
{
@Override
public ScheduledExecutorService create(int corePoolSize, String nameFormat)
{
/*
Override normal behavior by adding the runnable to a list so that you can make sure
all the scheduled runnables are executed by explicitly calling run() on each item in the list
*/
return new ScheduledThreadPoolExecutor(
corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
)
{
@Override
public ScheduledFuture<?> schedule(
Runnable command, long delay, TimeUnit unit
)
{
scheduledRunnable.add(command);
return null;
}
};
}
}.create(5, "SegmentLoadDropHandlerTest-[%d]")
);
}
/**
* Steps:
* 1. removeSegment() schedules a delete runnable that deletes segment files,
* 2. addSegment() successfully loads the segment and announces it
* 3. scheduled delete task executes and realizes it should not delete the segment files.
*/
@Test
public void testSegmentLoading1() throws Exception
{
segmentLoadDropHandler.start();
final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP);
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP);
/*
make sure the scheduled runnable that "deletes" segment files has been executed.
Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in
SegmentLoadDropHandler, the scheduled runnable will not actually delete segment files.
*/
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
segmentLoadDropHandler.stop();
}
/**
* Steps:
* 1. addSegment() successfully loads the segment and announces it
* 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files
* 3. addSegment() calls loadSegment() and announces it again
* 4. scheduled delete task executes and realizes it should not delete the segment files.
*/
@Test
public void testSegmentLoading2() throws Exception
{
segmentLoadDropHandler.start();
final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP);
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
segmentLoadDropHandler.removeSegment(segment, DataSegmentChangeCallback.NOOP);
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
segmentLoadDropHandler.addSegment(segment, DataSegmentChangeCallback.NOOP);
/*
make sure the scheduled runnable that "deletes" segment files has been executed.
Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in
SegmentLoadDropHandler, the scheduled runnable will not actually delete segment files.
*/
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
segmentLoadDropHandler.stop();
}
@Test
public void testLoadCache() throws Exception
{
List<DataSegment> segments = Lists.newLinkedList();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-03")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-04")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T01")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T03")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T06")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
Collections.sort(segments);
for (DataSegment segment : segments) {
writeSegmentToCache(segment);
}
checkCache(segments);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
segmentLoadDropHandler.start();
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(13 * COUNT, announceCount.get());
segmentLoadDropHandler.stop();
for (DataSegment segment : segments) {
deleteSegmentFromCache(segment);
}
Assert.assertEquals(0, infoDir.listFiles().length);
Assert.assertTrue(infoDir.delete());
}
private DataSegment makeSegment(String dataSource, String version, Interval interval)
{
return new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval, "cacheDir", infoDir),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
123L
);
}
private void writeSegmentToCache(final DataSegment segment) throws IOException
{
if (!infoDir.exists()) {
infoDir.mkdir();
}
File segmentInfoCacheFile = new File(
infoDir,
segment.getIdentifier()
);
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
throw new RuntimeException(e);
}
Assert.assertTrue(segmentInfoCacheFile.exists());
}
private void deleteSegmentFromCache(final DataSegment segment) throws IOException
{
File segmentInfoCacheFile = new File(
infoDir,
segment.getIdentifier()
);
if (segmentInfoCacheFile.exists()) {
segmentInfoCacheFile.delete();
}
Assert.assertTrue(!segmentInfoCacheFile.exists());
}
private void checkCache(List<DataSegment> segments) throws IOException
{
Assert.assertTrue(infoDir.exists());
File[] files = infoDir.listFiles();
List<File> sortedFiles = Lists.newArrayList(files);
Collections.sort(sortedFiles);
Assert.assertEquals(segments.size(), sortedFiles.size());
for (int i = 0; i < sortedFiles.size(); i++) {
DataSegment segment = jsonMapper.readValue(sortedFiles.get(i), DataSegment.class);
Assert.assertEquals(segments.get(i), segment);
}
}
@Test
public void testStartStop() throws Exception
{
SegmentLoadDropHandler handler = new SegmentLoadDropHandler(
jsonMapper,
new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
return infoDir;
}
@Override
public int getNumLoadingThreads()
{
return 5;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
},
announcer, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), segmentManager
);
List<DataSegment> segments = Lists.newLinkedList();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
Collections.sort(segments);
for (DataSegment segment : segments) {
writeSegmentToCache(segment);
}
checkCache(segments);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
handler.start();
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(5 * COUNT, announceCount.get());
handler.stop();
for (DataSegment segment : segments) {
deleteSegmentFromCache(segment);
}
Assert.assertEquals(0, infoDir.listFiles().length);
Assert.assertTrue(infoDir.delete());
}
@Test(timeout = 1000L)
public void testProcessBatch() throws Exception
{
segmentLoadDropHandler.start();
DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01"));
DataSegment segment2 = makeSegment("batchtest2", "1", Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(
new SegmentChangeRequestLoad(segment1),
new SegmentChangeRequestDrop(segment2)
);
ListenableFuture<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> future = segmentLoadDropHandler
.processBatch(batch);
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> result = future.get();
Assert.assertEquals(SegmentLoadDropHandler.Status.PENDING, result.get(0).getStatus());
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus());
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
result = segmentLoadDropHandler.processBatch(batch).get();
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(0).getStatus());
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, result.get(1).getStatus());
for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : segmentLoadDropHandler.processBatch(batch).get()) {
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS, e.getStatus());
}
segmentLoadDropHandler.stop();
}
}

View File

@ -21,63 +21,33 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.LocalCacheProvider;
import io.druid.java.util.common.concurrent.Execs;
import com.metamx.emitter.EmittingLogger;
import io.druid.TestUtil;
import io.druid.curator.CuratorTestBase;
import io.druid.curator.announcement.Announcer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.lifecycle.Lifecycle;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.NoopQueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.loading.CacheTestSegmentLoader;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.SegmentManager;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class ZkCoordinatorTest extends CuratorTestBase
{
public static final int COUNT = 50;
private static final Logger log = new Logger(ZkCoordinatorTest.class);
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private final DruidServerMetadata me = new DruidServerMetadata(
"dummyServer",
@ -88,16 +58,15 @@ public class ZkCoordinatorTest extends CuratorTestBase
"normal",
0
);
private final ZkPathsConfig zkPaths = new ZkPathsConfig()
{
@Override
public String getBase()
{
return "/druid";
}
};
private ZkCoordinator zkCoordinator;
private ServerManager serverManager;
private DataSegmentAnnouncer announcer;
private File infoDir;
private AtomicInteger announceCount;
private ConcurrentSkipListSet<DataSegment> segmentsAnnouncedByMe;
private CacheTestSegmentLoader segmentLoader;
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
@Before
public void setUp() throws Exception
@ -105,146 +74,6 @@ public class ZkCoordinatorTest extends CuratorTestBase
setupServerAndCurator();
curator.start();
curator.blockUntilConnected();
try {
infoDir = new File(File.createTempFile("blah", "blah2").getParent(), "ZkCoordinatorTest");
infoDir.mkdirs();
for (File file : infoDir.listFiles()) {
file.delete();
}
log.info("Creating tmp test files in [%s]", infoDir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
scheduledRunnable = Lists.newArrayList();
segmentLoader = new CacheTestSegmentLoader();
segmentManager = new SegmentManager(segmentLoader);
serverManager = new ServerManager(
new NoopQueryRunnerFactoryConglomerate(),
new NoopServiceEmitter(),
MoreExecutors.sameThreadExecutor(),
MoreExecutors.sameThreadExecutor(),
new DefaultObjectMapper(),
new LocalCacheProvider().get(),
new CacheConfig(),
segmentManager
);
final ZkPathsConfig zkPaths = new ZkPathsConfig()
{
@Override
public String getBase()
{
return "/druid";
}
};
segmentsAnnouncedByMe = new ConcurrentSkipListSet<>();
announceCount = new AtomicInteger(0);
announcer = new DataSegmentAnnouncer()
{
private final DataSegmentAnnouncer delegate = new BatchDataSegmentAnnouncer(
me,
new BatchDataSegmentAnnouncerConfig(),
zkPaths,
new Announcer(curator, Execs.singleThreaded("blah")),
jsonMapper
);
@Override
public void announceSegment(DataSegment segment) throws IOException
{
segmentsAnnouncedByMe.add(segment);
announceCount.incrementAndGet();
delegate.announceSegment(segment);
}
@Override
public void unannounceSegment(DataSegment segment) throws IOException
{
segmentsAnnouncedByMe.remove(segment);
announceCount.decrementAndGet();
delegate.unannounceSegment(segment);
}
@Override
public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
segmentsAnnouncedByMe.add(segment);
}
announceCount.addAndGet(Iterables.size(segments));
delegate.announceSegments(segments);
}
@Override
public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
{
for (DataSegment segment : segments) {
segmentsAnnouncedByMe.remove(segment);
}
announceCount.addAndGet(-Iterables.size(segments));
delegate.unannounceSegments(segments);
}
};
zkCoordinator = new ZkCoordinator(
jsonMapper,
new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
return infoDir;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
@Override
public int getDropSegmentDelayMillis()
{
return 0;
}
},
zkPaths,
me,
announcer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
curator,
segmentManager,
new ScheduledExecutorFactory()
{
@Override
public ScheduledExecutorService create(int corePoolSize, String nameFormat)
{
/*
Override normal behavior by adding the runnable to a list so that you can make sure
all the scheduled runnables are executed by explicitly calling run() on each item in the list
*/
return new ScheduledThreadPoolExecutor(
corePoolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()
)
{
@Override
public ScheduledFuture<?> schedule(
Runnable command, long delay, TimeUnit unit
)
{
scheduledRunnable.add(command);
return null;
}
};
}
}
);
}
@After
@ -253,303 +82,82 @@ public class ZkCoordinatorTest extends CuratorTestBase
tearDownServerAndCurator();
}
/**
* Steps:
* 1. removeSegment() schedules a delete runnable that deletes segment files,
* 2. addSegment() successfully loads the segment and announces it
* 3. scheduled delete task executes and realizes it should not delete the segment files.
*/
@Test
public void testSegmentLoading1() throws Exception
@Test(timeout = 5000L)
public void testLoadDrop() throws Exception
{
zkCoordinator.start();
final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
/*
make sure the scheduled runnable that "deletes" segment files has been executed.
Because a subsequent addSegment() call has removed the segment from ZkCoordinator's segmentsToDelete field,
the scheduled runnable will not actually delete the segment files.
*/
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
zkCoordinator.stop();
}
/**
* Steps:
* 1. addSegment() successfully loads the segment and announces it
* 2. removeSegment() unannounces the segment and schedules a delete runnable that deletes segment files
* 3. addSegment() calls loadSegment() and announces it again
* 4. scheduled delete task executes and realizes it should not delete the segment files.
*/
@Test
public void testSegmentLoading2() throws Exception
{
zkCoordinator.start();
final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01"));
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
zkCoordinator.removeSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
Assert.assertFalse(segmentsAnnouncedByMe.contains(segment));
zkCoordinator.addSegment(segment, new DataSegmentChangeCallback()
{
@Override
public void execute()
{
// do nothing
}
});
/*
make sure the scheduled runnable that "deletes" segment files has been executed.
Because a subsequent addSegment() call has removed the segment from ZkCoordinator's segmentsToDelete field,
the scheduled runnable will not actually delete the segment files.
*/
for (Runnable runnable : scheduledRunnable) {
runnable.run();
}
Assert.assertTrue(segmentsAnnouncedByMe.contains(segment));
Assert.assertFalse("segment files shouldn't be deleted", segmentLoader.getSegmentsInTrash().contains(segment));
zkCoordinator.stop();
}
@Test
public void testLoadCache() throws Exception
{
List<DataSegment> segments = Lists.newLinkedList();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-03")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-04")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T01")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T03")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T05")));
segments.add(makeSegment("test" + i, "2", Intervals.of("PT1h/2011-04-04T06")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
Collections.sort(segments);
for (DataSegment segment : segments) {
writeSegmentToCache(segment);
}
checkCache(segments);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
zkCoordinator.start();
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(11L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
}
Assert.assertEquals(13 * COUNT, announceCount.get());
zkCoordinator.stop();
for (DataSegment segment : segments) {
deleteSegmentFromCache(segment);
}
Assert.assertEquals(0, infoDir.listFiles().length);
Assert.assertTrue(infoDir.delete());
}
private DataSegment makeSegment(String dataSource, String version, Interval interval)
{
return new DataSegment(
dataSource,
interval,
version,
ImmutableMap.<String, Object>of("version", version, "interval", interval, "cacheDir", infoDir),
EmittingLogger.registerEmitter(new NoopServiceEmitter());
DataSegment segment = new DataSegment(
"test",
Intervals.of("P1d/2011-04-02"),
"v0",
ImmutableMap.<String, Object>of("version", "v0", "interval", Intervals.of("P1d/2011-04-02"), "cacheDir", "/no"),
Arrays.asList("dim1", "dim2", "dim3"),
Arrays.asList("metric1", "metric2"),
NoneShardSpec.instance(),
IndexIO.CURRENT_VERSION_ID,
123L
);
}
private void writeSegmentToCache(final DataSegment segment) throws IOException
{
if (!infoDir.exists()) {
infoDir.mkdir();
}
File segmentInfoCacheFile = new File(
infoDir,
segment.getIdentifier()
);
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
}
catch (IOException e) {
throw new RuntimeException(e);
}
Assert.assertTrue(segmentInfoCacheFile.exists());
}
private void deleteSegmentFromCache(final DataSegment segment) throws IOException
{
File segmentInfoCacheFile = new File(
infoDir,
segment.getIdentifier()
);
if (segmentInfoCacheFile.exists()) {
segmentInfoCacheFile.delete();
}
Assert.assertTrue(!segmentInfoCacheFile.exists());
}
private void checkCache(List<DataSegment> segments) throws IOException
{
Assert.assertTrue(infoDir.exists());
File[] files = infoDir.listFiles();
List<File> sortedFiles = Lists.newArrayList(files);
Collections.sort(sortedFiles);
Assert.assertEquals(segments.size(), sortedFiles.size());
for (int i = 0; i < sortedFiles.size(); i++) {
DataSegment segment = jsonMapper.readValue(sortedFiles.get(i), DataSegment.class);
Assert.assertEquals(segments.get(i), segment);
}
}
@Test
public void testInjector() throws Exception
{
Injector injector = Guice.createInjector(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(ObjectMapper.class).toInstance(jsonMapper);
binder.bind(SegmentLoaderConfig.class).toInstance(
new SegmentLoaderConfig()
{
@Override
public File getInfoDir()
{
return infoDir;
}
@Override
public int getAnnounceIntervalMillis()
{
return 50;
}
}
);
binder.bind(ZkPathsConfig.class).toInstance(
new ZkPathsConfig()
{
@Override
public String getBase()
{
return "/druid";
}
}
);
binder.bind(DruidServerMetadata.class)
.toInstance(new DruidServerMetadata("dummyServer", "dummyHost", null, 0, ServerType.HISTORICAL, "normal", 0));
binder.bind(DataSegmentAnnouncer.class).toInstance(announcer);
binder.bind(DataSegmentServerAnnouncer.class).toInstance(EasyMock.createNiceMock(DataSegmentServerAnnouncer.class));
binder.bind(CuratorFramework.class).toInstance(curator);
binder.bind(ServerManager.class).toInstance(serverManager);
binder.bind(SegmentManager.class).toInstance(segmentManager);
binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle()));
}
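// Latches released when the stubbed SegmentLoadDropHandler below receives the load and drop
// requests for the test segment.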
CountDownLatch loadLatch = new CountDownLatch(1);
CountDownLatch dropLatch = new CountDownLatch(1);
SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
TestUtil.MAPPER,
new SegmentLoaderConfig(),
EasyMock.createNiceMock(DataSegmentAnnouncer.class),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
EasyMock.createNiceMock(SegmentManager.class),
EasyMock.createNiceMock(ScheduledExecutorService.class)
)
{
@Override
public void addSegment(DataSegment s, DataSegmentChangeCallback callback)
{
if (segment.getIdentifier().equals(s.getIdentifier())) {
loadLatch.countDown();
callback.execute();
}
}
@Override
public void removeSegment(DataSegment s, DataSegmentChangeCallback callback)
{
if (segment.getIdentifier().equals(s.getIdentifier())) {
dropLatch.countDown();
callback.execute();
}
}
};
zkCoordinator = new ZkCoordinator(
segmentLoadDropHandler,
jsonMapper,
zkPaths,
me,
curator
);
ZkCoordinator zkCoordinator = injector.getInstance(ZkCoordinator.class);
List<DataSegment> segments = Lists.newLinkedList();
for (int i = 0; i < COUNT; ++i) {
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test" + i, "2", Intervals.of("P1d/2011-04-02")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
}
Collections.sort(segments);
for (DataSegment segment : segments) {
writeSegmentToCache(segment);
}
checkCache(segments);
Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
zkCoordinator.start();
Assert.assertTrue(!segmentManager.getDataSourceCounts().isEmpty());
for (int i = 0; i < COUNT; ++i) {
Assert.assertEquals(3L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
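// Simulate the Coordinator by writing a load request znode under this server's load queue path;
// ZkCoordinator should hand it to the handler and remove the znode once it has been processed.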
String segmentZkPath = ZKPaths.makePath(zkPaths.getLoadQueuePath(), me.getName(), segment.getIdentifier());
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
segmentZkPath, jsonMapper.writeValueAsBytes(new SegmentChangeRequestLoad(segment)));
loadLatch.await();
while (curator.checkExists().forPath(segmentZkPath) != null) {
Thread.sleep(100);
}
Assert.assertEquals(5 * COUNT, announceCount.get());
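// Issue a drop request the same way and wait for it to be processed and its znode removed.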
curator.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
segmentZkPath, jsonMapper.writeValueAsBytes(new SegmentChangeRequestDrop(segment)));
dropLatch.await();
while (curator.checkExists().forPath(segmentZkPath) != null) {
Thread.sleep(100);
}
zkCoordinator.stop();
for (DataSegment segment : segments) {
deleteSegmentFromCache(segment);
}
Assert.assertEquals(0, infoDir.listFiles().length);
Assert.assertTrue(infoDir.delete());
}
}
View File
@ -141,7 +141,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
true,
Execs.singleThreaded("coordinator_test_path_children_cache-%d")
);
loadQueuePeon = new LoadQueuePeon(
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOADPATH,
objectMapper,
View File
@ -0,0 +1,209 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.HttpResponseHandler;
import io.druid.TestUtil;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentLoadDropHandler;
import io.druid.timeline.DataSegment;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class HttpLoadQueuePeonTest
{
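// Queues two drops and two loads against a fake historical endpoint and verifies that every
// callback fires once the server reports success for each batched change request.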
@Test(timeout = 10000)
public void testSimple() throws Exception
{
final DataSegment segment1 = new DataSegment(
"test1", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
final DataSegment segment2 = new DataSegment(
"test2", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
final DataSegment segment3 = new DataSegment(
"test3", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
final DataSegment segment4 = new DataSegment(
"test4", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
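// Peon under test; a batch size of 2 means the four queued change requests are sent over
// multiple HTTP calls rather than a single one.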
HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
"http://dummy:4000",
TestUtil.MAPPER,
new TestHttpClient(),
new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) {
@Override
public int getHttpLoadQueuePeonBatchSize()
{
return 2;
}
},
Executors.newScheduledThreadPool(
2,
Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
),
Execs.singleThreaded("HttpLoadQueuePeonTest")
);
httpLoadQueuePeon.start();
Map<String, CountDownLatch> latches = ImmutableMap.of(
segment1.getIdentifier(), new CountDownLatch(1),
segment2.getIdentifier(), new CountDownLatch(1),
segment3.getIdentifier(), new CountDownLatch(1),
segment4.getIdentifier(), new CountDownLatch(1)
);
httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment1.getIdentifier()).countDown();
}
});
httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment2.getIdentifier()).countDown();
}
});
httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment3.getIdentifier()).countDown();
}
});
httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment4.getIdentifier()).countDown();
}
});
latches.get(segment1.getIdentifier()).await();
latches.get(segment2.getIdentifier()).await();
latches.get(segment3.getIdentifier()).await();
latches.get(segment4.getIdentifier()).await();
httpLoadQueuePeon.stop();
}
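// Discovery stub that reports an empty node list to any listener and simply remembers it.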
private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
{
Listener listener;
@Override
public Collection<DiscoveryDruidNode> getAllNodes()
{
throw new UnsupportedOperationException("Not Implemented.");
}
@Override
public void registerListener(Listener listener)
{
listener.nodesAdded(ImmutableList.of());
this.listener = listener;
}
}
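// Fake HTTP client standing in for a historical node: it parses the batched change requests
// from the outgoing request body and immediately replies that each one succeeded.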
private static class TestHttpClient implements HttpClient
{
AtomicInteger requestNum = new AtomicInteger(0);
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler
)
{
throw new UnsupportedOperationException("Not Implemented.");
}
@Override
public <Intermediate, Final> ListenableFuture<Final> go(
Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, Duration duration
)
{
HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
httpResponse.setContent(ChannelBuffers.buffer(0));
httpResponseHandler.handleResponse(httpResponse);
try {
List<DataSegmentChangeRequest> changeRequests = TestUtil.MAPPER.readValue(
request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {}
);
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS));
}
return (ListenableFuture) Futures.immediateFuture(new ByteArrayInputStream(TestUtil.MAPPER.writerWithType(
HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF).writeValueAsBytes(statuses)));
}
catch (Exception ex) {
throw new RE(ex, "Unexpected exception.");
}
}
}
}
View File
@ -82,7 +82,7 @@ public class LoadQueuePeonTest extends CuratorTestBase
final AtomicInteger requestSignalIdx = new AtomicInteger(0);
final AtomicInteger segmentSignalIdx = new AtomicInteger(0);
loadQueuePeon = new LoadQueuePeon(
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOAD_QUEUE_PATH,
jsonMapper,
@ -289,7 +289,7 @@ public class LoadQueuePeonTest extends CuratorTestBase
final CountDownLatch loadRequestSignal = new CountDownLatch(1);
final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
loadQueuePeon = new LoadQueuePeon(
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOAD_QUEUE_PATH,
jsonMapper,
View File
@ -23,7 +23,7 @@ import io.druid.timeline.DataSegment;
import java.util.concurrent.ConcurrentSkipListSet;
public class LoadQueuePeonTester extends LoadQueuePeon
public class LoadQueuePeonTester extends CuratorLoadQueuePeon
{
private final ConcurrentSkipListSet<DataSegment> segmentsToLoad = new ConcurrentSkipListSet<DataSegment>();
View File
@ -29,7 +29,7 @@ import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidServerConfig;
import io.druid.java.util.common.Intervals;
import io.druid.server.SegmentManager;
import io.druid.server.coordination.ZkCoordinator;
import io.druid.server.coordination.SegmentLoadDropHandler;
import io.druid.timeline.DataSegment;
import org.easymock.Capture;
import org.easymock.CaptureType;
@ -48,7 +48,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport
{
private DruidServerConfig druidServerConfig;
private SegmentManager segmentManager;
private ZkCoordinator zkCoordinator;
private SegmentLoadDropHandler segmentLoadDropMgr;
private ServiceEmitter serviceEmitter;
@Before
@ -56,7 +56,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport
{
druidServerConfig = EasyMock.createStrictMock(DruidServerConfig.class);
segmentManager = EasyMock.createStrictMock(SegmentManager.class);
zkCoordinator = EasyMock.createStrictMock(ZkCoordinator.class);
segmentLoadDropMgr = EasyMock.createStrictMock(SegmentLoadDropHandler.class);
serviceEmitter = EasyMock.createStrictMock(ServiceEmitter.class);
}
@ -81,7 +81,7 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport
final String tier = "tier";
EasyMock.expect(druidServerConfig.getMaxSize()).andReturn(maxSize).once();
EasyMock.expect(zkCoordinator.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once();
EasyMock.expect(segmentLoadDropMgr.getPendingDeleteSnapshot()).andReturn(ImmutableList.of(dataSegment)).once();
EasyMock.expect(druidServerConfig.getTier()).andReturn(tier).once();
EasyMock.expect(druidServerConfig.getPriority()).andReturn(priority).once();
EasyMock.expect(segmentManager.getDataSourceSizes()).andReturn(ImmutableMap.of(dataSource, size));
@ -95,16 +95,16 @@ public class HistoricalMetricsMonitorTest extends EasyMockSupport
final HistoricalMetricsMonitor monitor = new HistoricalMetricsMonitor(
druidServerConfig,
segmentManager,
zkCoordinator
segmentLoadDropMgr
);
final Capture<ServiceEventBuilder<ServiceMetricEvent>> eventCapture = EasyMock.newCapture(CaptureType.ALL);
serviceEmitter.emit(EasyMock.capture(eventCapture));
EasyMock.expectLastCall().times(5);
EasyMock.replay(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter);
EasyMock.replay(druidServerConfig, segmentManager, segmentLoadDropMgr, serviceEmitter);
monitor.doMonitor(serviceEmitter);
EasyMock.verify(druidServerConfig, segmentManager, zkCoordinator, serviceEmitter);
EasyMock.verify(druidServerConfig, segmentManager, segmentLoadDropMgr, serviceEmitter);
final String host = "host";
final String service = "service";
View File
@ -28,6 +28,7 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
import com.metamx.http.client.HttpClient;
import io.airlift.airline.Command;
import io.druid.audit.AuditManager;
import io.druid.client.CoordinatorServerView;
@ -42,6 +43,7 @@ import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import io.druid.guice.annotations.Global;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.MetadataRuleManager;
@ -74,6 +76,7 @@ import io.druid.server.http.RedirectInfo;
import io.druid.server.http.RulesResource;
import io.druid.server.http.ServersResource;
import io.druid.server.http.TiersResource;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
@ -221,12 +224,14 @@ public class CliCoordinator extends ServerRunnable
CuratorFramework curator,
ObjectMapper jsonMapper,
ScheduledExecutorFactory factory,
DruidCoordinatorConfig config
DruidCoordinatorConfig config,
@Global HttpClient httpClient,
ZkPathsConfig zkPaths
)
{
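// The task master now also receives the global HttpClient and ZK paths, presumably so it can
// construct HTTP-based load queue peons in addition to curator-based ones.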
return new LoadQueueTaskMaster(
curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"),
Executors.newSingleThreadExecutor(), config
Executors.newSingleThreadExecutor(), config, httpClient, zkPaths
);
}
}
View File
@ -51,6 +51,7 @@ import io.druid.guice.QueryRunnerFactoryModule;
import io.druid.guice.QueryableModule;
import io.druid.guice.QueryablePeonModule;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
import io.druid.indexing.common.TaskToolboxFactory;
@ -87,6 +88,7 @@ import io.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.BatchDataSegmentAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.server.http.SegmentListerResource;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
@ -94,6 +96,7 @@ import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.metrics.DataSourceTaskIdHolder;
import org.eclipse.jetty.server.Server;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@ -266,6 +269,21 @@ public class CliPeon extends GuiceRunnable
{
return task.getId();
}
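// New for the HTTP-based flow: peons expose SegmentListerResource so their served segments can be
// fetched over HTTP; the final constructor argument (presumably the load/drop handler) is null
// because peons do not accept load/drop requests.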
@Provides
public SegmentListerResource getSegmentListerResource(
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
@Nullable BatchDataSegmentAnnouncer announcer
)
{
return new SegmentListerResource(
jsonMapper,
smileMapper,
announcer,
null
);
}
},
new QueryablePeonModule(),
new IndexingServiceFirehoseModule(),