diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index 0e8b960ecd3..a435ff6c4e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -38,6 +38,7 @@ public interface AlignmentContext { /** * This is the intended server method call to implement to pass state info * during RPC response header construction. + * * @param header The RPC response header builder. */ void updateResponseState(RpcResponseHeaderProto.Builder header); @@ -45,6 +46,7 @@ public interface AlignmentContext { /** * This is the intended client method call to implement to recieve state info * during RPC response processing. + * * @param header The RPC response header. */ void receiveResponseState(RpcResponseHeaderProto header); @@ -52,6 +54,7 @@ public interface AlignmentContext { /** * This is the intended client method call to pull last seen state info * into RPC request processing. + * * @param header The RPC request header builder. */ void updateRequestState(RpcRequestHeaderProto.Builder header); @@ -59,6 +62,7 @@ public interface AlignmentContext { /** * This is the intended server method call to implement to receive * client state info during RPC response header processing. + * * @param header The RPC request header. * @return state id of in the request header. */ @@ -66,7 +70,19 @@ public interface AlignmentContext { /** * Returns the last seen state id of the alignment context instance. + * * @return the value of the last seen state id. */ long getLastSeenStateId(); + + /** + * Return true if this method call does need to be synced, false + * otherwise. sync meaning server state needs to have caught up with + * client state. 
+ * + * @param protocolName the name of the protocol + * @param method the method call to check + * @return true if this method call is coordinated, false otherwise. + */ + boolean isCoordinatedCall(String protocolName, String method); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 68b0fbf0d55..63dd8e1dab6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -705,6 +705,7 @@ public abstract class Server { private int priorityLevel; // the priority level assigned by scheduler, 0 by default private long clientStateId; + private boolean isCallCoordinated; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -736,6 +737,7 @@ public abstract class Server { this.traceScope = traceScope; this.callerContext = callerContext; this.clientStateId = Long.MIN_VALUE; + this.isCallCoordinated = false; } @Override @@ -821,6 +823,14 @@ public abstract class Server { this.clientStateId = stateId; } + public void markCallCoordinated(boolean flag) { + this.isCallCoordinated = flag; + } + + public boolean isCallCoordinated() { + return this.isCallCoordinated; + } + @InterfaceStability.Unstable public void deferResponse() { this.deferredResponse = true; @@ -2448,9 +2458,31 @@ public abstract class Server { // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); - if(alignmentContext != null) { - long stateId = alignmentContext.receiveRequestState(header); - call.setClientStateId(stateId); + if(alignmentContext != null && call.rpcRequest != null && + (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) { + // if call.rpcRequest is not RpcProtobufRequest, will skip the following + // step and treat the call as uncoordinated. 
Currently, only certain + // ClientProtocol method requests made through RPC protobuf need to be + // coordinated. + String methodName; + String protoName; + try { + ProtobufRpcEngine.RpcProtobufRequest req = + (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; + methodName = req.getRequestHeader().getMethodName(); + protoName = req.getRequestHeader().getDeclaringClassProtocolName(); + } catch (IOException ioe) { + throw new RpcServerException("Rpc request header check fail", ioe); + } + if (!alignmentContext.isCoordinatedCall(protoName, methodName)) { + call.markCallCoordinated(false); + } else { + call.markCallCoordinated(true); + long stateId = alignmentContext.receiveRequestState(header); + call.setClientStateId(stateId); + } + } else { + call.markCallCoordinated(false); } try { @@ -2634,8 +2666,8 @@ public abstract class Server { TraceScope traceScope = null; try { final Call call = callQueue.take(); // pop the queue; maybe blocked here - if (alignmentContext != null && call.getClientStateId() > - alignmentContext.getLastSeenStateId()) { + if (alignmentContext != null && call.isCallCoordinated() && + call.getClientStateId() > alignmentContext.getLastSeenStateId()) { /* * The call processing should be postponed until the client call's * state id is aligned (>=) with the server state id. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index abb767d94f8..9e288ced981 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -42,6 +42,12 @@ public class ClientGSIContext implements AlignmentContext { return lastSeenStateId.get(); } + @Override + public boolean isCoordinatedCall(String protocolName, String method) { + throw new UnsupportedOperationException( + "Client should not be checking uncoordinated call"); + } + /** * Client side implementation only receives state alignment info. * It does not provide state alignment info therefore this does nothing. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index d4ab4ae71b3..2002c263f03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -126,7 +126,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly(atimeAffected = true) + @ReadOnly(atimeAffected = true, isCoordinated = true) LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException; @@ -136,7 +136,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) FsServerDefaults getServerDefaults() throws IOException; /** @@ -269,7 +269,7 @@ public interface ClientProtocol { * @return All the in-use block storage policies currently. 
*/ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BlockStoragePolicy[] getStoragePolicies() throws IOException; /** @@ -312,7 +312,7 @@ public interface ClientProtocol { * If file/dir src is not found */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BlockStoragePolicy getStoragePolicy(String path) throws IOException; /** @@ -679,7 +679,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException; @@ -690,7 +690,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException; @@ -793,7 +793,7 @@ public interface ClientProtocol { * a symlink. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) long getPreferredBlockSize(String filename) throws IOException; @@ -933,7 +933,7 @@ public interface ClientProtocol { * cookie returned from the previous call. 
*/ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException; @@ -969,7 +969,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) HdfsFileStatus getFileInfo(String src) throws IOException; /** @@ -984,7 +984,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) boolean isFileClosed(String src) throws IOException; /** @@ -1001,7 +1001,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) HdfsFileStatus getFileLinkInfo(String src) throws IOException; /** @@ -1015,7 +1015,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) ContentSummary getContentSummary(String path) throws IOException; /** @@ -1128,7 +1128,7 @@ public interface ClientProtocol { * or an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) String getLinkTarget(String path) throws IOException; /** @@ -1199,7 +1199,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) DataEncryptionKey getDataEncryptionKey() throws IOException; /** @@ -1268,7 +1268,7 @@ public interface ClientProtocol { * @throws IOException on error */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, String fromSnapshot, String toSnapshot) throws IOException; @@ -1314,7 +1314,7 @@ public interface ClientProtocol { * @return A batch of CacheDirectiveEntry objects. 
*/ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listCacheDirectives( long prevId, CacheDirectiveInfo filter) throws IOException; @@ -1356,7 +1356,7 @@ public interface ClientProtocol { * @return A batch of CachePoolEntry objects. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listCachePools(String prevPool) throws IOException; @@ -1403,7 +1403,7 @@ public interface ClientProtocol { * Gets the ACLs of files and directories. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) AclStatus getAclStatus(String src) throws IOException; /** @@ -1417,7 +1417,7 @@ public interface ClientProtocol { * Get the encryption zone for a path. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) EncryptionZone getEZForPath(String src) throws IOException; @@ -1429,7 +1429,7 @@ public interface ClientProtocol { * @return Batch of encryption zones. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listEncryptionZones( long prevId) throws IOException; @@ -1463,7 +1463,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) List getXAttrs(String src, List xAttrs) throws IOException; @@ -1479,7 +1479,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) List listXAttrs(String src) throws IOException; @@ -1514,7 +1514,7 @@ public interface ClientProtocol { * @throws IOException see specific implementation */ @Idempotent - @ReadOnly + @ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call void checkAccess(String path, FsAction mode) throws IOException; /** @@ -1523,7 +1523,7 @@ public interface ClientProtocol { * the starting point for the inotify event stream. 
*/ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) long getCurrentEditLogTxid() throws IOException; /** @@ -1531,7 +1531,7 @@ public interface ClientProtocol { * transactions for txids equal to or greater than txid. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) EventBatchList getEditsFromTxid(long txid) throws IOException; /** @@ -1563,7 +1563,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listOpenFiles(long prevId) throws IOException; /** @@ -1574,6 +1574,6 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(activeOnly = true) void msync() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java index 1782dcb6d84..1786ce1aef7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java @@ -44,4 +44,11 @@ public @interface ReadOnly { * is only available on the active namenode. */ boolean activeOnly() default false; + + /** + * @return if true, when processing the rpc call of the target method, the + * server side will wait if server state id is behind client (msync). If + * false, the method will be processed regardless of server side state. 
+ */ + boolean isCoordinated() default false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index 00166929338..ecb9fd36247 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hdfs.server.namenode; +import java.lang.reflect.Method; +import java.util.HashSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -34,12 +38,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; class GlobalStateIdContext implements AlignmentContext { private final FSNamesystem namesystem; + private final HashSet coordinatedMethods; + /** * Server side constructor. * @param namesystem server side state provider */ GlobalStateIdContext(FSNamesystem namesystem) { this.namesystem = namesystem; + this.coordinatedMethods = new HashSet<>(); + // For now, only ClientProtocol methods can be coordinated, so only checking + // against ClientProtocol. 
+ for (Method method : ClientProtocol.class.getDeclaredMethods()) { + if (method.isAnnotationPresent(ReadOnly.class) && + method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) { + coordinatedMethods.add(method.getName()); + } + } } /** @@ -92,4 +107,10 @@ class GlobalStateIdContext implements AlignmentContext { public long getLastSeenStateId() { return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId(); } + + @Override + public boolean isCoordinatedCall(String protocolName, String methodName) { + return protocolName.equals(ClientProtocol.class.getCanonicalName()) + && coordinatedMethods.contains(methodName); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 657695926a2..6f8375b8e6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.retry.FailoverProxyProvider; @@ -351,6 +352,57 @@ public class TestObserverNode { assertEquals(1, readStatus.get()); } + @Test + public void testUncoordinatedCall() throws Exception { + // disable fast tailing so that coordination takes time. 
+ conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); + conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS); + conf.setTimeDuration( + DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS); + setUpCluster(1); + setObserverRead(true); + + // make a write call so that client will be ahead of + // observer for now. + dfs.mkdir(testPath, FsPermission.getDefault()); + + // a status flag, initialized to 0, after reader finished, this will be + // updated to 1, -1 on error + AtomicInteger readStatus = new AtomicInteger(0); + + // create a separate thread to make a blocking read. + Thread reader = new Thread(() -> { + try { + // this read call will block until server state catches up. But due to + // configuration, this will take a very long time. + dfs.getClient().getFileInfo("/"); + readStatus.set(1); + fail("Should have been interrupted before getting here."); + } catch (IOException e) { + e.printStackTrace(); + readStatus.set(-1); + } + }); + reader.start(); + + long before = System.currentTimeMillis(); + dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL); + long after = System.currentTimeMillis(); + + // should succeed immediately, because datanodeReport is marked an + // uncoordinated call, and will not be waiting for server to catch up. + assertTrue(after - before < 200); + // by this time, reader thread should still be blocking, so the status not + // updated + assertEquals(0, readStatus.get()); + Thread.sleep(5000); + // reader thread status should still be unchanged after 5 sec... + assertEquals(0, readStatus.get()); + // and the reader thread is not dead, so it must be still waiting + assertEquals(Thread.State.WAITING, reader.getState()); + reader.interrupt(); + } + private void setUpCluster(int numObservers) throws Exception { qjmhaCluster = new MiniQJMHACluster.Builder(conf) .setNumNameNodes(2 + numObservers)