This closes #338
This commit is contained in:
commit
1562140493
|
@ -84,8 +84,6 @@ public abstract class MessageImpl implements MessageInternal {
|
||||||
|
|
||||||
private int endOfMessagePosition;
|
private int endOfMessagePosition;
|
||||||
|
|
||||||
private boolean copied = true;
|
|
||||||
|
|
||||||
private UUID userID;
|
private UUID userID;
|
||||||
|
|
||||||
// Constructors --------------------------------------------------
|
// Constructors --------------------------------------------------
|
||||||
|
@ -152,7 +150,6 @@ public abstract class MessageImpl implements MessageInternal {
|
||||||
bufferValid = other.bufferValid;
|
bufferValid = other.bufferValid;
|
||||||
endOfBodyPosition = other.endOfBodyPosition;
|
endOfBodyPosition = other.endOfBodyPosition;
|
||||||
endOfMessagePosition = other.endOfMessagePosition;
|
endOfMessagePosition = other.endOfMessagePosition;
|
||||||
copied = other.copied;
|
|
||||||
|
|
||||||
if (other.buffer != null) {
|
if (other.buffer != null) {
|
||||||
// We need to copy the underlying buffer too, since the different messsages thereafter might have different
|
// We need to copy the underlying buffer too, since the different messsages thereafter might have different
|
||||||
|
@ -438,29 +435,11 @@ public abstract class MessageImpl implements MessageInternal {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void bodyChanged() {
|
public void bodyChanged() {
|
||||||
// If the body is changed we must copy the buffer otherwise can affect the previously sent message
|
|
||||||
// which might be in the Netty write queue
|
|
||||||
checkCopy();
|
|
||||||
|
|
||||||
bufferValid = false;
|
bufferValid = false;
|
||||||
|
|
||||||
endOfBodyPosition = -1;
|
endOfBodyPosition = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void checkCopy() {
|
|
||||||
if (!copied) {
|
|
||||||
forceCopy();
|
|
||||||
|
|
||||||
copied = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized void resetCopied() {
|
|
||||||
copied = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getEndOfMessagePosition() {
|
public int getEndOfMessagePosition() {
|
||||||
return endOfMessagePosition;
|
return endOfMessagePosition;
|
||||||
|
|
|
@ -33,12 +33,8 @@ public interface MessageInternal extends Message {
|
||||||
|
|
||||||
int getEndOfBodyPosition();
|
int getEndOfBodyPosition();
|
||||||
|
|
||||||
void checkCopy();
|
|
||||||
|
|
||||||
void bodyChanged();
|
void bodyChanged();
|
||||||
|
|
||||||
void resetCopied();
|
|
||||||
|
|
||||||
boolean isServerMessage();
|
boolean isServerMessage();
|
||||||
|
|
||||||
ActiveMQBuffer getEncodedBuffer();
|
ActiveMQBuffer getEncodedBuffer();
|
||||||
|
|
|
@ -93,8 +93,6 @@ public class SessionSendMessage extends MessagePacket {
|
||||||
// Position reader for reading by Netty
|
// Position reader for reading by Netty
|
||||||
bufferWrite.readerIndex(0);
|
bufferWrite.readerIndex(0);
|
||||||
|
|
||||||
message.resetCopied();
|
|
||||||
|
|
||||||
return bufferWrite;
|
return bufferWrite;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -280,7 +280,7 @@ public class JournalFilesRepository {
|
||||||
ActiveMQJournalLogger.LOGGER.checkFiles();
|
ActiveMQJournalLogger.LOGGER.checkFiles();
|
||||||
ActiveMQJournalLogger.LOGGER.info(debugFiles());
|
ActiveMQJournalLogger.LOGGER.info(debugFiles());
|
||||||
ActiveMQJournalLogger.LOGGER.seqOutOfOrder();
|
ActiveMQJournalLogger.LOGGER.seqOutOfOrder();
|
||||||
System.exit(-1);
|
throw new IllegalStateException("Sequence out of order");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (journal.getCurrentFile() != null && journal.getCurrentFile().getFileID() <= file.getFileID()) {
|
if (journal.getCurrentFile() != null && journal.getCurrentFile().getFileID() <= file.getFileID()) {
|
||||||
|
@ -356,9 +356,7 @@ public class JournalFilesRepository {
|
||||||
calculatedSize = file.getFile().size();
|
calculatedSize = file.getFile().size();
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
e.printStackTrace();
|
throw new IllegalStateException(e.getMessage() + " file: " + file);
|
||||||
System.out.println("Can't get file size on " + file);
|
|
||||||
System.exit(-1);
|
|
||||||
}
|
}
|
||||||
if (calculatedSize != fileSize) {
|
if (calculatedSize != fileSize) {
|
||||||
ActiveMQJournalLogger.LOGGER.deletingFile(file);
|
ActiveMQJournalLogger.LOGGER.deletingFile(file);
|
||||||
|
|
|
@ -429,21 +429,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void checkCopy() {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void bodyChanged() {
|
public void bodyChanged() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void resetCopied() {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isServerMessage() {
|
public boolean isServerMessage() {
|
||||||
return false;
|
return false;
|
||||||
|
|
Loading…
Reference in New Issue