Remove published segments immediately (#4530)

This commit is contained in:
Jihoon Son 2017-07-12 02:09:32 +09:00 committed by Fangjin Yang
parent c5d5263c10
commit 98b1385bcd
2 changed files with 31 additions and 42 deletions

View File

@ -20,6 +20,7 @@
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@ -47,7 +48,6 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@ -129,6 +129,18 @@ public class AppenderatorDriver implements Closeable
this.publishExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d"));
}
@VisibleForTesting
Map<String, NavigableMap<Long, SegmentIdentifier>> getActiveSegments()
{
return activeSegments;
}
@VisibleForTesting
Map<String, List<SegmentIdentifier>> getPublishPendingSegments()
{
return publishPendingSegments;
}
/**
* Perform any initial setup and return currently persisted commit metadata.
*
@ -451,11 +463,17 @@ public class AppenderatorDriver implements Closeable
final Committer committer
)
{
final List<String> sequenceNames;
final List<SegmentIdentifier> theSegments;
synchronized (activeSegments) {
sequenceNames = ImmutableList.copyOf(publishPendingSegments.keySet());
final List<String> sequenceNames = ImmutableList.copyOf(publishPendingSegments.keySet());
theSegments = sequenceNames.stream()
.map(publishPendingSegments::remove)
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
sequenceNames.forEach(activeSegments::remove);
}
return publish(publisher, committer, sequenceNames);
return publish(publisher, wrapCommitter(committer), theSegments);
}
/**
@ -478,49 +496,14 @@ public class AppenderatorDriver implements Closeable
final List<SegmentIdentifier> theSegments;
synchronized (activeSegments) {
theSegments = sequenceNames.stream()
.map(publishPendingSegments::get)
.map(publishPendingSegments::remove)
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
sequenceNames.forEach(activeSegments::remove);
}
final ListenableFuture<SegmentsAndMetadata> publishFuture = publish(
publisher,
wrapCommitter(committer),
theSegments
);
Futures.addCallback(
publishFuture,
new FutureCallback<SegmentsAndMetadata>()
{
@Override
public void onSuccess(@Nullable SegmentsAndMetadata result)
{
if (result != null) {
synchronized (activeSegments) {
// Remove sequenceName from both publishPendingSemgments and activeSegments
sequenceNames.forEach(
sequenceName -> {
activeSegments.remove(sequenceName);
publishPendingSegments.remove(sequenceName);
}
);
}
}
}
@Override
public void onFailure(Throwable t)
{
// The throwable is propagated anyway when get() is called on the future.
// See FiniteAppenderatorFailTest.testInterruptDuringPush().
log.error(t, "Failed to publish segments[%s]", theSegments);
}
}
);
return publishFuture;
return publish(publisher, wrapCommitter(committer), theSegments);
}
/**

View File

@ -138,6 +138,8 @@ public class AppenderatorDriverTest
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
@ -183,6 +185,8 @@ public class AppenderatorDriverTest
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
.get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
@ -207,6 +211,8 @@ public class AppenderatorDriverTest
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
}