Fix : Druid throws java.util.concurrent.RejectedExecutionException when ingest task is stopping. (#10555)

* check exec status before return Signal

* add more log

* change log level to debug and add UT

* change log leverl to warn and merge master

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
zhangyue19921010 2020-11-24 06:52:03 +08:00 committed by GitHub
parent 4537016cad
commit 31740b3b29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 1 deletions

View File

@ -53,7 +53,12 @@ public class ScheduledExecutors
public Signal call()
{
runnable.run(); // (Exceptions are handled for us)
return Signal.REPEAT;
if (exec.isShutdown()) {
log.warn("ScheduledExecutorService is ShutDown. Return 'Signal.STOP' and stopped rescheduling %s (delay %s)", this, delay);
return Signal.STOP;
} else {
return Signal.REPEAT;
}
}
}
);

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.concurrent;
import org.joda.time.Duration;
import org.junit.Test;
import java.util.concurrent.ScheduledExecutorService;
public class ScheduledExecutorsTest
{
@Test
public void testscheduleWithFixedDelay() throws InterruptedException
{
Duration initialDelay = new Duration(1000);
Duration delay = new Duration(1000);
ScheduledExecutorService exec = Execs.scheduledSingleThreaded("BasicAuthenticatorCacheManager-Exec--%d");
ScheduledExecutors.scheduleWithFixedDelay(
exec,
initialDelay,
delay,
() -> {
System.out.println("TEST!");
}
);
Thread.sleep(5 * 1000);
exec.shutdown();
}
}