ARTEMIS-2414 Sync before closing file in case data loss

This commit is contained in:
Wei Yang 2019-07-25 19:04:58 +08:00 committed by Clebert Suconic
parent 36534a387e
commit edace8845e
26 changed files with 62 additions and 66 deletions

View File

@ -224,7 +224,7 @@ public class PrintData extends DBOption {
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read(sm);
page.close();
page.close(false, false);
int msgID = 0;

View File

@ -401,7 +401,7 @@ public final class XmlDataExporter extends DBOption {
Page page = pageStore.createPage(pageId);
page.open();
List<PagedMessage> messages = page.read(storageManager);
page.close();
page.close(false, false);
int messageId = 0;

View File

@ -93,11 +93,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override
public final void delete() throws IOException, InterruptedException, ActiveMQException {
if (isOpen()) {
close();
}
try {
if (isOpen()) {
close(false);
}
Files.deleteIfExists(file.toPath());
} catch (Throwable t) {
logger.trace("Fine error while deleting file", t);

View File

@ -142,7 +142,7 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void delete() {
close();
close(false);
if (file.exists() && !file.delete()) {
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
}
@ -361,7 +361,7 @@ final class MappedSequentialFile implements SequentialFile {
@Override
public void close(boolean waitOnSync) {
if (this.mappedFile != null) {
if (factory.isDatasync())
if (waitOnSync && factory.isDatasync())
this.mappedFile.force();
this.mappedFile.close();
this.mappedFile = null;

View File

@ -131,11 +131,16 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
close(true);
}
@Override
public synchronized void close(boolean waitSync) throws IOException, InterruptedException, ActiveMQException {
super.close();
try {
if (channel != null) {
if (factory.isDatasync())
if (waitSync && factory.isDatasync())
channel.force(false);
channel.close();
}

View File

@ -796,7 +796,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
fileFactory.releaseDirectBuffer(wholeFileBuffer);
}
try {
file.getFile().close();
file.getFile().close(false);
} catch (Throwable ignored) {
}
}

View File

@ -34,7 +34,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -136,7 +135,6 @@ public class StompSession implements SessionCallback {
ICoreMessage coreMessage = serverMessage.toCore();
LargeServerMessageImpl largeMessage = null;
ICoreMessage newServerMessage = serverMessage.toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
@ -179,13 +177,7 @@ public class StompSession implements SessionCallback {
ActiveMQStompProtocolLogger.LOGGER.debug(e);
}
return 0;
} finally {
if (largeMessage != null) {
largeMessage.releaseResources();
largeMessage = null;
}
}
}
@Override
@ -369,7 +361,7 @@ public class StompSession implements SessionCallback {
largeMessage.addBytes(bytes);
largeMessage.releaseResources();
largeMessage.releaseResources(true);
largeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);

View File

@ -231,7 +231,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
} finally {
try {
if (page != null) {
page.close(false);
page.close(false, false);
}
} catch (Throwable ignored) {
}
@ -527,13 +527,12 @@ public class PageCursorProviderImpl implements PageCursorProvider {
pgdMessagesList = depagedPage.read(storageManager);
} finally {
try {
depagedPage.close(false);
depagedPage.close(false, false);
} catch (Exception e) {
}
storageManager.afterPageRead();
}
depagedPage.close(false);
pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
} else {
pgdMessages = cache.getMessages();

View File

@ -308,15 +308,15 @@ public final class Page implements Comparable<Page> {
file.position(0);
}
public void close() throws Exception {
close(false);
public void close(boolean sendEvent) throws Exception {
close(sendEvent, true);
}
/**
* sendEvent means it's a close happening from a major event such moveNext.
* While reading the cache we don't need (and shouldn't inform the backup
*/
public synchronized void close(boolean sendEvent) throws Exception {
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
if (sendEvent && storageManager != null) {
storageManager.pageClosed(storeName, pageId);
}
@ -325,7 +325,7 @@ public final class Page implements Comparable<Page> {
// leave it to the soft cache to decide when to release it now
pageCache = null;
}
file.close();
file.close(waitSync);
Set<PageSubscriptionCounter> counters = getPendingCounters();
if (counters != null) {

View File

@ -582,7 +582,7 @@ public class PagingStoreImpl implements PagingStore {
file.position(0);
file.close();
file.close(false);
return page;
}

View File

@ -65,7 +65,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
lsm.releaseResources();
lsm.releaseResources(true);
lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm;
}
@ -274,7 +274,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
throw new RuntimeException(e);
} finally {
try {
file.close();
file.close(false);
} catch (Exception ignored) {
}
}
@ -298,7 +298,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
} finally {
if (closeFile) {
try {
file.close();
file.close(false);
} catch (Exception ignored) {
}
}
@ -313,7 +313,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
@Override
public synchronized void deleteFile() throws Exception {
validateFile();
releaseResources();
releaseResources(false);
storageManager.deleteLargeMessageFile(this);
}
@ -328,11 +328,13 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
@Override
public synchronized void releaseResources() {
public synchronized void releaseResources(boolean sync) {
if (file != null && file.isOpen()) {
try {
file.sync();
file.close();
if (sync) {
file.sync();
}
file.close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
}
@ -410,7 +412,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
file.position(oldPosition);
if (!originallyOpen) {
file.close();
file.close(false);
newMessage.getFile().close();
}
@ -437,7 +439,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
} else {
SequentialFile tmpFile = createFile();
bodySize = tmpFile.size();
tmpFile.close();
tmpFile.close(false);
}
}
return bodySize;
@ -519,7 +521,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
public void open() throws ActiveMQException {
try {
if (cFile != null && cFile.isOpen()) {
cFile.close();
cFile.close(false);
}
cFile = file.cloneFile();
cFile.open();
@ -532,7 +534,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
public void close() throws ActiveMQException {
try {
if (cFile != null) {
cFile.close();
cFile.close(false);
}
} catch (Exception e) {
throw new ActiveMQInternalErrorException(e.getMessage(), e);

View File

@ -96,11 +96,11 @@ public final class LargeServerMessageInSync implements ReplicatedLargeMessage {
}
@Override
public synchronized void releaseResources() {
public synchronized void releaseResources(boolean sync) {
if (logger.isTraceEnabled()) {
logger.trace("release resources called on " + mainLM, new Exception("trace"));
}
mainLM.releaseResources();
mainLM.releaseResources(sync);
if (appendFile != null && appendFile.isOpen()) {
try {
appendFile.close();

View File

@ -35,7 +35,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMe
}
@Override
public void releaseResources() {
public void releaseResources(boolean sync) {
}
@Override

View File

@ -1023,7 +1023,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
currentLargeMessage.addBytes(body);
if (!continues) {
currentLargeMessage.releaseResources();
currentLargeMessage.releaseResources(true);
if (messageBodySize >= 0) {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);

View File

@ -38,9 +38,9 @@ public interface ReplicatedLargeMessage {
Message setMessageID(long id);
/**
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources()
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean)
*/
void releaseResources();
void releaseResources(boolean sync);
/**
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#deleteFile()

View File

@ -322,7 +322,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
for (ReplicatedLargeMessage largeMessage : largeMessages.values()) {
largeMessage.releaseResources();
largeMessage.releaseResources(true);
}
largeMessages.clear();
@ -343,7 +343,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
for (ConcurrentMap<Integer, Page> map : pageIndex.values()) {
for (Page page : map.values()) {
try {
page.sync();
page.close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingPageOnReplication(e);

View File

@ -41,7 +41,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage
* Close the files if opened
*/
@Override
void releaseResources();
void releaseResources(boolean sync);
@Override
void deleteFile() throws Exception;

View File

@ -1388,7 +1388,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
context = null;
}
largeMessage.releaseResources();
largeMessage.releaseResources(false);
largeMessage.decrementDelayDeletionCount();

View File

@ -173,7 +173,7 @@ public class RaceOnCursorIteratorTest extends ActiveMQTestBase {
public void testSkipNullPageCache() throws Exception {
skipNullPageCache = true;
// Simulate scenario #2 depicted in https://issues.apache.org/jira/browse/ARTEMIS-2418
queue.getPageSubscription().getPagingStore().getCurrentPage().close();
queue.getPageSubscription().getPagingStore().getCurrentPage().close(false);
PagedReference ref = queue.getPageSubscription().iterator().next();
assertTrue("first msg should not be " + (ref == null ? "null" : ref.getPagedMessage().getMessage().getMessageID()),

View File

@ -2305,7 +2305,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
fileMessage.releaseResources();
fileMessage.releaseResources(false);
Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
}
@ -2332,7 +2332,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);
fileMessage.releaseResources();
fileMessage.releaseResources(false);
session.createQueue(ADDRESS, ADDRESS, true);
@ -2497,7 +2497,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
fileMessage.releaseResources();
fileMessage.releaseResources(false);
session.createQueue(ADDRESS, ADDRESS, true);

View File

@ -113,7 +113,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources();
fileMessage.releaseResources(false);
session.createQueue("A", RoutingType.ANYCAST, "A");
@ -331,7 +331,7 @@ public class ServerLargeMessageTest extends ActiveMQTestBase {
largeServerMessage.setMessageID(1234);
largeServerMessage.addBytes(new byte[0]);
assertTrue(open.get());
largeServerMessage.releaseResources();
largeServerMessage.releaseResources(true);
assertTrue(sync.get());
}

View File

@ -468,7 +468,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources();
fileMessage.releaseResources(false);
session.createQueue("A", RoutingType.MULTICAST, "A", true);
@ -540,7 +540,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources();
fileMessage.releaseResources(false);
session.createQueue("A", RoutingType.MULTICAST, "A", true);
@ -884,7 +884,7 @@ public class XmlImportExportTest extends ActiveMQTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources();
fileMessage.releaseResources(false);
producer.send(fileMessage);

View File

@ -137,7 +137,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
fileMessage.releaseResources();
fileMessage.releaseResources(false);
message = fileMessage;
} else {

View File

@ -105,7 +105,7 @@ public class PageTest extends ActiveMQTestBase {
addPageElements(simpleDestination, impl, numberOfElements);
impl.sync();
impl.close();
impl.close(false, false);
file = factory.createSequentialFile("00010.page");
file.open();
@ -168,7 +168,7 @@ public class PageTest extends ActiveMQTestBase {
file.writeDirect(buffer, true);
impl.close();
impl.close(false);
file = factory.createSequentialFile("00010.page");
file.open();

View File

@ -78,7 +78,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
page.close(false, false);
Assert.assertEquals(1, msgs.size());

View File

@ -292,7 +292,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
List<PagedMessage> msg = page.read(new NullStorageManager());
page.close();
page.close(false, false);
Assert.assertEquals(5, msg.size());
@ -490,7 +490,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
for (Page page : readPages) {
page.open();
List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
page.close(false, false);
for (PagedMessage msg : msgs) {
long id = msg.getMessage().toCore().getBodyBuffer().readLong();
@ -551,7 +551,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
List<PagedMessage> msgs = page.read(new NullStorageManager());
page.close();
page.close(false, false);
for (PagedMessage msg : msgs) {
@ -565,7 +565,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
lastPage.open();
List<PagedMessage> lastMessages = lastPage.read(new NullStorageManager());
lastPage.close();
lastPage.close(false, false);
Assert.assertEquals(1, lastMessages.size());
lastMessages.get(0).getMessage().toCore().getBodyBuffer().resetReaderIndex();
@ -696,7 +696,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Assert.assertEquals(msg.getMessageID(), msg.getLongProperty("count").longValue());
}
page.close();
page.close(false, false);
page.delete(null);
} else {
System.out.println("Depaged!!!! numerOfMessages = " + msgsRead + " of " + NUMBER_OF_MESSAGES);