mirror of https://github.com/apache/activemq.git
removed the caching of the marshalled form of a message.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@518745 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f7a30b80fc
commit
a0e92d713b
|
@ -91,7 +91,6 @@ public class CompositeDestinationBroker extends BrokerFilter {
|
|||
}
|
||||
message.setOriginalDestination(destination);
|
||||
message.setDestination(destinations[i]);
|
||||
message.evictMarshlledForm();
|
||||
next.send(producerExchange, message);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -480,7 +480,6 @@ public class Topic implements Destination {
|
|||
ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
|
||||
message.setDestination(advisoryTopic);
|
||||
message.setTransactionId(null);
|
||||
message.evictMarshlledForm();
|
||||
|
||||
// Disable flow control for this since since we don't want to block.
|
||||
boolean originalFlowControl = context.isProducerFlowControl();
|
||||
|
|
|
@ -88,16 +88,10 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
|
|||
}
|
||||
|
||||
public synchronized void addMessageLast(MessageReference node) throws Exception{
|
||||
if(node!=null){
|
||||
node.decrementReferenceCount();
|
||||
}
|
||||
size++;
|
||||
}
|
||||
|
||||
public void addMessageFirst(MessageReference node) throws Exception{
|
||||
if(node!=null){
|
||||
node.decrementReferenceCount();
|
||||
}
|
||||
size++;
|
||||
}
|
||||
|
||||
|
@ -124,6 +118,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
|
|||
|
||||
public synchronized MessageReference next(){
|
||||
Message result = batchList.removeFirst();
|
||||
result.decrementReferenceCount();
|
||||
result.setRegionDestination(regionDestination);
|
||||
return result;
|
||||
}
|
||||
|
@ -137,10 +132,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements
|
|||
|
||||
public void recoverMessage(Message message) throws Exception{
|
||||
message.setRegionDestination(regionDestination);
|
||||
// only increment if count is zero (could have been cached)
|
||||
if(message.getReferenceCount()==0){
|
||||
message.incrementReferenceCount();
|
||||
}
|
||||
batchList.addLast(message);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.activemq.command;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
|
||||
public interface MarshallAware {
|
||||
|
@ -30,6 +29,4 @@ public interface MarshallAware {
|
|||
public void beforeUnmarshall(WireFormat wireFormat) throws IOException;
|
||||
public void afterUnmarshall(WireFormat wireFormat) throws IOException;
|
||||
|
||||
public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data);
|
||||
public ByteSequence getCachedMarshalledForm(WireFormat wireFormat);
|
||||
}
|
||||
|
|
|
@ -79,8 +79,6 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
|||
private transient short referenceCount;
|
||||
private transient ActiveMQConnection connection;
|
||||
private transient org.apache.activemq.broker.region.Destination regionDestination;
|
||||
private transient WireFormat cachedWireFormat;
|
||||
private transient ByteSequence cachedWireFormatData;
|
||||
|
||||
private BrokerId [] brokerPath;
|
||||
protected boolean droppable = false;
|
||||
|
@ -124,8 +122,6 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
|||
copy.arrival = arrival;
|
||||
copy.connection = connection;
|
||||
copy.regionDestination = regionDestination;
|
||||
copy.cachedWireFormat = cachedWireFormat;
|
||||
copy.cachedWireFormatData = cachedWireFormatData;
|
||||
//copying the broker path breaks networks - if a consumer re-uses a consumed
|
||||
//message and forwards it on
|
||||
//copy.brokerPath = brokerPath;
|
||||
|
@ -545,36 +541,6 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
|||
return true;
|
||||
}
|
||||
|
||||
synchronized public ByteSequence getCachedMarshalledForm(WireFormat wireFormat) {
|
||||
if( cachedWireFormat == null || !cachedWireFormat.equals(wireFormat) ) {
|
||||
return null;
|
||||
}
|
||||
return cachedWireFormatData;
|
||||
}
|
||||
|
||||
synchronized public void evictMarshlledForm() {
|
||||
cachedWireFormat = null;
|
||||
cachedWireFormatData = null;
|
||||
}
|
||||
|
||||
synchronized public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
|
||||
cachedWireFormat = wireFormat;
|
||||
cachedWireFormatData = data;
|
||||
|
||||
int sizeChange=0;
|
||||
synchronized (this) {
|
||||
if( referenceCount > 0 ) {
|
||||
sizeChange = getSize();
|
||||
this.size=0;
|
||||
sizeChange -= getSize();
|
||||
}
|
||||
}
|
||||
|
||||
if( sizeChange!=0 && regionDestination!=null )
|
||||
regionDestination.getUsageManager().decreaseUsage(sizeChange);
|
||||
|
||||
}
|
||||
|
||||
public int incrementReferenceCount() {
|
||||
int rc;
|
||||
int size;
|
||||
|
@ -586,6 +552,7 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
|||
if( rc==1 && regionDestination!=null )
|
||||
regionDestination.getUsageManager().increaseUsage(size);
|
||||
|
||||
// System.out.println(" + "+getDestination()+" :::: "+getMessageId()+" "+rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -600,6 +567,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
|||
if( rc==0 && regionDestination!=null )
|
||||
regionDestination.getUsageManager().decreaseUsage(size);
|
||||
|
||||
// System.out.println(" - "+getDestination()+" :::: "+getMessageId()+" "+rc);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
@ -610,10 +579,6 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
|||
size += marshalledProperties.getLength();
|
||||
if( content!=null )
|
||||
size += content.getLength();
|
||||
if( cachedWireFormatData !=null )
|
||||
size += cachedWireFormatData.getLength() + 12;
|
||||
else
|
||||
size *= 2; // Estimate what the cached data will add.
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
|
|
@ -496,7 +496,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
|
|||
if(message.getOriginalTransactionId()==null)
|
||||
message.setOriginalTransactionId(message.getTransactionId());
|
||||
message.setTransactionId(null);
|
||||
message.evictMarshlledForm();
|
||||
return message;
|
||||
}
|
||||
|
||||
|
|
|
@ -223,7 +223,6 @@ public class ForwardingBridge implements Service{
|
|||
if( message.getOriginalTransactionId()==null )
|
||||
message.setOriginalTransactionId(message.getTransactionId());
|
||||
message.setTransactionId(null);
|
||||
message.evictMarshlledForm();
|
||||
|
||||
|
||||
if( !message.isResponseRequired() ) {
|
||||
|
|
|
@ -130,9 +130,9 @@ final public class OpenWireFormat implements WireFormat {
|
|||
}
|
||||
|
||||
ByteSequence sequence=null;
|
||||
if( ma!=null ) {
|
||||
sequence = ma.getCachedMarshalledForm(this);
|
||||
}
|
||||
// if( ma!=null ) {
|
||||
// sequence = ma.getCachedMarshalledForm(this);
|
||||
// }
|
||||
|
||||
if( sequence == null ) {
|
||||
|
||||
|
@ -185,9 +185,9 @@ final public class OpenWireFormat implements WireFormat {
|
|||
sequence = bytesOut.toByteSequence();
|
||||
}
|
||||
|
||||
if( ma!=null ) {
|
||||
ma.setCachedMarshalledForm(this, sequence);
|
||||
}
|
||||
// if( ma!=null ) {
|
||||
// ma.setCachedMarshalledForm(this, sequence);
|
||||
// }
|
||||
}
|
||||
return sequence;
|
||||
}
|
||||
|
@ -204,9 +204,9 @@ final public class OpenWireFormat implements WireFormat {
|
|||
}
|
||||
|
||||
Object command = doUnmarshal(bytesIn);
|
||||
if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
|
||||
((MarshallAware) command).setCachedMarshalledForm(this, sequence);
|
||||
}
|
||||
// if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
|
||||
// ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
|
||||
// }
|
||||
return command;
|
||||
}
|
||||
|
||||
|
@ -367,7 +367,8 @@ final public class OpenWireFormat implements WireFormat {
|
|||
|
||||
if( o.isMarshallAware() ) {
|
||||
MarshallAware ma = (MarshallAware) o;
|
||||
ByteSequence sequence=ma.getCachedMarshalledForm(this);
|
||||
ByteSequence sequence=null;
|
||||
// sequence=ma.getCachedMarshalledForm(this);
|
||||
bs.writeBoolean(sequence!=null);
|
||||
if( sequence!=null ) {
|
||||
return 1 + sequence.getLength();
|
||||
|
@ -390,9 +391,11 @@ final public class OpenWireFormat implements WireFormat {
|
|||
|
||||
if( o.isMarshallAware() && bs.readBoolean() ) {
|
||||
|
||||
MarshallAware ma = (MarshallAware) o;
|
||||
ByteSequence sequence=ma.getCachedMarshalledForm(this);
|
||||
ds.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
|
||||
// We should not be doing any caching
|
||||
throw new IOException("Corrupted stream");
|
||||
// MarshallAware ma = (MarshallAware) o;
|
||||
// ByteSequence sequence=ma.getCachedMarshalledForm(this);
|
||||
// ds.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
|
||||
|
||||
} else {
|
||||
|
||||
|
|
|
@ -36,7 +36,6 @@ public class BrokerSupport {
|
|||
message.setOriginalTransactionId(message.getTransactionId());
|
||||
message.setDestination(deadLetterDestination);
|
||||
message.setTransactionId(null);
|
||||
message.evictMarshlledForm();
|
||||
boolean originalFlowControl=context.isProducerFlowControl();
|
||||
try{
|
||||
context.setProducerFlowControl(false);
|
||||
|
|
Loading…
Reference in New Issue