This commit is contained in:
Martyn Taylor 2017-03-30 09:54:26 +01:00
commit 07597ba959
6 changed files with 14 additions and 15 deletions

View File

@ -1,3 +1,3 @@
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
<acceptor name="amqp">tcp://${host}:${amqp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>

View File

@ -1,3 +1,3 @@
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://${host}:${mqtt.port}?protocols=MQTT;useEpoll=true</acceptor>
<acceptor name="mqtt">tcp://${host}:${mqtt.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

View File

@ -1,3 +1,3 @@
<!-- STOMP Acceptor. -->
<acceptor name="stomp">tcp://${host}:${stomp.port}?protocols=STOMP;useEpoll=true</acceptor>
<acceptor name="stomp">tcp://${host}:${stomp.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

View File

@ -179,9 +179,9 @@ public class TransportConstants {
public static final boolean DEFAULT_TCP_NODELAY = true;
public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 1024 * 1024;
public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 1024 * 1024;
public static final boolean DEFAULT_HTTP_ENABLED = false;

View File

@ -146,11 +146,14 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Receiver receiver;
try {
receiver = ((Receiver) delivery.getLink());
if (!delivery.isReadable()) {
return;
}
if (delivery.isPartial()) {
return;
}
receiver = ((Receiver) delivery.getLink());
if (delivery.isPartial()) {
return;
@ -160,11 +163,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
byte[] data;
synchronized (connection.getLock()) {
data = new byte[delivery.available()];
receiver.recv(data, 0, data.length);
receiver.advance();
}
data = new byte[delivery.available()];
receiver.recv(data, 0, data.length);
receiver.advance();
if (delivery.getRemoteState() instanceof TransactionalState) {
@ -174,9 +175,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
synchronized (connection.getLock()) {
flow(amqpCredits, minCreditRefresh);
}
flow(amqpCredits, minCreditRefresh);
} catch (Exception e) {
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();

View File

@ -16,4 +16,4 @@
# specific language governing permissions and limitations
# under the License.
mvn -Ptests -DfailIfNoTests=false -Pextra-tests -DskipPerformanceTests=false -Dtest=$1 test
mvn -Ptests -DfailIfNoTests=false -Pextra-tests -DskipStyleCheck=true -DskipPerformanceTests=false -Dtest=$1 test