+ * An implementor must provide implicitly the caller identity to contextualize each operation (eg {@link JdbcLeaseLock}
+ * uses one caller per instance)
+ */
+interface LeaseLock extends AutoCloseable {
+
+ enum AcquireResult {
+ Timeout, Exit, Done
+ }
+
+ interface ExitCondition {
+
+ /**
+ * @return true as long as we should keep running
+ */
+ boolean keepRunning();
+ }
+
+ interface Pauser {
+
+ void idle();
+
+ static Pauser sleep(long idleTime, TimeUnit timeUnit) {
+ final long idleNanos = timeUnit.toNanos(idleTime);
+ //can fail spuriously but doesn't throw any InterruptedException
+ return () -> LockSupport.parkNanos(idleNanos);
+ }
+
+ static Pauser noWait() {
+ return () -> {
+ };
+ }
+ }
+
+ /**
+ * The expiration in milliseconds from the last valid acquisition/renew.
+ */
+ default long expirationMillis() {
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * It extends the lock expiration (if held) to {@link System#currentTimeMillis()} + {@link #expirationMillis()}.
+ *
+ * @return {@code true} if the expiration has been moved on, {@code false} otherwise
+ */
+ default boolean renew() {
+ return true;
+ }
+
+ /**
+ * Not reentrant lock acquisition operation.
+ * The lock can be acquired if is not held by anyone (including the caller) or has an expired ownership.
+ *
+ * @return {@code true} if has been acquired, {@code false} otherwise
+ */
+ boolean tryAcquire();
+
+ /**
+ * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
+ * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done})or got interrupted (ie {@link AcquireResult#Exit}).
+ * After each failed attempt is performed a {@link Pauser#idle} call.
+ */
+ default AcquireResult tryAcquire(ExitCondition exitCondition, Pauser pauser) {
+ while (exitCondition.keepRunning()) {
+ if (tryAcquire()) {
+ return AcquireResult.Done;
+ } else {
+ pauser.idle();
+ }
+ }
+ return AcquireResult.Exit;
+ }
+
+ /**
+ * Not reentrant lock acquisition operation (ie {@link #tryAcquire()}).
+ * It tries to acquire the lock until will succeed (ie {@link AcquireResult#Done}), got interrupted (ie {@link AcquireResult#Exit})
+ * or exceed {@code tryAcquireTimeoutMillis}.
+ * After each failed attempt is performed a {@link Pauser#idle} call.
+ * If the specified timeout is <=0 then it behaves as {@link #tryAcquire(ExitCondition, Pauser)}.
+ */
+ default AcquireResult tryAcquire(long tryAcquireTimeoutMillis, Pauser pauser, ExitCondition exitCondition) {
+ if (tryAcquireTimeoutMillis < 0) {
+ return tryAcquire(exitCondition, pauser);
+ }
+ final long timeoutInNanosecond = TimeUnit.MILLISECONDS.toNanos(tryAcquireTimeoutMillis);
+ final long startAcquire = System.nanoTime();
+ while (exitCondition.keepRunning()) {
+ if (tryAcquire()) {
+ return AcquireResult.Done;
+ } else if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
+ return AcquireResult.Timeout;
+ } else {
+ pauser.idle();
+ //check before doing anything if time is expired
+ if (System.nanoTime() - startAcquire >= timeoutInNanosecond) {
+ return AcquireResult.Timeout;
+ }
+ }
+ }
+ return AcquireResult.Exit;
+ }
+
+ /**
+ * @return {@code true} if there is a valid (ie not expired) owner, {@code false} otherwise
+ */
+ boolean isHeld();
+
+ /**
+ * @return {@code true} if the caller is a valid (ie not expired) owner, {@code false} otherwise
+ */
+ boolean isHeldByCaller();
+
+ /**
+ * It release the lock itself and all the resources used by it (eg connections, file handlers)
+ */
+ @Override
+ default void close() throws Exception {
+ release();
+ }
+
+ /**
+ * Perform the release if this lock is held by the caller.
+ */
+ void release();
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
new file mode 100644
index 0000000000..43751f8671
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+
+/**
+ * {@link LeaseLock} holder that allows to schedule a {@link LeaseLock#renew} task with a fixed {@link #renewPeriodMillis()} delay.
+ */
+interface ScheduledLeaseLock extends ActiveMQComponent {
+
+ LeaseLock lock();
+
+ long renewPeriodMillis();
+
+ static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
+ ArtemisExecutor executor,
+ String lockName,
+ LeaseLock lock,
+ long renewPeriodMillis,
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
new file mode 100644
index 0000000000..e26879c6cc
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/SharedStateManager.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.function.Supplier;
+
+import org.apache.activemq.artemis.utils.UUID;
+
+/**
+ * Facade to abstract the operations on the shared state (inter-process and/or inter-thread) necessary to coordinate broker nodes.
+ */
+interface SharedStateManager extends AutoCloseable {
+
+ enum State {
+ LIVE, PAUSED, FAILING_BACK, NOT_STARTED, FIRST_TIME_START
+ }
+
+ LeaseLock liveLock();
+
+ LeaseLock backupLock();
+
+ UUID readNodeId();
+
+ void writeNodeId(UUID nodeId);
+
+ /**
+ * Purpose of this method is to setup the environment to provide a shared state between live/backup servers.
+ * That means:
+ * - check if a shared state exist and create it/wait for it if not
+ * - check if a nodeId exists and create it if not
+ *
+ * @param nodeIdFactory used to create the nodeId if needed
+ * @return the newly created NodeId or the old one if already present
+ */
+ UUID setup(Supplier extends UUID> nodeIdFactory);
+
+ State readState();
+
+ void writeState(State state);
+
+ @Override
+ default void close() throws Exception {
+
+ }
+}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5bdc59843f..a6155544ba 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -1884,6 +1884,27 @@
+
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.impl.jdbc; + +import java.sql.SQLException; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider; +import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class JdbcLeaseLockTest { + + private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10); + private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER); + private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true"; + private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; + private JdbcSharedStateManager jdbcSharedStateManager; + + private LeaseLock lock() { + return lock(DEFAULT_LOCK_EXPIRATION_MILLIS); + } + + private LeaseLock lock(long acquireMillis) { + try { + return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0); + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + + @Before + public void createLockTable() { + jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER); + } + + @After + public void dropLockTable() throws Exception { + jdbcSharedStateManager.destroy(); + jdbcSharedStateManager.close(); + } + + @Test + public void shouldAcquireLock() { + final LeaseLock lock = lock(); + final boolean acquired = lock.tryAcquire(); + Assert.assertTrue("Must acquire the lock!", acquired); + try { + Assert.assertTrue("The lock is been held by the caller!", lock.isHeldByCaller()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotAcquireLockWhenAlreadyHeldByOthers() { + final LeaseLock lock = lock(); + Assert.assertTrue("Must acquire the lock", lock.tryAcquire()); + try { + Assert.assertTrue("Lock held by the caller", lock.isHeldByCaller()); + final LeaseLock failingLock = lock(); + Assert.assertFalse("lock already held by other", failingLock.tryAcquire()); + Assert.assertFalse("lock already held by other", failingLock.isHeldByCaller()); + Assert.assertTrue("lock already held by other", failingLock.isHeld()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotAcquireLockTwice() { + final LeaseLock lock = lock(); + Assert.assertTrue("Must acquire the lock", lock.tryAcquire()); + try { + Assert.assertFalse("lock already acquired", lock.tryAcquire()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotCorruptGuardedState() throws InterruptedException { + final AtomicLong sharedState = new AtomicLong(0); + final int producers = 2; + final int writesPerProducer = 10; + final long idleMillis = 1000; + final long millisToAcquireLock = writesPerProducer * (producers - 1) * idleMillis; + final LeaseLock.Pauser pauser = LeaseLock.Pauser.sleep(idleMillis, TimeUnit.MILLISECONDS); + final CountDownLatch finished = new CountDownLatch(producers); + final LeaseLock[] locks = new LeaseLock[producers]; + final AtomicInteger lockIndex = new AtomicInteger(0); + final Runnable producerTask = () -> { + final LeaseLock lock = locks[lockIndex.getAndIncrement()]; + try { + for (int i = 0; i < writesPerProducer; i++) { + final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(millisToAcquireLock, pauser, () -> true); + if (acquireResult != LeaseLock.AcquireResult.Done) { + throw new IllegalStateException(acquireResult + " from " + Thread.currentThread()); + } + //avoid the atomic getAndIncrement operation on purpose + sharedState.lazySet(sharedState.get() + 1); + lock.release(); + } + } finally { + finished.countDown(); + } + }; + final Thread[] producerThreads = new Thread[producers]; + for (int i = 0; i < producers; i++) { + locks[i] = lock(); + producerThreads[i] = new Thread(producerTask); + } + Stream.of(producerThreads).forEach(Thread::start); + final long maxTestTime = millisToAcquireLock * writesPerProducer * producers; + Assert.assertTrue("Each producers must complete the writes", finished.await(maxTestTime, TimeUnit.MILLISECONDS)); + Assert.assertEquals("locks hasn't mutual excluded producers", writesPerProducer * producers, sharedState.get()); + } + + @Test + public void shouldAcquireExpiredLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + Assert.assertTrue("lock is already expired", lock.tryAcquire()); + } finally { + lock.release(); + } + } + + @Test + public void shouldOtherAcquireExpiredLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10)); + try { + Assert.assertTrue("lock is already expired", otherLock.tryAcquire()); + } finally { + otherLock.release(); + } + } finally { + lock.release(); + } + } + + @Test + public void shouldRenewAcquiredLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Assert.assertTrue("lock is owned", lock.renew()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotRenewReleasedLock() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + lock.release(); + Assert.assertFalse("lock is already released", lock.isHeldByCaller()); + Assert.assertFalse("lock is already released", lock.isHeld()); + Assert.assertFalse("lock is already released", lock.renew()); + } + + @Test + public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + Assert.assertTrue("lock is owned", lock.renew()); + } finally { + lock.release(); + } + } + + @Test + public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException { + final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); + try { + Thread.sleep(lock.expirationMillis() * 2); + Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); + Assert.assertFalse("lock is already expired", lock.isHeld()); + final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10)); + Assert.assertTrue("lock is already expired", otherLock.tryAcquire()); + try { + Assert.assertFalse("lock is owned by others", lock.renew()); + } finally { + otherLock.release(); + } + } finally { + lock.release(); + } + } +} + diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md index 5264b5563a..7bd4618e93 100644 --- a/docs/user-manual/en/persistence.md +++ b/docs/user-manual/en/persistence.md @@ -452,6 +452,21 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a The JDBC network connection timeout in milliseconds. The default value is 20000 milliseconds (ie 20 seconds). + +- `jdbc-lock-acquisition-timeout` + + The max allowed time in milliseconds while trying to acquire a JDBC lock. The default value + is 60000 milliseconds (ie 60 seconds). + +- `jdbc-lock-renew-period` + + The period in milliseconds of the keep alive service of a JDBC lock. The default value + is 2000 milliseconds (ie 2 seconds). + +- `jdbc-lock-expiration` + + The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value + is 20000 milliseconds (ie 20 seconds). Note that some DBMS (e.g. Oracle, 30 chars) have restrictions on the size of table names, this should be taken into consideration when configuring table names for the Artemis database store, pay particular attention to the page store table name, which can be appended with a unique ID of up to 20 characters. (for Oracle this would mean configuring a page-store-table-name of max size of 10 chars).