This commit is contained in:
Clebert Suconic 2023-01-13 15:52:10 -05:00
commit 56111ebfef
65 changed files with 3604 additions and 321 deletions

View File

@ -44,6 +44,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final int id;
private final SimpleString address;
private final ClientSessionInternal session;
@ -75,7 +77,10 @@ public class ClientProducerImpl implements ClientProducerInternal {
final boolean autoGroup,
final SimpleString groupID,
final int minLargeMessageSize,
final SessionContext sessionContext) {
final SessionContext sessionContext,
final int producerID) {
this.id = producerID;
this.sessionContext = sessionContext;
this.session = session;
@ -197,6 +202,11 @@ public class ClientProducerImpl implements ClientProducerInternal {
return producerCredits;
}
@Override
public int getID() {
return id;
}
private void doCleanup() {
if (address != null) {
session.returnCredits(address);
@ -204,6 +214,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
session.removeProducer(this);
sessionContext.removeProducer(id);
closed = true;
}
@ -290,7 +302,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
theCredits.acquireCredits(creditSize);
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address);
sessionContext.sendFullMessage(msgI, sendBlocking, handler, address, id);
}
private void checkClosed() throws ActiveMQException {
@ -381,7 +393,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), messageHandler);
int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.array(), id, messageHandler);
credits.acquireCredits(creditsUsed);
}
@ -492,7 +504,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, id, handler);
credits.acquireCredits(creditsSent);
}
} else {
@ -501,7 +513,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
sendInitialLargeMessageHeader(msgI, credits);
}
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, id, handler);
credits.acquireCredits(creditsSent);
}
}

View File

@ -26,4 +26,6 @@ public interface ClientProducerInternal extends ClientProducer {
void cleanUp();
ClientProducerCredits getProducerCredits();
int getID();
}

View File

@ -161,6 +161,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private AtomicInteger producerIDs = new AtomicInteger();
ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
final String name,
final String username,
@ -2031,10 +2033,20 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final int maxRate) throws ActiveMQException {
checkClosed();
ClientProducerInternal producer = new ClientProducerImpl(this, address, maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false), autoCommitSends && blockOnNonDurableSend, autoCommitSends && blockOnDurableSend, autoGroup, groupID == null ? null : new SimpleString(groupID), minLargeMessageSize, sessionContext);
ClientProducerInternal producer = new ClientProducerImpl(this,
address,
maxRate == -1 ? null : new TokenBucketLimiterImpl(maxRate, false),
autoCommitSends && blockOnNonDurableSend,
autoCommitSends && blockOnDurableSend,
autoGroup, groupID == null ? null : new SimpleString(groupID),
minLargeMessageSize,
sessionContext,
producerIDs.incrementAndGet());
addProducer(producer);
sessionContext.createProducer(producer);
return producer;
}

View File

@ -76,6 +76,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
return version < PacketImpl.ARTEMIS_2_18_0_VERSION;
}
default boolean isBeforeProducerMetricsChanged() {
int version = getChannelVersion();
return version < PacketImpl.ARTEMIS_2_28_0_VERSION;
}
/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types

View File

@ -58,6 +58,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientProducerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.core.Channel;
@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.ResponseHandler;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
@ -76,6 +78,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Disconnect
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.DisconnectConsumerWithKillMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RemoveProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessageV2;
@ -105,10 +108,12 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@ -371,6 +376,20 @@ public class ActiveMQSessionContext extends SessionContext {
return remotingConnection.isWritable(callback);
}
@Override
public void createProducer(ClientProducerInternal producer) {
if (!sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
sessionChannel.send(new CreateProducerMessage(producer.getID(), producer.getAddress()));
}
}
@Override
public void removeProducer(int id) {
if (!sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
sessionChannel.send(new RemoveProducerMessage(id));
}
}
@Override
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,
@ -546,15 +565,19 @@ public class ActiveMQSessionContext extends SessionContext {
public void sendFullMessage(ICoreMessage msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException {
SimpleString defaultAddress,
int senderID) throws ActiveMQException {
final SessionSendMessage packet;
if (sessionChannel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionSendMessage_1X(msgI, sendBlocking, handler);
} else if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendMessage(msgI, sendBlocking, handler);
} else {
} else if (sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
boolean responseRequired = confirmationWindow != -1 || sendBlocking;
packet = new SessionSendMessage_V2(msgI, responseRequired, handler);
} else {
boolean responseRequired = confirmationWindow != -1 || sendBlocking;
packet = new SessionSendMessage_V3(msgI, responseRequired, handler, senderID);
}
if (sendBlocking) {
sessionChannel.sendBlocking(packet, PacketImpl.NULL_RESPONSE);
@ -579,8 +602,9 @@ public class ActiveMQSessionContext extends SessionContext {
boolean lastChunk,
byte[] chunk,
int reconnectID,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, senderID, messageHandler);
}
@Override
@ -589,8 +613,9 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, senderID, messageHandler);
}
@Override
@ -1043,13 +1068,16 @@ public class ActiveMQSessionContext extends SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket;
if (sessionChannel.getConnection().isVersionBeforeAsyncResponseChange()) {
chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
} else {
} else if (sessionChannel.getConnection().isBeforeProducerMetricsChanged()) {
chunkPacket = new SessionSendContinuationMessage_V2(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, messageHandler);
} else {
chunkPacket = new SessionSendContinuationMessage_V3(msgI, chunk, !lastChunk, requiresResponse || confirmationWindow != -1, messageBodySize, senderID, messageHandler);
}
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();

View File

@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAAfterFailedMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXACommitMessage;
@ -423,8 +424,10 @@ public abstract class PacketDecoder implements Serializable {
case SESS_SEND_CONTINUATION: {
if (connection.isVersionBeforeAsyncResponseChange()) {
packet = new SessionSendContinuationMessage();
} else {
} else if (connection.isBeforeProducerMetricsChanged()) {
packet = new SessionSendContinuationMessage_V2();
} else {
packet = new SessionSendContinuationMessage_V3();
}
break;
}

View File

@ -47,6 +47,9 @@ public class PacketImpl implements Packet {
// 2.24.0
public static final int ARTEMIS_2_24_0_VERSION = 133;
// 2.28.0
public static final int ARTEMIS_2_28_0_VERSION = 134;
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
@ -293,7 +296,9 @@ public class PacketImpl implements Packet {
public static final byte DISCONNECT_V3 = -19;
public static final byte CREATE_PRODUCER = -20;
public static final byte REMOVE_PRODUCER = -21;
public PacketImpl(final byte type) {
this.type = type;

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import java.util.Objects;
public class CreateProducerMessage extends PacketImpl {
protected int id;
protected SimpleString address;
public CreateProducerMessage() {
super(PacketImpl.CREATE_PRODUCER);
}
public CreateProducerMessage(int id, SimpleString address) {
super(PacketImpl.CREATE_PRODUCER);
this.id = id;
this.address = address;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public SimpleString getAddress() {
return address;
}
public void setAddress(SimpleString address) {
this.address = address;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(id);
buffer.writeNullableSimpleString(address);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
id = buffer.readInt();
address = buffer.readNullableSimpleString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
CreateProducerMessage that = (CreateProducerMessage) o;
return Objects.equals(id, that.id) && Objects.equals(address, that.address);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), id, address);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import java.util.Objects;
public class RemoveProducerMessage extends PacketImpl {
protected int id;
public RemoveProducerMessage(int id) {
super(PacketImpl.REMOVE_PRODUCER);
this.id = id;
}
public RemoveProducerMessage() {
super(PacketImpl.REMOVE_PRODUCER);
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeInt(id);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
id = buffer.readInt();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RemoveProducerMessage that = (RemoveProducerMessage) o;
return Objects.equals(id, that.id);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), id);
}
}

View File

@ -180,4 +180,8 @@ public class SessionSendContinuationMessage extends SessionContinuationMessage {
public SendAcknowledgementHandler getHandler() {
return handler;
}
public int getSenderID() {
return -1;
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.utils.DataConstants;
/**
* A SessionSendContinuationMessage<br>
*/
public class SessionSendContinuationMessage_V3 extends SessionSendContinuationMessage_V2 {
private int senderID;
public SessionSendContinuationMessage_V3() {
super();
}
/**
* @param body
* @param continues
* @param requiresResponse
*/
public SessionSendContinuationMessage_V3(final Message message,
final byte[] body,
final boolean continues,
final boolean requiresResponse,
final long messageBodySize,
final int senderID,
final SendAcknowledgementHandler handler) {
super(message, body, continues, requiresResponse, messageBodySize, handler);
this.senderID = senderID;
}
@Override
public int expectedEncodeSize() {
return super.expectedEncodeSize() + DataConstants.SIZE_INT;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeInt(senderID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
senderID = buffer.readInt();
}
@Override
public int getSenderID() {
return senderID;
}
}

View File

@ -136,4 +136,7 @@ public class SessionSendMessage extends MessagePacket {
return true;
}
public int getSenderID() {
return -1;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.utils.DataConstants;
public class SessionSendMessage_V3 extends SessionSendMessage_V2 {
private int senderID;
/** This will be using the CoreMessage because it is meant for the core-protocol */
public SessionSendMessage_V3(final ICoreMessage message,
final boolean requiresResponse,
final SendAcknowledgementHandler handler,
final int senderID) {
super(message, requiresResponse, handler);
this.senderID = senderID;
}
public SessionSendMessage_V3(final CoreMessage message) {
super(message);
}
@Override
public void encodeRest(ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeInt(senderID);
}
@Override
public void decodeRest(final ActiveMQBuffer buffer) {
super.decodeRest(buffer);
senderID = buffer.readInt();
}
@Override
protected int fieldsEncodeSize() {
return super.fieldsEncodeSize() + DataConstants.SIZE_INT;
}
@Override
public int getSenderID() {
return senderID;
}
}

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientProducerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -142,7 +143,8 @@ public abstract class SessionContext {
public abstract void sendFullMessage(ICoreMessage msgI,
boolean sendBlocking,
SendAcknowledgementHandler handler,
SimpleString defaultAddress) throws ActiveMQException;
SimpleString defaultAddress,
int senderID) throws ActiveMQException;
/**
* it should return the number of credits (or bytes) used to send this packet
@ -159,6 +161,7 @@ public abstract class SessionContext {
boolean lastChunk,
byte[] chunk,
int reconnectID,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract int sendServerLargeMessageChunk(Message msgI,
@ -166,6 +169,7 @@ public abstract class SessionContext {
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
int senderID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
@ -385,4 +389,8 @@ public abstract class SessionContext {
public abstract void linkFlowControl(SimpleString address, ClientProducerCredits clientProducerCredits);
public abstract boolean isWritable(ReadyListener callback);
public abstract void createProducer(ClientProducerInternal producer);
public abstract void removeProducer(int id);
}

View File

@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
activemq.version.microVersion=${activemq.version.microVersion}
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
activemq.version.versionTag=${activemq.version.versionTag}
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132,133,134

View File

@ -98,7 +98,15 @@ var Artemis;
{name: "Address", visible: true},
{name: "Remote Address", visible: true},
{name: "Local Address", visible: true},
{name: "Creation Time", visible: true}
{name: "Creation Time", visible: true},
{name: "Messages in Transit", visible: false},
{name: "Messages in Transit Size", visible: false},
{name: "Messages Delivered", visible: false},
{name: "Messages Delivered Size", visible: false},
{name: "Messages Acknowledged", visible: false},
{name: "Messages Acknowledged awaiting Commit", visible: false},
{name: "Last Delivered Time", visible: false},
{name: "Last Acknowledged Time", visible: false}
]
};
@ -130,7 +138,13 @@ var Artemis;
{id: 'queue', name: 'Queue'},
{id: 'protocol', name: 'Protocol'},
{id: 'localAddress', name: 'Local Address'},
{id: 'remoteAddress', name: 'Remote Address'}
{id: 'remoteAddress', name: 'Remote Address'},
{id: 'messagesInTransit', name: 'Messages in Transit'},
{id: 'messagesInTransitSize', name: 'Messages in Transit Size'},
{id: 'messagesDelivered', name: 'Messages Delivered'},
{id: 'messagesDeliveredSize', name: 'Messages Delivered Size'},
{id: 'messagesAcknowledged', name: 'Messages Acknowledged'},
{id: 'messagesAcknowledgedAwaitingCommit', name: 'Messages Acknowledged awaiting Commit'}
],
operationOptions: [
{id: 'EQUALS', name: 'Equals'},
@ -181,7 +195,15 @@ var Artemis;
{ header: 'Address', itemField: 'addressName' , htmlTemplate: 'consumers-anchor-column-template', colActionFn: (item) => selectAddress(item.idx) },
{ header: 'Remote Address', itemField: 'remoteAddress' },
{ header: 'Local Address', itemField: 'localAddress' },
{ header: 'Creation Time', itemField: 'creationTime' }
{ header: 'Creation Time', itemField: 'creationTime' },
{ header: 'Messages in Transit', itemField: 'messagesInTransit' },
{ header: 'Messages in Transit Size', itemField: 'messagesInTransitSize' },
{ header: 'Messages Delivered', itemField: 'messagesDelivered' },
{ header: 'Messages Delivered Size', itemField: 'messagesDeliveredSize' },
{ header: 'Messages Acknowledged', itemField: 'messagesAcknowledged' },
{ header: 'Messages Acknowledged awaiting Commit', itemField: 'messagesAcknowledgedAwaitingCommit' },
{ header: 'Last Delivered Time', itemField: 'lastDeliveredTime', templateFn: function(value) { return formatTimestamp(value);} },
{ header: 'Last Acknowledged Time', itemField: 'lastAcknowledgedTime' , templateFn: function(value) { return formatTimestamp(value);} }
];
ctrl.refresh = function () {
@ -279,6 +301,25 @@ var Artemis;
}
};
function formatTimestamp(timestamp) {
Artemis.log.info("in timestamp " + timestamp + " " + (typeof timestamp !== "number"));
if (isNaN(timestamp) || typeof timestamp !== "number") {
Artemis.log.info("returning timestamp " + timestamp + " " + (typeof timestamp !== "number"));
return timestamp;
}
if (timestamp === 0) {
return "N/A";
}
var d = new Date(timestamp);
// "yyyy-MM-dd HH:mm:ss"
//add 1 to month as getmonth returns the position not the actual month
return d.getFullYear() + "-" + pad2(d.getMonth() + 1) + "-" + pad2(d.getDate()) + " " + pad2(d.getHours()) + ":" + pad2(d.getMinutes()) + ":" + pad2(d.getSeconds());
}
function pad2(value) {
return (value < 10 ? '0' : '') + value;
}
ctrl.pagination.setOperation(ctrl.loadOperation);
function onError(response) {

View File

@ -72,6 +72,7 @@ var Artemis;
ordering: false,
columns: [
{name: "ID", visible: true},
{name: "Name", visible: true},
{name: "Session", visible: true},
{name: "Client ID", visible: true},
{name: "Protocol", visible: true},
@ -79,7 +80,10 @@ var Artemis;
{name: "Validated User", visible: false},
{name: "Address", visible: true},
{name: "Remote Address", visible: true},
{name: "Local Address", visible: true}
{name: "Local Address", visible: true},
{name: "Messages Sent", visible: false},
{name: "Messages Sent Size", visible: false},
{name: "Last Produced Message ID", visible: false}
]
};
@ -103,6 +107,7 @@ var Artemis;
ctrl.filter = {
fieldOptions: [
{id: 'id', name: 'ID'},
{id: 'name', name: 'Name'},
{id: 'session', name: 'Session'},
{id: 'clientID', name: 'Client ID'},
{id: 'user', name: 'User'},
@ -140,6 +145,7 @@ var Artemis;
};
ctrl.tableColumns = [
{ header: 'ID', itemField: 'id' },
{ header: 'Name', itemField: 'name' },
{ header: 'Session', itemField: 'session' , htmlTemplate: 'producers-anchor-column-template', colActionFn: (item) => selectSession(item.idx) },
{ header: 'Client ID', itemField: 'clientID' },
{ header: 'Protocol', itemField: 'protocol' },
@ -147,7 +153,10 @@ var Artemis;
{ header: 'Validated User', name: 'validatedUser'},
{ header: 'Address', itemField: 'addressName' , htmlTemplate: 'producers-anchor-column-template', colActionFn: (item) => selectAddress(item.idx) },
{ header: 'Remote Address', itemField: 'remoteAddress' },
{ header: 'Local Address', itemField: 'localAddress' }
{ header: 'Local Address', itemField: 'localAddress' },
{ header: 'Messages Sent', itemField: 'msgSent'},
{ header: 'Messages Sent Size', itemField: 'msgSizeSent'},
{ header: 'Last Produced Message ID', itemField: 'lastProducedMessageID'}
];
ctrl.refresh = function () {

View File

@ -41,7 +41,6 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -202,14 +201,14 @@ public class AMQPSessionCallback implements SessionCallback {
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), false);
} else {
serverSession = manager.getServer().createSession(name, connection.getUser(), connection.getPassword(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
false, // boolean autoCommitSends
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), connection.getValidatedUser());
null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), connection.getValidatedUser(), false);
}
}
@ -532,7 +531,7 @@ public class AMQPSessionCallback implements SessionCallback {
OperationContext oldContext = recoverContext();
try {
if (invokeIncoming((AMQPMessage) message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection()) == null) {
serverSession.send(transaction, message, directDeliver, false, routingContext);
serverSession.send(transaction, message, directDeliver, receiver.getName(), false, routingContext);
afterIO(new IOCallback() {
@Override
@ -744,8 +743,8 @@ public class AMQPSessionCallback implements SessionCallback {
return protonSPI.invokeOutgoingInterceptors(message, connection);
}
public void addProducer(ServerProducer serverProducer) {
serverSession.addProducer(serverProducer);
public void addProducer(String name, String address) {
serverSession.addProducer(name, ProtonProtocolManagerFactory.AMQP_PROTOCOL_NAME, address);
}
public void removeProducer(String name) {

View File

@ -24,8 +24,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
@ -160,7 +158,6 @@ public class AMQPSessionContext extends ProtonInitializable {
}
public void removeReceiver(Receiver receiver) {
sessionSPI.removeProducer(receiver.getName());
receivers.remove(receiver);
}
@ -229,8 +226,7 @@ public class AMQPSessionContext extends ProtonInitializable {
AMQPMirrorControllerTarget protonReceiver = new AMQPMirrorControllerTarget(sessionSPI, connection, this, receiver, server);
protonReceiver.initialize();
receivers.put(receiver, protonReceiver);
ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
sessionSPI.addProducer(serverProducer);
sessionSPI.addProducer(receiver.getName(), receiver.getTarget().getAddress());
receiver.setContext(protonReceiver);
HashMap<Symbol, Object> brokerIDProperties = new HashMap<>();
brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, server.getNodeID().toString());
@ -255,8 +251,7 @@ public class AMQPSessionContext extends ProtonInitializable {
ProtonServerReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver);
protonReceiver.initialize();
receivers.put(receiver, protonReceiver);
ServerProducer serverProducer = new ServerProducerImpl(receiver.getName(), "AMQP", receiver.getTarget().getAddress());
sessionSPI.addProducer(serverProducer);
sessionSPI.addProducer(receiver.getName(), receiver.getTarget().getAddress());
receiver.setContext(protonReceiver);
connection.runNow(() -> {
receiver.open();

View File

@ -270,6 +270,7 @@ public class ProtonServerReceiverContext extends ProtonAbstractReceiver {
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
sessionSPI.removeProducer(receiver.getName());
org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
try {

View File

@ -169,7 +169,9 @@ public class MQTTConnectionManager {
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
server.newOperationContext(),
session.getProtocolManager().getPrefixes(),
session.getProtocolManager().getSecurityDomain(), validatedUser);
session.getProtocolManager().getSecurityDomain(),
validatedUser,
false);
return (ServerSessionImpl) serverSession;
}

View File

@ -42,8 +42,10 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
@ -72,6 +74,10 @@ public class MQTTPublishManager {
private SimpleString managementAddress;
private final String senderName = UUIDGenerator.getInstance().generateUUID().toString();
private boolean createProducer = true;
private ServerConsumer managementConsumer;
private MQTTSession session;
@ -173,6 +179,10 @@ public class MQTTPublishManager {
*/
void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
synchronized (lock) {
if (createProducer) {
session.getServerSession().addProducer(senderName, MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME, ServerProducer.ANONYMOUS);
createProducer = false;
}
String topic = message.variableHeader().topicName();
if (session.getVersion() == MQTTVersion.MQTT_5) {
Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
@ -206,7 +216,6 @@ public class MQTTPublishManager {
if (qos > 0) {
serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
}
int messageId = message.variableHeader().packetId();
if (qos < 2 || !state.getPubRec().contains(messageId)) {
if (qos == 2 && !internal)
@ -217,7 +226,7 @@ public class MQTTPublishManager {
if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
}
session.getServerSession().send(tx, serverMessage, true, false);
session.getServerSession().send(tx, serverMessage, true, senderName, false);
if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
@ -303,7 +312,7 @@ public class MQTTPublishManager {
if (ref != null) {
Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
//send the management message via the internal server session to bypass security.
session.getInternalServerSession().send(m, true);
session.getInternalServerSession().send(m, true, senderName);
session.getServerSession().individualAcknowledge(ref.getB(), ref.getA());
releaseFlowControl(ref.getB());
} else {

View File

@ -837,7 +837,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void createInternalSession(ConnectionInfo info) throws Exception {
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser);
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser, false);
}
//raise the refCount of context
@ -1261,6 +1261,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
ss.addProducer(info);
getSession(info.getProducerId().getParentId()).getCoreSession().addProducer(
info.getProducerId().toString(),
OpenWireProtocolManagerFactory.OPENWIRE_PROTOCOL_NAME,
info.getDestination() != null ? info.getDestination().getPhysicalName() : null);
}
return null;
@ -1290,6 +1294,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
synchronized (producerExchanges) {
producerExchanges.remove(id);
getSession(id.getParentId()).getCoreSession().removeProducer(id.toString());
}
return null;
}

View File

@ -362,6 +362,7 @@ public class AMQConsumer {
if (ack.isIndividualAck() || ack.isStandardAck()) {
for (MessageReference ref : ackList) {
ref.acknowledge(transaction, serverConsumer);
serverConsumer.metricsAcknowledge(ref, transaction);
removeRolledback(ref);
}
} else if (ack.isPoisonAck()) {

View File

@ -132,7 +132,7 @@ public class AMQSession implements SessionCallback {
// now
try {
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), connection.getValidatedUser());
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), connection.getValidatedUser(), false);
} catch (Exception e) {
logger.error("error init session", e);
}
@ -392,7 +392,6 @@ public class AMQSession implements SessionCallback {
assert clientId.toString().equals(this.connection.getState().getInfo().getClientId()) : "Session cached clientId must be the same of the connection";
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, clientId);
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
@ -438,7 +437,7 @@ public class AMQSession implements SessionCallback {
restoreAutoRead();
}
getCoreSession().send(coreMsg, false, dest.isTemporary());
getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary());
if (count == null || count.decrementAndGet() == 0) {
if (sendProducerAck) {
@ -462,7 +461,7 @@ public class AMQSession implements SessionCallback {
Exception exceptionToSend = null;
try {
getCoreSession().send(coreMsg, false, dest.isTemporary());
getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary());
} catch (Exception e) {
logger.debug("Sending exception to the client", e);
exceptionToSend = e;

View File

@ -71,7 +71,7 @@ public class OpenWireConnectionTest {
ServerSession serverSession = Mockito.mock(ServerSession.class);
Mockito.when(serverSession.getName()).thenReturn("session");
Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any());
Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(), Mockito.anyBoolean());
OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null);
openWireProtocolManager.setSecurityDomain("securityDomain");

View File

@ -247,7 +247,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
final String validatedUser = server.validateUser(connection.getLogin(), connection.getPasscode(), connection, getSecurityDomain());
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes(), getSecurityDomain(), validatedUser);
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes(), getSecurityDomain(), validatedUser, false);
stompSession.setServerSession(session);
sessions.put(id, stompSession);
}

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -55,11 +56,16 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import static org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory.STOMP_PROTOCOL_NAME;
public class StompSession implements SessionCallback {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String senderName = UUIDGenerator.getInstance().generateUUID().toString();
private boolean createProducer = true;
private final StompProtocolManager manager;
private final StompConnection connection;
@ -217,6 +223,7 @@ public class StompSession implements SessionCallback {
@Override
public void closed() {
session.removeProducer(senderName);
}
@Override
@ -379,10 +386,18 @@ public class StompSession implements SessionCallback {
}
public void sendInternal(Message message, boolean direct) throws Exception {
session.send(message, direct);
if (createProducer) {
session.addProducer(senderName, STOMP_PROTOCOL_NAME, ServerProducer.ANONYMOUS);
createProducer = false;
}
session.send(message, direct, senderName);
}
public void sendInternalLarge(CoreMessage message, boolean direct) throws Exception {
if (createProducer) {
session.addProducer(senderName, STOMP_PROTOCOL_NAME, ServerProducer.ANONYMOUS);
createProducer = false;
}
int headerSize = message.getHeadersAndPropertiesEncodeSize();
if (headerSize >= connection.getMinLargeMessageSize()) {
throw BUNDLE.headerTooBig();
@ -402,7 +417,7 @@ public class StompSession implements SessionCallback {
largeMessage.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);
session.send(largeMessage.toMessage(), direct);
session.send(largeMessage.toMessage(), direct, senderName);
}
}

View File

@ -122,7 +122,7 @@ public abstract class AbstractControl extends StandardMBean {
Integer.MAX_VALUE, fakeConnection,
true, true, false,
false, address.toString(), fakeConnection.callback,
false, new DummyOperationContext(), Collections.emptyMap(), null, validatedUser);
false, new DummyOperationContext(), Collections.emptyMap(), null, validatedUser, false);
try {
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {
@ -159,7 +159,7 @@ public abstract class AbstractControl extends StandardMBean {
}
// There's no point on direct delivery using the management thread, use false here
serverSession.send(message, false);
serverSession.send(message, false, null);
return "" + message.getMessageID();
} finally {
try {

View File

@ -79,6 +79,7 @@ import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.management.impl.view.AddressView;
import org.apache.activemq.artemis.core.management.impl.view.ConnectionView;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerView;
import org.apache.activemq.artemis.core.management.impl.view.ProducerView;
import org.apache.activemq.artemis.core.management.impl.view.QueueView;
@ -107,6 +108,7 @@ import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
@ -2609,7 +2611,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
try {
Set<ServerProducer> producers = new HashSet<>();
for (ServerSession session : server.getSessions()) {
producers.addAll(session.getServerProducers().values());
producers.addAll(session.getServerProducers());
}
ProducerView view = new ProducerView(server);
view.setCollection(producers);
@ -2767,7 +2769,24 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
private JsonObject toJSONObject(ServerConsumer consumer) throws Exception {
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", consumer.getID()).add("connectionID", consumer.getConnectionID().toString()).add("sessionID", consumer.getSessionID()).add("queueName", consumer.getQueue().getName().toString()).add("browseOnly", consumer.isBrowseOnly()).add("creationTime", consumer.getCreationTime()).add("deliveringCount", consumer.getDeliveringMessages().size());
List<MessageReference> deliveringMessages = consumer.getDeliveringMessages();
JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
.add(ConsumerField.ID.getAlternativeName(), consumer.getID())
.add(ConsumerField.CONNECTION.getAlternativeName(), consumer.getConnectionID().toString())
.add(ConsumerField.SESSION.getAlternativeName(), consumer.getSessionID())
.add(ConsumerField.QUEUE.getAlternativeName(), consumer.getQueue().getName().toString())
.add(ConsumerField.BROWSE_ONLY.getName(), consumer.isBrowseOnly())
.add(ConsumerField.CREATION_TIME.getName(), consumer.getCreationTime())
// deliveringCount is renamed as MESSAGES_IN_TRANSIT but left in json for backward compatibility
.add(ConsumerField.MESSAGES_IN_TRANSIT.getAlternativeName(), consumer.getMessagesInTransit())
.add(ConsumerField.MESSAGES_IN_TRANSIT.getName(), consumer.getMessagesInTransit())
.add(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName(), consumer.getMessagesInTransitSize())
.add(ConsumerField.MESSAGES_DELIVERED.getName(), consumer.getMessagesDelivered())
.add(ConsumerField.MESSAGES_DELIVERED_SIZE.getName(), consumer.getMessagesDeliveredSize())
.add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), consumer.getMessagesAcknowledged())
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), consumer.getMessagesAcknowledgedAwaitingCommit())
.add(ConsumerField.LAST_DELIVERED_TIME.getName(), consumer.getLastDeliveredTime())
.add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), consumer.getLastAcknowledgedTime());
if (consumer.getFilter() != null) {
obj.add("filter", consumer.getFilter().getFilterString().toString());
}
@ -4407,14 +4426,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
}
public String[] listTargetAddresses(final String sessionID) {
ServerSession session = server.getSessionByID(sessionID);
if (session != null) {
return session.getTargetAddresses();
}
return new String[0];
}
@Override
public void onNotification(org.apache.activemq.artemis.core.server.management.Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType))

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -1826,9 +1827,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (consumer instanceof ServerConsumer) {
ServerConsumer serverConsumer = (ServerConsumer) consumer;
JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("consumerID", serverConsumer.getID()).add("connectionID", serverConsumer.getConnectionID().toString()).add("sessionID", serverConsumer.getSessionID()).add("browseOnly", serverConsumer.isBrowseOnly()).add("creationTime", serverConsumer.getCreationTime());
JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
.add(ConsumerField.ID.getAlternativeName(), serverConsumer.getID())
.add(ConsumerField.CONNECTION.getAlternativeName(), serverConsumer.getConnectionID().toString())
.add(ConsumerField.SESSION.getAlternativeName(), serverConsumer.getSessionID())
.add(ConsumerField.BROWSE_ONLY.getName(), serverConsumer.isBrowseOnly())
.add(ConsumerField.CREATION_TIME.getName(), serverConsumer.getCreationTime())
.add(ConsumerField.MESSAGES_IN_TRANSIT.getName(), serverConsumer.getMessagesInTransit())
.add(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName(), serverConsumer.getMessagesInTransitSize())
.add(ConsumerField.MESSAGES_DELIVERED.getName(), serverConsumer.getMessagesDelivered())
.add(ConsumerField.MESSAGES_DELIVERED_SIZE.getName(), serverConsumer.getMessagesDeliveredSize())
.add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), serverConsumer.getMessagesAcknowledged())
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), serverConsumer.getMessagesAcknowledgedAwaitingCommit())
.add(ConsumerField.LAST_DELIVERED_TIME.getName(), serverConsumer.getLastDeliveredTime())
.add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), serverConsumer.getLastAcknowledgedTime());
jsonArray.add(obj);
}

View File

@ -20,9 +20,10 @@ import java.util.Map;
import java.util.TreeMap;
public enum ConsumerField {
ID("id"),
SESSION("session"),
QUEUE("queue"),
ID("id", "consumerID"),
SESSION("session", "sessionID"),
CONNECTION("connection", "connectionID"),
QUEUE("queue", "queueName"),
FILTER("filter"),
ADDRESS("address"),
USER("user"),
@ -32,7 +33,16 @@ public enum ConsumerField {
LOCAL_ADDRESS("localAddress"),
REMOTE_ADDRESS("remoteAddress"),
QUEUE_TYPE("queueType"),
CREATION_TIME("creationTime");
BROWSE_ONLY("browseOnly"),
CREATION_TIME("creationTime"),
MESSAGES_IN_TRANSIT("messagesInTransit", "deliveringCount"),
MESSAGES_IN_TRANSIT_SIZE("messagesInTransitSize"),
MESSAGES_DELIVERED("messagesDelivered"),
MESSAGES_DELIVERED_SIZE("messagesDeliveredSize"),
MESSAGES_ACKNOWLEDGED("messagesAcknowledged"),
MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT("messagesAcknowledgedAwaitingCommit"),
LAST_DELIVERED_TIME("lastDeliveredTime"),
LAST_ACKNOWLEDGED_TIME("lastAcknowledgedTime");
private static final Map<String, ConsumerField> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@ -44,12 +54,28 @@ public enum ConsumerField {
private final String name;
private final String alternativeName;
public String getName() {
return name;
}
/**
* There is some inconsistency with some json objects returned for consumers because they were hard coded.
* This is just to track the differences and provide backward compatibility.
* @return the old alternative name
*/
public String getAlternativeName() {
return alternativeName;
}
ConsumerField(String name) {
this(name, "");
}
ConsumerField(String name, String alternativeName) {
this.name = name;
this.alternativeName = alternativeName;
}
public static ConsumerField valueOfName(String name) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.management.impl.view;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
import java.util.Date;
@ -46,7 +47,7 @@ public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
@Override
public JsonObjectBuilder toJson(ServerConsumer consumer) {
ServerSession session = server.getSessionByID(consumer.getSessionID());
ActiveMQServerControlImpl serverControl = server.getActiveMQServerControl();
//if session is not available then consumer is not in valid state - ignore
if (session == null) {
return null;
@ -71,7 +72,15 @@ public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
.add(ConsumerField.ADDRESS.getName(), toString(consumer.getQueueAddress()))
.add(ConsumerField.LOCAL_ADDRESS.getName(), toString(consumer.getConnectionLocalAddress()))
.add(ConsumerField.REMOTE_ADDRESS.getName(), toString(consumer.getConnectionRemoteAddress()))
.add(ConsumerField.CREATION_TIME.getName(), new Date(consumer.getCreationTime()).toString());
.add(ConsumerField.CREATION_TIME.getName(), new Date(consumer.getCreationTime()).toString())
.add(ConsumerField.MESSAGES_IN_TRANSIT.getName(), toString(consumer.getMessagesInTransit()))
.add(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName(), toString(consumer.getMessagesInTransitSize()))
.add(ConsumerField.MESSAGES_DELIVERED.getName(), toString(consumer.getMessagesDelivered()))
.add(ConsumerField.MESSAGES_DELIVERED_SIZE.getName(), toString(consumer.getMessagesDeliveredSize()))
.add(ConsumerField.MESSAGES_ACKNOWLEDGED.getName(), toString(consumer.getMessagesAcknowledged()))
.add(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName(), toString(consumer.getMessagesAcknowledgedAwaitingCommit()))
.add(ConsumerField.LAST_DELIVERED_TIME.getName(), consumer.getLastDeliveredTime())
.add(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName(), consumer.getLastAcknowledgedTime());
return obj;
}
@ -111,6 +120,22 @@ public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
return consumer.getConnectionRemoteAddress();
case CREATION_TIME:
return new Date(consumer.getCreationTime());
case MESSAGES_IN_TRANSIT:
return consumer.getMessagesInTransit();
case MESSAGES_IN_TRANSIT_SIZE:
return consumer.getMessagesInTransitSize();
case MESSAGES_DELIVERED:
return consumer.getMessagesDelivered();
case MESSAGES_DELIVERED_SIZE:
return consumer.getMessagesDeliveredSize();
case MESSAGES_ACKNOWLEDGED:
return consumer.getMessagesAcknowledged();
case MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT:
return consumer.getMessagesAcknowledgedAwaitingCommit();
case LAST_DELIVERED_TIME:
return consumer.getLastDeliveredTime();
case LAST_ACKNOWLEDGED_TIME:
return consumer.getLastAcknowledgedTime();
default:
throw new IllegalArgumentException("Unsupported field, " + fieldName);
}

View File

@ -21,15 +21,20 @@ import java.util.TreeMap;
public enum ProducerField {
ID("id"),
SESSION("session"),
NAME("name"),
SESSION("session", "sessionID"),
CONNECTION_ID("connectionID"),
ADDRESS("address"), USER("user"),
ADDRESS("address", "destination"),
USER("user"),
VALIDATED_USER("validatedUser"),
PROTOCOL("protocol"),
CLIENT_ID("clientID"),
LOCAL_ADDRESS("localAddress"),
REMOTE_ADDRESS("remoteAddress"),
CREATION_TIME("creationTime");
CREATION_TIME("creationTime"),
MESSAGE_SENT("msgSent"),
MESSAGE_SENT_SIZE("msgSizeSent"),
LAST_PRODUCED_MESSAGE_ID("lastProducedMessageID");
private static final Map<String, ProducerField> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@ -41,12 +46,27 @@ public enum ProducerField {
private final String name;
private final String alternativeName;
public String getName() {
return name;
}
/**
* There is some inconsistency with some json objects returned for consumers because they were hard coded.
* This is just to track the differences and provide backward compatibility.
* @return the old alternative name
*/
public String getAlternativeName() {
return alternativeName;
}
ProducerField(String name) {
this(name, "");
}
ProducerField(String name, String alternativeName) {
this.name = name;
this.alternativeName = alternativeName;
}
public static ProducerField valueOfName(String name) {

View File

@ -59,6 +59,7 @@ public class ProducerView extends ActiveMQAbstractView<ServerProducer> {
JsonObjectBuilder obj = JsonLoader.createObjectBuilder()
.add(ProducerField.ID.getName(), toString(producer.getID()))
.add(ProducerField.NAME.getName(), toString(producer.getName()))
.add(ProducerField.SESSION.getName(), toString(session.getName()))
.add(ProducerField.CLIENT_ID.getName(), toString(sessionClientID))
.add(ProducerField.USER.getName(), toString(session.getUsername()))
@ -67,7 +68,10 @@ public class ProducerView extends ActiveMQAbstractView<ServerProducer> {
.add(ProducerField.ADDRESS.getName(), toString(producer.getAddress() != null ? producer.getAddress() : session.getDefaultAddress()))
.add(ProducerField.LOCAL_ADDRESS.getName(), toString(session.getRemotingConnection().getTransportConnection().getLocalAddress()))
.add(ProducerField.REMOTE_ADDRESS.getName(), toString(session.getRemotingConnection().getTransportConnection().getRemoteAddress()))
.add(ProducerField.CREATION_TIME.getName(), toString(producer.getCreationTime()));
.add(ProducerField.CREATION_TIME.getName(), toString(producer.getCreationTime()))
.add(ProducerField.MESSAGE_SENT.getName(), producer.getMessagesSent())
.add(ProducerField.MESSAGE_SENT_SIZE.getName(), producer.getMessagesSentSize())
.add(ProducerField.LAST_PRODUCED_MESSAGE_ID.getName(), toString(producer.getLastProducedMessageID()));
return obj;
}

View File

@ -59,6 +59,18 @@ public class ConsumerFilterPredicate extends ActiveMQFilterPredicate<ServerConsu
return matches(server.getSessionByID(consumer.getSessionID()).getRemotingConnection().getTransportConnection().getLocalAddress());
case REMOTE_ADDRESS:
return matches(server.getSessionByID(consumer.getSessionID()).getRemotingConnection().getTransportConnection().getRemoteAddress());
case MESSAGES_IN_TRANSIT:
return matches(consumer.getMessagesInTransit());
case MESSAGES_IN_TRANSIT_SIZE:
return matches(consumer.getMessagesInTransitSize());
case MESSAGES_DELIVERED:
return matches(consumer.getDeliveringMessages());
case MESSAGES_DELIVERED_SIZE:
return matches(consumer.getMessagesDeliveredSize());
case MESSAGES_ACKNOWLEDGED:
return matches(consumer.getMessagesAcknowledged());
case MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT:
return matches(consumer.getMessagesAcknowledgedAwaitingCommit());
}
return true;
}

View File

@ -29,11 +29,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRequ
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacketI;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RemoveProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
@ -58,16 +60,19 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSen
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_V3;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_CONNECT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CLUSTER_CONNECT_REPLY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_PRODUCER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.FEDERATION_DOWNSTREAM_CONNECT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.QUORUM_VOTE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.QUORUM_VOTE_REPLY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REMOVE_PRODUCER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_APPEND;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_APPEND_TX;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
@ -106,8 +111,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools));
} else if (connection.isVersionBeforeAsyncResponseChange()) {
sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools));
} else {
} else if (connection.isBeforeProducerMetricsChanged()) {
sendMessage = new SessionSendMessage_V2(new CoreMessage(this.coreMessageObjectPools));
} else {
sendMessage = new SessionSendMessage_V3(new CoreMessage(this.coreMessageObjectPools));
}
sendMessage.decode(in);
@ -269,6 +276,14 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
packet = new FederationDownstreamConnectMessage();
break;
}
case CREATE_PRODUCER: {
packet = new CreateProducerMessage();
break;
}
case REMOVE_PRODUCER: {
packet = new RemoveProducerMessage();
break;
}
default: {
packet = super.decode(packetType, connection);
}

View File

@ -21,6 +21,7 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Objects;
import io.netty.util.collection.IntObjectHashMap;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -31,6 +32,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
@ -39,12 +41,14 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateAddressMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSharedQueueMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NullResponseMessage_V2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RemoveProducerMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.RollbackMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
@ -97,6 +101,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.activemq.artemis.utils.pools.Pool;
import org.apache.activemq.artemis.utils.SimpleFuture;
@ -108,11 +113,13 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_PRODUCER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_SHARED_QUEUE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REMOVE_PRODUCER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@ -151,6 +158,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String PRODUCER_ID_PREFIX = "artemis:sender:ID:";
private final ServerSession session;
private final StorageManager storageManager;
@ -174,6 +183,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private final Pool<NullResponseMessage_V2> poolNullResponseV2;
private final IntObjectHashMap<String> producers = new IntObjectHashMap<>();
public ServerSessionPacketHandler(final ActiveMQServer server,
final ServerSession session,
final Channel channel) {
@ -323,7 +334,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_SEND_CONTINUATION: {
SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
requiresResponse = message.isRequiresResponse();
sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues());
int senderID = message.getSenderID();
sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues(), senderID);
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@ -635,6 +647,28 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
break;
}
case CREATE_PRODUCER: {
CreateProducerMessage message = (CreateProducerMessage) packet;
if (!producers.containsKey(message.getId())) {
// this is used to create/destroy the producer so needs to be unique
String senderName = PRODUCER_ID_PREFIX + UUIDGenerator.getInstance().generateUUID();
producers.put(message.getId(), senderName);
session.addProducer(senderName, ActiveMQClient.DEFAULT_CORE_PROTOCOL, message.getAddress() != null ? message.getAddress().toString() : null);
} else {
ActiveMQServerLogger.LOGGER.producerAlreadyExists(message.getId(),session.getName(), remotingConnection.getRemoteAddress());
}
break;
}
case REMOVE_PRODUCER: {
RemoveProducerMessage message = (RemoveProducerMessage) packet;
String remove = producers.remove(message.getId());
if (remove != null) {
session.removeProducer(remove);
} else {
ActiveMQServerLogger.LOGGER.producerDoesNotExist(message.getId(), session.getName(), remotingConnection.getRemoteAddress());
}
break;
}
}
} catch (ActiveMQIOErrorException e) {
response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
@ -747,7 +781,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
try {
final SessionSendMessage message = (SessionSendMessage) packet;
requiresResponse = message.isRequiresResponse();
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage(), storageManager), this.direct);
this.session.send(EmbedMessageUtil.extractEmbedded(message.getMessage(), storageManager), this.direct, producers.get(message.getSenderID()));
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@ -1049,7 +1083,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
private void sendContinuations(final int packetSize,
final long messageBodySize,
final byte[] body,
final boolean continues) throws Exception {
final boolean continues,
final int senderID) throws Exception {
synchronized (largeMessageLock) {
if (currentLargeMessage == null) {
@ -1071,7 +1106,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
LargeServerMessage message = currentLargeMessage;
currentLargeMessage.setStorageManager(storageManager);
currentLargeMessage = null;
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, false);
session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, producers.get(senderID), false);
}
}
}

View File

@ -45,9 +45,7 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerProducerImpl;
import org.apache.activemq.artemis.core.version.Version;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.slf4j.Logger;
@ -183,9 +181,8 @@ public class ActiveMQPacketHandler implements ChannelHandler {
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain(), validatedUser);
ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
session.addProducer(serverProducer);
boolean isLegacyProducer = request.getVersion() < PacketImpl.ARTEMIS_2_28_0_VERSION;
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain(), validatedUser, isLegacyProducer);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, session, channel);
channel.setHandler(handler);
sessionCallback.setSessionHandler(handler);

View File

@ -349,7 +349,9 @@ public interface ActiveMQServer extends ServiceComponent {
boolean autoCreateQueues,
OperationContext context,
Map<SimpleString, RoutingType> prefixes,
String securityDomain, String validatedUser) throws Exception;
String securityDomain,
String validatedUser,
boolean isLegacyProducer) throws Exception;
/** This is to be used in places where security is bypassed, like internal sessions, broker connections, etc... */
ServerSession createInternalSession(String name,
@ -364,7 +366,8 @@ public interface ActiveMQServer extends ServiceComponent {
boolean autoCreateQueues,
OperationContext context,
Map<SimpleString, RoutingType> prefixes,
String securityDomain) throws Exception;
String securityDomain,
boolean isLegacyProducer) throws Exception;
/** should the server rebuild page counters upon startup.
* this will be useful on testing or an embedded broker scenario */

View File

@ -1220,7 +1220,11 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 222308, value = "Unable to listen for incoming fail-back request because {} is null. Ensure the broker has the proper cluster-connection configuration.", level = LogMessage.Level.WARN)
void failBackCheckerFailure(String component);
@LogMessage(id = 222309, value = "Trying to remove a producer with ID {} that doesnt exist from session {} on Connection {}.", level = LogMessage.Level.WARN)
void producerDoesNotExist(int id, String session, String remoteAddress);
@LogMessage(id = 222310, value = "Trying to add a producer with ID {} that already exists to session {} on Connection {}.", level = LogMessage.Level.WARN)
void producerAlreadyExists(int id, String session, String remoteAddress);
@LogMessage(id = 224000, value = "Failure in initialisation", level = LogMessage.Level.ERROR)
void initializationError(Throwable e);

View File

@ -62,4 +62,51 @@ public interface ConsumerInfo {
*/
String getConnectionRemoteAddress();
/**
* Returns how many messages are out for delivery but not yet acknowledged
* @return delivering count
*/
int getMessagesInTransit();
/**
* Returns the combined size of all the messages out for delivery but not yet acknowledged
* @return the total size of all the messages
*/
long getMessagesInTransitSize();
/**
* Returns The total number of messages sent to a consumer including redeliveries that have been acknowledged
* @return the total number of messages delivered.
*/
long getMessagesDelivered();
/**
* Returns the total size of all the messages delivered to the consumer. This includes redelivered messages
* @return The total size of all the messages
*/
long getMessagesDeliveredSize();
/**
* Returns the number of messages acknowledged by this consumer since it was created
* @return messages acknowledged
*/
long getMessagesAcknowledged();
/**
* Returns the number of acknowledged messages that are awaiting commit in a transaction
* @return th eno acknowledgements awaiting commit
*/
int getMessagesAcknowledgedAwaitingCommit();
/**
* Returns the time in milliseconds that the last message was delivered to a consumer
* @return the time of the last message delivered
*/
long getLastDeliveredTime();
/**
* Returns the time in milliseconds that the last message was acknowledged by a consumer
* @return the time of the last message was acknowledged
*/
long getLastAcknowledgedTime();
}

View File

@ -110,4 +110,11 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
long getCreationTime();
String getSessionID();
/**
* This is needed when some protocols (OW) handle the acks themselves and need to update the metrics
* @param ref the message reference
* @param transaction the tx
*/
void metricsAcknowledge(MessageReference ref, Transaction transaction);
}

View File

@ -17,7 +17,12 @@
package org.apache.activemq.artemis.core.server;
public interface ServerProducer {
String getAddress();
String ANONYMOUS = "ANONYMOUS";
long getID();
String getName();
String getAddress();;
String getProtocol();
@ -29,7 +34,14 @@ public interface ServerProducer {
String getConnectionID();
String getID();
long getCreationTime();
Object getLastProducedMessageID();
long getMessagesSent();
long getMessagesSentSize();
void updateMetrics(Object lastProducedMessageID, int encodeSize);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import javax.transaction.xa.Xid;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -332,11 +333,13 @@ public interface ServerSession extends SecurityAuth {
RoutingStatus send(Transaction tx,
Message message,
boolean direct,
String senderName,
boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(Transaction tx,
Message message,
boolean direct,
String senderName,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception;
@ -345,18 +348,20 @@ public interface ServerSession extends SecurityAuth {
Message msg,
SimpleString originalAddress,
boolean direct,
String senderName,
boolean noAutoCreateQueue) throws Exception;
RoutingStatus doSend(Transaction tx,
Message msg,
SimpleString originalAddress,
boolean direct,
String senderName,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception;
RoutingStatus send(Message message, boolean direct, boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(Message message, boolean direct, String senderName, boolean noAutoCreateQueue) throws Exception;
RoutingStatus send(Message message, boolean direct) throws Exception;
RoutingStatus send(Message message, boolean direct, String senderName) throws Exception;
void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
@ -376,8 +381,6 @@ public interface ServerSession extends SecurityAuth {
Map<String, String> getMetaData();
String[] getTargetAddresses();
/**
* Add all the producers detail to the JSONArray object.
* This is a method to be used by the management layer.
@ -387,7 +390,6 @@ public interface ServerSession extends SecurityAuth {
*/
void describeProducersInfo(JsonArrayBuilder objs) throws Exception;
String getLastSentMessageID(String address);
long getCreationTime();
@ -522,11 +524,11 @@ public interface ServerSession extends SecurityAuth {
Pair<SimpleString, EnumSet<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
EnumSet<RoutingType> defaultRoutingTypes);
void addProducer(ServerProducer serverProducer);
void addProducer(String name, String protocol, String address);
void removeProducer(String ID);
Map<String, ServerProducer> getServerProducers();
Collection<ServerProducer> getServerProducers();
String getDefaultAddress();

View File

@ -1746,7 +1746,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final OperationContext context,
final Map<SimpleString, RoutingType> prefixes,
final String securityDomain,
String validatedUser) throws Exception {
String validatedUser,
boolean isLegacyProducer) throws Exception {
if (validatedUser == null) {
validatedUser = validateUser(username, password, connection, securityDomain);
}
@ -1758,7 +1759,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, prefixes);
}
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain);
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain, isLegacyProducer);
return session;
}
@ -1786,8 +1787,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean autoCreateQueues,
OperationContext context,
Map<SimpleString, RoutingType> prefixes,
String securityDomain) throws Exception {
ServerSessionImpl session = internalCreateSession(name, null, null, null, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain);
String securityDomain,
boolean isLegacyProducer) throws Exception {
ServerSessionImpl session = internalCreateSession(name, null, null, null, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes, securityDomain, isLegacyProducer);
session.disableSecurity();
return session;
}
@ -1864,14 +1866,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
OperationContext context,
boolean autoCreateQueues,
Map<SimpleString, RoutingType> prefixes,
String securityDomain) throws Exception {
String securityDomain,
boolean isLegacyProducer) throws Exception {
if (hasBrokerSessionPlugins()) {
callBrokerSessionPlugins(plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection,
autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes));
}
ServerSessionImpl session = new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager, prefixes, securityDomain);
ServerSessionImpl session = new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, pagingManager, prefixes, securityDomain, isLegacyProducer);
sessions.put(name, session);

View File

@ -26,7 +26,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Function;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -61,6 +63,8 @@ import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@ -153,14 +157,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private AtomicLong messageConsumedSnapshot = new AtomicLong(0);
private long acks;
private boolean requiresLegacyPrefix = false;
private boolean anycast = false;
private boolean isClosed = false;
ServerConsumerMetrics metrics = new ServerConsumerMetrics();
public ServerConsumerImpl(final long id,
final ServerSession session,
@ -355,6 +359,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return this.session.getName();
}
@Override
public void metricsAcknowledge(MessageReference ref, Transaction transaction) {
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), transaction);
}
@Override
public List<MessageReference> getDeliveringMessages() {
synchronized (lock) {
@ -441,6 +450,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
deliveringRefs.add(ref);
}
metrics.addMessage(ref.getMessage().getEncodeSize());
ref.handled();
ref.setConsumerId(this.id);
@ -466,7 +477,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (preAcknowledge) {
// With pre-ack, we ack *before* sending to the client
ref.getQueue().acknowledge(ref, this);
acks++;
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
}
}
@ -704,9 +715,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
final List<MessageReference> refs = new ArrayList<>(deliveringRefs.size());
MessageReference ref;
while ((ref = deliveringRefs.poll()) != null) {
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), tx);
if (performACK) {
ref.acknowledge(tx, this);
performACK = false;
} else {
refs.add(ref);
@ -911,8 +922,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ref.acknowledge(tx, this);
ackedRefs.add(ref.getMessageID());
acks++;
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), tx);
}
while (ref.getMessageID() != messageID);
@ -973,11 +983,9 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.markAsRollbackOnly(ils);
throw ils;
}
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), tx);
ref.acknowledge(tx, this);
acks++;
if (startedTransaction) {
tx.commit();
}
@ -1016,7 +1024,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (!failed) {
ref.decrementDeliveryCount();
}
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
ref.getQueue().cancel(ref, System.currentTimeMillis());
}
@ -1032,7 +1040,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (ref == null) {
return; // nothing to be done
}
metrics.addAcknowledge(ref.getMessage().getEncodeSize(), null);
ref.getQueue().sendToDeadLetterAddress(null, ref);
}
@ -1040,6 +1048,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public synchronized void backToDelivering(MessageReference reference) {
synchronized (lock) {
deliveringRefs.addFirst(reference);
metrics.addMessage(reference.getMessage().getEncodeSize());
}
}
@ -1059,10 +1068,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
if (deliveringRefs.peek().getMessage().getMessageID() == messageID) {
return deliveringRefs.poll();
MessageReference ref = deliveringRefs.poll();
return ref;
}
//slow path in a separate method
return removeDeliveringRefById(messageID);
MessageReference ref = removeDeliveringRefById(messageID);
return ref;
}
}
@ -1114,6 +1125,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
public float getRate() {
float timeSlice = ((System.currentTimeMillis() - consumerRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
long acks = metrics.getMessagesAcknowledged();
if (timeSlice == 0) {
messageConsumedSnapshot.getAndSet(acks);
return 0.0f;
@ -1513,7 +1525,143 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress();
}
@Override
public long getMessagesInTransitSize() {
return metrics.getMessagesInTransitSize();
}
@Override
public int getMessagesInTransit() {
return deliveringRefs.size();
}
@Override
public long getLastDeliveredTime() {
return metrics.getLastDeliveredTime();
}
@Override
public long getLastAcknowledgedTime() {
return metrics.getLastAcknowledgedTime();
}
@Override
public long getMessagesAcknowledged() {
return metrics.getMessagesAcknowledged();
}
@Override
public long getMessagesDeliveredSize() {
return metrics.getMessagesDeliveredSize();
}
@Override
public long getMessagesDelivered() {
return metrics.getMessagesDelivered();
}
@Override
public int getMessagesAcknowledgedAwaitingCommit() {
return metrics.getMessagesAcknowledgedAwaitingCommit();
}
public SessionCallback getCallback() {
return callback;
}
static class ServerConsumerMetrics extends TransactionOperationAbstract {
/**
* Since messages can be delivered (incremented) and acknowledged (decremented) at the same time we have to protect
* the encode size and make it atomic. The other fields are ok since they are only accessed from a single thread.
*/
private static final AtomicLongFieldUpdater<ServerConsumerMetrics> messagesInTransitSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesInTransitSize");
private volatile long messagesInTransitSize = 0;
private static final AtomicIntegerFieldUpdater<ServerConsumerMetrics> messagesAcknowledgedAwaitingCommitUpdater = AtomicIntegerFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesAcknowledgedAwaitingCommit");
private volatile int messagesAcknowledgedAwaitingCommit = 0;
private static final AtomicLongFieldUpdater<ServerConsumerMetrics>messagesDeliveredSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesDeliveredSize");
private volatile long messagesDeliveredSize = 0;
private volatile long lastDeliveredTime = 0;
private volatile long lastAcknowledgedTime = 0;
private static final AtomicLongFieldUpdater<ServerConsumerMetrics>messagesDeliveredUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesDelivered");
private volatile long messagesDelivered = 0;
private static final AtomicLongFieldUpdater<ServerConsumerMetrics> messagesAcknowledgedUpdater = AtomicLongFieldUpdater.newUpdater(ServerConsumerMetrics.class, "messagesAcknowledged");
private volatile long messagesAcknowledged = 0;
public long getMessagesInTransitSize() {
return messagesInTransitSizeUpdater.get(this);
}
public long getMessagesDeliveredSize() {
return messagesDeliveredSizeUpdater.get(this);
}
public long getLastDeliveredTime() {
return lastDeliveredTime;
}
public long getLastAcknowledgedTime() {
return lastAcknowledgedTime;
}
public long getMessagesDelivered() {
return messagesDeliveredUpdater.get(this);
}
public long getMessagesAcknowledged() {
return messagesAcknowledgedUpdater.get(this);
}
public int getMessagesAcknowledgedAwaitingCommit() {
return messagesAcknowledgedAwaitingCommitUpdater.get(this);
}
public void addMessage(int encodeSize) {
messagesInTransitSizeUpdater.addAndGet(this, encodeSize);
messagesDeliveredSizeUpdater.addAndGet(this, encodeSize);
messagesDeliveredUpdater.addAndGet(this, 1);
lastDeliveredTime = System.currentTimeMillis();
}
public void addAcknowledge(int encodeSize, Transaction tx) {
messagesInTransitSizeUpdater.addAndGet(this, -encodeSize);
messagesAcknowledgedUpdater.addAndGet(this, 1);
lastAcknowledgedTime = System.currentTimeMillis();
if (tx != null) {
addOperation(tx);
messagesAcknowledgedAwaitingCommitUpdater.addAndGet(this, 1);
}
}
@Override
public void afterCommit(Transaction tx) {
messagesAcknowledgedAwaitingCommitUpdater.set(this, 0);
}
@Override
public void afterRollback(Transaction tx) {
messagesAcknowledgedAwaitingCommitUpdater.set(this, 0);
}
public void addOperation(Transaction tx) {
Object property = tx.getProperty(TransactionPropertyIndexes.CONSUMER_METRICS_OPERATION);
if (property == null) {
tx.putProperty(TransactionPropertyIndexes.CONSUMER_METRICS_OPERATION, this);
tx.addOperation(this);
}
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.utils.collections.MaxSizeMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ServerLegacyProducersImpl implements ServerProducers {
private static final int MAX_PRODUCER_METRIC_SIZE = 100;
protected final Map<String, ServerProducer> producers = Collections.synchronizedMap(new MaxSizeMap<>(MAX_PRODUCER_METRIC_SIZE));
private final String sessionName;
private final String connectionID;
public ServerLegacyProducersImpl(ServerSession session) {
this.sessionName = session.getName();
this.connectionID = session.getConnectionID() != null ? session.getConnectionID().toString() : null;
}
@Override
public Map<String, ServerProducer> cloneProducers() {
return new HashMap<>(producers);
}
@Override
public Collection<ServerProducer> getServerProducers() {
return new ArrayList<>(producers.values());
}
@Override
public ServerProducer getServerProducer(String senderName, Message msg, ServerSessionImpl serverSession) {
String address = msg.getAddress();
String name = sessionName + ":" + address;
ServerProducer producer = producers.get(name);
if (producer == null) {
producer = new ServerProducerImpl(name, "CORE", address);
producer.setSessionID(sessionName);
producer.setConnectionID(connectionID);
producers.put(name, producer);
}
return producer;
}
@Override
public void put(String id, ServerProducer producer) {
//never called as we track by address the old way
}
@Override
public void remove(String id) {
//never called as we track by address the old way
}
@Override
public void clear() {
producers.clear();
}
}

View File

@ -18,23 +18,50 @@ package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.server.ServerProducer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class ServerProducerImpl implements ServerProducer {
private final String ID;
private static final AtomicLong PRODUCER_ID_GENERATOR = new AtomicLong();
private final long ID;
private final String name;
private final String protocol;
private final long creationTime;
private volatile long messagesSent = 0;
private volatile long messagesSentSize = 0;
private static final AtomicLongFieldUpdater<ServerProducerImpl> messagesSentUpdater = AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSent");
private static final AtomicLongFieldUpdater<ServerProducerImpl> messagesSentSizeUpdater = AtomicLongFieldUpdater.newUpdater(ServerProducerImpl.class, "messagesSentSize");
private final String address;
private volatile Object lastProducedMessageID;
private String sessionID;
private String connectionID;
public ServerProducerImpl(String ID, String protocol, String address) {
this.ID = ID;
public ServerProducerImpl(String name, String protocol, String address) {
this.ID = PRODUCER_ID_GENERATOR.incrementAndGet();
this.name = name;
this.protocol = protocol;
this.address = address;
this.creationTime = System.currentTimeMillis();
}
@Override
public long getID() {
return ID;
}
@Override
public String getName() {
return name;
}
@Override
public String getAddress() {
return address;
@ -52,7 +79,6 @@ public class ServerProducerImpl implements ServerProducer {
@Override
public void setConnectionID(String connectionID) {
this.connectionID = connectionID;
}
@ -66,13 +92,30 @@ public class ServerProducerImpl implements ServerProducer {
return connectionID;
}
@Override
public String getID() {
return ID;
}
@Override
public long getCreationTime() {
return creationTime;
}
@Override
public void updateMetrics(Object lastProducedMessageID, int encodeSize) {
messagesSentUpdater.addAndGet(this, 1);
messagesSentSizeUpdater.getAndAdd(this, encodeSize);
this.lastProducedMessageID = lastProducedMessageID;
}
@Override
public Object getLastProducedMessageID() {
return lastProducedMessageID;
}
@Override
public long getMessagesSent() {
return messagesSent;
}
@Override
public long getMessagesSentSize() {
return messagesSentSize;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ServerProducer;
import java.util.Collection;
import java.util.Map;
public interface ServerProducers {
Map<String, ServerProducer> cloneProducers();
Collection<ServerProducer> getServerProducers();
ServerProducer getServerProducer(String senderName, Message msg, ServerSessionImpl serverSession);
void put(String id, ServerProducer producer);
void remove(String id);
void clear();
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ServerProducer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ServerProducersImpl implements ServerProducers {
protected final Map<String, ServerProducer> producers = new ConcurrentHashMap<>();
@Override
public Map<String, ServerProducer> cloneProducers() {
return new HashMap<>(producers);
}
@Override
public Collection<ServerProducer> getServerProducers() {
return new ArrayList<>(producers.values());
}
@Override
public ServerProducer getServerProducer(String senderName, Message msg, ServerSessionImpl serverSession) {
if (senderName != null) {
return producers.get(senderName);
}
return null;
}
@Override
public void put(String id, ServerProducer producer) {
producers.put(id, producer);
}
@Override
public void remove(String id) {
producers.remove(id);
}
@Override
public void clear() {
producers.clear();
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.management.impl.view.ProducerField;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -25,17 +26,16 @@ import java.security.cert.X509Certificate;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -102,7 +102,6 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.collections.MaxSizeMap;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -115,7 +114,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean securityEnabled = true;
private final String securityDomain;
@ -140,7 +138,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<>();
protected final Map<String, ServerProducer> producers = new ConcurrentHashMap<>();
protected final ServerProducers serverProducers;
protected Transaction tx;
@ -184,9 +182,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
private final OperationContext context;
// Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
protected final Map<SimpleString, Pair<Object, AtomicLong>> targetAddressInfos = new MaxSizeMap<>(100);
private final long creationTime = System.currentTimeMillis();
// to prevent session from being closed twice.
@ -228,7 +223,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final OperationContext context,
final PagingManager pagingManager,
final Map<SimpleString, RoutingType> prefixes,
final String securityDomain) throws Exception {
final String securityDomain,
boolean isLegacyProducer) throws Exception {
this.username = username;
this.password = password;
@ -289,6 +285,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
sendSessionNotification(CoreNotificationType.SESSION_CREATED);
this.securityDomain = securityDomain;
if (isLegacyProducer) {
serverProducers = new ServerLegacyProducersImpl(this);
} else {
serverProducers = new ServerProducersImpl();
}
}
// ServerSession implementation ---------------------------------------------------------------------------
@ -450,7 +451,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
consumers.clear();
producers.clear();
serverProducers.clear();
if (closeables != null) {
for (Closeable closeable : closeables) {
@ -1186,10 +1187,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
remotingConnection.removeFailureListener(cleaner);
}
if (server.getAddressInfo(unPrefixedQueueName) == null) {
targetAddressInfos.remove(queueToDelete);
}
}
@Override
@ -1790,29 +1787,32 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public RoutingStatus send(final Message message, final boolean direct) throws Exception {
return send(message, direct, false);
public RoutingStatus send(final Message message, final boolean direct, final String senderName) throws Exception {
return send(message, direct, senderName,false);
}
@Override
public RoutingStatus send(final Message message,
final boolean direct,
final String senderName,
boolean noAutoCreateQueue) throws Exception {
return send(getCurrentTransaction(), message, direct, noAutoCreateQueue);
return send(getCurrentTransaction(), message, direct, senderName, noAutoCreateQueue);
}
@Override
public synchronized RoutingStatus send(Transaction tx,
Message msg,
final boolean direct,
final String senderName,
boolean noAutoCreateQueue) throws Exception {
return send(tx, msg, direct, noAutoCreateQueue, routingContext);
return send(tx, msg, direct, senderName, noAutoCreateQueue, routingContext);
}
@Override
public synchronized RoutingStatus send(Transaction tx,
Message messageParameter,
final boolean direct,
final String senderName,
boolean noAutoCreateQueue,
RoutingContext routingContext) throws Exception {
final Message message = LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
@ -1865,7 +1865,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
result = handleManagementMessage(tx, message, direct);
} else {
result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext);
result = doSend(tx, message, address, direct, senderName, noAutoCreateQueue, routingContext);
}
if (AuditLogger.isMessageLoggingEnabled()) {
@ -1975,30 +1975,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return metaData;
}
@Override
public String[] getTargetAddresses() {
Map<SimpleString, Pair<Object, AtomicLong>> copy = cloneTargetAddresses();
Iterator<SimpleString> iter = copy.keySet().iterator();
int num = copy.keySet().size();
String[] addresses = new String[num];
int i = 0;
while (iter.hasNext()) {
addresses[i] = iter.next().toString();
i++;
}
return addresses;
}
@Override
public String getLastSentMessageID(String address) {
Pair<Object, AtomicLong> value = targetAddressInfos.get(SimpleString.toSimpleString(address));
if (value != null) {
return value.getA().toString();
} else {
return null;
}
}
@Override
public long getCreationTime() {
return this.creationTime;
@ -2010,14 +1986,22 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void describeProducersInfo(JsonArrayBuilder array) throws Exception {
Map<SimpleString, Pair<Object, AtomicLong>> targetCopy = cloneTargetAddresses();
for (Map.Entry<SimpleString, Pair<Object, AtomicLong>> entry : targetCopy.entrySet()) {
Map<String, ServerProducer> targetCopy = cloneProducers();
for (Map.Entry<String, ServerProducer> entry : targetCopy.entrySet()) {
String uuid = null;
if (entry.getValue().getA() != null) {
uuid = entry.getValue().getA().toString();
if (entry.getValue().getLastProducedMessageID() != null) {
uuid = entry.getValue().getLastProducedMessageID().toString();
}
JsonObjectBuilder producerInfo = JsonLoader.createObjectBuilder().add("connectionID", this.getConnectionID().toString()).add("sessionID", this.getName()).add("destination", entry.getKey().toString()).add("lastUUIDSent", uuid, JsonValue.NULL).add("msgSent", entry.getValue().getB().longValue());
JsonObjectBuilder producerInfo = JsonLoader.createObjectBuilder()
.add(ProducerField.ID.getName(), String.valueOf(entry.getValue().getID()))
.add(ProducerField.NAME.getName(), entry.getValue().getName())
.add(ProducerField.CONNECTION_ID.getName(), this.getConnectionID().toString())
.add(ProducerField.SESSION.getAlternativeName(), this.getName())
.add(ProducerField.CREATION_TIME.getName(), String.valueOf(entry.getValue().getCreationTime()))
.add(ProducerField.ADDRESS.getAlternativeName(), entry.getValue().getAddress())
.add(ProducerField.LAST_PRODUCED_MESSAGE_ID.getName(), uuid, JsonValue.NULL)
.add(ProducerField.MESSAGE_SENT.getName(), entry.getValue().getMessagesSent())
.add(ProducerField.MESSAGE_SENT_SIZE.getName(), entry.getValue().getMessagesSentSize());
array.add(producerInfo);
}
}
@ -2098,8 +2082,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
connectionFailed(me, failedOver);
}
public Map<SimpleString, Pair<Object, AtomicLong>> cloneTargetAddresses() {
return new HashMap<>(targetAddressInfos);
public Map<String, ServerProducer> cloneProducers() {
return serverProducers.cloneProducers();
}
private void setStarted(final boolean s) {
@ -2140,7 +2124,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
reply.setAddress(replyTo);
doSend(tx, reply, null, direct, false, routingContext);
doSend(tx, reply, null, direct, null, false, routingContext);
}
return RoutingStatus.OK;
}
@ -2202,8 +2186,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final String senderName,
final boolean noAutoCreateQueue) throws Exception {
return doSend(tx, msg, originalAddress, direct, noAutoCreateQueue, routingContext);
return doSend(tx, msg, originalAddress, direct, senderName, noAutoCreateQueue, routingContext);
}
@ -2212,6 +2197,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final Message msg,
final SimpleString originalAddress,
final boolean direct,
final String senderName,
final boolean noAutoCreateQueue,
final RoutingContext routingContext) throws Exception {
@ -2272,14 +2258,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
logger.debug("Routing result for {} = {}", msg, result);
Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
if (value == null) {
targetAddressInfos.put(msg.getAddressSimpleString(), new Pair<>(msg.getUserID(), new AtomicLong(1)));
} else {
value.setA(msg.getUserID());
value.getB().incrementAndGet();
}
updateProducerMetrics(msg, senderName);
} finally {
if (!routingContext.isReusable()) {
routingContext.clear();
@ -2395,20 +2374,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public void addProducer(ServerProducer serverProducer) {
serverProducer.setSessionID(getName());
serverProducer.setConnectionID(getConnectionID().toString());
producers.put(serverProducer.getID(), serverProducer);
public void addProducer(String name, String protocol, String address) {
ServerProducer producer = new ServerProducerImpl(name, protocol, address != null ? address : ServerProducer.ANONYMOUS);
producer.setSessionID(getName());
producer.setConnectionID(getConnectionID() != null ? getConnectionID().toString() : null);
serverProducers.put(name, producer);
}
@Override
public void removeProducer(String ID) {
producers.remove(ID);
serverProducers.remove(ID);
}
@Override
public Map<String, ServerProducer> getServerProducers() {
return Collections.unmodifiableMap(new HashMap(producers));
public Collection<ServerProducer> getServerProducers() {
return serverProducers.getServerProducers();
}
@Override
@ -2436,4 +2416,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
public String toManagementString() {
return "ServerSession [id=" + getConnectionID() + ":" + getName() + "]";
}
private void updateProducerMetrics(Message msg, String senderName) {
ServerProducer serverProducer = serverProducers.getServerProducer(senderName, msg, this);
if (serverProducer != null) {
serverProducer.updateMetrics(msg.getUserID(), msg instanceof LargeServerMessageImpl ? ((LargeServerMessageImpl)msg).getBodyBufferSize() : msg.getEncodeSize());
}
}
}

View File

@ -35,4 +35,6 @@ public class TransactionPropertyIndexes {
public static final int PAGE_CURSOR_POSITIONS = 8;
public static final int EXPIRY_LOGGER = 9;
public static final int CONSUMER_METRICS_OPERATION = 10;
}

View File

@ -48,7 +48,7 @@ public class TransactionImpl implements Transaction {
private List<TransactionOperation> storeOperations;
private static final int INITIAL_NUM_PROPERTIES = 10;
private static final int INITIAL_NUM_PROPERTIES = 11;
private Object[] properties = null;

View File

@ -65,6 +65,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -136,6 +137,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
@ -2627,6 +2629,15 @@ public abstract class ActiveMQTestBase extends Assert {
return !hadToInterrupt;
}
public String createJsonFilter(String fieldName, String operationName, String value) {
HashMap<String, Object> filterMap = new HashMap<>();
filterMap.put("field", fieldName);
filterMap.put("operation", operationName);
filterMap.put("value", value);
JsonObject jsonFilterObject = JsonUtil.toJsonObject(filterMap);
return jsonFilterObject.toString();
}
protected static ReplicationEndpoint getReplicationEndpoint(ActiveMQServer server) {
final Activation activation = server.getActivation();
if (activation instanceof SharedNothingBackupActivation) {

View File

@ -182,7 +182,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>133,132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.incrementingVersion>134,133,132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>

View File

@ -180,6 +180,26 @@ public class DummyServerConsumer implements ServerConsumer {
return null;
}
@Override
public void metricsAcknowledge(MessageReference ref, Transaction transaction) {
}
@Override
public long getMessagesInTransitSize() {
return 0;
}
@Override
public long getMessagesDeliveredSize() {
return 0;
}
@Override
public long getMessagesDelivered() {
return 0;
}
@Override
public void promptDelivery() {
@ -278,4 +298,29 @@ public class DummyServerConsumer implements ServerConsumer {
// TODO Auto-generated method stub
return null;
}
@Override
public int getMessagesInTransit() {
return 0;
}
@Override
public long getLastDeliveredTime() {
return 0;
}
@Override
public long getLastAcknowledgedTime() {
return 0;
}
@Override
public long getMessagesAcknowledged() {
return 0;
}
@Override
public int getMessagesAcknowledgedAwaitingCommit() {
return 0;
}
}

View File

@ -595,8 +595,9 @@ public class HangConsumerTest extends ActiveMQTestBase {
OperationContext context,
boolean autoCreateQueue,
Map<SimpleString, RoutingType> prefixes,
String securityDomain) throws Exception {
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager(), prefixes, securityDomain);
String securityDomain,
boolean isLegacyProducer) throws Exception {
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager(), prefixes, securityDomain, isLegacyProducer);
}
}

View File

@ -29,6 +29,7 @@ import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@ -61,6 +62,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
@ -2570,6 +2572,79 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.commit();
}
@Test
public void testSendServerMessageMetrics() throws Exception {
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(false, false);
LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
fileMessage.setMessageID(1005);
for (int i = 0; i < largeMessageSize; i++) {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
fileMessage.releaseResources(false, false);
session.createQueue(new QueueConfiguration(ADDRESS));
ClientProducer prod = session.createProducer(ADDRESS);
prod.send(fileMessage);
fileMessage.deleteFile();
session.commit();
Collection<ServerProducer> serverProducers = server.getSessions().iterator().next().getServerProducers();
Assert.assertEquals(1, serverProducers.size());
ServerProducer producer = serverProducers.iterator().next();
Assert.assertEquals(1, producer.getMessagesSent());
Assert.assertEquals(largeMessageSize, producer.getMessagesSentSize());
fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
fileMessage.setMessageID(1006);
for (int i = 0; i < largeMessageSize; i++) {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
fileMessage.releaseResources(false, false);
prod.send(fileMessage);
fileMessage.deleteFile();
session.commit();
serverProducers = server.getSessions().iterator().next().getServerProducers();
Assert.assertEquals(1, serverProducers.size());
producer = serverProducers.iterator().next();
Assert.assertEquals(2, producer.getMessagesSent());
Assert.assertEquals(largeMessageSize * 2, producer.getMessagesSentSize());
}
@Override

View File

@ -279,7 +279,7 @@ public class TemporaryDestinationTest extends JMSTestBase {
producer.send(s.createMessage());
temporaryQueue.delete();
for (ServerSession serverSession : server.getSessions()) {
assertFalse(((ServerSessionImpl)serverSession).cloneTargetAddresses().containsKey(SimpleString.toSimpleString(temporaryQueue.getQueueName())));
assertFalse(((ServerSessionImpl)serverSession).cloneProducers().containsKey(temporaryQueue.getQueueName()));
}
Wait.assertTrue(() -> server.locateQueue(temporaryQueue.getQueueName()) == null, 1000, 100);
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(temporaryQueue.getQueueName())) == null, 1000, 100);
@ -323,26 +323,6 @@ public class TemporaryDestinationTest extends JMSTestBase {
Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(temporaryQueue.getQueueName())) == null, 1000, 100);
}
@Test
public void testForTempQueueTargetInfosSizeLimit() throws Exception {
try {
conn = createConnection();
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < 200; i++) {
TemporaryQueue temporaryQueue = s.createTemporaryQueue();
MessageProducer producer = s.createProducer(temporaryQueue);
producer.send(s.createMessage());
}
for (ServerSession serverSession : server.getSessions()) {
assertTrue(((ServerSessionImpl)serverSession).cloneTargetAddresses().size() <= 100);
}
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testForSecurityCacheLeak() throws Exception {
server.getSecurityStore().setSecurityEnabled(true);

View File

@ -31,6 +31,7 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
public ActiveMQServerControlUsingCoreTest(boolean legacyCreateQueue) {
super(legacyCreateQueue);
extraProducers = 1;
}

View File

@ -19,14 +19,24 @@ package org.apache.activemq.artemis.tests.integration.management;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerTestAccessor;
import org.apache.activemq.artemis.core.server.impl.QueueImplTestAccessor;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
@ -466,6 +476,10 @@ public class QueueControlTest extends ManagementTestBase {
assertEquals(1, obj.size());
assertEquals(0, obj.get(0).asJsonObject().getInt(ConsumerField.LAST_DELIVERED_TIME.getName()));
assertEquals(0, obj.get(0).asJsonObject().getInt(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()));
consumer.close();
Assert.assertEquals(0, queueControl.getConsumerCount());
@ -476,6 +490,442 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testGetConsumerWithMessagesJSON() throws Exception {
long currentTime = System.currentTimeMillis();
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
QueueControl queueControl = createManagementControl(address, queue);
ClientProducer producer = session.createProducer(address);
for (int i = 0; i < 10; i++) {
producer.send(session.createMessage(true));
}
Wait.assertEquals(0, () -> queueControl.getConsumerCount());
ClientConsumer consumer = session.createConsumer(queue);
Wait.assertEquals(1, () -> queueControl.getConsumerCount());
session.start();
ClientMessage clientMessage = null;
int size = 0;
for (int i = 0; i < 5; i++) {
clientMessage = consumer.receiveImmediate();
size += clientMessage.getEncodeSize();
}
JsonArray obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
assertEquals(1, obj.size());
Wait.assertEquals(5, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
JsonObject jsonObject = obj.get(0).asJsonObject();
assertEquals(5, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
assertEquals(size, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
assertEquals(5, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
assertEquals(size, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
long lastDelivered = jsonObject.getJsonNumber(ConsumerField.LAST_DELIVERED_TIME.getName()).longValue();
assertTrue(lastDelivered > currentTime);
assertEquals( 0, jsonObject.getInt(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()));
clientMessage.acknowledge();
session.commit();
Wait.assertEquals(5, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
jsonObject = obj.get(0).asJsonObject();
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
long lastAcked = jsonObject.getJsonNumber(ConsumerField.LAST_ACKNOWLEDGED_TIME.getName()).longValue();
assertTrue("lastAcked = " + lastAcked + " lastDelivered = " + lastDelivered, lastAcked >= lastDelivered);
currentTime = System.currentTimeMillis();
//now make sure they fall between the test time window
assertTrue("currentTime = " + currentTime + " lastAcked = " + lastAcked,currentTime >= lastAcked);
consumer.close();
Assert.assertEquals(0, queueControl.getConsumerCount());
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
assertEquals(0, obj.size());
session.deleteQueue(queue);
}
@Test
public void testGetConsumerMessageCountsAutoAckCore() throws Exception {
ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
factory.setConsumerWindowSize(1);
testGetConsumerMessageCountsAutoAck(factory, false);
}
@Test
public void testGetConsumerMessageCountsAutoAckAMQP() throws Exception {
testGetConsumerMessageCountsAutoAck(new JmsConnectionFactory("amqp://localhost:61616"), false);
}
@Test
public void testGetConsumerMessageCountsAutoAckOpenWire() throws Exception {
testGetConsumerMessageCountsAutoAck(new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"), false);
}
@Test
public void testGetConsumerMessageCountsAutoAckCoreIndividualAck() throws Exception {
ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
testGetConsumerMessageCountsAutoAck(factory, true);
}
public void testGetConsumerMessageCountsAutoAck(ConnectionFactory factory, boolean usePriority) throws Exception {
SimpleString queueName = RandomUtil.randomSimpleString();
this.session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
QueueControl queueControl = createManagementControl(queueName, queueName, RoutingType.ANYCAST);
Connection connection = factory.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(queueName.toString());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 100; i++) {
javax.jms.Message message = session.createMessage();
if (usePriority) {
producer.send(message, DeliveryMode.PERSISTENT, 1, javax.jms.Message.DEFAULT_TIME_TO_LIVE);
} else {
producer.send(message);
}
}
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
javax.jms.Message message = null;
for (int i = 0; i < 100; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("message " + i + " not received",message);
}
JsonArray obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
assertEquals(1, obj.size());
Wait.assertEquals(0, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
Wait.assertEquals(0, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
JsonObject jsonObject = obj.get(0).asJsonObject();
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
assertTrue(jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()) > 0);
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
consumer.close();
} finally {
connection.close();
}
}
@Test
public void testGetConsumerMessageCountsClientAckCore() throws Exception {
ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
factory.setConsumerWindowSize(1);
testGetConsumerMessageCountsClientAck(factory, false);
}
@Test
public void testGetConsumerMessageCountsClientAckAMQP() throws Exception {
testGetConsumerMessageCountsClientAck(new JmsConnectionFactory("amqp://localhost:61616"), false);
}
@Test
public void testGetConsumerMessageCountsClientAckOpenWire() throws Exception {
testGetConsumerMessageCountsClientAck(new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"), false);
}
public void testGetConsumerMessageCountsClientAck(ConnectionFactory factory, boolean usePriority) throws Exception {
SimpleString queueName = RandomUtil.randomSimpleString();
this.session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
QueueControl queueControl = createManagementControl(queueName, queueName, RoutingType.ANYCAST);
Connection connection = factory.createConnection();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(queueName.toString());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 100; i++) {
javax.jms.Message message = session.createMessage();
if (usePriority) {
producer.send(message, DeliveryMode.PERSISTENT, 1, javax.jms.Message.DEFAULT_TIME_TO_LIVE);
} else {
producer.send(message);
}
}
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
javax.jms.Message message = null;
for (int i = 0; i < 100; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("message " + i + " not received",message);
message.acknowledge();
}
JsonArray obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
assertEquals(1, obj.size());
Wait.assertEquals(0, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
Wait.assertEquals(0, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
Wait.assertEquals(0, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
JsonObject jsonObject = obj.get(0).asJsonObject();
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
assertTrue(jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()) > 0);
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
consumer.close();
} finally {
connection.close();
}
}
@Test
public void testGetConsumerMessageCountsTransactedCore() throws Exception {
ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
factory.setConsumerWindowSize(1);
testGetConsumerMessageCountsTransacted(factory, false);
}
@Test
public void testGetConsumerMessageCountsTransactedAMQP() throws Exception {
testGetConsumerMessageCountsTransacted(new JmsConnectionFactory("amqp://localhost:61616"), false);
}
@Test
public void testGetConsumerMessageCountsTransactedOpenWire() throws Exception {
testGetConsumerMessageCountsTransacted(new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"), false);
}
public void testGetConsumerMessageCountsTransacted(ConnectionFactory factory, boolean usePriority) throws Exception {
SimpleString queueName = RandomUtil.randomSimpleString();
this.session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
QueueControl queueControl = createManagementControl(queueName, queueName, RoutingType.ANYCAST);
Connection connection = factory.createConnection();
try {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
javax.jms.Queue queue = session.createQueue(queueName.toString());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 100; i++) {
javax.jms.Message message = session.createMessage();
if (usePriority) {
producer.send(message, DeliveryMode.PERSISTENT, 1, javax.jms.Message.DEFAULT_TIME_TO_LIVE);
} else {
producer.send(message);
}
}
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
javax.jms.Message message = null;
for (int i = 0; i < 100; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("message " + i + " not received",message);
}
session.commit();
JsonArray obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
assertEquals(1, obj.size());
Wait.assertEquals(0, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
JsonObject jsonObject = obj.get(0).asJsonObject();
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
assertTrue(jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()) > 0);
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
consumer.close();
} finally {
connection.close();
}
}
@Test
public void testGetConsumerMessageCountsTransactedXACore() throws Exception {
ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY));
factory.setConsumerWindowSize(1);
testGetConsumerMessageCountsTransactedXA(factory, false);
}
@Test
public void testGetConsumerMessageCountsTransactedXAOpenWire() throws Exception {
testGetConsumerMessageCountsTransactedXA(new org.apache.activemq.ActiveMQXAConnectionFactory("tcp://localhost:61616"), false);
}
public void testGetConsumerMessageCountsTransactedXA(XAConnectionFactory factory, boolean usePriority) throws Exception {
SimpleString queueName = RandomUtil.randomSimpleString();
this.session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
QueueControl queueControl = createManagementControl(queueName, queueName, RoutingType.ANYCAST);
XAConnection connection = factory.createXAConnection();
try {
XASession session = connection.createXASession();
XidImpl xid = newXID();
javax.jms.Queue queue = session.createQueue(queueName.toString());
MessageProducer producer = session.createProducer(queue);
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
for (int i = 0; i < 100; i++) {
javax.jms.Message message = session.createMessage();
if (usePriority) {
producer.send(message, DeliveryMode.PERSISTENT, 1, javax.jms.Message.DEFAULT_TIME_TO_LIVE);
} else {
producer.send(message);
}
}
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().commit(xid, true);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
xid = newXID();
javax.jms.Message message = null;
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
for (int i = 0; i < 100; i++) {
message = consumer.receive(5000);
Assert.assertNotNull("message " + i + " not received",message);
}
session.getXAResource().end(xid, XAResource.TMSUCCESS);
JsonArray obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
assertEquals(1, obj.size());
Wait.assertEquals(100, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
JsonObject jsonObject = obj.get(0).asJsonObject();
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
assertTrue(jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()) > 0);
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
session.getXAResource().commit(xid, true);
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
assertEquals(1, obj.size());
Wait.assertEquals(0, () -> JsonUtil.readJsonArray(queueControl.listConsumersAsJSON()).get(0).asJsonObject().getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
obj = JsonUtil.readJsonArray(queueControl.listConsumersAsJSON());
jsonObject = obj.get(0).asJsonObject();
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_IN_TRANSIT_SIZE.getName()));
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED.getName()));
assertTrue(jsonObject.getInt(ConsumerField.MESSAGES_DELIVERED_SIZE.getName()) > 0);
assertEquals(100, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED.getName()));
assertEquals(0, jsonObject.getInt(ConsumerField.MESSAGES_ACKNOWLEDGED_AWAITING_COMMIT.getName()));
consumer.close();
} finally {
connection.close();
}
}
@Test
public void testGetMessageCount() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
@ -589,7 +1039,7 @@ public class QueueControlTest extends ManagementTestBase {
if (key.equals("StringProperties")) {
// these are very verbose composite data structures
assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 2048);
assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 3000);
} else {
assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 512);
}

View File

@ -37,12 +37,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.management.impl.view.ProducerField;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
@ -50,6 +52,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@ -168,6 +172,63 @@ public class MQTTTest extends MQTTTestSupport {
publishProvider.disconnect();
}
@Test
public void testProducerMetrics() throws Exception {
final MQTTClientProvider publishProvider = getMQTTClientProvider();
initializeConnection(publishProvider);
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Message " + i;
publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
}
String filterString = createJsonFilter("", "", "");
String producersAsJsonString = server.getActiveMQServerControl().listProducers(filterString, 1, 50);
JsonObject producersAsJsonObject = JsonUtil.readJsonObject(producersAsJsonString);
JsonArray array = (JsonArray) producersAsJsonObject.get("data");
Assert.assertEquals("number of producers returned from query", 1, array.size());
JsonObject producer = array.getJsonObject(0);
Assert.assertNotEquals(ProducerField.ID.getName(), "", producer.getString(ProducerField.ID.getName()));
Assert.assertNotEquals(ProducerField.SESSION.getName(), "", producer.getString(ProducerField.SESSION.getName()));
Assert.assertNotEquals(ProducerField.CLIENT_ID.getName(), "", producer.getString(ProducerField.CLIENT_ID.getName()));
Assert.assertEquals(ProducerField.USER.getName(), "", producer.getString(ProducerField.USER.getName()));
Assert.assertEquals(ProducerField.PROTOCOL.getAlternativeName(), "MQTT", producer.getString(ProducerField.PROTOCOL.getName()));
Assert.assertEquals(ProducerField.ADDRESS.getName(), "ANONYMOUS", producer.getString(ProducerField.ADDRESS.getName()));
Assert.assertNotEquals(ProducerField.LOCAL_ADDRESS.getName(), "", producer.getString(ProducerField.LOCAL_ADDRESS.getName()));
Assert.assertNotEquals(ProducerField.REMOTE_ADDRESS.getName(), "", producer.getString(ProducerField.REMOTE_ADDRESS.getName()));
Assert.assertNotEquals(ProducerField.CREATION_TIME.getName(), "", producer.getString(ProducerField.CREATION_TIME.getName()));
Assert.assertEquals(ProducerField.MESSAGE_SENT.getName(), NUM_MESSAGES, producer.getInt(ProducerField.MESSAGE_SENT.getName()));
Assert.assertEquals(ProducerField.LAST_PRODUCED_MESSAGE_ID.getName(), "", producer.getString(ProducerField.LAST_PRODUCED_MESSAGE_ID.getName()));
final MQTTClientProvider publishProvider2 = getMQTTClientProvider();
initializeConnection(publishProvider2);
for (int i = 0; i < NUM_MESSAGES; i++) {
String payload = "Message " + i;
publishProvider2.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
}
filterString = createJsonFilter("", "", "");
producersAsJsonString = server.getActiveMQServerControl().listProducers(filterString, 1, 50);
producersAsJsonObject = JsonUtil.readJsonObject(producersAsJsonString);
array = (JsonArray) producersAsJsonObject.get("data");
Assert.assertEquals("number of producers returned from query", 2, array.size());
publishProvider.disconnect();
publishProvider2.disconnect();
filterString = createJsonFilter("", "", "");
producersAsJsonString = server.getActiveMQServerControl().listProducers(filterString, 1, 50);
producersAsJsonObject = JsonUtil.readJsonObject(producersAsJsonString);
array = (JsonArray) producersAsJsonObject.get("data");
Assert.assertEquals("number of producers returned from query", 0, array.size());
}
@Test(timeout = 60 * 1000)
public void testDirectDeliverFalse() throws Exception {
final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();

View File

@ -34,6 +34,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -47,6 +48,7 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.management.impl.view.ProducerField;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
@ -59,6 +61,8 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@ -163,6 +167,44 @@ public class StompTest extends StompTestBase {
assertTrue(latch.await(60, TimeUnit.SECONDS));
}
@Test
public void testProducerMetrics() throws Exception {
conn.connect(defUser, defPass);
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!", true);
String filterString = createJsonFilter("", "", "");
String producersAsJsonString = server.getActiveMQServerControl().listProducers(filterString, 1, 50);
JsonObject producersAsJsonObject = JsonUtil.readJsonObject(producersAsJsonString);
JsonArray array = (JsonArray) producersAsJsonObject.get("data");
Assert.assertEquals("number of producers returned from query", 1, array.size());
JsonObject producer = array.getJsonObject(0);
Assert.assertEquals(ProducerField.MESSAGE_SENT.getName(), 1, producer.getInt(ProducerField.MESSAGE_SENT.getName()));
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!", true);
producersAsJsonString = server.getActiveMQServerControl().listProducers(filterString, 1, 50);
producersAsJsonObject = JsonUtil.readJsonObject(producersAsJsonString);
array = (JsonArray) producersAsJsonObject.get("data");
Assert.assertEquals("number of producers returned from query", 1, array.size());
producer = array.getJsonObject(0);
Assert.assertEquals(ProducerField.MESSAGE_SENT.getName(), 2, producer.getInt(ProducerField.MESSAGE_SENT.getName()));
conn.closeTransport();
Wait.assertEquals(0, () -> ((JsonArray) JsonUtil.readJsonObject(server.getActiveMQServerControl().listProducers(filterString, 1, 50)).get("data")).size());
producersAsJsonString = server.getActiveMQServerControl().listProducers(filterString, 1, 50);
producersAsJsonObject = JsonUtil.readJsonObject(producersAsJsonString);
array = (JsonArray) producersAsJsonObject.get("data");
Assert.assertEquals("number of producers returned from query", 0, array.size());
}
@Test
public void testSendOverDiskFull() throws Exception {
AssertionLoggerHandler.startCapture();