Simple improvement for realtime manager (#4377)

* Simple improvement for realtime manager

* Address comments

* tmp

* Address comments and add more tests

* Add catch for InterruptedException

* Address comments
This commit is contained in:
Jihoon Son 2017-06-13 07:25:34 +09:00 committed by Fangjin Yang
parent 113b8007b7
commit 073695d311
2 changed files with 306 additions and 225 deletions

View File

@ -22,6 +22,7 @@ package io.druid.segment.realtime;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
@ -29,11 +30,13 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.query.FinalizeResultsQueryRunner;
@ -54,11 +57,12 @@ import io.druid.segment.realtime.plumber.Plumbers;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
*/
@ -75,6 +79,9 @@ public class RealtimeManager implements QuerySegmentWalker
*/
private final Map<String, Map<Integer, FireChief>> chiefs;
// Created in start(); stop() shuts it down and waits for the submitted chiefs to finish.
private ExecutorService fireChiefExecutor;
// Set by stop() and polled by the FireChief ingestion loops, which run on fireChiefExecutor threads.
// Declared volatile so those threads are guaranteed to observe the write made by the thread calling stop().
private volatile boolean stopping;
@Inject
public RealtimeManager(
List<FireDepartment> fireDepartments,
@ -99,43 +106,46 @@ public class RealtimeManager implements QuerySegmentWalker
this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs);
}
/**
 * Test-only accessor for the chiefs registered under one data source.
 *
 * @param dataSource data source name used as the key into {@code chiefs}
 * @return whatever {@code chiefs} holds for that key: the map from partition number to chief
 */
