From c3a0b2edfcf1ccb03d1dd71022e1800a162c962c Mon Sep 17 00:00:00 2001 From: James Strachan Date: Wed, 9 Jul 2008 04:38:19 +0000 Subject: [PATCH] optimised the use of Protocol Buffer to avoid creating unnecessary byte[] for each message which seems to make things much faster git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@675094 13f79535-47bb-0310-9956-ffa450edef68 --- activemq-protocol-buffer/pom.xml | 7 +- .../activemq/protocolbuffer/OpenWire.java | 488 +++++++++--------- .../src/main/proto/openwire.proto | 37 +- .../OpenWirePerformanceTest.java | 3 +- .../protocolbuffer/Performance2Test.java | 28 +- .../protocolbuffer/PerformanceTest.java | 7 +- .../activemq/protocolbuffer/TestSupport.java | 2 +- 7 files changed, 297 insertions(+), 275 deletions(-) diff --git a/activemq-protocol-buffer/pom.xml b/activemq-protocol-buffer/pom.xml index 9b9c75e69f..5776ba92ac 100644 --- a/activemq-protocol-buffer/pom.xml +++ b/activemq-protocol-buffer/pom.xml @@ -70,8 +70,13 @@ pertest false true + + **/*Test.* + - **/PerformanceTest.java + diff --git a/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java b/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java index 5f603c76b6..a24c41a7a7 100755 --- a/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java +++ b/activemq-protocol-buffer/src/main/java/org/apache/activemq/protocolbuffer/OpenWire.java @@ -35,7 +35,7 @@ public final class OpenWire { "\030\002 \002(\005\"+\n\014LongProperty\022\014\n\004name\030\001 \002(\t\022\r\n\005" + "value\030\002 \002(\005\",\n\rFloatProperty\022\014\n\004name\030\001 \002" + "(\t\022\r\n\005value\030\002 \002(\002\"-\n\016DoubleProperty\022\014\n\004n" + - "ame\030\001 \002(\t\022\r\n\005value\030\002 \002(\001\"\336\004\n\nProperties\022" + + "ame\030\001 \002(\t\022\r\n\005value\030\002 \002(\001\"\377\004\n\nProperties\022" + "K\n\017string_property\030\001 \003(\01322.org.apache.ac" + "tivemq.protocolbuffer.StringProperty\022E\n\014" + "int_property\030\002 \003(\0132/.org.apache.activemq" + @@ -50,24 +50,24 @@ public final class OpenWire { "perty\022I\n\016short_property\030\007 \003(\01321.org.apac" + "he.activemq.protocolbuffer.ShortProperty" + "\022G\n\rbyte_property\030\010 \003(\01320.org.apache.act" + - "ivemq.protocolbuffer.ByteProperty\"\306\006\n\007Me" + - "ssage\022\023\n\013producer_id\030\001 \001(\005\022\030\n\020producer_c" + - "ounter\030\002 \001(\005\022D\n\013destination\030\003 \002(\0132/.org." + - "apache.activemq.protocolbuffer.Destinati" + - "on\022M\n\024original_destination\030\004 \001(\0132/.org.a" + - "pache.activemq.protocolbuffer.Destinatio" + - "n\022\020\n\010group_id\030\005 \001(\t\022\026\n\016property_bytes\030\006 " + - "\001(\014\022\026\n\016correlation_id\030\007 \001(\t\022\022\n\npersisten" + - "t\030\010 \001(\010\022\022\n\nexpiration\030\t \001(\003\022\020\n\010priority\030" + - "\n \001(\005\022A\n\010reply_to\030\013 \001(\0132/.org.apache.act" + - "ivemq.protocolbuffer.Destination\022\021\n\ttime" + - "stamp\030\014 \001(\003\022\014\n\004type\030\r \001(\t\022T\n\024local_trans" + - "action_id\030\016 \001(\01326.org.apache.activemq.pr" + - "otocolbuffer.LocalTransactionId\022N\n\021xa_tr" + - "ansaction_id\030\017 \001(\01323.org.apache.activemq" + - ".protocolbuffer.XATransactionId\022\022\n\ncompr" + - "essed\030\020 \001(\010\022\032\n\022redelivery_counter\030\021 \001(\005\022" + - "\023\n\013broker_path\030\022 \003(\t\022\022\n\ncluster_id\030\023 \003(\t" + + "ivemq.protocolbuffer.ByteProperty\022\021\n\ttim" + + "estamp\030\n \001(\003\022\014\n\004type\030\013 \001(\t\"\245\006\n\007Message\022\023" + + "\n\013producer_id\030\001 \001(\005\022\030\n\020producer_counter\030" + + "\002 \001(\005\022D\n\013destination\030\003 \002(\0132/.org.apache." + + "activemq.protocolbuffer.Destination\022M\n\024o" + + "riginal_destination\030\004 \001(\0132/.org.apache.a" + + "ctivemq.protocolbuffer.Destination\022\020\n\010gr" + + "oup_id\030\005 \001(\t\022\026\n\016property_bytes\030\006 \001(\014\022\022\n\n" + + "persistent\030\007 \001(\010\022\022\n\nexpiration\030\010 \001(\003\022\026\n\016" + + "correlation_id\030\t \001(\t\022A\n\010reply_to\030\n \001(\0132/" + + ".org.apache.activemq.protocolbuffer.Dest" + + "ination\022T\n\024local_transaction_id\030\013 \001(\01326." + + "org.apache.activemq.protocolbuffer.Local" + + "TransactionId\022N\n\021xa_transaction_id\030\014 \001(\013" + + "23.org.apache.activemq.protocolbuffer.XA" + + "TransactionId\022\032\n\022redelivery_counter\030\r \001(" + + "\005\022\023\n\013broker_path\030\016 \003(\t\022\022\n\ncluster_id\030\017 \003" + + "(\t\022\022\n\ncompressed\030\020 \001(\010\022\020\n\010priority\030\021 \001(\005" + "\022\017\n\007user_id\030\024 \001(\t\022\017\n\007arrival\030\026 \001(\003\022\026\n\016br" + "oker_in_time\030\027 \001(\003\022\027\n\017broker_out_time\030\030 " + "\001(\003\022\021\n\tdroppable\030\034 \001(\010\022\032\n\022receivedByDFBr" + @@ -3697,6 +3697,18 @@ public final class OpenWire { return byteProperty_.get(index); } + // optional int64 timestamp = 10; + private boolean hasTimestamp; + private long timestamp_ = 0L; + public boolean hasTimestamp() { return hasTimestamp; } + public long getTimestamp() { return timestamp_; } + + // optional string type = 11; + private boolean hasType; + private java.lang.String type_ = ""; + public boolean hasType() { return hasType; } + public java.lang.String getType() { return type_; } + public final boolean isInitialized() { for (org.apache.activemq.protocolbuffer.OpenWire.StringProperty element : getStringPropertyList()) { if (!element.isInitialized()) return false; @@ -3751,6 +3763,12 @@ public final class OpenWire { for (org.apache.activemq.protocolbuffer.OpenWire.ByteProperty element : getBytePropertyList()) { output.writeMessage(8, element); } + if (hasTimestamp()) { + output.writeInt64(10, getTimestamp()); + } + if (hasType()) { + output.writeString(11, getType()); + } getUnknownFields().writeTo(output); } @@ -3792,6 +3810,14 @@ public final class OpenWire { size += com.google.protobuf.CodedOutputStream .computeMessageSize(8, element); } + if (hasTimestamp()) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(10, getTimestamp()); + } + if (hasType()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(11, getType()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3993,6 +4019,12 @@ public final class OpenWire { } result.byteProperty_.addAll(other.byteProperty_); } + if (other.hasTimestamp()) { + setTimestamp(other.getTimestamp()); + } + if (other.hasType()) { + setType(other.getType()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4073,6 +4105,14 @@ public final class OpenWire { addByteProperty(subBuilder.buildPartial()); break; } + case 80: { + setTimestamp(input.readInt64()); + break; + } + case 90: { + setType(input.readString()); + break; + } } } } @@ -4437,6 +4477,42 @@ public final class OpenWire { result.byteProperty_ = java.util.Collections.emptyList(); return this; } + + // optional int64 timestamp = 10; + public boolean hasTimestamp() { + return result.hasTimestamp(); + } + public long getTimestamp() { + return result.getTimestamp(); + } + public Builder setTimestamp(long value) { + result.hasTimestamp = true; + result.timestamp_ = value; + return this; + } + public Builder clearTimestamp() { + result.hasTimestamp = false; + result.timestamp_ = 0L; + return this; + } + + // optional string type = 11; + public boolean hasType() { + return result.hasType(); + } + public java.lang.String getType() { + return result.getType(); + } + public Builder setType(java.lang.String value) { + result.hasType = true; + result.type_ = value; + return this; + } + public Builder clearType() { + result.hasType = false; + result.type_ = ""; + return this; + } } } @@ -4500,73 +4576,49 @@ public final class OpenWire { public boolean hasPropertyBytes() { return hasPropertyBytes; } public com.google.protobuf.ByteString getPropertyBytes() { return propertyBytes_; } - // optional string correlation_id = 7; - private boolean hasCorrelationId; - private java.lang.String correlationId_ = ""; - public boolean hasCorrelationId() { return hasCorrelationId; } - public java.lang.String getCorrelationId() { return correlationId_; } - - // optional bool persistent = 8; + // optional bool persistent = 7; private boolean hasPersistent; private boolean persistent_ = false; public boolean hasPersistent() { return hasPersistent; } public boolean getPersistent() { return persistent_; } - // optional int64 expiration = 9; + // optional int64 expiration = 8; private boolean hasExpiration; private long expiration_ = 0L; public boolean hasExpiration() { return hasExpiration; } public long getExpiration() { return expiration_; } - // optional int32 priority = 10; - private boolean hasPriority; - private int priority_ = 0; - public boolean hasPriority() { return hasPriority; } - public int getPriority() { return priority_; } + // optional string correlation_id = 9; + private boolean hasCorrelationId; + private java.lang.String correlationId_ = ""; + public boolean hasCorrelationId() { return hasCorrelationId; } + public java.lang.String getCorrelationId() { return correlationId_; } - // optional .org.apache.activemq.protocolbuffer.Destination reply_to = 11; + // optional .org.apache.activemq.protocolbuffer.Destination reply_to = 10; private boolean hasReplyTo; private org.apache.activemq.protocolbuffer.OpenWire.Destination replyTo_ = org.apache.activemq.protocolbuffer.OpenWire.Destination.getDefaultInstance(); public boolean hasReplyTo() { return hasReplyTo; } public org.apache.activemq.protocolbuffer.OpenWire.Destination getReplyTo() { return replyTo_; } - // optional int64 timestamp = 12; - private boolean hasTimestamp; - private long timestamp_ = 0L; - public boolean hasTimestamp() { return hasTimestamp; } - public long getTimestamp() { return timestamp_; } - - // optional string type = 13; - private boolean hasType; - private java.lang.String type_ = ""; - public boolean hasType() { return hasType; } - public java.lang.String getType() { return type_; } - - // optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 14; + // optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 11; private boolean hasLocalTransactionId; private org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId localTransactionId_ = org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.getDefaultInstance(); public boolean hasLocalTransactionId() { return hasLocalTransactionId; } public org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId getLocalTransactionId() { return localTransactionId_; } - // optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 15; + // optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 12; private boolean hasXaTransactionId; private org.apache.activemq.protocolbuffer.OpenWire.XATransactionId xaTransactionId_ = org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.getDefaultInstance(); public boolean hasXaTransactionId() { return hasXaTransactionId; } public org.apache.activemq.protocolbuffer.OpenWire.XATransactionId getXaTransactionId() { return xaTransactionId_; } - // optional bool compressed = 16; - private boolean hasCompressed; - private boolean compressed_ = false; - public boolean hasCompressed() { return hasCompressed; } - public boolean getCompressed() { return compressed_; } - - // optional int32 redelivery_counter = 17; + // optional int32 redelivery_counter = 13; private boolean hasRedeliveryCounter; private int redeliveryCounter_ = 0; public boolean hasRedeliveryCounter() { return hasRedeliveryCounter; } public int getRedeliveryCounter() { return redeliveryCounter_; } - // repeated string broker_path = 18; + // repeated string broker_path = 14; private java.util.List brokerPath_ = java.util.Collections.emptyList(); public java.util.List getBrokerPathList() { @@ -4577,7 +4629,7 @@ public final class OpenWire { return brokerPath_.get(index); } - // repeated string cluster_id = 19; + // repeated string cluster_id = 15; private java.util.List clusterId_ = java.util.Collections.emptyList(); public java.util.List getClusterIdList() { @@ -4588,6 +4640,18 @@ public final class OpenWire { return clusterId_.get(index); } + // optional bool compressed = 16; + private boolean hasCompressed; + private boolean compressed_ = false; + public boolean hasCompressed() { return hasCompressed; } + public boolean getCompressed() { return compressed_; } + + // optional int32 priority = 17; + private boolean hasPriority; + private int priority_ = 0; + public boolean hasPriority() { return hasPriority; } + public int getPriority() { return priority_; } + // optional string user_id = 20; private boolean hasUserId; private java.lang.String userId_ = ""; @@ -4668,44 +4732,38 @@ public final class OpenWire { if (hasPropertyBytes()) { output.writeBytes(6, getPropertyBytes()); } - if (hasCorrelationId()) { - output.writeString(7, getCorrelationId()); - } if (hasPersistent()) { - output.writeBool(8, getPersistent()); + output.writeBool(7, getPersistent()); } if (hasExpiration()) { - output.writeInt64(9, getExpiration()); + output.writeInt64(8, getExpiration()); } - if (hasPriority()) { - output.writeInt32(10, getPriority()); + if (hasCorrelationId()) { + output.writeString(9, getCorrelationId()); } if (hasReplyTo()) { - output.writeMessage(11, getReplyTo()); - } - if (hasTimestamp()) { - output.writeInt64(12, getTimestamp()); - } - if (hasType()) { - output.writeString(13, getType()); + output.writeMessage(10, getReplyTo()); } if (hasLocalTransactionId()) { - output.writeMessage(14, getLocalTransactionId()); + output.writeMessage(11, getLocalTransactionId()); } if (hasXaTransactionId()) { - output.writeMessage(15, getXaTransactionId()); + output.writeMessage(12, getXaTransactionId()); + } + if (hasRedeliveryCounter()) { + output.writeInt32(13, getRedeliveryCounter()); + } + for (java.lang.String element : getBrokerPathList()) { + output.writeString(14, element); + } + for (java.lang.String element : getClusterIdList()) { + output.writeString(15, element); } if (hasCompressed()) { output.writeBool(16, getCompressed()); } - if (hasRedeliveryCounter()) { - output.writeInt32(17, getRedeliveryCounter()); - } - for (java.lang.String element : getBrokerPathList()) { - output.writeString(18, element); - } - for (java.lang.String element : getClusterIdList()) { - output.writeString(19, element); + if (hasPriority()) { + output.writeInt32(17, getPriority()); } if (hasUserId()) { output.writeString(20, getUserId()); @@ -4761,57 +4819,49 @@ public final class OpenWire { size += com.google.protobuf.CodedOutputStream .computeBytesSize(6, getPropertyBytes()); } - if (hasCorrelationId()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(7, getCorrelationId()); - } if (hasPersistent()) { size += com.google.protobuf.CodedOutputStream - .computeBoolSize(8, getPersistent()); + .computeBoolSize(7, getPersistent()); } if (hasExpiration()) { size += com.google.protobuf.CodedOutputStream - .computeInt64Size(9, getExpiration()); + .computeInt64Size(8, getExpiration()); } - if (hasPriority()) { + if (hasCorrelationId()) { size += com.google.protobuf.CodedOutputStream - .computeInt32Size(10, getPriority()); + .computeStringSize(9, getCorrelationId()); } if (hasReplyTo()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(11, getReplyTo()); - } - if (hasTimestamp()) { - size += com.google.protobuf.CodedOutputStream - .computeInt64Size(12, getTimestamp()); - } - if (hasType()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(13, getType()); + .computeMessageSize(10, getReplyTo()); } if (hasLocalTransactionId()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(14, getLocalTransactionId()); + .computeMessageSize(11, getLocalTransactionId()); } if (hasXaTransactionId()) { size += com.google.protobuf.CodedOutputStream - .computeMessageSize(15, getXaTransactionId()); + .computeMessageSize(12, getXaTransactionId()); + } + if (hasRedeliveryCounter()) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(13, getRedeliveryCounter()); + } + for (java.lang.String element : getBrokerPathList()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(14, element); + } + for (java.lang.String element : getClusterIdList()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(15, element); } if (hasCompressed()) { size += com.google.protobuf.CodedOutputStream .computeBoolSize(16, getCompressed()); } - if (hasRedeliveryCounter()) { + if (hasPriority()) { size += com.google.protobuf.CodedOutputStream - .computeInt32Size(17, getRedeliveryCounter()); - } - for (java.lang.String element : getBrokerPathList()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(18, element); - } - for (java.lang.String element : getClusterIdList()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(19, element); + .computeInt32Size(17, getPriority()); } if (hasUserId()) { size += com.google.protobuf.CodedOutputStream @@ -4988,36 +5038,24 @@ public final class OpenWire { if (other.hasPropertyBytes()) { setPropertyBytes(other.getPropertyBytes()); } - if (other.hasCorrelationId()) { - setCorrelationId(other.getCorrelationId()); - } if (other.hasPersistent()) { setPersistent(other.getPersistent()); } if (other.hasExpiration()) { setExpiration(other.getExpiration()); } - if (other.hasPriority()) { - setPriority(other.getPriority()); + if (other.hasCorrelationId()) { + setCorrelationId(other.getCorrelationId()); } if (other.hasReplyTo()) { mergeReplyTo(other.getReplyTo()); } - if (other.hasTimestamp()) { - setTimestamp(other.getTimestamp()); - } - if (other.hasType()) { - setType(other.getType()); - } if (other.hasLocalTransactionId()) { mergeLocalTransactionId(other.getLocalTransactionId()); } if (other.hasXaTransactionId()) { mergeXaTransactionId(other.getXaTransactionId()); } - if (other.hasCompressed()) { - setCompressed(other.getCompressed()); - } if (other.hasRedeliveryCounter()) { setRedeliveryCounter(other.getRedeliveryCounter()); } @@ -5033,6 +5071,12 @@ public final class OpenWire { } result.clusterId_.addAll(other.clusterId_); } + if (other.hasCompressed()) { + setCompressed(other.getCompressed()); + } + if (other.hasPriority()) { + setPriority(other.getPriority()); + } if (other.hasUserId()) { setUserId(other.getUserId()); } @@ -5120,23 +5164,19 @@ public final class OpenWire { setPropertyBytes(input.readBytes()); break; } - case 58: { - setCorrelationId(input.readString()); - break; - } - case 64: { + case 56: { setPersistent(input.readBool()); break; } - case 72: { + case 64: { setExpiration(input.readInt64()); break; } - case 80: { - setPriority(input.readInt32()); + case 74: { + setCorrelationId(input.readString()); break; } - case 90: { + case 82: { org.apache.activemq.protocolbuffer.OpenWire.Destination.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.Destination.newBuilder(); if (hasReplyTo()) { subBuilder.mergeFrom(getReplyTo()); @@ -5145,15 +5185,7 @@ public final class OpenWire { setReplyTo(subBuilder.buildPartial()); break; } - case 96: { - setTimestamp(input.readInt64()); - break; - } - case 106: { - setType(input.readString()); - break; - } - case 114: { + case 90: { org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.newBuilder(); if (hasLocalTransactionId()) { subBuilder.mergeFrom(getLocalTransactionId()); @@ -5162,7 +5194,7 @@ public final class OpenWire { setLocalTransactionId(subBuilder.buildPartial()); break; } - case 122: { + case 98: { org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.newBuilder(); if (hasXaTransactionId()) { subBuilder.mergeFrom(getXaTransactionId()); @@ -5171,20 +5203,24 @@ public final class OpenWire { setXaTransactionId(subBuilder.buildPartial()); break; } + case 104: { + setRedeliveryCounter(input.readInt32()); + break; + } + case 114: { + addBrokerPath(input.readString()); + break; + } + case 122: { + addClusterId(input.readString()); + break; + } case 128: { setCompressed(input.readBool()); break; } case 136: { - setRedeliveryCounter(input.readInt32()); - break; - } - case 146: { - addBrokerPath(input.readString()); - break; - } - case 154: { - addClusterId(input.readString()); + setPriority(input.readInt32()); break; } case 162: { @@ -5360,25 +5396,7 @@ public final class OpenWire { return this; } - // optional string correlation_id = 7; - public boolean hasCorrelationId() { - return result.hasCorrelationId(); - } - public java.lang.String getCorrelationId() { - return result.getCorrelationId(); - } - public Builder setCorrelationId(java.lang.String value) { - result.hasCorrelationId = true; - result.correlationId_ = value; - return this; - } - public Builder clearCorrelationId() { - result.hasCorrelationId = false; - result.correlationId_ = ""; - return this; - } - - // optional bool persistent = 8; + // optional bool persistent = 7; public boolean hasPersistent() { return result.hasPersistent(); } @@ -5396,7 +5414,7 @@ public final class OpenWire { return this; } - // optional int64 expiration = 9; + // optional int64 expiration = 8; public boolean hasExpiration() { return result.hasExpiration(); } @@ -5414,25 +5432,25 @@ public final class OpenWire { return this; } - // optional int32 priority = 10; - public boolean hasPriority() { - return result.hasPriority(); + // optional string correlation_id = 9; + public boolean hasCorrelationId() { + return result.hasCorrelationId(); } - public int getPriority() { - return result.getPriority(); + public java.lang.String getCorrelationId() { + return result.getCorrelationId(); } - public Builder setPriority(int value) { - result.hasPriority = true; - result.priority_ = value; + public Builder setCorrelationId(java.lang.String value) { + result.hasCorrelationId = true; + result.correlationId_ = value; return this; } - public Builder clearPriority() { - result.hasPriority = false; - result.priority_ = 0; + public Builder clearCorrelationId() { + result.hasCorrelationId = false; + result.correlationId_ = ""; return this; } - // optional .org.apache.activemq.protocolbuffer.Destination reply_to = 11; + // optional .org.apache.activemq.protocolbuffer.Destination reply_to = 10; public boolean hasReplyTo() { return result.hasReplyTo(); } @@ -5466,43 +5484,7 @@ public final class OpenWire { return this; } - // optional int64 timestamp = 12; - public boolean hasTimestamp() { - return result.hasTimestamp(); - } - public long getTimestamp() { - return result.getTimestamp(); - } - public Builder setTimestamp(long value) { - result.hasTimestamp = true; - result.timestamp_ = value; - return this; - } - public Builder clearTimestamp() { - result.hasTimestamp = false; - result.timestamp_ = 0L; - return this; - } - - // optional string type = 13; - public boolean hasType() { - return result.hasType(); - } - public java.lang.String getType() { - return result.getType(); - } - public Builder setType(java.lang.String value) { - result.hasType = true; - result.type_ = value; - return this; - } - public Builder clearType() { - result.hasType = false; - result.type_ = ""; - return this; - } - - // optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 14; + // optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 11; public boolean hasLocalTransactionId() { return result.hasLocalTransactionId(); } @@ -5536,7 +5518,7 @@ public final class OpenWire { return this; } - // optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 15; + // optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 12; public boolean hasXaTransactionId() { return result.hasXaTransactionId(); } @@ -5570,25 +5552,7 @@ public final class OpenWire { return this; } - // optional bool compressed = 16; - public boolean hasCompressed() { - return result.hasCompressed(); - } - public boolean getCompressed() { - return result.getCompressed(); - } - public Builder setCompressed(boolean value) { - result.hasCompressed = true; - result.compressed_ = value; - return this; - } - public Builder clearCompressed() { - result.hasCompressed = false; - result.compressed_ = false; - return this; - } - - // optional int32 redelivery_counter = 17; + // optional int32 redelivery_counter = 13; public boolean hasRedeliveryCounter() { return result.hasRedeliveryCounter(); } @@ -5606,7 +5570,7 @@ public final class OpenWire { return this; } - // repeated string broker_path = 18; + // repeated string broker_path = 14; public java.util.List getBrokerPathList() { return java.util.Collections.unmodifiableList(result.brokerPath_); } @@ -5640,7 +5604,7 @@ public final class OpenWire { return this; } - // repeated string cluster_id = 19; + // repeated string cluster_id = 15; public java.util.List getClusterIdList() { return java.util.Collections.unmodifiableList(result.clusterId_); } @@ -5674,6 +5638,42 @@ public final class OpenWire { return this; } + // optional bool compressed = 16; + public boolean hasCompressed() { + return result.hasCompressed(); + } + public boolean getCompressed() { + return result.getCompressed(); + } + public Builder setCompressed(boolean value) { + result.hasCompressed = true; + result.compressed_ = value; + return this; + } + public Builder clearCompressed() { + result.hasCompressed = false; + result.compressed_ = false; + return this; + } + + // optional int32 priority = 17; + public boolean hasPriority() { + return result.hasPriority(); + } + public int getPriority() { + return result.getPriority(); + } + public Builder setPriority(int value) { + result.hasPriority = true; + result.priority_ = value; + return this; + } + public Builder clearPriority() { + result.hasPriority = false; + result.priority_ = 0; + return this; + } + // optional string user_id = 20; public boolean hasUserId() { return result.hasUserId(); @@ -5942,7 +5942,7 @@ public final class OpenWire { internal_static_org_apache_activemq_protocolbuffer_Properties_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_apache_activemq_protocolbuffer_Properties_descriptor, - new java.lang.String[] { "StringProperty", "IntProperty", "BoolProperty", "LongProperty", "DoubleProperty", "FloatProperty", "ShortProperty", "ByteProperty", }, + new java.lang.String[] { "StringProperty", "IntProperty", "BoolProperty", "LongProperty", "DoubleProperty", "FloatProperty", "ShortProperty", "ByteProperty", "Timestamp", "Type", }, org.apache.activemq.protocolbuffer.OpenWire.Properties.class, org.apache.activemq.protocolbuffer.OpenWire.Properties.Builder.class); private static final com.google.protobuf.Descriptors.Descriptor @@ -5953,7 +5953,7 @@ public final class OpenWire { internal_static_org_apache_activemq_protocolbuffer_Message_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_apache_activemq_protocolbuffer_Message_descriptor, - new java.lang.String[] { "ProducerId", "ProducerCounter", "Destination", "OriginalDestination", "GroupId", "PropertyBytes", "CorrelationId", "Persistent", "Expiration", "Priority", "ReplyTo", "Timestamp", "Type", "LocalTransactionId", "XaTransactionId", "Compressed", "RedeliveryCounter", "BrokerPath", "ClusterId", "UserId", "Arrival", "BrokerInTime", "BrokerOutTime", "Droppable", "ReceivedByDFBridge", "GroupSequence", }, + new java.lang.String[] { "ProducerId", "ProducerCounter", "Destination", "OriginalDestination", "GroupId", "PropertyBytes", "Persistent", "Expiration", "CorrelationId", "ReplyTo", "LocalTransactionId", "XaTransactionId", "RedeliveryCounter", "BrokerPath", "ClusterId", "Compressed", "Priority", "UserId", "Arrival", "BrokerInTime", "BrokerOutTime", "Droppable", "ReceivedByDFBridge", "GroupSequence", }, org.apache.activemq.protocolbuffer.OpenWire.Message.class, org.apache.activemq.protocolbuffer.OpenWire.Message.Builder.class); } diff --git a/activemq-protocol-buffer/src/main/proto/openwire.proto b/activemq-protocol-buffer/src/main/proto/openwire.proto index 4327bd44c5..a64e5f995f 100644 --- a/activemq-protocol-buffer/src/main/proto/openwire.proto +++ b/activemq-protocol-buffer/src/main/proto/openwire.proto @@ -92,6 +92,9 @@ message Properties { repeated FloatProperty float_property = 6; repeated ShortProperty short_property = 7; repeated ByteProperty byte_property = 8; + + optional int64 timestamp = 10; + optional string type = 11; } // Message @@ -109,30 +112,31 @@ message Message { optional bytes property_bytes = 6; - optional string correlation_id = 7; // TODO move this into the 'exchange id'? - optional bool persistent = 8; + optional bool persistent = 7; - optional int64 expiration = 9; + optional int64 expiration = 8; + optional string correlation_id = 9; -// TODO no byte? - optional int32 priority = 10; - optional Destination reply_to = 11; - optional int64 timestamp = 12; - optional string type = 13; + optional Destination reply_to = 10; - optional LocalTransactionId local_transaction_id = 14; - optional XATransactionId xa_transaction_id = 15; + optional LocalTransactionId local_transaction_id = 11; + optional XATransactionId xa_transaction_id = 12; // TODO why DataStructure and Content? // TODO targetConsumerId? + // TODO should we move more stuff into the Properties header? + optional int32 redelivery_counter = 13; + repeated string broker_path = 14; + repeated string cluster_id = 15; + optional bool compressed = 16; - optional int32 redelivery_counter = 17; - repeated string broker_path = 18; - repeated string cluster_id = 19; + + // TODO no byte? + optional int32 priority = 17; optional string user_id = 20; @@ -145,6 +149,9 @@ message Message { optional bool receivedByDFBridge = 29; optional int32 group_sequence = 40; - - } + +// TODO things to ponder +// should we move more message fields +// that are set by the sender (and rarely required by the broker +// into the Properties object? diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java index 17a5427cfb..56cea00dea 100644 --- a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/OpenWirePerformanceTest.java @@ -43,8 +43,8 @@ public class OpenWirePerformanceTest extends TestSupport { message.setDestination(destination); message.setPersistent(true); - message.setType("type:" + i); message.setCorrelationId("ABCD"); + //message.setType("type:" + i); if (useProducerId) { message.setProducerId(producerId); @@ -65,7 +65,6 @@ public class OpenWirePerformanceTest extends TestSupport { openWireFormat.marshal(message, ds); watch.stop(); } - out.flush(); out.close(); // now lets try read them! diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java index 921451f479..945b66097b 100644 --- a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/Performance2Test.java @@ -17,6 +17,9 @@ */ package org.apache.activemq.protocolbuffer; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.CodedInputStream; + import java.io.*; /** @@ -29,6 +32,7 @@ public class Performance2Test extends TestSupport { public void testPerformance() throws Exception { OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName)); + CodedOutputStream cout = CodedOutputStream.newInstance(out); StopWatch watch = createStopWatch("writer"); for (int i = 0; i < messageCount; i++) { @@ -36,7 +40,6 @@ public class Performance2Test extends TestSupport { OpenWire.Message.Builder builder = OpenWire.Message.newBuilder() .setDestination(destination) .setPersistent(true) - .setType("type:" + i) .setCorrelationId("ABCD"); if (useProducerId) { @@ -49,26 +52,29 @@ public class Performance2Test extends TestSupport { if (verbose) { System.out.println("Writing message: " + i + " = " + message); } - byte[] bytes = message.toByteArray(); - int size = bytes.length; - out.write(size); - //System.out.println("writing bytes: " + size); - out.write(bytes); + int size = message.getSerializedSize(); + cout.writeRawVarint32(size); + message.writeTo(cout); + watch.stop(); } - out.flush(); + cout.flush(); out.close(); // now lets try read them! StopWatch watch2 = createStopWatch("reader"); InputStream in = new BufferedInputStream(new FileInputStream(fileName)); + CodedInputStream cin = CodedInputStream.newInstance(in); + for (int i = 0; i < messageCount; i++) { watch2.start(); - int size = in.read(); - byte[] data = new byte[size]; - in.read(data); - OpenWire.Message message = OpenWire.Message.parseFrom(data); + int size = cin.readRawVarint32(); + int previous = cin.pushLimit(size); + //cin.setSizeLimit(size + 4); + OpenWire.Message message = OpenWire.Message.parseFrom(cin); + cin.popLimit(previous); + if (verbose) { System.out.println("Reading message: " + i + " = " + message); } diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/PerformanceTest.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/PerformanceTest.java index 554b669dc0..0fe8acfe63 100644 --- a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/PerformanceTest.java +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/PerformanceTest.java @@ -43,10 +43,12 @@ public class PerformanceTest extends TestCase { .setPersistent(true) .setProducerId(1234) .setProducerCounter(i) - .setType("type:" + i) .build(); + //.setType("type:" + i) System.out.println("Writing message: " + i + " = " + message); + int size = message.getSerializedSize(); + cout.writeRawVarint32(size); message.writeTo(cout); cout.flush(); } @@ -56,7 +58,10 @@ public class PerformanceTest extends TestCase { FileInputStream in = new FileInputStream(fileName); CodedInputStream cin = CodedInputStream.newInstance(in); for (int i = 0; i < messageCount; i++) { + int size = cin.readRawVarint32(); + int previous = cin.pushLimit(size); OpenWire.Message message = OpenWire.Message.parseFrom(cin); + cin.popLimit(previous); System.out.println("Reading message: " + i + " = " + message); } in.close(); diff --git a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java index ea89547163..50ef492b1b 100644 --- a/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java +++ b/activemq-protocol-buffer/src/test/java/org/apache/activemq/protocolbuffer/TestSupport.java @@ -23,7 +23,7 @@ import junit.framework.TestCase; * @version $Revision: 1.1 $ */ public class TestSupport extends TestCase { - protected int messageCount = 10000000; + protected int messageCount = 1000000; protected boolean verbose = false; protected boolean useProducerId = false; }