https://issues.apache.org/jira/browse/AMQ-4705 - close window where both master and slave can be active - slave waits for possible keepAlivePeriod with the lock before startin. Ensures a keepAlive has completed on the master

This commit is contained in:
gtully 2015-05-22 11:40:25 +01:00
parent cc36633cef
commit 9bc602be43
2 changed files with 209 additions and 116 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.store;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.AbstractLocker; import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.util.LockFile; import org.apache.activemq.util.LockFile;
@ -54,6 +55,13 @@ public class SharedFileLocker extends AbstractLocker {
while ((!isStopped()) && (!isStopping())) { while ((!isStopped()) && (!isStopping())) {
try { try {
lockFile.lock(); lockFile.lock();
if (warned) {
// ensure lockHolder has released; wait for one keepAlive iteration
try {
TimeUnit.MILLISECONDS.sleep(lockable != null ? lockable.getLockKeepAlivePeriod() : 0l);
} catch (InterruptedException e1) {
}
}
locked = keepAlive(); locked = keepAlive();
break; break;
} catch (IOException e) { } catch (IOException e) {
@ -72,7 +80,7 @@ public class SharedFileLocker extends AbstractLocker {
+ " seconds for the database to be unlocked. Reason: " + " seconds for the database to be unlocked. Reason: "
+ e); + e);
try { try {
Thread.sleep(lockAcquireSleepInterval); TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
} }
} }

View File

@ -18,25 +18,39 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.util.DefaultTestAppender; import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent; import org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.slf4j.LoggerFactory;
import static junit.framework.Assert.assertTrue;
public class SharedFileLockerTest {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(SharedFileLockerTest.class);
public class SharedFileLockerTest
{
@Rule @Rule
public TemporaryFolder testFolder; public TemporaryFolder testFolder;
public SharedFileLockerTest() public SharedFileLockerTest() {
{
File file = new File(IOHelper.getDefaultDataDirectory()); File file = new File(IOHelper.getDefaultDataDirectory());
file.mkdir(); file.mkdir();
@ -46,11 +60,9 @@ public class SharedFileLockerTest
} }
@Test @Test
public void testLoop() throws Exception public void testLoop() throws Exception {
{
// Increase the number of iterations if you are debugging races // Increase the number of iterations if you are debugging races
for (int i = 0 ; i < 100; i++) for (int i = 0; i < 100; i++) {
{
internalLoop(5); internalLoop(5);
} }
@ -58,15 +70,13 @@ public class SharedFileLockerTest
@Test @Test
public void testLogging() throws Exception public void testLogging() throws Exception {
{
// using a bigger wait here // using a bigger wait here
// to make sure we won't log any extra info // to make sure we won't log any extra info
internalLoop(100); internalLoop(100);
} }
private void internalLoop(long timewait) throws Exception private void internalLoop(long timewait) throws Exception {
{
final AtomicInteger logCounts = new AtomicInteger(0); final AtomicInteger logCounts = new AtomicInteger(0);
DefaultTestAppender appender = new DefaultTestAppender() { DefaultTestAppender appender = new DefaultTestAppender() {
@Override @Override
@ -89,22 +99,16 @@ public class SharedFileLockerTest
locker2.setDirectory(testFolder.getRoot()); locker2.setDirectory(testFolder.getRoot());
try try {
{
locker1.doStart(); locker1.doStart();
Assert.assertTrue(locker1.keepAlive()); Assert.assertTrue(locker1.keepAlive());
thread = new Thread("Locker Thread") thread = new Thread("Locker Thread") {
{ public void run() {
public void run() try {
{
try
{
locker2.doStart(); locker2.doStart();
} } catch (Throwable e) {
catch (Throwable e)
{
errors.incrementAndGet(); errors.incrementAndGet();
} }
} }
@ -117,14 +121,12 @@ public class SharedFileLockerTest
// as I want the test to run as fast as possible // as I want the test to run as fast as possible
{ {
long timeout = System.currentTimeMillis() + 5000; long timeout = System.currentTimeMillis() + 5000;
while (logCounts.get() < 1 && System.currentTimeMillis() < timeout) while (logCounts.get() < 1 && System.currentTimeMillis() < timeout) {
{
Thread.sleep(1); Thread.sleep(1);
} }
} }
if (timewait > 0) if (timewait > 0) {
{
Thread.sleep(timewait); Thread.sleep(timewait);
} }
@ -139,8 +141,7 @@ public class SharedFileLockerTest
long timeout = System.currentTimeMillis() + 5000; long timeout = System.currentTimeMillis() + 5000;
while (timeout > System.currentTimeMillis() && !locker2.keepAlive()) while (timeout > System.currentTimeMillis() && !locker2.keepAlive()) {
{
Thread.sleep(1); Thread.sleep(1);
} }
@ -150,17 +151,14 @@ public class SharedFileLockerTest
Assert.assertEquals(0, errors.get()); Assert.assertEquals(0, errors.get());
} } finally {
finally
{
Logger.getRootLogger().removeAppender(appender); Logger.getRootLogger().removeAppender(appender);
// to make sure we won't leak threads if the test ever failed for any reason // to make sure we won't leak threads if the test ever failed for any reason
thread.join(1000); thread.join(1000);
if (thread.isAlive()) if (thread.isAlive()) {
{
thread.interrupt(); thread.interrupt();
} }
@ -169,4 +167,91 @@ public class SharedFileLockerTest
} }
} }
@Test
public void verifyLockAcquireWaitsForLockDrop() throws Exception {
final AtomicInteger logCounts = new AtomicInteger(0);
DefaultTestAppender appender = new DefaultTestAppender() {
@Override
public void doAppend(LoggingEvent event) {
logCounts.incrementAndGet();
}
};
Logger sharedFileLogger = Logger.getLogger(SharedFileLocker.class);
sharedFileLogger.addAppender(appender);
LockableServiceSupport config = new LockableServiceSupport() {
@Override
public long getLockKeepAlivePeriod() {
return 500;
}
@Override
public Locker createDefaultLocker() throws IOException {
return null;
}
public void init() throws Exception {
}
protected void doStop(ServiceStopper stopper) throws Exception {
}
protected void doStart() throws Exception {
}
};
final SharedFileLocker underTest = new SharedFileLocker();
underTest.setDirectory(testFolder.getRoot());
underTest.setLockAcquireSleepInterval(5);
underTest.setLockable(config);
// get the in jvm lock
File lockFile = new File(testFolder.getRoot(), "lock");
String jvmProp = LockFile.class.getName() + ".lock." + lockFile.getCanonicalPath();
System.getProperties().put(jvmProp, jvmProp);
final CountDownLatch locked = new CountDownLatch(1);
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
final AtomicLong acquireTime = new AtomicLong(0l);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
underTest.start();
acquireTime.set(System.currentTimeMillis());
locked.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
assertTrue("locker failed to obtain lock", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return logCounts.get() > 0;
}
}, 5000, 10));
// release vm lock
long releaseTime = System.currentTimeMillis();
System.getProperties().remove(jvmProp);
assertTrue("locker got lock", locked.await(5, TimeUnit.SECONDS));
// verify delay in start
LOG.info("ReleaseTime: " + releaseTime + ", AcquireTime:" + acquireTime.get());
assertTrue("acquire delayed for keepAlive: " + config.getLockKeepAlivePeriod(), acquireTime.get() >= releaseTime + config.getLockKeepAlivePeriod());
} finally {
executorService.shutdownNow();
underTest.stop();
lockFile.delete();
sharedFileLogger.removeAppender(appender);
}
}
} }