ARTEMIS-1128 QueueControlImpl.sendMessage() fixes

sendMessage() may throw ActiveMQException that causes CNFE
at the management client. Also it should check if headers
in the message is null (to prevent NPE).
This commit is contained in:
Howard Gao 2017-04-24 22:51:12 +08:00 committed by Justin Bertram
parent e078666c03
commit 694a5092b6
2 changed files with 49 additions and 34 deletions

View File

@ -743,42 +743,48 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
boolean durable,
final String user,
final String password) throws Exception {
securityStore.check(queue.getAddress(), CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
return user;
}
try {
securityStore.check(queue.getAddress(), CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
return user;
}
@Override
public String getPassword() {
return password;
}
@Override
public String getPassword() {
return password;
}
@Override
public RemotingConnection getRemotingConnection() {
return null;
@Override
public RemotingConnection getRemotingConnection() {
return null;
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
if (body != null) {
if (type == Message.TEXT_TYPE) {
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
} else {
message.getBodyBuffer().writeBytes(Base64.decode(body));
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
if (body != null) {
if (type == Message.TEXT_TYPE) {
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
} else {
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
}
message.setAddress(queue.getAddress());
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, true);
return "" + message.getMessageID();
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
}
message.setAddress(queue.getAddress());
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, true);
return "" + message.getMessageID();
}
@Override

View File

@ -57,6 +57,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
public class QueueControlTest extends ManagementTestBase {
private ActiveMQServer server;
@ -2110,15 +2112,22 @@ public class QueueControlTest extends ManagementTestBase {
QueueControl queueControl = createManagementControl(address, queue);
queueControl.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControl));
Assert.assertEquals(2, getMessageCount(queueControl));
// the message IDs are set on the server
CompositeData[] browse = queueControl.browse(null);
Assert.assertEquals(1, browse.length);
Assert.assertEquals(2, browse.length);
byte[] body = (byte[]) browse[0].get("BodyPreview");
byte[] body = (byte[]) browse[0].get(BODY);
Assert.assertNotNull(body);
Assert.assertEquals(new String(body), "theBody");
body = (byte[]) browse[1].get(BODY);
Assert.assertNotNull(body);