diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java index ffe38fc8966..07e7f62aa68 100644 --- a/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java +++ b/extensions-core/postgresql-metadata-storage/src/main/java/io/druid/metadata/storage/postgresql/PostgreSQLConnector.java @@ -31,6 +31,7 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.StringMapper; +import java.sql.DatabaseMetaData; import java.sql.SQLException; public class PostgreSQLConnector extends SQLMetadataConnector @@ -41,6 +42,8 @@ public class PostgreSQLConnector extends SQLMetadataConnector private final DBI dbi; + private volatile Boolean canUpsert; + @Inject public PostgreSQLConnector(Supplier config, Supplier dbTables) { @@ -68,6 +71,18 @@ public class PostgreSQLConnector extends SQLMetadataConnector return SERIAL_TYPE; } + protected boolean canUpsert(Handle handle) throws SQLException + { + if (canUpsert == null) { + DatabaseMetaData metaData = handle.getConnection().getMetaData(); + canUpsert = metaData.getDatabaseMajorVersion() > 9 || ( + metaData.getDatabaseMajorVersion() == 9 && + metaData.getDatabaseMinorVersion() >= 5 + ); + } + return canUpsert; + } + @Override public boolean tableExists(final Handle handle, final String tableName) { @@ -95,21 +110,35 @@ public class PostgreSQLConnector extends SQLMetadataConnector @Override public Void withHandle(Handle handle) throws Exception { - handle.createStatement( - String.format( - "BEGIN;\n" + - "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + - "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + - " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + - "COMMIT;", - tableName, - keyColumn, - valueColumn - ) - ) - .bind("key", key) - .bind("value", value) - .execute(); + if (canUpsert(handle)) { + handle.createStatement( + String.format( + "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON CONFLICT (%2$s) DO UPDATE SET %3$s = EXCLUDED.%3$s", + tableName, + keyColumn, + valueColumn + ) + ) + .bind("key", key) + .bind("value", value) + .execute(); + } else { + handle.createStatement( + String.format( + "BEGIN;\n" + + "LOCK TABLE %1$s IN SHARE ROW EXCLUSIVE MODE;\n" + + "WITH upsert AS (UPDATE %1$s SET %3$s=:value WHERE %2$s=:key RETURNING *)\n" + + " INSERT INTO %1$s (%2$s, %3$s) SELECT :key, :value WHERE NOT EXISTS (SELECT * FROM upsert)\n;" + + "COMMIT;", + tableName, + keyColumn, + valueColumn + ) + ) + .bind("key", key) + .bind("value", value) + .execute(); + } return null; } }