From 051c3ccedef0917380aa65eb52d29cd6d07c082a Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Thu, 14 May 2015 16:46:20 -0700 Subject: [PATCH] Add futures to SQLMetadataSegmentManager and SQLMetadataRuleManager --- .../metadata/SQLMetadataRuleManager.java | 83 +++++++++++-------- .../metadata/SQLMetadataSegmentManager.java | 31 ++++--- 2 files changed, 67 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java index ee1a693d9e0..9a4f82b9e3a 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java @@ -25,9 +25,11 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.Pair; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; @@ -41,7 +43,6 @@ import io.druid.guice.annotations.Json; import io.druid.server.coordinator.rules.ForeverLoadRule; import io.druid.server.coordinator.rules.Rule; import org.joda.time.DateTime; -import org.joda.time.Duration; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -58,7 +59,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -134,12 +135,13 @@ public class SQLMetadataRuleManager implements MetadataRuleManager private final AtomicReference>> rules; private final AuditManager auditManager; - private volatile ScheduledExecutorService exec; - private final Object lock = new Object(); private volatile boolean started = false; + private volatile ListeningScheduledExecutorService exec = null; + private volatile ListenableFuture future = null; + @Inject public SQLMetadataRuleManager( @Json ObjectMapper jsonMapper, @@ -168,21 +170,26 @@ public class SQLMetadataRuleManager implements MetadataRuleManager return; } - this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"); + exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d")); createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper); - ScheduledExecutors.scheduleWithFixedDelay( - exec, - new Duration(0), - config.get().getPollDuration().toStandardDuration(), + future = exec.scheduleWithFixedDelay( new Runnable() { @Override public void run() { - poll(); + try { + poll(); + } + catch (Exception e) { + log.error(e, "uncaught exception in rule manager polling thread"); + } } - } + }, + 0, + config.get().getPollDuration().toStandardDuration().getMillis(), + TimeUnit.MILLISECONDS ); started = true; @@ -199,6 +206,8 @@ public class SQLMetadataRuleManager implements MetadataRuleManager rules.set(ImmutableMap.>of()); + future.cancel(false); + future = null; started = false; exec.shutdownNow(); exec = null; @@ -235,7 +244,9 @@ public class SQLMetadataRuleManager implements MetadataRuleManager return Pair.of( r.getString("dataSource"), jsonMapper.>readValue( - r.getBytes("payload"), new TypeReference>(){} + r.getBytes("payload"), new TypeReference>() + { + } ) ); } @@ -245,29 +256,29 @@ public class SQLMetadataRuleManager implements MetadataRuleManager } } ) - .fold( - Maps.>newHashMap(), - new Folder3>, Pair>>() - { - @Override - public Map> fold( - Map> retVal, - Pair> stringObjectMap, - FoldController foldController, - StatementContext statementContext - ) throws SQLException - { - try { - String dataSource = stringObjectMap.lhs; - retVal.put(dataSource, stringObjectMap.rhs); - return retVal; - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - ); + .fold( + Maps.>newHashMap(), + new Folder3>, Pair>>() + { + @Override + public Map> fold( + Map> retVal, + Pair> stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + String dataSource = stringObjectMap.lhs; + retVal.put(dataSource, stringObjectMap.rhs); + return retVal; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); } } ) diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java index fc7a8a8768a..f74a6060fe9 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java @@ -22,9 +22,11 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.MapUtils; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; @@ -56,7 +58,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -74,7 +76,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final AtomicReference> dataSources; private final IDBI dbi; - private volatile ScheduledExecutorService exec; + private volatile ListeningScheduledExecutorService exec = null; + private volatile ListenableFuture future = null; private volatile boolean started = false; @@ -103,23 +106,27 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager return; } - this.exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"); + exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d")); final Duration delay = config.get().getPollDuration().toStandardDuration(); - ScheduledExecutors.scheduleWithFixedDelay( - exec, - new Duration(0), - delay, + future = exec.scheduleWithFixedDelay( new Runnable() { @Override public void run() { - poll(); + try { + poll(); + } + catch (Exception e) { + log.error(e, "uncaught exception in segment manager polling thread"); + } } - } + }, + 0, + delay.getMillis(), + TimeUnit.MILLISECONDS ); - started = true; } } @@ -134,6 +141,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager started = false; dataSources.set(new ConcurrentHashMap()); + future.cancel(false); + future = null; exec.shutdownNow(); exec = null; }