diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 481600877b3..6c926d61b22 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; @@ -18,6 +19,7 @@ import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; import java.util.Objects; +import java.util.Random; import java.util.concurrent.ScheduledFuture; import java.util.function.Supplier; @@ -28,6 +30,8 @@ public class MlDailyMaintenanceService implements Releasable { private static final Logger LOGGER = Loggers.getLogger(MlDailyMaintenanceService.class); + private static final int MAX_TIME_OFFSET_MINUTES = 120; + private final ThreadPool threadPool; private final Client client; @@ -45,16 +49,26 @@ public class MlDailyMaintenanceService implements Releasable { this.schedulerProvider = Objects.requireNonNull(scheduleProvider); } - public MlDailyMaintenanceService(ThreadPool threadPool, Client client) { - this(threadPool, client, createAfterMidnightScheduleProvider()); + public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client) { + this(threadPool, client, () -> delayToNextTime(clusterName)); } - private static Supplier createAfterMidnightScheduleProvider() { - return () -> { - DateTime now = DateTime.now(ISOChronology.getInstance()); - DateTime next = now.plusDays(1).withTimeAtStartOfDay().plusMinutes(30); - return TimeValue.timeValueMillis(next.getMillis() - now.getMillis()); - }; + /** + * Calculates the delay until the next time the maintenance should be triggered. + * The next time is 30 minutes past midnight of the following day plus a random + * offset. The random offset is added in order to avoid multiple clusters + * running the maintenance tasks at the same time. A cluster with a given name + * shall have the same offset throughout its life. + * + * @param clusterName the cluster name is used to seed the random offset + * @return the delay to the next time the maintenance should be triggered + */ + private static TimeValue delayToNextTime(ClusterName clusterName) { + Random random = new Random(clusterName.hashCode()); + int minutesOffset = random.ints(0, MAX_TIME_OFFSET_MINUTES).findFirst().getAsInt(); + DateTime now = DateTime.now(ISOChronology.getInstance()); + DateTime next = now.plusDays(1).withTimeAtStartOfDay().plusMinutes(30).plusMinutes(minutesOffset); + return TimeValue.timeValueMillis(next.getMillis() - now.getMillis()); } public void start() { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 4006ffe806f..a63a6e77381 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -63,7 +63,7 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL private void installMlMetadata(MetaData metaData) { if (metaData.custom(MlMetadata.TYPE) == null) { if (installMlMetadataCheck.compareAndSet(false, true)) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -83,8 +83,8 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL installMlMetadataCheck.set(false); logger.error("unable to install ml metadata", e); } - }); - }); + }) + ); } } else { installMlMetadataCheck.set(false); @@ -93,7 +93,7 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL private void installDailyMaintenanceService() { if (mlDailyMaintenanceService == null) { - mlDailyMaintenanceService = new MlDailyMaintenanceService(threadPool, client); + mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client); mlDailyMaintenanceService.start(); clusterService.addLifecycleListener(new LifecycleListener() { @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 7adfc0cbd84..e4672af0b8a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -40,6 +40,8 @@ import static org.mockito.Mockito.when; public class MlInitializationServiceTests extends ESTestCase { + private static final ClusterName CLUSTER_NAME = new ClusterName("my_cluster"); + private ThreadPool threadPool; private ExecutorService executorService; private ClusterService clusterService; @@ -60,6 +62,8 @@ public class MlInitializationServiceTests extends ESTestCase { ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture); + + when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); } public void testInitialize() throws Exception { @@ -93,7 +97,6 @@ public class MlInitializationServiceTests extends ESTestCase { } public void testInitialize_alreadyInitialized() throws Exception { - ClusterService clusterService = mock(ClusterService.class); MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -113,7 +116,6 @@ public class MlInitializationServiceTests extends ESTestCase { } public void testInitialize_onlyOnce() throws Exception { - ClusterService clusterService = mock(ClusterService.class); MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name"))