diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index d8b5f93b49..7740beff48 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -241,7 +241,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
- currentFile = filesRepository.takeFile(false, false, false, true);
+ currentFile = filesRepository.openFileCMP();
sequentialFile = currentFile.getFile();
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
index 5896fba68e..eb4740f41c 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
@@ -409,6 +409,16 @@ public class JournalFilesRepository {
return openedFiles.size();
}
+ public JournalFile openFileCMP() throws Exception {
+ JournalFile file = openFile();
+
+ SequentialFile sequentialFile = file.getFile();
+ sequentialFile.close();
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+
+ return file;
+ }
+
/**
*
This method will instantly return the opened file, and schedule opening and reclaiming.
* In case there are no cached opened files, this method will block until the file was opened,
@@ -468,7 +478,7 @@ public class JournalFilesRepository {
/**
* Open a file and place it into the openedFiles queue
*/
- public void pushOpenedFile() throws Exception {
+ public synchronized void pushOpenedFile() throws Exception {
JournalFile nextOpenedFile = takeFile(true, true, true, false);
if (logger.isTraceEnabled()) {
@@ -505,7 +515,7 @@ public class JournalFilesRepository {
* @throws Exception
* @see JournalImpl#initFileHeader(SequentialFileFactory, SequentialFile, int, long)
*/
- public JournalFile takeFile(final boolean keepOpened,
+ private JournalFile takeFile(final boolean keepOpened,
final boolean multiAIO,
final boolean initFile,
final boolean tmpCompactExtension) throws Exception {
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 30ed6e33e2..47bdc5b7c4 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -348,7 +348,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public String toString() {
- return "JournalImpl(state=" + state + ", currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
+ try {
+ return "JournalImpl(state=" + state + ", directory=[" + this.fileFactory.getDirectory().toString() + "], hash=" + super.toString() + ")";
+ } catch (Throwable e) {
+ logger.warn(e);
+ return super.toString();
+ }
}
@Override
@@ -1278,6 +1283,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+ if (logger.isTraceEnabled()) {
+ logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+ }
tx.commit(usedFile);
} catch (ActiveMQShutdownException e) {
@@ -1417,7 +1425,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private void checkDeleteSize() {
// HORNETQ-482 - Flush deletes only if memory is critical
if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2) {
- ActiveMQJournalLogger.LOGGER.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ logger.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator iter = records.iterator();
@@ -1431,7 +1439,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
recordsToDelete.clear();
- ActiveMQJournalLogger.LOGGER.debug("flush delete done");
+ logger.debug("flush delete done");
}
}
@@ -1529,8 +1537,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw new IllegalStateException("There is pending compacting operation");
}
- if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
- ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact compacting journal " + (++compactCount));
+ if (logger.isDebugEnabled()) {
+ logger.debug("JournalImpl::compact " + JournalImpl.this + " for its " + (++compactCount) + " time");
}
compactorLock.writeLock().lock();
@@ -1540,7 +1548,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
boolean previousReclaimValue = isAutoReclaim();
try {
- ActiveMQJournalLogger.LOGGER.debug("Starting compacting operation on journal");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting compacting operation on journal " + this);
+ }
onCompactStart();
@@ -1669,9 +1679,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- ActiveMQJournalLogger.LOGGER.debug("Finished compacting on journal");
+ if (logger.isDebugEnabled()) {
+ logger.debug("Finished compacting on journal " + this);
+ }
} finally {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Flushing compacting on journal " + this);
+ }
// An Exception was probably thrown, and the compactor was not cleared
if (compactor != null) {
try {
@@ -1681,12 +1696,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
compactor = null;
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("since compact finished, setAutoReclaim back into " + previousReclaimValue);
+ }
setAutoReclaim(previousReclaimValue);
}
} finally {
compactorLock.writeLock().unlock();
- if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
- ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact finishing");
+ if (logger.isDebugEnabled()) {
+ logger.debug("JournalImpl::compact finalized");
}
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index 8f05d8d5a5..ffc016a3a5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -238,13 +238,13 @@ public class JournalTransaction {
// without setting this properly...
if (compacting && compactor != null) {
if (logger.isTraceEnabled()) {
- logger.trace("adding tx " + this.id + " into compacting");
+ logger.trace("adding txID=" + this.id + " into compacting");
}
compactor.addCommandCommit(this, file);
} else {
if (logger.isTraceEnabled()) {
- logger.trace("no compact commit " + this.id);
+ logger.trace("there was no compactor on commit txID=" + this.id);
}
if (pos != null) {
for (JournalUpdate trUpdate : pos) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index d28eec87fd..8c3cc77f3f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -669,6 +669,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
try {
messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
if (!lineUpContext && !syncTransactional) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("calling getContext(true).done() for txID=" + txID + ",lineupContext=" + lineUpContext + " syncTransactional=" + syncTransactional + "... forcing call on getContext(true).done");
+ }
/**
* If {@code lineUpContext == false}, it means that we have previously lined up a
* context somewhere else (specifically see @{link TransactionImpl#asyncAppendCommit}),
@@ -1742,7 +1745,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (record.isUpdate) {
PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
- pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
+ if (pgTX != null) {
+ pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
+ }
} else {
pageTransactionInfo.setCommitted(false);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
index 7ad06f5c46..8a04cc18aa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java
@@ -149,7 +149,7 @@ public class ReplicatedJournal implements Journal {
final Persister persister,
final Object record) throws Exception {
if (log.isTraceEnabled()) {
- log.trace("Append record TXid = " + id + " recordType = " + recordType);
+ log.trace("Append record txID=" + id + " recordType = " + recordType);
}
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record);
@@ -164,7 +164,7 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
if (log.isTraceEnabled()) {
- log.trace("AppendCommit " + txID);
+ log.trace("AppendCommit txID=" + txID);
}
replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync);
@@ -516,8 +516,8 @@ public class ReplicatedJournal implements Journal {
}
@Override
- public void forceMoveNextFile() {
- throw new UnsupportedOperationException();
+ public void forceMoveNextFile() throws Exception {
+ localJournal.forceMoveNextFile();
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index 6973706505..b307789228 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@@ -116,8 +115,6 @@ public final class ReplicationManager implements ActiveMQComponent {
private volatile boolean enabled;
- private final AtomicBoolean writable = new AtomicBoolean(true);
-
private final Queue pendingTokens = new ConcurrentLinkedQueue<>();
private final ExecutorFactory ioExecutorFactory;
@@ -291,6 +288,12 @@ public final class ReplicationManager implements ActiveMQComponent {
logger.trace("Stopping being ignored as it hasn't been started");
return;
}
+
+ started = false;
+ }
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("stop(clearTokens=" + clearTokens + ")", new Exception("Trace"));
}
// This is to avoid the write holding a lock while we are trying to close it
@@ -300,7 +303,6 @@ public final class ReplicationManager implements ActiveMQComponent {
}
enabled = false;
- writable.set(true);
if (clearTokens) {
clearReplicationTokens();
@@ -312,7 +314,6 @@ public final class ReplicationManager implements ActiveMQComponent {
toStop.destroy();
}
remotingConnection = null;
- started = false;
}
/**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index a8e64477a7..4acc77bab0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -976,6 +976,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*/
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) {
+ logger.debug("Stopping server");
+
synchronized (this) {
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
return;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index e3a0904740..e925c3b70a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -616,7 +616,7 @@ public class TransactionImpl implements Transaction {
public String toString() {
Date dt = new Date(this.createTime);
return "TransactionImpl [xid=" + xid +
- ", id=" +
+ ", txID=" +
id +
", xid=" + xid +
", state=" +
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
new file mode 100644
index 0000000000..e223f0906f
--- /dev/null
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalFileRepositoryOrderTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.unit.core.journal.impl;
+
+import java.util.LinkedList;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.core.journal.impl.JournalFile;
+import org.apache.activemq.artemis.core.journal.impl.JournalFilesRepository;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JournalFileRepositoryOrderTest extends ActiveMQTestBase {
+
+ @Test
+ public void testOrder() throws Throwable {
+ ExecutorService executorService = Executors.newFixedThreadPool(3, new ActiveMQThreadFactory("test", false, JournalFileRepositoryOrderTest.class.getClassLoader()));
+ final AtomicBoolean running = new AtomicBoolean(true);
+ Thread t = null;
+ try {
+ FakeSequentialFileFactory fakeSequentialFileFactory = new FakeSequentialFileFactory();
+ JournalImpl journal = new JournalImpl(new OrderedExecutorFactory(executorService), 10 * 1024, 2, -1, -1, 0, fakeSequentialFileFactory, "file", "file", 1, 0);
+
+ final JournalFilesRepository repository = journal.getFilesRepository();
+ final BlockingDeque dataFiles = new LinkedBlockingDeque<>();
+
+
+ // this is simulating how compating would return files into the journal
+ t = new Thread() {
+ @Override
+ public void run() {
+ while (running.get()) {
+ try {
+ Wait.waitFor(() -> !running.get() || dataFiles.size() > 10, 1000, 1);
+ while (running.get()) {
+ JournalFile file = dataFiles.poll();
+ if (file == null) break;
+ repository.addFreeFile(file, false);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ t.start();
+ JournalFile file = null;
+ LinkedList values = new LinkedList<>();
+ for (int i = 0; i < 5000; i++) {
+ file = repository.openFile();
+ Assert.assertNotNull(file);
+ values.add(file.getRecordID());
+ dataFiles.push(file);
+ }
+
+ int previous = Integer.MIN_VALUE;
+ for (Integer v : values) {
+ Assert.assertTrue(v.intValue() > previous);
+ previous = v;
+ }
+
+ } finally {
+ running.set(false);
+ executorService.shutdownNow();
+ }
+
+ }
+}