ARTEMIS-465 Create LM on write packet in ReplicationEndpoint

If a LM write packet is received from the live assume that the large
message exists and create a local reference.

Old behavour would reject the packet which could lead to loss of data on
failover see JIRA.
This commit is contained in:
Martyn Taylor 2016-04-01 23:13:04 +01:00 committed by Clebert Suconic
parent 7da22ff105
commit e2c6d0b7f7
3 changed files with 17 additions and 7 deletions

View File

@ -438,6 +438,10 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
largeMessage.setMessageID(id);
// We do this here to avoid a case where the replication gets a list without this file
// to avoid a race
largeMessage.validateFile();
if (largeMessage.isDurable()) {
// We store a marker on the journal that the large file is pending
long pendingRecordID = storePendingLargeMessage(id);

View File

@ -341,7 +341,7 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
// Private -------------------------------------------------------
private synchronized void validateFile() throws ActiveMQException {
public synchronized void validateFile() throws ActiveMQException {
try {
if (file == null) {
if (messageID <= 0) {

View File

@ -435,7 +435,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
SequentialFile channel1;
switch (msg.getFileType()) {
case LARGE_MESSAGE: {
ReplicatedLargeMessage largeMessage = lookupLargeMessage(id, false);
ReplicatedLargeMessage largeMessage = lookupLargeMessage(id, false, false);
if (!(largeMessage instanceof LargeServerMessageInSync)) {
ActiveMQServerLogger.LOGGER.largeMessageIncompatible();
return;
@ -536,7 +536,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet) {
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true);
final ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true, false);
if (message != null) {
executor.execute(new Runnable() {
@Override
@ -556,13 +556,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
* @param packet
*/
private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet) throws Exception {
ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), false);
ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), false, true);
if (message != null) {
message.addBytes(packet.getBody());
}
}
private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete) {
private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean delete, final boolean createIfNotExists) {
ReplicatedLargeMessage message;
if (delete) {
@ -571,8 +571,14 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
else {
message = largeMessages.get(messageId);
if (message == null) {
// No warnings if it's a delete, as duplicate deletes may be sent repeatedly.
ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId);
if (createIfNotExists) {
createLargeMessage(messageId, false);
message = largeMessages.get(messageId);
}
else {
// No warnings if it's a delete, as duplicate deletes may be sent repeatedly.
ActiveMQServerLogger.LOGGER.largeMessageNotAvailable(messageId);
}
}
}