This closes #430

This commit is contained in:
jbertram 2016-04-04 11:03:48 -05:00
commit bd5a78802f
10 changed files with 26 additions and 0 deletions

View File

@ -154,6 +154,7 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
@Override
public int getReconnectID() {
return sessionChannel.getReconnectID();
}

View File

@ -142,6 +142,7 @@ public final class ChannelImpl implements Channel {
this.interceptors = interceptors;
}
@Override
public int getReconnectID() {
return reconnectID.get();
}
@ -217,6 +218,7 @@ public final class ChannelImpl implements Channel {
return send(packet, -1, false, false);
}
@Override
public boolean send(Packet packet, final int reconnectID) {
return send(packet, reconnectID, false, false);
}

View File

@ -700,10 +700,12 @@ public class JDBCJournalImpl implements Journal {
return null;
}
@Override
public final void synchronizationLock() {
journalLock.writeLock().lock();
}
@Override
public final void synchronizationUnlock() {
journalLock.writeLock().unlock();
}

View File

@ -56,10 +56,12 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
}
}
@Override
public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction) {
preparedTransactions.add(preparedTransaction);
}
@Override
public synchronized void addRecord(final RecordInfo info) {
int index = committedRecords.size();
committedRecords.add(index, info);
@ -71,11 +73,13 @@ public class JDBCJournalLoaderCallback implements LoaderCallback {
checkMaxId(info.id);
}
@Override
public synchronized void updateRecord(final RecordInfo info) {
int index = committedRecords.size();
committedRecords.add(index, info);
}
@Override
public synchronized void deleteRecord(final long id) {
for (Integer i : deleteReferences.get(id)) {
committedRecords.remove(i);

View File

@ -37,18 +37,22 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
this.loadManager = loadManager;
}
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
loadManager.addRecord(info);
}
@Override
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
loadManager.updateRecord(info);
}
@Override
public void onReadDeleteRecord(final long recordID) throws Exception {
loadManager.deleteRecord(recordID);
}
@Override
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
@ -58,6 +62,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
tx.recordInfos.add(info);
}
@Override
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
@ -67,6 +72,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
tx.recordInfos.add(info);
}
@Override
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
@ -76,6 +82,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
tx.recordsToDelete.add(info);
}
@Override
public void onReadPrepareRecord(final long transactionID,
final byte[] extraData,
final int numberOfRecords) throws Exception {
@ -88,6 +95,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
tx.extraData = extraData;
}
@Override
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
// It is possible that the TX could be null, since deletes could have happened in the journal.
TransactionHolder tx = loadTransactions.get(transactionID);
@ -106,6 +114,7 @@ public class JDBCJournalReaderCallback implements JournalReaderCallback {
}
}
@Override
public void onReadRollbackRecord(final long transactionID) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
if (tx == null) {

View File

@ -82,6 +82,7 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
}
@Override
public EmbeddedJMS setConfiguration(Configuration configuration) {
super.setConfiguration(configuration);
return this;

View File

@ -58,6 +58,7 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
private final Executor dispatchExecutor;
private final Runnable dispatchRunnable = new Runnable() {
@Override
public void run() {
dispatch();
}

View File

@ -35,6 +35,7 @@ public abstract class AbstractAcceptor implements Acceptor {
/**
* This will update the list of interceptors for each ProtocolManager inside the acceptor.
* */
@Override
public void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
for (ProtocolManager manager : protocolMap.values()) {
manager.updateInterceptors(incomingInterceptors, outgoingInterceptors);

View File

@ -87,6 +87,7 @@ public class InMemoryDirectoryServiceFactory implements DirectoryServiceFactory
/**
* {@inheritDoc}
*/
@Override
public void init(String name) throws Exception {
if ((directoryService != null) && directoryService.isStarted()) {
return;
@ -154,6 +155,7 @@ public class InMemoryDirectoryServiceFactory implements DirectoryServiceFactory
/**
* {@inheritDoc}
*/
@Override
public DirectoryService getDirectoryService() throws Exception {
return directoryService;
}
@ -161,6 +163,7 @@ public class InMemoryDirectoryServiceFactory implements DirectoryServiceFactory
/**
* {@inheritDoc}
*/
@Override
public PartitionFactory getPartitionFactory() throws Exception {
return partitionFactory;
}

View File

@ -62,6 +62,7 @@ public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
MessageProducer producer;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
@ -214,6 +215,7 @@ public class LargeMessageOverReplicationTest extends ActiveMQTestBase {
if (messageChunkCount == 100) {
final CountDownLatch latch = new CountDownLatch(1);
new Thread() {
@Override
public void run() {
try {
latch.countDown();