This commit is contained in:
Clebert Suconic 2018-08-26 18:22:46 -04:00
commit f8140b91d4
10 changed files with 154 additions and 25 deletions

View File

@ -241,7 +241,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
writingChannel = ActiveMQBuffers.wrappedBuffer(bufferWrite);
currentFile = filesRepository.takeFile(false, false, false, true);
currentFile = filesRepository.openFileCMP();
sequentialFile = currentFile.getFile();

View File

@ -409,6 +409,16 @@ public class JournalFilesRepository {
return openedFiles.size();
}
public JournalFile openFileCMP() throws Exception {
JournalFile file = openFile();
SequentialFile sequentialFile = file.getFile();
sequentialFile.close();
sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
return file;
}
/**
* <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
* <p>In case there are no cached opened files, this method will block until the file was opened,
@ -468,7 +478,7 @@ public class JournalFilesRepository {
/**
* Open a file and place it into the openedFiles queue
*/
public void pushOpenedFile() throws Exception {
public synchronized void pushOpenedFile() throws Exception {
JournalFile nextOpenedFile = takeFile(true, true, true, false);
if (logger.isTraceEnabled()) {
@ -505,7 +515,7 @@ public class JournalFilesRepository {
* @throws Exception
* @see JournalImpl#initFileHeader(SequentialFileFactory, SequentialFile, int, long)
*/
public JournalFile takeFile(final boolean keepOpened,
private JournalFile takeFile(final boolean keepOpened,
final boolean multiAIO,
final boolean initFile,
final boolean tmpCompactExtension) throws Exception {

View File

@ -348,7 +348,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public String toString() {
return "JournalImpl(state=" + state + ", currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
try {
return "JournalImpl(state=" + state + ", directory=[" + this.fileFactory.getDirectory().toString() + "], hash=" + super.toString() + ")";
} catch (Throwable e) {
logger.warn(e);
return super.toString();
}
}
@Override
@ -1278,6 +1283,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
if (logger.isTraceEnabled()) {
logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
}
tx.commit(usedFile);
} catch (ActiveMQShutdownException e) {
@ -1417,7 +1425,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private void checkDeleteSize() {
// HORNETQ-482 - Flush deletes only if memory is critical
if (recordsToDelete.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2) {
ActiveMQJournalLogger.LOGGER.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
logger.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
Iterator<RecordInfo> iter = records.iterator();
@ -1431,7 +1439,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
recordsToDelete.clear();
ActiveMQJournalLogger.LOGGER.debug("flush delete done");
logger.debug("flush delete done");
}
}
@ -1529,8 +1537,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw new IllegalStateException("There is pending compacting operation");
}
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact compacting journal " + (++compactCount));
if (logger.isDebugEnabled()) {
logger.debug("JournalImpl::compact " + JournalImpl.this + " for its " + (++compactCount) + " time");
}
compactorLock.writeLock().lock();
@ -1540,7 +1548,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
boolean previousReclaimValue = isAutoReclaim();
try {
ActiveMQJournalLogger.LOGGER.debug("Starting compacting operation on journal");
if (logger.isDebugEnabled()) {
logger.debug("Starting compacting operation on journal " + this);
}
onCompactStart();
@ -1669,9 +1679,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
ActiveMQJournalLogger.LOGGER.debug("Finished compacting on journal");
if (logger.isDebugEnabled()) {
logger.debug("Finished compacting on journal " + this);
}
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Flushing compacting on journal " + this);
}
// An Exception was probably thrown, and the compactor was not cleared
if (compactor != null) {
try {
@ -1681,12 +1696,15 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
compactor = null;
}
if (logger.isDebugEnabled()) {
logger.debug("since compact finished, setAutoReclaim back into " + previousReclaimValue);
}
setAutoReclaim(previousReclaimValue);
}
} finally {
compactorLock.writeLock().unlock();
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact finishing");
if (logger.isDebugEnabled()) {
logger.debug("JournalImpl::compact finalized");
}

View File

@ -238,13 +238,13 @@ public class JournalTransaction {
// without setting this properly...
if (compacting && compactor != null) {
if (logger.isTraceEnabled()) {
logger.trace("adding tx " + this.id + " into compacting");
logger.trace("adding txID=" + this.id + " into compacting");
}
compactor.addCommandCommit(this, file);
} else {
if (logger.isTraceEnabled()) {
logger.trace("no compact commit " + this.id);
logger.trace("there was no compactor on commit txID=" + this.id);
}
if (pos != null) {
for (JournalUpdate trUpdate : pos) {

View File

@ -669,6 +669,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
try {
messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
if (!lineUpContext && !syncTransactional) {
if (logger.isTraceEnabled()) {
logger.trace("calling getContext(true).done() for txID=" + txID + ",lineupContext=" + lineUpContext + " syncTransactional=" + syncTransactional + "... forcing call on getContext(true).done");
}
/**
* If {@code lineUpContext == false}, it means that we have previously lined up a
* context somewhere else (specifically see @{link TransactionImpl#asyncAppendCommit}),
@ -1742,7 +1745,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (record.isUpdate) {
PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
if (pgTX != null) {
pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
}
} else {
pageTransactionInfo.setCommitted(false);

View File

@ -149,7 +149,7 @@ public class ReplicatedJournal implements Journal {
final Persister persister,
final Object record) throws Exception {
if (log.isTraceEnabled()) {
log.trace("Append record TXid = " + id + " recordType = " + recordType);
log.trace("Append record txID=" + id + " recordType = " + recordType);
}
replicationManager.appendAddRecordTransactional(journalID, ADD_OPERATION_TYPE.ADD, txID, id, recordType, persister, record);
localJournal.appendAddRecordTransactional(txID, id, recordType, persister, record);
@ -164,7 +164,7 @@ public class ReplicatedJournal implements Journal {
@Override
public void appendCommitRecord(final long txID, final boolean sync) throws Exception {
if (log.isTraceEnabled()) {
log.trace("AppendCommit " + txID);
log.trace("AppendCommit txID=" + txID);
}
replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync);
@ -516,8 +516,8 @@ public class ReplicatedJournal implements Journal {
}
@Override
public void forceMoveNextFile() {
throw new UnsupportedOperationException();
public void forceMoveNextFile() throws Exception {
localJournal.forceMoveNextFile();
}
@Override

View File

@ -27,7 +27,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
@ -116,8 +115,6 @@ public final class ReplicationManager implements ActiveMQComponent {
private volatile boolean enabled;
private final AtomicBoolean writable = new AtomicBoolean(true);
private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<>();
private final ExecutorFactory ioExecutorFactory;
@ -291,6 +288,12 @@ public final class ReplicationManager implements ActiveMQComponent {
logger.trace("Stopping being ignored as it hasn't been started");
return;
}
started = false;
}
if (logger.isTraceEnabled()) {
logger.trace("stop(clearTokens=" + clearTokens + ")", new Exception("Trace"));
}
// This is to avoid the write holding a lock while we are trying to close it
@ -300,7 +303,6 @@ public final class ReplicationManager implements ActiveMQComponent {
}
enabled = false;
writable.set(true);
if (clearTokens) {
clearReplicationTokens();
@ -312,7 +314,6 @@ public final class ReplicationManager implements ActiveMQComponent {
toStop.destroy();
}
remotingConnection = null;
started = false;
}
/**

View File

@ -976,6 +976,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
*/
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) {
logger.debug("Stopping server");
synchronized (this) {
if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) {
return;

View File

@ -616,7 +616,7 @@ public class TransactionImpl implements Transaction {
public String toString() {
Date dt = new Date(this.createTime);
return "TransactionImpl [xid=" + xid +
", id=" +
", txID=" +
id +
", xid=" + xid +
", state=" +

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.util.LinkedList;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.journal.impl.JournalFilesRepository;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.Assert;
import org.junit.Test;
public class JournalFileRepositoryOrderTest extends ActiveMQTestBase {
@Test
public void testOrder() throws Throwable {
ExecutorService executorService = Executors.newFixedThreadPool(3, new ActiveMQThreadFactory("test", false, JournalFileRepositoryOrderTest.class.getClassLoader()));
final AtomicBoolean running = new AtomicBoolean(true);
Thread t = null;
try {
FakeSequentialFileFactory fakeSequentialFileFactory = new FakeSequentialFileFactory();
JournalImpl journal = new JournalImpl(new OrderedExecutorFactory(executorService), 10 * 1024, 2, -1, -1, 0, fakeSequentialFileFactory, "file", "file", 1, 0);
final JournalFilesRepository repository = journal.getFilesRepository();
final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<>();
// this is simulating how compating would return files into the journal
t = new Thread() {
@Override
public void run() {
while (running.get()) {
try {
Wait.waitFor(() -> !running.get() || dataFiles.size() > 10, 1000, 1);
while (running.get()) {
JournalFile file = dataFiles.poll();
if (file == null) break;
repository.addFreeFile(file, false);
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
};
t.start();
JournalFile file = null;
LinkedList<Integer> values = new LinkedList<>();
for (int i = 0; i < 5000; i++) {
file = repository.openFile();
Assert.assertNotNull(file);
values.add(file.getRecordID());
dataFiles.push(file);
}
int previous = Integer.MIN_VALUE;
for (Integer v : values) {
Assert.assertTrue(v.intValue() > previous);
previous = v;
}
} finally {
running.set(false);
executorService.shutdownNow();
}
}
}