mirror of https://github.com/apache/druid.git
Refactor tests and code clean up (#16129)
* Add update() in TestDerbyConnectorRule
* use common function.
* fixup build.
* fixup indentations.
* Revert "fixup indentations."
This reverts commit a9d6b73e79
.
* fixup indentataions.
* Remove Thread.sleep() by directly calling updateUsedStatusLastUpdated.
* another indentation slip.
* Move common segment initialization to setup().
* Fix for checkstyle.
* review comments: indentation fixes, type.
* Wrapper class for Segments table
* Add KillUnusedSegmentsTaskBuilder in test class
* Remove javadocs for self-explanatory methods.
This commit is contained in:
parent
466057c61b
commit
3eefc47722
File diff suppressed because it is too large
Load Diff
|
@ -422,21 +422,14 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
|
|||
@Nullable final DataSourceMetadata endMetadata
|
||||
) throws IOException
|
||||
{
|
||||
if (segments.isEmpty()) {
|
||||
throw new IllegalArgumentException("segment set must not be empty");
|
||||
}
|
||||
|
||||
final String dataSource = segments.iterator().next().getDataSource();
|
||||
for (DataSegment segment : segments) {
|
||||
if (!dataSource.equals(segment.getDataSource())) {
|
||||
throw new IllegalArgumentException("segments must all be from the same dataSource");
|
||||
}
|
||||
}
|
||||
verifySegmentsToCommit(segments);
|
||||
|
||||
if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) {
|
||||
throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
|
||||
}
|
||||
|
||||
final String dataSource = segments.iterator().next().getDataSource();
|
||||
|
||||
// Find which segments are used (i.e. not overshadowed).
|
||||
final Set<DataSegment> usedSegments = new HashSet<>();
|
||||
List<TimelineObjectHolder<String, DataSegment>> segmentHolders =
|
||||
|
|
|
@ -383,17 +383,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
|
|||
for (final DataSegment segment : segments) {
|
||||
Assert.assertEquals(
|
||||
1,
|
||||
(int) derbyConnector.getDBI().<Integer>withHandle(
|
||||
handle -> {
|
||||
String request = StringUtils.format(
|
||||
"UPDATE %s SET used = false, used_status_last_updated = :used_status_last_updated WHERE id = :id",
|
||||
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
|
||||
);
|
||||
return handle.createStatement(request)
|
||||
.bind("id", segment.getId().toString())
|
||||
.bind("used_status_last_updated", usedStatusLastUpdatedTime.toString()
|
||||
).execute();
|
||||
}
|
||||
derbyConnectorRule.segments().update(
|
||||
"UPDATE %s SET used = false, used_status_last_updated = ? WHERE id = ?",
|
||||
usedStatusLastUpdatedTime.toString(),
|
||||
segment.getId().toString()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -30,13 +30,11 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.skife.jdbi.v2.Batch;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.Handle;
|
||||
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
|
||||
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
|
||||
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
|
||||
import org.skife.jdbi.v2.tweak.HandleCallback;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLRecoverableException;
|
||||
|
@ -47,7 +45,6 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -175,29 +172,7 @@ public class SQLMetadataConnectorTest
|
|||
public void testAlterSegmentTableAddLastUsed()
|
||||
{
|
||||
connector.createSegmentTable();
|
||||
|
||||
// Drop column used_status_last_updated to bring us in line with pre-upgrade state
|
||||
derbyConnectorRule.getConnector().retryWithHandle(
|
||||
new HandleCallback<Void>()
|
||||
{
|
||||
@Override
|
||||
public Void withHandle(Handle handle)
|
||||
{
|
||||
final Batch batch = handle.createBatch();
|
||||
batch.add(
|
||||
StringUtils.format(
|
||||
"ALTER TABLE %1$s DROP COLUMN USED_STATUS_LAST_UPDATED",
|
||||
derbyConnectorRule.metadataTablesConfigSupplier()
|
||||
.get()
|
||||
.getSegmentsTable()
|
||||
.toUpperCase(Locale.ENGLISH)
|
||||
)
|
||||
);
|
||||
batch.execute();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
derbyConnectorRule.segments().update("ALTER TABLE %1$s DROP COLUMN USED_STATUS_LAST_UPDATED");
|
||||
|
||||
connector.alterSegmentTableAddUsedFlagLastUpdated();
|
||||
connector.tableHasColumn(
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.druid.timeline.DataSegment;
|
|||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.apache.druid.timeline.partition.NoneShardSpec;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.joda.time.Interval;
|
||||
import org.joda.time.Period;
|
||||
|
@ -52,7 +51,6 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -401,7 +399,10 @@ public class SqlSegmentsMetadataManagerTest
|
|||
"2017-10-15T20:19:12.565Z"
|
||||
);
|
||||
publishUnusedSegments(koalaSegment1);
|
||||
updateUsedStatusLastUpdated(koalaSegment1, DateTimes.nowUtc().minus(Duration.standardHours(2)));
|
||||
derbyConnectorRule.segments().updateUsedStatusLastUpdated(
|
||||
koalaSegment1.getId().toString(),
|
||||
DateTimes.nowUtc().minus(Duration.standardHours(2))
|
||||
);
|
||||
|
||||
// Publish an unused segment with used_status_last_updated 2 days ago
|
||||
final DataSegment koalaSegment2 = createSegment(
|
||||
|
@ -410,7 +411,10 @@ public class SqlSegmentsMetadataManagerTest
|
|||
"2017-10-15T20:19:12.565Z"
|
||||
);
|
||||
publishUnusedSegments(koalaSegment2);
|
||||
updateUsedStatusLastUpdated(koalaSegment2, DateTimes.nowUtc().minus(Duration.standardDays(2)));
|
||||
derbyConnectorRule.segments().updateUsedStatusLastUpdated(
|
||||
koalaSegment2.getId().toString(),
|
||||
DateTimes.nowUtc().minus(Duration.standardDays(2))
|
||||
);
|
||||
|
||||
// Publish an unused segment and set used_status_last_updated to null
|
||||
final DataSegment koalaSegment3 = createSegment(
|
||||
|
@ -904,54 +908,35 @@ public class SqlSegmentsMetadataManagerTest
|
|||
Assert.assertEquals(0, getCountOfRowsWithLastUsedNull());
|
||||
}
|
||||
|
||||
private void updateSegmentPayload(DataSegment segment, byte[] payload)
|
||||
{
|
||||
executeUpdate(
|
||||
"UPDATE %1$s SET PAYLOAD = ? WHERE ID = ?",
|
||||
payload,
|
||||
segment.getId().toString()
|
||||
);
|
||||
}
|
||||
|
||||
private int getCountOfRowsWithLastUsedNull()
|
||||
{
|
||||
return derbyConnectorRule.getConnector().retryWithHandle(
|
||||
handle -> handle.select(
|
||||
StringUtils.format(
|
||||
"SELECT ID FROM %1$s WHERE USED_STATUS_LAST_UPDATED IS NULL",
|
||||
getSegmentsTable()
|
||||
derbyConnectorRule.segments().getTableName()
|
||||
)
|
||||
).size()
|
||||
);
|
||||
}
|
||||
|
||||
private void updateUsedStatusLastUpdated(DataSegment segment, DateTime newValue)
|
||||
private void updateSegmentPayload(DataSegment segment, byte[] payload)
|
||||
{
|
||||
executeUpdate(
|
||||
"UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?",
|
||||
newValue.toString(),
|
||||
derbyConnectorRule.segments().update(
|
||||
"UPDATE %1$s SET PAYLOAD = ? WHERE ID = ?",
|
||||
payload,
|
||||
segment.getId().toString()
|
||||
);
|
||||
}
|
||||
|
||||
private void updateUsedStatusLastUpdatedToNull(DataSegment segment)
|
||||
{
|
||||
executeUpdate(
|
||||
derbyConnectorRule.segments().update(
|
||||
"UPDATE %1$s SET USED_STATUS_LAST_UPDATED = NULL WHERE ID = ?",
|
||||
segment.getId().toString()
|
||||
);
|
||||
}
|
||||
|
||||
private void executeUpdate(String sqlFormat, Object... args)
|
||||
{
|
||||
derbyConnectorRule.getConnector().retryWithHandle(
|
||||
handle -> handle.update(
|
||||
StringUtils.format(sqlFormat, getSegmentsTable()),
|
||||
args
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Alters the column used_status_last_updated to be nullable. This is used to
|
||||
* test backward compatibility with versions of Druid without this column
|
||||
|
@ -959,14 +944,8 @@ public class SqlSegmentsMetadataManagerTest
|
|||
*/
|
||||
private void allowUsedFlagLastUpdatedToBeNullable()
|
||||
{
|
||||
executeUpdate("ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED NULL");
|
||||
}
|
||||
|
||||
private String getSegmentsTable()
|
||||
{
|
||||
return derbyConnectorRule.metadataTablesConfigSupplier()
|
||||
.get()
|
||||
.getSegmentsTable()
|
||||
.toUpperCase(Locale.ENGLISH);
|
||||
derbyConnectorRule.segments().update(
|
||||
"ALTER TABLE %1$s ALTER COLUMN USED_STATUS_LAST_UPDATED NULL"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,12 +23,14 @@ import com.google.common.base.Supplier;
|
|||
import com.google.common.base.Suppliers;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.metadata.storage.derby.DerbyConnector;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
import org.junit.rules.ExternalResource;
|
||||
import org.skife.jdbi.v2.DBI;
|
||||
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.Locale;
|
||||
import java.util.UUID;
|
||||
|
||||
public class TestDerbyConnector extends DerbyConnector
|
||||
|
@ -135,5 +137,57 @@ public class TestDerbyConnector extends DerbyConnector
|
|||
{
|
||||
return dbTables;
|
||||
}
|
||||
|
||||
public SegmentsTable segments()
|
||||
{
|
||||
return new SegmentsTable(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper class for queries on the segments table.
|
||||
*/
|
||||
public static class SegmentsTable
|
||||
{
|
||||
private final DerbyConnectorRule rule;
|
||||
|
||||
public SegmentsTable(DerbyConnectorRule rule)
|
||||
{
|
||||
this.rule = rule;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the segments table with the supplied SQL query format and arguments.
|
||||
*
|
||||
* @param sqlFormat the SQL query format with %s placeholder for the table name and ? for each query {@code args}
|
||||
* @param args the arguments to be substituted into the SQL query
|
||||
* @return the number of rows affected by the update operation
|
||||
*/
|
||||
public int update(String sqlFormat, Object... args)
|
||||
{
|
||||
return this.rule.getConnector().retryWithHandle(
|
||||
handle -> handle.update(
|
||||
StringUtils.format(sqlFormat, getTableName()),
|
||||
args
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public int updateUsedStatusLastUpdated(String segmentId, DateTime lastUpdatedTime)
|
||||
{
|
||||
return update(
|
||||
"UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?",
|
||||
lastUpdatedTime.toString(),
|
||||
segmentId
|
||||
);
|
||||
}
|
||||
|
||||
public String getTableName()
|
||||
{
|
||||
return this.rule.metadataTablesConfigSupplier()
|
||||
.get()
|
||||
.getSegmentsTable()
|
||||
.toUpperCase(Locale.ENGLISH);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.apache.druid.indexer.TaskStatusPlus;
|
|||
import org.apache.druid.java.util.common.CloseableIterators;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.parsers.CloseableIterator;
|
||||
import org.apache.druid.metadata.SQLMetadataSegmentPublisher;
|
||||
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
|
||||
|
@ -69,7 +68,6 @@ import java.util.ArrayList;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
public class KillUnusedSegmentsTest
|
||||
|
@ -776,7 +774,7 @@ public class KillUnusedSegmentsTest
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
sqlSegmentsMetadataManager.markSegmentsAsUnused(ImmutableSet.of(segment.getId()));
|
||||
updateUsedStatusLastUpdated(segment, lastUpdatedTime);
|
||||
derbyConnectorRule.segments().updateUsedStatusLastUpdated(segment.getId().toString(), lastUpdatedTime);
|
||||
}
|
||||
|
||||
private DataSegment createSegment(final String dataSource, final Interval interval, final String version)
|
||||
|
@ -925,25 +923,4 @@ public class KillUnusedSegmentsTest
|
|||
observedDatasourceToLastKillTaskId.remove(dataSource);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateUsedStatusLastUpdated(DataSegment segment, DateTime lastUpdatedTime)
|
||||
{
|
||||
derbyConnectorRule.getConnector().retryWithHandle(
|
||||
handle -> handle.update(
|
||||
StringUtils.format(
|
||||
"UPDATE %1$s SET USED_STATUS_LAST_UPDATED = ? WHERE ID = ?", getSegmentsTable()
|
||||
),
|
||||
lastUpdatedTime.toString(),
|
||||
segment.getId().toString()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private String getSegmentsTable()
|
||||
{
|
||||
return derbyConnectorRule.metadataTablesConfigSupplier()
|
||||
.get()
|
||||
.getSegmentsTable()
|
||||
.toUpperCase(Locale.ENGLISH);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue