HDFS-16535. SlotReleaser should reuse the domain socket based on socket paths (#4158)
Reviewed-by: Lisheng Sun <sunlisheng@apache.org>
(cherry picked from commit 35d4c02bcc
)
This commit is contained in:
parent
eab586d566
commit
9ae903dd1b
|
@ -189,6 +189,7 @@ public class ShortCircuitCache implements Closeable {
|
||||||
final DfsClientShm shm = (DfsClientShm)slot.getShm();
|
final DfsClientShm shm = (DfsClientShm)slot.getShm();
|
||||||
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
|
final DomainSocket shmSock = shm.getPeer().getDomainSocket();
|
||||||
final String path = shmSock.getPath();
|
final String path = shmSock.getPath();
|
||||||
|
DomainSocket domainSocket = pathToDomainSocket.get(path);
|
||||||
DataOutputStream out = null;
|
DataOutputStream out = null;
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
int retries = 2;
|
int retries = 2;
|
||||||
|
@ -196,9 +197,10 @@ public class ShortCircuitCache implements Closeable {
|
||||||
while (retries > 0) {
|
while (retries > 0) {
|
||||||
try {
|
try {
|
||||||
if (domainSocket == null || !domainSocket.isOpen()) {
|
if (domainSocket == null || !domainSocket.isOpen()) {
|
||||||
// we are running in single thread mode, no protection needed for
|
|
||||||
// domainSocket
|
|
||||||
domainSocket = DomainSocket.connect(path);
|
domainSocket = DomainSocket.connect(path);
|
||||||
|
// we are running in single thread mode, no protection needed for
|
||||||
|
// pathToDomainSocket
|
||||||
|
pathToDomainSocket.put(path, domainSocket);
|
||||||
}
|
}
|
||||||
|
|
||||||
out = new DataOutputStream(
|
out = new DataOutputStream(
|
||||||
|
@ -221,13 +223,16 @@ public class ShortCircuitCache implements Closeable {
|
||||||
} catch (SocketException se) {
|
} catch (SocketException se) {
|
||||||
// the domain socket on datanode may be timed out, we retry once
|
// the domain socket on datanode may be timed out, we retry once
|
||||||
retries--;
|
retries--;
|
||||||
domainSocket.close();
|
if (domainSocket != null) {
|
||||||
domainSocket = null;
|
domainSocket.close();
|
||||||
|
domainSocket = null;
|
||||||
|
pathToDomainSocket.remove(path);
|
||||||
|
}
|
||||||
if (retries == 0) {
|
if (retries == 0) {
|
||||||
throw new SocketException("Create domain socket failed");
|
throw new SocketException("Create domain socket failed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} // end of while block
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(ShortCircuitCache.this + ": failed to release "
|
LOG.warn(ShortCircuitCache.this + ": failed to release "
|
||||||
+ "short-circuit shared memory slot " + slot + " by sending "
|
+ "short-circuit shared memory slot " + slot + " by sending "
|
||||||
|
@ -240,10 +245,10 @@ public class ShortCircuitCache implements Closeable {
|
||||||
} else {
|
} else {
|
||||||
shm.getEndpointShmManager().shutdown(shm);
|
shm.getEndpointShmManager().shutdown(shm);
|
||||||
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
|
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
|
||||||
domainSocket = null;
|
pathToDomainSocket.remove(path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} // end of run()
|
||||||
}
|
}
|
||||||
|
|
||||||
public interface ShortCircuitReplicaCreator {
|
public interface ShortCircuitReplicaCreator {
|
||||||
|
@ -354,7 +359,11 @@ public class ShortCircuitCache implements Closeable {
|
||||||
*/
|
*/
|
||||||
private final DfsClientShmManager shmManager;
|
private final DfsClientShmManager shmManager;
|
||||||
|
|
||||||
private DomainSocket domainSocket = null;
|
/**
|
||||||
|
* A map contains all DomainSockets used in SlotReleaser. Keys are the domain socket
|
||||||
|
* paths of short-circuit shared memory segments.
|
||||||
|
*/
|
||||||
|
private Map<String, DomainSocket> pathToDomainSocket = new HashMap<>();
|
||||||
|
|
||||||
public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
|
public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
|
||||||
return new ShortCircuitCache(
|
return new ShortCircuitCache(
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
|
||||||
|
@ -957,6 +958,83 @@ public class TestShortCircuitCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Regression test for HDFS-16535
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testDomainSocketClosedByMultipleDNs() throws Exception {
|
||||||
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
||||||
|
String testName = "testDomainSocketClosedByMultipleDNs";
|
||||||
|
Configuration conf = createShortCircuitConf(testName, sockDir);
|
||||||
|
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
||||||
|
testName + "._PORT").getAbsolutePath());
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
final ShortCircuitCache cache =
|
||||||
|
fs.getClient().getClientContext().getShortCircuitCache();
|
||||||
|
|
||||||
|
ExtendedBlockId blockId0 = new ExtendedBlockId(123, "xyz");
|
||||||
|
ExtendedBlockId blockId1 = new ExtendedBlockId(456, "xyz");
|
||||||
|
|
||||||
|
DataNode dn0 = cluster.getDataNodes().get(0);
|
||||||
|
DataNode dn1 = cluster.getDataNodes().get(1);
|
||||||
|
|
||||||
|
DomainPeer peer0 = new DomainPeer(DomainSocket.connect(new File(
|
||||||
|
sockDir.getDir(), testName + "." + dn0.getXferPort()).getAbsolutePath()));
|
||||||
|
DomainPeer peer1 = new DomainPeer(DomainSocket.connect(new File(
|
||||||
|
sockDir.getDir(), testName + "." + dn1.getXferPort()).getAbsolutePath()));
|
||||||
|
|
||||||
|
final DatanodeInfo dnInfo0 = new DatanodeInfo.DatanodeInfoBuilder()
|
||||||
|
.setNodeID(dn0.getDatanodeId()).build();
|
||||||
|
final DatanodeInfo dnInfo1 = new DatanodeInfo.DatanodeInfoBuilder()
|
||||||
|
.setNodeID(dn1.getDatanodeId()).build();
|
||||||
|
|
||||||
|
// Allocate 2 shm slots from DataNode-0
|
||||||
|
MutableBoolean usedPeer = new MutableBoolean(false);
|
||||||
|
Slot slot1 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
|
||||||
|
"testDomainSocketClosedByMultipleDNs_client");
|
||||||
|
dn0.getShortCircuitRegistry()
|
||||||
|
.registerSlot(blockId0, slot1.getSlotId(), false);
|
||||||
|
|
||||||
|
Slot slot2 = cache.allocShmSlot(dnInfo0, peer0, usedPeer, blockId0,
|
||||||
|
"testDomainSocketClosedByMultipleDNs_client");
|
||||||
|
dn0.getShortCircuitRegistry()
|
||||||
|
.registerSlot(blockId0, slot2.getSlotId(), false);
|
||||||
|
|
||||||
|
// Allocate 1 shm slot from DataNode-1
|
||||||
|
Slot slot3 = cache.allocShmSlot(dnInfo1, peer1, usedPeer, blockId1,
|
||||||
|
"testDomainSocketClosedByMultipleDNs_client");
|
||||||
|
dn1.getShortCircuitRegistry()
|
||||||
|
.registerSlot(blockId1, slot3.getSlotId(), false);
|
||||||
|
|
||||||
|
Assert.assertEquals(2, cache.getDfsClientShmManager().getShmNum());
|
||||||
|
Assert.assertEquals(1, dn0.getShortCircuitRegistry().getShmNum());
|
||||||
|
Assert.assertEquals(1, dn1.getShortCircuitRegistry().getShmNum());
|
||||||
|
|
||||||
|
// Release the slot of DataNode-1 first.
|
||||||
|
cache.scheduleSlotReleaser(slot3);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
Assert.assertEquals(1, cache.getDfsClientShmManager().getShmNum());
|
||||||
|
|
||||||
|
// Release the slots of DataNode-0.
|
||||||
|
cache.scheduleSlotReleaser(slot1);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
Assert.assertEquals("0 ShmNum means the shm of DataNode-0 is shutdown" +
|
||||||
|
" due to slot release failures.",
|
||||||
|
1, cache.getDfsClientShmManager().getShmNum());
|
||||||
|
cache.scheduleSlotReleaser(slot2);
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
Assert.assertEquals(0, dn0.getShortCircuitRegistry().getShmNum());
|
||||||
|
Assert.assertEquals(0, dn1.getShortCircuitRegistry().getShmNum());
|
||||||
|
Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testDNRestart() throws Exception {
|
public void testDNRestart() throws Exception {
|
||||||
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
|
||||||
|
|
Loading…
Reference in New Issue