fix background annoucing race condition

This commit is contained in:
Xavier Léauté 2014-09-23 14:55:22 -07:00
parent d7b39fa7ea
commit 05d4f71ddc
1 changed files with 53 additions and 38 deletions

View File

@ -187,15 +187,15 @@ public class ZkCoordinator extends BaseZkCoordinator
new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
backgroundSegmentAnnouncer.startAnnouncing();
final List<ListenableFuture<Boolean>> segmentLoading = Lists.newArrayList();
final List<ListenableFuture> segmentLoading = Lists.newArrayList();
for (final DataSegment segment : segments) {
segmentLoading.add(
getLoadingExecutor().submit(
new Callable<Boolean>()
new Callable<Void>()
{
@Override
public Boolean call() throws SegmentLoadingException
public Void call() throws SegmentLoadingException
{
try {
log.info("Loading segment %s", segment.getIdentifier());
@ -209,7 +209,7 @@ public class ZkCoordinator extends BaseZkCoordinator
throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
return loaded;
return null;
} catch(SegmentLoadingException e) {
log.error(e, "[%s] failed to load", segment.getIdentifier());
throw e;
@ -299,6 +299,8 @@ public class ZkCoordinator extends BaseZkCoordinator
private final LinkedBlockingQueue<DataSegment> queue;
private final SettableFuture<Boolean> doneAnnouncing;
private final Object lock = new Object();
private volatile boolean finished = false;
private volatile ScheduledFuture startedAnnouncing = null;
private volatile ScheduledFuture nextAnnoucement = null;
@ -339,9 +341,10 @@ public class ZkCoordinator extends BaseZkCoordinator
@Override
public void run()
{
synchronized (lock) {
try {
if (!(finished && queue.isEmpty())) {
List<DataSegment> segments = Lists.newLinkedList();
final List<DataSegment> segments = Lists.newLinkedList();
queue.drainTo(segments);
try {
announcer.announceSegments(segments);
@ -360,6 +363,7 @@ public class ZkCoordinator extends BaseZkCoordinator
doneAnnouncing.setException(e);
}
}
}
},
intervalMillis,
TimeUnit.MILLISECONDS
@ -368,14 +372,30 @@ public class ZkCoordinator extends BaseZkCoordinator
public void finishAnnouncing() throws SegmentLoadingException
{
synchronized (lock) {
finished = true;
if (startedAnnouncing != null && (startedAnnouncing.isDone() || !startedAnnouncing.cancel(false))) {
log.info("Waiting for background segment announcing task to complete");
// background announcing already started, wait for it to complete
// announce any remaining segments
try {
final List<DataSegment> segments = Lists.newLinkedList();
queue.drainTo(segments);
announcer.announceSegments(segments);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
// get any exception that may have been thrown in background annoucing
try {
// check in case we did not call startAnnouncing
if (startedAnnouncing != null) {
startedAnnouncing.cancel(false);
}
// - if the task is waiting on the lock, then the queue will be empty by the time it runs
// - if the task just released it, then the lock ensures any exception is set in doneAnnouncing
if (doneAnnouncing.isDone()) {
doneAnnouncing.get();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SegmentLoadingException(e, "Loading Interrupted");
@ -383,14 +403,6 @@ public class ZkCoordinator extends BaseZkCoordinator
catch (ExecutionException e) {
throw new SegmentLoadingException(e.getCause(), "Background Announcing Task Failed");
}
} else {
// background task has not started yet, announcing immediately
try {
announcer.announceSegments(queue);
}
catch (Exception e) {
throw new SegmentLoadingException(e, "Failed to announce segments[%s]", queue);
}
}
log.info("Completed background segment announcing");
}
@ -398,6 +410,8 @@ public class ZkCoordinator extends BaseZkCoordinator
@Override
public void close()
{
// stop background scheduling
synchronized (lock) {
finished = true;
if (nextAnnoucement != null) {
nextAnnoucement.cancel(false);
@ -405,3 +419,4 @@ public class ZkCoordinator extends BaseZkCoordinator
}
}
}
}