Merge pull request #1366 from metamx/futurizeSQLMetadataSegmentManager

Properly shutdown SQLMetadataSegmentManager and SQLMetadataRuleManager
This commit is contained in:
Xavier Léauté 2015-05-15 17:33:25 -07:00
commit 21ba859f33
2 changed files with 67 additions and 47 deletions

View File

@ -25,9 +25,11 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -41,7 +43,6 @@ import io.druid.guice.annotations.Json;
import io.druid.server.coordinator.rules.ForeverLoadRule; import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.Rule; import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
@ -58,7 +59,7 @@ import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -134,12 +135,13 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
private final AtomicReference<ImmutableMap<String, List<Rule>>> rules; private final AtomicReference<ImmutableMap<String, List<Rule>>> rules;
private final AuditManager auditManager; private final AuditManager auditManager;
private volatile ScheduledExecutorService exec;
private final Object lock = new Object(); private final Object lock = new Object();
private volatile boolean started = false; private volatile boolean started = false;
private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;
@Inject @Inject
public SQLMetadataRuleManager( public SQLMetadataRuleManager(
@Json ObjectMapper jsonMapper, @Json ObjectMapper jsonMapper,
@ -168,21 +170,26 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
return; return;
} }
this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"); exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"));
createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper); createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper);
ScheduledExecutors.scheduleWithFixedDelay( future = exec.scheduleWithFixedDelay(
exec,
new Duration(0),
config.get().getPollDuration().toStandardDuration(),
new Runnable() new Runnable()
{ {
@Override @Override
public void run() public void run()
{ {
poll(); try {
poll();
}
catch (Exception e) {
log.error(e, "uncaught exception in rule manager polling thread");
}
} }
} },
0,
config.get().getPollDuration().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
); );
started = true; started = true;
@ -199,6 +206,8 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
rules.set(ImmutableMap.<String, List<Rule>>of()); rules.set(ImmutableMap.<String, List<Rule>>of());
future.cancel(false);
future = null;
started = false; started = false;
exec.shutdownNow(); exec.shutdownNow();
exec = null; exec = null;
@ -235,7 +244,9 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
return Pair.of( return Pair.of(
r.getString("dataSource"), r.getString("dataSource"),
jsonMapper.<List<Rule>>readValue( jsonMapper.<List<Rule>>readValue(
r.getBytes("payload"), new TypeReference<List<Rule>>(){} r.getBytes("payload"), new TypeReference<List<Rule>>()
{
}
) )
); );
} }
@ -245,29 +256,29 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
} }
} }
) )
.fold( .fold(
Maps.<String, List<Rule>>newHashMap(), Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Pair<String, List<Rule>>>() new Folder3<Map<String, List<Rule>>, Pair<String, List<Rule>>>()
{ {
@Override @Override
public Map<String, List<Rule>> fold( public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal, Map<String, List<Rule>> retVal,
Pair<String, List<Rule>> stringObjectMap, Pair<String, List<Rule>> stringObjectMap,
FoldController foldController, FoldController foldController,
StatementContext statementContext StatementContext statementContext
) throws SQLException ) throws SQLException
{ {
try { try {
String dataSource = stringObjectMap.lhs; String dataSource = stringObjectMap.lhs;
retVal.put(dataSource, stringObjectMap.rhs); retVal.put(dataSource, stringObjectMap.rhs);
return retVal; return retVal;
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
} }
} }
); );
} }
} }
) )

View File

@ -22,9 +22,11 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.MapUtils; import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
@ -56,7 +58,7 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -74,7 +76,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources; private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final IDBI dbi; private final IDBI dbi;
private volatile ScheduledExecutorService exec; private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;
private volatile boolean started = false; private volatile boolean started = false;
@ -103,23 +106,27 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
return; return;
} }
this.exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"); exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));
final Duration delay = config.get().getPollDuration().toStandardDuration(); final Duration delay = config.get().getPollDuration().toStandardDuration();
ScheduledExecutors.scheduleWithFixedDelay( future = exec.scheduleWithFixedDelay(
exec,
new Duration(0),
delay,
new Runnable() new Runnable()
{ {
@Override @Override
public void run() public void run()
{ {
poll(); try {
poll();
}
catch (Exception e) {
log.error(e, "uncaught exception in segment manager polling thread");
}
} }
} },
0,
delay.getMillis(),
TimeUnit.MILLISECONDS
); );
started = true; started = true;
} }
} }
@ -134,6 +141,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
started = false; started = false;
dataSources.set(new ConcurrentHashMap<String, DruidDataSource>()); dataSources.set(new ConcurrentHashMap<String, DruidDataSource>());
future.cancel(false);
future = null;
exec.shutdownNow(); exec.shutdownNow();
exec = null; exec = null;
} }