HDFS-16429. Add DataSetLockManager to manage fine-grain locks for FsDataSetImpl. (#3900). Contributed by limingxiang.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Li MingXiang 2022-01-27 16:53:21 +08:00 committed by GitHub
parent 6136d630a3
commit e17c96a40a
8 changed files with 637 additions and 3 deletions
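For orientation, a minimal usage sketch of the API this commit introduces; the block-pool id and volume name are hypothetical placeholders, not values from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

public class DataSetLockUsageSketch {
  public static void main(String[] args) {
    // "BP-1" and "/data1" are illustrative resource names.
    DataSetLockManager manager = new DataSetLockManager(new Configuration());
    manager.addLock(LockLevel.VOLUME, "BP-1", "/data1");

    // Acquiring a VOLUME lock takes the BLOCK_POOl read lock first;
    // closing the volume lock also releases that parent lock.
    try (AutoCloseDataSetLock lock =
             manager.writeLock(LockLevel.VOLUME, "BP-1", "/data1")) {
      // volume-scoped FsDatasetImpl state would be mutated here
    }
  }
}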

LightWeightResizableGSet.java

@@ -20,6 +20,9 @@ package org.apache.hadoop.util;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Iterator;
import java.util.function.Consumer;
/**
* A low memory footprint {@link GSet} implementation,
* which uses an array for storing the elements
@@ -86,17 +89,36 @@ public class LightWeightResizableGSet<K, E extends K>
}
@Override
- public E put(final E element) {
+ public synchronized E put(final E element) {
E existing = super.put(element);
expandIfNecessary();
return existing;
}
@Override
public synchronized E get(K key) {
return super.get(key);
}
@Override
public synchronized E remove(K key) {
return super.remove(key);
}
@Override
public synchronized int size() {
return super.size();
}
public synchronized void getIterator(Consumer<Iterator<E>> consumer) {
consumer.accept(super.values().iterator());
}
/**
* Resize the internal table to given capacity.
*/
@SuppressWarnings("unchecked")
- protected void resize(int cap) {
+ protected synchronized void resize(int cap) {
int newCapacity = actualArrayLength(cap);
if (newCapacity == this.capacity) {
return;
@@ -121,7 +143,7 @@ public class LightWeightResizableGSet<K, E extends K>
/**
* Checks if we need to expand, and expands if necessary.
*/
- protected void expandIfNecessary() {
+ protected synchronized void expandIfNecessary() {
if (size > this.threshold && capacity < MAX_ARRAY_LENGTH) {
resize(capacity * 2);
}
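The Consumer-based getIterator above keeps iteration inside the synchronized method, so callers cannot race with concurrent put/remove. A sketch of the intended call pattern, assuming an already-populated set:

import org.apache.hadoop.util.LightWeightResizableGSet;

public class GSetIterationSketch {
  // The Consumer body runs while the set's monitor is held, so the
  // iteration cannot interleave with put()/remove() from other threads.
  static <K, E extends K> void dumpAll(LightWeightResizableGSet<K, E> set) {
    set.getIterator(iterator -> {
      while (iterator.hasNext()) {
        System.out.println(iterator.next());
      }
    });
  }
}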

DFSConfigKeys.java

@@ -1658,6 +1658,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
DFS_NAMESERVICES_RESOLVER_IMPL =
"dfs.datanode.nameservices.resolver.impl";
public static final String
DFS_DATANODE_LOCKMANAGER_TRACE =
"dfs.datanode.lockmanager.trace";
public static final boolean
DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT = false;
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

AutoCloseDataSetLock.java

@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import java.util.concurrent.locks.Lock;
import static org.apache.hadoop.hdfs.server.datanode.DataSetLockManager.LOG;
/**
* Extends AutoCloseableLock so that users can use
* try-with-resources syntax.
*/
public class AutoCloseDataSetLock extends AutoCloseableLock {
private Lock lock;
private AutoCloseDataSetLock parentLock;
private DataNodeLockManager<AutoCloseDataSetLock> dataNodeLockManager;
public AutoCloseDataSetLock(Lock lock) {
this.lock = lock;
}
@Override
public void close() {
if (lock != null) {
lock.unlock();
if (dataNodeLockManager != null) {
dataNodeLockManager.hook();
}
} else {
LOG.error("Try to unlock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}
if (parentLock != null) {
parentLock.close();
}
}
/**
* Actually acquire the lock.
*/
public void lock() {
if (lock != null) {
lock.lock();
return;
}
LOG.error("Try to lock null lock" +
StringUtils.getStackTrace(Thread.currentThread()));
}
public void setParentLock(AutoCloseDataSetLock parent) {
if (parentLock == null) {
this.parentLock = parent;
}
}
public void setDataNodeLockManager(DataNodeLockManager<AutoCloseDataSetLock>
dataNodeLockManager) {
this.dataNodeLockManager = dataNodeLockManager;
}
}
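Note the release order in close(): the lock itself is unlocked before the parent is closed, reversing the acquire order. A standalone sketch of the chaining mechanics, with two illustrative ReentrantReadWriteLocks standing in for the locks the manager would supply:

import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;

public class LockChainingSketch {
  public static void main(String[] args) {
    // Two independent locks stand in for a block-pool and a volume lock.
    ReentrantReadWriteLock bp = new ReentrantReadWriteLock();
    ReentrantReadWriteLock vol = new ReentrantReadWriteLock();
    AutoCloseDataSetLock parent = new AutoCloseDataSetLock(bp.readLock());
    AutoCloseDataSetLock child = new AutoCloseDataSetLock(vol.writeLock());
    child.setParentLock(parent);

    parent.lock(); // acquire the parent first, per the lock ordering
    child.lock();
    child.close(); // unlocks the child, then cascades to parent.close()
  }
}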

DataNodeLockManager.java

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
/**
* Interface for managing a set of locks for the DataNode.
*/
public interface DataNodeLockManager<T extends AutoCloseDataSetLock> {
/**
* Acquire the block-pool-level lock first if you want to acquire a
* volume-level lock, or acquire only the block-pool-level lock.
*/
enum LockLevel {
BLOCK_POOl,
VOLUME
}
/**
* Acquire the read lock for the given resources and lock it.
*/
T readLock(LockLevel level, String... resources);
/**
* Acquire the write lock for the given resources and lock it.
*/
T writeLock(LockLevel level, String... resources);
/**
* Add a lock to LockManager.
*/
void addLock(LockLevel level, String... resources);
/**
* Remove a lock from LockManager.
*/
void removeLock(LockLevel level, String... resources);
/**
* Hook called back when a lock is released, so the LockManager can
* update its bookkeeping (e.g. lock tracing).
*/
void hook();
}

NoLockManager.java

@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common;
import java.util.concurrent.locks.Lock;
/**
* A no-op lock manager for unit tests or temporary ReplicaMaps that do
* not need locking through DataSetLockManager.
*/
public class NoLockManager implements DataNodeLockManager<AutoCloseDataSetLock> {
private final NoDataSetLock lock = new NoDataSetLock(null);
private static final class NoDataSetLock extends AutoCloseDataSetLock {
private NoDataSetLock(Lock lock) {
super(lock);
}
@Override
public void lock() {
}
@Override
public void close() {
}
}
public NoLockManager() {
}
@Override
public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
return lock;
}
@Override
public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
return lock;
}
@Override
public void addLock(LockLevel level, String... resources) {
}
@Override
public void removeLock(LockLevel level, String... resources) {
}
@Override
public void hook() {
}
}
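A sketch of how the no-op manager can stand in for the real one in tests; the resource name is hypothetical, and NoLockManager ignores it anyway:

import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.common.NoLockManager;

public class NoLockSketch {
  public static void main(String[] args) {
    // Every lock()/close() call below is a no-op.
    DataNodeLockManager<AutoCloseDataSetLock> noop = new NoLockManager();
    try (AutoCloseDataSetLock l = noop.writeLock(LockLevel.BLOCK_POOl, "BP-1")) {
      // no real synchronization happens here
    }
  }
}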

DataSetLockManager.java

@@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
import java.util.HashMap;
import java.util.Stack;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Class for maintaining a set of locks for FsDatasetImpl.
*/
public class DataSetLockManager implements DataNodeLockManager<AutoCloseDataSetLock> {
public static final Log LOG = LogFactory.getLog(DataSetLockManager.class);
private final HashMap<String, TrackLog> threadCountMap = new HashMap<>();
private final LockMap lockMap = new LockMap();
private boolean isFair = true;
private final boolean openLockTrace;
private Exception lastException;
/**
* Thread-safe class for maintaining the lock map.
*/
private class LockMap {
private final HashMap<String, AutoCloseDataSetLock> readlockMap = new HashMap<>();
private final HashMap<String, AutoCloseDataSetLock> writeLockMap = new HashMap<>();
public synchronized void addLock(String name, ReentrantReadWriteLock lock) {
AutoCloseDataSetLock readLock = new AutoCloseDataSetLock(lock.readLock());
AutoCloseDataSetLock writeLock = new AutoCloseDataSetLock(lock.writeLock());
if (openLockTrace) {
readLock.setDataNodeLockManager(DataSetLockManager.this);
writeLock.setDataNodeLockManager(DataSetLockManager.this);
}
readlockMap.putIfAbsent(name, readLock);
writeLockMap.putIfAbsent(name, writeLock);
}
public synchronized void removeLock(String name) {
if (!readlockMap.containsKey(name) || !writeLockMap.containsKey(name)) {
LOG.error("The lock " + name + " is not in LockMap");
}
readlockMap.remove(name);
writeLockMap.remove(name);
}
public synchronized AutoCloseDataSetLock getReadLock(String name) {
return readlockMap.get(name);
}
public synchronized AutoCloseDataSetLock getWriteLock(String name) {
return writeLockMap.get(name);
}
}
/**
* Generate a lock name by concatenating the resource names in lock order.
* @param level the level of lock to acquire.
* @param resources the resource names, in lock order.
* @return the concatenated lock name.
*/
private String generateLockName(LockLevel level, String... resources) {
if (resources.length == 1 && level == LockLevel.BLOCK_POOl) {
if (resources[0] == null) {
throw new IllegalArgumentException("acquire a null block pool lock");
}
return resources[0];
} else if (resources.length == 2 && level == LockLevel.VOLUME) {
if (resources[0] == null || resources[1] == null) {
throw new IllegalArgumentException("acquire a null bp lock : "
+ resources[0] + "volume lock :" + resources[1]);
}
return resources[0] + resources[1];
} else {
throw new IllegalArgumentException("lock level do not match resource");
}
}
/**
* Class for recording the stack traces and count of locks a thread
* has acquired.
*/
private static class TrackLog {
private final Stack<Exception> logStack = new Stack<>();
private int lockCount = 0;
private final String threadName;
TrackLog(String threadName) {
this.threadName = threadName;
incrLockCount();
}
public void incrLockCount() {
logStack.push(new Exception("lock stack trace"));
lockCount += 1;
}
public void decrLockCount() {
logStack.pop();
lockCount -= 1;
}
public void showLockMessage() {
LOG.error("hold lock thread name is:" + threadName +
" hold count is:" + lockCount);
while (!logStack.isEmpty()) {
Exception e = logStack.pop();
LOG.error("lock stack ", e);
}
}
public boolean shouldClear() {
return lockCount == 1;
}
}
public DataSetLockManager(Configuration conf) {
this.isFair = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_FAIR_DEFAULT);
this.openLockTrace = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE,
DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE_DEFAULT);
}
public DataSetLockManager() {
this.openLockTrace = true;
}
@Override
public AutoCloseDataSetLock readLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getReadLock(level, resources[0]);
} else {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getReadLock(level, resources);
volLock.setParentLock(bpLock);
if (openLockTrace) {
LOG.info("Sub lock " + resources[0] + resources[1] + " parent lock " +
resources[0]);
}
return volLock;
}
}
@Override
public AutoCloseDataSetLock writeLock(LockLevel level, String... resources) {
if (level == LockLevel.BLOCK_POOl) {
return getWriteLock(level, resources[0]);
} else {
AutoCloseDataSetLock bpLock = getReadLock(LockLevel.BLOCK_POOl, resources[0]);
AutoCloseDataSetLock volLock = getWriteLock(level, resources);
volLock.setParentLock(bpLock);
if (openLockTrace) {
LOG.info("Sub lock " + resources[0] + resources[1] + " parent lock " +
resources[0]);
}
return volLock;
}
}
/**
* Return a non-null read lock.
*/
private AutoCloseDataSetLock getReadLock(LockLevel level, String... resources) {
String lockName = generateLockName(level, resources);
AutoCloseDataSetLock lock = lockMap.getReadLock(lockName);
if (lock == null) {
LOG.warn("Ignore this error during dn restart: Not existing readLock "
+ lockName);
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
lock = lockMap.getReadLock(lockName);
}
lock.lock();
if (openLockTrace) {
putThreadName(getThreadName());
}
return lock;
}
/**
* Return a non-null write lock.
*/
private AutoCloseDataSetLock getWriteLock(LockLevel level, String... resources) {
String lockName = generateLockName(level, resources);
AutoCloseDataSetLock lock = lockMap.getWriteLock(lockName);
if (lock == null) {
LOG.warn("Ignore this error during dn restart: Not existing writeLock"
+ lockName);
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
lock = lockMap.getWriteLock(lockName);
}
lock.lock();
if (openLockTrace) {
putThreadName(getThreadName());
}
return lock;
}
@Override
public void addLock(LockLevel level, String... resources) {
String lockName = generateLockName(level, resources);
if (level == LockLevel.BLOCK_POOl) {
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
} else {
lockMap.addLock(resources[0], new ReentrantReadWriteLock(isFair));
lockMap.addLock(lockName, new ReentrantReadWriteLock(isFair));
}
}
@Override
public void removeLock(LockLevel level, String... resources) {
String lockName = generateLockName(level, resources);
try (AutoCloseDataSetLock lock = writeLock(level, resources)) {
lock.lock();
lockMap.removeLock(lockName);
}
}
@Override
public void hook() {
if (openLockTrace) {
removeThreadName(getThreadName());
}
}
/**
* Record the thread name when a lock is acquired.
*/
private synchronized void putThreadName(String thread) {
if (threadCountMap.containsKey(thread)) {
TrackLog trackLog = threadCountMap.get(thread);
trackLog.incrLockCount();
}
threadCountMap.putIfAbsent(thread, new TrackLog(thread));
}
public void lockLeakCheck() {
if (!openLockTrace) {
LOG.warn("not open lock leak check func");
return;
}
if (threadCountMap.isEmpty()) {
LOG.warn("all lock has release");
return;
}
setLastException(new Exception("lock Leak"));
threadCountMap.forEach((name, trackLog) -> trackLog.showLockMessage());
}
/**
* Remove the thread name when a lock is released.
*/
private synchronized void removeThreadName(String thread) {
if (threadCountMap.containsKey(thread)) {
TrackLog trackLog = threadCountMap.get(thread);
if (trackLog.shouldClear()) {
threadCountMap.remove(thread);
return;
}
trackLog.decrLockCount();
}
}
private void setLastException(Exception e) {
this.lastException = e;
}
public Exception getLastException() {
return lastException;
}
private String getThreadName() {
return Thread.currentThread().getName() + Thread.currentThread().getId();
}
}
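With tracing enabled (the no-arg constructor turns it on), lockLeakCheck() reports any lock that was acquired but never closed; a sketch mirroring the tests below, with a hypothetical block-pool name:

import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

public class LockLeakSketch {
  public static void main(String[] args) {
    // The no-arg constructor enables lock tracing.
    DataSetLockManager manager = new DataSetLockManager();
    manager.writeLock(LockLevel.BLOCK_POOl, "BP-1"); // acquired, never closed

    manager.lockLeakCheck(); // logs the holding thread and its lock stack
    System.out.println(manager.getLastException().getMessage()); // "lock Leak"
  }
}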

hdfs-default.xml

@@ -6356,4 +6356,14 @@
times, we should mark it as a badnode.
</description>
</property>
<property>
<name>dfs.datanode.lockmanager.trace</name>
<value>false</value>
<description>
If this is true, the DataNode lock manager will print, at shutdown, all
threads holding locks that were never released through it. Only used for
tests or for tracing deadlock problems. The default is false in
production because the tracing has a small performance cost.
</description>
</property>
</configuration>
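The same switch can also be set programmatically through the new DFSConfigKeys constant; a sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;

public class TraceConfigSketch {
  public static void main(String[] args) {
    // Enable lock tracing for a test configuration.
    Configuration conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_LOCKMANAGER_TRACE, true);
    DataSetLockManager manager = new DataSetLockManager(conf); // tracing on
  }
}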

TestDataSetLockManager.java

@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class TestDataSetLockManager {
private DataSetLockManager manager;
@Before
public void init() {
manager = new DataSetLockManager();
}
@Test(timeout = 5000)
public void testBaseFunc() {
manager.addLock(LockLevel.BLOCK_POOl, "BPtest");
manager.addLock(LockLevel.VOLUME, "BPtest", "Volumetest");
AutoCloseDataSetLock lock = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest");
AutoCloseDataSetLock lock1 = manager.readLock(LockLevel.BLOCK_POOl, "BPtest");
lock1.close();
lock.close();
manager.lockLeakCheck();
assertNull(manager.getLastException());
AutoCloseDataSetLock lock2 = manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest");
AutoCloseDataSetLock lock3 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest");
lock3.close();
lock2.close();
manager.lockLeakCheck();
assertNull(manager.getLastException());
AutoCloseDataSetLock lock4 = manager.writeLock(LockLevel.BLOCK_POOl, "BPtest");
AutoCloseDataSetLock lock5 = manager.readLock(LockLevel.VOLUME, "BPtest", "Volumetest");
lock5.close();
lock4.close();
manager.lockLeakCheck();
assertNull(manager.getLastException());
manager.writeLock(LockLevel.VOLUME, "BPtest", "Volumetest");
manager.lockLeakCheck();
Exception lastException = manager.getLastException();
assertEquals("lock Leak", lastException.getMessage());
}
@Test(timeout = 5000)
public void testAcquireWriteLockError() throws InterruptedException {
Thread t = new Thread(() -> {
manager.readLock(LockLevel.BLOCK_POOl, "test");
manager.writeLock(LockLevel.BLOCK_POOl, "test");
});
t.start();
Thread.sleep(1000);
manager.lockLeakCheck();
Exception lastException = manager.getLastException();
assertEquals("lock Leak", lastException.getMessage());
}
@Test(timeout = 5000)
public void testLockLeakCheck() {
manager.writeLock(LockLevel.BLOCK_POOl, "test");
manager.lockLeakCheck();
Exception lastException = manager.getLastException();
assertEquals("lock Leak", lastException.getMessage());
}
}