diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index e1bff28ff30..d08a5a9e6af 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -100,6 +100,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
disruptor
${disruptor.version}
+
+ org.apache.commons
+ commons-pool2
+ 2.6.0
+
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 98efbf8dbe4..63fb17ef766 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,4 +81,8 @@ public final class HddsConfigKeys {
"hdds.scm.chillmode.threshold.pct";
public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99;
+ public static final String HDDS_LOCK_MAX_CONCURRENCY =
+ "hdds.lock.max.concurrency";
+ public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100;
+
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
new file mode 100644
index 00000000000..c3020844927
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/ActiveLock.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Lock implementation which also maintains counter.
+ */
+public final class ActiveLock {
+
+ private Lock lock;
+ private AtomicInteger count;
+
+ /**
+ * Use ActiveLock#newInstance to create instance.
+ */
+ private ActiveLock() {
+ this.lock = new ReentrantLock();
+ this.count = new AtomicInteger(0);
+ }
+
+ /**
+ * Creates a new instance of ActiveLock.
+ *
+ * @return new ActiveLock
+ */
+ public static ActiveLock newInstance() {
+ return new ActiveLock();
+ }
+
+ /**
+ * Acquires the lock.
+ *
+ *
If the lock is not available then the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until the
+ * lock has been acquired.
+ */
+ public void lock() {
+ lock.lock();
+ }
+
+ /**
+ * Releases the lock.
+ */
+ public void unlock() {
+ lock.unlock();
+ }
+
+ /**
+ * Increment the active count of the lock.
+ */
+ void incrementActiveCount() {
+ count.incrementAndGet();
+ }
+
+ /**
+ * Decrement the active count of the lock.
+ */
+ void decrementActiveCount() {
+ count.decrementAndGet();
+ }
+
+ /**
+ * Returns the active count on the lock.
+ *
+ * @return Number of active leases on the lock.
+ */
+ int getActiveLockCount() {
+ return count.get();
+ }
+
+ /**
+ * Resets the active count on the lock.
+ */
+ void resetCounter() {
+ count.set(0);
+ }
+
+ @Override
+ public String toString() {
+ return lock.toString();
+ }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
new file mode 100644
index 00000000000..49cf544626b
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/LockManager.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Manages the locks on a given resource. A new lock is created for each
+ * and every unique resource. Uniqueness of resource depends on the
+ * {@code equals} implementation of it.
+ */
+public class LockManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
+
+ private final Map activeLocks = new ConcurrentHashMap<>();
+ private final GenericObjectPool lockPool =
+ new GenericObjectPool<>(new PooledLockFactory());
+
+ /**
+ * Creates new LockManager instance.
+ *
+ * @param conf Configuration object
+ */
+ public LockManager(Configuration conf) {
+ int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
+ HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT);
+ lockPool.setMaxTotal(maxPoolSize);
+ }
+
+
+ /**
+ * Acquires the lock on given resource.
+ *
+ * If the lock is not available then the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until the
+ * lock has been acquired.
+ */
+ public void lock(T resource) {
+ activeLocks.compute(resource, (k, v) -> {
+ ActiveLock lock;
+ try {
+ if (v == null) {
+ lock = lockPool.borrowObject();
+ } else {
+ lock = v;
+ }
+ lock.incrementActiveCount();
+ } catch (Exception ex) {
+ LOG.error("Unable to obtain lock.", ex);
+ throw new RuntimeException(ex);
+ }
+ return lock;
+ }).lock();
+ }
+
+ /**
+ * Releases the lock on given resource.
+ */
+ public void unlock(T resource) {
+ ActiveLock lock = activeLocks.get(resource);
+ if (lock == null) {
+ // Someone is releasing a lock which was never acquired. Log and return.
+ LOG.warn("Trying to release the lock on {}, which was never acquired.",
+ resource);
+ return;
+ }
+ lock.unlock();
+ activeLocks.computeIfPresent(resource, (k, v) -> {
+ v.decrementActiveCount();
+ if (v.getActiveLockCount() != 0) {
+ return v;
+ }
+ lockPool.returnObject(v);
+ return null;
+ });
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/PooledLockFactory.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/PooledLockFactory.java
new file mode 100644
index 00000000000..4c24ef74b28
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/PooledLockFactory.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import org.apache.commons.pool2.BasePooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+
+/**
+ * Pool factory to create {@code ActiveLock} instances.
+ */
+public class PooledLockFactory extends BasePooledObjectFactory {
+
+ @Override
+ public ActiveLock create() throws Exception {
+ return ActiveLock.newInstance();
+ }
+
+ @Override
+ public PooledObject wrap(ActiveLock activeLock) {
+ return new DefaultPooledObject<>(activeLock);
+ }
+
+ @Override
+ public void activateObject(PooledObject pooledObject) {
+ pooledObject.getObject().resetCounter();
+ }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/package-info.java
new file mode 100644
index 00000000000..5c677ced745
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lock/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+/*
+ This package contains the lock related classes.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index be19e901726..850044d1e55 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1168,4 +1168,15 @@
(in compressed format), but doesn't require fast io access such as SSD.
+
+
+ hdds.lock.max.concurrency
+ 100
+ HDDS
+ Locks in HDDS/Ozone uses object pool to maintain active locks
+ in the system, this property defines the max limit for the locks that
+ will be maintained in the pool.
+
+
+
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
new file mode 100644
index 00000000000..fa3030d0c35
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/TestLockManager.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test-cases to test LockManager.
+ */
+public class TestLockManager {
+
+ @Test(timeout = 1000)
+ public void testWithDifferentResource() {
+ LockManager manager = new LockManager<>(new OzoneConfiguration());
+ manager.lock("/resourceOne");
+ // This should work, as they are different resource.
+ manager.lock("/resourceTwo");
+ manager.unlock("/resourceOne");
+ manager.unlock("/resourceTwo");
+ Assert.assertTrue(true);
+ }
+
+ @Test
+ public void testWithSameResource() throws Exception {
+ LockManager manager = new LockManager<>(new OzoneConfiguration());
+ manager.lock("/resourceOne");
+ AtomicBoolean gotLock = new AtomicBoolean(false);
+ new Thread(() -> {
+ manager.lock("/resourceOne");
+ gotLock.set(true);
+ manager.unlock("/resourceOne");
+ }).start();
+ // Let's give some time for the new thread to run
+ Thread.sleep(100);
+ // Since the new thread is trying to get lock on same object, it will wait.
+ Assert.assertFalse(gotLock.get());
+ manager.unlock("/resourceOne");
+ // Since we have released the lock, the new thread should have the lock
+ // now
+ // Let's give some time for the new thread to run
+ Thread.sleep(100);
+ Assert.assertTrue(gotLock.get());
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/package-info.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/package-info.java
new file mode 100644
index 00000000000..a96bc16248c
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lock/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lock;
+/*
+ This package contains the lock related test classes.
+ */
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
index d54addddfb2..9c451e20df4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -79,9 +79,10 @@ public class BucketManagerImpl implements BucketManager {
@Override
public void createBucket(OmBucketInfo bucketInfo) throws IOException {
Preconditions.checkNotNull(bucketInfo);
- metadataManager.writeLock().lock();
String volumeName = bucketInfo.getVolumeName();
String bucketName = bucketInfo.getBucketName();
+ metadataManager.getLock().acquireVolumeLock(volumeName);
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
@@ -118,7 +119,8 @@ public class BucketManagerImpl implements BucketManager {
}
throw ex;
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+ metadataManager.getLock().releaseVolumeLock(volumeName);
}
}
@@ -133,7 +135,7 @@ public class BucketManagerImpl implements BucketManager {
throws IOException {
Preconditions.checkNotNull(volumeName);
Preconditions.checkNotNull(bucketName);
- metadataManager.readLock().lock();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
byte[] value = metadataManager.getBucketTable().get(bucketKey);
@@ -151,7 +153,7 @@ public class BucketManagerImpl implements BucketManager {
}
throw ex;
} finally {
- metadataManager.readLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@@ -164,18 +166,11 @@ public class BucketManagerImpl implements BucketManager {
@Override
public void setBucketProperty(OmBucketArgs args) throws IOException {
Preconditions.checkNotNull(args);
- metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
- //Check if volume exists
- if (metadataManager.getVolumeTable()
- .get(metadataManager.getVolumeKey(volumeName)) == null) {
- LOG.debug("volume: {} not found ", volumeName);
- throw new OMException("Volume doesn't exist",
- OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
- }
byte[] value = metadataManager.getBucketTable().get(bucketKey);
//Check if bucket exist
if (value == null) {
@@ -230,7 +225,7 @@ public class BucketManagerImpl implements BucketManager {
}
throw ex;
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@@ -266,16 +261,8 @@ public class BucketManagerImpl implements BucketManager {
throws IOException {
Preconditions.checkNotNull(volumeName);
Preconditions.checkNotNull(bucketName);
- metadataManager.writeLock().lock();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
- //Check if volume exists
- if (metadataManager.getVolumeTable()
- .get(metadataManager.getVolumeKey(volumeName)) == null) {
- LOG.debug("volume: {} not found ", volumeName);
- throw new OMException("Volume doesn't exist",
- OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
- }
-
//Check if bucket exists
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
if (metadataManager.getBucketTable().get(bucketKey) == null) {
@@ -297,7 +284,7 @@ public class BucketManagerImpl implements BucketManager {
}
throw ex;
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@@ -309,12 +296,8 @@ public class BucketManagerImpl implements BucketManager {
String startBucket, String bucketPrefix, int maxNumOfBuckets)
throws IOException {
Preconditions.checkNotNull(volumeName);
- metadataManager.readLock().lock();
- try {
- return metadataManager.listBuckets(
- volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
- } finally {
- metadataManager.readLock().unlock();
- }
+ return metadataManager.listBuckets(
+ volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
+
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 5cf869eee20..c14d0d841eb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -139,44 +139,38 @@ public class KeyManagerImpl implements KeyManager {
public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientID)
throws IOException {
Preconditions.checkNotNull(args);
- metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
+ validateBucket(volumeName, bucketName);
+ byte[] openKey = metadataManager.getOpenKeyBytes(
+ volumeName, bucketName, keyName, clientID);
- try {
- validateBucket(volumeName, bucketName);
- byte[] openKey = metadataManager.getOpenKeyBytes(
- volumeName, bucketName, keyName, clientID);
-
- byte[] keyData = metadataManager.getOpenKeyTable().get(openKey);
- if (keyData == null) {
- LOG.error("Allocate block for a key not in open status in meta store" +
- " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
- throw new OMException("Open Key not found",
- OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
- }
- OmKeyInfo keyInfo =
- OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
- AllocatedBlock allocatedBlock =
- scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
- keyInfo.getFactor(), omId);
- OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
- .setBlockID(allocatedBlock.getBlockID())
- .setShouldCreateContainer(allocatedBlock.getCreateContainer())
- .setLength(scmBlockSize)
- .setOffset(0)
- .build();
- // current version not committed, so new blocks coming now are added to
- // the same version
- keyInfo.appendNewBlocks(Collections.singletonList(info));
- keyInfo.updateModifcationTime();
- metadataManager.getOpenKeyTable().put(openKey,
- keyInfo.getProtobuf().toByteArray());
- return info;
- } finally {
- metadataManager.writeLock().unlock();
+ byte[] keyData = metadataManager.getOpenKeyTable().get(openKey);
+ if (keyData == null) {
+ LOG.error("Allocate block for a key not in open status in meta store" +
+ " /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
+ throw new OMException("Open Key not found",
+ OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
}
+ OmKeyInfo keyInfo =
+ OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
+ AllocatedBlock allocatedBlock =
+ scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
+ keyInfo.getFactor(), omId);
+ OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
+ .setBlockID(allocatedBlock.getBlockID())
+ .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+ .setLength(scmBlockSize)
+ .setOffset(0)
+ .build();
+ // current version not committed, so new blocks coming now are added to
+ // the same version
+ keyInfo.appendNewBlocks(Collections.singletonList(info));
+ keyInfo.updateModifcationTime();
+ metadataManager.getOpenKeyTable().put(openKey,
+ keyInfo.getProtobuf().toByteArray());
+ return info;
}
@Override
@@ -186,7 +180,7 @@ public class KeyManagerImpl implements KeyManager {
String bucketName = args.getBucketName();
validateBucket(volumeName, bucketName);
- metadataManager.writeLock().lock();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
String keyName = args.getKeyName();
ReplicationFactor factor = args.getFactor();
ReplicationType type = args.getType();
@@ -286,17 +280,17 @@ public class KeyManagerImpl implements KeyManager {
throw new OMException(ex.getMessage(),
OMException.ResultCodes.FAILED_KEY_ALLOCATION);
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@Override
public void commitKey(OmKeyArgs args, long clientID) throws IOException {
Preconditions.checkNotNull(args);
- metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
validateBucket(volumeName, bucketName);
byte[] openKey = metadataManager.getOpenKeyBytes(volumeName, bucketName,
@@ -329,17 +323,17 @@ public class KeyManagerImpl implements KeyManager {
throw new OMException(ex.getMessage(),
OMException.ResultCodes.FAILED_KEY_ALLOCATION);
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@Override
public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
- metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] keyBytes = metadataManager.getOzoneKeyBytes(
volumeName, bucketName, keyName);
@@ -357,7 +351,7 @@ public class KeyManagerImpl implements KeyManager {
throw new OMException(ex.getMessage(),
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@@ -375,7 +369,7 @@ public class KeyManagerImpl implements KeyManager {
ResultCodes.FAILED_INVALID_KEY_NAME);
}
- metadataManager.writeLock().lock();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
// fromKeyName should exist
byte[] fromKey = metadataManager.getOzoneKeyBytes(
@@ -431,17 +425,17 @@ public class KeyManagerImpl implements KeyManager {
throw new OMException(ex.getMessage(),
ResultCodes.FAILED_KEY_RENAME);
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@Override
public void deleteKey(OmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
- metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
String keyName = args.getKeyName();
+ metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
try {
byte[] objectKey = metadataManager.getOzoneKeyBytes(
volumeName, bucketName, keyName);
@@ -470,7 +464,7 @@ public class KeyManagerImpl implements KeyManager {
throw new OMException(ex.getMessage(), ex,
ResultCodes.FAILED_KEY_DELETION);
} finally {
- metadataManager.writeLock().unlock();
+ metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
}
@@ -506,12 +500,8 @@ public class KeyManagerImpl implements KeyManager {
@Override
public List getExpiredOpenKeys() throws IOException {
- metadataManager.readLock().lock();
- try {
- return metadataManager.getExpiredOpenKeys();
- } finally {
- metadataManager.readLock().unlock();
- }
+ return metadataManager.getExpiredOpenKeys();
+
}
@Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 0e9ae42692f..c8fb39c5575 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.utils.db.Table;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.locks.Lock;
/**
* OM metadata manager interface.
@@ -51,18 +50,11 @@ public interface OMMetadataManager {
DBStore getStore();
/**
- * Returns the read lock used on Metadata DB.
+ * Returns the OzoneManagerLock used on Metadata DB.
*
- * @return readLock
+ * @return OzoneManagerLock
*/
- Lock readLock();
-
- /**
- * Returns the write lock used on Metadata DB.
- *
- * @return writeLock
- */
- Lock writeLock();
+ OzoneManagerLock getLock();
/**
* Given a volume return the corresponding DB key.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index a1d48ff3c50..a7e1beda9d9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -52,9 +52,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
@@ -102,9 +99,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
private final DBStore store;
- // TODO: Make this lock move into Table instead of *ONE* lock for the whole
- // DB.
- private final ReadWriteLock lock;
+ private final OzoneManagerLock lock;
private final long openKeyExpireThresholdMS;
private final Table userTable;
@@ -116,7 +111,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
File metaDir = getOzoneMetaDirPath(conf);
- this.lock = new ReentrantReadWriteLock();
+ this.lock = new OzoneManagerLock(conf);
this.openKeyExpireThresholdMS = 1000 * conf.getInt(
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
@@ -280,23 +275,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
}
/**
- * Returns the read lock used on Metadata DB.
+ * Returns the OzoneManagerLock used on Metadata DB.
*
- * @return readLock
+ * @return OzoneManagerLock
*/
@Override
- public Lock readLock() {
- return lock.readLock();
- }
-
- /**
- * Returns the write lock used on Metadata DB.
- *
- * @return writeLock
- */
- @Override
- public Lock writeLock() {
- return lock.writeLock();
+ public OzoneManagerLock getLock() {
+ return lock;
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
new file mode 100644
index 00000000000..e9ea2dfcc46
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerLock.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.lock.LockManager;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX;
+
+/**
+ * Provides different locks to handle concurrency in OzoneMaster.
+ * We also maintain lock hierarchy, based on the weight.
+ *
+ *
+ *
+ * WEIGHT | LOCK |
+ *
+ *
+ * 0 | User Lock |
+ *
+ *
+ * 1 | Volume Lock |
+ *
+ *
+ * 2 | Bucket Lock |
+ *
+ *
+ *
+ * One cannot obtain a lower weight lock while holding a lock with higher
+ * weight. The other way around is possible.
+ *
+ *
+ * For example:
+ *
+ * -> acquireVolumeLock (will work)
+ * +-> acquireBucketLock (will work)
+ * +--> acquireUserLock (will throw Exception)
+ *
+ *
+ *
+ * To acquire a user lock you should not hold any Volume/Bucket lock. Similarly
+ * to acquire a Volume lock you should not hold any Bucket lock.
+ *
+ */
+public final class OzoneManagerLock {
+
+ private static final String VOLUME_LOCK = "volumeLock";
+ private static final String BUCKET_LOCK = "bucketLock";
+
+
+ private final LockManager manager;
+
+ // To maintain locks held by current thread.
+ private final ThreadLocal