fixing BackupSyncJournalTest
This commit is contained in:
parent
2d3061d9b6
commit
2e973c4bff
|
@ -84,7 +84,9 @@ public class SessionReceiveLargeMessage extends PacketImpl implements MessagePac
|
|||
buffer.writeLong(consumerID);
|
||||
buffer.writeInt(deliveryCount);
|
||||
buffer.writeLong(largeMessageSize);
|
||||
message.encodeHeadersAndProperties(buffer);
|
||||
if (message != null) {
|
||||
message.encodeHeadersAndProperties(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -2493,8 +2493,10 @@ public class QueueImpl implements Queue {
|
|||
private void proceedDeliver(Consumer consumer, MessageReference reference) {
|
||||
try {
|
||||
consumer.proceedDeliver(reference);
|
||||
deliveriesInTransit.countDown();
|
||||
}
|
||||
catch (Throwable t) {
|
||||
deliveriesInTransit.countDown();
|
||||
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
|
||||
|
||||
synchronized (this) {
|
||||
|
@ -2510,9 +2512,6 @@ public class QueueImpl implements Queue {
|
|||
addHead(reference);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
deliveriesInTransit.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean checkExpired(final MessageReference reference) {
|
||||
|
|
|
@ -970,7 +970,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
public boolean deliver() throws Exception {
|
||||
lockDelivery.readLock().lock();
|
||||
try {
|
||||
if (largeMessage == null) {
|
||||
LargeServerMessage currentLargeMessage = largeMessage;
|
||||
if (currentLargeMessage == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -984,7 +985,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
}
|
||||
|
||||
if (!sentInitialPacket) {
|
||||
context = largeMessage.getBodyEncoder();
|
||||
context = currentLargeMessage.getBodyEncoder();
|
||||
|
||||
sizePendingLargeMessage = context.getLargeBodySize();
|
||||
|
||||
|
@ -992,7 +993,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
|
|||
|
||||
sentInitialPacket = true;
|
||||
|
||||
int packetSize = callback.sendLargeMessage(largeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
|
||||
int packetSize = callback.sendLargeMessage(currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
|
||||
|
||||
if (availableCredits != null) {
|
||||
availableCredits.addAndGet(-packetSize);
|
||||
|
|
Loading…
Reference in New Issue