shield: do not use ThreadPool#scheduleWithFixedDelay for pollers

This commit makes the user and roles poller use a self rescheduling runnable to schedule the
next run of the poller rather than using scheduleWithFixedDelay. This is done because the
pollers perform blocking I/O operations and everything using that thread pool method runs on
the schedule thread and because of this, in certain situations this can lead to a deadlock which
will prevent the cluster from forming.

Original commit: elastic/x-pack-elasticsearch@9fd0748c8c
This commit is contained in:
jaymode 2016-04-01 14:45:19 -04:00
parent 3126fcb856
commit 4036ce97c1
6 changed files with 388 additions and 12 deletions

View File

@ -174,3 +174,7 @@ thirdPartyAudit.excludes = [
// someone figure out what the x-plugins logic should be
licenseHeaders.enabled = false
forbiddenApisMain {
signaturesURLs += [file('signatures.txt').toURI().toURL()]
}

View File

@ -39,7 +39,6 @@ import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.DocumentMissingException;
@ -56,7 +55,9 @@ import org.elasticsearch.shield.action.user.PutUserRequest;
import org.elasticsearch.shield.authc.support.Hasher;
import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.shield.client.SecurityClient;
import org.elasticsearch.shield.support.SelfReschedulingRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.RemoteTransportException;
import java.util.ArrayList;
@ -67,7 +68,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -103,7 +103,7 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
private final Provider<InternalClient> clientProvider;
private final ThreadPool threadPool;
private ScheduledFuture<?> versionChecker;
private SelfReschedulingRunnable userPoller;
private Client client;
private int scrollSize;
private TimeValue scrollKeepAlive;
@ -455,8 +455,9 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
} catch (Exception e) {
logger.warn("failed to do initial poll of users", e);
}
versionChecker = threadPool.scheduleWithFixedDelay(poller,
settings.getAsTime("shield.authc.native.reload.interval", TimeValue.timeValueSeconds(30L)));
userPoller = new SelfReschedulingRunnable(poller, threadPool,
settings.getAsTime("shield.authc.native.reload.interval", TimeValue.timeValueSeconds(30L)), Names.GENERIC, logger);
userPoller.start();
state.set(State.STARTED);
}
} catch (Exception e) {
@ -468,7 +469,7 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
try {
FutureUtils.cancel(versionChecker);
userPoller.stop();
} catch (Throwable t) {
state.set(State.FAILED);
throw t;

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.IndexNotFoundException;
@ -46,9 +45,10 @@ import org.elasticsearch.shield.action.role.DeleteRoleRequest;
import org.elasticsearch.shield.action.role.PutRoleRequest;
import org.elasticsearch.shield.authz.RoleDescriptor;
import org.elasticsearch.shield.authz.permission.Role;
import org.elasticsearch.shield.authz.store.RolesStore;
import org.elasticsearch.shield.client.SecurityClient;
import org.elasticsearch.shield.support.SelfReschedulingRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.ArrayList;
import java.util.Arrays;
@ -58,7 +58,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
@ -96,7 +95,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
private SecurityClient securityClient;
private int scrollSize;
private TimeValue scrollKeepAlive;
private ScheduledFuture<?> versionChecker;
private SelfReschedulingRunnable rolesPoller;
private volatile boolean shieldIndexExists = false;
@ -144,7 +143,8 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
logger.warn("failed to perform initial poll of roles index [{}]. scheduling again in [{}]", e,
ShieldTemplateService.SECURITY_INDEX_NAME, pollInterval);
}
versionChecker = threadPool.scheduleWithFixedDelay(poller, pollInterval);
rolesPoller = new SelfReschedulingRunnable(poller, threadPool, pollInterval, Names.GENERIC, logger);
rolesPoller.start();
state.set(State.STARTED);
}
} catch (Exception e) {
@ -156,7 +156,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
try {
FutureUtils.cancel(versionChecker);
rolesPoller.stop();
} finally {
state.set(State.STOPPED);
}

View File

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield.support;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
public class SelfReschedulingRunnable extends AbstractRunnable {
private final AbstractRunnable runnable;
private final ThreadPool threadPool;
private final TimeValue interval;
private final String executorName;
private final ESLogger logger;
private ScheduledFuture<?> scheduledFuture = null;
private volatile boolean run = false;
public SelfReschedulingRunnable(AbstractRunnable runnable, ThreadPool threadPool, TimeValue interval, String executorName,
ESLogger logger) {
this.runnable = runnable;
this.threadPool = threadPool;
this.interval = interval;
this.executorName = executorName;
this.logger = logger;
}
public synchronized void start() {
if (run != false || scheduledFuture != null) {
throw new IllegalStateException("start should not be called again before calling stop");
}
run = true;
scheduledFuture = threadPool.schedule(interval, executorName, this);
}
@Override
public synchronized void onAfter() {
if (run) {
scheduledFuture = threadPool.schedule(interval, executorName, this);
}
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to run scheduled task", t);
}
@Override
protected void doRun() throws Exception {
if (run) {
runnable.run();
}
}
public synchronized void stop() {
if (run == false) {
throw new IllegalStateException("stop called but not started or stop called twice");
}
run = false;
FutureUtils.cancel(scheduledFuture);
scheduledFuture = null;
}
// package private for testing
ScheduledFuture<?> getScheduledFuture() {
return scheduledFuture;
}
}

View File

@ -0,0 +1,277 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield.support;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class SelfReschedulingRunnableTests extends ESTestCase {
public void testSelfReschedulingRunnableReschedules() throws Exception {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
final int iterations = randomIntBetween(4, 24);
for (int i = 0; i < iterations; i++) {
// pretend we are the scheduler running it
reschedulingRunnable.run();
}
verify(threadPool, times(iterations + 1)).schedule(timeValue, name, reschedulingRunnable);
verifyZeroInteractions(logger);
}
public void testThrowingRunnableReschedules() throws Exception {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Throwable throwable) {
}
@Override
protected void doRun() throws Exception {
throw randomFrom(new UnsupportedOperationException(),
new ElasticsearchSecurityException(""),
new IllegalArgumentException(),
new NullPointerException());
}
};
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
final int iterations = randomIntBetween(4, 24);
for (int i = 0; i < iterations; i++) {
// pretend we are the scheduler running it
reschedulingRunnable.run();
}
verify(threadPool, times(iterations + 1)).schedule(timeValue, name, reschedulingRunnable);
verifyZeroInteractions(logger);
}
public void testDoesNotRescheduleUntilExecutionFinished() throws Exception {
final ThreadPool threadPool = mock(ThreadPool.class);
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch pauseLatch = new CountDownLatch(1);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Throwable throwable) {
}
@Override
protected void doRun() throws Exception {
startLatch.countDown();
pauseLatch.await();
}
};
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
final SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
verify(threadPool).schedule(timeValue, name, reschedulingRunnable);
Thread thread = new Thread() {
@Override
public void run() {
reschedulingRunnable.run();
}
};
thread.start();
startLatch.await();
verify(threadPool).schedule(timeValue, name, reschedulingRunnable);
pauseLatch.countDown();
thread.join();
verify(threadPool, times(2)).schedule(timeValue, name, reschedulingRunnable);
}
public void testStopCancelsScheduledFuture() {
final ThreadPool threadPool = mock(ThreadPool.class);
final ScheduledFuture future = mock(ScheduledFuture.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
when(threadPool.schedule(timeValue, name, reschedulingRunnable)).thenReturn(future);
reschedulingRunnable.start();
reschedulingRunnable.run();
reschedulingRunnable.stop();
verify(threadPool, times(2)).schedule(timeValue, name, reschedulingRunnable);
verify(future).cancel(false);
}
public void testDoubleStartThrowsException() {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
try {
reschedulingRunnable.start();
fail("calling start before stopping is not allowed");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("start should not be called again"));
}
}
public void testDoubleStopThrowsException() {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
reschedulingRunnable.stop();
try {
reschedulingRunnable.stop();
fail("calling stop while not running is not allowed");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("stop called but not started or stop called twice"));
}
}
public void testStopWithoutStartThrowsException() {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
try {
reschedulingRunnable.stop();
fail("calling stop while not running is not allowed");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("stop called but not started or stop called twice"));
}
}
public void testStopPreventsRunning() throws Exception {
final ThreadPool threadPool = new ThreadPool("test-stop-self-schedule");
final AtomicInteger failureCounter = new AtomicInteger(0);
final AtomicInteger runCounter = new AtomicInteger(0);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Throwable throwable) {
failureCounter.incrementAndGet();
}
@Override
protected void doRun() throws Exception {
runCounter.incrementAndGet();
}
};
final ESLogger logger = mock(ESLogger.class);
// arbitrary run time
final TimeValue timeValue = TimeValue.timeValueSeconds(2L);
final String name = Names.GENERIC;
try {
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
ScheduledFuture future = reschedulingRunnable.getScheduledFuture();
assertThat(future, notNullValue());
assertThat(future.isDone(), is(false));
assertThat(future.isCancelled(), is(false));
reschedulingRunnable.stop();
assertThat(reschedulingRunnable.getScheduledFuture(), nullValue());
assertThat(future.isCancelled(), is(true));
assertThat(future.isDone(), is(true));
boolean ran = awaitBusy(() -> runCounter.get() > 0 || failureCounter.get() > 0, 3L, TimeUnit.SECONDS);
assertThat(ran, is(false));
} finally {
threadPool.shutdownNow();
}
}
public void testStopPreventsRescheduling() throws Exception {
final ThreadPool threadPool = new ThreadPool("test-stop-self-schedule");
final CountDownLatch threadRunningLatch = new CountDownLatch(randomIntBetween(1, 16));
final CountDownLatch stopCalledLatch = new CountDownLatch(1);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Throwable throwable) {
throw new IllegalStateException("we should never be in this method!");
}
@Override
protected void doRun() throws Exception {
// notify we are running
threadRunningLatch.countDown();
if (threadRunningLatch.getCount() > 0) {
// let it keep running
return;
}
stopCalledLatch.await();
}
};
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
try {
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
threadRunningLatch.await();
// call stop
reschedulingRunnable.stop();
stopCalledLatch.countDown();
assertThat(reschedulingRunnable.getScheduledFuture(), nullValue());
final ScheduledThreadPoolExecutor scheduledThreadPooleExecutor = (ScheduledThreadPoolExecutor) threadPool.scheduler();
boolean somethingQueued = awaitBusy(() -> scheduledThreadPooleExecutor.getQueue().isEmpty() == false, 1L, TimeUnit.SECONDS);
assertThat(somethingQueued, is(false));
} finally {
threadPool.shutdownNow();
}
}
}

View File

@ -0,0 +1,18 @@
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
@defaultMessage reschedule yourself using #schedule if you are using blocking code in runnable
org.elasticsearch.threadpool.ThreadPool#scheduleWithFixedDelay(java.lang.Runnable, org.elasticsearch.common.unit.TimeValue)