background announce segment cache

This commit is contained in:
Xavier Léauté 2014-09-18 12:01:28 -07:00
parent a4777ede94
commit 32b6135f3d
2 changed files with 140 additions and 12 deletions

View File

@ -40,6 +40,9 @@ public class SegmentLoaderConfig
@JsonProperty("dropSegmentDelayMillis")
private int dropSegmentDelayMillis = 30 * 1000; // 30 seconds
@JsonProperty("announceIntervalMillis")
private int announceIntervalMillis = 5 * 1000; // 5 seconds
@JsonProperty
private File infoDir = null;
@ -58,6 +61,11 @@ public class SegmentLoaderConfig
return dropSegmentDelayMillis;
}
public int getAnnounceIntervalMillis()
{
return announceIntervalMillis;
}
public File getInfoDir()
{
if (infoDir == null) {

View File

@ -21,7 +21,10 @@ package io.druid.server.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger;
import io.druid.segment.loading.SegmentLoaderConfig;
@ -33,7 +36,10 @@ import org.apache.curator.framework.CuratorFramework;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
@ -170,10 +176,11 @@ public class ZkCoordinator extends BaseZkCoordinator
public void addSegments(Iterable<DataSegment> segments, DataSegmentChangeCallback callback)
{
try {
final List<String> segmentFailures = Lists.newArrayList();
final List<DataSegment> validSegments = Lists.newArrayList();
try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
final List<String> segmentFailures = Lists.newArrayList();
for (DataSegment segment : segments) {
log.info("Loading segment %s", segment.getIdentifier());
@ -202,15 +209,12 @@ public class ZkCoordinator extends BaseZkCoordinator
}
}
validSegments.add(segment);
}
}
try {
announcer.announceSegments(validSegments);
backgroundSegmentAnnouncer.announceSegment(segment);
} catch(InterruptedException e) {
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
catch (IOException e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", segments);
}
if (!segmentFailures.isEmpty()) {
@ -219,9 +223,11 @@ public class ZkCoordinator extends BaseZkCoordinator
}
throw new SegmentLoadingException("%,d errors seen while loading segments", segmentFailures.size());
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
log.makeAlert(e, "Failed to load segments for dataSource")
log.makeAlert(e, "Failed to load segments")
.addData("segments", segments)
.emit();
}
@ -272,4 +278,118 @@ public class ZkCoordinator extends BaseZkCoordinator
callback.execute();
}
}
private static class BackgroundSegmentAnnouncer implements AutoCloseable {
private static final EmittingLogger log = new EmittingLogger(BackgroundSegmentAnnouncer.class);
private final int intervalMillis;
private final DataSegmentAnnouncer announcer;
private final ScheduledExecutorService exec;
private final LinkedBlockingQueue<DataSegment> queue;
private final SettableFuture<Boolean> doneAnnouncing;
private volatile boolean finished = false;
private volatile ScheduledFuture startedAnnouncing = null;
private volatile ScheduledFuture nextAnnoucement = null;
private BackgroundSegmentAnnouncer(
DataSegmentAnnouncer announcer,
ScheduledExecutorService exec,
int intervalMillis
)
{
this.announcer = announcer;
this.exec = exec;
this.intervalMillis = intervalMillis;
this.queue = Queues.newLinkedBlockingQueue();
this.doneAnnouncing = SettableFuture.create();
}
public void announceSegment(final DataSegment segment) throws InterruptedException
{
if (finished) {
throw new ISE("Announce segment called after finishAnnouncing");
}
queue.put(segment);
}
public void startAnnouncing()
{
if (intervalMillis <= 0) {
return;
}
log.info("Starting background segment announcing task");
// schedule background announcing task
nextAnnoucement = startedAnnouncing = exec.schedule(
new Runnable()
{
@Override
public void run()
{
try {
if (!(finished && queue.isEmpty())) {
List<DataSegment> segments = Lists.newLinkedList();
queue.drainTo(segments);
try {
announcer.announceSegments(segments);
nextAnnoucement = exec.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
}
catch (IOException e) {
doneAnnouncing.setException(
new SegmentLoadingException(e, "Failed to announce segments[%s]", segments)
);
}
} else {
doneAnnouncing.set(true);
}
}
catch (Exception e) {
doneAnnouncing.setException(e);
}
}
},
intervalMillis,
TimeUnit.MILLISECONDS
);
}
public void finishAnnouncing() throws SegmentLoadingException
{
finished = true;
if (startedAnnouncing != null && (startedAnnouncing.isDone() || !startedAnnouncing.cancel(false))) {
log.info("Waiting for background segment announcing task to complete");
// background announcing already started, wait for it to complete
try {
doneAnnouncing.get();
}
catch (InterruptedException e) {
throw new SegmentLoadingException(e, "Loading Interrupted");
}
catch (ExecutionException e) {
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
}
} else {
// background task has not started yet, announcing immediately
try {
announcer.announceSegments(queue);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
}
log.info("Completed background segment announcing");
}
@Override
public void close()
{
finished = true;
if (nextAnnoucement != null) {
nextAnnoucement.cancel(false);
}
}
}
}