security: remove SelfReSchedulingRunnable

This commit removes the SelfReschedulingRunnable and changes the native stores
to use the threadpool for scheduling again since we have now fixed the issue in core.

Original commit: elastic/x-pack-elasticsearch@50030e31ff
This commit is contained in:
jaymode 2016-04-05 11:00:32 -04:00
parent 4552df11da
commit 9be5c7df60
6 changed files with 9 additions and 389 deletions

View File

@ -235,10 +235,6 @@ thirdPartyAudit.excludes = [
// someone figure out what the x-plugins logic should be
licenseHeaders.enabled = false
forbiddenApisMain {
signaturesURLs += [file('signatures.txt').toURI().toURL()]
}
modifyPom { MavenPom pom ->
pom.withXml { XmlProvider xml ->
// first find if we have dependencies at all, and grab the node

View File

@ -35,8 +35,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -58,11 +56,11 @@ import org.elasticsearch.xpack.security.action.user.PutUserRequest;
import org.elasticsearch.xpack.security.authc.support.Hasher;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.client.SecurityClient;
import org.elasticsearch.xpack.security.support.SelfReschedulingRunnable;
import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.User.Fields;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.ArrayList;
@ -117,7 +115,7 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
private final InternalClient client;
private final ThreadPool threadPool;
private SelfReschedulingRunnable userPoller;
private Cancellable pollerCancellable;
private int scrollSize;
private TimeValue scrollKeepAlive;
@ -533,8 +531,8 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
} catch (Exception e) {
logger.warn("failed to do initial poll of users", e);
}
userPoller = new SelfReschedulingRunnable(poller, threadPool, POLL_INTERVAL_SETTING.get(settings), Names.GENERIC, logger);
userPoller.start();
TimeValue interval = settings.getAsTime("shield.authc.native.reload.interval", TimeValue.timeValueSeconds(30L));
pollerCancellable = threadPool.scheduleWithFixedDelay(poller, interval, Names.GENERIC);
state.set(State.STARTED);
}
} catch (Exception e) {
@ -546,7 +544,7 @@ public class NativeUsersStore extends AbstractComponent implements ClusterStateL
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
try {
userPoller.stop();
pollerCancellable.cancel();
} catch (Exception e) {
state.set(State.FAILED);
throw e;

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -53,8 +52,8 @@ import org.elasticsearch.xpack.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.security.authz.permission.IndicesPermission.Group;
import org.elasticsearch.xpack.security.authz.permission.Role;
import org.elasticsearch.xpack.security.client.SecurityClient;
import org.elasticsearch.xpack.security.support.SelfReschedulingRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.ArrayList;
@ -113,7 +112,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
private SecurityClient securityClient;
private int scrollSize;
private TimeValue scrollKeepAlive;
private SelfReschedulingRunnable rolesPoller;
private Cancellable pollerCancellable;
private volatile boolean securityIndexExists = false;
@ -160,8 +159,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
logger.warn("failed to perform initial poll of roles index [{}]. scheduling again in [{}]", e,
SecurityTemplateService.SECURITY_INDEX_NAME, pollInterval);
}
rolesPoller = new SelfReschedulingRunnable(poller, threadPool, pollInterval, Names.GENERIC, logger);
rolesPoller.start();
pollerCancellable = threadPool.scheduleWithFixedDelay(poller, pollInterval, Names.GENERIC);
state.set(State.STARTED);
}
} catch (Exception e) {
@ -173,7 +171,7 @@ public class NativeRolesStore extends AbstractComponent implements RolesStore, C
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
try {
rolesPoller.stop();
pollerCancellable.cancel();
} finally {
state.set(State.STOPPED);
}

View File

@ -1,76 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.support;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
public class SelfReschedulingRunnable extends AbstractRunnable {
private final AbstractRunnable runnable;
private final ThreadPool threadPool;
private final TimeValue interval;
private final String executorName;
private final ESLogger logger;
private ScheduledFuture<?> scheduledFuture = null;
private volatile boolean run = false;
public SelfReschedulingRunnable(AbstractRunnable runnable, ThreadPool threadPool, TimeValue interval, String executorName,
ESLogger logger) {
this.runnable = runnable;
this.threadPool = threadPool;
this.interval = interval;
this.executorName = executorName;
this.logger = logger;
}
public synchronized void start() {
if (run != false || scheduledFuture != null) {
throw new IllegalStateException("start should not be called again before calling stop");
}
run = true;
scheduledFuture = threadPool.schedule(interval, executorName, this);
}
@Override
public synchronized void onAfter() {
if (run) {
scheduledFuture = threadPool.schedule(interval, executorName, this);
}
}
@Override
public void onFailure(Exception e) {
logger.warn("failed to run scheduled task", e);
}
@Override
protected void doRun() throws Exception {
if (run) {
runnable.run();
}
}
public synchronized void stop() {
if (run == false) {
throw new IllegalStateException("stop called but not started or stop called twice");
}
run = false;
FutureUtils.cancel(scheduledFuture);
scheduledFuture = null;
}
// package private for testing
ScheduledFuture<?> getScheduledFuture() {
return scheduledFuture;
}
}

View File

@ -1,278 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.support;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class SelfReschedulingRunnableTests extends ESTestCase {
public void testSelfReschedulingRunnableReschedules() throws Exception {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
final int iterations = randomIntBetween(4, 24);
for (int i = 0; i < iterations; i++) {
// pretend we are the scheduler running it
reschedulingRunnable.run();
}
verify(threadPool, times(iterations + 1)).schedule(timeValue, name, reschedulingRunnable);
verifyZeroInteractions(logger);
}
public void testThrowingRunnableReschedules() throws Exception {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception throwable) {
}
@Override
protected void doRun() throws Exception {
throw randomFrom(new UnsupportedOperationException(),
new ElasticsearchSecurityException(""),
new IllegalArgumentException(),
new NullPointerException());
}
};
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
final int iterations = randomIntBetween(4, 24);
for (int i = 0; i < iterations; i++) {
// pretend we are the scheduler running it
reschedulingRunnable.run();
}
verify(threadPool, times(iterations + 1)).schedule(timeValue, name, reschedulingRunnable);
verifyZeroInteractions(logger);
}
public void testDoesNotRescheduleUntilExecutionFinished() throws Exception {
final ThreadPool threadPool = mock(ThreadPool.class);
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch pauseLatch = new CountDownLatch(1);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception throwable) {
}
@Override
protected void doRun() throws Exception {
startLatch.countDown();
pauseLatch.await();
}
};
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
final SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
verify(threadPool).schedule(timeValue, name, reschedulingRunnable);
Thread thread = new Thread() {
@Override
public void run() {
reschedulingRunnable.run();
}
};
thread.start();
startLatch.await();
verify(threadPool).schedule(timeValue, name, reschedulingRunnable);
pauseLatch.countDown();
thread.join();
verify(threadPool, times(2)).schedule(timeValue, name, reschedulingRunnable);
}
public void testStopCancelsScheduledFuture() {
final ThreadPool threadPool = mock(ThreadPool.class);
final ScheduledFuture future = mock(ScheduledFuture.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
when(threadPool.schedule(timeValue, name, reschedulingRunnable)).thenReturn(future);
reschedulingRunnable.start();
reschedulingRunnable.run();
reschedulingRunnable.stop();
verify(threadPool, times(2)).schedule(timeValue, name, reschedulingRunnable);
verify(future).cancel(false);
}
public void testDoubleStartThrowsException() {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
try {
reschedulingRunnable.start();
fail("calling start before stopping is not allowed");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("start should not be called again"));
}
}
public void testDoubleStopThrowsException() {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
reschedulingRunnable.stop();
try {
reschedulingRunnable.stop();
fail("calling stop while not running is not allowed");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("stop called but not started or stop called twice"));
}
}
public void testStopWithoutStartThrowsException() {
final ThreadPool threadPool = mock(ThreadPool.class);
final AbstractRunnable runnable = mock(AbstractRunnable.class);
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
try {
reschedulingRunnable.stop();
fail("calling stop while not running is not allowed");
} catch (IllegalStateException e) {
assertThat(e.getMessage(), containsString("stop called but not started or stop called twice"));
}
}
public void testStopPreventsRunning() throws Exception {
final ThreadPool threadPool = new TestThreadPool("test-stop-self-schedule");
final AtomicInteger failureCounter = new AtomicInteger(0);
final AtomicInteger runCounter = new AtomicInteger(0);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception throwable) {
failureCounter.incrementAndGet();
}
@Override
protected void doRun() throws Exception {
runCounter.incrementAndGet();
}
};
final ESLogger logger = mock(ESLogger.class);
// arbitrary run time
final TimeValue timeValue = TimeValue.timeValueSeconds(2L);
final String name = Names.GENERIC;
try {
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
ScheduledFuture future = reschedulingRunnable.getScheduledFuture();
assertThat(future, notNullValue());
assertThat(future.isDone(), is(false));
assertThat(future.isCancelled(), is(false));
reschedulingRunnable.stop();
assertThat(reschedulingRunnable.getScheduledFuture(), nullValue());
assertThat(future.isCancelled(), is(true));
assertThat(future.isDone(), is(true));
boolean ran = awaitBusy(() -> runCounter.get() > 0 || failureCounter.get() > 0, 3L, TimeUnit.SECONDS);
assertThat(ran, is(false));
} finally {
threadPool.shutdownNow();
}
}
public void testStopPreventsRescheduling() throws Exception {
final ThreadPool threadPool = new TestThreadPool("test-stop-self-schedule");
final CountDownLatch threadRunningLatch = new CountDownLatch(randomIntBetween(1, 16));
final CountDownLatch stopCalledLatch = new CountDownLatch(1);
final AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception throwable) {
throw new IllegalStateException("we should never be in this method!");
}
@Override
protected void doRun() throws Exception {
// notify we are running
threadRunningLatch.countDown();
if (threadRunningLatch.getCount() > 0) {
// let it keep running
return;
}
stopCalledLatch.await();
}
};
final ESLogger logger = mock(ESLogger.class);
final TimeValue timeValue = TimeValue.timeValueMillis(1L);
final String name = Names.GENERIC;
try {
SelfReschedulingRunnable reschedulingRunnable = new SelfReschedulingRunnable(runnable, threadPool, timeValue, name, logger);
reschedulingRunnable.start();
threadRunningLatch.await();
// call stop
reschedulingRunnable.stop();
stopCalledLatch.countDown();
assertThat(reschedulingRunnable.getScheduledFuture(), nullValue());
final ScheduledThreadPoolExecutor scheduledThreadPooleExecutor = (ScheduledThreadPoolExecutor) threadPool.scheduler();
boolean somethingQueued = awaitBusy(() -> scheduledThreadPooleExecutor.getQueue().isEmpty() == false, 1L, TimeUnit.SECONDS);
assertThat(somethingQueued, is(false));
} finally {
threadPool.shutdownNow();
}
}
}

View File

@ -1,18 +0,0 @@
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on
# an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
@defaultMessage reschedule yourself using #schedule if you are using blocking code in runnable
org.elasticsearch.threadpool.ThreadPool#scheduleWithFixedDelay(java.lang.Runnable, org.elasticsearch.common.unit.TimeValue)