ARTEMIS-362 Broadcast only filled buffer content.

When publishing server connectors to other cluster members the whole buffer is sent.
This fixes ARTEMIS-362 by extracting the filled part of the buffer for broadcasting.

Added test case that checks that packet size does not exceed 1500 bytes with one connector.
This commit is contained in:
Lachezar Dobrev 2016-01-26 13:03:29 +02:00 committed by Clebert Suconic
parent 0c74b5fc83
commit cc4d24cfbb
2 changed files with 111 additions and 1 deletions

View File

@ -191,7 +191,9 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable {
tcConfig.encode(buff);
}
byte[] data = buff.toByteBuffer().array();
// Only send as many bytes as we need.
byte[] data = new byte[buff.readableBytes()];
buff.getBytes(buff.readerIndex(), data);
endpoint.broadcast(data);
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.impl;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;
/** Test the {@link BroadcastGroupImpl}.<br> */
public class BroadcastGroupImplTest extends ActiveMQTestBase {
static class BroadcastEndpointFactoryImpl implements BroadcastEndpointFactory {
private static final long serialVersionUID = 1L;
int sent = 0;
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
return new BroadcastEndpointImpl(this);
}
}
static class BroadcastEndpointImpl implements BroadcastEndpoint {
private BroadcastEndpointFactoryImpl factory;
public BroadcastEndpointImpl(BroadcastEndpointFactoryImpl factory) {
this.factory = factory;
}
@Override
public void openClient() { }
@Override
public void openBroadcaster() { }
@Override
public void close(boolean isBroadcast) { }
@Override
public void broadcast(byte[] data) throws Exception {
if (data == null)
fail("Attempted to send datagram with null content");
if (data.length == 0)
fail("Attempted to send datagram with blank content");
if (data.length > 1500) // Common MTU size, not specification!
fail("Attempted to send a datagram with " + data.length + " bytes");
factory.sent++;
}
@Override
public byte[] receiveBroadcast() throws Exception {
return null;
}
@Override
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
return null;
}
}
/**
* Test the broadcasted packages length.<br>
* Broadcast and MultiCast techniques are commonly limited in size by
* underlying hardware. Broadcast and MultiCast protocols are typically not
* guaranteed (UDP) and as such large packages may be silently discarded by
* underlying hardware.<br>
* This test validates that Artemis Server does not broadcast packages above
* a size of 1500 bytes. The limit is not derived from any normative
* documents, but is rather derived from common MTU for network equipment.
*/
@Test
public void testBroadcastDatagramLength() throws Throwable {
BroadcastEndpointFactoryImpl befi;
befi = new BroadcastEndpointFactoryImpl();
InVMNodeManager node;
node = new InVMNodeManager(false);
String name;
name = "BroadcastGroupImplTest";
BroadcastGroupImpl test;
test = new BroadcastGroupImpl(node, name, 1000, null, befi);
TransportConfiguration tcon;
tcon = new TransportConfiguration(getClass().getName());
test.addConnector(tcon);
// Broadcast
test.broadcastConnectors();
// Make sure we sent one package
assertEquals("Incorrect number of sent datagrams", 1, befi.sent);
}
}