HDFS-13639. SlotReleaser is not fast enough (#1885)

(cherry picked from commit be374faf42)
This commit is contained in:
leosunli 2020-05-22 04:21:17 +08:00 committed by S O'Donnell
parent 80d4636067
commit b3e968d21e
4 changed files with 155 additions and 13 deletions

View File

@ -497,4 +497,14 @@ public class DfsClientShmManager implements Closeable {
public DomainSocketWatcher getDomainSocketWatcher() { public DomainSocketWatcher getDomainSocketWatcher() {
return domainSocketWatcher; return domainSocketWatcher;
} }
/**
 * Total number of shared memory segments held across all known datanodes,
 * counting both the not-full and full segment sets per endpoint.
 * Exposed for test assertions only.
 *
 * @return the aggregate shared memory segment count.
 */
@VisibleForTesting
public int getShmNum() {
  return datanodes.values().stream()
      .mapToInt(manager -> manager.notFull.size() + manager.full.size())
      .sum();
}
} }

View File

@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -181,25 +182,52 @@ public class ShortCircuitCache implements Closeable {
@Override @Override
public void run() { public void run() {
if (slot == null) {
return;
}
LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot); LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
final DfsClientShm shm = (DfsClientShm)slot.getShm(); final DfsClientShm shm = (DfsClientShm)slot.getShm();
final DomainSocket shmSock = shm.getPeer().getDomainSocket(); final DomainSocket shmSock = shm.getPeer().getDomainSocket();
final String path = shmSock.getPath(); final String path = shmSock.getPath();
DataOutputStream out = null;
boolean success = false; boolean success = false;
try (DomainSocket sock = DomainSocket.connect(path); int retries = 2;
DataOutputStream out = new DataOutputStream( try {
new BufferedOutputStream(sock.getOutputStream()))) { while (retries > 0) {
try {
if (domainSocket == null || !domainSocket.isOpen()) {
// we are running in single thread mode, no protection needed for
// domainSocket
domainSocket = DomainSocket.connect(path);
}
out = new DataOutputStream(
new BufferedOutputStream(domainSocket.getOutputStream()));
new Sender(out).releaseShortCircuitFds(slot.getSlotId()); new Sender(out).releaseShortCircuitFds(slot.getSlotId());
DataInputStream in = new DataInputStream(sock.getInputStream()); DataInputStream in =
new DataInputStream(domainSocket.getInputStream());
ReleaseShortCircuitAccessResponseProto resp = ReleaseShortCircuitAccessResponseProto resp =
ReleaseShortCircuitAccessResponseProto.parseFrom( ReleaseShortCircuitAccessResponseProto
PBHelperClient.vintPrefixed(in)); .parseFrom(PBHelperClient.vintPrefixed(in));
if (resp.getStatus() != Status.SUCCESS) { if (resp.getStatus() != Status.SUCCESS) {
String error = resp.hasError() ? resp.getError() : "(unknown)"; String error = resp.hasError() ? resp.getError() : "(unknown)";
throw new IOException(resp.getStatus().toString() + ": " + error); throw new IOException(resp.getStatus().toString() + ": " + error);
} }
LOG.trace("{}: released {}", this, slot); LOG.trace("{}: released {}", this, slot);
success = true; success = true;
break;
} catch (SocketException se) {
// the domain socket on datanode may be timed out, we retry once
retries--;
domainSocket.close();
domainSocket = null;
if (retries == 0) {
throw new SocketException("Create domain socket failed");
}
}
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn(ShortCircuitCache.this + ": failed to release " LOG.warn(ShortCircuitCache.this + ": failed to release "
+ "short-circuit shared memory slot " + slot + " by sending " + "short-circuit shared memory slot " + slot + " by sending "
@ -211,6 +239,8 @@ public class ShortCircuitCache implements Closeable {
shmManager.freeSlot(slot); shmManager.freeSlot(slot);
} else { } else {
shm.getEndpointShmManager().shutdown(shm); shm.getEndpointShmManager().shutdown(shm);
IOUtilsClient.cleanupWithLogger(LOG, domainSocket, out);
domainSocket = null;
} }
} }
} }
@ -324,6 +354,8 @@ public class ShortCircuitCache implements Closeable {
*/ */
private final DfsClientShmManager shmManager; private final DfsClientShmManager shmManager;
private DomainSocket domainSocket = null;
public static ShortCircuitCache fromConf(ShortCircuitConf conf) { public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
return new ShortCircuitCache( return new ShortCircuitCache(
conf.getShortCircuitStreamsCacheSize(), conf.getShortCircuitStreamsCacheSize(),
@ -997,6 +1029,9 @@ public class ShortCircuitCache implements Closeable {
* @param slot The slot to release. * @param slot The slot to release.
*/ */
public void scheduleSlotReleaser(Slot slot) { public void scheduleSlotReleaser(Slot slot) {
if (slot == null) {
return;
}
Preconditions.checkState(shmManager != null); Preconditions.checkState(shmManager != null);
releaserExecutor.execute(new SlotReleaser(slot)); releaserExecutor.execute(new SlotReleaser(slot));
} }

View File

@ -404,4 +404,9 @@ public class ShortCircuitRegistry {
public synchronized boolean visit(Visitor visitor) { public synchronized boolean visit(Visitor visitor) {
return visitor.accept(segments, slots); return visitor.accept(segments, slots);
} }
/**
 * Number of shared memory segments currently registered on this datanode.
 * Exposed for test assertions only.
 *
 * @return the registered segment count.
 */
@VisibleForTesting
public int getShmNum() {
  final int registeredSegments = segments.size();
  return registeredSegments;
}
} }

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -28,6 +29,7 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -910,4 +912,94 @@ public class TestShortCircuitCache {
} }
} }
} }
/**
 * Verify that releasing slots still works after the DataNode side has
 * closed the cached domain socket: the SlotReleaser should reconnect and
 * retry, leaving no shared-memory segments behind on either the client
 * (DfsClientShmManager) or the DataNode (ShortCircuitRegistry).
 */
@Test(timeout = 60000)
public void testDomainSocketClosedByDN() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  Configuration conf =
      createShortCircuitConf("testDomainSocketClosedByDN", sockDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    final ShortCircuitCache cache =
        fs.getClient().getClientContext().getShortCircuitCache();
    DomainPeer peer = getDomainPeerToDn(conf);
    MutableBoolean usedPeer = new MutableBoolean(false);
    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
    // Allocating the first shm slot requires using up a peer.
    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
        "testReleaseSlotReuseDomainSocket_client");
    cluster.getDataNodes().get(0).getShortCircuitRegistry()
        .registerSlot(blockId, slot1.getSlotId(), false);
    Slot slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
        "testReleaseSlotReuseDomainSocket_client");
    cluster.getDataNodes().get(0).getShortCircuitRegistry()
        .registerSlot(blockId, slot2.getSlotId(), false);
    // Release the two slots; the sleeps give the async releaser thread
    // time to complete before we assert on the segment counts.
    cache.scheduleSlotReleaser(slot1);
    Thread.sleep(2000);
    cache.scheduleSlotReleaser(slot2);
    Thread.sleep(2000);
    Assert.assertEquals(0,
        cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
    Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
  } finally {
    cluster.shutdown();
    // Fix: the original leaked the temporary socket directory; close it so
    // the on-disk domain socket path is removed after the test.
    sockDir.close();
  }
}
/**
 * Verify that slot release and fresh slot allocation both survive a
 * DataNode restart, which invalidates the client's cached domain socket.
 * Afterwards no shared-memory segments should remain registered on either
 * side.
 */
@Test(timeout = 60000)
public void testDNRestart() throws Exception {
  TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
  Configuration conf = createShortCircuitConf("testDNRestart", sockDir);
  MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
  try {
    cluster.waitActive();
    DistributedFileSystem fs = cluster.getFileSystem();
    final ShortCircuitCache cache =
        fs.getClient().getClientContext().getShortCircuitCache();
    DomainPeer peer = getDomainPeerToDn(conf);
    MutableBoolean usedPeer = new MutableBoolean(false);
    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
    final DatanodeInfo datanode = new DatanodeInfo.DatanodeInfoBuilder()
        .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()).build();
    // Allocating the first shm slot requires using up a peer.
    Slot slot1 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
        "testReleaseSlotReuseDomainSocket_client");
    cluster.getDataNodes().get(0).getShortCircuitRegistry()
        .registerSlot(blockId, slot1.getSlotId(), false);
    // Restart the datanode to invalidate the cached domain socket.
    cluster.restartDataNode(0);
    Thread.sleep(1000);
    // After the restart, release and new allocation should not be affected.
    cache.scheduleSlotReleaser(slot1);
    Slot slot2 = null;
    try {
      slot2 = cache.allocShmSlot(datanode, peer, usedPeer, blockId,
          "testReleaseSlotReuseDomainSocket_client");
    } catch (ClosedChannelException ignored) {
      // Expected when the stale cached peer is used after the restart;
      // slot2 stays null and scheduleSlotReleaser tolerates a null slot.
    }
    cache.scheduleSlotReleaser(slot2);
    Thread.sleep(2000);
    Assert.assertEquals(0,
        cluster.getDataNodes().get(0).getShortCircuitRegistry().getShmNum());
    Assert.assertEquals(0, cache.getDfsClientShmManager().getShmNum());
  } finally {
    cluster.shutdown();
    // Fix: the original leaked the temporary socket directory; close it so
    // the on-disk domain socket path is removed after the test.
    sockDir.close();
  }
}
} }