mirror of
https://github.com/apache/druid.git
synced 2025-02-24 03:26:04 +00:00
remove database specific code from ConfigManager
This commit is contained in:
parent
aec6457a4d
commit
e84128ebbe
@ -65,7 +65,6 @@ public class ConfigManager
|
||||
private final String selectStatement;
|
||||
private final String configTable;
|
||||
|
||||
private volatile String insertStatement;
|
||||
private volatile ConfigManager.PollingCallable poller;
|
||||
|
||||
@Inject
|
||||
@ -90,17 +89,6 @@ public class ConfigManager
|
||||
return;
|
||||
}
|
||||
|
||||
insertStatement = String.format(
|
||||
DbConnector.isPostgreSQL(dbi) ?
|
||||
"BEGIN;\n" +
|
||||
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
|
||||
"WITH upsert AS (UPDATE %1$s SET payload=:payload WHERE name=:name RETURNING *)\n" +
|
||||
" INSERT INTO %1$s (name, payload) SELECT :name, :payload WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
|
||||
"COMMIT;" :
|
||||
"INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload",
|
||||
configTable
|
||||
);
|
||||
|
||||
poller = new PollingCallable();
|
||||
ScheduledExecutors.scheduleWithFixedDelay(
|
||||
exec, new Duration(0), config.get().getPollDuration().toStandardDuration(), poller
|
||||
@ -118,7 +106,6 @@ public class ConfigManager
|
||||
return;
|
||||
}
|
||||
|
||||
insertStatement = null;
|
||||
poller.stop();
|
||||
poller = null;
|
||||
|
||||
@ -241,20 +228,7 @@ public class ConfigManager
|
||||
@Override
|
||||
public Boolean call() throws Exception
|
||||
{
|
||||
dbi.withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
handle.createStatement(insertStatement)
|
||||
.bind("name", key)
|
||||
.bind("payload", newBytes)
|
||||
.execute();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
DbConnector.insertOrUpdate(dbi, configTable, "name", "payload", key, newBytes);
|
||||
|
||||
final ConfigHolder configHolder = watchedConfigs.get(key);
|
||||
if (configHolder != null) {
|
||||
|
@ -205,6 +205,42 @@ public class DbConnector
|
||||
}
|
||||
}
|
||||
|
||||
public static Void insertOrUpdate(
|
||||
final IDBI dbi,
|
||||
final String tableName,
|
||||
final String keyColumn,
|
||||
final String valueColumn,
|
||||
final String key,
|
||||
final byte[] value
|
||||
) throws SQLException
|
||||
{
|
||||
final String insertOrUpdateStatement = String.format(
|
||||
isPostgreSQL(dbi) ?
|
||||
"BEGIN;\n" +
|
||||
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
|
||||
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
|
||||
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
|
||||
"COMMIT;" :
|
||||
"INSERT INTO %s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
|
||||
tableName, keyColumn, valueColumn
|
||||
);
|
||||
|
||||
return dbi.withHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
handle.createStatement(insertOrUpdateStatement)
|
||||
.bind("key", key)
|
||||
.bind("value", value)
|
||||
.execute();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public static Boolean isPostgreSQL(final IDBI dbi)
|
||||
{
|
||||
return dbi.withHandle(
|
||||
@ -219,7 +255,7 @@ public class DbConnector
|
||||
);
|
||||
}
|
||||
|
||||
public static Boolean isPostgreSQL(final Handle handle) throws SQLException
|
||||
protected static Boolean isPostgreSQL(final Handle handle) throws SQLException
|
||||
{
|
||||
return handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL");
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user