HDFS-14258. Introduce Java Concurrent Package To DataXceiverServer Class. Contributed by BELUGA BEHR.

commit dde0ab55aa (parent afe126d71f)
Author: Inigo Goiri
Date:   2019-02-15 16:32:27 -08:00
3 changed files with 372 additions and 165 deletions

File 1 of 3:

@@ -581,7 +581,15 @@ public String reconfigurePropertyImpl(String property, String newVal)
               "balancer max concurrent movers must be larger than 0"));
         }
       }
-      xserver.updateBalancerMaxConcurrentMovers(movers);
+      boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
+      if (!success) {
+        rootException = new ReconfigurationException(
+            property,
+            newVal,
+            getConf().get(property),
+            new IllegalArgumentException(
+                "Could not modify concurrent moves thread count"));
+      }
       return Integer.toString(movers);
     } catch (NumberFormatException nfe) {
       rootException = new ReconfigurationException(
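
For orientation: with this change, a request to lower the mover thread count that cannot be honored in time is reported back to the caller instead of being silently dropped. A hedged caller-side sketch follows; the property key is the real dfs.datanode.balance.max.concurrent.moves, while the dn handle and surrounding harness are hypothetical (for example, a DataNode obtained from a MiniDFSCluster in a test).

    // Hypothetical harness: 'dn' is a running DataNode instance.
    // reconfigureProperty() is the public entry point that delegates to
    // the reconfigurePropertyImpl() shown above.
    try {
      dn.reconfigureProperty(
          "dfs.datanode.balance.max.concurrent.moves", "1");
    } catch (org.apache.hadoop.conf.ReconfigurationException e) {
      // After this patch, a timed-out decrease surfaces here rather than
      // silently leaving the old thread count in place.
      System.err.println("Reconfiguration failed: " + e.getMessage());
    }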

File 2 of 3:

@@ -21,35 +21,48 @@
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
 import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.PeerServer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 
 /**
- * Server used for receiving/sending a block of data.
- * This is created to listen for requests from clients or
- * other DataNodes. This small server does not use the
- * Hadoop IPC mechanism.
+ * Server used for receiving/sending a block of data. This is created to listen
+ * for requests from clients or other DataNodes. This small server does not use
+ * the Hadoop IPC mechanism.
  */
 class DataXceiverServer implements Runnable {
   public static final Logger LOG = DataNode.LOG;
 
+  /**
+   * Default time to wait (in seconds) for the number of running threads to
+   * drop below the newly requested maximum before giving up.
+   */
+  private static final int DEFAULT_RECONFIGURE_WAIT = 30;
+
   private final PeerServer peerServer;
   private final DataNode datanode;
-  private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
-  private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
+  private final HashMap<Peer, Thread> peers = new HashMap<>();
+  private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<>();
+  private final Lock lock = new ReentrantLock();
+  private final Condition noPeers = lock.newCondition();
   private boolean closed = false;
+  private int maxReconfigureWaitTime = DEFAULT_RECONFIGURE_WAIT;
 
   /**
    * Maximal number of concurrent xceivers per node.
    * Enforcing the limit is required in order to avoid data-node
@@ -58,77 +71,123 @@ class DataXceiverServer implements Runnable {
   int maxXceiverCount =
       DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
 
-  /** A manager to make sure that cluster balancing does not
-   * take too much resources.
-   *
-   * It limits the number of block moves for balancing and
-   * the total amount of bandwidth they can use.
+  /**
+   * A manager to make sure that cluster balancing does not take too much
+   * resources.
+   *
+   * It limits the number of block moves for balancing and the total amount of
+   * bandwidth they can use.
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
-    private int numThreads;
-    private final AtomicInteger maxThreads = new AtomicInteger(0);
+    private final Semaphore semaphore;
+    private int maxThreads;
 
-    /**Constructor
-     *
-     * @param bandwidth Total amount of bandwidth can be used for balancing
+    /**
+     * Constructor.
+     *
+     * @param bandwidth Total amount of bandwidth can be used for balancing
      */
     private BlockBalanceThrottler(long bandwidth, int maxThreads) {
       super(bandwidth);
-      this.maxThreads.set(maxThreads);
+      this.semaphore = new Semaphore(maxThreads, true);
+      this.maxThreads = maxThreads;
       LOG.info("Balancing bandwidth is " + bandwidth + " bytes/s");
       LOG.info("Number threads for balancing is " + maxThreads);
     }
 
-    private void setMaxConcurrentMovers(int movers) {
-      this.maxThreads.set(movers);
+    /**
+     * Update the number of threads which may be used concurrently for moving
+     * blocks. The number of threads available can be scaled up or down. If
+     * increasing the number of threads, the request will be serviced
+     * immediately. However, if decreasing the number of threads, this method
+     * will block any new request for moves, wait for any existing backlog of
+     * move requests to clear, and wait for enough threads to have finished
+     * such that the total number of threads actively running is less than or
+     * equal to the new cap. If this method has been unable to successfully
+     * set the new, lower cap within 'duration' seconds, the attempt will be
+     * aborted and the original cap will remain.
+     *
+     * @param newMaxThreads The new maximum number of threads for block moving
+     * @param duration The number of seconds to wait if decreasing threads
+     * @return true if new maximum was successfully applied; false otherwise
+     */
+    private boolean setMaxConcurrentMovers(final int newMaxThreads,
+        final int duration) {
+      Preconditions.checkArgument(newMaxThreads > 0);
+      final int delta = newMaxThreads - this.maxThreads;
+      LOG.debug("Change concurrent thread count to {} from {}", newMaxThreads,
+          this.maxThreads);
+      if (delta == 0) {
+        return true;
+      }
+      if (delta > 0) {
+        LOG.debug("Adding thread capacity: {}", delta);
+        this.semaphore.release(delta);
+        this.maxThreads = newMaxThreads;
+        return true;
+      }
+      try {
+        LOG.debug("Removing thread capacity: {}. Max wait: {}", delta,
+            duration);
+        boolean acquired = this.semaphore.tryAcquire(Math.abs(delta),
+            duration, TimeUnit.SECONDS);
+        if (acquired) {
+          this.maxThreads = newMaxThreads;
+        } else {
+          LOG.warn("Could not lower thread count to {} from {}. Too busy.",
+              newMaxThreads, this.maxThreads);
+        }
+        return acquired;
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted before adjusting thread count: {}", delta);
+        return false;
+      }
     }
 
     @VisibleForTesting
     int getMaxConcurrentMovers() {
-      return this.maxThreads.get();
+      return this.maxThreads;
     }
 
-    /** Check if the block move can start.
+    /**
+     * Check if the block move can start.
      *
      * Return true if the thread quota is not exceeded and
      * the counter is incremented; False otherwise.
      */
-    synchronized boolean acquire() {
-      if (numThreads >= maxThreads.get()) {
-        return false;
-      }
-      numThreads++;
-      return true;
+    boolean acquire() {
+      return this.semaphore.tryAcquire();
     }
 
-    /** Mark that the move is completed. The thread counter is decremented. */
-    synchronized void release() {
-      numThreads--;
+    /**
+     * Mark that the move is completed. The thread counter is decremented.
+     */
+    void release() {
+      this.semaphore.release();
     }
   }
 
   final BlockBalanceThrottler balanceThrottler;
 
   /**
-   * We need an estimate for block size to check if the disk partition has
-   * enough space. Newer clients pass the expected block size to the DataNode.
-   * For older clients we just use the server-side default block size.
+   * Stores an estimate for block size to check if the disk partition has
+   * enough space. Newer clients pass the expected block size to the DataNode.
+   * For older clients, just use the server-side default block size.
    */
   final long estimateBlockSize;
 
   DataXceiverServer(PeerServer peerServer, Configuration conf,
       DataNode datanode) {
     this.peerServer = peerServer;
     this.datanode = datanode;
     this.maxXceiverCount =
         conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
             DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
     this.estimateBlockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
 
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(
         conf.getLongBytes(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
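
The resize idiom above is the crux of the change, so here is a minimal, self-contained sketch of the same technique outside Hadoop (class and member names are illustrative, not from the patch). Growing the cap releases extra permits immediately; shrinking has to win permits back from in-flight work, or time out and leave the old cap in place.

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class ResizableLimit {
      // Fair semaphore so queued movers are served in arrival order.
      private final Semaphore permits = new Semaphore(4, true);
      private int max = 4;

      synchronized boolean resize(int newMax, int waitSeconds)
          throws InterruptedException {
        int delta = newMax - max;
        if (delta >= 0) {
          permits.release(delta); // grow (or no-op): takes effect at once
          max = newMax;
          return true;
        }
        // Shrink: reclaim the surplus permits; if busy threads do not
        // release them within the window, abort and keep the old cap.
        if (permits.tryAcquire(-delta, waitSeconds, TimeUnit.SECONDS)) {
          max = newMax;
          return true;
        }
        return false;
      }

      boolean acquire() {
        return permits.tryAcquire();
      }

      void release() {
        permits.release();
      }
    }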
@@ -161,25 +220,25 @@ public void run() {
         // another thread closed our listener socket - that's expected during shutdown,
         // but not in other circumstances
         if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
-          LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
+          LOG.warn("{}:DataXceiverServer", datanode.getDisplayName(), ace);
         }
       } catch (IOException ie) {
-        IOUtils.cleanup(null, peer);
-        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie);
+        IOUtils.closeQuietly(peer);
+        LOG.warn("{}:DataXceiverServer", datanode.getDisplayName(), ie);
       } catch (OutOfMemoryError ie) {
-        IOUtils.cleanup(null, peer);
+        IOUtils.closeQuietly(peer);
         // DataNode can run out of memory if there is too many transfers.
         // Log the event, Sleep for 30 seconds, other transfers may complete by
         // then.
         LOG.error("DataNode is out of memory. Will retry in 30 seconds.", ie);
         try {
-          Thread.sleep(30 * 1000);
+          Thread.sleep(TimeUnit.SECONDS.toMillis(30L));
         } catch (InterruptedException e) {
           // ignore
         }
       } catch (Throwable te) {
-        LOG.error(datanode.getDisplayName()
-            + ":DataXceiverServer: Exiting due to: ", te);
+        LOG.error("{}:DataXceiverServer: Exiting.", datanode.getDisplayName(),
+            te);
         datanode.shouldRun = false;
       }
     }
@@ -189,8 +248,8 @@ public void run() {
       peerServer.close();
       closed = true;
     } catch (IOException ie) {
-      LOG.warn(datanode.getDisplayName()
-          + " :DataXceiverServer: close exception", ie);
+      LOG.warn("{}:DataXceiverServer: close exception",
+          datanode.getDisplayName(), ie);
     }
 
     // if in restart prep stage, notify peers before closing them.
@@ -200,16 +259,10 @@ public void run() {
       // to send an OOB message to the client, but blocked on network for
       // long time, we need to force its termination.
       LOG.info("Shutting down DataXceiverServer before restart");
-      for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
-        try {
-          Thread.sleep(200);
-        } catch (InterruptedException e) {
-          // ignore
-        }
-      }
+      // Allow roughly up to 2 seconds.
+      waitAllPeers(2L, TimeUnit.SECONDS);
     }
 
+    // Close all peers.
     closeAllPeers();
   }
@@ -221,81 +274,158 @@ void kill() {
       this.peerServer.close();
       this.closed = true;
     } catch (IOException ie) {
-      LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
+      LOG.warn("{}:DataXceiverServer.kill()", datanode.getDisplayName(), ie);
     }
   }
 
-  synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
-      throws IOException {
-    if (closed) {
-      throw new IOException("Server closed.");
-    }
-    peers.put(peer, t);
-    peersXceiver.put(peer, xceiver);
-    datanode.metrics.incrDataNodeActiveXceiversCount();
-  }
-
-  synchronized void closePeer(Peer peer) {
-    peers.remove(peer);
-    peersXceiver.remove(peer);
-    datanode.metrics.decrDataNodeActiveXceiversCount();
-    IOUtils.cleanup(null, peer);
+  void addPeer(Peer peer, Thread t, DataXceiver xceiver)
+      throws IOException {
+    lock.lock();
+    try {
+      if (closed) {
+        throw new IOException("Server closed.");
+      }
+      peers.put(peer, t);
+      peersXceiver.put(peer, xceiver);
+      datanode.metrics.incrDataNodeActiveXceiversCount();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  void closePeer(Peer peer) {
+    lock.lock();
+    try {
+      peers.remove(peer);
+      peersXceiver.remove(peer);
+      datanode.metrics.decrDataNodeActiveXceiversCount();
+      IOUtils.closeQuietly(peer);
+      if (peers.isEmpty()) {
+        this.noPeers.signalAll();
+      }
+    } finally {
+      lock.unlock();
+    }
   }
 
   // Sending OOB to all peers
-  public synchronized void sendOOBToPeers() {
-    if (!datanode.shutdownForUpgrade) {
-      return;
-    }
-
-    for (Peer p : peers.keySet()) {
-      try {
-        peersXceiver.get(p).sendOOB();
-      } catch (IOException e) {
-        LOG.warn("Got error when sending OOB message.", e);
-      } catch (InterruptedException e) {
-        LOG.warn("Interrupted when sending OOB message.");
+  public void sendOOBToPeers() {
+    lock.lock();
+    try {
+      if (!datanode.shutdownForUpgrade) {
+        return;
       }
+      for (Peer p : peers.keySet()) {
+        try {
+          peersXceiver.get(p).sendOOB();
+        } catch (IOException e) {
+          LOG.warn("Got error when sending OOB message.", e);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted when sending OOB message.");
+        }
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
-  public synchronized void stopWriters() {
-    for (Peer p : peers.keySet()) {
-      peersXceiver.get(p).stopWriter();
+  public void stopWriters() {
+    lock.lock();
+    try {
+      peers.keySet().forEach(p -> peersXceiver.get(p).stopWriter());
+    } finally {
+      lock.unlock();
     }
   }
 
-  // Notify all peers of the shutdown and restart.
-  // datanode.shouldRun should still be true and datanode.restarting should
-  // be set true before calling this method.
-  synchronized void restartNotifyPeers() {
-    assert (datanode.shouldRun == true && datanode.shutdownForUpgrade);
-    for (Thread t : peers.values()) {
-      // interrupt each and every DataXceiver thread.
-      t.interrupt();
+  /**
+   * Notify all peers of the shutdown and restart. 'datanode.shouldRun' should
+   * still be true and 'datanode.restarting' should be set true before calling
+   * this method.
+   */
+  void restartNotifyPeers() {
+    assert (datanode.shouldRun && datanode.shutdownForUpgrade);
+    lock.lock();
+    try {
+      // interrupt each and every DataXceiver thread.
+      peers.values().forEach(t -> t.interrupt());
+    } finally {
+      lock.unlock();
     }
   }
 
-  // Close all peers and clear the map.
-  synchronized void closeAllPeers() {
+  /**
+   * Close all peers and clear the map.
+   */
+  void closeAllPeers() {
     LOG.info("Closing all peers.");
-    for (Peer p : peers.keySet()) {
-      IOUtils.cleanup(null, p);
-    }
-    peers.clear();
-    peersXceiver.clear();
-    datanode.metrics.setDataNodeActiveXceiversCount(0);
+    lock.lock();
+    try {
+      peers.keySet().forEach(p -> IOUtils.closeQuietly(p));
+      peers.clear();
+      peersXceiver.clear();
+      datanode.metrics.setDataNodeActiveXceiversCount(0);
+      this.noPeers.signalAll();
+    } finally {
+      lock.unlock();
+    }
   }
 
-  // Return the number of peers.
-  synchronized int getNumPeers() {
-    return peers.size();
+  /**
+   * Causes a thread to block until all peers are removed, a certain amount of
+   * time has passed, or the thread is interrupted.
+   *
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   * @return true if thread returned because all peers were removed; false
+   *         otherwise
+   */
+  private boolean waitAllPeers(long timeout, TimeUnit unit) {
+    long nanos = unit.toNanos(timeout);
+    lock.lock();
+    try {
+      while (!peers.isEmpty()) {
+        if (nanos <= 0L) {
+          return false;
+        }
+        nanos = noPeers.awaitNanos(nanos);
+      }
+    } catch (InterruptedException e) {
+      LOG.debug("Interrupted waiting for peers to close");
+      return false;
+    } finally {
+      lock.unlock();
+    }
+    return true;
   }
 
-  // Return the number of peers and DataXceivers.
+  /**
+   * Return the number of peers.
+   *
+   * @return the number of active peers
+   */
+  int getNumPeers() {
+    lock.lock();
+    try {
+      return peers.size();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Return the number of peers and DataXceivers.
+   *
+   * @return the number of peers and DataXceivers.
+   */
   @VisibleForTesting
-  synchronized int getNumPeersXceiver() {
-    return peersXceiver.size();
+  int getNumPeersXceiver() {
+    lock.lock();
+    try {
+      return peersXceiver.size();
+    } finally {
+      lock.unlock();
+    }
   }
 
   @VisibleForTesting
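
The waitAllPeers() method above uses the standard Lock/Condition awaitNanos() pattern: the predicate is re-checked after every wakeup and the remaining time budget is carried forward, so spurious wakeups cannot extend the total wait. A compact, hedged sketch of that pattern on its own (names are illustrative, not from the patch):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class PeerCountLatch {
      private final Lock lock = new ReentrantLock();
      private final Condition empty = lock.newCondition();
      private int count;

      void register() {
        lock.lock();
        try {
          count++;
        } finally {
          lock.unlock();
        }
      }

      void deregister() {
        lock.lock();
        try {
          if (--count == 0) {
            empty.signalAll(); // wake anyone blocked in awaitEmpty()
          }
        } finally {
          lock.unlock();
        }
      }

      // Returns true if the count reached zero within the timeout.
      boolean awaitEmpty(long timeout, TimeUnit unit)
          throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
          while (count > 0) {
            if (nanos <= 0L) {
              return false; // budget exhausted
            }
            nanos = empty.awaitNanos(nanos); // remaining time comes back
          }
          return true;
        } finally {
          lock.unlock();
        }
      }
    }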
@@ -303,13 +433,42 @@ PeerServer getPeerServer() {
     return peerServer;
   }
 
-  synchronized void releasePeer(Peer peer) {
-    peers.remove(peer);
-    peersXceiver.remove(peer);
-    datanode.metrics.decrDataNodeActiveXceiversCount();
+  /**
+   * Release a peer.
+   *
+   * @param peer The peer to release
+   */
+  void releasePeer(Peer peer) {
+    lock.lock();
+    try {
+      peers.remove(peer);
+      peersXceiver.remove(peer);
+      datanode.metrics.decrDataNodeActiveXceiversCount();
+    } finally {
+      lock.unlock();
+    }
   }
 
-  public void updateBalancerMaxConcurrentMovers(int movers) {
-    balanceThrottler.setMaxConcurrentMovers(movers);
+  /**
+   * Update the number of threads which may be used concurrently for moving
+   * blocks.
+   *
+   * @param movers The new maximum number of threads for block moving
+   * @return true if new maximum was successfully applied; false otherwise
+   */
+  public boolean updateBalancerMaxConcurrentMovers(final int movers) {
+    return balanceThrottler.setMaxConcurrentMovers(movers,
+        this.maxReconfigureWaitTime);
+  }
+
+  /**
+   * Update the maximum amount of time to wait for reconfiguration of the
+   * maximum number of block mover threads to complete.
+   *
+   * @param max The new maximum wait time, in seconds
+   */
+  @VisibleForTesting
+  void setMaxReconfigureWaitTime(int max) {
+    this.maxReconfigureWaitTime = max;
   }
 }

File 3 of 3:

@@ -176,55 +176,96 @@ public void testMaxConcurrentMoversReconfiguration()
   @Test
   public void testAcquireWithMaxConcurrentMoversGreaterThanDefault()
       throws IOException, ReconfigurationException {
-    testAcquireWithMaxConcurrentMoversShared(10);
+    final DataNode[] dns = createDNsForTest(1);
+    try {
+      testAcquireOnMaxConcurrentMoversReconfiguration(dns[0], 10);
+    } finally {
+      dns[0].shutdown();
+    }
   }
 
   @Test
   public void testAcquireWithMaxConcurrentMoversLessThanDefault()
       throws IOException, ReconfigurationException {
-    testAcquireWithMaxConcurrentMoversShared(3);
-  }
-
-  private void testAcquireWithMaxConcurrentMoversShared(
-      int maxConcurrentMovers)
-      throws IOException, ReconfigurationException {
-    DataNode[] dns = null;
+    final DataNode[] dns = createDNsForTest(1);
     try {
-      dns = createDNsForTest(1);
-      testAcquireOnMaxConcurrentMoversReconfiguration(dns[0],
-          maxConcurrentMovers);
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (ReconfigurationException re) {
-      throw re;
+      testAcquireOnMaxConcurrentMoversReconfiguration(dns[0], 3);
     } finally {
-      shutDownDNs(dns);
+      dns[0].shutdown();
     }
   }
 
-  private void shutDownDNs(DataNode[] dns) {
-    if (dns == null) {
-      return;
-    }
-
-    for (int i = 0; i < dns.length; i++) {
-      try {
-        if (dns[i] == null) {
-          continue;
-        }
-        dns[i].shutdown();
-      } catch (Exception e) {
-        LOG.error("Cannot close: ", e);
-      }
-    }
+  /**
+   * Simulates a scenario where the DataNode has been reconfigured with fewer
+   * mover threads, but all of the current threads are busy and therefore the
+   * DataNode is unable to honor this request within a reasonable amount of
+   * time. The DataNode eventually gives up and returns a flag indicating that
+   * the request was not honored.
+   */
+  @Test
+  public void testFailedDecreaseConcurrentMovers()
+      throws IOException, ReconfigurationException {
+    final DataNode[] dns = createDNsForTest(1);
+    final DataNode dataNode = dns[0];
+    try {
+      // Set the current max to 2
+      dataNode.xserver.updateBalancerMaxConcurrentMovers(2);
+
+      // Simulate grabbing 2 threads
+      dataNode.xserver.balanceThrottler.acquire();
+      dataNode.xserver.balanceThrottler.acquire();
+
+      dataNode.xserver.setMaxReconfigureWaitTime(1);
+
+      // Attempt to set new maximum to 1
+      final boolean success =
+          dataNode.xserver.updateBalancerMaxConcurrentMovers(1);
+      Assert.assertFalse(success);
+    } finally {
+      dataNode.shutdown();
+    }
+  }
+
+  /**
+   * Test with invalid configuration.
+   */
+  @Test(expected = ReconfigurationException.class)
+  public void testFailedDecreaseConcurrentMoversReconfiguration()
+      throws IOException, ReconfigurationException {
+    final DataNode[] dns = createDNsForTest(1);
+    final DataNode dataNode = dns[0];
+    try {
+      // Set the current max to 2
+      dataNode.xserver.updateBalancerMaxConcurrentMovers(2);
+
+      // Simulate grabbing 2 threads
+      dataNode.xserver.balanceThrottler.acquire();
+      dataNode.xserver.balanceThrottler.acquire();
+
+      dataNode.xserver.setMaxReconfigureWaitTime(1);
+
+      // Now try to reconfigure the maximum downwards without releasing the
+      // threads
+      dataNode.reconfigurePropertyImpl(
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "1");
+    } catch (ReconfigurationException e) {
+      Assert.assertEquals(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+          e.getProperty());
+      Assert.assertEquals("1", e.getNewValue());
+      throw e;
+    } finally {
+      dataNode.shutdown();
+    }
   }
 
   private void testAcquireOnMaxConcurrentMoversReconfiguration(
       DataNode dataNode, int maxConcurrentMovers) throws IOException,
       ReconfigurationException {
-    int defaultMaxThreads = dataNode.getConf().getInt(
+    final int defaultMaxThreads = dataNode.getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+
+    /** Test that the default setup is working */
     for (int i = 0; i < defaultMaxThreads; i++) {
       assertEquals("should be able to get thread quota", true,
           dataNode.xserver.balanceThrottler.acquire());
@@ -233,25 +274,24 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration(
     assertEquals("should not be able to get thread quota", false,
         dataNode.xserver.balanceThrottler.acquire());
 
+    // Give back the threads
+    for (int i = 0; i < defaultMaxThreads; i++) {
+      dataNode.xserver.balanceThrottler.release();
+    }
+
+    /** Test that the change is applied correctly */
     // change properties
     dataNode.reconfigureProperty(
         DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         String.valueOf(maxConcurrentMovers));
 
     assertEquals("thread quota is wrong", maxConcurrentMovers,
-        dataNode.xserver.balanceThrottler.getMaxConcurrentMovers()); // thread quota
+        dataNode.xserver.balanceThrottler.getMaxConcurrentMovers());
 
-    int val = Math.abs(maxConcurrentMovers - defaultMaxThreads);
-    if (defaultMaxThreads < maxConcurrentMovers) {
-      for (int i = 0; i < val; i++) {
-        assertEquals("should be able to get thread quota", true,
-            dataNode.xserver.balanceThrottler.acquire());
-      }
-    } else if (defaultMaxThreads > maxConcurrentMovers) {
-      for (int i = 0; i < val; i++) {
-        assertEquals("should not be able to get thread quota", false,
-            dataNode.xserver.balanceThrottler.acquire());
-      }
+    for (int i = 0; i < maxConcurrentMovers; i++) {
+      assertEquals("should be able to get thread quota", true,
+          dataNode.xserver.balanceThrottler.acquire());
     }
 
     assertEquals("should not be able to get thread quota", false,
         dataNode.xserver.balanceThrottler.acquire());