optimised the use of Protocol Buffer to avoid creating unnecessary byte[] for each message which seems to make things much faster

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@675094 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2008-07-09 04:38:19 +00:00
parent 983f690889
commit c3a0b2edfc
7 changed files with 297 additions and 275 deletions

View File

@ -70,8 +70,13 @@
<forkMode>pertest</forkMode> <forkMode>pertest</forkMode>
<childDelegation>false</childDelegation> <childDelegation>false</childDelegation>
<useFile>true</useFile> <useFile>true</useFile>
<includes>
<include>**/*Test.*</include>
</includes>
<excludes> <excludes>
<exclude>**/PerformanceTest.java</exclude> <!--
<exclude>**/.java</exclude>
-->
</excludes> </excludes>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -35,7 +35,7 @@ public final class OpenWire {
"\030\002 \002(\005\"+\n\014LongProperty\022\014\n\004name\030\001 \002(\t\022\r\n\005" + "\030\002 \002(\005\"+\n\014LongProperty\022\014\n\004name\030\001 \002(\t\022\r\n\005" +
"value\030\002 \002(\005\",\n\rFloatProperty\022\014\n\004name\030\001 \002" + "value\030\002 \002(\005\",\n\rFloatProperty\022\014\n\004name\030\001 \002" +
"(\t\022\r\n\005value\030\002 \002(\002\"-\n\016DoubleProperty\022\014\n\004n" + "(\t\022\r\n\005value\030\002 \002(\002\"-\n\016DoubleProperty\022\014\n\004n" +
"ame\030\001 \002(\t\022\r\n\005value\030\002 \002(\001\"\336\004\n\nProperties\022" + "ame\030\001 \002(\t\022\r\n\005value\030\002 \002(\001\"\377\004\n\nProperties\022" +
"K\n\017string_property\030\001 \003(\01322.org.apache.ac" + "K\n\017string_property\030\001 \003(\01322.org.apache.ac" +
"tivemq.protocolbuffer.StringProperty\022E\n\014" + "tivemq.protocolbuffer.StringProperty\022E\n\014" +
"int_property\030\002 \003(\0132/.org.apache.activemq" + "int_property\030\002 \003(\0132/.org.apache.activemq" +
@ -50,24 +50,24 @@ public final class OpenWire {
"perty\022I\n\016short_property\030\007 \003(\01321.org.apac" + "perty\022I\n\016short_property\030\007 \003(\01321.org.apac" +
"he.activemq.protocolbuffer.ShortProperty" + "he.activemq.protocolbuffer.ShortProperty" +
"\022G\n\rbyte_property\030\010 \003(\01320.org.apache.act" + "\022G\n\rbyte_property\030\010 \003(\01320.org.apache.act" +
"ivemq.protocolbuffer.ByteProperty\"\306\006\n\007Me" + "ivemq.protocolbuffer.ByteProperty\022\021\n\ttim" +
"ssage\022\023\n\013producer_id\030\001 \001(\005\022\030\n\020producer_c" + "estamp\030\n \001(\003\022\014\n\004type\030\013 \001(\t\"\245\006\n\007Message\022\023" +
"ounter\030\002 \001(\005\022D\n\013destination\030\003 \002(\0132/.org." + "\n\013producer_id\030\001 \001(\005\022\030\n\020producer_counter\030" +
"apache.activemq.protocolbuffer.Destinati" + "\002 \001(\005\022D\n\013destination\030\003 \002(\0132/.org.apache." +
"on\022M\n\024original_destination\030\004 \001(\0132/.org.a" + "activemq.protocolbuffer.Destination\022M\n\024o" +
"pache.activemq.protocolbuffer.Destinatio" + "riginal_destination\030\004 \001(\0132/.org.apache.a" +
"n\022\020\n\010group_id\030\005 \001(\t\022\026\n\016property_bytes\030\006 " + "ctivemq.protocolbuffer.Destination\022\020\n\010gr" +
"\001(\014\022\026\n\016correlation_id\030\007 \001(\t\022\022\n\npersisten" + "oup_id\030\005 \001(\t\022\026\n\016property_bytes\030\006 \001(\014\022\022\n\n" +
"t\030\010 \001(\010\022\022\n\nexpiration\030\t \001(\003\022\020\n\010priority\030" + "persistent\030\007 \001(\010\022\022\n\nexpiration\030\010 \001(\003\022\026\n\016" +
"\n \001(\005\022A\n\010reply_to\030\013 \001(\0132/.org.apache.act" + "correlation_id\030\t \001(\t\022A\n\010reply_to\030\n \001(\0132/" +
"ivemq.protocolbuffer.Destination\022\021\n\ttime" + ".org.apache.activemq.protocolbuffer.Dest" +
"stamp\030\014 \001(\003\022\014\n\004type\030\r \001(\t\022T\n\024local_trans" + "ination\022T\n\024local_transaction_id\030\013 \001(\01326." +
"action_id\030\016 \001(\01326.org.apache.activemq.pr" + "org.apache.activemq.protocolbuffer.Local" +
"otocolbuffer.LocalTransactionId\022N\n\021xa_tr" + "TransactionId\022N\n\021xa_transaction_id\030\014 \001(\013" +
"ansaction_id\030\017 \001(\01323.org.apache.activemq" + "23.org.apache.activemq.protocolbuffer.XA" +
".protocolbuffer.XATransactionId\022\022\n\ncompr" + "TransactionId\022\032\n\022redelivery_counter\030\r \001(" +
"essed\030\020 \001(\010\022\032\n\022redelivery_counter\030\021 \001(\005\022" + "\005\022\023\n\013broker_path\030\016 \003(\t\022\022\n\ncluster_id\030\017 \003" +
"\023\n\013broker_path\030\022 \003(\t\022\022\n\ncluster_id\030\023 \003(\t" + "(\t\022\022\n\ncompressed\030\020 \001(\010\022\020\n\010priority\030\021 \001(\005" +
"\022\017\n\007user_id\030\024 \001(\t\022\017\n\007arrival\030\026 \001(\003\022\026\n\016br" + "\022\017\n\007user_id\030\024 \001(\t\022\017\n\007arrival\030\026 \001(\003\022\026\n\016br" +
"oker_in_time\030\027 \001(\003\022\027\n\017broker_out_time\030\030 " + "oker_in_time\030\027 \001(\003\022\027\n\017broker_out_time\030\030 " +
"\001(\003\022\021\n\tdroppable\030\034 \001(\010\022\032\n\022receivedByDFBr" + "\001(\003\022\021\n\tdroppable\030\034 \001(\010\022\032\n\022receivedByDFBr" +
@ -3697,6 +3697,18 @@ public final class OpenWire {
return byteProperty_.get(index); return byteProperty_.get(index);
} }
// optional int64 timestamp = 10;
private boolean hasTimestamp;
private long timestamp_ = 0L;
public boolean hasTimestamp() { return hasTimestamp; }
public long getTimestamp() { return timestamp_; }
// optional string type = 11;
private boolean hasType;
private java.lang.String type_ = "";
public boolean hasType() { return hasType; }
public java.lang.String getType() { return type_; }
public final boolean isInitialized() { public final boolean isInitialized() {
for (org.apache.activemq.protocolbuffer.OpenWire.StringProperty element : getStringPropertyList()) { for (org.apache.activemq.protocolbuffer.OpenWire.StringProperty element : getStringPropertyList()) {
if (!element.isInitialized()) return false; if (!element.isInitialized()) return false;
@ -3751,6 +3763,12 @@ public final class OpenWire {
for (org.apache.activemq.protocolbuffer.OpenWire.ByteProperty element : getBytePropertyList()) { for (org.apache.activemq.protocolbuffer.OpenWire.ByteProperty element : getBytePropertyList()) {
output.writeMessage(8, element); output.writeMessage(8, element);
} }
if (hasTimestamp()) {
output.writeInt64(10, getTimestamp());
}
if (hasType()) {
output.writeString(11, getType());
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -3792,6 +3810,14 @@ public final class OpenWire {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeMessageSize(8, element); .computeMessageSize(8, element);
} }
if (hasTimestamp()) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(10, getTimestamp());
}
if (hasType()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(11, getType());
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -3993,6 +4019,12 @@ public final class OpenWire {
} }
result.byteProperty_.addAll(other.byteProperty_); result.byteProperty_.addAll(other.byteProperty_);
} }
if (other.hasTimestamp()) {
setTimestamp(other.getTimestamp());
}
if (other.hasType()) {
setType(other.getType());
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -4073,6 +4105,14 @@ public final class OpenWire {
addByteProperty(subBuilder.buildPartial()); addByteProperty(subBuilder.buildPartial());
break; break;
} }
case 80: {
setTimestamp(input.readInt64());
break;
}
case 90: {
setType(input.readString());
break;
}
} }
} }
} }
@ -4437,6 +4477,42 @@ public final class OpenWire {
result.byteProperty_ = java.util.Collections.emptyList(); result.byteProperty_ = java.util.Collections.emptyList();
return this; return this;
} }
// optional int64 timestamp = 10;
public boolean hasTimestamp() {
return result.hasTimestamp();
}
public long getTimestamp() {
return result.getTimestamp();
}
public Builder setTimestamp(long value) {
result.hasTimestamp = true;
result.timestamp_ = value;
return this;
}
public Builder clearTimestamp() {
result.hasTimestamp = false;
result.timestamp_ = 0L;
return this;
}
// optional string type = 11;
public boolean hasType() {
return result.hasType();
}
public java.lang.String getType() {
return result.getType();
}
public Builder setType(java.lang.String value) {
result.hasType = true;
result.type_ = value;
return this;
}
public Builder clearType() {
result.hasType = false;
result.type_ = "";
return this;
}
} }
} }
@ -4500,73 +4576,49 @@ public final class OpenWire {
public boolean hasPropertyBytes() { return hasPropertyBytes; } public boolean hasPropertyBytes() { return hasPropertyBytes; }
public com.google.protobuf.ByteString getPropertyBytes() { return propertyBytes_; } public com.google.protobuf.ByteString getPropertyBytes() { return propertyBytes_; }
// optional string correlation_id = 7; // optional bool persistent = 7;
private boolean hasCorrelationId;
private java.lang.String correlationId_ = "";
public boolean hasCorrelationId() { return hasCorrelationId; }
public java.lang.String getCorrelationId() { return correlationId_; }
// optional bool persistent = 8;
private boolean hasPersistent; private boolean hasPersistent;
private boolean persistent_ = false; private boolean persistent_ = false;
public boolean hasPersistent() { return hasPersistent; } public boolean hasPersistent() { return hasPersistent; }
public boolean getPersistent() { return persistent_; } public boolean getPersistent() { return persistent_; }
// optional int64 expiration = 9; // optional int64 expiration = 8;
private boolean hasExpiration; private boolean hasExpiration;
private long expiration_ = 0L; private long expiration_ = 0L;
public boolean hasExpiration() { return hasExpiration; } public boolean hasExpiration() { return hasExpiration; }
public long getExpiration() { return expiration_; } public long getExpiration() { return expiration_; }
// optional int32 priority = 10; // optional string correlation_id = 9;
private boolean hasPriority; private boolean hasCorrelationId;
private int priority_ = 0; private java.lang.String correlationId_ = "";
public boolean hasPriority() { return hasPriority; } public boolean hasCorrelationId() { return hasCorrelationId; }
public int getPriority() { return priority_; } public java.lang.String getCorrelationId() { return correlationId_; }
// optional .org.apache.activemq.protocolbuffer.Destination reply_to = 11; // optional .org.apache.activemq.protocolbuffer.Destination reply_to = 10;
private boolean hasReplyTo; private boolean hasReplyTo;
private org.apache.activemq.protocolbuffer.OpenWire.Destination replyTo_ = org.apache.activemq.protocolbuffer.OpenWire.Destination.getDefaultInstance(); private org.apache.activemq.protocolbuffer.OpenWire.Destination replyTo_ = org.apache.activemq.protocolbuffer.OpenWire.Destination.getDefaultInstance();
public boolean hasReplyTo() { return hasReplyTo; } public boolean hasReplyTo() { return hasReplyTo; }
public org.apache.activemq.protocolbuffer.OpenWire.Destination getReplyTo() { return replyTo_; } public org.apache.activemq.protocolbuffer.OpenWire.Destination getReplyTo() { return replyTo_; }
// optional int64 timestamp = 12; // optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 11;
private boolean hasTimestamp;
private long timestamp_ = 0L;
public boolean hasTimestamp() { return hasTimestamp; }
public long getTimestamp() { return timestamp_; }
// optional string type = 13;
private boolean hasType;
private java.lang.String type_ = "";
public boolean hasType() { return hasType; }
public java.lang.String getType() { return type_; }
// optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 14;
private boolean hasLocalTransactionId; private boolean hasLocalTransactionId;
private org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId localTransactionId_ = org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.getDefaultInstance(); private org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId localTransactionId_ = org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.getDefaultInstance();
public boolean hasLocalTransactionId() { return hasLocalTransactionId; } public boolean hasLocalTransactionId() { return hasLocalTransactionId; }
public org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId getLocalTransactionId() { return localTransactionId_; } public org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId getLocalTransactionId() { return localTransactionId_; }
// optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 15; // optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 12;
private boolean hasXaTransactionId; private boolean hasXaTransactionId;
private org.apache.activemq.protocolbuffer.OpenWire.XATransactionId xaTransactionId_ = org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.getDefaultInstance(); private org.apache.activemq.protocolbuffer.OpenWire.XATransactionId xaTransactionId_ = org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.getDefaultInstance();
public boolean hasXaTransactionId() { return hasXaTransactionId; } public boolean hasXaTransactionId() { return hasXaTransactionId; }
public org.apache.activemq.protocolbuffer.OpenWire.XATransactionId getXaTransactionId() { return xaTransactionId_; } public org.apache.activemq.protocolbuffer.OpenWire.XATransactionId getXaTransactionId() { return xaTransactionId_; }
// optional bool compressed = 16; // optional int32 redelivery_counter = 13;
private boolean hasCompressed;
private boolean compressed_ = false;
public boolean hasCompressed() { return hasCompressed; }
public boolean getCompressed() { return compressed_; }
// optional int32 redelivery_counter = 17;
private boolean hasRedeliveryCounter; private boolean hasRedeliveryCounter;
private int redeliveryCounter_ = 0; private int redeliveryCounter_ = 0;
public boolean hasRedeliveryCounter() { return hasRedeliveryCounter; } public boolean hasRedeliveryCounter() { return hasRedeliveryCounter; }
public int getRedeliveryCounter() { return redeliveryCounter_; } public int getRedeliveryCounter() { return redeliveryCounter_; }
// repeated string broker_path = 18; // repeated string broker_path = 14;
private java.util.List<java.lang.String> brokerPath_ = private java.util.List<java.lang.String> brokerPath_ =
java.util.Collections.emptyList(); java.util.Collections.emptyList();
public java.util.List<java.lang.String> getBrokerPathList() { public java.util.List<java.lang.String> getBrokerPathList() {
@ -4577,7 +4629,7 @@ public final class OpenWire {
return brokerPath_.get(index); return brokerPath_.get(index);
} }
// repeated string cluster_id = 19; // repeated string cluster_id = 15;
private java.util.List<java.lang.String> clusterId_ = private java.util.List<java.lang.String> clusterId_ =
java.util.Collections.emptyList(); java.util.Collections.emptyList();
public java.util.List<java.lang.String> getClusterIdList() { public java.util.List<java.lang.String> getClusterIdList() {
@ -4588,6 +4640,18 @@ public final class OpenWire {
return clusterId_.get(index); return clusterId_.get(index);
} }
// optional bool compressed = 16;
private boolean hasCompressed;
private boolean compressed_ = false;
public boolean hasCompressed() { return hasCompressed; }
public boolean getCompressed() { return compressed_; }
// optional int32 priority = 17;
private boolean hasPriority;
private int priority_ = 0;
public boolean hasPriority() { return hasPriority; }
public int getPriority() { return priority_; }
// optional string user_id = 20; // optional string user_id = 20;
private boolean hasUserId; private boolean hasUserId;
private java.lang.String userId_ = ""; private java.lang.String userId_ = "";
@ -4668,44 +4732,38 @@ public final class OpenWire {
if (hasPropertyBytes()) { if (hasPropertyBytes()) {
output.writeBytes(6, getPropertyBytes()); output.writeBytes(6, getPropertyBytes());
} }
if (hasCorrelationId()) {
output.writeString(7, getCorrelationId());
}
if (hasPersistent()) { if (hasPersistent()) {
output.writeBool(8, getPersistent()); output.writeBool(7, getPersistent());
} }
if (hasExpiration()) { if (hasExpiration()) {
output.writeInt64(9, getExpiration()); output.writeInt64(8, getExpiration());
} }
if (hasPriority()) { if (hasCorrelationId()) {
output.writeInt32(10, getPriority()); output.writeString(9, getCorrelationId());
} }
if (hasReplyTo()) { if (hasReplyTo()) {
output.writeMessage(11, getReplyTo()); output.writeMessage(10, getReplyTo());
}
if (hasTimestamp()) {
output.writeInt64(12, getTimestamp());
}
if (hasType()) {
output.writeString(13, getType());
} }
if (hasLocalTransactionId()) { if (hasLocalTransactionId()) {
output.writeMessage(14, getLocalTransactionId()); output.writeMessage(11, getLocalTransactionId());
} }
if (hasXaTransactionId()) { if (hasXaTransactionId()) {
output.writeMessage(15, getXaTransactionId()); output.writeMessage(12, getXaTransactionId());
}
if (hasRedeliveryCounter()) {
output.writeInt32(13, getRedeliveryCounter());
}
for (java.lang.String element : getBrokerPathList()) {
output.writeString(14, element);
}
for (java.lang.String element : getClusterIdList()) {
output.writeString(15, element);
} }
if (hasCompressed()) { if (hasCompressed()) {
output.writeBool(16, getCompressed()); output.writeBool(16, getCompressed());
} }
if (hasRedeliveryCounter()) { if (hasPriority()) {
output.writeInt32(17, getRedeliveryCounter()); output.writeInt32(17, getPriority());
}
for (java.lang.String element : getBrokerPathList()) {
output.writeString(18, element);
}
for (java.lang.String element : getClusterIdList()) {
output.writeString(19, element);
} }
if (hasUserId()) { if (hasUserId()) {
output.writeString(20, getUserId()); output.writeString(20, getUserId());
@ -4761,57 +4819,49 @@ public final class OpenWire {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBytesSize(6, getPropertyBytes()); .computeBytesSize(6, getPropertyBytes());
} }
if (hasCorrelationId()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(7, getCorrelationId());
}
if (hasPersistent()) { if (hasPersistent()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(8, getPersistent()); .computeBoolSize(7, getPersistent());
} }
if (hasExpiration()) { if (hasExpiration()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeInt64Size(9, getExpiration()); .computeInt64Size(8, getExpiration());
} }
if (hasPriority()) { if (hasCorrelationId()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeInt32Size(10, getPriority()); .computeStringSize(9, getCorrelationId());
} }
if (hasReplyTo()) { if (hasReplyTo()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeMessageSize(11, getReplyTo()); .computeMessageSize(10, getReplyTo());
}
if (hasTimestamp()) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(12, getTimestamp());
}
if (hasType()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(13, getType());
} }
if (hasLocalTransactionId()) { if (hasLocalTransactionId()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeMessageSize(14, getLocalTransactionId()); .computeMessageSize(11, getLocalTransactionId());
} }
if (hasXaTransactionId()) { if (hasXaTransactionId()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeMessageSize(15, getXaTransactionId()); .computeMessageSize(12, getXaTransactionId());
}
if (hasRedeliveryCounter()) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(13, getRedeliveryCounter());
}
for (java.lang.String element : getBrokerPathList()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(14, element);
}
for (java.lang.String element : getClusterIdList()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(15, element);
} }
if (hasCompressed()) { if (hasCompressed()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeBoolSize(16, getCompressed()); .computeBoolSize(16, getCompressed());
} }
if (hasRedeliveryCounter()) { if (hasPriority()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeInt32Size(17, getRedeliveryCounter()); .computeInt32Size(17, getPriority());
}
for (java.lang.String element : getBrokerPathList()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(18, element);
}
for (java.lang.String element : getClusterIdList()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(19, element);
} }
if (hasUserId()) { if (hasUserId()) {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
@ -4988,36 +5038,24 @@ public final class OpenWire {
if (other.hasPropertyBytes()) { if (other.hasPropertyBytes()) {
setPropertyBytes(other.getPropertyBytes()); setPropertyBytes(other.getPropertyBytes());
} }
if (other.hasCorrelationId()) {
setCorrelationId(other.getCorrelationId());
}
if (other.hasPersistent()) { if (other.hasPersistent()) {
setPersistent(other.getPersistent()); setPersistent(other.getPersistent());
} }
if (other.hasExpiration()) { if (other.hasExpiration()) {
setExpiration(other.getExpiration()); setExpiration(other.getExpiration());
} }
if (other.hasPriority()) { if (other.hasCorrelationId()) {
setPriority(other.getPriority()); setCorrelationId(other.getCorrelationId());
} }
if (other.hasReplyTo()) { if (other.hasReplyTo()) {
mergeReplyTo(other.getReplyTo()); mergeReplyTo(other.getReplyTo());
} }
if (other.hasTimestamp()) {
setTimestamp(other.getTimestamp());
}
if (other.hasType()) {
setType(other.getType());
}
if (other.hasLocalTransactionId()) { if (other.hasLocalTransactionId()) {
mergeLocalTransactionId(other.getLocalTransactionId()); mergeLocalTransactionId(other.getLocalTransactionId());
} }
if (other.hasXaTransactionId()) { if (other.hasXaTransactionId()) {
mergeXaTransactionId(other.getXaTransactionId()); mergeXaTransactionId(other.getXaTransactionId());
} }
if (other.hasCompressed()) {
setCompressed(other.getCompressed());
}
if (other.hasRedeliveryCounter()) { if (other.hasRedeliveryCounter()) {
setRedeliveryCounter(other.getRedeliveryCounter()); setRedeliveryCounter(other.getRedeliveryCounter());
} }
@ -5033,6 +5071,12 @@ public final class OpenWire {
} }
result.clusterId_.addAll(other.clusterId_); result.clusterId_.addAll(other.clusterId_);
} }
if (other.hasCompressed()) {
setCompressed(other.getCompressed());
}
if (other.hasPriority()) {
setPriority(other.getPriority());
}
if (other.hasUserId()) { if (other.hasUserId()) {
setUserId(other.getUserId()); setUserId(other.getUserId());
} }
@ -5120,23 +5164,19 @@ public final class OpenWire {
setPropertyBytes(input.readBytes()); setPropertyBytes(input.readBytes());
break; break;
} }
case 58: { case 56: {
setCorrelationId(input.readString());
break;
}
case 64: {
setPersistent(input.readBool()); setPersistent(input.readBool());
break; break;
} }
case 72: { case 64: {
setExpiration(input.readInt64()); setExpiration(input.readInt64());
break; break;
} }
case 80: { case 74: {
setPriority(input.readInt32()); setCorrelationId(input.readString());
break; break;
} }
case 90: { case 82: {
org.apache.activemq.protocolbuffer.OpenWire.Destination.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.Destination.newBuilder(); org.apache.activemq.protocolbuffer.OpenWire.Destination.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.Destination.newBuilder();
if (hasReplyTo()) { if (hasReplyTo()) {
subBuilder.mergeFrom(getReplyTo()); subBuilder.mergeFrom(getReplyTo());
@ -5145,15 +5185,7 @@ public final class OpenWire {
setReplyTo(subBuilder.buildPartial()); setReplyTo(subBuilder.buildPartial());
break; break;
} }
case 96: { case 90: {
setTimestamp(input.readInt64());
break;
}
case 106: {
setType(input.readString());
break;
}
case 114: {
org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.newBuilder(); org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.LocalTransactionId.newBuilder();
if (hasLocalTransactionId()) { if (hasLocalTransactionId()) {
subBuilder.mergeFrom(getLocalTransactionId()); subBuilder.mergeFrom(getLocalTransactionId());
@ -5162,7 +5194,7 @@ public final class OpenWire {
setLocalTransactionId(subBuilder.buildPartial()); setLocalTransactionId(subBuilder.buildPartial());
break; break;
} }
case 122: { case 98: {
org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.newBuilder(); org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.Builder subBuilder = org.apache.activemq.protocolbuffer.OpenWire.XATransactionId.newBuilder();
if (hasXaTransactionId()) { if (hasXaTransactionId()) {
subBuilder.mergeFrom(getXaTransactionId()); subBuilder.mergeFrom(getXaTransactionId());
@ -5171,20 +5203,24 @@ public final class OpenWire {
setXaTransactionId(subBuilder.buildPartial()); setXaTransactionId(subBuilder.buildPartial());
break; break;
} }
case 104: {
setRedeliveryCounter(input.readInt32());
break;
}
case 114: {
addBrokerPath(input.readString());
break;
}
case 122: {
addClusterId(input.readString());
break;
}
case 128: { case 128: {
setCompressed(input.readBool()); setCompressed(input.readBool());
break; break;
} }
case 136: { case 136: {
setRedeliveryCounter(input.readInt32()); setPriority(input.readInt32());
break;
}
case 146: {
addBrokerPath(input.readString());
break;
}
case 154: {
addClusterId(input.readString());
break; break;
} }
case 162: { case 162: {
@ -5360,25 +5396,7 @@ public final class OpenWire {
return this; return this;
} }
// optional string correlation_id = 7; // optional bool persistent = 7;
public boolean hasCorrelationId() {
return result.hasCorrelationId();
}
public java.lang.String getCorrelationId() {
return result.getCorrelationId();
}
public Builder setCorrelationId(java.lang.String value) {
result.hasCorrelationId = true;
result.correlationId_ = value;
return this;
}
public Builder clearCorrelationId() {
result.hasCorrelationId = false;
result.correlationId_ = "";
return this;
}
// optional bool persistent = 8;
public boolean hasPersistent() { public boolean hasPersistent() {
return result.hasPersistent(); return result.hasPersistent();
} }
@ -5396,7 +5414,7 @@ public final class OpenWire {
return this; return this;
} }
// optional int64 expiration = 9; // optional int64 expiration = 8;
public boolean hasExpiration() { public boolean hasExpiration() {
return result.hasExpiration(); return result.hasExpiration();
} }
@ -5414,25 +5432,25 @@ public final class OpenWire {
return this; return this;
} }
// optional int32 priority = 10; // optional string correlation_id = 9;
public boolean hasPriority() { public boolean hasCorrelationId() {
return result.hasPriority(); return result.hasCorrelationId();
} }
public int getPriority() { public java.lang.String getCorrelationId() {
return result.getPriority(); return result.getCorrelationId();
} }
public Builder setPriority(int value) { public Builder setCorrelationId(java.lang.String value) {
result.hasPriority = true; result.hasCorrelationId = true;
result.priority_ = value; result.correlationId_ = value;
return this; return this;
} }
public Builder clearPriority() { public Builder clearCorrelationId() {
result.hasPriority = false; result.hasCorrelationId = false;
result.priority_ = 0; result.correlationId_ = "";
return this; return this;
} }
// optional .org.apache.activemq.protocolbuffer.Destination reply_to = 11; // optional .org.apache.activemq.protocolbuffer.Destination reply_to = 10;
public boolean hasReplyTo() { public boolean hasReplyTo() {
return result.hasReplyTo(); return result.hasReplyTo();
} }
@ -5466,43 +5484,7 @@ public final class OpenWire {
return this; return this;
} }
// optional int64 timestamp = 12; // optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 11;
public boolean hasTimestamp() {
return result.hasTimestamp();
}
public long getTimestamp() {
return result.getTimestamp();
}
public Builder setTimestamp(long value) {
result.hasTimestamp = true;
result.timestamp_ = value;
return this;
}
public Builder clearTimestamp() {
result.hasTimestamp = false;
result.timestamp_ = 0L;
return this;
}
// optional string type = 13;
public boolean hasType() {
return result.hasType();
}
public java.lang.String getType() {
return result.getType();
}
public Builder setType(java.lang.String value) {
result.hasType = true;
result.type_ = value;
return this;
}
public Builder clearType() {
result.hasType = false;
result.type_ = "";
return this;
}
// optional .org.apache.activemq.protocolbuffer.LocalTransactionId local_transaction_id = 14;
public boolean hasLocalTransactionId() { public boolean hasLocalTransactionId() {
return result.hasLocalTransactionId(); return result.hasLocalTransactionId();
} }
@ -5536,7 +5518,7 @@ public final class OpenWire {
return this; return this;
} }
// optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 15; // optional .org.apache.activemq.protocolbuffer.XATransactionId xa_transaction_id = 12;
public boolean hasXaTransactionId() { public boolean hasXaTransactionId() {
return result.hasXaTransactionId(); return result.hasXaTransactionId();
} }
@ -5570,25 +5552,7 @@ public final class OpenWire {
return this; return this;
} }
// optional bool compressed = 16; // optional int32 redelivery_counter = 13;
public boolean hasCompressed() {
return result.hasCompressed();
}
public boolean getCompressed() {
return result.getCompressed();
}
public Builder setCompressed(boolean value) {
result.hasCompressed = true;
result.compressed_ = value;
return this;
}
public Builder clearCompressed() {
result.hasCompressed = false;
result.compressed_ = false;
return this;
}
// optional int32 redelivery_counter = 17;
public boolean hasRedeliveryCounter() { public boolean hasRedeliveryCounter() {
return result.hasRedeliveryCounter(); return result.hasRedeliveryCounter();
} }
@ -5606,7 +5570,7 @@ public final class OpenWire {
return this; return this;
} }
// repeated string broker_path = 18; // repeated string broker_path = 14;
public java.util.List<java.lang.String> getBrokerPathList() { public java.util.List<java.lang.String> getBrokerPathList() {
return java.util.Collections.unmodifiableList(result.brokerPath_); return java.util.Collections.unmodifiableList(result.brokerPath_);
} }
@ -5640,7 +5604,7 @@ public final class OpenWire {
return this; return this;
} }
// repeated string cluster_id = 19; // repeated string cluster_id = 15;
public java.util.List<java.lang.String> getClusterIdList() { public java.util.List<java.lang.String> getClusterIdList() {
return java.util.Collections.unmodifiableList(result.clusterId_); return java.util.Collections.unmodifiableList(result.clusterId_);
} }
@ -5674,6 +5638,42 @@ public final class OpenWire {
return this; return this;
} }
// optional bool compressed = 16;
public boolean hasCompressed() {
return result.hasCompressed();
}
public boolean getCompressed() {
return result.getCompressed();
}
public Builder setCompressed(boolean value) {
result.hasCompressed = true;
result.compressed_ = value;
return this;
}
public Builder clearCompressed() {
result.hasCompressed = false;
result.compressed_ = false;
return this;
}
// optional int32 priority = 17;
public boolean hasPriority() {
return result.hasPriority();
}
public int getPriority() {
return result.getPriority();
}
public Builder setPriority(int value) {
result.hasPriority = true;
result.priority_ = value;
return this;
}
public Builder clearPriority() {
result.hasPriority = false;
result.priority_ = 0;
return this;
}
// optional string user_id = 20; // optional string user_id = 20;
public boolean hasUserId() { public boolean hasUserId() {
return result.hasUserId(); return result.hasUserId();
@ -5942,7 +5942,7 @@ public final class OpenWire {
internal_static_org_apache_activemq_protocolbuffer_Properties_fieldAccessorTable = new internal_static_org_apache_activemq_protocolbuffer_Properties_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_apache_activemq_protocolbuffer_Properties_descriptor, internal_static_org_apache_activemq_protocolbuffer_Properties_descriptor,
new java.lang.String[] { "StringProperty", "IntProperty", "BoolProperty", "LongProperty", "DoubleProperty", "FloatProperty", "ShortProperty", "ByteProperty", }, new java.lang.String[] { "StringProperty", "IntProperty", "BoolProperty", "LongProperty", "DoubleProperty", "FloatProperty", "ShortProperty", "ByteProperty", "Timestamp", "Type", },
org.apache.activemq.protocolbuffer.OpenWire.Properties.class, org.apache.activemq.protocolbuffer.OpenWire.Properties.class,
org.apache.activemq.protocolbuffer.OpenWire.Properties.Builder.class); org.apache.activemq.protocolbuffer.OpenWire.Properties.Builder.class);
private static final com.google.protobuf.Descriptors.Descriptor private static final com.google.protobuf.Descriptors.Descriptor
@ -5953,7 +5953,7 @@ public final class OpenWire {
internal_static_org_apache_activemq_protocolbuffer_Message_fieldAccessorTable = new internal_static_org_apache_activemq_protocolbuffer_Message_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_apache_activemq_protocolbuffer_Message_descriptor, internal_static_org_apache_activemq_protocolbuffer_Message_descriptor,
new java.lang.String[] { "ProducerId", "ProducerCounter", "Destination", "OriginalDestination", "GroupId", "PropertyBytes", "CorrelationId", "Persistent", "Expiration", "Priority", "ReplyTo", "Timestamp", "Type", "LocalTransactionId", "XaTransactionId", "Compressed", "RedeliveryCounter", "BrokerPath", "ClusterId", "UserId", "Arrival", "BrokerInTime", "BrokerOutTime", "Droppable", "ReceivedByDFBridge", "GroupSequence", }, new java.lang.String[] { "ProducerId", "ProducerCounter", "Destination", "OriginalDestination", "GroupId", "PropertyBytes", "Persistent", "Expiration", "CorrelationId", "ReplyTo", "LocalTransactionId", "XaTransactionId", "RedeliveryCounter", "BrokerPath", "ClusterId", "Compressed", "Priority", "UserId", "Arrival", "BrokerInTime", "BrokerOutTime", "Droppable", "ReceivedByDFBridge", "GroupSequence", },
org.apache.activemq.protocolbuffer.OpenWire.Message.class, org.apache.activemq.protocolbuffer.OpenWire.Message.class,
org.apache.activemq.protocolbuffer.OpenWire.Message.Builder.class); org.apache.activemq.protocolbuffer.OpenWire.Message.Builder.class);
} }

View File

@ -92,6 +92,9 @@ message Properties {
repeated FloatProperty float_property = 6; repeated FloatProperty float_property = 6;
repeated ShortProperty short_property = 7; repeated ShortProperty short_property = 7;
repeated ByteProperty byte_property = 8; repeated ByteProperty byte_property = 8;
optional int64 timestamp = 10;
optional string type = 11;
} }
// Message // Message
@ -109,30 +112,31 @@ message Message {
optional bytes property_bytes = 6; optional bytes property_bytes = 6;
optional string correlation_id = 7;
// TODO move this into the 'exchange id'? // TODO move this into the 'exchange id'?
optional bool persistent = 8; optional bool persistent = 7;
optional int64 expiration = 9; optional int64 expiration = 8;
optional string correlation_id = 9;
// TODO no byte? optional Destination reply_to = 10;
optional int32 priority = 10;
optional Destination reply_to = 11;
optional int64 timestamp = 12;
optional string type = 13;
optional LocalTransactionId local_transaction_id = 14; optional LocalTransactionId local_transaction_id = 11;
optional XATransactionId xa_transaction_id = 15; optional XATransactionId xa_transaction_id = 12;
// TODO why DataStructure and Content? // TODO why DataStructure and Content?
// TODO targetConsumerId? // TODO targetConsumerId?
// TODO should we move more stuff into the Properties header?
optional int32 redelivery_counter = 13;
repeated string broker_path = 14;
repeated string cluster_id = 15;
optional bool compressed = 16; optional bool compressed = 16;
optional int32 redelivery_counter = 17;
repeated string broker_path = 18; // TODO no byte?
repeated string cluster_id = 19; optional int32 priority = 17;
optional string user_id = 20; optional string user_id = 20;
@ -145,6 +149,9 @@ message Message {
optional bool receivedByDFBridge = 29; optional bool receivedByDFBridge = 29;
optional int32 group_sequence = 40; optional int32 group_sequence = 40;
} }
// TODO things to ponder
// should we move more message fields
// that are set by the sender (and rarely required by the broker
// into the Properties object?

View File

@ -43,8 +43,8 @@ public class OpenWirePerformanceTest extends TestSupport {
message.setDestination(destination); message.setDestination(destination);
message.setPersistent(true); message.setPersistent(true);
message.setType("type:" + i);
message.setCorrelationId("ABCD"); message.setCorrelationId("ABCD");
//message.setType("type:" + i);
if (useProducerId) { if (useProducerId) {
message.setProducerId(producerId); message.setProducerId(producerId);
@ -65,7 +65,6 @@ public class OpenWirePerformanceTest extends TestSupport {
openWireFormat.marshal(message, ds); openWireFormat.marshal(message, ds);
watch.stop(); watch.stop();
} }
out.flush();
out.close(); out.close();
// now lets try read them! // now lets try read them!

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.activemq.protocolbuffer; package org.apache.activemq.protocolbuffer;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.CodedInputStream;
import java.io.*; import java.io.*;
/** /**
@ -29,6 +32,7 @@ public class Performance2Test extends TestSupport {
public void testPerformance() throws Exception { public void testPerformance() throws Exception {
OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName)); OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName));
CodedOutputStream cout = CodedOutputStream.newInstance(out);
StopWatch watch = createStopWatch("writer"); StopWatch watch = createStopWatch("writer");
for (int i = 0; i < messageCount; i++) { for (int i = 0; i < messageCount; i++) {
@ -36,7 +40,6 @@ public class Performance2Test extends TestSupport {
OpenWire.Message.Builder builder = OpenWire.Message.newBuilder() OpenWire.Message.Builder builder = OpenWire.Message.newBuilder()
.setDestination(destination) .setDestination(destination)
.setPersistent(true) .setPersistent(true)
.setType("type:" + i)
.setCorrelationId("ABCD"); .setCorrelationId("ABCD");
if (useProducerId) { if (useProducerId) {
@ -49,26 +52,29 @@ public class Performance2Test extends TestSupport {
if (verbose) { if (verbose) {
System.out.println("Writing message: " + i + " = " + message); System.out.println("Writing message: " + i + " = " + message);
} }
byte[] bytes = message.toByteArray(); int size = message.getSerializedSize();
int size = bytes.length; cout.writeRawVarint32(size);
out.write(size); message.writeTo(cout);
//System.out.println("writing bytes: " + size);
out.write(bytes);
watch.stop(); watch.stop();
} }
out.flush(); cout.flush();
out.close(); out.close();
// now lets try read them! // now lets try read them!
StopWatch watch2 = createStopWatch("reader"); StopWatch watch2 = createStopWatch("reader");
InputStream in = new BufferedInputStream(new FileInputStream(fileName)); InputStream in = new BufferedInputStream(new FileInputStream(fileName));
CodedInputStream cin = CodedInputStream.newInstance(in);
for (int i = 0; i < messageCount; i++) { for (int i = 0; i < messageCount; i++) {
watch2.start(); watch2.start();
int size = in.read(); int size = cin.readRawVarint32();
byte[] data = new byte[size]; int previous = cin.pushLimit(size);
in.read(data); //cin.setSizeLimit(size + 4);
OpenWire.Message message = OpenWire.Message.parseFrom(data); OpenWire.Message message = OpenWire.Message.parseFrom(cin);
cin.popLimit(previous);
if (verbose) { if (verbose) {
System.out.println("Reading message: " + i + " = " + message); System.out.println("Reading message: " + i + " = " + message);
} }

View File

@ -43,10 +43,12 @@ public class PerformanceTest extends TestCase {
.setPersistent(true) .setPersistent(true)
.setProducerId(1234) .setProducerId(1234)
.setProducerCounter(i) .setProducerCounter(i)
.setType("type:" + i)
.build(); .build();
//.setType("type:" + i)
System.out.println("Writing message: " + i + " = " + message); System.out.println("Writing message: " + i + " = " + message);
int size = message.getSerializedSize();
cout.writeRawVarint32(size);
message.writeTo(cout); message.writeTo(cout);
cout.flush(); cout.flush();
} }
@ -56,7 +58,10 @@ public class PerformanceTest extends TestCase {
FileInputStream in = new FileInputStream(fileName); FileInputStream in = new FileInputStream(fileName);
CodedInputStream cin = CodedInputStream.newInstance(in); CodedInputStream cin = CodedInputStream.newInstance(in);
for (int i = 0; i < messageCount; i++) { for (int i = 0; i < messageCount; i++) {
int size = cin.readRawVarint32();
int previous = cin.pushLimit(size);
OpenWire.Message message = OpenWire.Message.parseFrom(cin); OpenWire.Message message = OpenWire.Message.parseFrom(cin);
cin.popLimit(previous);
System.out.println("Reading message: " + i + " = " + message); System.out.println("Reading message: " + i + " = " + message);
} }
in.close(); in.close();

View File

@ -23,7 +23,7 @@ import junit.framework.TestCase;
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
public class TestSupport extends TestCase { public class TestSupport extends TestCase {
protected int messageCount = 10000000; protected int messageCount = 1000000;
protected boolean verbose = false; protected boolean verbose = false;
protected boolean useProducerId = false; protected boolean useProducerId = false;
} }