From 4b2d87336a56eafe6c28fc93f1040e737013671e Mon Sep 17 00:00:00 2001 From: Pranav Date: Thu, 29 Jun 2023 15:32:43 -0700 Subject: [PATCH] Add additional index on task table (#14470) --- .../druid/metadata/SQLMetadataConnector.java | 104 +++++++++++++++++- .../metadata/SQLMetadataConnectorTest.java | 57 ++++++++++ 2 files changed, 159 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 108128e09c2..5c8c9d39c26 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -19,10 +19,12 @@ package org.apache.druid.metadata; +import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.apache.druid.java.util.common.ISE; @@ -51,8 +53,10 @@ import java.sql.SQLRecoverableException; import java.sql.SQLTransientException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; public abstract class SQLMetadataConnector implements MetadataStorageConnector { @@ -377,10 +381,22 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector + " PRIMARY KEY (id)\n" + ")", tableName, getPayloadType(), getCollation() - ), - StringUtils.format("CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date)", tableName) + ) ) ); + final Set createdIndexSet = getIndexOnTable(tableName); + createIndex( + tableName, + StringUtils.format("idx_%1$s_active_created_date", tableName), + ImmutableList.of("active", "created_date"), + createdIndexSet + ); + createIndex( + tableName, + StringUtils.format("idx_%1$s_datasource_active", tableName), + ImmutableList.of("datasource", "active"), + createdIndexSet + ); } private void alterEntryTable(final String tableName) @@ -830,4 +846,88 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector log.warn(e, "Exception while deleting records from table"); } } + + /** + * Get the Set of the index on given table + * + * @param tableName name of the table to fetch the index map + * @return Set of the uppercase index names, returns empty set if table does not exist + */ + public Set getIndexOnTable(String tableName) + { + Set res = new HashSet<>(); + try { + retryWithHandle(new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData(); + // Fetch the index for given table + ResultSet resultSet = databaseMetaData.getIndexInfo( + null, + null, + StringUtils.toUpperCase(tableName), + false, + false + ); + while (resultSet.next()) { + String indexName = resultSet.getString("INDEX_NAME"); + if (org.apache.commons.lang.StringUtils.isNotBlank(indexName)) { + res.add(StringUtils.toUpperCase(indexName)); + } + } + return null; + } + }); + } + catch (Exception e) { + log.error(e, "Exception while listing the index on table %s ", tableName); + } + return ImmutableSet.copyOf(res); + } + + /** + * create index on the table with retry if not already exist, to be called after createTable + * + * @param tableName Name of the table to create index on + * @param indexName case-insensitive string index name, it helps to check the existing index on table + * @param indexCols List of columns to be indexed on + * @param createdIndexSet + */ + public void createIndex( + final String tableName, + final String indexName, + final List indexCols, + final Set createdIndexSet + ) + { + try { + retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + if (!createdIndexSet.contains(StringUtils.toUpperCase(indexName))) { + String indexSQL = StringUtils.format( + "CREATE INDEX %1$s ON %2$s(%3$s)", + indexName, + tableName, + Joiner.on(",").join(indexCols) + ); + log.info("Creating Index on Table [%s], sql: [%s] ", tableName, indexSQL); + handle.execute(indexSQL); + } else { + log.info("Index [%s] on Table [%s] already exists", indexName, tableName); + } + return null; + } + } + ); + } + catch (Exception e) { + log.error(e, StringUtils.format("Exception while creating index on table [%s]", tableName)); + } + } } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java index d9df47920da..4dba35ca84b 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java @@ -22,6 +22,8 @@ package org.apache.druid.metadata; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.druid.java.util.common.StringUtils; import org.junit.Assert; @@ -41,8 +43,11 @@ import java.sql.SQLTransientException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class SQLMetadataConnectorTest @@ -107,6 +112,58 @@ public class SQLMetadataConnectorTest } } + @Test + public void testIndexCreationOnTaskTable() + { + final String entryType = tablesConfig.getTaskEntryType(); + String entryTableName = tablesConfig.getEntryTable(entryType); + connector.createTaskTables(); + Set createdIndexSet = connector.getIndexOnTable(entryTableName); + Set expectedIndexSet = Sets.newHashSet( + StringUtils.format("idx_%1$s_active_created_date", entryTableName), + StringUtils.format("idx_%1$s_datasource_active", entryTableName) + ).stream().map(StringUtils::toUpperCase).collect(Collectors.toSet()); + + for (String expectedIndex : expectedIndexSet) { + Assert.assertTrue( + StringUtils.format("Failed to find the expected Index %s on entry table", expectedIndex), + createdIndexSet.contains(expectedIndex) + ); + } + connector.createTaskTables(); + dropTable(entryTableName); + } + + @Test + public void testCreateIndexOnNoTable() + { + String tableName = "noTable"; + try { + connector.createIndex( + tableName, + "some_string", + Lists.newArrayList("a", "b"), + new HashSet<>() + ); + } + catch (Exception e) { + Assert.fail("Index creation should never throw an exception"); + } + } + + @Test + public void testGeIndexOnNoTable() + { + String tableName = "noTable"; + try { + Set res = connector.getIndexOnTable(tableName); + Assert.assertEquals(0, res.size()); + } + catch (Exception e) { + Assert.fail("getIndexOnTable should never throw an exception"); + } + } + @Test public void testInsertOrUpdate() {