ARTEMIS-1442 Shutdown server if can't move file on journal

This commit is contained in:
Clebert Suconic 2017-09-28 21:06:21 -04:00
parent 178d403117
commit aa3e8941d1
3 changed files with 209 additions and 15 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.journal.impl;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.security.AccessController;
@ -49,6 +50,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
@ -175,6 +177,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Compacting may replace this structure
private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
private final IOCriticalErrorListener criticalErrorListener;
// This will be set only while the JournalCompactor is being executed
private volatile JournalCompactor compactor;
@ -265,6 +270,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, 5, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
}
public JournalImpl(final ExecutorFactory ioExecutors,
final int fileSize,
final int minFiles,
@ -277,9 +283,28 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final String fileExtension,
final int maxAIO,
final int userVersion) {
this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null);
}
public JournalImpl(final ExecutorFactory ioExecutors,
final int fileSize,
final int minFiles,
final int poolSize,
final int compactMinFiles,
final int compactPercentage,
final int journalFileOpenTimeout,
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
final int maxAIO,
final int userVersion,
IOCriticalErrorListener criticalErrorListener) {
super(fileFactory.isSupportsCallbacks(), fileSize);
this.criticalErrorListener = criticalErrorListener;
this.providedIOThreadPool = ioExecutors;
if (fileSize % fileFactory.getAlignment() != 0) {
@ -2910,23 +2935,45 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* @throws Exception
*/
protected JournalFile switchFileIfNecessary(int size) throws Exception {
// We take into account the fileID used on the Header
if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER)) {
throw new IllegalArgumentException("Record is too large to store " + size);
}
if (!currentFile.getFile().fits(size)) {
moveNextFile(true);
// The same check needs to be done at the new file also
try {
if (!currentFile.getFile().fits(size)) {
// Sanity check, this should never happen
throw new IllegalStateException("Invalid logic on buffer allocation");
moveNextFile(true);
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size)) {
// Sanity check, this should never happen
throw new IllegalStateException("Invalid logic on buffer allocation");
}
}
return currentFile;
} catch (Throwable e) {
criticalIO(e);
return null; // this will never happen, the method will call throw
}
return currentFile;
}
private void criticalIO(Throwable e) throws Exception {
if (criticalErrorListener != null) {
criticalErrorListener.onIOException(e, e.getMessage(), currentFile == null ? null : currentFile.getFile());
}
if (e instanceof Exception) {
throw (Exception) e;
} else if (e instanceof IllegalStateException) {
throw (IllegalStateException) e;
} else {
IOException ioex = new IOException();
ioex.initCause(e);
throw ioex;
}
}
private CountDownLatch newLatch(int countDown) {
if (state == JournalState.STOPPED) {
throw new RuntimeException("Server is not started");
@ -2956,7 +3003,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
/**
* You need to guarantee lock.acquire() before calling this method!
*/
private void moveNextFile(final boolean scheduleReclaim) throws Exception {
protected void moveNextFile(final boolean scheduleReclaim) throws Exception {
filesRepository.closeFile(currentFile);
currentFile = filesRepository.openFile();

View File

@ -71,19 +71,19 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
private static final Logger logger = Logger.getLogger(JournalStorageManager.class);
private SequentialFileFactory journalFF;
protected SequentialFileFactory journalFF;
private SequentialFileFactory bindingsFF;
protected SequentialFileFactory bindingsFF;
SequentialFileFactory largeMessagesFactory;
private Journal originalMessageJournal;
protected Journal originalMessageJournal;
private Journal originalBindingsJournal;
protected Journal originalBindingsJournal;
protected String largeMessagesDirectory;
private ReplicationManager replicator;
protected ReplicationManager replicator;
public JournalStorageManager(final Configuration config,
final CriticalAnalyzer analyzer,
@ -124,7 +124,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
bindingsFF.setDatasync(config.isJournalDatasync());
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
bindingsJournal = localBindings;
originalBindingsJournal = localBindings;
@ -160,7 +160,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), fileSize, journalFF.getAlignment());
}
Journal localMessage = new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0);
Journal localMessage = createMessageJournal(config, criticalErrorListener, fileSize);
messageJournal = localMessage;
originalMessageJournal = localMessage;
@ -176,6 +176,12 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
}
}
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
}
// Life Cycle Handlers
@Override
protected void beforeStart() throws Exception {

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.critical;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Test;
public class ShutdownOnCriticalIOErrorMoveNext extends ActiveMQTestBase {
@Test
public void testSimplyDownAfterError() throws Exception {
deleteDirectory(new File("./target/server"));
ActiveMQServer server = createServer("./target/server");
try {
server.start();
ConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue("queue"));
try {
for (int i = 0; i < 500; i++) {
producer.send(session.createTextMessage("text"));
}
} catch (JMSException expected) {
}
Wait.waitFor(() -> !server.isStarted());
Assert.assertFalse(server.isStarted());
System.out.println("Sent messages");
} finally {
server.stop();
}
}
ActiveMQServer createServer(String folder) throws Exception {
final AtomicBoolean blocked = new AtomicBoolean(false);
Configuration conf = createConfig(folder);
ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
conf.setPersistenceEnabled(true);
ActiveMQServer server = new ActiveMQServerImpl(conf, securityManager) {
@Override
protected StorageManager createStorageManager() {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
@Override
protected Journal createMessageJournal(Configuration config,
IOCriticalErrorListener criticalErrorListener,
int fileSize) {
return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
@Override
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
super.moveNextFile(scheduleReclaim);
if (blocked.get()) {
throw new IllegalStateException("forcibly down");
}
}
};
}
@Override
public void storeMessage(Message message) throws Exception {
super.storeMessage(message);
blocked.set(true);
}
};
this.getCriticalAnalyzer().add(storageManager);
return storageManager;
}
};
return server;
}
Configuration createConfig(String folder) throws Exception {
Configuration configuration = createDefaultConfig(true);
configuration.setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(folder + "/journal").setBindingsDirectory(folder + "/bindings").setPagingDirectory(folder + "/paging").
setLargeMessagesDirectory(folder + "/largemessage").setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false);
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(true);
return configuration;
}
}