mirror of https://github.com/apache/druid.git
Add additional index on task table (#14470)
This commit is contained in:
parent
e10e35aa2c
commit
4b2d87336a
|
@ -19,10 +19,12 @@
|
|||
|
||||
package org.apache.druid.metadata;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.apache.commons.dbcp2.BasicDataSourceFactory;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
|
@ -51,8 +53,10 @@ import java.sql.SQLRecoverableException;
|
|||
import java.sql.SQLTransientException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
||||
{
|
||||
|
@ -377,10 +381,22 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
|||
+ " PRIMARY KEY (id)\n"
|
||||
+ ")",
|
||||
tableName, getPayloadType(), getCollation()
|
||||
),
|
||||
StringUtils.format("CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date)", tableName)
|
||||
)
|
||||
)
|
||||
);
|
||||
final Set<String> createdIndexSet = getIndexOnTable(tableName);
|
||||
createIndex(
|
||||
tableName,
|
||||
StringUtils.format("idx_%1$s_active_created_date", tableName),
|
||||
ImmutableList.of("active", "created_date"),
|
||||
createdIndexSet
|
||||
);
|
||||
createIndex(
|
||||
tableName,
|
||||
StringUtils.format("idx_%1$s_datasource_active", tableName),
|
||||
ImmutableList.of("datasource", "active"),
|
||||
createdIndexSet
|
||||
);
|
||||
}
|
||||
|
||||
private void alterEntryTable(final String tableName)
|
||||
|
@ -830,4 +846,88 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|
|||
log.warn(e, "Exception while deleting records from table");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the Set of the index on given table
|
||||
*
|
||||
* @param tableName name of the table to fetch the index map
|
||||
* @return Set of the uppercase index names, returns empty set if table does not exist
|
||||
*/
|
||||
public Set<String> getIndexOnTable(String tableName)
|
||||
{
|
||||
Set<String> res = new HashSet<>();
|
||||
try {
|
||||
retryWithHandle(new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle) throws Exception
|
||||
{
|
||||
DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData();
|
||||
// Fetch the index for given table
|
||||
ResultSet resultSet = databaseMetaData.getIndexInfo(
|
||||
null,
|
||||
null,
|
||||
StringUtils.toUpperCase(tableName),
|
||||
false,
|
||||
false
|
||||
);
|
||||
while (resultSet.next()) {
|
||||
String indexName = resultSet.getString("INDEX_NAME");
|
||||
if (org.apache.commons.lang.StringUtils.isNotBlank(indexName)) {
|
||||
res.add(StringUtils.toUpperCase(indexName));
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, "Exception while listing the index on table %s ", tableName);
|
||||
}
|
||||
return ImmutableSet.copyOf(res);
|
||||
}
|
||||
|
||||
/**
|
||||
* create index on the table with retry if not already exist, to be called after createTable
|
||||
*
|
||||
* @param tableName Name of the table to create index on
|
||||
* @param indexName case-insensitive string index name, it helps to check the existing index on table
|
||||
* @param indexCols List of columns to be indexed on
|
||||
* @param createdIndexSet
|
||||
*/
|
||||
public void createIndex(
|
||||
final String tableName,
|
||||
final String indexName,
|
||||
final List<String> indexCols,
|
||||
final Set<String> createdIndexSet
|
||||
)
|
||||
{
|
||||
try {
|
||||
retryWithHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle)
|
||||
{
|
||||
if (!createdIndexSet.contains(StringUtils.toUpperCase(indexName))) {
|
||||
String indexSQL = StringUtils.format(
|
||||
"CREATE INDEX %1$s ON %2$s(%3$s)",
|
||||
indexName,
|
||||
tableName,
|
||||
Joiner.on(",").join(indexCols)
|
||||
);
|
||||
log.info("Creating Index on Table [%s], sql: [%s] ", tableName, indexSQL);
|
||||
handle.execute(indexSQL);
|
||||
} else {
|
||||
log.info("Index [%s] on Table [%s] already exists", indexName, tableName);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.error(e, StringUtils.format("Exception while creating index on table [%s]", tableName));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ package org.apache.druid.metadata;
|
|||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.dbcp2.BasicDataSource;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.junit.Assert;
|
||||
|
@ -41,8 +43,11 @@ import java.sql.SQLTransientException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
public class SQLMetadataConnectorTest
|
||||
|
@ -107,6 +112,58 @@ public class SQLMetadataConnectorTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIndexCreationOnTaskTable()
|
||||
{
|
||||
final String entryType = tablesConfig.getTaskEntryType();
|
||||
String entryTableName = tablesConfig.getEntryTable(entryType);
|
||||
connector.createTaskTables();
|
||||
Set<String> createdIndexSet = connector.getIndexOnTable(entryTableName);
|
||||
Set<String> expectedIndexSet = Sets.newHashSet(
|
||||
StringUtils.format("idx_%1$s_active_created_date", entryTableName),
|
||||
StringUtils.format("idx_%1$s_datasource_active", entryTableName)
|
||||
).stream().map(StringUtils::toUpperCase).collect(Collectors.toSet());
|
||||
|
||||
for (String expectedIndex : expectedIndexSet) {
|
||||
Assert.assertTrue(
|
||||
StringUtils.format("Failed to find the expected Index %s on entry table", expectedIndex),
|
||||
createdIndexSet.contains(expectedIndex)
|
||||
);
|
||||
}
|
||||
connector.createTaskTables();
|
||||
dropTable(entryTableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateIndexOnNoTable()
|
||||
{
|
||||
String tableName = "noTable";
|
||||
try {
|
||||
connector.createIndex(
|
||||
tableName,
|
||||
"some_string",
|
||||
Lists.newArrayList("a", "b"),
|
||||
new HashSet<>()
|
||||
);
|
||||
}
|
||||
catch (Exception e) {
|
||||
Assert.fail("Index creation should never throw an exception");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGeIndexOnNoTable()
|
||||
{
|
||||
String tableName = "noTable";
|
||||
try {
|
||||
Set<String> res = connector.getIndexOnTable(tableName);
|
||||
Assert.assertEquals(0, res.size());
|
||||
}
|
||||
catch (Exception e) {
|
||||
Assert.fail("getIndexOnTable should never throw an exception");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertOrUpdate()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue