mirror of https://github.com/apache/druid.git
RealtimeIndexTask: Fix a couple of problems with restoring.
- Shedding locks at startup is bad; we actually want to keep them. Stop doing that.
- stopGracefully now interrupts the run thread if it has started running finishJob. This avoids waiting for handoff unnecessarily.
commit 32edd1538d
parent f4ce2b9bc5
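For readers skimming the diff below, the shutdown coordination this commit implements can be summarized in a small standalone sketch. It is a simplified illustration using hypothetical names that mirror the fields added to RealtimeIndexTask (gracefullyStopped, finishingJob, runThread); it is not the actual task code. The run thread marks finishingJob under a lock before blocking on handoff, and stopGracefully() interrupts it rather than waiting for handoff to finish.

public class GracefulStopSketch
{
  private volatile boolean gracefullyStopped = false;
  private volatile boolean finishingJob = false;
  private volatile Thread runThread = null;

  public void run()
  {
    runThread = Thread.currentThread();
    try {
      persist();  // always persist pending data, even when stopping

      synchronized (this) {
        if (!gracefullyStopped) {
          finishingJob = true;
        }
      }

      if (finishingJob) {
        finishJob();  // blocking wait for handoff; may be interrupted by stopGracefully()
      }
    }
    catch (InterruptedException e) {
      // Interrupted while finishing the job: treat as a graceful stop, not as a failure.
    }
  }

  public void stopGracefully()
  {
    synchronized (this) {
      if (!gracefullyStopped) {
        gracefullyStopped = true;
        if (finishingJob) {
          runThread.interrupt();  // don't wait for handoff; the task can be restored on restart
        }
      }
    }
  }

  private void persist()
  {
    // stand-in for plumber.persist(...): flush in-memory data to disk
  }

  private void finishJob() throws InterruptedException
  {
    Thread.sleep(60_000);  // stand-in for waiting on segment handoff in plumber.finishJob()
  }
}

The actual change uses the same interrupt when the firehose cannot be drained by closing, as the stopGracefully hunk below shows.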
RealtimeIndexTask.java

@@ -115,7 +115,13 @@ public class RealtimeIndexTask extends AbstractTask
   private volatile Firehose firehose = null;
 
   @JsonIgnore
-  private volatile boolean stopped = false;
+  private volatile boolean gracefullyStopped = false;
+
+  @JsonIgnore
+  private volatile boolean finishingJob = false;
+
+  @JsonIgnore
+  private volatile Thread runThread = null;
 
   @JsonIgnore
   private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;
@@ -172,16 +178,12 @@ public class RealtimeIndexTask extends AbstractTask
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
+    runThread = Thread.currentThread();
+
     if (this.plumber != null) {
       throw new IllegalStateException("WTF?!? run with non-null plumber??!");
     }
 
-    // Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire
-    // them if we actually need them
-    for (final TaskLock taskLock : getTaskLocks(toolbox)) {
-      toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval()));
-    }
-
     boolean normalExit = true;
 
     // It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
@@ -313,7 +315,7 @@ public class RealtimeIndexTask extends AbstractTask
       committerSupplier = Committers.supplierFromFirehose(firehose);
 
       // Time to read data!
-      while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) {
+      while ((!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
        final InputRow inputRow;
 
        try {
@@ -350,39 +352,55 @@ public class RealtimeIndexTask extends AbstractTask
    finally {
      if (normalExit) {
        try {
-          if (!stopped) {
-            // Hand off all pending data
-            log.info("Persisting and handing off pending data.");
-            plumber.persist(committerSupplier.get());
-            plumber.finishJob();
-          } else {
-            log.info("Persisting pending data without handoff, in preparation for restart.");
-            final Committer committer = committerSupplier.get();
-            final CountDownLatch persistLatch = new CountDownLatch(1);
-            plumber.persist(
-                new Committer()
-                {
-                  @Override
-                  public Object getMetadata()
-                  {
-                    return committer.getMetadata();
-                  }
-
-                  @Override
-                  public void run()
-                  {
-                    try {
-                      committer.run();
-                    }
-                    finally {
-                      persistLatch.countDown();
-                    }
-                  }
-                }
-            );
-            persistLatch.await();
+          // Always want to persist.
+          log.info("Persisting remaining data.");
+
+          final Committer committer = committerSupplier.get();
+          final CountDownLatch persistLatch = new CountDownLatch(1);
+          plumber.persist(
+              new Committer()
+              {
+                @Override
+                public Object getMetadata()
+                {
+                  return committer.getMetadata();
+                }
+
+                @Override
+                public void run()
+                {
+                  try {
+                    committer.run();
+                  }
+                  finally {
+                    persistLatch.countDown();
+                  }
+                }
+              }
+          );
+          persistLatch.await();
+
+          if (gracefullyStopped) {
+            log.info("Gracefully stopping.");
+          } else {
+            log.info("Finishing the job.");
+            synchronized (this) {
+              if (gracefullyStopped) {
+                // Someone called stopGracefully after we checked the flag. That's okay, just stop now.
+                log.info("Gracefully stopping.");
+              } else {
+                finishingJob = true;
+              }
+            }
+
+            if (finishingJob) {
+              plumber.finishJob();
+            }
          }
        }
+        catch (InterruptedException e) {
+          log.debug(e, "Interrupted while finishing the job");
+        }
        catch (Exception e) {
          log.makeAlert(e, "Failed to finish realtime task").emit();
          throw e;
@@ -410,13 +428,17 @@ public class RealtimeIndexTask extends AbstractTask
  {
    try {
      synchronized (this) {
-        if (!stopped) {
-          stopped = true;
-          log.info("Gracefully stopping.");
-          if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+        if (!gracefullyStopped) {
+          gracefullyStopped = true;
+          if (finishingJob) {
+            log.info("stopGracefully: Interrupting finishJob.");
+            runThread.interrupt();
+          } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
+            log.info("stopGracefully: Draining firehose.");
            firehose.close();
          } else {
-            log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose);
+            log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread.");
+            runThread.interrupt();
          }
        }
      }
RealtimeIndexTaskTest.java

@@ -24,6 +24,8 @@ import com.google.api.client.util.Charsets;
 import com.google.api.client.util.Sets;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -63,6 +65,7 @@ import io.druid.indexing.test.TestDataSegmentKiller;
 import io.druid.indexing.test.TestDataSegmentPusher;
 import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.EntryExistsException;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import io.druid.query.Druids;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@@ -96,6 +99,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.server.metrics.EventReceiverFirehoseRegister;
+import io.druid.timeline.DataSegment;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
@@ -175,6 +179,7 @@ public class RealtimeIndexTaskTest
    final RealtimeIndexTask task = makeRealtimeTask(null);
    final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
    final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
+    final DataSegment publishedSegment;
 
    // Wait for firehose to show up, it starts off null.
    while (task.getFirehose() == null) {
@@ -207,11 +212,22 @@ public class RealtimeIndexTaskTest
      Thread.sleep(50);
    }
 
+    publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
+
    // Do a query.
    Assert.assertEquals(2, countEvents(task));
 
    // Simulate handoff.
-    for(Pair<Executor, Runnable> executorRunnablePair : handOffCallbacks.values()){
+    for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
+      final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
+      Assert.assertEquals(
+          new SegmentDescriptor(
+              publishedSegment.getInterval(),
+              publishedSegment.getVersion(),
+              publishedSegment.getShardSpec().getPartitionNum()
+          ),
+          entry.getKey()
+      );
      executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
    }
    handOffCallbacks.clear();
@@ -226,6 +242,7 @@ public class RealtimeIndexTaskTest
  {
    final File directory = tempFolder.newFolder();
    final RealtimeIndexTask task1 = makeRealtimeTask(null);
+    final DataSegment publishedSegment;
 
    // First run:
    {
@@ -298,11 +315,123 @@ public class RealtimeIndexTaskTest
        Thread.sleep(50);
      }
 
+      publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
+
      // Do a query.
      Assert.assertEquals(2, countEvents(task2));
 
      // Simulate handoff.
-      for(Pair<Executor, Runnable> executorRunnablePair : handOffCallbacks.values()){
+      for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
+        final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
+        Assert.assertEquals(
+            new SegmentDescriptor(
+                publishedSegment.getInterval(),
+                publishedSegment.getVersion(),
+                publishedSegment.getShardSpec().getPartitionNum()
+            ),
+            entry.getKey()
+        );
+        executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
+      }
+      handOffCallbacks.clear();
+
+      // Wait for the task to finish.
+      final TaskStatus taskStatus = statusFuture.get();
+      Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());
+    }
+  }
+
+  @Test(timeout = 10000L)
+  public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception
+  {
+    final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
+    final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
+    final File directory = tempFolder.newFolder();
+    final RealtimeIndexTask task1 = makeRealtimeTask(null);
+    final DataSegment publishedSegment;
+
+    // First run:
+    {
+      final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, mdc, directory);
+      final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);
+
+      // Wait for firehose to show up, it starts off null.
+      while (task1.getFirehose() == null) {
+        Thread.sleep(50);
+      }
+
+      final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
+          (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose();
+
+      firehose.addRows(
+          ImmutableList.<InputRow>of(
+              new MapBasedInputRow(
+                  now,
+                  ImmutableList.of("dim1"),
+                  ImmutableMap.<String, Object>of("dim1", "foo")
+              )
+          )
+      );
+
+      // Stop the firehose, this will trigger a finishJob.
+      firehose.close();
+
+      // Wait for publish.
+      while (mdc.getPublished().isEmpty()) {
+        Thread.sleep(50);
+      }
+
+      publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
+
+      // Do a query.
+      Assert.assertEquals(1, countEvents(task1));
+
+      // Trigger graceful shutdown.
+      task1.stopGracefully();
+
+      // Wait for the task to finish. The status doesn't really matter.
+      while (!statusFuture.isDone()) {
+        Thread.sleep(50);
+      }
+    }
+
+    // Second run:
+    {
+      final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
+      final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, mdc, directory);
+      final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox);
+
+      // Wait for firehose to show up, it starts off null.
+      while (task2.getFirehose() == null) {
+        Thread.sleep(50);
+      }
+
+      // Stop the firehose again, this will start another handoff.
+      final EventReceiverFirehoseFactory.EventReceiverFirehose firehose =
+          (EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose();
+
+      // Stop the firehose, this will trigger a finishJob.
+      firehose.close();
+
+      // publishedSegment is still published. No reason it shouldn't be.
+      Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished());
+
+      // Wait for a handoffCallback to show up.
+      while (handOffCallbacks.isEmpty()) {
+        Thread.sleep(50);
+      }
+
+      // Simulate handoff.
+      for (Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry : handOffCallbacks.entrySet()) {
+        final Pair<Executor, Runnable> executorRunnablePair = entry.getValue();
+        Assert.assertEquals(
+            new SegmentDescriptor(
+                publishedSegment.getInterval(),
+                publishedSegment.getVersion(),
+                publishedSegment.getShardSpec().getPartitionNum()
+            ),
+            entry.getKey()
+        );
        executorRunnablePair.lhs.execute(executorRunnablePair.rhs);
      }
      handOffCallbacks.clear();
@@ -452,11 +581,36 @@ public class RealtimeIndexTaskTest
    );
  }
 
-  private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory)
+  private TaskToolbox makeToolbox(
+      final Task task,
+      final IndexerMetadataStorageCoordinator mdc,
+      final File directory
+  )
+  {
+    return makeToolbox(
+        task,
+        new HeapMemoryTaskStorage(new TaskStorageConfig(null)),
+        mdc,
+        directory
+    );
+  }
+
+  private TaskToolbox makeToolbox(
+      final Task task,
+      final TaskStorage taskStorage,
+      final IndexerMetadataStorageCoordinator mdc,
+      final File directory
+  )
  {
-    final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
    final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
+    try {
+      taskStorage.insert(task, TaskStatus.running(task.getId()));
+    }
+    catch (EntryExistsException e) {
+      // suppress
+    }
+    taskLockbox.syncFromStorage();
    final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
        taskLockbox,
        mdc,
@@ -564,7 +718,6 @@ public class RealtimeIndexTaskTest
        new CacheConfig()
    );
 
-    taskLockbox.add(task);
    return toolboxFactory.build(task);
  }
 