diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 9b75b485094..b5b19a360ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -263,7 +263,8 @@ public class RouterClientProtocol implements ClientProtocol { RemoteLocation createLocation = null; try { createLocation = rpcServer.getCreateLocation(src); - return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); + return rpcClient.invokeSingle(createLocation, method, + HdfsFileStatus.class); } catch (IOException ioe) { final List newLocations = checkFaultTolerantRetry( method, src, ioe, createLocation, locations); @@ -299,7 +300,7 @@ public class RouterClientProtocol implements ClientProtocol { * locations to retry in. This is used by fault tolerant mount points. * @param method Method that failed and might be retried. * @param src Path where the method was invoked. - * @param e Exception that was triggered. + * @param ioe Exception that was triggered. * @param excludeLoc Location that failed and should be excluded. * @param locations All the locations to retry. * @return The locations where we should retry (excluding the failed ones). @@ -441,14 +442,19 @@ public class RouterClientProtocol implements ClientProtocol { throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - final List locations = - rpcServer.getLocationsForPath(src, true); RemoteMethod method = new RemoteMethod("addBlock", new Class[] {String.class, String.class, ExtendedBlock.class, DatanodeInfo[].class, long.class, String[].class, EnumSet.class}, new RemoteParam(), clientName, previous, excludedNodes, fileId, favoredNodes, addBlockFlags); + + if (previous != null) { + return rpcClient.invokeSingle(previous, method, LocatedBlock.class); + } + + final List locations = + rpcServer.getLocationsForPath(src, true); // TODO verify the excludedNodes and favoredNodes are acceptable to this NN return rpcClient.invokeSequential( locations, method, LocatedBlock.class, null); @@ -466,14 +472,19 @@ public class RouterClientProtocol implements ClientProtocol { throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); - final List locations = - rpcServer.getLocationsForPath(src, false); RemoteMethod method = new RemoteMethod("getAdditionalDatanode", new Class[] {String.class, long.class, ExtendedBlock.class, DatanodeInfo[].class, String[].class, DatanodeInfo[].class, int.class, String.class}, new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes, numAdditionalNodes, clientName); + + if (blk != null) { + return rpcClient.invokeSingle(blk, method, LocatedBlock.class); + } + + final List locations = + rpcServer.getLocationsForPath(src, false); return rpcClient.invokeSequential( locations, method, LocatedBlock.class, null); } @@ -495,12 +506,17 @@ public class RouterClientProtocol implements ClientProtocol { long fileId) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.WRITE); - final List locations = - rpcServer.getLocationsForPath(src, true); RemoteMethod method = new RemoteMethod("complete", new Class[] {String.class, String.class, ExtendedBlock.class, long.class}, new RemoteParam(), clientName, last, fileId); + + if (last != null) { + return rpcClient.invokeSingle(last, method, Boolean.class); + } + + final List locations = + rpcServer.getLocationsForPath(src, true); // Complete can return true/false, so don't expect a result return rpcClient.invokeSequential(locations, method, Boolean.class, null); } @@ -513,7 +529,7 @@ public class RouterClientProtocol implements ClientProtocol { RemoteMethod method = new RemoteMethod("updateBlockForPipeline", new Class[] {ExtendedBlock.class, String.class}, block, clientName); - return (LocatedBlock) rpcClient.invokeSingle(block, method); + return rpcClient.invokeSingle(block, method, LocatedBlock.class); } /** @@ -638,7 +654,7 @@ public class RouterClientProtocol implements ClientProtocol { RemoteMethod method = new RemoteMethod("concat", new Class[] {String.class, String[].class}, targetDestination.getDest(), sourceDestinations); - rpcClient.invokeSingle(targetDestination, method); + rpcClient.invokeSingle(targetDestination, method, Void.class); } @Override @@ -705,7 +721,7 @@ public class RouterClientProtocol implements ClientProtocol { final RemoteLocation firstLocation = locations.get(0); try { - return (boolean) rpcClient.invokeSingle(firstLocation, method); + return rpcClient.invokeSingle(firstLocation, method, Boolean.class); } catch (IOException ioe) { final List newLocations = checkFaultTolerantRetry( method, src, ioe, firstLocation, locations); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 03704381aca..2996357b90c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -731,6 +731,27 @@ public class RouterRpcClient { return ret; } + /** + * Invokes a remote method against the specified extendedBlock. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote method return. + * @param extendedBlock Target extendedBlock for the method. + * @param method The remote method and parameters to invoke. + * @param clazz Class for the return type. + * @return The result of invoking the method. + * @throws IOException If the invoke generated an error. + */ + public T invokeSingle(final ExtendedBlock extendedBlock, + RemoteMethod method, Class clazz) throws IOException { + String nsId = getNameserviceForBlockPoolId(extendedBlock.getBlockPoolId()); + @SuppressWarnings("unchecked") + T ret = (T)invokeSingle(nsId, method); + return ret; + } + /** * Invokes a single proxy call for a single location. * @@ -742,10 +763,12 @@ public class RouterRpcClient { * @return The result of invoking the method if successful. * @throws IOException If the invoke generated an error. */ - public Object invokeSingle(final RemoteLocationContext location, - RemoteMethod remoteMethod) throws IOException { + public T invokeSingle(final RemoteLocationContext location, + RemoteMethod remoteMethod, Class clazz) throws IOException { List locations = Collections.singletonList(location); - return invokeSequential(locations, remoteMethod); + @SuppressWarnings("unchecked") + T ret = (T)invokeSequential(locations, remoteMethod); + return ret; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java index 20548d51c9a..687e57839de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.creat import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -33,30 +34,38 @@ import java.io.IOException; import java.lang.reflect.Method; import java.net.URISyntaxException; import java.util.Arrays; +import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; import java.util.TreeSet; +import org.apache.hadoop.crypto.CryptoProtocolVersion; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; +import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; @@ -233,6 +242,70 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc { testRename2(getRouterContext(), filename1, renamedFile, false); } + /** + * Verify some rpc with previous block not null. + */ + @Test + public void testPreviousBlockNotNull() + throws IOException, URISyntaxException { + final FederationRPCMetrics metrics = getRouterContext(). + getRouter().getRpcServer().getRPCMetrics(); + final ClientProtocol clientProtocol = getRouterProtocol(); + final EnumSet createFlag = EnumSet.of(CreateFlag.CREATE, + CreateFlag.OVERWRITE); + final String clientName = getRouterContext().getClient().getClientName(); + final String testPath = "/getAdditionalData/test.txt"; + final String ns1 = getCluster().getNameservices().get(1); + final FileSystem fileSystem1 = getCluster(). + getNamenode(ns1, null).getFileSystem(); + + try { + // Create the test file in NS1. + createFile(fileSystem1, testPath, 32); + + // Crate the test file via Router to get file status. + HdfsFileStatus status = clientProtocol.create( + testPath, new FsPermission("777"), clientName, + new EnumSetWritable<>(createFlag), true, (short) 1, + (long) 1024, CryptoProtocolVersion.supported(), null, null); + long proxyNumCreate = metrics.getProcessingOps(); + + // Add a block via router and previous block is null. + LocatedBlock blockOne = clientProtocol.addBlock( + testPath, clientName, null, null, + status.getFileId(), null, null); + assertNotNull(blockOne); + long proxyNumAddBlock = metrics.getProcessingOps(); + assertEquals(2, proxyNumAddBlock - proxyNumCreate); + + // Add a block via router and previous block is not null. + LocatedBlock blockTwo = clientProtocol.addBlock( + testPath, clientName, blockOne.getBlock(), null, + status.getFileId(), null, null); + assertNotNull(blockTwo); + long proxyNumAddBlock2 = metrics.getProcessingOps(); + assertEquals(1, proxyNumAddBlock2 - proxyNumAddBlock); + + // Get additionalDatanode via router and block is not null. + DatanodeInfo[] exclusions = new DatanodeInfo[0]; + LocatedBlock newBlock = clientProtocol.getAdditionalDatanode( + testPath, status.getFileId(), blockTwo.getBlock(), + blockTwo.getLocations(), blockTwo.getStorageIDs(), exclusions, + 1, clientName); + assertNotNull(newBlock); + long proxyNumAdditionalDatanode = metrics.getProcessingOps(); + assertEquals(1, proxyNumAdditionalDatanode - proxyNumAddBlock2); + + // Complete the file via router and last block is not null. + clientProtocol.complete(testPath, clientName, + newBlock.getBlock(), status.getFileId()); + long proxyNumComplete = metrics.getProcessingOps(); + assertEquals(1, proxyNumComplete - proxyNumAdditionalDatanode); + } finally { + clientProtocol.delete(testPath, true); + } + } + /** * Test recoverLease when the result is false. */