@VisibleForTesting
Map<Integer, FireChief> getFireChiefs(String dataSource)
{
  final Map<Integer, FireChief> chiefsByPartition = chiefs.get(dataSource);
  return chiefsByPartition;
}
@LifecycleStart
public void start() throws IOException
{
serverAnnouncer.announce();
fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), "chief-%d");
for (final FireDepartment fireDepartment : fireDepartments) {
final DataSchema schema = fireDepartment.getDataSchema();
final FireChief chief = new FireChief(fireDepartment, conglomerate);
Map<Integer, FireChief> partitionChiefs = chiefs.get(schema.getDataSource());
if (partitionChiefs == null) {
partitionChiefs = new HashMap<>();
chiefs.put(schema.getDataSource(), partitionChiefs);
}
partitionChiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);
chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap<>())
.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);
chief.setName(
String.format(
"chief-%s[%s]",
schema.getDataSource(),
fireDepartment.getTuningConfig().getShardSpec().getPartitionNum()
)
);
chief.setDaemon(true);
chief.start();
fireChiefExecutor.submit(chief);
}
}
@LifecycleStop
public void stop()
{
for (Map<Integer, FireChief> chiefs : this.chiefs.values()) {
for (FireChief chief : chiefs.values()) {
CloseQuietly.close(chief);
// Raise the flag first so the ingestion loops that poll it stop picking up new rows.
stopping = true;
try {
  if (fireChiefExecutor != null) {
    // Ask the running chiefs to stop (shutdownNow typically interrupts them), then wait a bounded time.
    fireChiefExecutor.shutdownNow();
    Preconditions.checkState(
        fireChiefExecutor.awaitTermination(10, TimeUnit.SECONDS),
        "fireChiefExecutor not terminated"
    );
  }
}
catch (InterruptedException e) {
  // Restore the interrupt status before translating the exception so callers can still observe it.
  Thread.currentThread().interrupt();
  throw new ISE(e, "Failed to shutdown fireChiefExecutor during stop()");
}
serverAnnouncer.unannounce();
}
@ -211,19 +221,16 @@ public class RealtimeManager implements QuerySegmentWalker
);
}
static class FireChief extends Thread implements Closeable
class FireChief implements Runnable
{
private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics;
private final RealtimeTuningConfig config;
private final QueryRunnerFactoryConglomerate conglomerate;
private volatile Firehose firehose = null;
private volatile FirehoseV2 firehoseV2 = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;
private Plumber plumber;
public FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate)
FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate)
{
this.fireDepartment = fireDepartment;
this.conglomerate = conglomerate;
@ -231,60 +238,39 @@ public class RealtimeManager implements QuerySegmentWalker
this.metrics = fireDepartment.getMetrics();
}
public Firehose initFirehose()
private Firehose initFirehose()
{
synchronized (this) {
if (firehose == null) {
try {
log.info("Calling the FireDepartment and getting a Firehose.");
firehose = fireDepartment.connect();
log.info("Firehose acquired!");
return fireDepartment.connect();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("Firehose already connected, skipping initFirehose().");
}
return firehose;
}
}
public FirehoseV2 initFirehoseV2(Object metaData)
private FirehoseV2 initFirehoseV2(Object metaData)
{
synchronized (this) {
if (firehoseV2 == null) {
try {
log.info("Calling the FireDepartment and getting a FirehoseV2.");
firehoseV2 = fireDepartment.connect(metaData);
log.info("FirehoseV2 acquired!");
return fireDepartment.connect(metaData);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("FirehoseV2 already connected, skipping initFirehoseV2().");
}
return firehoseV2;
}
}
public Plumber initPlumber()
private void initPlumber()
{
synchronized (this) {
if (plumber == null) {
log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber();
log.info("We have our plumber!");
} else {
log.warn("Plumber already trained, skipping initPlumber().");
}
@VisibleForTesting
Plumber getPlumber()
{
return plumber;
}
}
public FireDepartmentMetrics getMetrics()
{
@ -294,60 +280,69 @@ public class RealtimeManager implements QuerySegmentWalker
@Override
public void run()
{
plumber = initPlumber();
initPlumber();
try {
final Closer closer = Closer.create();
try {
Object metadata = plumber.startJob();
Firehose firehose;
FirehoseV2 firehoseV2;
final boolean success;
if (fireDepartment.checkFirehoseV2()) {
firehoseV2 = initFirehoseV2(metadata);
runFirehoseV2(firehoseV2);
closer.register(firehoseV2);
success = runFirehoseV2(firehoseV2);
} else {
firehose = initFirehose();
runFirehose(firehose);
closer.register(firehose);
success = runFirehose(firehose);
}
if (success) {
// plumber.finishJob() is called only when all processing has finished successfully.
closer.register(() -> plumber.finishJob());
}
catch (RuntimeException e) {
}
catch (InterruptedException e) {
log.warn("Interrupted while running a firehose");
throw closer.rethrow(e);
}
catch (Exception e) {
log.makeAlert(
e,
"[%s] aborted realtime processing[%s]",
e.getClass().getSimpleName(),
fireDepartment.getDataSchema().getDataSource()
).emit();
normalExit = false;
throw Throwables.propagate(e);
throw closer.rethrow(e);
}
catch (Error e) {
log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
normalExit = false;
throw e;
throw closer.rethrow(e);
}
finally {
CloseQuietly.close(firehose);
if (normalExit) {
plumber.finishJob();
plumber = null;
firehose = null;
closer.close();
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
private void runFirehoseV2(FirehoseV2 firehose)
private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception
{
try {
firehose.start();
}
catch (Exception e) {
log.error(e, "Failed to start firehoseV2");
return;
}
log.info("FirehoseV2 started");
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehoseV2(firehose);
boolean haveRow = true;
while (haveRow) {
if (Thread.interrupted() || stopping) {
return false;
}
InputRow inputRow = null;
int numRows = 0;
try {
@ -378,14 +373,19 @@ public class RealtimeManager implements QuerySegmentWalker
metrics.incrementUnparseable();
}
}
return true;
}
private void runFirehose(Firehose firehose)
private boolean runFirehose(Firehose firehose)
{
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
while (firehose.hasMore()) {
if (Thread.interrupted() || stopping) {
return false;
}
Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics);
}
return true;
}
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
@ -395,16 +395,5 @@ public class RealtimeManager implements QuerySegmentWalker
return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), toolChest);
}
@Override
public void close() throws IOException
{
synchronized (this) {
if (firehose != null) {
normalExit = false;
firehose.close();
}
}
}
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -83,13 +84,14 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
@ -97,6 +99,14 @@ import java.util.concurrent.TimeUnit;
public class RealtimeManagerTest
{
private static QueryRunnerFactory factory;
private static QueryRunnerFactoryConglomerate conglomerate;
private static final List<TestInputRowHolder> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()),
makeRow(new ParseException("parse error")),
null,
makeRow(new DateTime().getMillis())
);
private RealtimeManager realtimeManager;
private RealtimeManager realtimeManager2;
@ -105,7 +115,6 @@ public class RealtimeManagerTest
private DataSchema schema2;
private TestPlumber plumber;
private TestPlumber plumber2;
private CountDownLatch chiefStartedLatch;
private RealtimeTuningConfig tuningConfig_0;
private RealtimeTuningConfig tuningConfig_1;
private DataSchema schema3;
@ -114,18 +123,19 @@ public class RealtimeManagerTest
public static void setupStatic()
{
factory = initFactory();
conglomerate = new QueryRunnerFactoryConglomerate()
{
@Override
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
return factory;
}
};
}
@Before
public void setUp() throws Exception
{
final List<TestInputRowHolder> rows = Arrays.asList(
makeRow(new DateTime("9000-01-01").getMillis()),
makeRow(new ParseException("parse error")),
null,
makeRow(new DateTime().getMillis())
);
ObjectMapper jsonMapper = new DefaultObjectMapper();
schema = new DataSchema(
@ -289,67 +299,20 @@ public class RealtimeManagerTest
FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0);
FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1);
QueryRunnerFactoryConglomerate conglomerate = new QueryRunnerFactoryConglomerate()
{
@Override
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
return factory;
}
};
chiefStartedLatch = new CountDownLatch(2);
RealtimeManager.FireChief fireChief_0 = new RealtimeManager.FireChief(department_0, conglomerate)
{
@Override
public void run()
{
super.initPlumber();
chiefStartedLatch.countDown();
}
};
RealtimeManager.FireChief fireChief_1 = new RealtimeManager.FireChief(department_1, conglomerate)
{
@Override
public void run()
{
super.initPlumber();
chiefStartedLatch.countDown();
}
};
realtimeManager3 = new RealtimeManager(
Arrays.asList(department_0, department_1),
conglomerate,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
ImmutableMap.<String, Map<Integer, RealtimeManager.FireChief>>of(
"testing",
ImmutableMap.of(
0,
fireChief_0,
1,
fireChief_1
)
)
null
);
startFireChiefWithPartitionNum(fireChief_0, 0);
startFireChiefWithPartitionNum(fireChief_1, 1);
}
private void startFireChiefWithPartitionNum(RealtimeManager.FireChief fireChief, int partitionNum)
@After
public void tearDown() throws Exception
{
fireChief.setName(
String.format(
"chief-%s[%s]",
"testing",
partitionNum
)
);
fireChief.start();
realtimeManager.stop();
realtimeManager2.stop();
realtimeManager3.stop();
}
@Test
@ -394,6 +357,84 @@ public class RealtimeManagerTest
Assert.assertEquals(0, plumber2.getPersistCount());
}
/**
 * Starts a manager with one Firehose-backed and one FirehoseV2-backed department, waits until at least
 * two rows are reported as processed, stops the manager, and then checks that both firehoses report
 * isClosed() and both plumbers report isFinishedJob().
 */
