mirror of https://github.com/apache/activemq.git
minor refactor in test cases which now demostrate a bug in PB
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@675098 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c3a0b2edfc
commit
f1e3a14e83
|
@ -17,76 +17,62 @@
|
|||
*/
|
||||
package org.apache.activemq.protocolbuffer;
|
||||
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
import com.google.protobuf.CodedInputStream;
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public class Performance2Test extends TestSupport {
|
||||
public class MarshallTest extends TestCase {
|
||||
protected int messageCount = 1000;
|
||||
protected String fileName = "target/marshall.openwire";
|
||||
|
||||
protected String fileName = "target/messages2.openwire";
|
||||
protected OpenWire.Destination destination = OpenWire.Destination.newBuilder().setName("FOO.BAR").setType(OpenWire.Destination.DestinationType.QUEUE).build();
|
||||
|
||||
public void testPerformance() throws Exception {
|
||||
OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName));
|
||||
public void testMarshalling() throws Exception {
|
||||
FileOutputStream out = new FileOutputStream(fileName);
|
||||
CodedOutputStream cout = CodedOutputStream.newInstance(out);
|
||||
OpenWire.Destination destination = OpenWire.Destination.newBuilder().setName("FOO.BAR").setType(OpenWire.Destination.DestinationType.QUEUE).build();
|
||||
|
||||
StopWatch watch = createStopWatch("writer");
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
watch.start();
|
||||
OpenWire.Message.Builder builder = OpenWire.Message.newBuilder()
|
||||
OpenWire.Message message = OpenWire.Message.newBuilder()
|
||||
.setDestination(destination)
|
||||
.setPersistent(true)
|
||||
.setCorrelationId("ABCD");
|
||||
.setProducerId(1234)
|
||||
.setProducerCounter(i)
|
||||
.build();
|
||||
//.setType("type:" + i)
|
||||
|
||||
if (useProducerId) {
|
||||
builder = builder.setProducerId(1234)
|
||||
.setProducerCounter(i);
|
||||
}
|
||||
|
||||
OpenWire.Message message = builder.build();
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("Writing message: " + i + " = " + message);
|
||||
}
|
||||
System.out.println("Writing message: " + i + " = " + message);
|
||||
int size = message.getSerializedSize();
|
||||
cout.writeRawVarint32(size);
|
||||
message.writeTo(cout);
|
||||
|
||||
watch.stop();
|
||||
cout.flush();
|
||||
}
|
||||
cout.flush();
|
||||
out.close();
|
||||
|
||||
// now lets try read them!
|
||||
StopWatch watch2 = createStopWatch("reader");
|
||||
InputStream in = new BufferedInputStream(new FileInputStream(fileName));
|
||||
FileInputStream in = new FileInputStream(fileName);
|
||||
CodedInputStream cin = CodedInputStream.newInstance(in);
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
watch2.start();
|
||||
|
||||
int size = cin.readRawVarint32();
|
||||
int previous = cin.pushLimit(size);
|
||||
//cin.setSizeLimit(size + 4);
|
||||
OpenWire.Message message = OpenWire.Message.parseFrom(cin);
|
||||
cin.popLimit(previous);
|
||||
System.out.println("Reading message: " + i + " = " + message);
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("Reading message: " + i + " = " + message);
|
||||
}
|
||||
watch2.stop();
|
||||
assertEquals("message.getPersistent()", true, message.getPersistent());
|
||||
assertEquals("message.getProducerId()", 1234, message.getProducerId());
|
||||
assertEquals("message.getProducerCounter()", i, message.getProducerCounter());
|
||||
OpenWire.Destination actualDestination = message.getDestination();
|
||||
assertNotNull("message.getDestination() is null!", actualDestination);
|
||||
assertEquals("destination.getName()", destination.getName(), actualDestination.getName());
|
||||
assertEquals("destination.getType()", destination.getType(), actualDestination.getType());
|
||||
}
|
||||
in.close();
|
||||
}
|
||||
|
||||
private StopWatch createStopWatch(String name) {
|
||||
StopWatch answer = new StopWatch(name);
|
||||
answer.setLogFrequency(messageCount / 10);
|
||||
return answer;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -27,7 +27,7 @@ import java.io.*;
|
|||
*/
|
||||
public class OpenWirePerformanceTest extends TestSupport {
|
||||
|
||||
protected String fileName = "target/messages3.openwire";
|
||||
protected String fileName = "target/openwire.openwire";
|
||||
protected OpenWireFormat openWireFormat = createOpenWireFormat();
|
||||
protected ActiveMQDestination destination = new ActiveMQQueue("FOO.BAR");
|
||||
protected ProducerId producerId = new ProducerId(new SessionId(new ConnectionId("abc"), 1), 1);
|
||||
|
@ -37,7 +37,7 @@ public class OpenWirePerformanceTest extends TestSupport {
|
|||
DataOutputStream ds = new DataOutputStream(out);
|
||||
|
||||
StopWatch watch = createStopWatch("writer");
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
for (long i = 0; i < messageCount; i++) {
|
||||
watch.start();
|
||||
Message message = new ActiveMQMessage();
|
||||
|
||||
|
@ -72,7 +72,7 @@ public class OpenWirePerformanceTest extends TestSupport {
|
|||
InputStream in = new BufferedInputStream(new FileInputStream(fileName));
|
||||
DataInput dis = new DataInputStream(in);
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
for (long i = 0; i < messageCount; i++) {
|
||||
watch2.start();
|
||||
|
||||
Object message = openWireFormat.unmarshal(dis);
|
||||
|
@ -89,13 +89,6 @@ public class OpenWirePerformanceTest extends TestSupport {
|
|||
in.close();
|
||||
}
|
||||
|
||||
private StopWatch createStopWatch(String name) {
|
||||
StopWatch answer = new StopWatch(name);
|
||||
answer.setLogFrequency(messageCount / 10);
|
||||
return answer;
|
||||
}
|
||||
|
||||
|
||||
protected OpenWireFormat createOpenWireFormat() {
|
||||
OpenWireFormat wf = new OpenWireFormat();
|
||||
wf.setCacheEnabled(true);
|
||||
|
|
|
@ -19,52 +19,82 @@ package org.apache.activemq.protocolbuffer;
|
|||
|
||||
import com.google.protobuf.CodedInputStream;
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.*;
|
||||
|
||||
/**
|
||||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public class PerformanceTest extends TestCase {
|
||||
protected int messageCount = 10;
|
||||
protected String fileName = "target/messages.openwire";
|
||||
public class PerformanceTest extends TestSupport {
|
||||
|
||||
protected String fileName = "target/performance.openwire";
|
||||
protected OpenWire.Destination destination = OpenWire.Destination.newBuilder().setName("FOO.BAR").setType(OpenWire.Destination.DestinationType.QUEUE).build();
|
||||
|
||||
public void testPerformance() throws Exception {
|
||||
FileOutputStream out = new FileOutputStream(fileName);
|
||||
OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName));
|
||||
CodedOutputStream cout = CodedOutputStream.newInstance(out);
|
||||
OpenWire.Destination destination = OpenWire.Destination.newBuilder().setName("FOO.BAR").setType(OpenWire.Destination.DestinationType.QUEUE).build();
|
||||
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
OpenWire.Message message = OpenWire.Message.newBuilder()
|
||||
StopWatch watch = createStopWatch("writer");
|
||||
for (long i = 0; i < messageCount; i++) {
|
||||
watch.start();
|
||||
OpenWire.Message.Builder builder = OpenWire.Message.newBuilder()
|
||||
.setDestination(destination)
|
||||
.setPersistent(true)
|
||||
.setProducerId(1234)
|
||||
.setProducerCounter(i)
|
||||
.build();
|
||||
//.setType("type:" + i)
|
||||
.setCorrelationId("ABCD");
|
||||
|
||||
System.out.println("Writing message: " + i + " = " + message);
|
||||
if (useProducerId) {
|
||||
int producerCounter = (int) i;
|
||||
builder = builder.setProducerId(1234)
|
||||
.setProducerCounter(producerCounter);
|
||||
}
|
||||
|
||||
OpenWire.Message message = builder.build();
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("Writing message: " + i + " = " + message);
|
||||
}
|
||||
int size = message.getSerializedSize();
|
||||
|
||||
cout.writeRawVarint32(size);
|
||||
message.writeTo(cout);
|
||||
cout.flush();
|
||||
|
||||
watch.stop();
|
||||
}
|
||||
cout.flush();
|
||||
out.close();
|
||||
|
||||
// now lets try read them!
|
||||
FileInputStream in = new FileInputStream(fileName);
|
||||
StopWatch watch2 = createStopWatch("reader");
|
||||
InputStream in = new BufferedInputStream(new FileInputStream(fileName));
|
||||
CodedInputStream cin = CodedInputStream.newInstance(in);
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
|
||||
for (long i = 0; i < messageCount; i++) {
|
||||
watch2.start();
|
||||
|
||||
int size = cin.readRawVarint32();
|
||||
int previous = cin.pushLimit(size);
|
||||
//cin.setSizeLimit(size + 4);
|
||||
OpenWire.Message message = OpenWire.Message.parseFrom(cin);
|
||||
cin.popLimit(previous);
|
||||
System.out.println("Reading message: " + i + " = " + message);
|
||||
|
||||
if (verbose) {
|
||||
System.out.println("Reading message: " + i + " = " + message);
|
||||
}
|
||||
if (doAssertions) {
|
||||
if (useProducerId) {
|
||||
assertEquals("message.getProducerId()", 1234, message.getProducerId());
|
||||
assertEquals("message.getProducerCounter()", i, message.getProducerCounter());
|
||||
}
|
||||
assertEquals("message.getPersistent()", true, message.getPersistent());
|
||||
assertEquals("message.getCorrelationId()", "ABCD", message.getCorrelationId());
|
||||
|
||||
OpenWire.Destination actualDestination = message.getDestination();
|
||||
assertNotNull("message.getDestination() is null!", actualDestination);
|
||||
assertEquals("destination.getName()", destination.getName(), actualDestination.getName());
|
||||
assertEquals("destination.getType()", destination.getType(), actualDestination.getType());
|
||||
}
|
||||
watch2.stop();
|
||||
}
|
||||
in.close();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -23,7 +23,15 @@ import junit.framework.TestCase;
|
|||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public class TestSupport extends TestCase {
|
||||
protected int messageCount = 1000000;
|
||||
// TODO seems like 4m messages cause Protocol Buffers to barf but 3m is fine :)
|
||||
protected long messageCount = 4 * 1000 * 1000;
|
||||
protected boolean verbose = false;
|
||||
protected boolean doAssertions = true;
|
||||
protected boolean useProducerId = false;
|
||||
|
||||
protected StopWatch createStopWatch(String name) {
|
||||
StopWatch answer = new StopWatch(name);
|
||||
answer.setLogFrequency((int) messageCount / 10);
|
||||
return answer;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue