This closes #562

This commit is contained in:
jbertram 2016-06-06 14:41:21 -05:00
commit 25316e4232
6 changed files with 31 additions and 41 deletions

View File

@ -79,10 +79,6 @@ public class ClientProducerCreditManagerImpl implements ClientProducerCreditMana
// while this is still sending requests causing a dead lock
if (needInit) {
credits.init(context);
// Setting default address on the session
session.setAddress(null, address);
}
return credits;

View File

@ -210,10 +210,13 @@ public class ClientProducerImpl implements ClientProducerInternal {
closed = true;
}
private void doSend(final SimpleString address1,
private void doSend(SimpleString sendingAddress,
final Message msg,
final SendAcknowledgementHandler handler,
final boolean forceAsync) throws ActiveMQException {
if (sendingAddress == null) {
sendingAddress = this.address;
}
session.startCall();
try {
@ -233,28 +236,16 @@ public class ClientProducerImpl implements ClientProducerInternal {
isLarge = false;
}
if (address1 != null) {
if (!isLarge) {
session.setAddress(msg, address1);
}
else {
msg.setAddress(address1);
}
// Anonymous
theCredits = session.getCredits(address1, true);
if (!isLarge) {
session.setAddress(msg, sendingAddress);
}
else {
if (!isLarge) {
session.setAddress(msg, this.address);
}
else {
msg.setAddress(this.address);
}
theCredits = producerCredits;
msg.setAddress(sendingAddress);
}
// Anonymous
theCredits = session.getCredits(sendingAddress, true);
if (rateLimiter != null) {
// Rate flow control
@ -276,6 +267,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
}
else {
sendRegularMessage(msgI, sendBlocking, theCredits, handler);
session.checkDefaultAddress(sendingAddress);
}
}
finally {

View File

@ -1053,22 +1053,30 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
public void setAddress(final Message message, final SimpleString address) {
if (defaultAddress == null) {
logger.tracef("setAddress() Setting default address as %s", address);
defaultAddress = address;
if (message != null) {
message.setAddress(address);
}
message.setAddress(address);
}
else if (message != null) {
else {
if (!address.equals(defaultAddress)) {
logger.tracef("setAddress() setting non default address %s on message", address);
message.setAddress(address);
}
else {
logger.trace("setAddress() being set as null");
message.setAddress(null);
}
}
}
@Override
public void checkDefaultAddress(SimpleString address) {
if (defaultAddress == null) {
logger.tracef("checkDefaultAddress(%s)", address);
defaultAddress = address;
}
}
@Override
public void setPacketSize(final int packetSize) {
if (packetSize > this.initialMessagePacketSize) {

View File

@ -87,8 +87,11 @@ public interface ClientSessionInternal extends ClientSession {
ClientProducerCreditManager getProducerCreditManager();
/** This will set the address at the message */
void setAddress(Message message, SimpleString address);
void checkDefaultAddress(SimpleString address);
void setPacketSize(int packetSize);
void resetIfNeeded() throws ActiveMQException;

View File

@ -1233,7 +1233,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
SimpleString address = message.getAddress();
checkDefaultAddress(address);
if (defaultAddress == null && address != null) {
defaultAddress = address;
}
if (address == null) {
if (message.isDurable()) {
@ -1267,14 +1269,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return result;
}
protected void checkDefaultAddress(SimpleString address) {
logger.tracef("checkDefaultAddress %s, defaultAddress=%s", address, defaultAddress);
if (defaultAddress == null && address != null) {
logger.tracef("checkDefaultAddress setting as %s", address);
defaultAddress = address;
}
}
@Override
public void sendContinuations(final int packetSize,
final long messageBodySize,
@ -1304,10 +1298,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void requestProducerCredits(final SimpleString address, final int credits) throws Exception {
// When the client gets the producer credits it already has the intention of sending messages
// and it will already register the default address on the core protocol
// hence we need to set it here when we request credits as well
checkDefaultAddress(address);
PagingStore store = server.getPagingManager().getPageStore(address);
if (!store.checkMemory(new Runnable() {

View File

@ -96,6 +96,7 @@ public class MultipleProducersTest extends JMSTestBase {
}
}
catch (Throwable t) {
// t.printStackTrace();
// expected
}