@Test(timeout = 5000L)
public void testNormalStop() throws IOException, InterruptedException
{
  final TestFirehose v1Firehose = new TestFirehose(rows.iterator());
  final TestFirehoseV2 v2Firehose = new TestFirehoseV2(rows.iterator());

  // Factory that always hands back the pre-built firehose so the test can inspect it afterwards.
  final FirehoseFactory v1Factory = new FirehoseFactory()
  {
    @Override
    public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException
    {
      return v1Firehose;
    }
  };

  final RealtimeIOConfig v1IoConfig = new RealtimeIOConfig(
      v1Factory,
      (dataSchema, tuning, fireMetrics) -> plumber,
      null
  );
  final RealtimeIOConfig v2IoConfig = new RealtimeIOConfig(
      null,
      (dataSchema, tuning, fireMetrics) -> plumber2,
      (rowParser, metadata) -> v2Firehose
  );

  final FireDepartment v1Department = new FireDepartment(schema3, v1IoConfig, tuningConfig_0);
  final FireDepartment v2Department = new FireDepartment(schema3, v2IoConfig, tuningConfig_1);
  final List<FireDepartment> departments = Arrays.asList(v1Department, v2Department);

  final RealtimeManager manager = new RealtimeManager(
      departments,
      conglomerate,
      EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
      null
  );

  manager.start();

  // Poll until the metrics show at least two processed rows.
  while (true) {
    if (manager.getMetrics("testing").processed() >= 2) {
      break;
    }
    Thread.sleep(100);
  }

  manager.stop();

  Assert.assertTrue(v1Firehose.isClosed());
  Assert.assertTrue(v2Firehose.isClosed());
  Assert.assertTrue(plumber.isFinishedJob());
  Assert.assertTrue(plumber2.isFinishedJob());
}
/**
 * Starts a manager over a firehose that sleeps inside hasMore(), stops it right away, and checks that
 * the firehose reports isClosed() while the plumber does not report isFinishedJob().
 */
