mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4645 - fix and test. adjust lease correctly in both behind and ahead of db case
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1504981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0826fdcf06
commit
17bf5c7d69
|
@ -170,18 +170,18 @@ public class LeaseDatabaseLocker extends AbstractLocker {
|
||||||
return diffFromCurrentTime;
|
return diffFromCurrentTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
private long determineTimeDifference(Connection connection) throws SQLException {
|
protected long determineTimeDifference(Connection connection) throws SQLException {
|
||||||
PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
|
PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
|
||||||
ResultSet resultSet = statement.executeQuery();
|
ResultSet resultSet = statement.executeQuery();
|
||||||
long result = 0l;
|
long result = 0l;
|
||||||
if (resultSet.next()) {
|
if (resultSet.next()) {
|
||||||
Timestamp timestamp = resultSet.getTimestamp(1);
|
Timestamp timestamp = resultSet.getTimestamp(1);
|
||||||
long diff = System.currentTimeMillis() - timestamp.getTime();
|
long diff = System.currentTimeMillis() - timestamp.getTime();
|
||||||
LOG.info(getLeaseHolderId() + " diff from db: " + diff + ", db time: " + timestamp);
|
if (Math.abs(diff) > maxAllowableDiffFromDBTime) {
|
||||||
if (diff > maxAllowableDiffFromDBTime || diff < -maxAllowableDiffFromDBTime) {
|
|
||||||
// off by more than maxAllowableDiffFromDBTime so lets adjust
|
// off by more than maxAllowableDiffFromDBTime so lets adjust
|
||||||
result = diff;
|
result = (-diff);
|
||||||
}
|
}
|
||||||
|
LOG.info(getLeaseHolderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,12 +16,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.store.jdbc;
|
package org.apache.activemq.store.jdbc;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.Timestamp;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -29,17 +27,24 @@ import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.broker.AbstractLocker;
|
import org.apache.activemq.broker.AbstractLocker;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
|
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
import org.apache.derby.jdbc.EmbeddedDataSource;
|
||||||
|
import org.jmock.Expectations;
|
||||||
|
import org.jmock.Mockery;
|
||||||
|
import org.jmock.lib.legacy.ClassImposteriser;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class LeaseDatabaseLockerTest {
|
public class LeaseDatabaseLockerTest {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLockerTest.class);
|
private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLockerTest.class);
|
||||||
|
@ -204,6 +209,59 @@ public class LeaseDatabaseLockerTest {
|
||||||
printLockTable(connection);
|
printLockTable(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiffOffsetAhead() throws Exception {
|
||||||
|
LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
|
||||||
|
assertTrue("when ahead of db adjustment is negative", callDiffOffset(underTest, System.currentTimeMillis() - 60000) < 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiffOffsetBehind() throws Exception {
|
||||||
|
LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
|
||||||
|
assertTrue("when behind db adjustment is positive", callDiffOffset(underTest, System.currentTimeMillis() + 60000) > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDiffIngoredIfLessthanMaxAllowableDiffFromDBTime() throws Exception {
|
||||||
|
LeaseDatabaseLocker underTest = new LeaseDatabaseLocker();
|
||||||
|
underTest.setMaxAllowableDiffFromDBTime(60000);
|
||||||
|
assertEquals("no adjust when under limit", 0, callDiffOffset(underTest,System.currentTimeMillis() - 40000 ));
|
||||||
|
}
|
||||||
|
|
||||||
|
public long callDiffOffset(LeaseDatabaseLocker underTest, final long dbTime) throws Exception {
|
||||||
|
|
||||||
|
Mockery context = new Mockery() {{
|
||||||
|
setImposteriser(ClassImposteriser.INSTANCE);
|
||||||
|
}};
|
||||||
|
final Statements statements = context.mock(Statements.class);
|
||||||
|
final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class);
|
||||||
|
final Connection connection = context.mock(Connection.class);
|
||||||
|
final PreparedStatement preparedStatement = context.mock(PreparedStatement.class);
|
||||||
|
final ResultSet resultSet = context.mock(ResultSet.class);
|
||||||
|
final Timestamp timestamp = context.mock(Timestamp.class);
|
||||||
|
|
||||||
|
context.checking(new Expectations() {{
|
||||||
|
allowing(jdbcPersistenceAdapter).getStatements();
|
||||||
|
will(returnValue(statements));
|
||||||
|
allowing(jdbcPersistenceAdapter);
|
||||||
|
allowing(statements);
|
||||||
|
allowing(connection).prepareStatement(with(any(String.class)));
|
||||||
|
will(returnValue(preparedStatement));
|
||||||
|
allowing(connection);
|
||||||
|
allowing(preparedStatement).executeQuery();
|
||||||
|
will(returnValue(resultSet));
|
||||||
|
allowing(resultSet).next();
|
||||||
|
will(returnValue(true));
|
||||||
|
allowing(resultSet).getTimestamp(1);
|
||||||
|
will(returnValue(timestamp));
|
||||||
|
allowing(timestamp).getTime();
|
||||||
|
will(returnValue(dbTime));
|
||||||
|
}});
|
||||||
|
|
||||||
|
underTest.configure(jdbcPersistenceAdapter);
|
||||||
|
return underTest.determineTimeDifference(connection);
|
||||||
|
}
|
||||||
|
|
||||||
private void printLockTable(Connection connection) throws Exception {
|
private void printLockTable(Connection connection) throws Exception {
|
||||||
DefaultJDBCAdapter.printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
|
DefaultJDBCAdapter.printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue