This commit is contained in:
Clebert Suconic 2019-01-15 14:30:53 -05:00
commit 1e65b295c1
9 changed files with 542 additions and 129 deletions

View File

@ -21,8 +21,19 @@ import javax.management.MBeanInfo;
import javax.management.MBeanOperationInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.DummyOperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.UUIDGenerator;
public abstract class AbstractControl extends StandardMBean {
@ -77,8 +88,61 @@ public abstract class AbstractControl extends StandardMBean {
return new MBeanInfo(info.getClassName(), info.getDescription(), fillMBeanAttributeInfo(), info.getConstructors(), fillMBeanOperationInfo(), info.getNotifications());
}
// Private -------------------------------------------------------
protected String sendMessage(SimpleString address,
ActiveMQServer server,
Map<String, String> headers,
int type,
String body,
boolean durable,
String user,
String password,
Long...queueID) throws Exception {
ManagementRemotingConnection fakeConnection = new ManagementRemotingConnection();
ServerSession serverSession = server.createSession("management::" + UUIDGenerator.getInstance().generateStringUUID(), user, password,
Integer.MAX_VALUE, fakeConnection,
true, true, false,
false, address.toString(), fakeConnection.callback,
false, new DummyOperationContext(), Collections.emptyMap());
try {
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
}
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
if (body != null) {
if (type == Message.TEXT_TYPE) {
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
} else {
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
}
// Inner classes -------------------------------------------------
message.setAddress(address);
// if a queueID is used, we set the routeToIDs property
// to one or many specific queues
if (queueID != null && queueID.length > 0) {
ByteBuffer buffer = ByteBuffer.allocate(8 * queueID.length);
for (Long q : queueID) {
buffer.putLong(q);
}
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
}
// There's no point on direct delivery using the management thread, use false here
serverSession.send(message, false);
return "" + message.getMessageID();
} finally {
try {
serverSession.close(false);
} catch (Exception ignored) {
}
}
}
// Inner classes------------------------------------------------
}

View File

@ -26,30 +26,23 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
public class AddressControlImpl extends AbstractControl implements AddressControl {
@ -60,7 +53,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
private AddressInfo addressInfo;
private final PostOffice postOffice;
private final ActiveMQServer server;
private final PagingManager pagingManager;
@ -75,15 +68,15 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
// Constructors --------------------------------------------------
public AddressControlImpl(AddressInfo addressInfo,
final PostOffice postOffice,
final ActiveMQServer server,
final PagingManager pagingManager,
final StorageManager storageManager,
final HierarchicalRepository<Set<Role>> securityRepository,
final SecurityStore securityStore,
final ManagementService managementService)throws Exception {
super(AddressControl.class, storageManager);
this.server = server;
this.addressInfo = addressInfo;
this.postOffice = postOffice;
this.pagingManager = pagingManager;
this.securityRepository = securityRepository;
this.securityStore = securityStore;
@ -130,7 +123,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public String[] getQueueNames() throws Exception {
clearIO();
try {
Bindings bindings = postOffice.getBindingsForAddress(addressInfo.getName());
Bindings bindings = server.getPostOffice().getBindingsForAddress(addressInfo.getName());
List<String> queueNames = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
@ -149,7 +142,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
public String[] getBindingNames() throws Exception {
clearIO();
try {
Bindings bindings = postOffice.getBindingsForAddress(addressInfo.getName());
Bindings bindings = server.getPostOffice().getBindingsForAddress(addressInfo.getName());
String[] bindingNames = new String[bindings.getBindings().size()];
int i = 0;
for (Binding binding : bindings.getBindings()) {
@ -234,7 +227,7 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
clearIO();
long totalMsgs = 0;
try {
Bindings bindings = postOffice.getBindingsForAddress(addressInfo.getName());
Bindings bindings = server.getPostOffice().getBindingsForAddress(addressInfo.getName());
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
totalMsgs += ((QueueBinding) binding).getQueue().getMessageCount();
@ -302,46 +295,13 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
final String user,
final String password) throws Exception {
try {
securityStore.check(addressInfo.getName(), CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
return user;
}
@Override
public String getPassword() {
return password;
}
@Override
public RemotingConnection getRemotingConnection() {
return null;
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
}
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
if (body != null) {
if (type == Message.TEXT_TYPE) {
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
} else {
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
}
message.setAddress(addressInfo.getName());
postOffice.route(message, true);
return "" + message.getMessageID();
} catch (ActiveMQException e) {
return sendMessage(addressInfo.getName(), server, headers, type, body, durable, user, password);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage());
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(AddressControl.class);

View File

@ -0,0 +1,277 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl;
import javax.security.auth.Subject;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public class ManagementRemotingConnection implements RemotingConnection {
@Override
public Object getID() {
return null;
}
@Override
public long getCreationTime() {
return 0;
}
@Override
public String getRemoteAddress() {
return "Management";
}
@Override
public void scheduledFlush() {
}
@Override
public void addFailureListener(FailureListener listener) {
}
@Override
public boolean removeFailureListener(FailureListener listener) {
return false;
}
@Override
public void addCloseListener(CloseListener listener) {
}
@Override
public boolean removeCloseListener(CloseListener listener) {
return false;
}
@Override
public List<CloseListener> removeCloseListeners() {
return null;
}
@Override
public void setCloseListeners(List<CloseListener> listeners) {
}
@Override
public List<FailureListener> getFailureListeners() {
return null;
}
@Override
public List<FailureListener> removeFailureListeners() {
return null;
}
@Override
public void setFailureListeners(List<FailureListener> listeners) {
}
@Override
public ActiveMQBuffer createTransportBuffer(int size) {
return null;
}
@Override
public void fail(ActiveMQException me) {
}
@Override
public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
}
@Override
public void destroy() {
}
@Override
public Connection getTransportConnection() {
return null;
}
@Override
public boolean isClient() {
return false;
}
@Override
public boolean isDestroyed() {
return false;
}
@Override
public void disconnect(boolean criticalError) {
}
@Override
public void disconnect(String scaleDownNodeID, boolean criticalError) {
}
@Override
public boolean checkDataReceived() {
return false;
}
@Override
public void flush() {
}
@Override
public boolean isWritable(ReadyListener callback) {
return false;
}
@Override
public void killMessage(SimpleString nodeID) {
}
@Override
public boolean isSupportReconnect() {
return false;
}
@Override
public boolean isSupportsFlowControl() {
return false;
}
@Override
public Subject getSubject() {
return null;
}
@Override
public String getProtocolName() {
return null;
}
@Override
public void setClientID(String cID) {
}
@Override
public String getClientID() {
return null;
}
@Override
public String getTransportLocalAddress() {
return "Manaement";
}
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
}
public SessionCallback callback = new SessionCallback() {
@Override
public boolean hasCredits(ServerConsumer consumerID) {
return false;
}
@Override
public void afterDelivery() throws Exception {
}
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@Override
public void sendProducerCreditsMessage(int credits, SimpleString address) {
}
@Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
}
@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
return 0;
}
@Override
public int sendLargeMessage(MessageReference reference,
Message message,
ServerConsumer consumerID,
long bodySize,
int deliveryCount) {
return 0;
}
@Override
public int sendLargeMessageContinuation(ServerConsumer consumerID,
byte[] body,
boolean continues,
boolean requiresResponse) {
return 0;
}
@Override
public void closed() {
}
@Override
public void disconnect(ServerConsumer consumerId, SimpleString queueName) {
}
@Override
public boolean isWritable(ReadyListener callback, Object protocolContext) {
return false;
}
@Override
public void browserFinished(ServerConsumer consumer) {
}
};
}

View File

@ -22,7 +22,6 @@ import javax.json.JsonObjectBuilder;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.openmbean.CompositeData;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
@ -39,24 +38,19 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@ -72,7 +66,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
private final String address;
private final PostOffice postOffice;
private final ActiveMQServer server;
private final StorageManager storageManager;
private final SecurityStore securityStore;
@ -111,14 +105,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
public QueueControlImpl(final Queue queue,
final String address,
final PostOffice postOffice,
final ActiveMQServer server,
final StorageManager storageManager,
final SecurityStore securityStore,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
super(QueueControl.class, storageManager);
this.queue = queue;
this.address = address;
this.postOffice = postOffice;
this.server = server;
this.storageManager = storageManager;
this.securityStore = securityStore;
this.addressSettingsRepository = addressSettingsRepository;
@ -896,7 +890,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
clearIO();
try {
Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
@ -925,7 +919,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
Filter filter = FilterImpl.createFilter(filterStr);
Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
Binding binding = server.getPostOffice().getBinding(new SimpleString(otherQueueName));
if (binding == null) {
throw ActiveMQMessageBundle.BUNDLE.noQueueFound(otherQueueName);
@ -969,45 +963,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
final String user,
final String password) throws Exception {
try {
securityStore.check(queue.getAddress(), queue.getName(), CheckType.SEND, new SecurityAuth() {
@Override
public String getUsername() {
return user;
}
@Override
public String getPassword() {
return password;
}
@Override
public RemotingConnection getRemotingConnection() {
return null;
}
});
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
for (String header : headers.keySet()) {
message.putStringProperty(new SimpleString(header), new SimpleString(headers.get(header)));
}
}
message.setType((byte) type);
message.setDurable(durable);
message.setTimestamp(System.currentTimeMillis());
if (body != null) {
if (type == Message.TEXT_TYPE) {
message.getBodyBuffer().writeNullableSimpleString(new SimpleString(body));
} else {
message.getBodyBuffer().writeBytes(Base64.decode(body));
}
}
message.setAddress(queue.getAddress());
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(queue.getID());
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, buffer.array());
postOffice.route(message, true);
return "" + message.getMessageID();
} catch (ActiveMQException e) {
return sendMessage(queue.getAddress(), server, headers, type, body, durable, user, password, queue.getID());
} catch (Exception e) {
throw new IllegalStateException(e.getMessage());
}
}
@ -1419,7 +1376,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
// Private -------------------------------------------------------
private void checkStarted() {
if (!postOffice.isStarted()) {
if (!server.getPostOffice().isStarted()) {
throw new IllegalStateException("Broker is not started. Queue can not be managed yet");
}
}

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
final class DummyOperationContext implements OperationContext {
public final class DummyOperationContext implements OperationContext {
private static DummyOperationContext instance = new DummyOperationContext();

View File

@ -24,11 +24,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.DataConstants;
@ -39,6 +41,30 @@ import io.netty.buffer.Unpooled;
public final class LargeServerMessageImpl extends CoreMessage implements LargeServerMessage {
/** This will check if a regular message needs to be converted as large message */
public static Message checkLargeMessage(Message message, StorageManager storageManager) throws Exception {
if (message.isLargeMessage()) {
return message; // nothing to be done on this case
}
if (message.getEncodeSize() > storageManager.getMaxRecordSize()) {
return asLargeMessage(message, storageManager);
} else {
return message;
}
}
private static Message asLargeMessage(Message message, StorageManager storageManager) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = storageManager.createLargeMessage(storageManager.generateID(), coreMessage);
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
lsm.releaseResources();
lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm;
}
// Constants -----------------------------------------------------
private static final Logger logger = Logger.getLogger(LargeServerMessageImpl.class);

View File

@ -35,12 +35,10 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -55,6 +53,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@ -71,7 +70,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
@ -1465,18 +1463,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
}
private LargeServerMessage messageToLargeMessage(Message message) throws Exception {
ICoreMessage coreMessage = message.toCore();
LargeServerMessage lsm = getStorageManager().createLargeMessage(storageManager.generateID(), coreMessage);
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
lsm.releaseResources();
lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm;
}
@Override
public synchronized RoutingStatus send(Transaction tx,
Message msg,
@ -1487,17 +1473,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public synchronized RoutingStatus send(Transaction tx,
Message msg,
Message messageParameter,
final boolean direct,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception {
final Message message;
if ((msg.getEncodeSize() > storageManager.getMaxRecordSize()) && !msg.isLargeMessage()) {
message = messageToLargeMessage(msg);
} else {
message = msg;
}
final Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.beforeSend(this, tx, message, direct, noAutoCreateQueue));

View File

@ -216,7 +216,7 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public void registerAddress(AddressInfo addressInfo) throws Exception {
ObjectName objectName = objectNameBuilder.getAddressObjectName(addressInfo.getName());
AddressControlImpl addressControl = new AddressControlImpl(addressInfo, postOffice, pagingManager, storageManager, securityRepository, securityStore, this);
AddressControlImpl addressControl = new AddressControlImpl(addressInfo, messagingServer, pagingManager, storageManager, securityRepository, securityStore, this);
registerInJMX(objectName, addressControl);
@ -246,7 +246,7 @@ public class ManagementServiceImpl implements ManagementService {
return;
}
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), messagingServer, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurableMessage(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.management;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
public class LargeMessageOverManagementTest extends ManagementTestBase {
private ClientSession session;
private ServerLocator locator;
private ClientSessionFactory sf;
private ActiveMQServer server;
protected AddressControl createManagementControl(final SimpleString address) throws Exception {
return ManagementControlHelper.createAddressControl(address, mbeanServer);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
Configuration config = createBasicConfig();
TransportConfiguration acceptorConfig = createTransportConfiguration(false, true, generateParams(0, false));
config.addAcceptorConfiguration(acceptorConfig);
server = createServer(true, config);
server.setMBeanServer(mbeanServer);
server.start();
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true);
sf = createSessionFactory(locator);
session = sf.createSession(false, true, false);
session.start();
addClientSession(session);
}
@Test
public void testSendOverSizeMessageOverQueueControl() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString emptyqueue = RandomUtil.randomSimpleString();
session.createQueue(address, RoutingType.MULTICAST, queue, null, true);
session.createQueue(address, RoutingType.MULTICAST, emptyqueue, null, true);
QueueControl queueControl = createManagementControl(address, queue);
int bodySize = (int) server.getStorageManager().getMaxRecordSize() + 100;
byte[] bigData = createBytesData(bodySize);
queueControl.sendMessage(new HashMap<String, String>(), Message.BYTES_TYPE, Base64.encodeBytes(bigData), true, "myUser", "myPassword");
ClientConsumer consumer = session.createConsumer(queue);
ClientMessage message = consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(bigData.length, message.getBodySize());
Assert.assertTrue(message.isLargeMessage());
byte[] bytesRead = new byte[bigData.length];
message.getBodyBuffer().readBytes(bytesRead);
for (int i = 0; i < bytesRead.length; i++) {
Assert.assertEquals(bytesRead[i], bigData[i]);
}
consumer.close();
// this is an extra check,
consumer = session.createConsumer(emptyqueue);
Assert.assertNull(consumer.receiveImmediate());
}
@Test
public void testSendOverSizeMessageOverAddressControl() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);
AddressControl addressControl = createManagementControl(address);
session.createQueue(address, RoutingType.ANYCAST, address);
int bodySize = server.getConfiguration().getJournalBufferSize_AIO();
byte[] bigData = createBytesData(bodySize);
addressControl.sendMessage(null, Message.BYTES_TYPE, Base64.encodeBytes(bigData), false, null, null);
ClientConsumer consumer = session.createConsumer(address);
ClientMessage message = consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(bigData.length, message.getBodySize());
Assert.assertTrue(message.isLargeMessage());
byte[] bytesRead = new byte[bigData.length];
message.getBodyBuffer().readBytes(bytesRead);
for (int i = 0; i < bytesRead.length; i++) {
Assert.assertEquals(bytesRead[i], bigData[i]);
}
}
byte[] createBytesData(int nbytes) {
byte[] result = new byte[nbytes];
for (int i = 0; i < nbytes; i++) {
result[i] = RandomUtil.randomByte();
}
return result;
}
}