@Test(timeout = 5000L)
public void testStopByInterruption() throws IOException
{
  final SleepingFirehose sleeper = new SleepingFirehose();

  // Factory that always hands back the sleeping firehose created above.
  final FirehoseFactory sleeperFactory = new FirehoseFactory()
  {
    @Override
    public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException
    {
      return sleeper;
    }
  };

  final RealtimeIOConfig sleeperIoConfig = new RealtimeIOConfig(
      sleeperFactory,
      (dataSchema, tuning, fireMetrics) -> plumber,
      null
  );
  final FireDepartment department = new FireDepartment(schema, sleeperIoConfig, tuningConfig_0);
  final List<FireDepartment> departments = Collections.singletonList(department);

  final RealtimeManager manager = new RealtimeManager(
      departments,
      conglomerate,
      EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
      null
  );

  manager.start();
  manager.stop();

  Assert.assertTrue(sleeper.isClosed());
  Assert.assertFalse(plumber.isFinishedJob());
}
@Test(timeout = 10_000L)
public void testQueryWithInterval() throws IOException, InterruptedException
{
@ -419,7 +460,18 @@ public class RealtimeManagerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 252L)
);
chiefStartedLatch.await();
realtimeManager3.start();
while (realtimeManager3.getFireChiefs("testing").values().stream()
.anyMatch(
fireChief -> {
final Plumber plumber = fireChief.getPlumber();
return plumber == null || !((TestPlumber)plumber).isStartedJob();
}
)
) {
Thread.sleep(10);
}
for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
GroupByQuery query = GroupByQuery
@ -477,7 +529,18 @@ public class RealtimeManagerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
chiefStartedLatch.await();
realtimeManager3.start();
while (realtimeManager3.getFireChiefs("testing").values().stream()
.anyMatch(
fireChief -> {
final Plumber plumber = fireChief.getPlumber();
return plumber == null || !((TestPlumber)plumber).isStartedJob();
}
)
) {
Thread.sleep(10);
}
for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
GroupByQuery query = GroupByQuery
@ -574,7 +637,18 @@ public class RealtimeManagerTest
GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 1L, "idx", 130L)
);
chiefStartedLatch.await();
realtimeManager3.start();
while (realtimeManager3.getFireChiefs("testing").values().stream()
.anyMatch(
fireChief -> {
final Plumber plumber = fireChief.getPlumber();
return plumber == null || !((TestPlumber)plumber).isStartedJob();
}
)
) {
Thread.sleep(10);
}
final Interval interval_26_28 = new Interval("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z");
final Interval interval_28_29 = new Interval("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z");
@ -682,20 +756,12 @@ public class RealtimeManagerTest
return GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
}
@After
public void tearDown() throws Exception
{
realtimeManager.stop();
realtimeManager2.stop();
realtimeManager3.stop();
}
private TestInputRowHolder makeRow(final long timestamp)
private static TestInputRowHolder makeRow(final long timestamp)
{
return new TestInputRowHolder(timestamp, null);
}
private TestInputRowHolder makeRow(final RuntimeException e)
private static TestInputRowHolder makeRow(final RuntimeException e)
{
return new TestInputRowHolder(0, e);
}
@ -770,38 +836,10 @@ public class RealtimeManagerTest
}
}
private static class InfiniteTestFirehose implements Firehose
{
private boolean hasMore = true;
@Override
public boolean hasMore()
{
return hasMore;
}
@Override
public InputRow nextRow()
{
return null;
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
@Override
public void close() throws IOException
{
hasMore = false;
}
}
private static class TestFirehose implements Firehose
{
private final Iterator<TestInputRowHolder> rows;
private boolean closed;
private TestFirehose(Iterator<TestInputRowHolder> rows)
{
@ -831,9 +869,15 @@ public class RealtimeManagerTest
return Runnables.getNoopRunnable();
}
/**
 * @return the current value of the {@code closed} flag, which {@link #close()} sets to true
 */
public boolean isClosed()
{
return closed;
}
/**
 * Only records that close was requested by setting the {@code closed} flag; nothing else is released here.
 */
@Override
public void close() throws IOException
{
closed = true;
}
}
@ -842,6 +886,7 @@ public class RealtimeManagerTest
private final Iterator<TestInputRowHolder> rows;
private InputRow currRow;
private boolean stop;
private boolean closed;
private TestFirehoseV2(Iterator<TestInputRowHolder> rows)
{
@ -860,6 +905,12 @@ public class RealtimeManagerTest
/**
 * Only records that close was requested by setting the {@code closed} flag; nothing else is released here.
 */
@Override
public void close() throws IOException
{
closed = true;
}
/**
 * @return the current value of the {@code closed} flag, which {@link #close()} sets to true
 */
public boolean isClosed()
{
return closed;
}
@Override
@ -905,6 +956,47 @@ public class RealtimeManagerTest
}
}
/**
 * Firehose stub that never runs out of rows and blocks for one second on every {@link #hasMore()} call,
 * so the thread driving it can be interrupted while it waits.
 */
private static class SleepingFirehose implements Firehose
{
  private boolean closed;

  /**
   * Sleeps for one second, then reports that more rows are available.
   * An interrupt received while sleeping is rethrown through {@code Throwables.propagate}.
   */
  @Override
  public boolean hasMore()
  {
    try {
      Thread.sleep(1000);
    }
    catch (InterruptedException e) {
      throw Throwables.propagate(e);
    }
    return true;
  }

  /**
   * @return always null; this stub never produces a row
   */
  @Nullable
  @Override
  public InputRow nextRow()
  {
    return null;
  }

  /**
   * @return a runnable that does nothing, so a caller that runs the commit never dereferences null
   */
  @Override
  public Runnable commit()
  {
    return () -> { };
  }

  /**
   * @return the current value of the {@code closed} flag, which {@link #close()} sets to true
   */
  public boolean isClosed()
  {
    return closed;
  }

  /**
   * Only records that close was requested by setting the {@code closed} flag.
   */
  @Override
  public void close() throws IOException
  {
    closed = true;
  }
}
private static class TestPlumber implements Plumber
{
private final Sink sink;