HDDS-2018. Handle Set DtService of token for OM HA. (#1371)
This commit is contained in:
parent
f25fe92743
commit
cfa41a49af
|
@ -68,6 +68,7 @@ public class OMFailoverProxyProvider implements
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final long omVersion;
|
private final long omVersion;
|
||||||
private final UserGroupInformation ugi;
|
private final UserGroupInformation ugi;
|
||||||
|
private final Text delegationTokenService;
|
||||||
|
|
||||||
public OMFailoverProxyProvider(OzoneConfiguration configuration,
|
public OMFailoverProxyProvider(OzoneConfiguration configuration,
|
||||||
UserGroupInformation ugi) throws IOException {
|
UserGroupInformation ugi) throws IOException {
|
||||||
|
@ -75,6 +76,7 @@ public class OMFailoverProxyProvider implements
|
||||||
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
|
this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
|
||||||
this.ugi = ugi;
|
this.ugi = ugi;
|
||||||
loadOMClientConfigs(conf);
|
loadOMClientConfigs(conf);
|
||||||
|
this.delegationTokenService = computeDelegationTokenService();
|
||||||
|
|
||||||
currentProxyIndex = 0;
|
currentProxyIndex = 0;
|
||||||
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
|
currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
|
||||||
|
@ -178,10 +180,30 @@ public class OMFailoverProxyProvider implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized Text getCurrentProxyDelegationToken() {
|
public Text getCurrentProxyDelegationToken() {
|
||||||
return omProxyInfos.get(currentProxyOMNodeId).getDelegationTokenService();
|
return delegationTokenService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Text computeDelegationTokenService() {
|
||||||
|
// For HA, this will return "," separated address of all OM's.
|
||||||
|
StringBuilder rpcAddress = new StringBuilder();
|
||||||
|
int count = 0;
|
||||||
|
for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
|
||||||
|
omProxyInfos.entrySet()) {
|
||||||
|
count++;
|
||||||
|
rpcAddress =
|
||||||
|
rpcAddress.append(omProxyInfoSet.getValue().toString());
|
||||||
|
|
||||||
|
if (omProxyInfos.size() != count) {
|
||||||
|
rpcAddress.append(",");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Text(rpcAddress.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called whenever an error warrants failing over. It is determined by the
|
* Called whenever an error warrants failing over. It is determined by the
|
||||||
* retry policy.
|
* retry policy.
|
||||||
|
|
|
@ -41,13 +41,27 @@ public class OzoneDelegationTokenSelector
|
||||||
.getLogger(OzoneDelegationTokenSelector.class);
|
.getLogger(OzoneDelegationTokenSelector.class);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Token selectToken(Text service,
|
public Token<OzoneTokenIdentifier> selectToken(Text service,
|
||||||
Collection<Token<? extends TokenIdentifier>> tokens) {
|
Collection<Token<? extends TokenIdentifier>> tokens) {
|
||||||
LOG.trace("Getting token for service {}", service);
|
LOG.trace("Getting token for service {}", service);
|
||||||
Token token = super.selectToken(service, tokens);
|
Token token = getSelectedTokens(service, tokens);
|
||||||
LOG.debug("Got tokens: {} for service {}", token, service);
|
LOG.debug("Got tokens: {} for service {}", token, service);
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Token<OzoneTokenIdentifier> getSelectedTokens(Text service,
|
||||||
|
Collection<Token<? extends TokenIdentifier>> tokens) {
|
||||||
|
if (service == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
for (Token<? extends TokenIdentifier> token : tokens) {
|
||||||
|
if (OzoneTokenIdentifier.KIND_NAME.equals(token.getKind())
|
||||||
|
&& token.getService().toString().contains(service.toString())) {
|
||||||
|
return (Token<OzoneTokenIdentifier>) token;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.security;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.security.OzoneTokenIdentifier.KIND_NAME;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to test OzoneDelegationTokenSelector.
|
||||||
|
*/
|
||||||
|
public class TestOzoneDelegationTokenSelector {
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTokenSelector() {
|
||||||
|
|
||||||
|
// set dummy details for identifier and password in token.
|
||||||
|
byte[] identifier =
|
||||||
|
RandomStringUtils.randomAlphabetic(10)
|
||||||
|
.getBytes(StandardCharsets.UTF_8);
|
||||||
|
byte[] password =
|
||||||
|
RandomStringUtils.randomAlphabetic(10)
|
||||||
|
.getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
Token<OzoneTokenIdentifier> tokenIdentifierToken =
|
||||||
|
new Token<>(identifier, password, KIND_NAME, getService());
|
||||||
|
|
||||||
|
OzoneDelegationTokenSelector ozoneDelegationTokenSelector =
|
||||||
|
new OzoneDelegationTokenSelector();
|
||||||
|
|
||||||
|
Text service = new Text("om1:9862");
|
||||||
|
|
||||||
|
Token<OzoneTokenIdentifier> selectedToken =
|
||||||
|
ozoneDelegationTokenSelector.selectToken(service,
|
||||||
|
Collections.singletonList(tokenIdentifierToken));
|
||||||
|
|
||||||
|
|
||||||
|
Assert.assertNotNull(selectedToken);
|
||||||
|
|
||||||
|
|
||||||
|
tokenIdentifierToken.setService(new Text("om1:9863"));
|
||||||
|
selectedToken =
|
||||||
|
ozoneDelegationTokenSelector.selectToken(service,
|
||||||
|
Collections.singletonList(tokenIdentifierToken));
|
||||||
|
|
||||||
|
Assert.assertNull(selectedToken);
|
||||||
|
|
||||||
|
service = new Text("om1:9863");
|
||||||
|
selectedToken =
|
||||||
|
ozoneDelegationTokenSelector.selectToken(service,
|
||||||
|
Collections.singletonList(tokenIdentifierToken));
|
||||||
|
|
||||||
|
Assert.assertNotNull(selectedToken);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private Text getService() {
|
||||||
|
return new Text("om1:9862,om2:9862,om3:9862");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue