mirror of https://github.com/apache/druid.git
Speed up SQLMetadataStorageActionHandlerTest (#14856)
Changes - Reduce test time of `SQLMetadataStorageActionHandlerTest.testMigration` - Slightly modify log messages to adhere to Druid style
This commit is contained in:
parent
b97cc45d81
commit
fffb2e4fe7
|
@ -62,6 +62,7 @@ import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
|
public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
|
||||||
implements MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
|
implements MetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
|
||||||
|
@ -1032,16 +1033,14 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<TaskIdentifier> fetchTaskMetadatas(String tableName, String id, int limit)
|
private List<TaskIdentifier> fetchTasksWithTypeColumnNullAndIdGreaterThan(String id, int limit)
|
||||||
{
|
{
|
||||||
List<TaskIdentifier> taskIdentifiers = new ArrayList<>();
|
List<TaskIdentifier> taskIdentifiers = new ArrayList<>();
|
||||||
connector.retryWithHandle(
|
connector.retryWithHandle(
|
||||||
handle -> {
|
handle -> {
|
||||||
String sql = StringUtils.format(
|
String sql = StringUtils.format(
|
||||||
"SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s",
|
"SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s",
|
||||||
tableName,
|
entryTable, id, connector.limitClause(limit)
|
||||||
id,
|
|
||||||
connector.limitClause(limit)
|
|
||||||
);
|
);
|
||||||
Query<Map<String, Object>> query = handle.createQuery(sql);
|
Query<Map<String, Object>> query = handle.createQuery(sql);
|
||||||
taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
|
taskIdentifiers.addAll(query.map(taskIdentifierMapper).list());
|
||||||
|
@ -1051,19 +1050,21 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
|
||||||
return taskIdentifiers;
|
return taskIdentifiers;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateTaskMetadatas(String tasksTable, List<TaskIdentifier> taskIdentifiers)
|
private int updateColumnsTypeAndGroupIdForTasks(List<TaskIdentifier> taskIdentifiers)
|
||||||
{
|
{
|
||||||
connector.retryWithHandle(
|
return connector.retryWithHandle(
|
||||||
handle -> {
|
handle -> {
|
||||||
Batch batch = handle.createBatch();
|
final Batch batch = handle.createBatch();
|
||||||
String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'";
|
for (TaskIdentifier task : taskIdentifiers) {
|
||||||
for (TaskIdentifier metadata : taskIdentifiers) {
|
|
||||||
batch.add(
|
batch.add(
|
||||||
StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId())
|
StringUtils.format(
|
||||||
|
"UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'",
|
||||||
|
entryTable, task.getType(), task.getGroupId(), task.getId()
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
batch.execute();
|
int[] result = batch.execute();
|
||||||
return null;
|
return IntStream.of(result).sum();
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -1083,14 +1084,14 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
boolean populateTaskTypeAndGroupId()
|
boolean populateTaskTypeAndGroupId()
|
||||||
{
|
{
|
||||||
log.info("Populate fields task and group_id of task entry table [%s] from payload", entryTable);
|
log.debug("Populating columns [task] and [group_id] in task table[%s] from payload.", entryTable);
|
||||||
String id = "";
|
String lastUpdatedTaskId = "";
|
||||||
int limit = 100;
|
final int limit = 100;
|
||||||
int count = 0;
|
int numUpdatedTasks = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
List<TaskIdentifier> taskIdentifiers;
|
List<TaskIdentifier> taskIdentifiers;
|
||||||
try {
|
try {
|
||||||
taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit);
|
taskIdentifiers = fetchTasksWithTypeColumnNullAndIdGreaterThan(lastUpdatedTaskId, limit);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.warn(e, "Task migration failed while reading entries from task table");
|
log.warn(e, "Task migration failed while reading entries from task table");
|
||||||
|
@ -1100,15 +1101,17 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
updateTaskMetadatas(entryTable, taskIdentifiers);
|
final int updatedCount = updateColumnsTypeAndGroupIdForTasks(taskIdentifiers);
|
||||||
count += taskIdentifiers.size();
|
if (updatedCount > 0) {
|
||||||
log.info("Successfully updated type and groupId for [%d] tasks", count);
|
numUpdatedTasks += updatedCount;
|
||||||
|
log.info("Successfully updated columns [type] and [group_id] for [%d] tasks.", numUpdatedTasks);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
log.warn(e, "Task migration failed while updating entries in task table");
|
log.warn(e, "Task migration failed while updating entries in task table");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId();
|
lastUpdatedTaskId = taskIdentifiers.get(taskIdentifiers.size() - 1).getId();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
|
@ -1118,7 +1121,9 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.info("Task migration for table [%s] successful", entryTable);
|
if (numUpdatedTasks > 0) {
|
||||||
|
log.info("Task migration for table[%s] successful.", entryTable);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,6 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
|
||||||
|
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -59,12 +58,7 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
@Rule
|
@Rule
|
||||||
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
|
||||||
|
|
||||||
@Rule
|
|
||||||
public final ExpectedException thrown = ExpectedException.none();
|
|
||||||
|
|
||||||
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
|
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
|
||||||
|
|
||||||
|
|
||||||
private static final Random RANDOM = new Random(1);
|
private static final Random RANDOM = new Random(1);
|
||||||
|
|
||||||
private SQLMetadataStorageActionHandler<Map<String, Object>, Map<String, Object>, Map<String, String>, Map<String, Object>> handler;
|
private SQLMetadataStorageActionHandler<Map<String, Object>, Map<String, Object>, Map<String, String>, Map<String, Object>> handler;
|
||||||
|
@ -256,16 +250,17 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60_000L)
|
@Test(timeout = 60_000L)
|
||||||
public void testRepeatInsert()
|
public void testDuplicateInsertThrowsEntryExistsException()
|
||||||
{
|
{
|
||||||
final String entryId = "abcd";
|
final String entryId = "abcd";
|
||||||
Map<String, Object> entry = ImmutableMap.of("a", 1);
|
Map<String, Object> entry = ImmutableMap.of("a", 1);
|
||||||
Map<String, Object> status = ImmutableMap.of("count", 42);
|
Map<String, Object> status = ImmutableMap.of("count", 42);
|
||||||
|
|
||||||
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
|
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
|
||||||
|
Assert.assertThrows(
|
||||||
thrown.expect(EntryExistsException.class);
|
EntryExistsException.class,
|
||||||
handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group");
|
() -> handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group")
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -469,17 +464,17 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
@Test
|
@Test
|
||||||
public void testMigration()
|
public void testMigration()
|
||||||
{
|
{
|
||||||
int active = 1234;
|
int numActiveTasks = 123;
|
||||||
for (int i = 0; i < active; i++) {
|
for (int i = 0; i < numActiveTasks; i++) {
|
||||||
insertTaskInfo(createRandomTaskInfo(true), false);
|
insertTaskInfo(createRandomTaskInfo(TaskState.RUNNING), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
int completed = 2345;
|
int numCompletedTasks = 101;
|
||||||
for (int i = 0; i < completed; i++) {
|
for (int i = 0; i < numCompletedTasks; i++) {
|
||||||
insertTaskInfo(createRandomTaskInfo(false), false);
|
insertTaskInfo(createRandomTaskInfo(TaskState.SUCCESS), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(active + completed, getUnmigratedTaskCount().intValue());
|
Assert.assertEquals(numActiveTasks + numCompletedTasks, getUnmigratedTaskCount().intValue());
|
||||||
|
|
||||||
handler.populateTaskTypeAndGroupId();
|
handler.populateTaskTypeAndGroupId();
|
||||||
|
|
||||||
|
@ -490,16 +485,16 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
public void testGetTaskStatusPlusListInternal()
|
public void testGetTaskStatusPlusListInternal()
|
||||||
{
|
{
|
||||||
// SETUP
|
// SETUP
|
||||||
TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered = createRandomTaskInfo(true);
|
TaskInfo<Map<String, Object>, Map<String, Object>> activeUnaltered = createRandomTaskInfo(TaskState.RUNNING);
|
||||||
insertTaskInfo(activeUnaltered, false);
|
insertTaskInfo(activeUnaltered, false);
|
||||||
|
|
||||||
TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered = createRandomTaskInfo(false);
|
TaskInfo<Map<String, Object>, Map<String, Object>> completedUnaltered = createRandomTaskInfo(TaskState.SUCCESS);
|
||||||
insertTaskInfo(completedUnaltered, false);
|
insertTaskInfo(completedUnaltered, false);
|
||||||
|
|
||||||
TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered = createRandomTaskInfo(true);
|
TaskInfo<Map<String, Object>, Map<String, Object>> activeAltered = createRandomTaskInfo(TaskState.RUNNING);
|
||||||
insertTaskInfo(activeAltered, true);
|
insertTaskInfo(activeAltered, true);
|
||||||
|
|
||||||
TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered = createRandomTaskInfo(false);
|
TaskInfo<Map<String, Object>, Map<String, Object>> completedAltered = createRandomTaskInfo(TaskState.SUCCESS);
|
||||||
insertTaskInfo(completedAltered, true);
|
insertTaskInfo(completedAltered, true);
|
||||||
|
|
||||||
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups = new HashMap<>();
|
Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups = new HashMap<>();
|
||||||
|
@ -561,7 +556,7 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TaskInfo<Map<String, Object>, Map<String, Object>> createRandomTaskInfo(boolean active)
|
private TaskInfo<Map<String, Object>, Map<String, Object>> createRandomTaskInfo(TaskState taskState)
|
||||||
{
|
{
|
||||||
String id = UUID.randomUUID().toString();
|
String id = UUID.randomUUID().toString();
|
||||||
DateTime createdTime = DateTime.now(DateTimeZone.UTC);
|
DateTime createdTime = DateTime.now(DateTimeZone.UTC);
|
||||||
|
@ -576,7 +571,7 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
|
|
||||||
Map<String, Object> status = new HashMap<>();
|
Map<String, Object> status = new HashMap<>();
|
||||||
status.put("id", id);
|
status.put("id", id);
|
||||||
status.put("status", active ? TaskState.RUNNING : TaskState.SUCCESS);
|
status.put("status", taskState);
|
||||||
status.put("duration", RANDOM.nextLong());
|
status.put("duration", RANDOM.nextLong());
|
||||||
status.put("location", TaskLocation.create(UUID.randomUUID().toString(), 8080, 995));
|
status.put("location", TaskLocation.create(UUID.randomUUID().toString(), 8080, 995));
|
||||||
status.put("errorMsg", UUID.randomUUID().toString());
|
status.put("errorMsg", UUID.randomUUID().toString());
|
||||||
|
@ -590,8 +585,7 @@ public class SQLMetadataStorageActionHandlerTest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void insertTaskInfo(TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo,
|
private void insertTaskInfo(TaskInfo<Map<String, Object>, Map<String, Object>> taskInfo, boolean altered)
|
||||||
boolean altered)
|
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
handler.insert(
|
handler.insert(
|
||||||
|
|
Loading…
Reference in New Issue