From 8ec17f6aead51e5be167f93ae56b6bee314a19f2 Mon Sep 17 00:00:00 2001 From: Chen Liang Date: Thu, 13 Sep 2018 16:22:37 -0700 Subject: [PATCH] HDFS-13880. Add mechanism to allow certain RPC calls to bypass sync. Contributed by Chen Liang. --- .../apache/hadoop/ipc/AlignmentContext.java | 16 +++++ .../java/org/apache/hadoop/ipc/Server.java | 42 ++++++++++-- .../apache/hadoop/hdfs/ClientGSIContext.java | 6 ++ .../hadoop/hdfs/protocol/ClientProtocol.java | 68 +++++++++---------- .../hdfs/server/namenode/ha/ReadOnly.java | 7 ++ .../server/namenode/GlobalStateIdContext.java | 21 ++++++ .../server/namenode/ha/TestObserverNode.java | 52 ++++++++++++++ 7 files changed, 173 insertions(+), 39 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index 0e8b960ecd3..a435ff6c4e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -38,6 +38,7 @@ public interface AlignmentContext { /** * This is the intended server method call to implement to pass state info * during RPC response header construction. + * * @param header The RPC response header builder. */ void updateResponseState(RpcResponseHeaderProto.Builder header); @@ -45,6 +46,7 @@ public interface AlignmentContext { /** * This is the intended client method call to implement to recieve state info * during RPC response processing. + * * @param header The RPC response header. */ void receiveResponseState(RpcResponseHeaderProto header); @@ -52,6 +54,7 @@ public interface AlignmentContext { /** * This is the intended client method call to pull last seen state info * into RPC request processing. + * * @param header The RPC request header builder. */ void updateRequestState(RpcRequestHeaderProto.Builder header); @@ -59,6 +62,7 @@ public interface AlignmentContext { /** * This is the intended server method call to implement to receive * client state info during RPC response header processing. + * * @param header The RPC request header. * @return state id of in the request header. */ @@ -66,7 +70,19 @@ public interface AlignmentContext { /** * Returns the last seen state id of the alignment context instance. + * * @return the value of the last seen state id. */ long getLastSeenStateId(); + + /** + * Return true if this method call does need to be synced, false + * otherwise. sync meaning server state needs to have caught up with + * client state. + * + * @param protocolName the name of the protocol + * @param method the method call to check + * @return true if this method is async, false otherwise. + */ + boolean isCoordinatedCall(String protocolName, String method); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 2a86ede3f2d..04588e81823 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -707,6 +707,7 @@ public abstract class Server { private int priorityLevel; // the priority level assigned by scheduler, 0 by default private long clientStateId; + private boolean isCallCoordinated; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -738,6 +739,7 @@ public abstract class Server { this.traceScope = traceScope; this.callerContext = callerContext; this.clientStateId = Long.MIN_VALUE; + this.isCallCoordinated = false; } @Override @@ -823,6 +825,14 @@ public abstract class Server { this.clientStateId = stateId; } + public void markCallCoordinated(boolean flag) { + this.isCallCoordinated = flag; + } + + public boolean isCallCoordinated() { + return this.isCallCoordinated; + } + @InterfaceStability.Unstable public void deferResponse() { this.deferredResponse = true; @@ -2527,9 +2537,31 @@ public abstract class Server { // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); - if(alignmentContext != null) { - long stateId = alignmentContext.receiveRequestState(header); - call.setClientStateId(stateId); + if(alignmentContext != null && call.rpcRequest != null && + (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) { + // if call.rpcRequest is not RpcProtobufRequest, will skip the following + // step and treat the call as uncoordinated. As currently only certain + // ClientProtocol methods request made through RPC protobuf needs to be + // coordinated. + String methodName; + String protoName; + try { + ProtobufRpcEngine.RpcProtobufRequest req = + (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; + methodName = req.getRequestHeader().getMethodName(); + protoName = req.getRequestHeader().getDeclaringClassProtocolName(); + } catch (IOException ioe) { + throw new RpcServerException("Rpc request header check fail", ioe); + } + if (!alignmentContext.isCoordinatedCall(protoName, methodName)) { + call.markCallCoordinated(false); + } else { + call.markCallCoordinated(true); + long stateId = alignmentContext.receiveRequestState(header); + call.setClientStateId(stateId); + } + } else { + call.markCallCoordinated(false); } try { @@ -2712,8 +2744,8 @@ public abstract class Server { TraceScope traceScope = null; try { final Call call = callQueue.take(); // pop the queue; maybe blocked here - if (alignmentContext != null && call.getClientStateId() > - alignmentContext.getLastSeenStateId()) { + if (alignmentContext != null && call.isCallCoordinated() && + call.getClientStateId() > alignmentContext.getLastSeenStateId()) { /* * The call processing should be postponed until the client call's * state id is aligned (>=) with the server state id. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 10fa0e15e4a..6d366a63c34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -44,6 +44,12 @@ public class ClientGSIContext implements AlignmentContext { return lastSeenStateId.get(); } + @Override + public boolean isCoordinatedCall(String protocolName, String method) { + throw new UnsupportedOperationException( + "Client should not be checking uncoordinated call"); + } + /** * Client side implementation only receives state alignment info. * It does not provide state alignment info therefore this does nothing. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 61932d931c2..035ca1fc6d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -129,7 +129,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly(atimeAffected = true) + @ReadOnly(atimeAffected = true, isCoordinated = true) LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException; @@ -139,7 +139,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) FsServerDefaults getServerDefaults() throws IOException; /** @@ -280,7 +280,7 @@ public interface ClientProtocol { * @return All the in-use block storage policies currently. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BlockStoragePolicy[] getStoragePolicies() throws IOException; /** @@ -323,7 +323,7 @@ public interface ClientProtocol { * If file/dir src is not found */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BlockStoragePolicy getStoragePolicy(String path) throws IOException; /** @@ -690,7 +690,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException; @@ -702,7 +702,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException; @@ -830,7 +830,7 @@ public interface ClientProtocol { * a symlink. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) long getPreferredBlockSize(String filename) throws IOException; @@ -984,7 +984,7 @@ public interface ClientProtocol { * cookie returned from the previous call. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException; @@ -1020,7 +1020,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) HdfsFileStatus getFileInfo(String src) throws IOException; /** @@ -1035,7 +1035,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) boolean isFileClosed(String src) throws IOException; /** @@ -1052,7 +1052,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) HdfsFileStatus getFileLinkInfo(String src) throws IOException; /** @@ -1067,7 +1067,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) HdfsLocatedFileStatus getLocatedFileInfo(String src, boolean needBlockToken) throws IOException; @@ -1082,7 +1082,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) ContentSummary getContentSummary(String path) throws IOException; /** @@ -1195,7 +1195,7 @@ public interface ClientProtocol { * or an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) String getLinkTarget(String path) throws IOException; /** @@ -1266,7 +1266,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) DataEncryptionKey getDataEncryptionKey() throws IOException; /** @@ -1335,7 +1335,7 @@ public interface ClientProtocol { * @throws IOException on error */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, String fromSnapshot, String toSnapshot) throws IOException; @@ -1363,7 +1363,7 @@ public interface ClientProtocol { * @throws IOException on error */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) SnapshotDiffReportListing getSnapshotDiffReportListing(String snapshotRoot, String fromSnapshot, String toSnapshot, byte[] startPath, int index) throws IOException; @@ -1410,7 +1410,7 @@ public interface ClientProtocol { * @return A batch of CacheDirectiveEntry objects. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listCacheDirectives( long prevId, CacheDirectiveInfo filter) throws IOException; @@ -1452,7 +1452,7 @@ public interface ClientProtocol { * @return A batch of CachePoolEntry objects. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listCachePools(String prevPool) throws IOException; @@ -1499,7 +1499,7 @@ public interface ClientProtocol { * Gets the ACLs of files and directories. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) AclStatus getAclStatus(String src) throws IOException; /** @@ -1513,7 +1513,7 @@ public interface ClientProtocol { * Get the encryption zone for a path. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) EncryptionZone getEZForPath(String src) throws IOException; @@ -1525,7 +1525,7 @@ public interface ClientProtocol { * @return Batch of encryption zones. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listEncryptionZones( long prevId) throws IOException; @@ -1550,7 +1550,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listReencryptionStatus(long prevId) throws IOException; @@ -1584,7 +1584,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) List getXAttrs(String src, List xAttrs) throws IOException; @@ -1600,7 +1600,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) List listXAttrs(String src) throws IOException; @@ -1635,7 +1635,7 @@ public interface ClientProtocol { * @throws IOException see specific implementation */ @Idempotent - @ReadOnly + @ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call void checkAccess(String path, FsAction mode) throws IOException; /** @@ -1644,7 +1644,7 @@ public interface ClientProtocol { * the starting point for the inotify event stream. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) long getCurrentEditLogTxid() throws IOException; /** @@ -1652,7 +1652,7 @@ public interface ClientProtocol { * transactions for txids equal to or greater than txid. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) EventBatchList getEditsFromTxid(long txid) throws IOException; /** @@ -1710,7 +1710,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException; /** @@ -1719,7 +1719,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) Map getErasureCodingCodecs() throws IOException; /** @@ -1730,7 +1730,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException; /** @@ -1770,7 +1770,7 @@ public interface ClientProtocol { */ @Idempotent @Deprecated - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listOpenFiles(long prevId) throws IOException; /** @@ -1785,7 +1785,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listOpenFiles(long prevId, EnumSet openFilesTypes, String path) throws IOException; @@ -1797,6 +1797,6 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) void msync() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java index 1782dcb6d84..1786ce1aef7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java @@ -44,4 +44,11 @@ public @interface ReadOnly { * is only available on the active namenode. */ boolean activeOnly() default false; + + /** + * @return if true, when processing the rpc call of the target method, the + * server side will wait if server state id is behind client (msync). If + * false, the method will be processed regardless of server side state. + */ + boolean isCoordinated() default false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index 00166929338..ecb9fd36247 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hdfs.server.namenode; +import java.lang.reflect.Method; +import java.util.HashSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -34,12 +38,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; class GlobalStateIdContext implements AlignmentContext { private final FSNamesystem namesystem; + private final HashSet coordinatedMethods; + /** * Server side constructor. * @param namesystem server side state provider */ GlobalStateIdContext(FSNamesystem namesystem) { this.namesystem = namesystem; + this.coordinatedMethods = new HashSet<>(); + // For now, only ClientProtocol methods can be coordinated, so only checking + // against ClientProtocol. + for (Method method : ClientProtocol.class.getDeclaredMethods()) { + if (method.isAnnotationPresent(ReadOnly.class) && + method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) { + coordinatedMethods.add(method.getName()); + } + } } /** @@ -92,4 +107,10 @@ class GlobalStateIdContext implements AlignmentContext { public long getLastSeenStateId() { return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId(); } + + @Override + public boolean isCoordinatedCall(String protocolName, String methodName) { + return protocolName.equals(ClientProtocol.class.getCanonicalName()) + && coordinatedMethods.contains(methodName); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 16371b10308..89bfffb4494 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.retry.FailoverProxyProvider; @@ -342,6 +343,57 @@ public class TestObserverNode { assertEquals(1, readStatus.get()); } + @Test + public void testUncoordinatedCall() throws Exception { + // disable fast tailing so that coordination takes time. + conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); + conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS); + conf.setTimeDuration( + DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS); + setUpCluster(1); + setObserverRead(true); + + // make a write call so that client will be ahead of + // observer for now. + dfs.mkdir(testPath, FsPermission.getDefault()); + + // a status flag, initialized to 0, after reader finished, this will be + // updated to 1, -1 on error + AtomicInteger readStatus = new AtomicInteger(0); + + // create a separate thread to make a blocking read. + Thread reader = new Thread(() -> { + try { + // this read call will block until server state catches up. But due to + // configuration, this will take a very long time. + dfs.getClient().getFileInfo("/"); + readStatus.set(1); + fail("Should have been interrupted before getting here."); + } catch (IOException e) { + e.printStackTrace(); + readStatus.set(-1); + } + }); + reader.start(); + + long before = System.currentTimeMillis(); + dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL); + long after = System.currentTimeMillis(); + + // should succeed immediately, because datanodeReport is marked an + // uncoordinated call, and will not be waiting for server to catch up. + assertTrue(after - before < 200); + // by this time, reader thread should still be blocking, so the status not + // updated + assertEquals(0, readStatus.get()); + Thread.sleep(5000); + // reader thread status should still be unchanged after 5 sec... + assertEquals(0, readStatus.get()); + // and the reader thread is not dead, so it must be still waiting + assertEquals(Thread.State.WAITING, reader.getState()); + reader.interrupt(); + } + private void setUpCluster(int numObservers) throws Exception { qjmhaCluster = new MiniQJMHACluster.Builder(conf) .setNumNameNodes(2 + numObservers)