LifecycleLock for better synchronization in lifecycled objects (#3964)

* Introduce LifecycleLock

* Add LifecycleLockTest

* Rename LifecycleLock.release() to exitStart()

* Rewrite LifecycleLock using AbstractQueuedSynchronizer for more safety, added tests

* Add LifecycleLock.exitStop() and reset()

* Add LifecycleLock.awaitStarted(timeout)

* Braces

* Fix
This commit is contained in:
Roman Leventov 2017-03-02 14:22:57 -06:00 committed by Gian Merlino
parent e63eefd7ff
commit ea1f5b7954
3 changed files with 450 additions and 17 deletions

View File

@ -0,0 +1,263 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.concurrent;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* A synchronization tool for lifecycled objects (see {@link io.druid.java.util.common.lifecycle.Lifecycle}), that need
* happens-before between start() and other methods and/or to check that the object was successfully started in other
* methods.
*
* Guarantees in terms of JMM: happens-before between {@link #exitStart()} and {@link #awaitStarted()},
* exitStart() and {@link #canStop()}, if it returns {@code true}.
*
* Example:
* class ExampleLifecycledClass
* {
* final LifecycleLock lifecycleLock = new LifecycleLock();
*
* void start()
* {
* if (!lifecycleLock.canStart()) {
* .. return or throw exception
* }
* try {
* .. do start
* lifecycleLock.started();
* }
* finally {
* lifecycleLock.exitStart();
* }
* }
*
* void otherMethod()
* {
* Preconditions.checkState(lifecycleLock.awaitStarted());
* // all actions done in start() are visible here
* .. do stuff
* }
*
* void stop()
* {
* if (!lifecycleLock.canStop()) {
* .. return or throw exception
* }
* // all actions done in start() are visible here
* .. do stop
* }
* }
*/
public final class LifecycleLock
{
  /**
   * The state machine behind {@link LifecycleLock}. The current lifecycle phase is kept in the
   * AbstractQueuedSynchronizer's int state and is only ever changed with compare-and-set, so every transition
   * is performed by exactly one thread.
   *
   * The numeric order of the constants matters: {@link #tryAcquireShared} treats every value greater than
   * {@link #STARTED} as "start() has exited".
   */
  private static final class Sync extends AbstractQueuedSynchronizer
  {
    private static final int NOT_STARTED = 0;
    private static final int STARTING = 1;
    private static final int STARTED = 2;
    private static final int START_EXITED_SUCCESSFUL = 3;
    private static final int START_EXITED_FAIL = 4;
    private static final int STOPPING = 5;
    private static final int STOPPED = 6;

    /** Moves NOT_STARTED -> STARTING; returns false in any other state. */
    boolean canStart()
    {
      return compareAndSetState(NOT_STARTED, STARTING);
    }

    /** Moves STARTING -> STARTED; any other state means started() was called outside of start(). */
    void started()
    {
      if (!compareAndSetState(STARTING, STARTED)) {
        throw new IllegalMonitorStateException("Called started() not in the context of start()");
      }
    }

    void exitStart()
    {
      // see tryReleaseShared()
      releaseShared(1);
    }

    /**
     * Records the outcome of start(): STARTING (started() was never called) becomes START_EXITED_FAIL, STARTED
     * becomes START_EXITED_SUCCESSFUL. Returning true lets the synchronizer wake up threads parked in
     * awaitStarted().
     */
    @Override
    protected boolean tryReleaseShared(int ignore)
    {
      // Retry loop: a failed CAS means the state changed concurrently, so re-read and decide again.
      while (true) {
        int state = getState();
        if (state == STARTING) {
          if (compareAndSetState(STARTING, START_EXITED_FAIL)) {
            return true;
          }
        } else if (state == STARTED) {
          if (compareAndSetState(STARTED, START_EXITED_SUCCESSFUL)) {
            return true;
          }
        } else {
          throw new IllegalMonitorStateException("exitStart() called not in the end of the start() method");
        }
      }
    }

    boolean awaitStarted()
    {
      try {
        // see tryAcquireShared()
        acquireSharedInterruptibly(1);
      }
      catch (InterruptedException e) {
        // The checked exception is hidden from callers, so keep the interrupt observable through the thread's
        // interrupt flag instead of losing it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      return getState() == START_EXITED_SUCCESSFUL;
    }

    boolean awaitStarted(long timeNanos)
    {
      try {
        // see tryAcquireShared()
        if (!tryAcquireSharedNanos(1, timeNanos)) {
          return false;
        }
      }
      catch (InterruptedException e) {
        // Same as in awaitStarted(): re-assert the interrupt flag before wrapping the exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      return getState() == START_EXITED_SUCCESSFUL;
    }

    /**
     * Succeeds (never blocks) once the state is past STARTED, i. e. after exitStart() was called, regardless of
     * whether the start was successful.
     */
    @Override
    protected int tryAcquireShared(int ignore)
    {
      return getState() > STARTED ? 1 : -1;
    }

    boolean canStop()
    {
      // Retry loop: only the thread that wins the CAS to STOPPING gets true.
      while (true) {
        int state = getState();
        if (state == START_EXITED_FAIL || state == STOPPING) {
          return false;
        } else if (state == START_EXITED_SUCCESSFUL) {
          if (compareAndSetState(START_EXITED_SUCCESSFUL, STOPPING)) {
            return true;
          }
        } else {
          // Reached for NOT_STARTED, STARTING, STARTED and also for STOPPED.
          throw new IllegalMonitorStateException("Called canStop() before start()");
        }
      }
    }

    /** Moves STOPPING -> STOPPED; any other state means exitStop() was called outside of stop(). */
    void exitStop()
    {
      if (!compareAndSetState(STOPPING, STOPPED)) {
        throw new IllegalMonitorStateException("Called exitStop() not in the context of stop()");
      }
    }

    /** Moves STOPPED -> NOT_STARTED, making the lock reusable for another start. */
    void reset()
    {
      if (!compareAndSetState(STOPPED, NOT_STARTED)) {
        throw new IllegalMonitorStateException("Not called exitStop() before reset()");
      }
    }
  }

  private final Sync sync = new Sync();

  /**
   * Start latch, only one canStart() call in any thread on this LifecycleLock object could return true, if {@link
   * #reset()} is not called in between.
   */
  public boolean canStart()
  {
    return sync.canStart();
  }

  /**
   * Announce the start was successful.
   *
   * @throws IllegalMonitorStateException if {@link #canStart()} is not yet called or if {@link #exitStart()} is already
   * called on this LifecycleLock
   */
  public void started()
  {
    sync.started();
  }

  /**
   * Must be called before exit from start() on the lifecycled object, usually in a finally block.
   *
   * @throws IllegalMonitorStateException if {@link #canStart()} is not yet called on this LifecycleLock, or if
   * exitStart() is already called
   */
  public void exitStart()
  {
    sync.exitStart();
  }

  /**
   * Awaits until {@link #exitStart()} is called, if needed, and returns {@code true} if {@link #started()} was called
   * before that. Returns {@code false} if a {@link #canStop()} call has already returned {@code true} on this
   * LifecycleLock.
   *
   * @throws RuntimeException wrapping the {@link InterruptedException}, if the calling thread is interrupted; the
   * thread's interrupt flag is set again before the exception is thrown
   */
  public boolean awaitStarted()
  {
    return sync.awaitStarted();
  }

  /**
   * Awaits until {@link #exitStart()} is called for at most the specified timeout, and returns {@code true} if {@link
   * #started()} was called before that. Returns {@code false} if {@code started()} wasn't called before {@code
   * exitStart()}, or if {@code exitStart()} isn't called on this LifecycleLock until the specified timeout expires,
   * or if a {@link #canStop()} call has already returned {@code true} on this LifecycleLock.
   *
   * @throws RuntimeException wrapping the {@link InterruptedException}, if the calling thread is interrupted; the
   * thread's interrupt flag is set again before the exception is thrown
   */
  public boolean awaitStarted(long timeout, TimeUnit unit)
  {
    return sync.awaitStarted(unit.toNanos(timeout));
  }

  /**
   * Stop latch, only one canStop() call in any thread on this LifecycleLock object could return {@code true}. If
   * {@link #started()} wasn't called on this LifecycleLock object, always returns {@code false}.
   *
   * @throws IllegalMonitorStateException if {@link #exitStart()} is not yet called on this LifecycleLock, or if
   * {@link #exitStop()} is already called
   */
  public boolean canStop()
  {
    return sync.canStop();
  }

  /**
   * If this LifecycleLock is used in a restartable object, which uses {@link #reset()}, exitStop() must be called
   * before exit from stop() on this object, usually in a finally block.
   *
   * @throws IllegalMonitorStateException if {@link #canStop()} is not yet called on this LifecycleLock
   */
  public void exitStop()
  {
    sync.exitStop();
  }

  /**
   * Resets the LifecycleLock after {@link #exitStop()}, so that {@link #canStart()} could be called again.
   *
   * @throws IllegalMonitorStateException if {@link #exitStop()} is not yet called on this LifecycleLock
   */
  public void reset()
  {
    sync.reset();
  }
}

View File

@ -0,0 +1,173 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.concurrent;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Tests of {@link LifecycleLock}: the start and stop latches admit a single winner under contention, and
 * out-of-order calls are rejected with {@link IllegalMonitorStateException}.
 */
public class LifecycleLockTest
{
  /** How many times each race is re-run; a single run could pass by luck of scheduling. */
  private static final int RACE_REPETITIONS = 100;

  /** How many threads contend for the latch in one race. */
  private static final int RACING_THREADS = 100;

  /** The latch operation being raced: canStart() or canStop(). */
  private interface LatchAttempt
  {
    boolean attempt(LifecycleLock lifecycleLock);
  }

  @Test
  public void testOnlyOneCanStart() throws InterruptedException
  {
    for (int i = 0; i < RACE_REPETITIONS; i++) {
      testOnlyOneCanStartRun();
    }
  }

  private void testOnlyOneCanStartRun() throws InterruptedException
  {
    final LifecycleLock lifecycleLock = new LifecycleLock();
    int winners = countWinners(
        lifecycleLock,
        new LatchAttempt()
        {
          @Override
          public boolean attempt(LifecycleLock lock)
          {
            return lock.canStart();
          }
        }
    );
    Assert.assertEquals(1, winners);
  }

  @Test
  public void testOnlyOneCanStop() throws InterruptedException
  {
    for (int i = 0; i < RACE_REPETITIONS; i++) {
      testOnlyOneCanStopRun();
    }
  }

  private void testOnlyOneCanStopRun() throws InterruptedException
  {
    final LifecycleLock lifecycleLock = new LifecycleLock();
    // canStop() may only be raced on a lock that went through a successful start.
    Assert.assertTrue(lifecycleLock.canStart());
    lifecycleLock.started();
    lifecycleLock.exitStart();
    int winners = countWinners(
        lifecycleLock,
        new LatchAttempt()
        {
          @Override
          public boolean attempt(LifecycleLock lock)
          {
            return lock.canStop();
          }
        }
    );
    Assert.assertEquals(1, winners);
  }

  /**
   * Releases {@link #RACING_THREADS} threads at once, each performing the given attempt on the given lock, and
   * returns how many of the attempts returned {@code true}.
   */
  private static int countWinners(final LifecycleLock lifecycleLock, final LatchAttempt latchAttempt)
      throws InterruptedException
  {
    final CountDownLatch startLatch = new CountDownLatch(1);
    final CountDownLatch finishLatch = new CountDownLatch(RACING_THREADS);
    final AtomicInteger successful = new AtomicInteger(0);
    for (int i = 0; i < RACING_THREADS; i++) {
      new Thread()
      {
        @Override
        public void run()
        {
          try {
            startLatch.await();
            if (latchAttempt.attempt(lifecycleLock)) {
              successful.incrementAndGet();
            }
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          finally {
            // Always count down, otherwise an interrupted or failed thread would leave the test hanging in
            // finishLatch.await() instead of failing on the assertion.
            finishLatch.countDown();
          }
        }
      }.start();
    }
    startLatch.countDown();
    finishLatch.await();
    return successful.get();
  }

  @Test
  public void testNoStartAfterStop()
  {
    LifecycleLock lifecycleLock = new LifecycleLock();
    Assert.assertTrue(lifecycleLock.canStart());
    lifecycleLock.started();
    lifecycleLock.exitStart();
    Assert.assertTrue(lifecycleLock.canStop());
    Assert.assertFalse(lifecycleLock.canStart());
  }

  @Test
  public void testNotStarted()
  {
    LifecycleLock lifecycleLock = new LifecycleLock();
    Assert.assertTrue(lifecycleLock.canStart());
    // exitStart() without started() marks the start as failed.
    lifecycleLock.exitStart();
    Assert.assertFalse(lifecycleLock.awaitStarted());
    Assert.assertFalse(lifecycleLock.canStop());
  }

  @Test
  public void testRestart()
  {
    LifecycleLock lifecycleLock = new LifecycleLock();
    Assert.assertTrue(lifecycleLock.canStart());
    lifecycleLock.started();
    lifecycleLock.exitStart();
    Assert.assertTrue(lifecycleLock.canStop());
    lifecycleLock.exitStop();
    lifecycleLock.reset();
    Assert.assertTrue(lifecycleLock.canStart());
  }

  @Test(expected = IllegalMonitorStateException.class)
  public void testDoubleStarted()
  {
    LifecycleLock lifecycleLock = new LifecycleLock();
    lifecycleLock.canStart();
    lifecycleLock.started();
    lifecycleLock.started();
  }

  @Test(expected = IllegalMonitorStateException.class)
  public void testDoubleExitStart()
  {
    LifecycleLock lifecycleLock = new LifecycleLock();
    lifecycleLock.canStart();
    lifecycleLock.started();
    lifecycleLock.exitStart();
    lifecycleLock.exitStart();
  }

  @Test(expected = IllegalMonitorStateException.class)
  public void testCanStopNotExitedStart()
  {
    LifecycleLock lifecycleLock = new LifecycleLock();
    lifecycleLock.canStart();
    lifecycleLock.started();
    lifecycleLock.canStop();
  }
}

View File

@ -20,6 +20,7 @@
package io.druid.indexing.overlord.autoscaling;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.LifecycleLock;
import io.druid.indexing.overlord.WorkerTaskRunner;
import io.druid.java.util.common.granularity.PeriodGranularity;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
@ -38,9 +39,7 @@ public abstract class AbstractWorkerResourceManagementStrategy implements Resour
private final ResourceManagementSchedulerConfig resourceManagementSchedulerConfig;
private final ScheduledExecutorService exec;
private final Object lock = new Object();
private volatile boolean started = false;
private final LifecycleLock lifecycleLock = new LifecycleLock();
protected AbstractWorkerResourceManagementStrategy(
ResourceManagementSchedulerConfig resourceManagementSchedulerConfig,
@ -54,10 +53,10 @@ public abstract class AbstractWorkerResourceManagementStrategy implements Resour
@Override
public void startManagement(final WorkerTaskRunner runner)
{
synchronized (lock) {
if (started) {
return;
}
if (!lifecycleLock.canStart()) {
return;
}
try {
log.info("Started Resource Management Scheduler");
@ -98,9 +97,10 @@ public abstract class AbstractWorkerResourceManagementStrategy implements Resour
}
}
);
started = true;
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}
@ -111,14 +111,11 @@ public abstract class AbstractWorkerResourceManagementStrategy implements Resour
@Override
public void stopManagement()
{
synchronized (lock) {
if (!started) {
return;
}
log.info("Stopping Resource Management Scheduler");
exec.shutdown();
started = false;
if (!lifecycleLock.canStop()) {
return;
}
log.info("Stopping Resource Management Scheduler");
exec.shutdown();
}
}