mirror of https://github.com/apache/activemq.git
AMQ-5709 Reworking Logging on Locker
https://issues.apache.org/jira/browse/AMQ-5709 This is about changing how the Log is done in case of not being able to lock (log only once)
This commit is contained in:
parent
c705523cd0
commit
89c75ca28a
|
@ -16,15 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.store;
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.AbstractLocker;
|
import org.apache.activemq.broker.AbstractLocker;
|
||||||
import org.apache.activemq.util.LockFile;
|
import org.apache.activemq.util.LockFile;
|
||||||
import org.apache.activemq.util.ServiceStopper;
|
import org.apache.activemq.util.ServiceStopper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents an exclusive lock on a database to avoid multiple brokers running
|
* Represents an exclusive lock on a database to avoid multiple brokers running
|
||||||
* against the same logical database.
|
* against the same logical database.
|
||||||
|
@ -48,19 +48,30 @@ public class SharedFileLocker extends AbstractLocker {
|
||||||
if (failIfLocked) {
|
if (failIfLocked) {
|
||||||
lockFile.lock();
|
lockFile.lock();
|
||||||
} else {
|
} else {
|
||||||
|
// Print a warning only once
|
||||||
|
boolean warned = false;
|
||||||
boolean locked = false;
|
boolean locked = false;
|
||||||
while ((!isStopped()) && (!isStopping())) {
|
while ((!isStopped()) && (!isStopping())) {
|
||||||
try {
|
try {
|
||||||
lockFile.lock();
|
lockFile.lock();
|
||||||
locked = keepAlive();
|
locked = keepAlive();
|
||||||
|
LOG.info("locked " + locked);
|
||||||
break;
|
break;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info("Database "
|
if (!warned)
|
||||||
+ lockFileName
|
{
|
||||||
+ " is locked... waiting "
|
LOG.info("Database "
|
||||||
+ (lockAcquireSleepInterval / 1000)
|
+ lockFileName
|
||||||
+ " seconds for the database to be unlocked. Reason: "
|
+ " is locked by another server. This broker is now in slave mode waiting a lock to be acquired");
|
||||||
+ e);
|
warned = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("Database "
|
||||||
|
+ lockFileName
|
||||||
|
+ " is locked... waiting "
|
||||||
|
+ (lockAcquireSleepInterval / 1000)
|
||||||
|
+ " seconds for the database to be unlocked. Reason: "
|
||||||
|
+ e);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(lockAcquireSleepInterval);
|
Thread.sleep(lockAcquireSleepInterval);
|
||||||
} catch (InterruptedException e1) {
|
} catch (InterruptedException e1) {
|
||||||
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
public class SharedFileLockerTest
|
||||||
|
{
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder testFolder = new TemporaryFolder();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLock() throws Exception
|
||||||
|
{
|
||||||
|
final AtomicInteger errors = new AtomicInteger(0);
|
||||||
|
|
||||||
|
Thread thread = null;
|
||||||
|
|
||||||
|
SharedFileLocker locker1 = new SharedFileLocker();
|
||||||
|
locker1.setDirectory(testFolder.getRoot());
|
||||||
|
|
||||||
|
final SharedFileLocker locker2 = new SharedFileLocker();
|
||||||
|
locker2.setLockAcquireSleepInterval(1);
|
||||||
|
locker2.setDirectory(testFolder.getRoot());
|
||||||
|
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
locker1.doStart();
|
||||||
|
|
||||||
|
Assert.assertTrue(locker1.keepAlive());
|
||||||
|
|
||||||
|
Thread.sleep(10);
|
||||||
|
|
||||||
|
thread = new Thread("Locker Thread")
|
||||||
|
{
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
locker2.doStart();
|
||||||
|
}
|
||||||
|
catch (Throwable e)
|
||||||
|
{
|
||||||
|
errors.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
thread.start();
|
||||||
|
|
||||||
|
// Waiting some small time here, you shouldn't see many messages
|
||||||
|
Thread.sleep(100);
|
||||||
|
|
||||||
|
Assert.assertTrue(thread.isAlive());
|
||||||
|
|
||||||
|
locker1.stop();
|
||||||
|
|
||||||
|
// 10 seconds here is an eternity, but it should only take milliseconds
|
||||||
|
thread.join(5000);
|
||||||
|
|
||||||
|
long timeout = System.currentTimeMillis() + 5000;
|
||||||
|
|
||||||
|
while (timeout > System.currentTimeMillis() && !locker2.keepAlive())
|
||||||
|
{
|
||||||
|
Thread.sleep(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(locker2.keepAlive());
|
||||||
|
|
||||||
|
locker2.stop();
|
||||||
|
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
// to make sure we won't leak threads if the test ever failed for any reason
|
||||||
|
thread.join(1000);
|
||||||
|
if (thread.isAlive())
|
||||||
|
{
|
||||||
|
thread.interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
File lockFile = new File(testFolder.getRoot(), "lock");
|
||||||
|
lockFile.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue