HDFS-15900. RBF: empty blockpool id on dfsrouter caused by UNAVAILABLE NameNode. (#2787)

Co-authored-by: Harunobu Daikoku <hdaikoku@yahoo-corp.jp>
This commit is contained in:
Harunobu Daikoku 2021-03-29 11:43:29 +09:00 committed by GitHub
parent cd2501e54b
commit ea6595d3b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 147 additions and 7 deletions

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdfs.server.federation.router.RemoteLocationContext;
/**
@ -75,4 +78,45 @@ public class FederationNamespaceInfo extends RemoteLocationContext {
public String toString() {
return this.nameserviceId + "->" + this.blockPoolId + ":" + this.clusterId;
}
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (obj.getClass() != getClass()) {
return false;
}
FederationNamespaceInfo other = (FederationNamespaceInfo) obj;
return new EqualsBuilder()
.append(nameserviceId, other.nameserviceId)
.append(clusterId, other.clusterId)
.append(blockPoolId, other.blockPoolId)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 31)
.append(nameserviceId)
.append(clusterId)
.append(blockPoolId)
.toHashCode();
}
@Override
public int compareTo(RemoteLocationContext info) {
if (info instanceof FederationNamespaceInfo) {
FederationNamespaceInfo other = (FederationNamespaceInfo) info;
return new CompareToBuilder()
.append(nameserviceId, other.nameserviceId)
.append(clusterId, other.clusterId)
.append(blockPoolId, other.blockPoolId)
.toComparison();
}
return super.compareTo(info);
}
}

View File

@ -213,12 +213,15 @@ public class MembershipStoreImpl
nnRegistrations.put(nnId, nnRegistration);
}
nnRegistration.add(membership);
String bpId = membership.getBlockPoolId();
String cId = membership.getClusterId();
String nsId = membership.getNameserviceId();
FederationNamespaceInfo nsInfo =
new FederationNamespaceInfo(bpId, cId, nsId);
this.activeNamespaces.add(nsInfo);
if (membership.getState()
!= FederationNamenodeServiceState.UNAVAILABLE) {
String bpId = membership.getBlockPoolId();
String cId = membership.getClusterId();
String nsId = membership.getNameserviceId();
FederationNamespaceInfo nsInfo =
new FederationNamespaceInfo(bpId, cId, nsId);
this.activeNamespaces.add(nsInfo);
}
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.resolver;
import org.junit.Test;
import java.util.Set;
import java.util.TreeSet;
import static org.assertj.core.api.Assertions.assertThat;
public class TestFederationNamespaceInfo {
/**
* Regression test for HDFS-15900.
*/
@Test
public void testHashCode() {
Set<FederationNamespaceInfo> set = new TreeSet<>();
// set an empty bpId first
set.add(new FederationNamespaceInfo("", "nn1", "ns1"));
set.add(new FederationNamespaceInfo("bp1", "nn2", "ns1"));
assertThat(set).hasSize(2);
}
}

View File

@ -33,13 +33,17 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
@ -473,6 +477,56 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
}, 100, 3000);
}
@Test
public void testNamespaceInfoWithUnavailableNameNodeRegistration()
throws IOException {
// Populate the state store with one ACTIVE NameNode entry
// and one UNAVAILABLE NameNode entry
// 1) ns0:nn0 - ACTIVE
// 2) ns0:nn1 - UNAVAILABLE
List<MembershipState> registrationList = new ArrayList<>();
String router = ROUTERS[0];
String ns = NAMESERVICES[0];
String rpcAddress = "testrpcaddress";
String serviceAddress = "testserviceaddress";
String lifelineAddress = "testlifelineaddress";
String blockPoolId = "testblockpool";
String clusterId = "testcluster";
String webScheme = "http";
String webAddress = "testwebaddress";
boolean safemode = false;
MembershipState record = MembershipState.newInstance(
router, ns, NAMENODES[0], clusterId, blockPoolId,
rpcAddress, serviceAddress, lifelineAddress, webScheme,
webAddress, FederationNamenodeServiceState.ACTIVE, safemode);
registrationList.add(record);
// Set empty clusterId and blockPoolId for UNAVAILABLE NameNode
record = MembershipState.newInstance(
router, ns, NAMENODES[1], "", "",
rpcAddress, serviceAddress, lifelineAddress, webScheme,
webAddress, FederationNamenodeServiceState.UNAVAILABLE, safemode);
registrationList.add(record);
registerAndLoadRegistrations(registrationList);
GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
GetNamespaceInfoResponse response
= membershipStore.getNamespaceInfo(request);
Set<FederationNamespaceInfo> namespaces = response.getNamespaceInfo();
// Verify only one namespace is registered
assertEquals(1, namespaces.size());
// Verify the registered namespace has a valid pair of clusterId
// and blockPoolId derived from ACTIVE NameNode
FederationNamespaceInfo namespace = namespaces.iterator().next();
assertEquals(ns, namespace.getNameserviceId());
assertEquals(clusterId, namespace.getClusterId());
assertEquals(blockPoolId, namespace.getBlockPoolId());
}
/**
* Get a single namenode membership record from the store.
*