mirror of https://github.com/apache/druid.git
support PostgreSQL >= 9.5 upsert capability
This commit is contained in:
parent
18b9ea62cf
commit
0f8a037bcd
|
@ -31,6 +31,7 @@ import org.skife.jdbi.v2.Handle;
|
|||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
import org.skife.jdbi.v2.util.StringMapper;
|
||||
|
||||
import java.sql.DatabaseMetaData;
|
||||
import java.sql.SQLException;
|
||||
|
||||
public class PostgreSQLConnector extends SQLMetadataConnector
|
||||
|
@ -41,6 +42,8 @@ public class PostgreSQLConnector extends SQLMetadataConnector
|
|||
|
||||
private final DBI dbi;
|
||||
|
||||
private volatile Boolean canUpsert;
|
||||
|
||||
@Inject
|
||||
public PostgreSQLConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
|
||||
{
|
||||
|
@ -68,6 +71,18 @@ public class PostgreSQLConnector extends SQLMetadataConnector
|
|||
return SERIAL_TYPE;
|
||||
}
|
||||
|
||||
protected boolean canUpsert(Handle handle) throws SQLException
|
||||
{
|
||||
if (canUpsert == null) {
|
||||
DatabaseMetaData metaData = handle.getConnection().getMetaData();
|
||||
canUpsert = metaData.getDatabaseMajorVersion() > 9 || (
|
||||
metaData.getDatabaseMajorVersion() == 9 &&
|
||||
metaData.getDatabaseMinorVersion() >= 5
|
||||
);
|
||||
}
|
||||
return canUpsert;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableExists(final Handle handle, final String tableName)
|
||||
{
|
||||
|
@ -95,21 +110,35 @@ public class PostgreSQLConnector extends SQLMetadataConnector
|
|||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
handle.createStatement(
|
||||
String.format(
|
||||
"BEGIN;\n" +
|
||||
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
|
||||
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
|
||||
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
|
||||
"COMMIT;",
|
||||
tableName,
|
||||
keyColumn,
|
||||
valueColumn
|
||||
)
|
||||
)
|
||||
.bind("key", key)
|
||||
.bind("value", value)
|
||||
.execute();
|
||||
if (canUpsert(handle)) {
|
||||
handle.createStatement(
|
||||
String.format(
|
||||
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON CONFLICT (%2$s) DO UPDATE SET %3$s = EXCLUDED.%3$s",
|
||||
tableName,
|
||||
keyColumn,
|
||||
valueColumn
|
||||
)
|
||||
)
|
||||
.bind("key", key)
|
||||
.bind("value", value)
|
||||
.execute();
|
||||
} else {
|
||||
handle.createStatement(
|
||||
String.format(
|
||||
"BEGIN;\n" +
|
||||
"LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" +
|
||||
"WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" +
|
||||
" INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" +
|
||||
"COMMIT;",
|
||||
tableName,
|
||||
keyColumn,
|
||||
valueColumn
|
||||
)
|
||||
)
|
||||
.bind("key", key)
|
||||
.bind("value", value)
|
||||
.execute();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue