ARTEMIS-1656 OpenWire scalability improvements
OpenWireFormat instances are shared between OpenWire connections/sessions/consumers, preventing the clients to scale due to the synchronized marshal/unmarshal on it. It includes: - direct transport buffer pooling - groupId SimpleString pooling - clientId SimpleString pooling - reduced ActiveMQDestination[] and AtomicLong allocations on AMQSession:send - reduced ActiveMQDestination allocations - refactored shouldBlockProducer path of AMQPSession::send to reduce method size - exclusive OpenWireFormat per session and connection (in/out) to avoid contention - refactored trace log to favour inlining - changed lastSent volatile set into lazy set to avoid full barrier cost on x86 - stateless OpenWireMessageConverter - send's lock removal thanks to thread-safe NettyConnection
This commit is contained in:
parent
aceacceb97
commit
b5fa5ed3b7
|
@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
|
@ -126,7 +127,6 @@ import org.apache.activemq.state.ProducerState;
|
|||
import org.apache.activemq.state.SessionState;
|
||||
import org.apache.activemq.transport.TransmitCallback;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
|
@ -142,9 +142,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
private boolean destroyed = false;
|
||||
|
||||
private final Object sendLock = new Object();
|
||||
//separated in/out wireFormats allow deliveries (eg async and consumers) to not slow down bufferReceived
|
||||
private final OpenWireFormat inWireFormat;
|
||||
|
||||
private final OpenWireFormat wireFormat;
|
||||
private final OpenWireFormat outWireFormat;
|
||||
|
||||
private AMQConnectionContext context;
|
||||
|
||||
|
@ -181,6 +182,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
private final OperationContext operationContext;
|
||||
|
||||
private static final AtomicLongFieldUpdater<OpenWireConnection> LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, "lastSent");
|
||||
private volatile long lastSent = -1;
|
||||
private ConnectionEntry connectionEntry;
|
||||
private boolean useKeepAlive;
|
||||
|
@ -199,7 +201,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
this.server = server;
|
||||
this.operationContext = server.newOperationContext();
|
||||
this.protocolManager = openWireProtocolManager;
|
||||
this.wireFormat = wf;
|
||||
this.inWireFormat = wf;
|
||||
this.outWireFormat = wf.copy();
|
||||
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
|
||||
this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
|
||||
}
|
||||
|
@ -248,8 +251,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
//tells the connection that
|
||||
//some bytes just sent
|
||||
public void bufferSent() {
|
||||
lastSent = System.currentTimeMillis();
|
||||
private void bufferSent() {
|
||||
//much cheaper than a volatile set if contended, but less precise (ie allows stale loads)
|
||||
LAST_SENT_UPDATER.lazySet(this, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
/**
|
||||
* Log packaged into a separate method for performance reasons.
|
||||
*/
|
||||
private static void traceBufferReceived(Object connectionID, Command command) {
|
||||
logger.trace("connectionID: " + connectionID + " RECEIVED: " + (command == null ? "NULL" : command));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -259,11 +270,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
recoverOperationContext();
|
||||
|
||||
Command command = (Command) wireFormat.unmarshal(buffer);
|
||||
Command command = (Command) inWireFormat.unmarshal(buffer);
|
||||
|
||||
// log the openwire command
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("connectionID: " + connectionID + " RECEIVED: " + (command == null ? "NULL" : command));
|
||||
traceBufferReceived(connectionID, command);
|
||||
}
|
||||
|
||||
boolean responseRequired = command.isResponseRequired();
|
||||
|
@ -430,7 +441,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
// send a WireFormatInfo to the peer
|
||||
public void sendHandshake() {
|
||||
WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
|
||||
WireFormatInfo info = inWireFormat.getPreferedWireFormatInfo();
|
||||
sendCommand(info);
|
||||
}
|
||||
|
||||
|
@ -438,19 +449,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
return state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Log packaged into a separate method for performance reasons.
|
||||
*/
|
||||
private static void tracePhysicalSend(Connection transportConnection, Command command) {
|
||||
logger.trace("connectionID: " + (transportConnection == null ? "" : transportConnection.getID()) + " SENDING: " + (command == null ? "NULL" : command));
|
||||
}
|
||||
|
||||
public void physicalSend(Command command) throws IOException {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("connectionID: " + (getTransportConnection() == null ? "" : getTransportConnection().getID())
|
||||
+ " SENDING: " + (command == null ? "NULL" : command));
|
||||
tracePhysicalSend(transportConnection, command);
|
||||
}
|
||||
|
||||
try {
|
||||
ByteSequence bytes = wireFormat.marshal(command);
|
||||
ActiveMQBuffer buffer = OpenWireUtil.toActiveMQBuffer(bytes);
|
||||
synchronized (sendLock) {
|
||||
getTransportConnection().write(buffer, false, false);
|
||||
}
|
||||
final ByteSequence bytes = outWireFormat.marshal(command);
|
||||
final int bufferSize = bytes.length;
|
||||
final ActiveMQBuffer buffer = transportConnection.createTransportBuffer(bufferSize);
|
||||
buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
|
||||
transportConnection.write(buffer, false, false);
|
||||
bufferSent();
|
||||
} catch (IOException e) {
|
||||
throw e;
|
||||
|
@ -560,8 +577,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
sendCommand(dispatch);
|
||||
}
|
||||
|
||||
public WireFormat getMarshaller() {
|
||||
return this.wireFormat;
|
||||
public OpenWireFormat wireFormat() {
|
||||
return this.inWireFormat;
|
||||
}
|
||||
|
||||
private void shutdown(boolean fail) {
|
||||
|
@ -589,9 +606,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
callFailureListeners(me);
|
||||
|
||||
// this should clean up temp dests
|
||||
synchronized (sendLock) {
|
||||
callClosingListeners();
|
||||
}
|
||||
callClosingListeners();
|
||||
|
||||
destroyed = true;
|
||||
|
||||
|
@ -649,7 +664,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
|
||||
public AMQConnectionContext initContext(ConnectionInfo info) throws Exception {
|
||||
WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
|
||||
WireFormatInfo wireFormatInfo = inWireFormat.getPreferedWireFormatInfo();
|
||||
// Older clients should have been defaulting this field to true.. but
|
||||
// they were not.
|
||||
if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
|
||||
|
@ -692,7 +707,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
//raise the refCount of context
|
||||
public void reconnect(AMQConnectionContext existingContext, ConnectionInfo info) {
|
||||
this.context = existingContext;
|
||||
WireFormatInfo wireFormatInfo = wireFormat.getPreferedWireFormatInfo();
|
||||
WireFormatInfo wireFormatInfo = inWireFormat.getPreferedWireFormatInfo();
|
||||
// Older clients should have been defaulting this field to true.. but
|
||||
// they were not.
|
||||
if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
|
||||
|
@ -1240,7 +1255,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
@Override
|
||||
public Response processWireFormat(WireFormatInfo command) throws Exception {
|
||||
wireFormat.renegotiateWireFormat(command);
|
||||
inWireFormat.renegotiateWireFormat(command);
|
||||
outWireFormat.renegotiateWireFormat(command);
|
||||
//throw back a brokerInfo here
|
||||
protocolManager.sendBrokerInfo(OpenWireConnection.this);
|
||||
protocolManager.setUpInactivityParams(OpenWireConnection.this, command);
|
||||
|
@ -1648,7 +1664,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
@Override
|
||||
public String getTransportLocalAddress() {
|
||||
return getTransportConnection().getLocalAddress();
|
||||
return transportConnection.getLocalAddress();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -43,7 +43,6 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
|
|||
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
|
||||
import org.apache.activemq.artemis.utils.DataConstants;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||
|
@ -69,7 +68,7 @@ import org.apache.activemq.util.MarshallingSupport;
|
|||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.fusesource.hawtbuf.UTF8Buffer;
|
||||
|
||||
public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
|
||||
public final class OpenWireMessageConverter {
|
||||
|
||||
private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
|
||||
private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
|
||||
|
@ -97,36 +96,13 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
|
||||
private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
|
||||
|
||||
private final WireFormat marshaller;
|
||||
private OpenWireMessageConverter() {
|
||||
|
||||
public OpenWireMessageConverter(WireFormat marshaller) {
|
||||
this.marshaller = marshaller;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpenwireMessage fromCore(ICoreMessage coreMessage) throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ICoreMessage toCore(OpenwireMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// @Override
|
||||
public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) {
|
||||
// TODO: implement this
|
||||
return null;
|
||||
}
|
||||
|
||||
public org.apache.activemq.artemis.api.core.Message inbound(Message message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
return inbound(message, marshaller, coreMessageObjectPools);
|
||||
}
|
||||
|
||||
private static org.apache.activemq.artemis.api.core.Message inbound(final Message messageSend,
|
||||
final WireFormat marshaller,
|
||||
final CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
public static org.apache.activemq.artemis.api.core.Message inbound(final Message messageSend,
|
||||
final WireFormat marshaller,
|
||||
final CoreMessageObjectPools coreMessageObjectPools) throws Exception {
|
||||
|
||||
final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
|
||||
|
||||
|
@ -197,7 +173,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
}
|
||||
final String groupId = messageSend.getGroupID();
|
||||
if (groupId != null) {
|
||||
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId));
|
||||
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, coreMessageObjectPools.getGroupIdStringSimpleStringPool().getOrCreate(groupId));
|
||||
}
|
||||
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
|
||||
|
||||
|
@ -509,9 +485,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
|
|||
}
|
||||
}
|
||||
|
||||
public MessageDispatch createMessageDispatch(MessageReference reference,
|
||||
ICoreMessage message,
|
||||
AMQConsumer consumer) throws IOException, JMSException {
|
||||
public static MessageDispatch createMessageDispatch(MessageReference reference,
|
||||
ICoreMessage message,
|
||||
WireFormat marshaller,
|
||||
AMQConsumer consumer) throws IOException {
|
||||
ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
|
||||
|
||||
//we can use core message id for sequenceId
|
||||
|
|
|
@ -130,7 +130,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
//to management service
|
||||
private boolean suppressInternalManagementObjects = true;
|
||||
|
||||
private final OpenWireMessageConverter internalConverter;
|
||||
private final OpenWireFormat wireFormat;
|
||||
|
||||
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
|
||||
|
||||
|
@ -145,7 +145,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
wireFactory.setCacheEnabled(false);
|
||||
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
|
||||
scheduledPool = server.getScheduledPool();
|
||||
this.internalConverter = new OpenWireMessageConverter(wireFactory.createWireFormat());
|
||||
this.wireFormat = (OpenWireFormat) wireFactory.createWireFormat();
|
||||
|
||||
final ClusterManager clusterManager = this.server.getClusterManager();
|
||||
|
||||
|
@ -597,8 +597,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
|
|||
return total;
|
||||
}
|
||||
|
||||
public OpenWireMessageConverter getInternalConverter() {
|
||||
return internalConverter;
|
||||
public OpenWireFormat wireFormat() {
|
||||
return wireFormat;
|
||||
}
|
||||
|
||||
public boolean isSupportAdvisory() {
|
||||
|
|
|
@ -223,7 +223,8 @@ public class AMQConsumer {
|
|||
//so we need to remove this property too.
|
||||
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
|
||||
}
|
||||
dispatch = session.getConverter().createMessageDispatch(reference, message, this);
|
||||
//handleDeliver is performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat()
|
||||
dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this);
|
||||
int size = dispatch.getMessage().getSize();
|
||||
reference.setProtocolData(dispatch.getMessage().getMessageId());
|
||||
session.deliverMessage(dispatch);
|
||||
|
|
|
@ -70,20 +70,20 @@ public class AMQSession implements SessionCallback {
|
|||
// ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
|
||||
protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
|
||||
|
||||
private ConnectionInfo connInfo;
|
||||
private final ConnectionInfo connInfo;
|
||||
private ServerSession coreSession;
|
||||
private SessionInfo sessInfo;
|
||||
private ActiveMQServer server;
|
||||
private OpenWireConnection connection;
|
||||
private final SessionInfo sessInfo;
|
||||
private final ActiveMQServer server;
|
||||
private final OpenWireConnection connection;
|
||||
|
||||
private AtomicBoolean started = new AtomicBoolean(false);
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
|
||||
private final ScheduledExecutorService scheduledPool;
|
||||
|
||||
// The sessionWireformat used by the session
|
||||
// this object is meant to be used per thread / session
|
||||
// so we make a new one per AMQSession
|
||||
private final OpenWireMessageConverter converter;
|
||||
private final OpenWireFormat protocolManagerWireFormat;
|
||||
|
||||
private final OpenWireProtocolManager protocolManager;
|
||||
|
||||
|
@ -93,6 +93,8 @@ public class AMQSession implements SessionCallback {
|
|||
|
||||
private String[] existingQueuesCache;
|
||||
|
||||
private final SimpleString clientId;
|
||||
|
||||
public AMQSession(ConnectionInfo connInfo,
|
||||
SessionInfo sessInfo,
|
||||
ActiveMQServer server,
|
||||
|
@ -100,14 +102,12 @@ public class AMQSession implements SessionCallback {
|
|||
OpenWireProtocolManager protocolManager) {
|
||||
this.connInfo = connInfo;
|
||||
this.sessInfo = sessInfo;
|
||||
|
||||
this.clientId = SimpleString.toSimpleString(connInfo.getClientId());
|
||||
this.server = server;
|
||||
this.connection = connection;
|
||||
this.protocolManager = protocolManager;
|
||||
this.scheduledPool = protocolManager.getScheduledPool();
|
||||
OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
|
||||
|
||||
this.converter = new OpenWireMessageConverter(marshaller.copy());
|
||||
this.protocolManagerWireFormat = protocolManager.wireFormat().copy();
|
||||
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
|
||||
this.existingQueuesCache = null;
|
||||
}
|
||||
|
@ -116,8 +116,8 @@ public class AMQSession implements SessionCallback {
|
|||
return coreSession.isClosed();
|
||||
}
|
||||
|
||||
public OpenWireMessageConverter getConverter() {
|
||||
return protocolManager.getInternalConverter();
|
||||
public OpenWireFormat wireFormat() {
|
||||
return protocolManagerWireFormat;
|
||||
}
|
||||
|
||||
public void initialize() {
|
||||
|
@ -357,22 +357,26 @@ public class AMQSession implements SessionCallback {
|
|||
|
||||
public void send(final ProducerInfo producerInfo,
|
||||
final Message messageSend,
|
||||
boolean sendProducerAck) throws Exception {
|
||||
final boolean sendProducerAck) throws Exception {
|
||||
messageSend.setBrokerInTime(System.currentTimeMillis());
|
||||
|
||||
final ActiveMQDestination destination = messageSend.getDestination();
|
||||
|
||||
ActiveMQDestination[] actualDestinations = null;
|
||||
final ActiveMQDestination[] actualDestinations;
|
||||
final int actualDestinationsCount;
|
||||
if (destination.isComposite()) {
|
||||
actualDestinations = destination.getCompositeDestinations();
|
||||
messageSend.setOriginalDestination(destination);
|
||||
actualDestinationsCount = actualDestinations.length;
|
||||
} else {
|
||||
actualDestinations = new ActiveMQDestination[]{destination};
|
||||
actualDestinations = null;
|
||||
actualDestinationsCount = 1;
|
||||
}
|
||||
|
||||
final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
|
||||
final org.apache.activemq.artemis.api.core.Message originalCoreMsg = OpenWireMessageConverter.inbound(messageSend, protocolManagerWireFormat, coreMessageObjectPools);
|
||||
|
||||
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, SimpleString.toSimpleString(this.connection.getState().getInfo().getClientId()));
|
||||
assert clientId.toString().equals(this.connection.getState().getInfo().getClientId()) : "Session cached clientId must be the same of the connection";
|
||||
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, clientId);
|
||||
|
||||
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
|
||||
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
|
||||
|
@ -384,15 +388,14 @@ public class AMQSession implements SessionCallback {
|
|||
|
||||
final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
|
||||
|
||||
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
|
||||
|
||||
final AtomicInteger count = actualDestinations != null ? new AtomicInteger(actualDestinationsCount) : null;
|
||||
|
||||
if (shouldBlockProducer) {
|
||||
connection.getContext().setDontSendReponse(true);
|
||||
}
|
||||
|
||||
for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
|
||||
final ActiveMQDestination dest = actualDestinations[i];
|
||||
for (int i = 0; i < actualDestinationsCount; i++) {
|
||||
final ActiveMQDestination dest = actualDestinations != null ? actualDestinations[i] : destination;
|
||||
final String physicalName = dest.getPhysicalName();
|
||||
final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
|
||||
//the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
|
||||
|
@ -405,74 +408,11 @@ public class AMQSession implements SessionCallback {
|
|||
} else {
|
||||
coreMsg.setRoutingType(RoutingType.MULTICAST);
|
||||
}
|
||||
PagingStore store = server.getPagingManager().getPageStore(address);
|
||||
|
||||
final PagingStore store = server.getPagingManager().getPageStore(address);
|
||||
|
||||
this.connection.disableTtl();
|
||||
if (shouldBlockProducer) {
|
||||
if (!store.checkMemory(() -> {
|
||||
Exception exceptionToSend = null;
|
||||
|
||||
try {
|
||||
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
|
||||
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
|
||||
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
logger.warn(e.getMessage(), e);
|
||||
exceptionToSend = e;
|
||||
}
|
||||
connection.enableTtl();
|
||||
if (count.decrementAndGet() == 0) {
|
||||
if (exceptionToSend != null) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.sendException(exceptionToSend);
|
||||
} else {
|
||||
server.getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
if (sendProducerAck) {
|
||||
try {
|
||||
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
||||
connection.dispatchAsync(ack);
|
||||
} catch (Exception e) {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
} else {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
try {
|
||||
Response response = new Response();
|
||||
response.setCorrelationId(messageSend.getCommandId());
|
||||
connection.dispatchAsync(response);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
try {
|
||||
final IOException e = new IOException(errorMessage);
|
||||
ActiveMQServerLogger.LOGGER.warn(errorMessage);
|
||||
connection.serviceException(e);
|
||||
} catch (Exception ex) {
|
||||
ActiveMQServerLogger.LOGGER.debug(ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
})) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.enableTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
sendShouldBlockProducer(producerInfo, messageSend, sendProducerAck, store, dest, count, coreMsg, address);
|
||||
} else {
|
||||
//non-persistent messages goes here, by default we stop reading from
|
||||
//transport
|
||||
|
@ -482,14 +422,14 @@ public class AMQSession implements SessionCallback {
|
|||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
|
||||
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
final RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
|
||||
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
|
||||
}
|
||||
|
||||
if (count.decrementAndGet() == 0) {
|
||||
if (count == null || count.decrementAndGet() == 0) {
|
||||
if (sendProducerAck) {
|
||||
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
||||
final ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
||||
connection.dispatchAsync(ack);
|
||||
}
|
||||
}
|
||||
|
@ -497,6 +437,79 @@ public class AMQSession implements SessionCallback {
|
|||
}
|
||||
}
|
||||
|
||||
private void sendShouldBlockProducer(final ProducerInfo producerInfo,
|
||||
final Message messageSend,
|
||||
final boolean sendProducerAck,
|
||||
final PagingStore store,
|
||||
final ActiveMQDestination dest,
|
||||
final AtomicInteger count,
|
||||
final org.apache.activemq.artemis.api.core.Message coreMsg,
|
||||
final SimpleString address) throws ResourceAllocationException {
|
||||
if (!store.checkMemory(() -> {
|
||||
Exception exceptionToSend = null;
|
||||
|
||||
try {
|
||||
RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
|
||||
if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
|
||||
throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
logger.warn(e.getMessage(), e);
|
||||
exceptionToSend = e;
|
||||
}
|
||||
connection.enableTtl();
|
||||
if (count == null || count.decrementAndGet() == 0) {
|
||||
if (exceptionToSend != null) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.sendException(exceptionToSend);
|
||||
} else {
|
||||
server.getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
if (sendProducerAck) {
|
||||
try {
|
||||
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
||||
connection.dispatchAsync(ack);
|
||||
} catch (Exception e) {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
} else {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
try {
|
||||
Response response = new Response();
|
||||
response.setCorrelationId(messageSend.getCommandId());
|
||||
connection.dispatchAsync(response);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
try {
|
||||
final IOException e = new IOException(errorMessage);
|
||||
ActiveMQServerLogger.LOGGER.warn(errorMessage);
|
||||
connection.serviceException(e);
|
||||
} catch (Exception ex) {
|
||||
ActiveMQServerLogger.LOGGER.debug(ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
})) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.enableTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
}
|
||||
|
||||
private void enableAutoReadAndTtl() {
|
||||
connection.getTransportConnection().setAutoRead(true);
|
||||
connection.enableTtl();
|
||||
|
@ -513,11 +526,7 @@ public class AMQSession implements SessionCallback {
|
|||
public ActiveMQServer getCoreServer() {
|
||||
return this.server;
|
||||
}
|
||||
/*
|
||||
public WireFormat getMarshaller() {
|
||||
return this.connection.getMarshaller();
|
||||
}
|
||||
*/
|
||||
|
||||
public ConnectionInfo getConnectionInfo() {
|
||||
return this.connInfo;
|
||||
}
|
||||
|
|
|
@ -16,18 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.openwire.util;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
|
||||
public class OpenWireUtil {
|
||||
|
||||
|
@ -41,13 +37,6 @@ public class OpenWireUtil {
|
|||
|
||||
public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration();
|
||||
|
||||
public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
|
||||
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
|
||||
|
||||
buffer.writeBytes(bytes.data, bytes.offset, bytes.length);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
|
||||
* destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
|
||||
|
@ -56,16 +45,15 @@ public class OpenWireUtil {
|
|||
*/
|
||||
public static ActiveMQDestination toAMQAddress(Message message, ActiveMQDestination actualDestination) {
|
||||
String address = message.getAddress();
|
||||
String strippedAddress = address;//.replace(JMS_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TEMP_QUEUE_ADDRESS_PREFIX, "").replace(JMS_TOPIC_ADDRESS_PREFIX, "").replace(JMS_TEMP_TOPIC_ADDRESS_PREFIX, "");
|
||||
|
||||
if (address == null) {
|
||||
if (address == null || address.equals(actualDestination.getPhysicalName())) {
|
||||
return actualDestination;
|
||||
}
|
||||
|
||||
if (actualDestination.isQueue()) {
|
||||
return new ActiveMQQueue(strippedAddress);
|
||||
return new ActiveMQQueue(address);
|
||||
} else {
|
||||
return new ActiveMQTopic(strippedAddress);
|
||||
return new ActiveMQTopic(address);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue