This commit is contained in:
Clebert Suconic 2018-03-20 15:54:33 -04:00
commit d3b11bc439
3 changed files with 30 additions and 16 deletions

View File

@ -51,5 +51,5 @@ public interface LargeBodyEncoder {
/** /**
* This method must not be called directly by ActiveMQ Artemis clients. * This method must not be called directly by ActiveMQ Artemis clients.
*/ */
long getLargeBodySize(); long getLargeBodySize() throws ActiveMQException;
} }

View File

@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@ -238,10 +239,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
public synchronized int getMemoryEstimate() { public synchronized int getMemoryEstimate() {
if (memoryEstimate == -1) { if (memoryEstimate == -1) {
// The body won't be on memory (aways on-file), so we don't consider this for paging // The body won't be on memory (aways on-file), so we don't consider this for paging
memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1;
getEncodeSize() +
(16 + 4) * 2 +
1;
} }
return memoryEstimate; return memoryEstimate;
@ -345,17 +343,39 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
return file; return file;
} }
private long getBodySize() throws ActiveMQException {
try {
if (bodySize < 0) {
if (file != null) {
bodySize = file.size();
} else {
SequentialFile tmpFile = createFile();
bodySize = tmpFile.size();
tmpFile.close();
}
}
return bodySize;
} catch (Exception e) {
ActiveMQIOErrorException errorException = new ActiveMQIOErrorException();
errorException.initCause(e);
throw errorException;
}
}
@Override @Override
public long getPersistentSize() throws ActiveMQException { public long getPersistentSize() throws ActiveMQException {
long size = super.getPersistentSize(); long size = super.getPersistentSize();
size += getBodyEncoder().getLargeBodySize(); size += getBodySize();
return size; return size;
} }
@Override @Override
public String toString() { public String toString() {
try { try {
return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + (properties != null ? properties.toString() : "") + "]@" + System.identityHashCode(this); return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() + ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) + ", durable=" + durable + ", address=" + getAddress() + ", properties=" + (properties != null ? properties.toString() : "") + "]@" + System.identityHashCode(this);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return "LargeServerMessage[messageID=" + messageID + "]"; return "LargeServerMessage[messageID=" + messageID + "]";
@ -463,15 +483,8 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
* @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize() * @see org.apache.activemq.artemis.core.message.LargeBodyEncoder#getLargeBodySize()
*/ */
@Override @Override
public long getLargeBodySize() { public long getLargeBodySize() throws ActiveMQException {
if (bodySize < 0) { return getBodySize();
try {
bodySize = file.size();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToCalculateFileSize(e);
}
}
return bodySize;
} }
} }
} }

View File

@ -1305,6 +1305,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
if (context != null) { if (context != null) {
context.close(); context.close();
context = null;
} }
largeMessage.releaseResources(); largeMessage.releaseResources();