diff --git a/hadoop-client-modules/hadoop-client-api/pom.xml b/hadoop-client-modules/hadoop-client-api/pom.xml index b4b81011eb5..52263b3cfc3 100644 --- a/hadoop-client-modules/hadoop-client-api/pom.xml +++ b/hadoop-client-modules/hadoop-client-api/pom.xml @@ -126,6 +126,12 @@ org/apache/hadoop/yarn/client/api/package-info.class + + org.apache.hadoop:* + + org/apache/hadoop/hdfs/server/protocol/package-info.class + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index e9f424604b4..b552fa277d0 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; @@ -791,6 +792,9 @@ class BPServiceActor implements Runnable { shouldServiceRun = false; return; } + if (InvalidBlockReportLeaseException.class.getName().equals(reClass)) { + fullBlockReportLeaseId = 0; + } LOG.warn("RemoteException in offerService", re); sleepAfterException(); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b19bfc13acf..c5e6d041859 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -172,6 +172,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -1651,6 +1652,8 @@ public class NameNodeRpcServer implements NamenodeProtocols { bm.processReport(nodeReg, reports[index].getStorage(), blocks, context)); } + } else { + throw new InvalidBlockReportLeaseException(context.getReportId(), context.getLeaseId()); } } catch (UnregisteredNodeException une) { LOG.warn("Datanode {} is attempting to report but not register yet.", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InvalidBlockReportLeaseException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InvalidBlockReportLeaseException.java new file mode 100644 index 00000000000..8428b805f74 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/InvalidBlockReportLeaseException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This exception is thrown when a datanode sends a full block report but it is + * rejected by the Namenode due to an invalid lease (expired or otherwise). + * + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class InvalidBlockReportLeaseException extends IOException { + /** for java.io.Serializable. */ + private static final long serialVersionUID = 1L; + + public InvalidBlockReportLeaseException(long blockReportID, long leaseID) { + super("Block report 0x" + Long.toHexString(blockReportID) + " was rejected as lease 0x" + + Long.toHexString(leaseID) + " is invalid"); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/package-info.java new file mode 100644 index 00000000000..21743595548 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/package-info.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package provides classes for the namenode server protocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java index d1ae0b600fc..225f7fc96c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportLease.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; @@ -41,12 +42,14 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -137,6 +140,72 @@ public class TestBlockReportLease { } } + @Test + public void testExceptionThrownWhenFBRLeaseExpired() throws Exception { + HdfsConfiguration conf = new HdfsConfiguration(); + Random rand = new Random(); + + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { + cluster.waitActive(); + + FSNamesystem fsn = cluster.getNamesystem(); + BlockManager blockManager = fsn.getBlockManager(); + BlockManager spyBlockManager = spy(blockManager); + fsn.setBlockManagerForTesting(spyBlockManager); + String poolId = cluster.getNamesystem().getBlockPoolId(); + + NamenodeProtocols rpcServer = cluster.getNameNodeRpc(); + + // Test based on one DataNode report to Namenode + DataNode dn = cluster.getDataNodes().get(0); + DatanodeDescriptor datanodeDescriptor = spyBlockManager + .getDatanodeManager().getDatanode(dn.getDatanodeId()); + + DatanodeRegistration dnRegistration = dn.getDNRegistrationForBP(poolId); + StorageReport[] storages = dn.getFSDataset().getStorageReports(poolId); + + // Send heartbeat and request full block report lease + HeartbeatResponse hbResponse = rpcServer.sendHeartbeat( + dnRegistration, storages, 0, 0, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); + + // Remove full block report lease about dn + spyBlockManager.getBlockReportLeaseManager() + .removeLease(datanodeDescriptor); + + ExecutorService pool = Executors.newFixedThreadPool(1); + + // Trigger sendBlockReport + BlockReportContext brContext = new BlockReportContext(1, 0, + rand.nextLong(), hbResponse.getFullBlockReportLeaseId()); + Future sendBRfuturea = pool.submit(() -> { + // Build every storage with 100 blocks for sending report + DatanodeStorage[] datanodeStorages + = new DatanodeStorage[storages.length]; + for (int i = 0; i < storages.length; i++) { + datanodeStorages[i] = storages[i].getStorage(); + } + StorageBlockReport[] reports = createReports(datanodeStorages, 100); + + // Send blockReport + return rpcServer.blockReport(dnRegistration, poolId, reports, + brContext); + }); + + // Get result, it will not null if process successfully + ExecutionException exception = null; + try { + sendBRfuturea.get(); + } catch (ExecutionException e) { + exception = e; + } + assertNotNull(exception); + assertEquals(InvalidBlockReportLeaseException.class, + exception.getCause().getClass()); + } + } + @Test public void testCheckBlockReportLeaseWhenDnUnregister() throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index d300eac4b69..9d4b0db0804 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.protocol.InvalidBlockReportLeaseException; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; @@ -39,7 +40,6 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -1187,8 +1187,9 @@ public class TestBPOfferService { // just reject and wait until DN request for a new leaseId if(leaseId == 1) { firstLeaseId = leaseId; - throw new ConnectException( - "network is not reachable for test. "); + InvalidBlockReportLeaseException e = + new InvalidBlockReportLeaseException(context.getReportId(), 1); + throw new RemoteException(e.getClass().getName(), e.getMessage()); } else { secondLeaseId = leaseId; return null;