mirror of https://github.com/apache/activemq.git
added test cases for UDP fragmentation
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384640 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3d7f76719f
commit
2f1dcea144
|
@ -207,10 +207,10 @@ public class CommandChannel implements Service {
|
||||||
boolean lastFragment = false;
|
boolean lastFragment = false;
|
||||||
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
|
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
|
||||||
// write the header
|
// write the header
|
||||||
writeBuffer.rewind();
|
writeBuffer.clear();
|
||||||
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
|
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
|
||||||
lastFragment = offset + chunkSize >= length;
|
lastFragment = offset + chunkSize >= length;
|
||||||
if (lastFragment) {
|
if (chunkSize + offset > length) {
|
||||||
chunkSize = length - offset;
|
chunkSize = length - offset;
|
||||||
}
|
}
|
||||||
header.incrementCounter();
|
header.incrementCounter();
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.udp;
|
package org.apache.activemq.transport.udp;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.command.WireFormatInfo;
|
import org.apache.activemq.command.WireFormatInfo;
|
||||||
|
@ -25,6 +28,8 @@ import org.apache.activemq.transport.TransportFactory;
|
||||||
import org.apache.activemq.transport.TransportListener;
|
import org.apache.activemq.transport.TransportListener;
|
||||||
import org.apache.activemq.transport.TransportServer;
|
import org.apache.activemq.transport.TransportServer;
|
||||||
|
|
||||||
|
import javax.jms.MessageNotWriteableException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
||||||
|
@ -41,7 +46,8 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
|
||||||
|
|
||||||
protected Object lock = new Object();
|
protected Object lock = new Object();
|
||||||
protected Command receivedCommand;
|
protected Command receivedCommand;
|
||||||
private TransportServer server;
|
protected TransportServer server;
|
||||||
|
protected boolean large;
|
||||||
|
|
||||||
public void testSendingSmallMessage() throws Exception {
|
public void testSendingSmallMessage() throws Exception {
|
||||||
ConsumerInfo expected = new ConsumerInfo();
|
ConsumerInfo expected = new ConsumerInfo();
|
||||||
|
@ -70,6 +76,54 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSendingMediumMessage() throws Exception {
|
||||||
|
String text = createMessageBodyText(4 * 105);
|
||||||
|
ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Medium");
|
||||||
|
assertSendTextMessage(destination, text);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSendingLargeMessage() throws Exception {
|
||||||
|
String text = createMessageBodyText(4 * 1024);
|
||||||
|
ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Large");
|
||||||
|
assertSendTextMessage(destination, text);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void assertSendTextMessage(ActiveMQDestination destination, String text) throws MessageNotWriteableException {
|
||||||
|
large = true;
|
||||||
|
|
||||||
|
ActiveMQTextMessage expected = new ActiveMQTextMessage();
|
||||||
|
|
||||||
|
expected.setText(text);
|
||||||
|
expected.setDestination(destination);
|
||||||
|
|
||||||
|
try {
|
||||||
|
System.out.println("About to send message of type: " + expected.getClass());
|
||||||
|
producer.oneway(expected);
|
||||||
|
|
||||||
|
Command received = assertCommandReceived();
|
||||||
|
assertTrue("Should have received a ActiveMQTextMessage but was: " + received, received instanceof ActiveMQTextMessage);
|
||||||
|
ActiveMQTextMessage actual = (ActiveMQTextMessage) received;
|
||||||
|
|
||||||
|
assertEquals("getDestination", expected.getDestination(), actual.getDestination());
|
||||||
|
assertEquals("getText", expected.getText(), actual.getText());
|
||||||
|
|
||||||
|
System.out.println("Received text message with: " + actual.getText().length() + " character(s)");
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
System.out.println("Caught: " + e);
|
||||||
|
e.printStackTrace();
|
||||||
|
fail("Failed to send to transport: " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String createMessageBodyText(int loopSize) {
|
||||||
|
StringBuffer buffer = new StringBuffer();
|
||||||
|
for (int i = 0; i < loopSize; i++) {
|
||||||
|
buffer.append("0123456789");
|
||||||
|
}
|
||||||
|
return buffer.toString();
|
||||||
|
}
|
||||||
|
|
||||||
protected void setUp() throws Exception {
|
protected void setUp() throws Exception {
|
||||||
server = createServer();
|
server = createServer();
|
||||||
if (server != null) {
|
if (server != null) {
|
||||||
|
@ -132,8 +186,13 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
|
||||||
if (command instanceof WireFormatInfo) {
|
if (command instanceof WireFormatInfo) {
|
||||||
System.out.println("Got WireFormatInfo: " + command);
|
System.out.println("Got WireFormatInfo: " + command);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
if (large) {
|
||||||
|
System.out.println("### Received command: " + command.getClass() + " with id: " + command.getCommandId());
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
System.out.println("### Received command: " + command);
|
System.out.println("### Received command: " + command);
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
receivedCommand = command;
|
receivedCommand = command;
|
||||||
|
@ -157,7 +216,10 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
|
||||||
protected Command assertCommandReceived() throws InterruptedException {
|
protected Command assertCommandReceived() throws InterruptedException {
|
||||||
Command answer = null;
|
Command answer = null;
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
lock.wait(1000);
|
answer = receivedCommand;
|
||||||
|
if (answer == null) {
|
||||||
|
lock.wait(10000);
|
||||||
|
}
|
||||||
answer = receivedCommand;
|
answer = receivedCommand;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue