From fffb2e4fe72c9c8a0e59d4a191a1de8f6763df88 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 17 Aug 2023 18:02:43 +0530 Subject: [PATCH] Speed up SQLMetadataStorageActionHandlerTest (#14856) Changes - Reduce test time of `SQLMetadataStorageActionHandlerTest.testMigration` - Slightly modify log messages to adhere to Druid style --- .../SQLMetadataStorageActionHandler.java | 49 ++++++++++--------- .../SQLMetadataStorageActionHandlerTest.java | 44 +++++++---------- 2 files changed, 46 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 8d84432ff1e..9fbc79f273c 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -62,6 +62,7 @@ import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.stream.IntStream; public abstract class SQLMetadataStorageActionHandler implements MetadataStorageActionHandler @@ -1032,16 +1033,14 @@ public abstract class SQLMetadataStorageActionHandler fetchTaskMetadatas(String tableName, String id, int limit) + private List fetchTasksWithTypeColumnNullAndIdGreaterThan(String id, int limit) { List taskIdentifiers = new ArrayList<>(); connector.retryWithHandle( handle -> { String sql = StringUtils.format( "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s", - tableName, - id, - connector.limitClause(limit) + entryTable, id, connector.limitClause(limit) ); Query> query = handle.createQuery(sql); taskIdentifiers.addAll(query.map(taskIdentifierMapper).list()); @@ -1051,19 +1050,21 @@ public abstract class SQLMetadataStorageActionHandler taskIdentifiers) + private int updateColumnsTypeAndGroupIdForTasks(List taskIdentifiers) { - connector.retryWithHandle( + return connector.retryWithHandle( handle -> { - Batch batch = handle.createBatch(); - String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'"; - for (TaskIdentifier metadata : taskIdentifiers) { + final Batch batch = handle.createBatch(); + for (TaskIdentifier task : taskIdentifiers) { batch.add( - StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId()) + StringUtils.format( + "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'", + entryTable, task.getType(), task.getGroupId(), task.getId() + ) ); } - batch.execute(); - return null; + int[] result = batch.execute(); + return IntStream.of(result).sum(); } ); } @@ -1083,14 +1084,14 @@ public abstract class SQLMetadataStorageActionHandler taskIdentifiers; try { - taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit); + taskIdentifiers = fetchTasksWithTypeColumnNullAndIdGreaterThan(lastUpdatedTaskId, limit); } catch (Exception e) { log.warn(e, "Task migration failed while reading entries from task table"); @@ -1100,15 +1101,17 @@ public abstract class SQLMetadataStorageActionHandler 0) { + numUpdatedTasks += updatedCount; + log.info("Successfully updated columns [type] and [group_id] for [%d] tasks.", numUpdatedTasks); + } } catch (Exception e) { log.warn(e, "Task migration failed while updating entries in task table"); return false; } - id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId(); + lastUpdatedTaskId = taskIdentifiers.get(taskIdentifiers.size() - 1).getId(); try { Thread.sleep(1000); @@ -1118,7 +1121,9 @@ public abstract class SQLMetadataStorageActionHandler 0) { + log.info("Task migration for table[%s] successful.", entryTable); + } return true; } } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index 2ade4f96019..cee24d314e3 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -43,7 +43,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.sql.ResultSet; import java.util.HashMap; @@ -59,12 +58,7 @@ public class SQLMetadataStorageActionHandlerTest @Rule public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); - @Rule - public final ExpectedException thrown = ExpectedException.none(); - private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); - - private static final Random RANDOM = new Random(1); private SQLMetadataStorageActionHandler, Map, Map, Map> handler; @@ -256,16 +250,17 @@ public class SQLMetadataStorageActionHandlerTest } @Test(timeout = 60_000L) - public void testRepeatInsert() + public void testDuplicateInsertThrowsEntryExistsException() { final String entryId = "abcd"; Map entry = ImmutableMap.of("a", 1); Map status = ImmutableMap.of("count", 42); handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); - - thrown.expect(EntryExistsException.class); - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); + Assert.assertThrows( + EntryExistsException.class, + () -> handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group") + ); } @Test @@ -469,17 +464,17 @@ public class SQLMetadataStorageActionHandlerTest @Test public void testMigration() { - int active = 1234; - for (int i = 0; i < active; i++) { - insertTaskInfo(createRandomTaskInfo(true), false); + int numActiveTasks = 123; + for (int i = 0; i < numActiveTasks; i++) { + insertTaskInfo(createRandomTaskInfo(TaskState.RUNNING), false); } - int completed = 2345; - for (int i = 0; i < completed; i++) { - insertTaskInfo(createRandomTaskInfo(false), false); + int numCompletedTasks = 101; + for (int i = 0; i < numCompletedTasks; i++) { + insertTaskInfo(createRandomTaskInfo(TaskState.SUCCESS), false); } - Assert.assertEquals(active + completed, getUnmigratedTaskCount().intValue()); + Assert.assertEquals(numActiveTasks + numCompletedTasks, getUnmigratedTaskCount().intValue()); handler.populateTaskTypeAndGroupId(); @@ -490,16 +485,16 @@ public class SQLMetadataStorageActionHandlerTest public void testGetTaskStatusPlusListInternal() { // SETUP - TaskInfo, Map> activeUnaltered = createRandomTaskInfo(true); + TaskInfo, Map> activeUnaltered = createRandomTaskInfo(TaskState.RUNNING); insertTaskInfo(activeUnaltered, false); - TaskInfo, Map> completedUnaltered = createRandomTaskInfo(false); + TaskInfo, Map> completedUnaltered = createRandomTaskInfo(TaskState.SUCCESS); insertTaskInfo(completedUnaltered, false); - TaskInfo, Map> activeAltered = createRandomTaskInfo(true); + TaskInfo, Map> activeAltered = createRandomTaskInfo(TaskState.RUNNING); insertTaskInfo(activeAltered, true); - TaskInfo, Map> completedAltered = createRandomTaskInfo(false); + TaskInfo, Map> completedAltered = createRandomTaskInfo(TaskState.SUCCESS); insertTaskInfo(completedAltered, true); Map taskLookups = new HashMap<>(); @@ -561,7 +556,7 @@ public class SQLMetadataStorageActionHandlerTest ); } - private TaskInfo, Map> createRandomTaskInfo(boolean active) + private TaskInfo, Map> createRandomTaskInfo(TaskState taskState) { String id = UUID.randomUUID().toString(); DateTime createdTime = DateTime.now(DateTimeZone.UTC); @@ -576,7 +571,7 @@ public class SQLMetadataStorageActionHandlerTest Map status = new HashMap<>(); status.put("id", id); - status.put("status", active ? TaskState.RUNNING : TaskState.SUCCESS); + status.put("status", taskState); status.put("duration", RANDOM.nextLong()); status.put("location", TaskLocation.create(UUID.randomUUID().toString(), 8080, 995)); status.put("errorMsg", UUID.randomUUID().toString()); @@ -590,8 +585,7 @@ public class SQLMetadataStorageActionHandlerTest ); } - private void insertTaskInfo(TaskInfo, Map> taskInfo, - boolean altered) + private void insertTaskInfo(TaskInfo, Map> taskInfo, boolean altered) { try { handler.insert(