HDFS-11558. BPServiceActor thread name is too long. Contributed by Xiaobing Zhou
This commit is contained in:
parent
9dfe0b3515
commit
9eebbcf459
|
@ -68,7 +68,8 @@ class BPOfferService {
|
||||||
* handshake.
|
* handshake.
|
||||||
*/
|
*/
|
||||||
volatile DatanodeRegistration bpRegistration;
|
volatile DatanodeRegistration bpRegistration;
|
||||||
|
|
||||||
|
private final String nameserviceId;
|
||||||
private final DataNode dn;
|
private final DataNode dn;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -118,12 +119,16 @@ class BPOfferService {
|
||||||
mWriteLock.unlock();
|
mWriteLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
BPOfferService(List<InetSocketAddress> nnAddrs,
|
BPOfferService(
|
||||||
List<InetSocketAddress> lifelineNnAddrs, DataNode dn) {
|
final String nameserviceId,
|
||||||
|
List<InetSocketAddress> nnAddrs,
|
||||||
|
List<InetSocketAddress> lifelineNnAddrs,
|
||||||
|
DataNode dn) {
|
||||||
Preconditions.checkArgument(!nnAddrs.isEmpty(),
|
Preconditions.checkArgument(!nnAddrs.isEmpty(),
|
||||||
"Must pass at least one NN.");
|
"Must pass at least one NN.");
|
||||||
Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
|
Preconditions.checkArgument(nnAddrs.size() == lifelineNnAddrs.size(),
|
||||||
"Must pass same number of NN addresses and lifeline addresses.");
|
"Must pass same number of NN addresses and lifeline addresses.");
|
||||||
|
this.nameserviceId = nameserviceId;
|
||||||
this.dn = dn;
|
this.dn = dn;
|
||||||
|
|
||||||
for (int i = 0; i < nnAddrs.size(); ++i) {
|
for (int i = 0; i < nnAddrs.size(); ++i) {
|
||||||
|
@ -168,6 +173,14 @@ class BPOfferService {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets nameservice id to which this {@link BPOfferService} maps to.
|
||||||
|
* @return nameservice id, which can be null.
|
||||||
|
*/
|
||||||
|
String getNameserviceId() {
|
||||||
|
return nameserviceId;
|
||||||
|
}
|
||||||
|
|
||||||
String getBlockPoolId() {
|
String getBlockPoolId() {
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -264,7 +264,10 @@ class BPServiceActor implements Runnable {
|
||||||
// This also initializes our block pool in the DN if we are
|
// This also initializes our block pool in the DN if we are
|
||||||
// the first NN connection for this BP.
|
// the first NN connection for this BP.
|
||||||
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
|
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
|
||||||
|
|
||||||
|
/* set thread name again to include NamespaceInfo when it's available. */
|
||||||
|
this.bpThread.setName(formatThreadName("heartbeating", nnAddr));
|
||||||
|
|
||||||
// Second phase of the handshake with the NN.
|
// Second phase of the handshake with the NN.
|
||||||
register(nsInfo);
|
register(nsInfo);
|
||||||
}
|
}
|
||||||
|
@ -483,14 +486,15 @@ class BPServiceActor implements Runnable {
|
||||||
lifelineSender.start();
|
lifelineSender.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String formatThreadName(String action, InetSocketAddress addr) {
|
private String formatThreadName(
|
||||||
Collection<StorageLocation> dataDirs =
|
final String action,
|
||||||
DataNode.getStorageLocations(dn.getConf());
|
final InetSocketAddress addr) {
|
||||||
return "DataNode: [" + dataDirs.toString() + "] " +
|
final String prefix = bpos.getBlockPoolId() != null ? bpos.getBlockPoolId()
|
||||||
action + " to " + addr;
|
: bpos.getNameserviceId();
|
||||||
|
return prefix + " " + action + " to " + addr;
|
||||||
}
|
}
|
||||||
|
|
||||||
//This must be called only by blockPoolManager.
|
//This must be called only by blockPoolManager.
|
||||||
void stop() {
|
void stop() {
|
||||||
shouldServiceRun = false;
|
shouldServiceRun = false;
|
||||||
|
@ -944,8 +948,8 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
lifelineThread = new Thread(this, formatThreadName("lifeline",
|
lifelineThread = new Thread(this,
|
||||||
lifelineNnAddr));
|
formatThreadName("lifeline", lifelineNnAddr));
|
||||||
lifelineThread.setDaemon(true);
|
lifelineThread.setDaemon(true);
|
||||||
lifelineThread.setUncaughtExceptionHandler(
|
lifelineThread.setUncaughtExceptionHandler(
|
||||||
new Thread.UncaughtExceptionHandler() {
|
new Thread.UncaughtExceptionHandler() {
|
||||||
|
|
|
@ -211,7 +211,7 @@ class BlockPoolManager {
|
||||||
lifelineAddrs.add(nnIdToLifelineAddr != null ?
|
lifelineAddrs.add(nnIdToLifelineAddr != null ?
|
||||||
nnIdToLifelineAddr.get(nnId) : null);
|
nnIdToLifelineAddr.get(nnId) : null);
|
||||||
}
|
}
|
||||||
BPOfferService bpos = createBPOS(addrs, lifelineAddrs);
|
BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
|
||||||
bpByNameserviceId.put(nsToAdd, bpos);
|
bpByNameserviceId.put(nsToAdd, bpos);
|
||||||
offerServices.add(bpos);
|
offerServices.add(bpos);
|
||||||
}
|
}
|
||||||
|
@ -261,8 +261,10 @@ class BlockPoolManager {
|
||||||
/**
|
/**
|
||||||
* Extracted out for test purposes.
|
* Extracted out for test purposes.
|
||||||
*/
|
*/
|
||||||
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
|
protected BPOfferService createBPOS(
|
||||||
|
final String nameserviceId,
|
||||||
|
List<InetSocketAddress> nnAddrs,
|
||||||
List<InetSocketAddress> lifelineNnAddrs) {
|
List<InetSocketAddress> lifelineNnAddrs) {
|
||||||
return new BPOfferService(nnAddrs, lifelineNnAddrs, dn);
|
return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -408,7 +408,7 @@ public class TestBPOfferService {
|
||||||
Mockito.eq(new InetSocketAddress(port)));
|
Mockito.eq(new InetSocketAddress(port)));
|
||||||
}
|
}
|
||||||
|
|
||||||
return new BPOfferService(Lists.newArrayList(nnMap.keySet()),
|
return new BPOfferService("test_ns", Lists.newArrayList(nnMap.keySet()),
|
||||||
Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
|
Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,9 @@ public class TestBlockPoolManager {
|
||||||
bpm = new BlockPoolManager(mockDN){
|
bpm = new BlockPoolManager(mockDN){
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs,
|
protected BPOfferService createBPOS(
|
||||||
|
final String nameserviceId,
|
||||||
|
List<InetSocketAddress> nnAddrs,
|
||||||
List<InetSocketAddress> lifelineNnAddrs) {
|
List<InetSocketAddress> lifelineNnAddrs) {
|
||||||
final int idx = mockIdx++;
|
final int idx = mockIdx++;
|
||||||
doLog("create #" + idx);
|
doLog("create #" + idx);
|
||||||
|
|
Loading…
Reference in New Issue