ARTEMIS-2205 Optimizing some Lambda usages

https://issues.apache.org/jira/browse/ARTEMIS-2205
This commit is contained in:
Francesco Nigro 2018-12-17 09:12:19 -05:00
parent d79762fa04
commit 8281e3b58f
6 changed files with 51 additions and 59 deletions
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton
artemis-server/src/main/java/org/apache/activemq/artemis/core

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
@ -53,6 +52,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
@ -79,7 +79,7 @@ import org.jboss.logging.Logger;
/**
* This is the Equivalent for the ServerConsumer
*/
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler, MessageReferenceCallback {
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
@ -92,7 +92,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();
private Consumer brokerConsumer;
private ReadyListener onflowControlReady;
protected final AMQPSessionContext protonSession;
protected final Sender sender;
protected final AMQPConnectionContext connection;
@ -117,6 +117,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
* to sync the credits we have versus the credits that are being held in proton
* */
private final Object creditsLock = new Object();
private final java.util.function.Consumer<? super MessageReference> executeDelivery;
public ProtonServerSenderContext(AMQPConnectionContext connection,
Sender sender,
@ -127,6 +128,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
this.sender = sender;
this.protonSession = protonSession;
this.sessionSPI = server;
this.executeDelivery = this::executeDelivery;
}
public Object getBrokerConsumer() {
@ -164,7 +166,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
public boolean hasCredits() {
if (!connection.flowControl(brokerConsumer::promptDelivery)) {
if (!connection.flowControl(onflowControlReady)) {
return false;
}
@ -488,6 +490,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
try {
brokerConsumer = (Consumer) sessionSPI.createSender(this, queue, multicast ? null : selector, browseOnly);
onflowControlReady = brokerConsumer::promptDelivery;
} catch (ActiveMQAMQPResourceLimitExceededException e1) {
throw new ActiveMQAMQPResourceLimitExceededException(e1.getMessage());
} catch (ActiveMQSecurityException e) {
@ -747,7 +750,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
if (messageReference instanceof Runnable && consumer.allowReferenceCallback()) {
messageReference.setCallback(this);
messageReference.onDelivery(executeDelivery);
connection.runNow((Runnable)messageReference);
} else {
connection.runNow(() -> executeDelivery(messageReference));
@ -760,8 +763,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
}
@Override
public void executeDelivery(MessageReference messageReference) {
private void executeDelivery(MessageReference messageReference) {
try {
if (sender.getLocalState() == EndpointState.CLOSED) {

View File

@ -18,13 +18,13 @@ package org.apache.activemq.artemis.core.paging.cursor;
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
@ -75,7 +75,7 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
private long messageSize = -1;
private MessageReferenceCallback callback;
private Consumer<? super MessageReference> onDelivery;
@Override
public Object getProtocolData() {
@ -93,22 +93,26 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
}
@Override
public void setCallback(MessageReferenceCallback callback) {
this.callback = callback;
public void onDelivery(Consumer<? super MessageReference> onDelivery) {
assert this.onDelivery == null;
this.onDelivery = onDelivery;
}
/**
* It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
*/
@Override
public void run() {
MessageReferenceCallback callback = this.callback;
try {
if (callback != null) {
callback.executeDelivery(this);
final Consumer<? super MessageReference> onDelivery = this.onDelivery;
if (onDelivery != null) {
try {
onDelivery.accept(this);
} finally {
this.onDelivery = null;
}
} finally {
this.callback = null;
}
}
@Override
public synchronized PagedMessage getPagedMessage() {
PagedMessage returnMessage = message != null ? message.get() : null;

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.core.server;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -44,7 +46,14 @@ public interface MessageReference {
SimpleString getLastValueProperty();
void setCallback(MessageReferenceCallback callback);
/**
* This is to be used in cases where a message delivery happens on an executor.
* Most MessageReference implementations will allow execution, and if it does,
* and the protocol requires an execution per message, this callback may be used.
*
* At the time of this implementation only AMQP was used.
*/
void onDelivery(Consumer<? super MessageReference> callback);
/**
* We define this method aggregation here because on paging we need to hold the original estimate,

View File

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
/** This is to be used in cases where a message delivery happens on an executor.
* Most MessageReference implementations will allow execution, and if it does,
* and the protocol requires an execution per message, this callback may be used.
*
* At the time of this implementation only AMQP was used. */
public interface MessageReferenceCallback {
void executeDelivery(MessageReference reference);
}

View File

@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@ -33,7 +34,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
* This is useful for example, for stock prices, where you're only interested in the latest value
* for a particular stock
*/
@SuppressWarnings("ALL")
public class LastValueQueue extends QueueImpl {
private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
@ -238,7 +239,7 @@ public class LastValueQueue extends QueueImpl {
}
@Override
public void setCallback(MessageReferenceCallback callback) {
public void onDelivery(Consumer<? super MessageReference> callback) {
// HolderReference may be reused among different consumers, so we don't set a callback and won't support Runnables
}

View File

@ -17,12 +17,12 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MessageReferenceCallback;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -55,7 +55,7 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
private Object protocolData;
private MessageReferenceCallback callback;
private Consumer<? super MessageReference> onDelivery;
// Static --------------------------------------------------------
@ -88,20 +88,23 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
// MessageReference implementation -------------------------------
@Override
public void setCallback(MessageReferenceCallback callback) {
this.callback = callback;
public void onDelivery(Consumer<? super MessageReference> onDelivery) {
assert this.onDelivery == null;
this.onDelivery = onDelivery;
}
/**
* It will call {@link Consumer#accept(Object)} on {@code this} of the {@link Consumer} registered in {@link #onDelivery(Consumer)}, if any.
*/
@Override
public void run() {
MessageReferenceCallback callback = this.callback;
try {
if (callback != null) {
callback.executeDelivery(this);
final Consumer<? super MessageReference> onDelivery = this.onDelivery;
if (onDelivery != null) {
try {
onDelivery.accept(this);
} finally {
this.onDelivery = null;
}
} finally {
this.callback = null;
}
}