This closes #277

This commit is contained in:
Clebert Suconic 2015-12-21 22:06:04 -05:00
commit d435a3d00a
10 changed files with 19 additions and 0 deletions

View File

@ -539,6 +539,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
rollback(false);
}
@Override
public void rollback(final boolean isLastMessageAsDelivered) throws ActiveMQException
{
rollback(isLastMessageAsDelivered, true);

View File

@ -100,6 +100,7 @@ public class NettyConnection implements Connection {
// Connection implementation ----------------------------
@Override
public boolean isWritable(ReadyListener callback) {
synchronized (readyListeners) {
readyListeners.push(callback);
@ -108,6 +109,7 @@ public class NettyConnection implements Connection {
}
}
@Override
public void fireReady(final boolean ready) {
synchronized (readyListeners) {
this.ready = ready;

View File

@ -53,6 +53,7 @@ public class MQTTConnection implements RemotingConnection {
this.destroyed = false;
}
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}

View File

@ -161,6 +161,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
this.creationTime = System.currentTimeMillis();
}
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}

View File

@ -119,6 +119,7 @@ public final class StompConnection implements RemotingConnection {
return frame;
}
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}

View File

@ -101,6 +101,7 @@ public class InVMConnection implements Connection {
// no op
}
@Override
public boolean isWritable(ReadyListener listener) {
return true;
}

View File

@ -357,10 +357,12 @@ public class TransactionImpl implements Transaction {
// to execute this runnable in the correct order
storageManager.afterCompleteOperations(new IOCallback() {
@Override
public void onError(final int errorCode, final String errorMessage) {
ActiveMQServerLogger.LOGGER.ioErrorOnTX(errorCode, errorMessage);
}
@Override
public void done() {
afterRollback(operationsToComplete);
}

View File

@ -157,6 +157,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
final int NUMBER_OF_MESSAGES = 1000;
Thread producer = new Thread() {
@Override
public void run() {
try {
ServerLocator locator = createInVMLocator(0);
@ -199,6 +200,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
final AtomicBoolean metaDataFailed = new AtomicBoolean(false);
Thread buggerThread = new Thread() {
@Override
public void run() {
while (running.get()) {
try {
@ -398,6 +400,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
isDeliveryTransacted = deliveryTransacted;
}
@Override
public MessageEndpoint createEndpoint(XAResource xaResource) throws UnavailableException {
TestEndpoint retEnd = new TestEndpoint();
if (xaResource != null) {
@ -406,6 +409,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
return retEnd;
}
@Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return isDeliveryTransacted;
}
@ -446,6 +450,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase
}
@Override
public void onMessage(Message message) {
Integer value = 0;

View File

@ -142,6 +142,7 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
this.finish = finish;
}
@Override
public void run() {
try {
align.countDown();
@ -215,6 +216,7 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
this.numberOfMessages = numberOfMessages;
}
@Override
public void run() {
long deletesNr = 0;
try {

View File

@ -93,6 +93,7 @@ public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
cf = new ActiveMQConnectionFactory();
Thread slowSending = new Thread() {
@Override
public void run() {
Connection conn = null;
try {
@ -207,6 +208,7 @@ public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
int errors = 0;
@Override
public void run() {
try {
@ -258,6 +260,7 @@ public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
this.finish = finish;
}
@Override
public void run() {
try {