This commit is contained in:
Justin Bertram 2017-04-24 13:52:51 -05:00
commit d0219bea18
2 changed files with 49 additions and 34 deletions

View File

@ -743,42 +743,48 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
boolean durable,
final String user,
final String password) throws Exception {
securityStore.check(queue.getAddress(), CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
return user;
}
try {
securityStore.check(queue.getAddress(), CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
return user;
}
@Override
public String getPassword() {
return password;
}
@Override
public String getPassword() {
return password;
}
@Override
public RemotingConnection getRemotingConnection() {
return null;
@Override
public RemotingConnection getRemotingConnection() {
return null;
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
if (body != null) {
if (type == Message.TEXT_TYPE) {
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
} else {
message.getBodyBuffer().writeBytes(Base64.decode(body));
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
if (body != null) {
if (type == Message.TEXT_TYPE) {
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
} else {
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
}
message.setAddress(queue.getAddress());
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, true);
return "" + message.getMessageID();
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
}
message.setAddress(queue.getAddress());
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, true);
return "" + message.getMessageID();
}
@Override

View File

@ -57,6 +57,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
public class QueueControlTest extends ManagementTestBase {
private ActiveMQServer server;
@ -2110,15 +2112,22 @@ public class QueueControlTest extends ManagementTestBase {
QueueControl queueControl = createManagementControl(address, queue);
queueControl.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
queueControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes("theBody".getBytes()), true, "myUser", "myPassword");
Assert.assertEquals(1, getMessageCount(queueControl));
Assert.assertEquals(2, getMessageCount(queueControl));
// the message IDs are set on the server
CompositeData[] browse = queueControl.browse(null);
Assert.assertEquals(1, browse.length);
Assert.assertEquals(2, browse.length);
byte[] body = (byte[]) browse[0].get("BodyPreview");
byte[] body = (byte[]) browse[0].get(BODY);
Assert.assertNotNull(body);
Assert.assertEquals(new String(body), "theBody");
body = (byte[]) browse[1].get(BODY);
Assert.assertNotNull(body);