From 983f6908895e1926592bce83c09353684c1efb21 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 8 Jul 2008 16:07:52 +0000 Subject: [PATCH] added a like for like comparision of OpenWire versus PB (see Performance2Test) git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@674866 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/protocolbuffer/OpenWire.java | 14 +-- .../src/main/proto/openwire.proto | 5 +- .../OpenWirePerformanceTest.java | 108 +++++++++++++++++ .../protocolbuffer/Performance2Test.java | 54 ++++++--- .../activemq/protocolbuffer/StopWatch.java | 111 ++++++++++++++++++ .../activemq/protocolbuffer/TestSupport.java | 29 +++++ 6 files changed, 292 insertions(+), 29 deletions(-) create mode 100644 activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java create mode 100644 activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/StopWatch.java create mode 100644 activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java diff --git a/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java b/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java index ae823cf90f..5f603c76b6 100755 --- a/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java +++ b/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java @@ -51,8 +51,8 @@ public final class OpenWire { "he.activemq.protocolbuffer.ShortProperty" + "\022G\n\rbyte_property\030\010 \003(\01320.org.apache.act" + "ivemq.protocolbuffer.ByteProperty\"\306\006\n\007Me" + - "ssage\022\023\n\013producer_id\030\001 \002(\005\022\030\n\020producer_c" + - "ounter\030\002 \002(\005\022D\n\013destination\030\003 \002(\0132/.org." + + "ssage\022\023\n\013producer_id\030\001 \001(\005\022\030\n\020producer_c" + + "ounter\030\002 \001(\005\022D\n\013destination\030\003 \002(\0132/.org." + "apache.activemq.protocolbuffer.Destinati" + "on\022M\n\024original_destination\030\004 \001(\0132/.org.a" + "pache.activemq.protocolbuffer.Destinatio" + @@ -4464,13 +4464,13 @@ public final class OpenWire { return org.apache.activemq.protocolbuffer.OpenWire.internal_static_org_apache_activemq_protocolbuffer_Message_fieldAccessorTable; } - // required int32 producer_id = 1; + // optional int32 producer_id = 1; private boolean hasProducerId; private int producerId_ = 0; public boolean hasProducerId() { return hasProducerId; } public int getProducerId() { return producerId_; } - // required int32 producer_counter = 2; + // optional int32 producer_counter = 2; private boolean hasProducerCounter; private int producerCounter_ = 0; public boolean hasProducerCounter() { return hasProducerCounter; } @@ -4631,8 +4631,6 @@ public final class OpenWire { public int getGroupSequence() { return groupSequence_; } public final boolean isInitialized() { - if (!hasProducerId) return false; - if (!hasProducerCounter) return false; if (!hasDestination) return false; if (!getDestination().isInitialized()) return false; if (hasOriginalDestination()) { @@ -5222,7 +5220,7 @@ public final class OpenWire { } - // required int32 producer_id = 1; + // optional int32 producer_id = 1; public boolean hasProducerId() { return result.hasProducerId(); } @@ -5240,7 +5238,7 @@ public final class OpenWire { return this; } - // required int32 producer_counter = 2; + // optional int32 producer_counter = 2; public boolean hasProducerCounter() { return result.hasProducerCounter(); } diff --git a/activemq-protocol-buffer/src/main/proto/openwire.proto b/activemq-protocol-buffer/src/main/proto/openwire.proto index 1001c6f63c..4327bd44c5 100644 --- a/activemq-protocol-buffer/src/main/proto/openwire.proto +++ b/activemq-protocol-buffer/src/main/proto/openwire.proto @@ -96,8 +96,9 @@ message Properties { // Message message Message { - required int32 producer_id = 1; - required int32 producer_counter = 2; + // TODO these should be required + optional int32 producer_id = 1; + optional int32 producer_counter = 2; // TODO no messageID? diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java new file mode 100644 index 0000000000..17a5427cfb --- /dev/null +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.protocolbuffer; + +import org.apache.activemq.command.*; +import org.apache.activemq.openwire.OpenWireFormat; + +import java.io.*; + +/** + * @version $Revision: 1.1 $ + */ +public class OpenWirePerformanceTest extends TestSupport { + + protected String fileName = "target/messages3.openwire"; + protected OpenWireFormat openWireFormat = createOpenWireFormat(); + protected ActiveMQDestination destination = new ActiveMQQueue("FOO.BAR"); + protected ProducerId producerId = new ProducerId(new SessionId(new ConnectionId("abc"), 1), 1); + + public void testPerformance() throws Exception { + OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName)); + DataOutputStream ds = new DataOutputStream(out); + + StopWatch watch = createStopWatch("writer"); + for (int i = 0; i < messageCount; i++) { + watch.start(); + Message message = new ActiveMQMessage(); + + message.setDestination(destination); + message.setPersistent(true); + message.setType("type:" + i); + message.setCorrelationId("ABCD"); + + if (useProducerId) { + message.setProducerId(producerId); + message.setMessageId(new MessageId(producerId, i)); + } + + if (verbose) { + System.out.println("Writing message: " + i + " = " + message); + } +/* + byte[] bytes = message.toByteArray(); + int size = bytes.length; + out.write(size); + //System.out.println("writing bytes: " + size); + out.write(bytes); +*/ + + openWireFormat.marshal(message, ds); + watch.stop(); + } + out.flush(); + out.close(); + + // now lets try read them! + StopWatch watch2 = createStopWatch("reader"); + InputStream in = new BufferedInputStream(new FileInputStream(fileName)); + DataInput dis = new DataInputStream(in); + + for (int i = 0; i < messageCount; i++) { + watch2.start(); + + Object message = openWireFormat.unmarshal(dis); +/* + int size = in.read(); + byte[] data = new byte[size]; + in.read(data); +*/ + if (verbose) { + System.out.println("Reading message: " + i + " = " + message); + } + watch2.stop(); + } + in.close(); + } + + private StopWatch createStopWatch(String name) { + StopWatch answer = new StopWatch(name); + answer.setLogFrequency(messageCount / 10); + return answer; + } + + + protected OpenWireFormat createOpenWireFormat() { + OpenWireFormat wf = new OpenWireFormat(); + wf.setCacheEnabled(true); + wf.setStackTraceEnabled(false); + wf.setVersion(OpenWireFormat.DEFAULT_VERSION); + return wf; + } + +} \ No newline at end of file diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java index 49bb33c554..921451f479 100644 --- a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java @@ -17,54 +17,70 @@ */ package org.apache.activemq.protocolbuffer; -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; -import junit.framework.TestCase; - -import java.io.FileInputStream; -import java.io.FileOutputStream; +import java.io.*; /** * @version $Revision: 1.1 $ */ -public class Performance2Test extends TestCase { - protected int messageCount = 10; - protected String fileName = "target/messages2.openwire"; +public class Performance2Test extends TestSupport { + protected String fileName = "target/messages2.openwire"; + protected OpenWire.Destination destination = OpenWire.Destination.newBuilder().setName("FOO.BAR").setType(OpenWire.Destination.DestinationType.QUEUE).build(); public void testPerformance() throws Exception { - FileOutputStream out = new FileOutputStream(fileName); - OpenWire.Destination destination = OpenWire.Destination.newBuilder().setName("FOO.BAR").setType(OpenWire.Destination.DestinationType.QUEUE).build(); + OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName)); + StopWatch watch = createStopWatch("writer"); for (int i = 0; i < messageCount; i++) { - OpenWire.Message message = OpenWire.Message.newBuilder() + watch.start(); + OpenWire.Message.Builder builder = OpenWire.Message.newBuilder() .setDestination(destination) .setPersistent(true) - .setProducerId(1234) - .setProducerCounter(i) .setType("type:" + i) - .build(); + .setCorrelationId("ABCD"); - System.out.println("Writing message: " + i + " = " + message); + if (useProducerId) { + builder = builder.setProducerId(1234) + .setProducerCounter(i); + } + + OpenWire.Message message = builder.build(); + + if (verbose) { + System.out.println("Writing message: " + i + " = " + message); + } byte[] bytes = message.toByteArray(); int size = bytes.length; out.write(size); - System.out.println("writing bytes: " + size); + //System.out.println("writing bytes: " + size); out.write(bytes); + watch.stop(); } out.flush(); out.close(); // now lets try read them! - FileInputStream in = new FileInputStream(fileName); + StopWatch watch2 = createStopWatch("reader"); + InputStream in = new BufferedInputStream(new FileInputStream(fileName)); for (int i = 0; i < messageCount; i++) { + watch2.start(); + int size = in.read(); byte[] data = new byte[size]; in.read(data); OpenWire.Message message = OpenWire.Message.parseFrom(data); - System.out.println("Reading message: " + i + " = " + message); + if (verbose) { + System.out.println("Reading message: " + i + " = " + message); + } + watch2.stop(); } in.close(); } + private StopWatch createStopWatch(String name) { + StopWatch answer = new StopWatch(name); + answer.setLogFrequency(messageCount / 10); + return answer; + } + } \ No newline at end of file diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/StopWatch.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/StopWatch.java new file mode 100644 index 0000000000..41f21dcaf7 --- /dev/null +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/StopWatch.java @@ -0,0 +1,111 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.protocolbuffer; + +import java.text.NumberFormat; + +/** + * @version $Revision: 1.1 $ + */ +public class StopWatch { + private final String id; + private int loopCount; + private int totalLoops; + private long groupElapsed; + private long totalElapsed; + private long startTime; + private long minTime = Long.MAX_VALUE; + private long maxTime = Long.MIN_VALUE; + private int logFrequency = 1000; + private NumberFormat numberFormat = NumberFormat.getNumberInstance(); + + public StopWatch(String id) { + this.id = id; + } + + public void start() { + startTime = System.currentTimeMillis(); + } + + public void stop() { + long elapsedTime = System.currentTimeMillis() - startTime; + groupElapsed += elapsedTime; + totalElapsed += elapsedTime; + loopCount++; + totalLoops++; + + if (elapsedTime > maxTime) { + maxTime = elapsedTime; + } + if (elapsedTime < minTime) { + minTime = elapsedTime; + } + if (logFrequency > 0 && loopCount % logFrequency == 0) { + System.out.println(toString()); + reset(); + } + } + + protected void reset() { + loopCount = 0; + groupElapsed = 0; + minTime = Long.MAX_VALUE; + maxTime = Long.MIN_VALUE; + } + + @Override + public String toString() { + double average = totalElapsed; + average *= logFrequency; + average /= totalLoops; + //average /= 1000; + return id + " count: " + loopCount + " elapsed: " + groupElapsed + " min: " + minTime + " max: " + maxTime + " average: " + formatSeconds(average); + } + + public int getLogFrequency() { + return logFrequency; + } + + public void setLogFrequency(int logFrequency) { + this.logFrequency = logFrequency; + } + + public int getLoopCount() { + return loopCount; + } + + public long getMaxTime() { + return maxTime; + } + + public long getMinTime() { + return minTime; + } + + public long getStartTime() { + return startTime; + } + + public long getGroupElapsed() { + return groupElapsed; + } + + protected String formatSeconds(double time) { + return numberFormat.format(time); + } +} diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java new file mode 100644 index 0000000000..ea89547163 --- /dev/null +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java @@ -0,0 +1,29 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.protocolbuffer; + +import junit.framework.TestCase; + +/** + * @version $Revision: 1.1 $ + */ +public class TestSupport extends TestCase { + protected int messageCount = 10000000; + protected boolean verbose = false; + protected boolean useProducerId = false; +}