[ML] Add random offset to the maintenance task execution time (elastic/x-pack-elasticsearch#2483)
Currently the maintenance task is executed at 30 minutes past midnight each day. When multiple clusters run on the same hardware infrastructure, they all execute the task at the same time and compete for resources.

This commit adds a random offset, ranging from 0 to 119 minutes, to the execution time. Because the offset has minute granularity, clusters with different offsets start at least 1 minute apart, which gives the maintenance task on one cluster at least a minute to finish before the next one starts. Moreover, the 2 hour window provides enough slots for different offsets to occur, while remaining within what most people would think of as "the middle of the night".

Relates elastic/x-pack-elasticsearch#2273

Original commit: elastic/x-pack-elasticsearch@b538923aca
This commit is contained in:
parent
e4753656bc
commit
99ffbb1cd6
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
@ -18,6 +19,7 @@ import org.joda.time.DateTime;
|
||||||
import org.joda.time.chrono.ISOChronology;
|
import org.joda.time.chrono.ISOChronology;
|
||||||
|
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
@ -28,6 +30,8 @@ public class MlDailyMaintenanceService implements Releasable {
|
||||||
|
|
||||||
private static final Logger LOGGER = Loggers.getLogger(MlDailyMaintenanceService.class);
|
private static final Logger LOGGER = Loggers.getLogger(MlDailyMaintenanceService.class);
|
||||||
|
|
||||||
|
private static final int MAX_TIME_OFFSET_MINUTES = 120;
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final Client client;
|
private final Client client;
|
||||||
|
|
||||||
|
@ -45,16 +49,26 @@ public class MlDailyMaintenanceService implements Releasable {
|
||||||
this.schedulerProvider = Objects.requireNonNull(scheduleProvider);
|
this.schedulerProvider = Objects.requireNonNull(scheduleProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MlDailyMaintenanceService(ThreadPool threadPool, Client client) {
|
public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client) {
|
||||||
this(threadPool, client, createAfterMidnightScheduleProvider());
|
this(threadPool, client, () -> delayToNextTime(clusterName));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Supplier<TimeValue> createAfterMidnightScheduleProvider() {
|
/**
|
||||||
return () -> {
|
* Calculates the delay until the next time the maintenance should be triggered.
|
||||||
DateTime now = DateTime.now(ISOChronology.getInstance());
|
* The next time is 30 minutes past midnight of the following day plus a random
|
||||||
DateTime next = now.plusDays(1).withTimeAtStartOfDay().plusMinutes(30);
|
* offset. The random offset is added in order to avoid multiple clusters
|
||||||
return TimeValue.timeValueMillis(next.getMillis() - now.getMillis());
|
* running the maintenance tasks at the same time. A cluster with a given name
|
||||||
};
|
* shall have the same offset throughout its life.
|
||||||
|
*
|
||||||
|
* @param clusterName the cluster name is used to seed the random offset
|
||||||
|
* @return the delay to the next time the maintenance should be triggered
|
||||||
|
*/
|
||||||
|
private static TimeValue delayToNextTime(ClusterName clusterName) {
|
||||||
|
Random random = new Random(clusterName.hashCode());
|
||||||
|
int minutesOffset = random.ints(0, MAX_TIME_OFFSET_MINUTES).findFirst().getAsInt();
|
||||||
|
DateTime now = DateTime.now(ISOChronology.getInstance());
|
||||||
|
DateTime next = now.plusDays(1).withTimeAtStartOfDay().plusMinutes(30).plusMinutes(minutesOffset);
|
||||||
|
return TimeValue.timeValueMillis(next.getMillis() - now.getMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
|
|
|
@ -63,7 +63,7 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
||||||
private void installMlMetadata(MetaData metaData) {
|
private void installMlMetadata(MetaData metaData) {
|
||||||
if (metaData.custom(MlMetadata.TYPE) == null) {
|
if (metaData.custom(MlMetadata.TYPE) == null) {
|
||||||
if (installMlMetadataCheck.compareAndSet(false, true)) {
|
if (installMlMetadataCheck.compareAndSet(false, true)) {
|
||||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
threadPool.executor(ThreadPool.Names.GENERIC).execute(() ->
|
||||||
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||||
|
@ -83,8 +83,8 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
||||||
installMlMetadataCheck.set(false);
|
installMlMetadataCheck.set(false);
|
||||||
logger.error("unable to install ml metadata", e);
|
logger.error("unable to install ml metadata", e);
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
});
|
);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
installMlMetadataCheck.set(false);
|
installMlMetadataCheck.set(false);
|
||||||
|
@ -93,7 +93,7 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
||||||
|
|
||||||
private void installDailyMaintenanceService() {
|
private void installDailyMaintenanceService() {
|
||||||
if (mlDailyMaintenanceService == null) {
|
if (mlDailyMaintenanceService == null) {
|
||||||
mlDailyMaintenanceService = new MlDailyMaintenanceService(threadPool, client);
|
mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client);
|
||||||
mlDailyMaintenanceService.start();
|
mlDailyMaintenanceService.start();
|
||||||
clusterService.addLifecycleListener(new LifecycleListener() {
|
clusterService.addLifecycleListener(new LifecycleListener() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -40,6 +40,8 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class MlInitializationServiceTests extends ESTestCase {
|
public class MlInitializationServiceTests extends ESTestCase {
|
||||||
|
|
||||||
|
private static final ClusterName CLUSTER_NAME = new ClusterName("my_cluster");
|
||||||
|
|
||||||
private ThreadPool threadPool;
|
private ThreadPool threadPool;
|
||||||
private ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
private ClusterService clusterService;
|
private ClusterService clusterService;
|
||||||
|
@ -60,6 +62,8 @@ public class MlInitializationServiceTests extends ESTestCase {
|
||||||
|
|
||||||
ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
|
ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
|
||||||
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture);
|
when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture);
|
||||||
|
|
||||||
|
when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInitialize() throws Exception {
|
public void testInitialize() throws Exception {
|
||||||
|
@ -93,7 +97,6 @@ public class MlInitializationServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInitialize_alreadyInitialized() throws Exception {
|
public void testInitialize_alreadyInitialized() throws Exception {
|
||||||
ClusterService clusterService = mock(ClusterService.class);
|
|
||||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
||||||
|
|
||||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
@ -113,7 +116,6 @@ public class MlInitializationServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInitialize_onlyOnce() throws Exception {
|
public void testInitialize_onlyOnce() throws Exception {
|
||||||
ClusterService clusterService = mock(ClusterService.class);
|
|
||||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
||||||
|
|
||||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
|
Loading…
Reference in New Issue