diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 15ac0971269..fd30f61b29b 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -22,6 +22,7 @@ package io.druid.segment.realtime; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -29,11 +30,13 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseV2; import io.druid.data.input.InputRow; -import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.query.FinalizeResultsQueryRunner; @@ -54,11 +57,12 @@ import io.druid.segment.realtime.plumber.Plumbers; import io.druid.server.coordination.DataSegmentServerAnnouncer; import org.joda.time.Interval; -import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** */ @@ -75,6 +79,9 @@ public class RealtimeManager implements QuerySegmentWalker */ private final Map> chiefs; + private ExecutorService fireChiefExecutor; + private boolean stopping; + @Inject public RealtimeManager( List fireDepartments, @@ -99,43 +106,46 @@ public class 
RealtimeManager implements QuerySegmentWalker this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs); } + @VisibleForTesting + Map getFireChiefs(String dataSource) + { + return chiefs.get(dataSource); + } + @LifecycleStart public void start() throws IOException { serverAnnouncer.announce(); + fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), "chief-%d"); + for (final FireDepartment fireDepartment : fireDepartments) { final DataSchema schema = fireDepartment.getDataSchema(); final FireChief chief = new FireChief(fireDepartment, conglomerate); - Map partitionChiefs = chiefs.get(schema.getDataSource()); - if (partitionChiefs == null) { - partitionChiefs = new HashMap<>(); - chiefs.put(schema.getDataSource(), partitionChiefs); - } - partitionChiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief); + chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap<>()) + .put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief); - chief.setName( - String.format( - "chief-%s[%s]", - schema.getDataSource(), - fireDepartment.getTuningConfig().getShardSpec().getPartitionNum() - ) - ); - chief.setDaemon(true); - chief.start(); + fireChiefExecutor.submit(chief); } } @LifecycleStop public void stop() { - for (Map chiefs : this.chiefs.values()) { - for (FireChief chief : chiefs.values()) { - CloseQuietly.close(chief); + stopping = true; + try { + if (fireChiefExecutor != null) { + fireChiefExecutor.shutdownNow(); + Preconditions.checkState( + fireChiefExecutor.awaitTermination(10, TimeUnit.SECONDS), + "persistExecutor not terminated" + ); } } - + catch (InterruptedException e) { + throw new ISE(e, "Failed to shutdown fireChiefExecutor during stop()"); + } serverAnnouncer.unannounce(); } @@ -211,19 +221,16 @@ public class RealtimeManager implements QuerySegmentWalker ); } - static class FireChief extends Thread implements Closeable + class FireChief implements Runnable { private final 
FireDepartment fireDepartment; private final FireDepartmentMetrics metrics; private final RealtimeTuningConfig config; private final QueryRunnerFactoryConglomerate conglomerate; - private volatile Firehose firehose = null; - private volatile FirehoseV2 firehoseV2 = null; - private volatile Plumber plumber = null; - private volatile boolean normalExit = true; + private Plumber plumber; - public FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate) + FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate) { this.fireDepartment = fireDepartment; this.conglomerate = conglomerate; @@ -231,59 +238,38 @@ public class RealtimeManager implements QuerySegmentWalker this.metrics = fireDepartment.getMetrics(); } - public Firehose initFirehose() + private Firehose initFirehose() { - synchronized (this) { - if (firehose == null) { - try { - log.info("Calling the FireDepartment and getting a Firehose."); - firehose = fireDepartment.connect(); - log.info("Firehose acquired!"); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } else { - log.warn("Firehose already connected, skipping initFirehose()."); - } - - return firehose; + try { + log.info("Calling the FireDepartment and getting a Firehose."); + return fireDepartment.connect(); + } + catch (IOException e) { + throw Throwables.propagate(e); } } - public FirehoseV2 initFirehoseV2(Object metaData) + private FirehoseV2 initFirehoseV2(Object metaData) { - synchronized (this) { - if (firehoseV2 == null) { - try { - log.info("Calling the FireDepartment and getting a FirehoseV2."); - firehoseV2 = fireDepartment.connect(metaData); - log.info("FirehoseV2 acquired!"); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } else { - log.warn("FirehoseV2 already connected, skipping initFirehoseV2()."); - } - - return firehoseV2; + try { + log.info("Calling the FireDepartment and getting a FirehoseV2."); + return 
fireDepartment.connect(metaData); + } + catch (IOException e) { + throw Throwables.propagate(e); } } - public Plumber initPlumber() + private void initPlumber() { - synchronized (this) { - if (plumber == null) { - log.info("Someone get us a plumber!"); - plumber = fireDepartment.findPlumber(); - log.info("We have our plumber!"); - } else { - log.warn("Plumber already trained, skipping initPlumber()."); - } + log.info("Someone get us a plumber!"); + plumber = fireDepartment.findPlumber(); + } - return plumber; - } + @VisibleForTesting + Plumber getPlumber() + { + return plumber; } public FireDepartmentMetrics getMetrics() @@ -294,60 +280,69 @@ public class RealtimeManager implements QuerySegmentWalker @Override public void run() { - plumber = initPlumber(); + initPlumber(); try { - Object metadata = plumber.startJob(); + final Closer closer = Closer.create(); - if (fireDepartment.checkFirehoseV2()) { - firehoseV2 = initFirehoseV2(metadata); - runFirehoseV2(firehoseV2); - } else { - firehose = initFirehose(); - runFirehose(firehose); + try { + Object metadata = plumber.startJob(); + + Firehose firehose; + FirehoseV2 firehoseV2; + final boolean success; + if (fireDepartment.checkFirehoseV2()) { + firehoseV2 = initFirehoseV2(metadata); + closer.register(firehoseV2); + success = runFirehoseV2(firehoseV2); + } else { + firehose = initFirehose(); + closer.register(firehose); + success = runFirehose(firehose); + } + if (success) { + // plumber.finishJob() is called only when all processing has finished successfully. 
+ closer.register(() -> plumber.finishJob()); + } + } + catch (InterruptedException e) { + log.warn("Interrupted while running a firehose"); + throw closer.rethrow(e); + } + catch (Exception e) { + log.makeAlert( + e, + "[%s] aborted realtime processing[%s]", + e.getClass().getSimpleName(), + fireDepartment.getDataSchema().getDataSource() + ).emit(); + throw closer.rethrow(e); + } + catch (Error e) { + log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource()) + .emit(); + throw closer.rethrow(e); + } + finally { + closer.close(); } - } - catch (RuntimeException e) { - log.makeAlert( - e, - "[%s] aborted realtime processing[%s]", - e.getClass().getSimpleName(), - fireDepartment.getDataSchema().getDataSource() - ).emit(); - normalExit = false; + catch (IOException e) { throw Throwables.propagate(e); } - catch (Error e) { - log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource()) - .emit(); - normalExit = false; - throw e; - } - finally { - CloseQuietly.close(firehose); - if (normalExit) { - plumber.finishJob(); - plumber = null; - firehose = null; - } - } } - private void runFirehoseV2(FirehoseV2 firehose) + private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception { - try { - firehose.start(); - } - catch (Exception e) { - log.error(e, "Failed to start firehoseV2"); - return; - } + firehose.start(); log.info("FirehoseV2 started"); final Supplier committerSupplier = Committers.supplierFromFirehoseV2(firehose); boolean haveRow = true; while (haveRow) { + if (Thread.interrupted() || stopping) { + return false; + } InputRow inputRow = null; int numRows = 0; try { @@ -378,14 +373,19 @@ public class RealtimeManager implements QuerySegmentWalker metrics.incrementUnparseable(); } } + return true; } - private void runFirehose(Firehose firehose) + private boolean runFirehose(Firehose firehose) { final Supplier committerSupplier = 
Committers.supplierFromFirehose(firehose); while (firehose.hasMore()) { + if (Thread.interrupted() || stopping) { + return false; + } Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics); } + return true; } public QueryRunner getQueryRunner(Query query) @@ -395,16 +395,5 @@ public class RealtimeManager implements QuerySegmentWalker return new FinalizeResultsQueryRunner(plumber.getQueryRunner(query), toolChest); } - - @Override - public void close() throws IOException - { - synchronized (this) { - if (firehose != null) { - normalExit = false; - firehose.close(); - } - } - } } } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index d9e5823bc7b..85e60f3ba61 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -83,13 +84,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** @@ -97,6 +99,14 @@ import java.util.concurrent.TimeUnit; public class RealtimeManagerTest { private static QueryRunnerFactory factory; + private static QueryRunnerFactoryConglomerate conglomerate; + + private static final List rows = 
Arrays.asList( + makeRow(new DateTime("9000-01-01").getMillis()), + makeRow(new ParseException("parse error")), + null, + makeRow(new DateTime().getMillis()) + ); private RealtimeManager realtimeManager; private RealtimeManager realtimeManager2; @@ -105,7 +115,6 @@ public class RealtimeManagerTest private DataSchema schema2; private TestPlumber plumber; private TestPlumber plumber2; - private CountDownLatch chiefStartedLatch; private RealtimeTuningConfig tuningConfig_0; private RealtimeTuningConfig tuningConfig_1; private DataSchema schema3; @@ -114,18 +123,19 @@ public class RealtimeManagerTest public static void setupStatic() { factory = initFactory(); + conglomerate = new QueryRunnerFactoryConglomerate() + { + @Override + public > QueryRunnerFactory findFactory(QueryType query) + { + return factory; + } + }; } @Before public void setUp() throws Exception { - final List rows = Arrays.asList( - makeRow(new DateTime("9000-01-01").getMillis()), - makeRow(new ParseException("parse error")), - null, - makeRow(new DateTime().getMillis()) - ); - ObjectMapper jsonMapper = new DefaultObjectMapper(); schema = new DataSchema( @@ -289,67 +299,20 @@ public class RealtimeManagerTest FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0); FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1); - QueryRunnerFactoryConglomerate conglomerate = new QueryRunnerFactoryConglomerate() - { - @Override - public > QueryRunnerFactory findFactory(QueryType query) - { - return factory; - } - }; - - chiefStartedLatch = new CountDownLatch(2); - - RealtimeManager.FireChief fireChief_0 = new RealtimeManager.FireChief(department_0, conglomerate) - { - @Override - public void run() - { - super.initPlumber(); - chiefStartedLatch.countDown(); - } - }; - - RealtimeManager.FireChief fireChief_1 = new RealtimeManager.FireChief(department_1, conglomerate) - { - @Override - public void run() - { - super.initPlumber(); - 
chiefStartedLatch.countDown(); - } - }; - - realtimeManager3 = new RealtimeManager( Arrays.asList(department_0, department_1), conglomerate, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), - ImmutableMap.>of( - "testing", - ImmutableMap.of( - 0, - fireChief_0, - 1, - fireChief_1 - ) - ) + null ); - - startFireChiefWithPartitionNum(fireChief_0, 0); - startFireChiefWithPartitionNum(fireChief_1, 1); } - private void startFireChiefWithPartitionNum(RealtimeManager.FireChief fireChief, int partitionNum) + @After + public void tearDown() throws Exception { - fireChief.setName( - String.format( - "chief-%s[%s]", - "testing", - partitionNum - ) - ); - fireChief.start(); + realtimeManager.stop(); + realtimeManager2.stop(); + realtimeManager3.stop(); } @Test @@ -394,6 +357,84 @@ public class RealtimeManagerTest Assert.assertEquals(0, plumber2.getPersistCount()); } + @Test(timeout = 5000L) + public void testNormalStop() throws IOException, InterruptedException + { + final TestFirehose firehose = new TestFirehose(rows.iterator()); + final TestFirehoseV2 firehoseV2 = new TestFirehoseV2(rows.iterator()); + final RealtimeIOConfig ioConfig = new RealtimeIOConfig( + new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException + { + return firehose; + } + }, + (schema, config, metrics) -> plumber, + null + ); + RealtimeIOConfig ioConfig2 = new RealtimeIOConfig( + null, + (schema, config, metrics) -> plumber2, + (parser, arg) -> firehoseV2 + ); + + final FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0); + final FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1); + + final RealtimeManager realtimeManager = new RealtimeManager( + Arrays.asList(department_0, department_1), + conglomerate, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), + null + ); + + realtimeManager.start(); + while 
(realtimeManager.getMetrics("testing").processed() < 2) { + Thread.sleep(100); + } + realtimeManager.stop(); + + Assert.assertTrue(firehose.isClosed()); + Assert.assertTrue(firehoseV2.isClosed()); + Assert.assertTrue(plumber.isFinishedJob()); + Assert.assertTrue(plumber2.isFinishedJob()); + } + + @Test(timeout = 5000L) + public void testStopByInterruption() throws IOException + { + final SleepingFirehose firehose = new SleepingFirehose(); + final RealtimeIOConfig ioConfig = new RealtimeIOConfig( + new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException + { + return firehose; + } + }, + (schema, config, metrics) -> plumber, + null + ); + + final FireDepartment department_0 = new FireDepartment(schema, ioConfig, tuningConfig_0); + + final RealtimeManager realtimeManager = new RealtimeManager( + Collections.singletonList(department_0), + conglomerate, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), + null + ); + + realtimeManager.start(); + realtimeManager.stop(); + + Assert.assertTrue(firehose.isClosed()); + Assert.assertFalse(plumber.isFinishedJob()); + } + @Test(timeout = 10_000L) public void testQueryWithInterval() throws IOException, InterruptedException { @@ -419,7 +460,18 @@ public class RealtimeManagerTest GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 252L) ); - chiefStartedLatch.await(); + realtimeManager3.start(); + + while (realtimeManager3.getFireChiefs("testing").values().stream() + .anyMatch( + fireChief -> { + final Plumber plumber = fireChief.getPlumber(); + return plumber == null || !((TestPlumber)plumber).isStartedJob(); + } + ) + ) { + Thread.sleep(10); + } for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { GroupByQuery query = GroupByQuery @@ -477,7 +529,18 @@ public class RealtimeManagerTest GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", 
"alias", "travel", "rows", 1L, "idx", 126L) ); - chiefStartedLatch.await(); + realtimeManager3.start(); + + while (realtimeManager3.getFireChiefs("testing").values().stream() + .anyMatch( + fireChief -> { + final Plumber plumber = fireChief.getPlumber(); + return plumber == null || !((TestPlumber)plumber).isStartedJob(); + } + ) + ) { + Thread.sleep(10); + } for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { GroupByQuery query = GroupByQuery @@ -574,7 +637,18 @@ public class RealtimeManagerTest GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 1L, "idx", 130L) ); - chiefStartedLatch.await(); + realtimeManager3.start(); + + while (realtimeManager3.getFireChiefs("testing").values().stream() + .anyMatch( + fireChief -> { + final Plumber plumber = fireChief.getPlumber(); + return plumber == null || !((TestPlumber)plumber).isStartedJob(); + } + ) + ) { + Thread.sleep(10); + } final Interval interval_26_28 = new Interval("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z"); final Interval interval_28_29 = new Interval("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z"); @@ -682,20 +756,12 @@ public class RealtimeManagerTest return GroupByQueryRunnerTest.makeQueryRunnerFactory(config); } - @After - public void tearDown() throws Exception - { - realtimeManager.stop(); - realtimeManager2.stop(); - realtimeManager3.stop(); - } - - private TestInputRowHolder makeRow(final long timestamp) + private static TestInputRowHolder makeRow(final long timestamp) { return new TestInputRowHolder(timestamp, null); } - private TestInputRowHolder makeRow(final RuntimeException e) + private static TestInputRowHolder makeRow(final RuntimeException e) { return new TestInputRowHolder(0, e); } @@ -770,38 +836,10 @@ public class RealtimeManagerTest } } - private static class InfiniteTestFirehose implements Firehose - { - private boolean hasMore = true; - - @Override - public boolean hasMore() - { - return 
hasMore; - } - - @Override - public InputRow nextRow() - { - return null; - } - - @Override - public Runnable commit() - { - return Runnables.getNoopRunnable(); - } - - @Override - public void close() throws IOException - { - hasMore = false; - } - } - private static class TestFirehose implements Firehose { private final Iterator rows; + private boolean closed; private TestFirehose(Iterator rows) { @@ -831,9 +869,15 @@ public class RealtimeManagerTest return Runnables.getNoopRunnable(); } + public boolean isClosed() + { + return closed; + } + @Override public void close() throws IOException { + closed = true; } } @@ -842,6 +886,7 @@ public class RealtimeManagerTest private final Iterator rows; private InputRow currRow; private boolean stop; + private boolean closed; private TestFirehoseV2(Iterator rows) { @@ -860,6 +905,12 @@ public class RealtimeManagerTest @Override public void close() throws IOException { + closed = true; + } + + public boolean isClosed() + { + return closed; } @Override @@ -905,6 +956,47 @@ public class RealtimeManagerTest } } + private static class SleepingFirehose implements Firehose + { + private boolean closed; + + @Override + public boolean hasMore() + { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + return true; + } + + @Nullable + @Override + public InputRow nextRow() + { + return null; + } + + @Override + public Runnable commit() + { + return null; + } + + public boolean isClosed() + { + return closed; + } + + @Override + public void close() throws IOException + { + closed = true; + } + } + private static class TestPlumber implements Plumber { private final Sink sink;