diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Waiter.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Waiter.java new file mode 100644 index 0000000000..49e301085e --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Waiter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +public class Waiter { + + public interface Condition { + + boolean result(); + } + + /** This method will wait for the condition.result to be true or a timeout has ocurred. + * it will return the last result. */ + public static boolean waitFor(Condition condition, TimeUnit unit, long timeout, TimeUnit parkUnit, long parkTime) { + long timeoutNanos = unit.toNanos(timeout); + final long deadline = System.nanoTime() + timeoutNanos; + long parkNanos = parkUnit.toNanos(parkTime); + while (!condition.result() && (System.nanoTime() - deadline) < 0) { + // Wait some time + LockSupport.parkNanos(parkNanos); + } + return condition.result(); + } +} diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java index 42cf0742cf..a6da336d03 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.utils; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import org.junit.Assert; @@ -157,13 +158,13 @@ public class Wait { final long sleepMillis) { try { - final long expiry = System.currentTimeMillis() + durationMillis; + final long expiry = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis); boolean conditionSatisified = condition.isSatisfied(); - while (!conditionSatisified && System.currentTimeMillis() < expiry) { + while (!conditionSatisified && System.nanoTime() - expiry < 0) { if (sleepMillis == 0) { Thread.yield(); } else { - TimeUnit.MILLISECONDS.sleep(sleepMillis); + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(sleepMillis)); } conditionSatisified = condition.isSatisfied(); } @@ -176,5 +177,4 @@ public class Wait { } } - } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/WaitTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/WaitTest.java new file mode 100644 index 0000000000..aa49d1ce4b --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/WaitTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class WaitTest { + + @Test + public void testWait() { + AtomicInteger intValue = new AtomicInteger(0); + + Assert.assertFalse(Wait.waitFor(() -> intValue.get() == 1, 100, 10)); + intValue.set(1); + Assert.assertTrue(Wait.waitFor(() -> intValue.get() == 1, 100, 10)); + } + +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 991b014eed..0e9508b53a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback; import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; import org.apache.activemq.artemis.utils.AutomaticLatch; +import org.apache.activemq.artemis.utils.Waiter; import org.jboss.logging.Logger; /** This class is implementing Runnable to reuse a callback to close it. */ @@ -118,8 +119,9 @@ public class AIOSequentialFile extends AbstractSequentialFile { private void actualClose() { try { aioFile.close(); - } catch (IOException e) { - factory.onIOError(e, e.getMessage(), this); + } catch (Throwable e) { + // an exception here would means a double + logger.debug("Exeption while closing file - " + e.getMessage(), e); } finally { aioFile = null; pendingClose = false; @@ -135,14 +137,15 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public void waitNotPending() { try { - short retryPending = 0; - do { - pendingCallbacks.await(1, TimeUnit.SECONDS); - retryPending++; + for (short retryPending = 0; pendingClose && retryPending < 60; retryPending++) { + if (pendingCallbacks.await(1, TimeUnit.SECONDS)) { + break; + } } - while(pendingClose && retryPending < 60); if (pendingClose) { - AIOSequentialFileFactory.threadDump("File " + getFileName() + " still has pending IO before closing it"); + if (!Waiter.waitFor(() -> !pendingClose, TimeUnit.SECONDS, 60, TimeUnit.NANOSECONDS, 1000)) { + AIOSequentialFileFactory.threadDump("File " + getFileName() + " still has pending IO before closing it"); + } } } catch (InterruptedException e) { // nothing to be done here, other than log it and forward it @@ -153,6 +156,9 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException { + // a double call on close, should result on it waitingNotPending before another close is called + waitNotPending(); + if (!opened) { return; } @@ -332,14 +338,20 @@ public class AIOSequentialFile extends AbstractSequentialFile { void done(AIOSequentialFileFactory.AIOSequentialCallback callback) { if (callback.writeSequence == -1) { - callback.sequentialDone(); - pendingCallbacks.countDown(); + try { + callback.sequentialDone(); + } finally { + pendingCallbacks.countDown(); + } } if (callback.writeSequence == nextReadSequence) { nextReadSequence++; - callback.sequentialDone(); - pendingCallbacks.countDown(); + try { + callback.sequentialDone(); + } finally { + pendingCallbacks.countDown(); + } flushCallbacks(); } else { pendingCallbackList.add(callback); @@ -350,9 +362,12 @@ public class AIOSequentialFile extends AbstractSequentialFile { private void flushCallbacks() { while (!pendingCallbackList.isEmpty() && pendingCallbackList.peek().writeSequence == nextReadSequence) { AIOSequentialFileFactory.AIOSequentialCallback callback = pendingCallbackList.poll(); - callback.sequentialDone(); - nextReadSequence++; - pendingCallbacks.countDown(); + try { + callback.sequentialDone(); + } finally { + nextReadSequence++; + pendingCallbacks.countDown(); + } } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 4629557015..fcb69e4e0e 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -2578,16 +2578,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal setJournalState(JournalState.STOPPED); - if (providedIOThreadPool == null) { - threadPool.shutdown(); - - if (!threadPool.awaitTermination(120, TimeUnit.SECONDS)) { - threadPool.shutdownNow(); - } - threadPool = null; - ioExecutorFactory = null; - } - journalLock.writeLock().lock(); try { @@ -2612,6 +2602,19 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } finally { journalLock.writeLock().unlock(); } + + // I have to shutdown the pool after + // otherwise pending closes will not succeed in certain races + if (providedIOThreadPool == null) { + threadPool.shutdown(); + + if (!threadPool.awaitTermination(120, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + } + threadPool = null; + ioExecutorFactory = null; + } + } @Override