mirror of https://github.com/apache/druid.git
fix config manager
This commit is contained in:
parent
183a133ee4
commit
aec6457a4d
|
@ -63,8 +63,9 @@ public class ConfigManager
|
|||
private final ScheduledExecutorService exec;
|
||||
private final ConcurrentMap<String, ConfigHolder> watchedConfigs;
|
||||
private final String selectStatement;
|
||||
private final String insertStatement;
|
||||
private final String configTable;
|
||||
|
||||
private volatile String insertStatement;
|
||||
private volatile ConfigManager.PollingCallable poller;
|
||||
|
||||
@Inject
|
||||
|
@ -76,19 +77,9 @@ public class ConfigManager
|
|||
this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
|
||||
this.watchedConfigs = Maps.newConcurrentMap();
|
||||
|
||||
final String configTable = dbTables.get().getConfigTable();
|
||||
this.configTable = dbTables.get().getConfigTable();
|
||||
|
||||
this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", configTable);
|
||||
this.insertStatement = String.format(
|
||||
DbConnector.isPostgreSQL(dbi) ?
|
||||
"BEGIN;\n" +
|
||||
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
|
||||
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
|
||||
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
|
||||
"COMMIT;" :
|
||||
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
|
||||
configTable
|
||||
);
|
||||
}
|
||||
|
||||
@LifecycleStart
|
||||
|
@ -99,6 +90,17 @@ public class ConfigManager
|
|||
return;
|
||||
}
|
||||
|
||||
insertStatement = String.format(
|
||||
DbConnector.isPostgreSQL(dbi) ?
|
||||
"BEGIN;\n" +
|
||||
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
|
||||
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
|
||||
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
|
||||
"COMMIT;" :
|
||||
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
|
||||
configTable
|
||||
);
|
||||
|
||||
poller = new PollingCallable();
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec, new Duration(0), config.get().getPollDuration().toStandardDuration(), poller
|
||||
|
@ -116,6 +118,7 @@ public class ConfigManager
|
|||
return;
|
||||
}
|
||||
|
||||
insertStatement = null;
|
||||
poller.stop();
|
||||
poller = null;
|
||||
|
||||
|
@ -196,18 +199,18 @@ public class ConfigManager
|
|||
public byte[] withHandle(Handle handle) throws Exception
|
||||
{
|
||||
List<byte[]> matched = handle.createQuery(selectStatement)
|
||||
.bind("name", key)
|
||||
.map(
|
||||
new ResultSetMapper<byte[]>()
|
||||
{
|
||||
@Override
|
||||
public byte[] map(int index, ResultSet r, StatementContext ctx)
|
||||
throws SQLException
|
||||
{
|
||||
return r.getBytes("payload");
|
||||
}
|
||||
}
|
||||
).list();
|
||||
.bind("name", key)
|
||||
.map(
|
||||
new ResultSetMapper<byte[]>()
|
||||
{
|
||||
@Override
|
||||
public byte[] map(int index, ResultSet r, StatementContext ctx)
|
||||
throws SQLException
|
||||
{
|
||||
return r.getBytes("payload");
|
||||
}
|
||||
}
|
||||
).list();
|
||||
|
||||
if (matched.isEmpty()) {
|
||||
return null;
|
||||
|
@ -225,7 +228,7 @@ public class ConfigManager
|
|||
|
||||
public <T> boolean set(final String key, final ConfigSerde<T> serde, final T obj)
|
||||
{
|
||||
if (obj == null) {
|
||||
if (obj == null || !started) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue