This commit is contained in:
Clebert Suconic 2018-01-22 18:02:04 -05:00
commit b740181929
4 changed files with 745 additions and 478 deletions

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.netty.buffer.ByteBuf;
@ -34,6 +35,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
private static final SimpleString EMPTY = new SimpleString("");
private static final long serialVersionUID = 4204223851422244307L;
// Attributes
@ -323,6 +325,14 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
* @return An array of SimpleStrings
*/
public SimpleString[] split(final char delim) {
if (this.str != null) {
return splitWithCachedString(this, delim);
} else {
return splitWithoutCachedString(delim);
}
}
private SimpleString[] splitWithoutCachedString(final char delim) {
List<SimpleString> all = null;
byte low = (byte) (delim & 0xFF); // low byte
@ -361,6 +371,58 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
}
}
private static SimpleString[] splitWithCachedString(final SimpleString simpleString, final int delim) {
final String str = simpleString.str;
final byte[] data = simpleString.data;
final int length = str.length();
List<SimpleString> all = null;
int index = 0;
while (index < length) {
final int delimIndex = str.indexOf(delim, index);
if (delimIndex == -1) {
//just need to add the last one
break;
} else {
all = addSimpleStringPart(all, data, index, delimIndex);
}
index = delimIndex + 1;
}
if (all == null) {
return new SimpleString[]{simpleString};
} else {
// Adding the last one
all = addSimpleStringPart(all, data, index, length);
// Converting it to arrays
final SimpleString[] parts = new SimpleString[all.size()];
return all.toArray(parts);
}
}
private static List<SimpleString> addSimpleStringPart(List<SimpleString> all,
final byte[] data,
final int startIndex,
final int endIndex) {
final int expectedLength = endIndex - startIndex;
final SimpleString ss;
if (expectedLength == 0) {
ss = EMPTY;
} else {
//extract a byte[] copy from this
final int ssIndex = startIndex << 1;
final int delIndex = endIndex << 1;
final byte[] bytes = Arrays.copyOfRange(data, ssIndex, delIndex);
ss = new SimpleString(bytes);
}
// We will create the ArrayList lazily
if (all == null) {
// There will be at least 3 strings on this case (which is the actual common usecase)
// For that reason I'm allocating the ArrayList with 3 already
all = new ArrayList<>(3);
}
all.add(ss);
return all;
}
/**
* checks to see if this SimpleString contains the char parameter passed in
*
@ -368,6 +430,9 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
* @return true if the char is found, false otherwise.
*/
public boolean contains(final char c) {
if (this.str != null) {
return this.str.indexOf(c) != -1;
}
final byte low = (byte) (c & 0xFF); // low byte
final byte high = (byte) (c >> 8 & 0xFF); // high byte

View File

@ -53,8 +53,10 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
public class AMQConsumer {
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private AMQSession session;
private org.apache.activemq.command.ActiveMQDestination openwireDestination;
private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
private final boolean hasNotificationDestination;
private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private ServerConsumer serverConsumer;
@ -74,6 +76,7 @@ public class AMQConsumer {
boolean internalAddress) {
this.session = amqSession;
this.openwireDestination = d;
this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
@ -286,7 +289,7 @@ public class AMQConsumer {
for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}
@ -331,6 +334,10 @@ public class AMQConsumer {
serverConsumer.close(false);
}
public boolean hasNotificationDestination() {
return hasNotificationDestination;
}
public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
return openwireDestination;
}

View File

@ -85,8 +85,12 @@ public class AMQSession implements SessionCallback {
private final OpenWireProtocolManager protocolManager;
private final Runnable enableAutoReadAndTtl;
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private String[] existingQueuesCache;
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@ -102,6 +106,8 @@ public class AMQSession implements SessionCallback {
OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
this.converter = new OpenWireMessageConverter(marshaller.copy());
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
this.existingQueuesCache = null;
}
public boolean isClosed() {
@ -190,6 +196,33 @@ public class AMQSession implements SessionCallback {
return consumersList;
}
private boolean checkCachedExistingQueues(final SimpleString address,
final String physicalName,
final boolean isTemporary) throws Exception {
String[] existingQueuesCache = this.existingQueuesCache;
//lazy allocation of the cache
if (existingQueuesCache == null) {
//16 means 64 bytes with 32 bit references or 128 bytes with 64 bit references -> 1 or 2 cache lines with common archs
existingQueuesCache = new String[16];
assert (Integer.bitCount(existingQueuesCache.length) == 1) : "existingQueuesCache.length must be power of 2";
this.existingQueuesCache = existingQueuesCache;
}
final int hashCode = physicalName.hashCode();
//this.existingQueuesCache.length must be power of 2
final int mask = existingQueuesCache.length - 1;
final int index = hashCode & mask;
final String existingQueue = existingQueuesCache[index];
if (existingQueue != null && existingQueue.equals(physicalName)) {
//if the information is stale (ie no longer valid) it will fail later
return true;
}
final boolean hasQueue = checkAutoCreateQueue(address, isTemporary);
if (hasQueue) {
existingQueuesCache[index] = physicalName;
}
return hasQueue;
}
private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
boolean hasQueue = true;
if (!connection.containsKnownDestination(queueName)) {
@ -325,7 +358,7 @@ public class AMQSession implements SessionCallback {
boolean sendProducerAck) throws Exception {
messageSend.setBrokerInTime(System.currentTimeMillis());
ActiveMQDestination destination = messageSend.getDestination();
final ActiveMQDestination destination = messageSend.getDestination();
ActiveMQDestination[] actualDestinations = null;
if (destination.isComposite()) {
@ -335,19 +368,19 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination};
}
org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, SimpleString.toSimpleString(this.connection.getState().getInfo().getClientId()));
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
* message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString())) {
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString(messageSend.getMessageId().toString()));
}
boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
@ -356,14 +389,16 @@ public class AMQSession implements SessionCallback {
connection.getContext().setDontSendReponse(true);
}
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
final ActiveMQDestination dest = actualDestinations[i];
final String physicalName = dest.getPhysicalName();
final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
//the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
coreMsg.setAddress(address);
if (actualDestinations[i].isQueue()) {
checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
if (dest.isQueue()) {
checkCachedExistingQueues(address, physicalName, dest.isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.setRoutingType(RoutingType.MULTICAST);
@ -424,12 +459,8 @@ public class AMQSession implements SessionCallback {
//non-persistent messages goes here, by default we stop reading from
//transport
connection.getTransportConnection().setAutoRead(false);
if (!store.checkMemory(() -> {
connection.getTransportConnection().setAutoRead(true);
connection.enableTtl();
})) {
connection.getTransportConnection().setAutoRead(true);
connection.enableTtl();
if (!store.checkMemory(enableAutoReadAndTtl)) {
enableAutoReadAndTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
@ -448,6 +479,11 @@ public class AMQSession implements SessionCallback {
}
}
private void enableAutoReadAndTtl() {
connection.getTransportConnection().setAutoRead(true);
connection.enableTtl();
}
public String convertWildcard(String physicalName) {
return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration());
}