mirror of https://github.com/apache/druid.git
Allow coordinator to be configured to kill segments in future (#10877)
Allow a Druid cluster to kill segments whose interval_end is a date in the future. This can be done by setting druid.coordinator.kill.durationToRetain to a negative period. For example PT-24H would allow segments to be killed if their interval_end date was 24 hours or less into the future at the time that the kill task is generated by the system. A cluster operator can also disregard the druid.coordinator.kill.durationToRetain entirely by setting a new configuration, druid.coordinator.kill.ignoreDurationToRetain=true. This ignores interval_end date when looking for segments to kill, and instead is capable of killing any segment marked unused. This new configuration is off by default, and a cluster operator should fully understand and accept the risks if they enable it.
This commit is contained in:
parent
60b4fa0f75
commit
deb69d1bc0
|
@ -39,8 +39,9 @@ public final class DateTimes
|
|||
public static final DateTime EPOCH = utc(0);
|
||||
public static final DateTime MAX = utc(JodaUtils.MAX_INSTANT);
|
||||
public static final DateTime MIN = utc(JodaUtils.MIN_INSTANT);
|
||||
public static final DateTime CAN_COMPARE_AS_YEAR_MIN = of("0000-01-01");
|
||||
public static final DateTime CAN_COMPARE_AS_YEAR_MAX = of("10000-01-01").minus(1);
|
||||
// The following two DateTime objects are utilities that can be used for accurately comparing date strings
|
||||
public static final DateTime COMPARE_DATE_AS_STRING_MIN = of("0000-01-01");
|
||||
public static final DateTime COMPARE_DATE_AS_STRING_MAX = of("10000-01-01").minus(1);
|
||||
|
||||
public static final UtcFormatter ISO_DATE_TIME = wrapFormatter(ISODateTimeFormat.dateTime());
|
||||
public static final UtcFormatter ISO_DATE_OPTIONAL_TIME = wrapFormatter(ISODateTimeFormat.dateOptionalTimeParser());
|
||||
|
@ -188,8 +189,8 @@ public final class DateTimes
|
|||
*/
|
||||
public static boolean canCompareAsString(final DateTime dateTime)
|
||||
{
|
||||
return dateTime.getMillis() >= CAN_COMPARE_AS_YEAR_MIN.getMillis()
|
||||
&& dateTime.getMillis() <= CAN_COMPARE_AS_YEAR_MAX.getMillis()
|
||||
return dateTime.getMillis() >= COMPARE_DATE_AS_STRING_MIN.getMillis()
|
||||
&& dateTime.getMillis() <= COMPARE_DATE_AS_STRING_MAX.getMillis()
|
||||
&& ISOChronology.getInstanceUTC().equals(dateTime.getChronology());
|
||||
}
|
||||
|
||||
|
|
|
@ -107,8 +107,8 @@ public class DateTimesTest
|
|||
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.EPOCH));
|
||||
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("0000-01-01")));
|
||||
|
||||
Assert.assertEquals("0000-01-01T00:00:00.000Z", DateTimes.CAN_COMPARE_AS_YEAR_MIN.toString());
|
||||
Assert.assertEquals("9999-12-31T23:59:59.999Z", DateTimes.CAN_COMPARE_AS_YEAR_MAX.toString());
|
||||
Assert.assertEquals("0000-01-01T00:00:00.000Z", DateTimes.COMPARE_DATE_AS_STRING_MIN.toString());
|
||||
Assert.assertEquals("9999-12-31T23:59:59.999Z", DateTimes.COMPARE_DATE_AS_STRING_MAX.toString());
|
||||
|
||||
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("9999")));
|
||||
Assert.assertTrue(DateTimes.canCompareAsString(DateTimes.of("2000")));
|
||||
|
|
|
@ -812,7 +812,8 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|
|||
|`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the Coordinator clean up old entries in the `pendingSegments` table of metadata store. If set to true, Coordinator will check the created time of most recently complete task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting tasks. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), Coordinator will ask the Overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on `druid.coordinator.period.indexingPeriod` specified.|true|
|
||||
|`druid.coordinator.kill.on`|Boolean flag for whether or not the Coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), Coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all unused segments except for the last `durationToRetain` period. A whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.|true|
|
||||
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|
||||
|`druid.coordinator.kill.durationToRetain`| Do not kill unused segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on.|`P90D`|
|
||||
|`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be a ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` to be in the future.|`P90D`|
|
||||
|`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false|
|
||||
|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on.|100|
|
||||
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy for the coordinator to use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters. `diskNormalized` weights the costs according to the servers' disk usage ratios - there are known issues with this strategy distributing segments unevenly across the cluster. `random` distributes segments among services randomly.|`cost`|
|
||||
|`druid.coordinator.balancer.cachingCost.awaitInitialization`|Whether to wait for segment view initialization before creating the `cachingCost` balancing strategy. This property is enabled only when `druid.coordinator.balancer.strategy` is `cachingCost`. If set to 'true', the Coordinator will not start to assign segments, until the segment view is initialized. If set to 'false', the Coordinator will fallback to use the `cost` balancing strategy only if the segment view is not initialized yet. Notes, it may take much time to wait for the initialization since the `cachingCost` balancing strategy involves much computing to build itself.|false|
|
||||
|
|
|
@ -51,6 +51,10 @@ public abstract class DruidCoordinatorConfig
|
|||
@Default("P90D")
|
||||
public abstract Duration getCoordinatorKillDurationToRetain();
|
||||
|
||||
@Config("druid.coordinator.kill.ignoreDurationToRetain")
|
||||
@Default("false")
|
||||
public abstract boolean getCoordinatorKillIgnoreDurationToRetain();
|
||||
|
||||
@Config("druid.coordinator.kill.maxSegments")
|
||||
@Default("100")
|
||||
public abstract int getCoordinatorKillMaxSegments();
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
|
|||
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
|
||||
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
|
||||
import org.apache.druid.utils.CollectionUtils;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -37,8 +38,11 @@ import java.util.Collection;
|
|||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Completely removes information about unused segments whose end time is older than {@link #retainDuration} from now
|
||||
* from the metadata store. This action is called "to kill a segment".
|
||||
* Completely removes information about unused segments who have an interval end that comes before
|
||||
* now - {@link #retainDuration} from the metadata store. retainDuration can be a positive or negative duration,
|
||||
* negative meaning the interval end target will be in the future. Also, retainDuration can be ignored,
|
||||
* meaning that there is no upper bound to the end interval of segments that will be killed. This action is called
|
||||
* "to kill a segment".
|
||||
*
|
||||
* See org.apache.druid.indexing.common.task.KillUnusedSegmentsTask.
|
||||
*/
|
||||
|
@ -48,6 +52,7 @@ public class KillUnusedSegments implements CoordinatorDuty
|
|||
|
||||
private final long period;
|
||||
private final long retainDuration;
|
||||
private final boolean ignoreRetainDuration;
|
||||
private final int maxSegmentsToKill;
|
||||
private long lastKillTime = 0;
|
||||
|
||||
|
@ -67,8 +72,15 @@ public class KillUnusedSegments implements CoordinatorDuty
|
|||
"coordinator kill period must be greater than druid.coordinator.period.indexingPeriod"
|
||||
);
|
||||
|
||||
this.ignoreRetainDuration = config.getCoordinatorKillIgnoreDurationToRetain();
|
||||
this.retainDuration = config.getCoordinatorKillDurationToRetain().getMillis();
|
||||
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator kill retainDuration must be >= 0");
|
||||
if (this.ignoreRetainDuration) {
|
||||
log.debug(
|
||||
"druid.coordinator.kill.durationToRetain [%s] will be ignored when discovering segments to kill "
|
||||
+ "because you have set druid.coordinator.kill.ignoreDurationToRetain to True.",
|
||||
this.retainDuration
|
||||
);
|
||||
}
|
||||
|
||||
this.maxSegmentsToKill = config.getCoordinatorKillMaxSegments();
|
||||
Preconditions.checkArgument(this.maxSegmentsToKill > 0, "coordinator kill maxSegments must be > 0");
|
||||
|
@ -76,7 +88,7 @@ public class KillUnusedSegments implements CoordinatorDuty
|
|||
log.info(
|
||||
"Kill Task scheduling enabled with period [%s], retainDuration [%s], maxSegmentsToKill [%s]",
|
||||
this.period,
|
||||
this.retainDuration,
|
||||
this.ignoreRetainDuration ? "IGNORING" : this.retainDuration,
|
||||
this.maxSegmentsToKill
|
||||
);
|
||||
|
||||
|
@ -118,12 +130,20 @@ public class KillUnusedSegments implements CoordinatorDuty
|
|||
return params;
|
||||
}
|
||||
|
||||
/**
|
||||
* For a given datasource and limit of segments that can be killed in one task, determine the interval to be
|
||||
* submitted with the kill task.
|
||||
*
|
||||
* @param dataSource dataSource whose unused segments are being killed.
|
||||
* @param limit the maximum number of segments that can be included in the kill task.
|
||||
* @return {@link Interval} to be used in the kill task.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Nullable
|
||||
Interval findIntervalForKill(String dataSource, int limit)
|
||||
{
|
||||
List<Interval> unusedSegmentIntervals =
|
||||
segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, DateTimes.nowUtc().minus(retainDuration), limit);
|
||||
segmentsMetadataManager.getUnusedSegmentIntervals(dataSource, getEndTimeUpperLimit(), limit);
|
||||
|
||||
if (unusedSegmentIntervals != null && unusedSegmentIntervals.size() > 0) {
|
||||
return JodaUtils.umbrellaInterval(unusedSegmentIntervals);
|
||||
|
@ -131,4 +151,25 @@ public class KillUnusedSegments implements CoordinatorDuty
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the {@link DateTime} that wil form the upper bound when looking for segments that are
|
||||
* eligible to be killed. If ignoreDurationToRetain is true, we have no upper bound and return a DateTime object
|
||||
* for "max" time that works when comparing date strings.
|
||||
*
|
||||
* @return {@link DateTime} representing the upper bound time used when looking for segments to kill.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
DateTime getEndTimeUpperLimit()
|
||||
{
|
||||
return ignoreRetainDuration
|
||||
? DateTimes.COMPARE_DATE_AS_STRING_MAX
|
||||
: DateTimes.nowUtc().minus(retainDuration);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Long getRetainDuration()
|
||||
{
|
||||
return retainDuration;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -377,6 +377,12 @@ public class SqlSegmentsMetadataManagerTest
|
|||
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 1)
|
||||
);
|
||||
|
||||
// Test the DateTime maxEndTime argument of getUnusedSegmentIntervals
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(segment2.getInterval()),
|
||||
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of(2012, 1, 7, 0, 0), 1)
|
||||
);
|
||||
|
||||
Assert.assertEquals(
|
||||
ImmutableList.of(segment2.getInterval(), segment1.getInterval()),
|
||||
sqlSegmentsMetadataManager.getUnusedSegmentIntervals("wikipedia", DateTimes.of("3000"), 5)
|
||||
|
|
|
@ -183,7 +183,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
new Duration("PT0s")
|
||||
new Duration("PT0s"),
|
||||
false
|
||||
);
|
||||
sourceLoadQueueChildrenCache = new PathChildrenCache(
|
||||
curator,
|
||||
|
|
|
@ -48,6 +48,7 @@ public class DruidCoordinatorConfigTest
|
|||
Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
|
||||
Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay());
|
||||
Assert.assertTrue(config.getCompactionSkipLockedIntervals());
|
||||
Assert.assertFalse(config.getCoordinatorKillIgnoreDurationToRetain());
|
||||
|
||||
//with non-defaults
|
||||
Properties props = new Properties();
|
||||
|
@ -62,6 +63,7 @@ public class DruidCoordinatorConfigTest
|
|||
props.setProperty("druid.coordinator.load.timeout", "PT1s");
|
||||
props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100s");
|
||||
props.setProperty("druid.coordinator.compaction.skipLockedIntervals", "false");
|
||||
props.setProperty("druid.coordinator.kill.ignoreDurationToRetain", "true");
|
||||
|
||||
factory = Config.createFactory(props);
|
||||
config = factory.build(DruidCoordinatorConfig.class);
|
||||
|
@ -75,5 +77,13 @@ public class DruidCoordinatorConfigTest
|
|||
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
|
||||
Assert.assertEquals(Duration.millis(100), config.getLoadQueuePeonRepeatDelay());
|
||||
Assert.assertFalse(config.getCompactionSkipLockedIntervals());
|
||||
Assert.assertTrue(config.getCoordinatorKillIgnoreDurationToRetain());
|
||||
|
||||
// Test negative druid.coordinator.kill.durationToRetain now that it is valid.
|
||||
props = new Properties();
|
||||
props.setProperty("druid.coordinator.kill.durationToRetain", "PT-1s");
|
||||
factory = Config.createFactory(props);
|
||||
config = factory.build(DruidCoordinatorConfig.class);
|
||||
Assert.assertEquals(new Duration("PT-1s"), config.getCoordinatorKillDurationToRetain());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,7 +163,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
new Duration("PT0s")
|
||||
new Duration("PT0s"),
|
||||
false
|
||||
);
|
||||
pathChildrenCache = new PathChildrenCache(
|
||||
curator,
|
||||
|
@ -943,6 +944,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
|
|||
null,
|
||||
10,
|
||||
new Duration("PT0s"),
|
||||
false,
|
||||
false
|
||||
);
|
||||
CoordinatorCustomDutyGroup compactSegmentCustomGroup = new CoordinatorCustomDutyGroup("group1", Duration.standardSeconds(1), ImmutableList.of(new CompactSegments(differentConfigUsedInCustomGroup, null, null)));
|
||||
|
|
|
@ -91,7 +91,8 @@ public class HttpLoadQueuePeonTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
Duration.ZERO
|
||||
Duration.ZERO,
|
||||
false
|
||||
)
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -106,7 +106,8 @@ public class LoadQueuePeonTest extends CuratorTestBase
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
Duration.millis(0)
|
||||
Duration.millis(0),
|
||||
false
|
||||
)
|
||||
);
|
||||
|
||||
|
@ -311,7 +312,8 @@ public class LoadQueuePeonTest extends CuratorTestBase
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
new Duration("PT1s")
|
||||
new Duration("PT1s"),
|
||||
false
|
||||
)
|
||||
);
|
||||
|
||||
|
@ -373,7 +375,8 @@ public class LoadQueuePeonTest extends CuratorTestBase
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
new Duration("PT1s")
|
||||
new Duration("PT1s"),
|
||||
false
|
||||
)
|
||||
);
|
||||
|
||||
|
|
|
@ -55,7 +55,8 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
new Duration("PT1s")
|
||||
new Duration("PT1s"),
|
||||
false
|
||||
)
|
||||
);
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
|
|||
private final Duration getLoadQueuePeonRepeatDelay;
|
||||
private final int coordinatorKillMaxSegments;
|
||||
private final boolean compactionSkipLockedIntervals;
|
||||
private final boolean coordinatorKillIgnoreDurationToRetain;
|
||||
|
||||
public TestDruidCoordinatorConfig(
|
||||
Duration coordinatorStartDelay,
|
||||
|
@ -61,7 +62,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
|
|||
Duration coordinatorDatasourceKillPeriod,
|
||||
Duration coordinatorDatasourceKillDurationToRetain,
|
||||
int coordinatorKillMaxSegments,
|
||||
Duration getLoadQueuePeonRepeatDelay
|
||||
Duration getLoadQueuePeonRepeatDelay,
|
||||
boolean coordinatorKillIgnoreDurationToRetain
|
||||
)
|
||||
{
|
||||
this.coordinatorStartDelay = coordinatorStartDelay;
|
||||
|
@ -83,6 +85,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
|
|||
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
|
||||
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
|
||||
this.compactionSkipLockedIntervals = true;
|
||||
this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain;
|
||||
}
|
||||
|
||||
public TestDruidCoordinatorConfig(
|
||||
|
@ -104,7 +107,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
|
|||
Duration coordinatorDatasourceKillDurationToRetain,
|
||||
int coordinatorKillMaxSegments,
|
||||
Duration getLoadQueuePeonRepeatDelay,
|
||||
boolean compactionSkipLockedIntervals
|
||||
boolean compactionSkipLockedIntervals,
|
||||
boolean coordinatorKillIgnoreDurationToRetain
|
||||
)
|
||||
{
|
||||
this.coordinatorStartDelay = coordinatorStartDelay;
|
||||
|
@ -126,6 +130,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
|
|||
this.coordinatorKillMaxSegments = coordinatorKillMaxSegments;
|
||||
this.getLoadQueuePeonRepeatDelay = getLoadQueuePeonRepeatDelay;
|
||||
this.compactionSkipLockedIntervals = compactionSkipLockedIntervals;
|
||||
this.coordinatorKillIgnoreDurationToRetain = coordinatorKillIgnoreDurationToRetain;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -241,4 +246,10 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
|
|||
{
|
||||
return compactionSkipLockedIntervals;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getCoordinatorKillIgnoreDurationToRetain()
|
||||
{
|
||||
return coordinatorKillIgnoreDurationToRetain;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,7 +72,8 @@ public class KillAuditLogTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
|
||||
killAuditLog.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -101,7 +102,8 @@ public class KillAuditLogTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killAuditLog = new KillAuditLog(mockAuditManager, druidCoordinatorConfig);
|
||||
killAuditLog.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -130,7 +132,8 @@ public class KillAuditLogTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
|
||||
|
@ -158,7 +161,8 @@ public class KillAuditLogTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("coordinator audit kill retainDuration must be >= 0");
|
||||
|
|
|
@ -104,7 +104,8 @@ public class KillCompactionConfigTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killCompactionConfig = new KillCompactionConfig(
|
||||
druidCoordinatorConfig,
|
||||
|
@ -140,7 +141,8 @@ public class KillCompactionConfigTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("Coordinator compaction configuration kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
|
||||
|
@ -189,7 +191,8 @@ public class KillCompactionConfigTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killCompactionConfig = new KillCompactionConfig(
|
||||
druidCoordinatorConfig,
|
||||
|
@ -294,7 +297,8 @@ public class KillCompactionConfigTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killCompactionConfig = new KillCompactionConfig(
|
||||
druidCoordinatorConfig,
|
||||
|
@ -413,7 +417,8 @@ public class KillCompactionConfigTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killCompactionConfig = new KillCompactionConfig(
|
||||
druidCoordinatorConfig,
|
||||
|
|
|
@ -81,7 +81,8 @@ public class KillDatasourceMetadataTest
|
|||
new Duration(Long.MAX_VALUE),
|
||||
new Duration("PT1S"),
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
|
||||
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -112,7 +113,8 @@ public class KillDatasourceMetadataTest
|
|||
new Duration("PT6S"),
|
||||
new Duration("PT1S"),
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
|
||||
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -141,7 +143,8 @@ public class KillDatasourceMetadataTest
|
|||
new Duration("PT3S"),
|
||||
new Duration("PT1S"),
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("Coordinator datasource metadata kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
|
||||
|
@ -169,7 +172,8 @@ public class KillDatasourceMetadataTest
|
|||
new Duration("PT6S"),
|
||||
new Duration("PT-1S"),
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("Coordinator datasource metadata kill retainDuration must be >= 0");
|
||||
|
@ -199,7 +203,8 @@ public class KillDatasourceMetadataTest
|
|||
new Duration("PT6S"),
|
||||
new Duration("PT1S"),
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killDatasourceMetadata = new KillDatasourceMetadata(druidCoordinatorConfig, mockIndexerMetadataStorageCoordinator, mockMetadataSupervisorManager);
|
||||
killDatasourceMetadata.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
|
|
@ -79,7 +79,8 @@ public class KillRulesTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killRules = new KillRules(druidCoordinatorConfig);
|
||||
killRules.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -108,7 +109,8 @@ public class KillRulesTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killRules = new KillRules(druidCoordinatorConfig);
|
||||
killRules.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -137,7 +139,8 @@ public class KillRulesTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("coordinator rule kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
|
||||
|
@ -165,7 +168,8 @@ public class KillRulesTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("coordinator rule kill retainDuration must be >= 0");
|
||||
|
|
|
@ -72,7 +72,8 @@ public class KillSupervisorsTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager);
|
||||
killSupervisors.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -101,7 +102,8 @@ public class KillSupervisorsTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
killSupervisors = new KillSupervisors(druidCoordinatorConfig, mockMetadataSupervisorManager);
|
||||
killSupervisors.run(mockDruidCoordinatorRuntimeParams);
|
||||
|
@ -130,7 +132,8 @@ public class KillSupervisorsTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("Coordinator supervisor kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod");
|
||||
|
@ -158,7 +161,8 @@ public class KillSupervisorsTest
|
|||
null,
|
||||
null,
|
||||
10,
|
||||
null
|
||||
null,
|
||||
false
|
||||
);
|
||||
exception.expect(IllegalArgumentException.class);
|
||||
exception.expectMessage("Coordinator supervisor kill retainDuration must be >= 0");
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.duty;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.druid.client.indexing.IndexingServiceClient;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.metadata.SegmentsMetadataManager;
|
||||
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
|
||||
|
@ -193,7 +194,8 @@ public class KillUnusedSegmentsTest
|
|||
null,
|
||||
null,
|
||||
1000,
|
||||
Duration.ZERO
|
||||
Duration.ZERO,
|
||||
false
|
||||
)
|
||||
);
|
||||
|
||||
|
@ -202,5 +204,169 @@ public class KillUnusedSegmentsTest
|
|||
unusedSegmentsKiller.findIntervalForKill("test", 10000)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that retainDuration is properly set based on the value available in the
|
||||
* Coordinator config. Positive and Negative durations should work as well as
|
||||
* null, if and only if ignoreDurationToRetain is true.
|
||||
*/
|
||||
@Test
|
||||
public void testRetainDurationValues()
|
||||
{
|
||||
// Positive duration to retain
|
||||
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
|
||||
null,
|
||||
null,
|
||||
new TestDruidCoordinatorConfig(
|
||||
null,
|
||||
null,
|
||||
Duration.parse("PT76400S"),
|
||||
null,
|
||||
new Duration(1),
|
||||
Duration.parse("PT86400S"),
|
||||
Duration.parse("PT86400S"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
1000,
|
||||
Duration.ZERO,
|
||||
false
|
||||
)
|
||||
);
|
||||
Assert.assertEquals((Long) Duration.parse("PT86400S").getMillis(), unusedSegmentsKiller.getRetainDuration());
|
||||
|
||||
// Negative duration to retain
|
||||
unusedSegmentsKiller = new KillUnusedSegments(
|
||||
null,
|
||||
null,
|
||||
new TestDruidCoordinatorConfig(
|
||||
null,
|
||||
null,
|
||||
Duration.parse("PT76400S"),
|
||||
null,
|
||||
new Duration(1),
|
||||
Duration.parse("PT86400S"),
|
||||
Duration.parse("PT-86400S"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
1000,
|
||||
Duration.ZERO,
|
||||
false
|
||||
)
|
||||
);
|
||||
Assert.assertEquals((Long) Duration.parse("PT-86400S").getMillis(), unusedSegmentsKiller.getRetainDuration());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the end time upper limit is properly computated for both positive and
|
||||
* negative durations. Also ensure that if durationToRetain is to be ignored, that
|
||||
* the upper limit is {@link DateTime} max time.
|
||||
*/
|
||||
@Test
|
||||
public void testGetEndTimeUpperLimit()
|
||||
{
|
||||
// If ignoreDurationToRetain is true, ignore the value configured for durationToRetain and return 9999-12-31T23:59
|
||||
KillUnusedSegments unusedSegmentsKiller = new KillUnusedSegments(
|
||||
null,
|
||||
null,
|
||||
new TestDruidCoordinatorConfig(
|
||||
null,
|
||||
null,
|
||||
Duration.parse("PT76400S"),
|
||||
null,
|
||||
new Duration(1),
|
||||
Duration.parse("PT86400S"),
|
||||
Duration.parse("PT86400S"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
1000,
|
||||
Duration.ZERO,
|
||||
true
|
||||
)
|
||||
);
|
||||
Assert.assertEquals(
|
||||
DateTimes.COMPARE_DATE_AS_STRING_MAX,
|
||||
unusedSegmentsKiller.getEndTimeUpperLimit()
|
||||
);
|
||||
|
||||
// Testing a negative durationToRetain period returns proper date in future
|
||||
unusedSegmentsKiller = new KillUnusedSegments(
|
||||
null,
|
||||
null,
|
||||
new TestDruidCoordinatorConfig(
|
||||
null,
|
||||
null,
|
||||
Duration.parse("PT76400S"),
|
||||
null,
|
||||
new Duration(1),
|
||||
Duration.parse("PT86400S"),
|
||||
Duration.parse("PT-86400S"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
1000,
|
||||
Duration.ZERO,
|
||||
false
|
||||
)
|
||||
);
|
||||
|
||||
DateTime expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT-86400S").getMillis());
|
||||
Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit());
|
||||
|
||||
// Testing a positive durationToRetain period returns expected value in the past
|
||||
unusedSegmentsKiller = new KillUnusedSegments(
|
||||
null,
|
||||
null,
|
||||
new TestDruidCoordinatorConfig(
|
||||
null,
|
||||
null,
|
||||
Duration.parse("PT76400S"),
|
||||
null,
|
||||
new Duration(1),
|
||||
Duration.parse("PT86400S"),
|
||||
Duration.parse("PT86400S"),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
1000,
|
||||
Duration.ZERO,
|
||||
false
|
||||
)
|
||||
);
|
||||
expectedTime = DateTimes.nowUtc().minus(Duration.parse("PT86400S").getMillis());
|
||||
Assert.assertEquals(expectedTime, unusedSegmentsKiller.getEndTimeUpperLimit());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1863,6 +1863,7 @@ druid_taskLock
|
|||
druid_taskLog
|
||||
druid_tasks
|
||||
DruidQueryRel
|
||||
durationToRetain
|
||||
ec2
|
||||
equalDistribution
|
||||
extractionFn
|
||||
|
|
Loading…
Reference in New Issue