ARTEMIS-3084 Deal with async close and double close

Since the libaio.close is now async
there might be a situation with more than one close called during a server.stop();

This should deal with that scenario
This commit is contained in:
Clebert Suconic 2021-01-29 16:05:07 -05:00
parent 59eb515c1b
commit 755947ee0b
5 changed files with 124 additions and 29 deletions

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class Waiter {
public interface Condition {
boolean result();
}
/** This method will wait for the condition.result to be true or a timeout has ocurred.
* it will return the last result. */
public static boolean waitFor(Condition condition, TimeUnit unit, long timeout, TimeUnit parkUnit, long parkTime) {
long timeoutNanos = unit.toNanos(timeout);
final long deadline = System.nanoTime() + timeoutNanos;
long parkNanos = parkUnit.toNanos(parkTime);
while (!condition.result() && (System.nanoTime() - deadline) < 0) {
// Wait some time
LockSupport.parkNanos(parkNanos);
}
return condition.result();
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert; import org.junit.Assert;
@ -157,13 +158,13 @@ public class Wait {
final long sleepMillis) { final long sleepMillis) {
try { try {
final long expiry = System.currentTimeMillis() + durationMillis; final long expiry = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis);
boolean conditionSatisified = condition.isSatisfied(); boolean conditionSatisified = condition.isSatisfied();
while (!conditionSatisified && System.currentTimeMillis() < expiry) { while (!conditionSatisified && System.nanoTime() - expiry < 0) {
if (sleepMillis == 0) { if (sleepMillis == 0) {
Thread.yield(); Thread.yield();
} else { } else {
TimeUnit.MILLISECONDS.sleep(sleepMillis); LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(sleepMillis));
} }
conditionSatisified = condition.isSatisfied(); conditionSatisified = condition.isSatisfied();
} }
@ -176,5 +177,4 @@ public class Wait {
} }
} }
} }

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
public class WaitTest {
@Test
public void testWait() {
AtomicInteger intValue = new AtomicInteger(0);
Assert.assertFalse(Wait.waitFor(() -> intValue.get() == 1, 100, 10));
intValue.set(1);
Assert.assertTrue(Wait.waitFor(() -> intValue.get() == 1, 100, 10));
}
}

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.AutomaticLatch; import org.apache.activemq.artemis.utils.AutomaticLatch;
import org.apache.activemq.artemis.utils.Waiter;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
/** This class is implementing Runnable to reuse a callback to close it. */ /** This class is implementing Runnable to reuse a callback to close it. */
@ -118,8 +119,9 @@ public class AIOSequentialFile extends AbstractSequentialFile {
private void actualClose() { private void actualClose() {
try { try {
aioFile.close(); aioFile.close();
} catch (IOException e) { } catch (Throwable e) {
factory.onIOError(e, e.getMessage(), this); // an exception here would means a double
logger.debug("Exeption while closing file - " + e.getMessage(), e);
} finally { } finally {
aioFile = null; aioFile = null;
pendingClose = false; pendingClose = false;
@ -135,15 +137,16 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public void waitNotPending() { public void waitNotPending() {
try { try {
short retryPending = 0; for (short retryPending = 0; pendingClose && retryPending < 60; retryPending++) {
do { if (pendingCallbacks.await(1, TimeUnit.SECONDS)) {
pendingCallbacks.await(1, TimeUnit.SECONDS); break;
retryPending++; }
} }
while(pendingClose && retryPending < 60);
if (pendingClose) { if (pendingClose) {
if (!Waiter.waitFor(() -> !pendingClose, TimeUnit.SECONDS, 60, TimeUnit.NANOSECONDS, 1000)) {
AIOSequentialFileFactory.threadDump("File " + getFileName() + " still has pending IO before closing it"); AIOSequentialFileFactory.threadDump("File " + getFileName() + " still has pending IO before closing it");
} }
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
// nothing to be done here, other than log it and forward it // nothing to be done here, other than log it and forward it
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
@ -153,6 +156,9 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException { public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException {
// a double call on close, should result on it waitingNotPending before another close is called
waitNotPending();
if (!opened) { if (!opened) {
return; return;
} }
@ -332,14 +338,20 @@ public class AIOSequentialFile extends AbstractSequentialFile {
void done(AIOSequentialFileFactory.AIOSequentialCallback callback) { void done(AIOSequentialFileFactory.AIOSequentialCallback callback) {
if (callback.writeSequence == -1) { if (callback.writeSequence == -1) {
try {
callback.sequentialDone(); callback.sequentialDone();
} finally {
pendingCallbacks.countDown(); pendingCallbacks.countDown();
} }
}
if (callback.writeSequence == nextReadSequence) { if (callback.writeSequence == nextReadSequence) {
nextReadSequence++; nextReadSequence++;
try {
callback.sequentialDone(); callback.sequentialDone();
} finally {
pendingCallbacks.countDown(); pendingCallbacks.countDown();
}
flushCallbacks(); flushCallbacks();
} else { } else {
pendingCallbackList.add(callback); pendingCallbackList.add(callback);
@ -350,11 +362,14 @@ public class AIOSequentialFile extends AbstractSequentialFile {
private void flushCallbacks() { private void flushCallbacks() {
while (!pendingCallbackList.isEmpty() && pendingCallbackList.peek().writeSequence == nextReadSequence) { while (!pendingCallbackList.isEmpty() && pendingCallbackList.peek().writeSequence == nextReadSequence) {
AIOSequentialFileFactory.AIOSequentialCallback callback = pendingCallbackList.poll(); AIOSequentialFileFactory.AIOSequentialCallback callback = pendingCallbackList.poll();
try {
callback.sequentialDone(); callback.sequentialDone();
} finally {
nextReadSequence++; nextReadSequence++;
pendingCallbacks.countDown(); pendingCallbacks.countDown();
} }
} }
}
@Override @Override
public void sync() { public void sync() {

View File

@ -2578,16 +2578,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
setJournalState(JournalState.STOPPED); setJournalState(JournalState.STOPPED);
if (providedIOThreadPool == null) {
threadPool.shutdown();
if (!threadPool.awaitTermination(120, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
threadPool = null;
ioExecutorFactory = null;
}
journalLock.writeLock().lock(); journalLock.writeLock().lock();
try { try {
@ -2612,6 +2602,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
} finally { } finally {
journalLock.writeLock().unlock(); journalLock.writeLock().unlock();
} }
// I have to shutdown the pool after
// otherwise pending closes will not succeed in certain races
if (providedIOThreadPool == null) {
threadPool.shutdown();
if (!threadPool.awaitTermination(120, TimeUnit.SECONDS)) {
threadPool.shutdownNow();
}
threadPool = null;
ioExecutorFactory = null;
}
} }
@Override @Override